# Standard library
import os.path as path
from datetime import datetime, timedelta, timezone
from os import makedirs, remove, listdir, urandom
from subprocess import Popen

# Third-party
import pytest

# Project
import selfprivacy_api.services as services
from selfprivacy_api.services import Service, get_all_services, get_service_by_id
from selfprivacy_api.services.test_service import DummyService
from selfprivacy_api.graphql.queries.providers import BackupProvider
from selfprivacy_api.graphql.common_types.backup import RestoreStrategy
from selfprivacy_api.jobs import Jobs, JobStatus

from selfprivacy_api.models.backup.snapshot import Snapshot

from selfprivacy_api.backup import Backups
import selfprivacy_api.backup.providers as providers
from selfprivacy_api.backup.providers import AbstractBackupProvider
from selfprivacy_api.backup.providers.backblaze import Backblaze
from selfprivacy_api.backup.util import sync
from selfprivacy_api.backup.backuppers.restic_backupper import ResticBackupper
from selfprivacy_api.backup.jobs import (
    add_backup_job,
    add_restore_job,
    get_backup_job,
)

from selfprivacy_api.backup.tasks import start_backup, restore_snapshot
from selfprivacy_api.backup.storage import Storage

# Contents written into the two files of the dummy service fixture.
TESTFILE_BODY = "testytest!"
TESTFILE_2_BODY = "testissimo!"
REPO_NAME = "test_backup"


@pytest.fixture(scope="function")
def backups(tmpdir):
    """Reset Backups, point it at a local file repo under tmpdir, reset Jobs."""
    Backups.reset()

    test_repo_path = path.join(tmpdir, "totallyunrelated")
    Backups.set_localfile_repo(test_repo_path)

    Jobs.reset()


@pytest.fixture()
def backups_backblaze(generic_userdata):
    """Reset Backups while keeping the JSON-provided configuration.

    NOTE(review): presumably ``generic_userdata`` supplies the Backblaze
    credentials that the dependent test checks for ("ID"/"KEY") - confirm.
    """
    Backups.reset(reset_json=False)


@pytest.fixture()
def raw_dummy_service(tmpdir):
    """Build a DummyService subclass backed by two folders inside tmpdir.

    Each folder receives one small text file so there is something to back up.
    The service is NOT registered in the global service list.
    """
    dirnames = ["test_service", "also_test_service"]
    service_dirs = []
    for dirname in dirnames:
        service_dir = path.join(tmpdir, dirname)
        makedirs(service_dir)
        service_dirs.append(service_dir)

    testfile_path_1 = path.join(service_dirs[0], "testfile.txt")
    with open(testfile_path_1, "w") as file:
        file.write(TESTFILE_BODY)

    testfile_path_2 = path.join(service_dirs[1], "testfile2.txt")
    with open(testfile_path_2, "w") as file:
        file.write(TESTFILE_2_BODY)

    # we need this to not change get_folders() much
    class TestDummyService(DummyService, folders=service_dirs):
        pass

    return TestDummyService()


@pytest.fixture()
def dummy_service(tmpdir, backups, raw_dummy_service) -> Service:
    """Yield the dummy service registered globally, with the repo initialised.

    The service is removed from ``services.services`` on teardown, including
    when the registration sanity check itself fails.
    """
    service = raw_dummy_service
    repo_path = path.join(tmpdir, "test_repo")
    assert not path.exists(repo_path)

    Backups.init_repo()

    # register our service
    services.services.append(service)
    try:
        assert get_service_by_id(service.get_id()) is not None
        yield service
    finally:
        # cleanup because apparently it matters wrt tasks
        services.services.remove(service)


@pytest.fixture()
def memory_backup() -> AbstractBackupProvider:
    """Return a MEMORY backup provider created with empty credentials."""
    ProviderClass = providers.get_provider(BackupProvider.MEMORY)
    assert ProviderClass is not None
    memory_provider = ProviderClass(login="", key="")
    assert memory_provider is not None
    return memory_provider


