mirror of
https://git.selfprivacy.org/SelfPrivacy/selfprivacy-rest-api.git
synced 2024-11-10 04:53:11 +00:00
test(backup): minimal snapshot slice test
This commit is contained in:
parent
bfb0442e94
commit
427fdbdb49
|
@ -752,25 +752,43 @@ class Backups:
|
|||
]:
|
||||
raise NotDeadError(service)
|
||||
|
||||
@staticmethod
|
||||
def is_same_slice(snap1: Snapshot, snap2: Snapshot) -> bool:
|
||||
# Protologic for slicing. Determines if the snaps were made during the same
|
||||
# autobackup operation
|
||||
# TODO: Replace with slice id tag comparison
|
||||
|
||||
period_minutes = Backups.autobackup_period_minutes()
|
||||
# Autobackups are not guaranteed to be enabled during restore.
|
||||
# If they are not, period will be none
|
||||
# We ASSUME that picking latest snap of the same day is safe enough
|
||||
# But it is potentlially problematic and is better done with metadata I think.
|
||||
if period_minutes is None:
|
||||
period_minutes = 24 * 60
|
||||
|
||||
if snap1.created_at > snap2.created_at + timedelta(minutes=period_minutes):
|
||||
return False
|
||||
if snap1.created_at < snap2.created_at - timedelta(minutes=period_minutes):
|
||||
return False
|
||||
return True
|
||||
|
||||
@staticmethod
|
||||
def last_autobackup_slice() -> List[Snapshot]:
|
||||
"""
|
||||
Guarantees that the slice is valid, ie, it has an api snapshot too
|
||||
Or empty
|
||||
"""
|
||||
slice: List[Snapshot] = []
|
||||
|
||||
# We need ones that were made in the same autobackup session.
|
||||
# For this we step through api snapshots
|
||||
# We need snapshots that were made in the same autobackup session.
|
||||
# And we need to be sure that api snap is in there
|
||||
# That's why we form the slice around api snap
|
||||
api_autosnaps = Backups._auto_snaps(ServiceManager())
|
||||
api_autosnaps.sort(key=lambda x: x.created_at, reverse=True)
|
||||
if api_autosnaps == []:
|
||||
return []
|
||||
|
||||
api_snap = api_autosnaps[0]
|
||||
period_minutes = Backups.autobackup_period_minutes()
|
||||
if period_minutes is None:
|
||||
# this is potentially problematic. Maybe add more metainfo for autobackup snapshots, like a slice id or something?
|
||||
period_minutes = (
|
||||
24 * 60
|
||||
) # we ASSUME that picking latest snap of the same day is safe enough
|
||||
api_autosnaps.sort(key=lambda x: x.created_at, reverse=True)
|
||||
api_snap = api_autosnaps[0] # pick the latest one
|
||||
|
||||
for service in ServiceManager.get_all_services():
|
||||
if isinstance(service, ServiceManager):
|
||||
|
@ -778,12 +796,9 @@ class Backups:
|
|||
snaps = Backups._auto_snaps(service)
|
||||
snaps.sort(key=lambda x: x.created_at, reverse=True)
|
||||
for snap in snaps:
|
||||
if snap.created_at < api_snap.created_at + timedelta(
|
||||
minutes=period_minutes
|
||||
) and snap.created_at > api_snap.created_at - timedelta(
|
||||
minutes=period_minutes
|
||||
):
|
||||
if Backups.is_same_slice(snap, api_snap):
|
||||
slice.append(snap)
|
||||
break
|
||||
slice.append(api_snap)
|
||||
|
||||
return slice
|
||||
|
|
|
@ -100,7 +100,7 @@ def do_autobackup() -> None:
|
|||
status=JobStatus.ERROR,
|
||||
error=type(error).__name__ + ": " + str(error),
|
||||
)
|
||||
return
|
||||
raise error
|
||||
progress = progress + progress_per_service
|
||||
Jobs.update(job, JobStatus.RUNNING, progress=progress)
|
||||
|
||||
|
|
|
@ -19,7 +19,11 @@ from selfprivacy_api.backup.tasks import (
|
|||
from selfprivacy_api.backup.jobs import autobackup_job_type
|
||||
|
||||
from tests.test_backup import backups, assert_job_finished
|
||||
from tests.test_graphql.test_services import only_dummy_service
|
||||
from tests.test_graphql.test_services import (
|
||||
only_dummy_service,
|
||||
only_dummy_service_and_api,
|
||||
dkim_file,
|
||||
)
|
||||
|
||||
|
||||
def backuppable_services() -> list[Service]:
|
||||
|
@ -207,8 +211,26 @@ def test_failed_autoback_prevents_more_autobackup(backups, dummy_service):
|
|||
assert Backups.is_time_to_backup_service(dummy_service, now) is False
|
||||
|
||||
|
||||
def test_induced_autobackup(backups, dummy_service):
|
||||
pass
|
||||
def test_slices_minimal(backups, only_dummy_service_and_api):
|
||||
dummy_service = only_dummy_service_and_api
|
||||
backup_period = 13 # minutes
|
||||
now = datetime.now(timezone.utc)
|
||||
|
||||
Backups.set_autobackup_period_minutes(backup_period)
|
||||
assert Backups.is_time_to_backup_service(dummy_service, now)
|
||||
assert Backups.is_time_to_backup_service(ServiceManager(), now)
|
||||
|
||||
assert len(ServiceManager.get_all_services()) == 2
|
||||
assert len(Backups.services_to_back_up(now)) == 2
|
||||
|
||||
do_autobackup()
|
||||
|
||||
snaps = Backups.get_all_snapshots()
|
||||
assert len(snaps) == 2
|
||||
|
||||
slice = Backups.last_autobackup_slice()
|
||||
assert len(slice) == 2
|
||||
assert set([snap.id for snap in slice]) == set([snap.id for snap in snaps])
|
||||
|
||||
|
||||
# --------------------- Quotas and Pruning -------------------------
|
||||
|
|
|
@ -15,6 +15,8 @@ from tests.common import generate_service_query
|
|||
from tests.test_graphql.common import assert_empty, assert_ok, get_data
|
||||
from tests.test_graphql.test_system_nixos_tasks import prepare_nixos_rebuild_calls
|
||||
|
||||
from tests.test_dkim import dkim_file
|
||||
|
||||
LSBLK_BLOCKDEVICES_DICTS = [
|
||||
{
|
||||
"name": "sda1",
|
||||
|
@ -94,6 +96,14 @@ def only_dummy_service(dummy_service) -> Generator[DummyService, None, None]:
|
|||
service_module.services.extend(back_copy)
|
||||
|
||||
|
||||
@pytest.fixture
|
||||
def only_dummy_service_and_api(
|
||||
only_dummy_service, generic_userdata, dkim_file
|
||||
) -> Generator[DummyService, None, None]:
|
||||
service_module.services.append(ServiceManager())
|
||||
return only_dummy_service
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def mock_check_volume(mocker):
|
||||
mock = mocker.patch(
|
||||
|
|
Loading…
Reference in a new issue