fix(service manager): debug and test backup hooks

This commit is contained in:
Houkime 2024-07-25 17:30:46 +00:00
parent f8c6a8b9d6
commit f068329153
2 changed files with 63 additions and 21 deletions

View file

@ -4,6 +4,7 @@ import base64
import typing import typing
from typing import List from typing import List
from os import path, mkdir from os import path, mkdir
from os.path import join
from pathlib import Path from pathlib import Path
from selfprivacy_api.services.bitwarden import Bitwarden from selfprivacy_api.services.bitwarden import Bitwarden
@ -114,8 +115,12 @@ class ServiceManager(Service):
def get_subdomain() -> typing.Optional[str]: def get_subdomain() -> typing.Optional[str]:
return None return None
@classmethod @staticmethod
def is_movable(cls) -> bool: def is_always_active() -> bool:
return True
@staticmethod
def is_movable() -> bool:
return False return False
@staticmethod @staticmethod
@ -152,8 +157,8 @@ class ServiceManager(Service):
"""`True` if the service can be backed up.""" """`True` if the service can be backed up."""
return True return True
@staticmethod @classmethod
def merge_settings(restored_settings_folder: str): def merge_settings(cls, restored_settings_folder: str):
# For now we will just copy settings EXCEPT the locations of services # For now we will just copy settings EXCEPT the locations of services
# Stash locations as they are set by user right now # Stash locations as they are set by user right now
locations = {} locations = {}
@ -161,19 +166,14 @@ class ServiceManager(Service):
locations[service.get_id()] = service.get_drive() locations[service.get_id()] = service.get_drive()
# Copy files # Copy files
userdata_name = path.basename(USERDATA_FILE) for p in [USERDATA_FILE, SECRETS_FILE, DKIM_DIR]:
secretfile_name = path.basename(SECRETS_FILE) cls.retrieve_stashed_path(p)
dkim_dirname = path.basename(DKIM_DIR)
copyfile(path.join(restored_settings_folder, userdata_name), USERDATA_FILE)
copyfile(path.join(restored_settings_folder, secretfile_name), SECRETS_FILE)
copytree(path.join(restored_settings_folder, dkim_dirname), DKIM_DIR)
# Pop locations # Pop locations
for service in services: for service in services:
device = BlockDevices().get_block_device(locations[service.get_id()]) device = BlockDevices().get_block_device(locations[service.get_id()])
if device is not None: if device is not None:
service.set_location(device.name) service.set_location(device)
@classmethod @classmethod
def stop(cls): def stop(cls):
@ -204,15 +204,40 @@ class ServiceManager(Service):
def get_folders(cls) -> List[str]: def get_folders(cls) -> List[str]:
return cls.folders return cls.folders
@classmethod
def stash_for(cls, p: str) -> str:
basename = path.basename(p)
tempdir = cls.folders[0]
stashed_file_location = join(tempdir, basename)
return stashed_file_location
@classmethod
def stash_a_path(cls, p: str):
if path.isdir(p):
rmtree(cls.stash_for(p), ignore_errors=True)
copytree(p, cls.stash_for(p))
else:
copyfile(p, cls.stash_for(p))
@classmethod
def retrieve_stashed_path(cls, p: str):
"""
Takes an original path, hopefully it is stashed somewhere
"""
if path.isdir(p):
rmtree(p, ignore_errors=True)
copytree(cls.stash_for(p), p)
else:
copyfile(cls.stash_for(p), p)
@classmethod @classmethod
def pre_backup(cls): def pre_backup(cls):
tempdir = cls.folders[0] tempdir = cls.folders[0]
rmtree(tempdir, ignore_errors=True) rmtree(tempdir, ignore_errors=True)
mkdir(tempdir) mkdir(tempdir)
copyfile(USERDATA_FILE, tempdir) for p in [USERDATA_FILE, SECRETS_FILE, DKIM_DIR]:
copyfile(SECRETS_FILE, tempdir) cls.stash_a_path(p)
copytree(DKIM_DIR, tempdir)
@classmethod @classmethod
def post_restore(cls): def post_restore(cls):

View file

@ -44,6 +44,7 @@ from selfprivacy_api.backup.local_secret import LocalBackupSecret
from selfprivacy_api.backup.jobs import get_backup_fail from selfprivacy_api.backup.jobs import get_backup_fail
from tests.common import assert_job_errored from tests.common import assert_job_errored
from tests.test_dkim import dkim_file
REPO_NAME = "test_backup" REPO_NAME = "test_backup"
@ -795,10 +796,26 @@ def test_cache_invalidaton_task(backups, dummy_service):
assert len(Storage.get_cached_snapshots()) == 1 assert len(Storage.get_cached_snapshots()) == 1
# def test_service_manager_backs_up_without_crashing(backups): def test_service_manager_backup_snapshot_persists(backups, generic_userdata, dkim_file):
# """ # There was a bug with snapshot disappearance due to post_restore hooks, checking for that
# Service manager is special and needs testing. manager = ServiceManager.get_service_by_id("api")
# """ assert manager is not None
# snapshot = Backups.back_up(ServiceManager.get_service_by_id("api")) snapshot = Backups.back_up(manager)
# Backups.restore_snapshot(snapshot)
Backups.force_snapshot_cache_reload()
ids = [snap.id for snap in Backups.get_all_snapshots()]
assert snapshot.id in ids
def test_service_manager_backs_up_without_crashing(
backups, generic_userdata, dkim_file, dummy_service
):
"""
Service manager is special and needs testing.
"""
manager = ServiceManager.get_service_by_id("api")
assert manager is not None
snapshot = Backups.back_up(manager)
Backups.restore_snapshot(snapshot)