@pytest.fixture()
def file_backup(tmpdir) -> AbstractBackupProvider:
    """Return a FILE backup provider located at ``<tmpdir>/test_repo``."""
    test_repo_path = path.join(tmpdir, "test_repo")
    ProviderClass = providers.get_provider(BackupProvider.FILE)
    assert ProviderClass is not None
    provider = ProviderClass(location=test_repo_path)
    assert provider is not None
    return provider


def test_config_load(generic_userdata):
    """After a reset that keeps JSON, the provider is the configured Backblaze."""
    Backups.reset(reset_json=False)
    provider = Backups.provider()

    assert provider is not None
    assert isinstance(provider, Backblaze)
    assert provider.login == "ID"
    assert provider.key == "KEY"
    assert provider.location == "selfprivacy"

    assert provider.backupper.account == "ID"
    assert provider.backupper.key == "KEY"


def test_json_reset(generic_userdata):
    """A full reset replaces the configured provider with an empty one."""
    Backups.reset(reset_json=False)
    provider = Backups.provider()
    assert provider is not None
    assert isinstance(provider, Backblaze)
    assert provider.login == "ID"
    assert provider.key == "KEY"
    assert provider.location == "selfprivacy"

    Backups.reset()
    provider = Backups.provider()
    assert provider is not None
    assert isinstance(provider, AbstractBackupProvider)
    assert provider.login == ""
    assert provider.key == ""
    assert provider.location == ""
    assert provider.repo_id == ""


def test_select_backend():
    """get_provider(BACKBLAZE) returns the Backblaze class itself."""
    provider = providers.get_provider(BackupProvider.BACKBLAZE)
    assert provider is not None
    assert provider == Backblaze


def test_file_backend_init(file_backup):
    """Initialising the file backend's backupper must not raise."""
    file_backup.backupper.init()


def test_backup_simple_file(raw_dummy_service, file_backup):
    """Smoke-test fixture wiring and backupper init (no backup is made yet)."""
    # temporarily incomplete
    service = raw_dummy_service
    assert service is not None
    assert file_backup is not None

    name = service.get_id()
    file_backup.backupper.init()


def test_backup_service(dummy_service, backups):
    """Backing up records a recent last-backup time and one finished job."""
    service_id = dummy_service.get_id()
    assert_job_finished(f"services.{service_id}.backup", count=0)
    assert Backups.get_last_backed_up(dummy_service) is None

    Backups.back_up(dummy_service)

    now = datetime.now(timezone.utc)
    date = Backups.get_last_backed_up(dummy_service)
    assert date is not None
    assert now > date
    assert now - date < timedelta(minutes=1)

    assert_job_finished(f"services.{service_id}.backup", count=1)


def test_no_repo(memory_backup):
    """Listing snapshots on the memory provider's backupper raises ValueError."""
    with pytest.raises(ValueError):
        assert memory_backup.backupper.get_snapshots() == []


def test_one_snapshot(backups, dummy_service):
    """One backup yields exactly one snapshot named after the service."""
    Backups.back_up(dummy_service)

    snaps = Backups.get_snapshots(dummy_service)
    assert len(snaps) == 1
    snap = snaps[0]
    assert snap.service_name == dummy_service.get_id()


def test_backup_returns_snapshot(backups, dummy_service):
    """The backupper's start_backup returns a populated, retrievable snapshot."""
    service_folders = dummy_service.get_folders()
    provider = Backups.provider()
    name = dummy_service.get_id()
    snapshot = provider.backupper.start_backup(service_folders, name)

    assert snapshot.id is not None
    assert len(snapshot.id) == len(Backups.get_all_snapshots()[0].id)
    assert Backups.get_snapshot_by_id(snapshot.id) is not None
    assert snapshot.service_name == name
    assert snapshot.created_at is not None


def folder_files(folder):
    """Return the paths of all directory entries directly inside ``folder``."""
    return [path.join(folder, filename) for filename in listdir(folder)]


def service_files(service):
    """Return the paths of all entries in every folder of ``service``."""
    result = []
    for service_folder in service.get_folders():
        result.extend(folder_files(service_folder))
    return result


