# selfprivacy-rest-api/selfprivacy_api/migrations/check_for_system_rebuild_jobs.py

from selfprivacy_api.migrations.migration import Migration
from selfprivacy_api.jobs import JobStatus, Jobs
class CheckForSystemRebuildJobs(Migration):
    """Check if there are unfinished system rebuild jobs and finish them.

    A job is selected when its ``type_id`` is one of the NixOS rebuild or
    upgrade ids and its status is still ``CREATED`` or ``RUNNING``. Both
    ``is_migration_needed`` and ``migrate`` use the same selection rule via
    ``_is_unfinished_rebuild_job`` so they cannot drift apart.
    """

    # Job type ids that this migration treats as system rebuild jobs.
    _REBUILD_JOB_TYPE_IDS = (
        "system.nixos.rebuild",
        "system.nixos.upgrade",
    )

    # Job statuses that this migration treats as "unfinished".
    _UNFINISHED_STATUSES = (
        JobStatus.CREATED,
        JobStatus.RUNNING,
    )

    def get_migration_name(self) -> str:
        """Return the identifier string of this migration."""
        return "check_for_system_rebuild_jobs"

    def get_migration_description(self) -> str:
        """Return a one-sentence, human-readable description."""
        return "Check if there are unfinished system rebuild jobs and finish them"

    @classmethod
    def _is_unfinished_rebuild_job(cls, job) -> bool:
        """Return True if ``job`` is a rebuild/upgrade job that is not done.

        ``job`` is expected to expose ``type_id`` and ``status`` attributes,
        as the objects yielded by ``Jobs.get_jobs()`` are used that way here.
        """
        return (
            job.type_id in cls._REBUILD_JOB_TYPE_IDS
            and job.status in cls._UNFINISHED_STATUSES
        )

    def is_migration_needed(self) -> bool:
        """Return True if at least one unfinished system rebuild job exists."""
        return any(
            self._is_unfinished_rebuild_job(job) for job in Jobs.get_jobs()
        )

    def migrate(self) -> None:
        """Mark every unfinished system rebuild job as finished.

        Each selected job is updated to ``JobStatus.FINISHED`` with the
        result text "System rebuilt." and progress 100.
        """
        # As the API is restarted, we assume that the jobs are finished
        for job in Jobs.get_jobs():
            if not self._is_unfinished_rebuild_job(job):
                continue
            Jobs.update(
                job=job,
                status=JobStatus.FINISHED,
                result="System rebuilt.",
                progress=100,
            )