test(backups): test out that pre-restore backup plays nice with jobs

This commit is contained in:
Houkime 2023-07-07 12:49:52 +00:00
parent af5edb695f
commit 9075afd38a
3 changed files with 30 additions and 10 deletions

View file

@ -29,14 +29,17 @@ def get_jobs_by_service(service: Service) -> List[Job]:
return result return result
def is_something_queued_for(service: Service) -> bool: def is_something_running_for(service: Service) -> bool:
return len(get_jobs_by_service(service)) != 0 running_jobs = [
job for job in get_jobs_by_service(service) if job.status == JobStatus.RUNNING
]
return len(running_jobs) != 0
def add_backup_job(service: Service) -> Job: def add_backup_job(service: Service) -> Job:
if is_something_queued_for(service): if is_something_running_for(service):
message = ( message = (
f"Cannot start a backup of {service.get_id()}, another operation is queued: " f"Cannot start a backup of {service.get_id()}, another operation is running: "
+ get_jobs_by_service(service)[0].type_id + get_jobs_by_service(service)[0].type_id
) )
raise ValueError(message) raise ValueError(message)
@ -53,9 +56,9 @@ def add_restore_job(snapshot: Snapshot) -> Job:
service = get_service_by_id(snapshot.service_name) service = get_service_by_id(snapshot.service_name)
if service is None: if service is None:
raise ValueError(f"no such service: {snapshot.service_name}") raise ValueError(f"no such service: {snapshot.service_name}")
if is_something_queued_for(service): if is_something_running_for(service):
message = ( message = (
f"Cannot start a restore of {service.get_id()}, another operation is queued: " f"Cannot start a restore of {service.get_id()}, another operation is running: "
+ get_jobs_by_service(service)[0].type_id + get_jobs_by_service(service)[0].type_id
) )
raise ValueError(message) raise ValueError(message)

View file

@ -1,5 +1,7 @@
from datetime import datetime from datetime import datetime
from selfprivacy_api.graphql.common_types.backup import RestoreStrategy
from selfprivacy_api.models.backup.snapshot import Snapshot from selfprivacy_api.models.backup.snapshot import Snapshot
from selfprivacy_api.utils.huey import huey from selfprivacy_api.utils.huey import huey
from selfprivacy_api.services import get_service_by_id from selfprivacy_api.services import get_service_by_id
@ -28,8 +30,11 @@ def start_backup(service: Service) -> bool:
@huey.task() @huey.task()
def restore_snapshot(snapshot: Snapshot) -> bool: def restore_snapshot(
Backups.restore_snapshot(snapshot) snapshot: Snapshot,
strategy: RestoreStrategy = RestoreStrategy.DOWNLOAD_VERIFY_OVERWRITE,
) -> bool:
Backups.restore_snapshot(snapshot, strategy)
return True return True

View file

@ -13,6 +13,7 @@ from selfprivacy_api.services import Service
from selfprivacy_api.services import get_service_by_id from selfprivacy_api.services import get_service_by_id
from selfprivacy_api.services.test_service import DummyService from selfprivacy_api.services.test_service import DummyService
from selfprivacy_api.graphql.queries.providers import BackupProvider from selfprivacy_api.graphql.queries.providers import BackupProvider
from selfprivacy_api.graphql.common_types.backup import RestoreStrategy
from selfprivacy_api.jobs import Jobs, JobStatus from selfprivacy_api.jobs import Jobs, JobStatus
from selfprivacy_api.models.backup.snapshot import Snapshot from selfprivacy_api.models.backup.snapshot import Snapshot
@ -360,7 +361,15 @@ def test_backup_larger_file(backups, dummy_service):
remove(dir) remove(dir)
def test_restore_snapshot_task(backups, dummy_service): @pytest.fixture(params=["verify", "inplace"])
def restore_strategy(request) -> RestoreStrategy:
if request.param == "verify":
return RestoreStrategy.DOWNLOAD_VERIFY_OVERWRITE
else:
return RestoreStrategy.INPLACE
def test_restore_snapshot_task(backups, dummy_service, restore_strategy):
Backups.back_up(dummy_service) Backups.back_up(dummy_service)
snaps = Backups.get_snapshots(dummy_service) snaps = Backups.get_snapshots(dummy_service)
assert len(snaps) == 1 assert len(snaps) == 1
@ -375,7 +384,7 @@ def test_restore_snapshot_task(backups, dummy_service):
for p in paths_to_nuke: for p in paths_to_nuke:
remove(p) remove(p)
handle = restore_snapshot(snaps[0]) handle = restore_snapshot(snaps[0], restore_strategy)
handle(blocking=True) handle(blocking=True)
for p, content in zip(paths_to_nuke, contents): for p, content in zip(paths_to_nuke, contents):
@ -383,6 +392,9 @@ def test_restore_snapshot_task(backups, dummy_service):
with open(p, "r") as file: with open(p, "r") as file:
assert file.read() == content assert file.read() == content
snaps = Backups.get_snapshots(dummy_service)
assert len(snaps) == 1
def test_autobackup_enable_service(backups, dummy_service): def test_autobackup_enable_service(backups, dummy_service):
assert not Backups.is_autobackup_enabled(dummy_service) assert not Backups.is_autobackup_enabled(dummy_service)