mirror of
https://git.selfprivacy.org/SelfPrivacy/selfprivacy-rest-api.git
synced 2024-11-05 02:53:11 +00:00
32 lines
837 B
Python
32 lines
837 B
Python
|
from typing import Optional
|
||
|
|
||
|
from selfprivacy_api.jobs import Jobs, Job, JobStatus
|
||
|
from selfprivacy_api.services.service import Service
|
||
|
|
||
|
|
||
|
def backup_job_type(service: Service):
|
||
|
return f"services.{service.get_id()}.backup"
|
||
|
|
||
|
|
||
|
def add_backup_job(service: Service) -> Job:
|
||
|
display_name = service.get_display_name()
|
||
|
job = Jobs.add(
|
||
|
type_id=backup_job_type(service),
|
||
|
name=f"Backup {display_name}",
|
||
|
description=f"Backing up {display_name}",
|
||
|
)
|
||
|
return job
|
||
|
|
||
|
|
||
|
def get_job_by_type(type_id: str) -> Optional[Job]:
|
||
|
for job in Jobs.get_jobs():
|
||
|
if job.type_id == type_id and job.status in [
|
||
|
JobStatus.CREATED,
|
||
|
JobStatus.RUNNING,
|
||
|
]:
|
||
|
return job
|
||
|
|
||
|
|
||
|
def get_backup_job(service: Service) -> Optional[Job]:
|
||
|
return get_job_by_type(backup_job_type(service))
|