def test_restore(backups, dummy_service):
    """Files deleted after a backup come back with their original contents."""
    paths_to_nuke = service_files(dummy_service)
    contents = []

    for service_file in paths_to_nuke:
        with open(service_file, "r") as file:
            contents.append(file.read())

    Backups.back_up(dummy_service)
    snap = Backups.get_snapshots(dummy_service)[0]
    assert snap is not None

    for p in paths_to_nuke:
        assert path.exists(p)
        remove(p)
        assert not path.exists(p)

    Backups._restore_service_from_snapshot(dummy_service, snap.id)
    for p, content in zip(paths_to_nuke, contents):
        assert path.exists(p)
        with open(p, "r") as file:
            assert file.read() == content


def test_sizing(backups, dummy_service):
    """The restored size reported for a snapshot is a positive number."""
    Backups.back_up(dummy_service)
    snap = Backups.get_snapshots(dummy_service)[0]
    size = Backups.snapshot_restored_size(snap.id)
    assert size is not None
    assert size > 0


def test_init_tracking(backups, raw_dummy_service):
    """is_initted() flips from False to True once init_repo() has run."""
    assert Backups.is_initted() is False

    Backups.init_repo()

    assert Backups.is_initted() is True


def finished_jobs():
    """Return all jobs whose status is JobStatus.FINISHED."""
    return [job for job in Jobs.get_jobs() if job.status is JobStatus.FINISHED]


def _first_finished_job(job_type):
    """Return the first finished job with ``type_id == job_type``.

    Fails with a descriptive assertion (rather than a bare IndexError) when
    no such job exists.
    """
    matching = [job for job in finished_jobs() if job.type_id == job_type]
    assert matching, f"no finished job of type {job_type!r}"
    return matching[0]


def assert_job_finished(job_type, count):
    """Assert that exactly ``count`` finished jobs have type ``job_type``."""
    finished_types = [job.type_id for job in finished_jobs()]
    assert finished_types.count(job_type) == count


def assert_job_has_run(job_type):
    """Assert that a finished job of ``job_type`` reported a RUNNING status."""
    job = _first_finished_job(job_type)
    assert JobStatus.RUNNING in Jobs.status_updates(job)


def job_progress_updates(job_type):
    """Return the progress updates of the first finished ``job_type`` job."""
    job = _first_finished_job(job_type)
    return Jobs.progress_updates(job)


def assert_job_had_progress(job_type):
    """Assert that the finished ``job_type`` job has at least one progress update."""
    assert len(job_progress_updates(job_type)) > 0


def make_large_file(path: str, bytes: int):
    """Write ``bytes`` random bytes to the file at ``path``.

    The parameter names shadow the ``path`` module alias and the ``bytes``
    builtin inside this function only; they are kept for interface
    compatibility.
    """
    with open(path, "wb") as file:
        file.write(urandom(bytes))


def test_snapshots_by_id(backups, dummy_service):
    """A snapshot can be looked up by id among several snapshots."""
    Backups.back_up(dummy_service)
    snap2 = Backups.back_up(dummy_service)
    Backups.back_up(dummy_service)

    assert snap2.id is not None
    assert snap2.id != ""

    assert len(Backups.get_snapshots(dummy_service)) == 3
    assert Backups.get_snapshot_by_id(snap2.id).id == snap2.id


@pytest.fixture(params=["instant_server_stop", "delayed_server_stop"])
def simulated_service_stopping_delay(request) -> float:
    """Parametrize tests with a 0.0 and a 0.3 service-stopping delay."""
    if request.param == "instant_server_stop":
        return 0.0
    return 0.3


def test_backup_service_task(backups, dummy_service, simulated_service_stopping_delay):
    """The backup task makes one snapshot and a finished job with progress."""
    dummy_service.set_delay(simulated_service_stopping_delay)

    handle = start_backup(dummy_service)
    handle(blocking=True)

    snaps = Backups.get_snapshots(dummy_service)
    assert len(snaps) == 1

    service_id = dummy_service.get_id()
    job_type_id = f"services.{service_id}.backup"
    assert_job_finished(job_type_id, count=1)
    assert_job_has_run(job_type_id)
    assert_job_had_progress(job_type_id)


