feature(backups): invalidate errored backups when a backup succeeds

This commit is contained in:
Houkime 2025-03-05 16:38:28 +00:00
parent 043d280d53
commit 3ad80397af
3 changed files with 86 additions and 20 deletions
selfprivacy_api/backup
tests

View file

@ -41,6 +41,7 @@ from selfprivacy_api.backup.jobs import (
add_backup_job,
get_restore_job,
add_restore_job,
get_backup_fails,
)
@ -267,8 +268,19 @@ class Backups:
Jobs.update(job, status=JobStatus.FINISHED, result="Backup finished")
if reason in [BackupReason.AUTO, BackupReason.PRE_RESTORE]:
Jobs.set_expiration(job, AUTOBACKUP_JOB_EXPIRATION_SECONDS)
# To not confuse user
if reason is not BackupReason.PRE_RESTORE:
Backups.clear_failed_backups(service)
return Backups.sync_date_from_cache(snapshot)
@staticmethod
def clear_failed_backups(service: Service):
jobs_to_clear = get_backup_fails(service)
for job in jobs_to_clear:
Jobs.remove(job)
@staticmethod
def sync_date_from_cache(snapshot: Snapshot) -> Snapshot:
"""

View file

@ -1,4 +1,4 @@
from typing import Optional, List
from typing import Optional, List, Iterable
from selfprivacy_api.models.backup.snapshot import Snapshot
from selfprivacy_api.jobs import Jobs, Job, JobStatus
@ -120,21 +120,47 @@ def add_restore_job(snapshot: Snapshot) -> Job:
return job
def last_if_any(jobs: List[Job]) -> Optional[Job]:
if not jobs:
return None
newest_jobs = sorted(jobs, key=lambda x: x.created_at, reverse=True)
return newest_jobs[0]
def get_job_by_type(type_id: str) -> Optional[Job]:
for job in Jobs.get_jobs():
if job.type_id == type_id and job.status in [
JobStatus.CREATED,
JobStatus.RUNNING,
]:
return job
return None
jobs = intersection(get_jobs_by_type(type_id), get_ok_jobs())
return last_if_any(jobs)
def get_failed_job_by_type(type_id: str) -> Optional[Job]:
for job in Jobs.get_jobs():
if job.type_id == type_id and job.status == JobStatus.ERROR:
return job
return None
jobs = intersection(get_jobs_by_type(type_id), get_failed_jobs())
return last_if_any(jobs)
def get_jobs_by_type(type_id: str):
return [job for job in Jobs.get_jobs() if job.type_id == type_id]
# Can be moved out to Jobs
def get_ok_jobs() -> List[Job]:
return [
job
for job in Jobs.get_jobs()
if job.status
in [
JobStatus.CREATED,
JobStatus.RUNNING,
]
]
# Can be moved out to Jobs
def get_failed_jobs() -> List[Job]:
return [job for job in Jobs.get_jobs() if job.status == JobStatus.ERROR]
def intersection(a: Iterable, b: Iterable):
return [x for x in a if x in b]
def get_backup_job(service: Service) -> Optional[Job]:
@ -145,5 +171,9 @@ def get_backup_fail(service: Service) -> Optional[Job]:
return get_failed_job_by_type(backup_job_type(service))
def get_backup_fails(service: Service) -> List[Job]:
return intersection(get_failed_jobs(), get_jobs_by_type(backup_job_type(service)))
def get_restore_job(service: Service) -> Optional[Job]:
return get_job_by_type(restore_job_type(service))

View file

@ -245,15 +245,10 @@ def test_error_censoring_encryptionkey(dummy_service, backups):
LocalBackupSecret.reset()
new_key = LocalBackupSecret.get()
with pytest.raises(ValueError):
# Should fail without correct key
Backups.back_up(dummy_service)
# Should fail without correct key and create a failed job
failed_job = assert_backup_fails(dummy_service)
job = get_backup_fail(dummy_service)
assert job is not None
assert_job_errored(job)
job_text = all_job_text(job)
job_text = all_job_text(failed_job)
assert old_key not in job_text
assert new_key not in job_text
@ -263,6 +258,35 @@ def test_error_censoring_encryptionkey(dummy_service, backups):
assert "CENSORED" in job_text
def assert_backup_fails(service) -> Job:
with pytest.raises(ValueError):
Backups.back_up(service)
job = get_backup_fail(service)
assert job is not None
assert_job_errored(job)
return job
def test_backup_clears_failed_jobs(dummy_service, backups):
assert get_backup_fail(dummy_service) is None
# Discard our key to inject a failure
old_key = LocalBackupSecret.get()
LocalBackupSecret.reset()
assert_backup_fails(dummy_service)
# Restore the key
LocalBackupSecret.set(old_key)
assert LocalBackupSecret.get() == old_key
Backups.back_up(dummy_service)
assert get_backup_fail(dummy_service) is None
def test_error_censoring_loginkey(dummy_service, backups, fp):
# We do not want to screw up our teardown
old_provider = Backups.provider()