def test_forget_snapshot(backups, dummy_service):
    """Forgetting snapshots removes them, also after a cache reload."""
    snap1 = Backups.back_up(dummy_service)
    snap2 = Backups.back_up(dummy_service)
    assert len(Backups.get_snapshots(dummy_service)) == 2

    Backups.forget_snapshot(snap2)
    assert len(Backups.get_snapshots(dummy_service)) == 1
    Backups.force_snapshot_cache_reload()
    assert len(Backups.get_snapshots(dummy_service)) == 1

    assert Backups.get_snapshots(dummy_service)[0].id == snap1.id

    Backups.forget_snapshot(snap1)
    assert len(Backups.get_snapshots(dummy_service)) == 0


def test_forget_nonexistent_snapshot(backups, dummy_service):
    """Forgetting a snapshot that was never created raises ValueError."""
    bogus = Snapshot(
        id="gibberjibber", service_name="nohoho", created_at=datetime.now(timezone.utc)
    )
    with pytest.raises(ValueError):
        Backups.forget_snapshot(bogus)


def test_backup_larger_file(backups, dummy_service):
    """Backing up a 100 MiB file produces several progress updates."""
    large_file = path.join(dummy_service.get_folders()[0], "LARGEFILE")
    mega = 2**20
    make_large_file(large_file, 100 * mega)

    try:
        handle = start_backup(dummy_service)
        handle(blocking=True)

        # results will be slightly different on different machines. if someone
        # has troubles with it on their machine, consider dropping this test.
        service_id = dummy_service.get_id()
        job_type_id = f"services.{service_id}.backup"
        assert_job_finished(job_type_id, count=1)
        assert_job_has_run(job_type_id)
        updates = job_progress_updates(job_type_id)
        assert len(updates) > 3
        # the update in the middle of the list must already be above 10
        assert updates[(len(updates) - 1) // 2] > 10
    finally:
        # clean up a bit, even when an assertion above fails
        remove(large_file)


@pytest.fixture(params=["verify", "inplace"])
def restore_strategy(request) -> RestoreStrategy:
    """Parametrize tests over both restore strategies."""
    if request.param == "verify":
        return RestoreStrategy.DOWNLOAD_VERIFY_OVERWRITE
    return RestoreStrategy.INPLACE


def test_restore_snapshot_task(
    backups, dummy_service, restore_strategy, simulated_service_stopping_delay
):
    """The restore task brings back deleted files under either strategy."""
    dummy_service.set_delay(simulated_service_stopping_delay)

    Backups.back_up(dummy_service)
    snaps = Backups.get_snapshots(dummy_service)
    assert len(snaps) == 1

    paths_to_nuke = service_files(dummy_service)
    contents = []

    for service_file in paths_to_nuke:
        with open(service_file, "r") as file:
            contents.append(file.read())

    for p in paths_to_nuke:
        remove(p)

    handle = restore_snapshot(snaps[0], restore_strategy)
    handle(blocking=True)

    for p, content in zip(paths_to_nuke, contents):
        assert path.exists(p)
        with open(p, "r") as file:
            assert file.read() == content

    snaps = Backups.get_snapshots(dummy_service)
    # An in-place restore is expected to leave one additional snapshot.
    # NOTE(review): presumably a safety snapshot taken before overwriting.
    if restore_strategy == RestoreStrategy.INPLACE:
        assert len(snaps) == 2
    else:
        assert len(snaps) == 1


def test_set_autobackup_period(backups):
    """The period is stored when positive and None when disabled, 0 or negative."""
    assert Backups.autobackup_period_minutes() is None

    Backups.set_autobackup_period_minutes(2)
    assert Backups.autobackup_period_minutes() == 2

    Backups.disable_all_autobackup()
    assert Backups.autobackup_period_minutes() is None

    Backups.set_autobackup_period_minutes(3)
    assert Backups.autobackup_period_minutes() == 3

    Backups.set_autobackup_period_minutes(0)
    assert Backups.autobackup_period_minutes() is None

    Backups.set_autobackup_period_minutes(3)
    assert Backups.autobackup_period_minutes() == 3

    Backups.set_autobackup_period_minutes(-1)
    assert Backups.autobackup_period_minutes() is None


def test_no_default_autobackup(backups, dummy_service):
    """Without a configured period nothing is due for automatic backup."""
    now = datetime.now(timezone.utc)
    assert not Backups.is_time_to_backup_service(dummy_service, now)
    assert not Backups.is_time_to_backup(now)


def backuppable_services() -> list[Service]:
    """Return every known service for which can_be_backed_up() is true."""
    return [service for service in get_all_services() if service.can_be_backed_up()]


def test_services_to_back_up(backups, dummy_service):
    """services_to_back_up() is empty until an autobackup period is set."""
    backup_period = 13  # minutes
    now = datetime.now(timezone.utc)

    dummy_service.set_backuppable(False)
    to_back_up = Backups.services_to_back_up(now)
    assert len(to_back_up) == 0

    dummy_service.set_backuppable(True)

    to_back_up = Backups.services_to_back_up(now)
    assert len(to_back_up) == 0

    Backups.set_autobackup_period_minutes(backup_period)

    to_back_up = Backups.services_to_back_up(now)
    assert len(to_back_up) == len(backuppable_services())
    assert dummy_service.get_id() in [
        service.get_id() for service in backuppable_services()
    ]


def test_autobackup_timer_periods(backups, dummy_service):
    """Setting a period makes backups due; setting it to 0 disables them."""
    now = datetime.now(timezone.utc)
    backup_period = 13  # minutes

    assert not Backups.is_time_to_backup_service(dummy_service, now)
    assert not Backups.is_time_to_backup(now)

    Backups.set_autobackup_period_minutes(backup_period)
    assert Backups.is_time_to_backup_service(dummy_service, now)
    assert Backups.is_time_to_backup(now)

    Backups.set_autobackup_period_minutes(0)
    assert not Backups.is_time_to_backup_service(dummy_service, now)
    assert not Backups.is_time_to_backup(now)


def test_autobackup_timer_enabling(backups, dummy_service):
    """A non-backuppable service is never due, even when a period is set."""
    now = datetime.now(timezone.utc)
    backup_period = 13  # minutes
    dummy_service.set_backuppable(False)

    Backups.set_autobackup_period_minutes(backup_period)
    assert Backups.is_time_to_backup(
        now
    )  # there are other services too, not just our dummy

    # not backuppable service is not backuppable even if period is set
    assert not Backups.is_time_to_backup_service(dummy_service, now)

    dummy_service.set_backuppable(True)
    assert dummy_service.can_be_backed_up()
    assert Backups.is_time_to_backup_service(dummy_service, now)

    Backups.disable_all_autobackup()
    assert not Backups.is_time_to_backup_service(dummy_service, now)
    assert not Backups.is_time_to_backup(now)


def test_autobackup_timing(backups, dummy_service):
    """After a backup the service is due again only once the period elapsed."""
    backup_period = 13  # minutes
    now = datetime.now(timezone.utc)

    Backups.set_autobackup_period_minutes(backup_period)
    assert Backups.is_time_to_backup_service(dummy_service, now)
    assert Backups.is_time_to_backup(now)

    Backups.back_up(dummy_service)

    now = datetime.now(timezone.utc)
    assert not Backups.is_time_to_backup_service(dummy_service, now)

    past = datetime.now(timezone.utc) - timedelta(minutes=1)
    assert not Backups.is_time_to_backup_service(dummy_service, past)

    future = datetime.now(timezone.utc) + timedelta(minutes=backup_period + 2)
    assert Backups.is_time_to_backup_service(dummy_service, future)


# Storage
def test_snapshots_caching(backups, dummy_service):
    """Snapshot listings are served quickly and the cache refills when emptied."""
    Backups.back_up(dummy_service)

    # we test indirectly that we do redis calls instead of shell calls
    start = datetime.now()
    for _ in range(10):
        snapshots = Backups.get_snapshots(dummy_service)
        assert len(snapshots) == 1
    assert datetime.now() - start < timedelta(seconds=0.5)

    cached_snapshots = Storage.get_cached_snapshots()
    assert len(cached_snapshots) == 1

    Storage.delete_cached_snapshot(cached_snapshots[0])
    cached_snapshots = Storage.get_cached_snapshots()
    assert len(cached_snapshots) == 0

    snapshots = Backups.get_snapshots(dummy_service)
    assert len(snapshots) == 1
    cached_snapshots = Storage.get_cached_snapshots()
    assert len(cached_snapshots) == 1


# Storage
def test_init_tracking_caching(backups, raw_dummy_service):
    """Marking init in Storage is enough for Backups.is_initted() to be True."""
    assert Storage.has_init_mark() is False

    Storage.mark_as_init()

    assert Storage.has_init_mark() is True
    assert Backups.is_initted() is True


# Storage
def test_init_tracking_caching2(backups, raw_dummy_service):
    """Backups.init_repo() sets the init mark in Storage."""
    assert Storage.has_init_mark() is False

    Backups.init_repo()

    assert Storage.has_init_mark() is True


# Storage
def test_provider_storage(backups_backblaze):
    """A provider stored via Storage is loaded back with the same credentials."""
    provider = Backups.provider()

    assert provider is not None

    assert isinstance(provider, Backblaze)
    assert provider.login == "ID"
    assert provider.key == "KEY"

    Storage.store_provider(provider)
    restored_provider = Backups._load_provider_redis()
    assert isinstance(restored_provider, Backblaze)
    assert restored_provider.login == "ID"
    assert restored_provider.key == "KEY"


def test_sync(dummy_service):
    """sync() makes dst list the same entries as src and leaves src unchanged."""
    src = dummy_service.get_folders()[0]
    dst = dummy_service.get_folders()[1]
    old_files_src = set(listdir(src))
    old_files_dst = set(listdir(dst))
    assert old_files_src != old_files_dst

    sync(src, dst)
    new_files_src = set(listdir(src))
    new_files_dst = set(listdir(dst))
    assert new_files_src == old_files_src
    assert new_files_dst == new_files_src


def test_sync_nonexistent_src(dummy_service):
    """sync() raises ValueError when the source path does not exist."""
    src = "/var/lib/nonexistentFluffyBunniesOfUnix"
    dst = dummy_service.get_folders()[1]

    with pytest.raises(ValueError):
        sync(src, dst)


# Restic lowlevel
def test_mount_umount(backups, dummy_service, tmpdir):
    """Mounting the repo populates the mountpoint; unmounting empties it."""
    Backups.back_up(dummy_service)
    backupper = Backups.provider().backupper
    assert isinstance(backupper, ResticBackupper)

    mountpoint = tmpdir / "mount"
    makedirs(mountpoint)
    assert path.exists(mountpoint)
    assert len(listdir(mountpoint)) == 0

    # Keep a reference to whatever mount_repo returns for the whole test.
    handle = backupper.mount_repo(mountpoint)
    try:
        assert len(listdir(mountpoint)) != 0
    finally:
        # always unmount, so a failed assertion does not leave the repo mounted
        backupper.unmount_repo(mountpoint)

    assert len(listdir(mountpoint)) == 0


def test_move_blocks_backups(backups, dummy_service, restore_strategy):
    """While a move job is RUNNING, backup and restore raise ValueError."""
    snap = Backups.back_up(dummy_service)
    Jobs.add(
        type_id=f"services.{dummy_service.get_id()}.move",
        name="Move Dummy",
        description="Moving Dummy data to the Rainbow Land",
        status=JobStatus.RUNNING,
    )

    with pytest.raises(ValueError):
        Backups.back_up(dummy_service)

    with pytest.raises(ValueError):
        Backups.restore_snapshot(snap, restore_strategy)