selfprivacy-rest-api/tests/test_backup.py
2024-09-23 10:05:24 +00:00

857 lines
25 KiB
Python

import pytest
import os
import os.path as path
from os import remove
from os import listdir
from os import urandom
from datetime import datetime, timedelta, timezone
import tempfile
from selfprivacy_api.utils.huey import huey
from selfprivacy_api.services.service import ServiceStatus
from selfprivacy_api.services import ServiceManager
from selfprivacy_api.graphql.queries.providers import BackupProvider as ProviderEnum
from selfprivacy_api.graphql.common_types.backup import (
RestoreStrategy,
BackupReason,
)
from selfprivacy_api.graphql.queries.providers import BackupProvider
from selfprivacy_api.jobs import Job, Jobs, JobStatus
from selfprivacy_api.models.backup.snapshot import Snapshot
from selfprivacy_api.backup import Backups, BACKUP_PROVIDER_ENVS
import selfprivacy_api.backup.providers as providers
from selfprivacy_api.backup.providers import AbstractBackupProvider
from selfprivacy_api.backup.providers.backblaze import Backblaze
from selfprivacy_api.backup.providers.none import NoBackups
from selfprivacy_api.backup.providers import get_kind
from selfprivacy_api.backup.util import sync
from selfprivacy_api.backup.tasks import (
start_backup,
restore_snapshot,
reload_snapshot_cache,
total_backup,
do_full_restore,
)
from selfprivacy_api.backup.storage import Storage
from selfprivacy_api.backup.local_secret import LocalBackupSecret
from selfprivacy_api.backup.jobs import (
get_backup_fail,
add_total_backup_job,
add_total_restore_job,
)
from tests.common import assert_job_errored
from tests.test_dkim import dkim_file
from tests.test_graphql.test_services import (
only_dummy_service_and_api,
only_dummy_service,
)
REPO_NAME = "test_backup"
REPOFILE_NAME = "totallyunrelated"
def prepare_localfile_backups(temp_dir):
test_repo_path = path.join(temp_dir, REPOFILE_NAME)
assert not path.exists(test_repo_path)
Backups.set_localfile_repo(test_repo_path)
@pytest.fixture(scope="function")
def backups_local(tmpdir):
Backups.reset()
prepare_localfile_backups(tmpdir)
Jobs.reset()
Backups.init_repo()
@pytest.fixture(scope="function")
def backups(tmpdir):
"""
For those tests that are supposed to pass with
both local and cloud repos
"""
# Sometimes this is false. Idk why.
huey.immediate = True
assert huey.immediate is True
Backups.reset()
if BACKUP_PROVIDER_ENVS["kind"] in os.environ.keys():
Backups.set_provider_from_envs()
else:
prepare_localfile_backups(tmpdir)
Jobs.reset()
Backups.init_repo()
assert Backups.provider().location == str(tmpdir) + "/" + REPOFILE_NAME
yield
Backups.erase_repo()
@pytest.fixture()
def memory_backup() -> AbstractBackupProvider:
ProviderClass = providers.get_provider(BackupProvider.MEMORY)
assert ProviderClass is not None
memory_provider = ProviderClass(login="", key="")
assert memory_provider is not None
return memory_provider
@pytest.fixture()
def file_backup(tmpdir) -> AbstractBackupProvider:
test_repo_path = path.join(tmpdir, "test_repo")
ProviderClass = providers.get_provider(BackupProvider.FILE)
assert ProviderClass is not None
provider = ProviderClass(location=test_repo_path)
assert provider is not None
return provider
def test_reset_sets_to_none1():
Backups.reset()
provider = Backups.provider()
assert provider is not None
assert isinstance(provider, NoBackups)
def test_reset_sets_to_none2(backups):
# now with something set up first^^^
Backups.reset()
provider = Backups.provider()
assert provider is not None
assert isinstance(provider, NoBackups)
def test_setting_from_envs(tmpdir):
Backups.reset()
environment_stash = {}
if BACKUP_PROVIDER_ENVS["kind"] in os.environ.keys():
# we are running under special envs, stash them before rewriting them
for key in BACKUP_PROVIDER_ENVS.values():
environment_stash[key] = os.environ[key]
os.environ[BACKUP_PROVIDER_ENVS["kind"]] = "BACKBLAZE"
os.environ[BACKUP_PROVIDER_ENVS["login"]] = "ID"
os.environ[BACKUP_PROVIDER_ENVS["key"]] = "KEY"
os.environ[BACKUP_PROVIDER_ENVS["location"]] = "selfprivacy"
Backups.set_provider_from_envs()
provider = Backups.provider()
assert provider is not None
assert isinstance(provider, Backblaze)
assert provider.login == "ID"
assert provider.key == "KEY"
assert provider.location == "selfprivacy"
assert provider.backupper.account == "ID"
assert provider.backupper.key == "KEY"
if environment_stash != {}:
for key in BACKUP_PROVIDER_ENVS.values():
os.environ[key] = environment_stash[key]
else:
for key in BACKUP_PROVIDER_ENVS.values():
del os.environ[key]
def test_select_backend():
provider = providers.get_provider(BackupProvider.BACKBLAZE)
assert provider is not None
assert provider == Backblaze
def test_file_backend_init(file_backup):
file_backup.backupper.init()
def test_reinit_after_purge(backups):
assert Backups.is_initted() is True
Backups.erase_repo()
assert Backups.is_initted() is False
with pytest.raises(ValueError):
Backups.force_snapshot_cache_reload()
Backups.init_repo()
assert Backups.is_initted() is True
assert len(Backups.get_all_snapshots()) == 0
def test_backup_service(dummy_service, backups):
id = dummy_service.get_id()
assert_job_finished(f"services.{id}.backup", count=0)
assert Backups.get_last_backed_up(dummy_service) is None
Backups.back_up(dummy_service)
now = datetime.now(timezone.utc)
date = Backups.get_last_backed_up(dummy_service)
assert date is not None
assert now > date
assert now - date < timedelta(minutes=1)
assert_job_finished(f"services.{id}.backup", count=1)
def all_job_text(job: Job) -> str:
# Use when we update to pydantic 2.xxx
# return Job.model_dump_json()
result = ""
if job.status_text is not None:
result += job.status_text
if job.description is not None:
result += job.description
if job.error is not None:
result += job.error
return result
def test_error_censoring_encryptionkey(dummy_service, backups):
# Discard our key to inject a failure
old_key = LocalBackupSecret.get()
LocalBackupSecret.reset()
new_key = LocalBackupSecret.get()
with pytest.raises(ValueError):
# Should fail without correct key
Backups.back_up(dummy_service)
job = get_backup_fail(dummy_service)
assert_job_errored(job)
job_text = all_job_text(job)
assert old_key not in job_text
assert new_key not in job_text
# local backups do not have login key
# assert Backups.provider().key not in job_text
assert "CENSORED" in job_text
def test_error_censoring_loginkey(dummy_service, backups, fp):
# We do not want to screw up our teardown
old_provider = Backups.provider()
secret = "aSecretNYA"
Backups.set_provider(
ProviderEnum.BACKBLAZE, login="meow", key=secret, location="moon"
)
assert Backups.provider().key == secret
# We could have called real backblaze but it is kind of not privacy so.
fp.allow_unregistered(True)
fp.register(
["restic", fp.any()],
returncode=1,
stdout="only real cats are allowed",
# We do not want to suddenly call real backblaze even if code changes
occurrences=100,
)
with pytest.raises(ValueError):
Backups.back_up(dummy_service)
job = get_backup_fail(dummy_service)
assert_job_errored(job)
job_text = all_job_text(job)
assert secret not in job_text
assert job_text.count("CENSORED") == 2
# We do not want to screw up our teardown
Storage.store_provider(old_provider)
def test_no_repo(memory_backup):
with pytest.raises(ValueError):
assert memory_backup.backupper.get_snapshots() == []
def test_one_snapshot(backups, dummy_service):
Backups.back_up(dummy_service)
snaps = Backups.get_snapshots(dummy_service)
assert len(snaps) == 1
snap = snaps[0]
assert snap.service_name == dummy_service.get_id()
def test_backup_returns_snapshot(backups, dummy_service):
service_folders = dummy_service.get_folders()
provider = Backups.provider()
name = dummy_service.get_id()
snapshot = provider.backupper.start_backup(service_folders, name)
assert snapshot.id is not None
snapshots = provider.backupper.get_snapshots()
assert snapshots != []
assert len(snapshot.id) == len(snapshots[0].id)
assert Backups.get_snapshot_by_id(snapshot.id) is not None
assert snapshot.service_name == name
assert snapshot.created_at is not None
assert snapshot.reason == BackupReason.EXPLICIT
def test_backup_reasons(backups, dummy_service):
snap = Backups.back_up(dummy_service, BackupReason.AUTO)
assert snap.reason == BackupReason.AUTO
Backups.force_snapshot_cache_reload()
snaps = Backups.get_snapshots(dummy_service)
assert snaps[0].reason == BackupReason.AUTO
def folder_files(folder):
return [
path.join(folder, filename)
for filename in listdir(folder)
if filename is not None
]
def service_files(service):
result = []
for service_folder in service.get_folders():
result.extend(folder_files(service_folder))
return result
def test_restore(backups, dummy_service):
paths_to_nuke = service_files(dummy_service)
contents = []
for service_file in paths_to_nuke:
with open(service_file, "r") as file:
contents.append(file.read())
Backups.back_up(dummy_service)
snap = Backups.get_snapshots(dummy_service)[0]
assert snap is not None
for p in paths_to_nuke:
assert path.exists(p)
remove(p)
assert not path.exists(p)
Backups._restore_service_from_snapshot(dummy_service, snap.id)
for p, content in zip(paths_to_nuke, contents):
assert path.exists(p)
with open(p, "r") as file:
assert file.read() == content
def test_sizing(backups, dummy_service):
Backups.back_up(dummy_service)
snap = Backups.get_snapshots(dummy_service)[0]
size = Backups.snapshot_restored_size(snap.id)
assert size is not None
assert size > 0
def test_init_tracking(backups, tmpdir):
assert Backups.is_initted() is True
Backups.reset()
assert Backups.is_initted() is False
separate_dir = tmpdir / "out_of_the_way"
prepare_localfile_backups(separate_dir)
Backups.init_repo()
assert Backups.is_initted() is True
def finished_jobs():
return [job for job in Jobs.get_jobs() if job.status is JobStatus.FINISHED]
def assert_job_finished(job_type, count):
finished_types = [job.type_id for job in finished_jobs()]
assert finished_types.count(job_type) == count
def assert_job_has_run(job_type):
job = [job for job in finished_jobs() if job.type_id == job_type][0]
assert JobStatus.RUNNING in Jobs.status_updates(job)
def job_progress_updates(job_type):
job = [job for job in finished_jobs() if job.type_id == job_type][0]
return Jobs.progress_updates(job)
def assert_job_had_progress(job_type):
assert len(job_progress_updates(job_type)) > 0
def make_large_file(path: str, bytes: int):
with open(path, "wb") as file:
file.write(urandom(bytes))
def test_snapshots_by_id(backups, dummy_service):
snap1 = Backups.back_up(dummy_service)
snap2 = Backups.back_up(dummy_service)
snap3 = Backups.back_up(dummy_service)
assert snap2.id is not None
assert snap2.id != ""
assert len(Backups.get_snapshots(dummy_service)) == 3
assert Backups.get_snapshot_by_id(snap2.id).id == snap2.id
@pytest.fixture(params=["instant_server_stop", "delayed_server_stop"])
def simulated_service_stopping_delay(request) -> float:
if request.param == "instant_server_stop":
return 0.0
else:
return 0.3
def test_backup_service_task(backups, dummy_service, simulated_service_stopping_delay):
dummy_service.set_delay(simulated_service_stopping_delay)
handle = start_backup(dummy_service.get_id())
handle(blocking=True)
snaps = Backups.get_snapshots(dummy_service)
assert len(snaps) == 1
id = dummy_service.get_id()
job_type_id = f"services.{id}.backup"
assert_job_finished(job_type_id, count=1)
assert_job_has_run(job_type_id)
assert_job_had_progress(job_type_id)
def test_forget_snapshot(backups, dummy_service):
snap1 = Backups.back_up(dummy_service)
snap2 = Backups.back_up(dummy_service)
assert len(Backups.get_snapshots(dummy_service)) == 2
Backups.forget_snapshot(snap2)
assert len(Backups.get_snapshots(dummy_service)) == 1
Backups.force_snapshot_cache_reload()
assert len(Backups.get_snapshots(dummy_service)) == 1
assert Backups.get_snapshots(dummy_service)[0].id == snap1.id
Backups.forget_snapshot(snap1)
assert len(Backups.get_snapshots(dummy_service)) == 0
def test_forget_nonexistent_snapshot(backups, dummy_service):
bogus = Snapshot(
id="gibberjibber",
service_name="nohoho",
created_at=datetime.now(timezone.utc),
reason=BackupReason.EXPLICIT,
)
with pytest.raises(ValueError):
Backups.forget_snapshot(bogus)
def test_backup_larger_file(backups, dummy_service):
dir = path.join(dummy_service.get_folders()[0], "LARGEFILE")
mega = 2**20
make_large_file(dir, 100 * mega)
handle = start_backup(dummy_service.get_id())
handle(blocking=True)
# results will be slightly different on different machines. if someone has troubles with it on their machine, consider dropping this test.
id = dummy_service.get_id()
job_type_id = f"services.{id}.backup"
assert_job_finished(job_type_id, count=1)
assert_job_has_run(job_type_id)
updates = job_progress_updates(job_type_id)
assert len(updates) > 3
assert updates[int((len(updates) - 1) / 2.0)] > 10
# clean up a bit
remove(dir)
@pytest.fixture(params=["verify", "inplace"])
def restore_strategy(request) -> RestoreStrategy:
if request.param == "verify":
return RestoreStrategy.DOWNLOAD_VERIFY_OVERWRITE
else:
return RestoreStrategy.INPLACE
@pytest.fixture(params=["failed", "healthy", "fail_to_stop"])
def failed(request) -> str:
return request.param
def test_restore_snapshot_task(
backups, dummy_service, restore_strategy, simulated_service_stopping_delay, failed
):
dummy_service.set_delay(simulated_service_stopping_delay)
if failed == "failed":
dummy_service.set_status(ServiceStatus.FAILED)
if failed == "fail_to_stop":
dummy_service.simulate_fail_to_stop(True)
Backups.back_up(dummy_service)
snaps = Backups.get_snapshots(dummy_service)
assert len(snaps) == 1
paths_to_nuke = service_files(dummy_service)
contents = []
for service_file in paths_to_nuke:
with open(service_file, "r") as file:
contents.append(file.read())
for p in paths_to_nuke:
remove(p)
handle = restore_snapshot(snaps[0], restore_strategy)
handle(blocking=True)
for p, content in zip(paths_to_nuke, contents):
assert path.exists(p)
with open(p, "r") as file:
assert file.read() == content
snaps = Backups.get_snapshots(dummy_service)
if restore_strategy == RestoreStrategy.INPLACE:
assert len(snaps) == 2
reasons = [snap.reason for snap in snaps]
assert BackupReason.PRE_RESTORE in reasons
else:
assert len(snaps) == 1
def test_backup_unbackuppable(backups, dummy_service):
dummy_service.set_backuppable(False)
assert dummy_service.can_be_backed_up() is False
with pytest.raises(ValueError):
Backups.back_up(dummy_service)
# Storage
def test_snapshots_caching(backups, dummy_service):
Backups.back_up(dummy_service)
# we test indirectly that we do redis calls instead of shell calls
start = datetime.now()
for i in range(10):
snapshots = Backups.get_snapshots(dummy_service)
assert len(snapshots) == 1
assert datetime.now() - start < timedelta(seconds=0.5)
cached_snapshots = Storage.get_cached_snapshots()
assert len(cached_snapshots) == 1
snap_to_uncache = cached_snapshots[0]
Storage.delete_cached_snapshot(snap_to_uncache)
cached_snapshots = Storage.get_cached_snapshots()
assert len(cached_snapshots) == 0
# We do not assume that no snapshots means we need to reload the cache
snapshots = Backups.get_snapshots(dummy_service)
assert len(snapshots) == 0
# No cache reload happened
cached_snapshots = Storage.get_cached_snapshots()
assert len(cached_snapshots) == 0
# Storage
def test_snapshot_cache_autoreloads(backups, dummy_service):
Backups.back_up(dummy_service)
cached_snapshots = Storage.get_cached_snapshots()
assert len(cached_snapshots) == 1
snap_to_uncache = cached_snapshots[0]
Storage.delete_cached_snapshot(snap_to_uncache)
cached_snapshots = Storage.get_cached_snapshots()
assert len(cached_snapshots) == 0
# When we create a snapshot we do reload cache
Backups.back_up(dummy_service)
cached_snapshots = Storage.get_cached_snapshots()
assert len(cached_snapshots) == 2
assert snap_to_uncache in cached_snapshots
Storage.delete_cached_snapshot(snap_to_uncache)
cached_snapshots = Storage.get_cached_snapshots()
assert len(cached_snapshots) == 1
# When we try to delete a snapshot we cannot find in cache, it is ok and we do reload cache
Backups.forget_snapshot(snap_to_uncache)
cached_snapshots = Storage.get_cached_snapshots()
assert len(cached_snapshots) == 1
assert snap_to_uncache not in cached_snapshots
def lowlevel_forget(snapshot_id):
Backups.provider().backupper.forget_snapshot(snapshot_id)
# Storage
def test_snapshots_cache_invalidation(backups, dummy_service):
Backups.back_up(dummy_service)
cached_snapshots = Storage.get_cached_snapshots()
assert len(cached_snapshots) == 1
Storage.invalidate_snapshot_storage()
cached_snapshots = Storage.get_cached_snapshots()
assert len(cached_snapshots) == 0
Backups.force_snapshot_cache_reload()
cached_snapshots = Storage.get_cached_snapshots()
assert len(cached_snapshots) == 1
snap = cached_snapshots[0]
lowlevel_forget(snap.id)
cached_snapshots = Storage.get_cached_snapshots()
assert len(cached_snapshots) == 1
Backups.force_snapshot_cache_reload()
cached_snapshots = Storage.get_cached_snapshots()
assert len(cached_snapshots) == 0
# Storage
def test_init_tracking_caching(backups, raw_dummy_service):
assert Storage.has_init_mark() is True
Backups.reset()
assert Storage.has_init_mark() is False
Storage.mark_as_init()
assert Storage.has_init_mark() is True
assert Backups.is_initted() is True
# Storage
def test_init_tracking_caching2(backups, tmpdir):
assert Storage.has_init_mark() is True
Backups.reset()
assert Storage.has_init_mark() is False
separate_dir = tmpdir / "out_of_the_way"
prepare_localfile_backups(separate_dir)
assert Storage.has_init_mark() is False
Backups.init_repo()
assert Storage.has_init_mark() is True
# Storage
def test_provider_storage(backups):
test_login = "ID"
test_key = "KEY"
test_location = "selprivacy_bin"
old_provider = Backups.provider()
assert old_provider is not None
assert not isinstance(old_provider, Backblaze)
assert old_provider.login != test_login
assert old_provider.key != test_key
assert old_provider.location != test_location
test_provider = Backups._construct_provider(
kind=BackupProvider.BACKBLAZE, login="ID", key=test_key, location=test_location
)
assert isinstance(test_provider, Backblaze)
assert get_kind(test_provider) == "BACKBLAZE"
assert test_provider.login == test_login
assert test_provider.key == test_key
assert test_provider.location == test_location
Storage.store_provider(test_provider)
restored_provider_model = Storage.load_provider()
assert restored_provider_model.kind == "BACKBLAZE"
assert restored_provider_model.login == test_login
assert restored_provider_model.key == test_key
assert restored_provider_model.location == test_location
restored_provider = Backups._load_provider_redis()
assert isinstance(restored_provider, Backblaze)
assert restored_provider.login == test_login
assert restored_provider.key == test_key
assert restored_provider.location == test_location
# Revert our mess so we can teardown ok
Storage.store_provider(old_provider)
def test_sync(dummy_service):
src = dummy_service.get_folders()[0]
dst = dummy_service.get_folders()[1]
old_files_src = set(listdir(src))
old_files_dst = set(listdir(dst))
assert old_files_src != old_files_dst
sync(src, dst)
new_files_src = set(listdir(src))
new_files_dst = set(listdir(dst))
assert new_files_src == old_files_src
assert new_files_dst == new_files_src
def test_sync_nonexistent_src(dummy_service):
src = "/var/lib/nonexistentFluffyBunniesOfUnix"
dst = dummy_service.get_folders()[1]
with pytest.raises(ValueError):
sync(src, dst)
def test_move_blocks_backups(backups, dummy_service, restore_strategy):
snap = Backups.back_up(dummy_service)
job = Jobs.add(
type_id=f"services.{dummy_service.get_id()}.move",
name="Move Dummy",
description=f"Moving Dummy data to the Rainbow Land",
status=JobStatus.RUNNING,
)
with pytest.raises(ValueError):
Backups.back_up(dummy_service)
with pytest.raises(ValueError):
Backups.restore_snapshot(snap, restore_strategy)
def test_double_lock_unlock(backups, dummy_service):
# notice that introducing stale locks is only safe for other tests if we erase repo in between
# which we do at the time of writing this test
Backups.provider().backupper.lock()
with pytest.raises(ValueError):
Backups.provider().backupper.lock()
Backups.provider().backupper.unlock()
Backups.provider().backupper.lock()
Backups.provider().backupper.unlock()
Backups.provider().backupper.unlock()
def test_operations_while_locked(backups, dummy_service):
# Stale lock prevention test
# consider making it fully at the level of backupper?
# because this is where prevention lives?
# Backups singleton is here only so that we can run this against B2, S3 and whatever
# But maybe it is not necessary (if restic treats them uniformly enough)
Backups.provider().backupper.lock()
snap = Backups.back_up(dummy_service)
assert snap is not None
Backups.provider().backupper.lock()
# using lowlevel to make sure no caching interferes
assert Backups.provider().backupper.is_initted() is True
Backups.provider().backupper.lock()
assert Backups.snapshot_restored_size(snap.id) > 0
Backups.provider().backupper.lock()
Backups.restore_snapshot(snap)
Backups.provider().backupper.lock()
Backups.forget_snapshot(snap)
Backups.provider().backupper.lock()
assert Backups.provider().backupper.get_snapshots() == []
# check that no locks were left
Backups.provider().backupper.lock()
Backups.provider().backupper.unlock()
# a paranoid check to weed out problems with tempdirs that are not dependent on us
def test_tempfile():
with tempfile.TemporaryDirectory() as temp:
assert path.exists(temp)
assert not path.exists(temp)
# Storage
def test_cache_invalidaton_task(backups, dummy_service):
Backups.back_up(dummy_service)
assert len(Storage.get_cached_snapshots()) == 1
# Does not trigger resync
Storage.invalidate_snapshot_storage()
assert Storage.get_cached_snapshots() == []
reload_snapshot_cache()
assert len(Storage.get_cached_snapshots()) == 1
def test_service_manager_backup_snapshot_persists(backups, generic_userdata, dkim_file):
# There was a bug with snapshot disappearance due to post_restore hooks, checking for that
manager = ServiceManager.get_service_by_id("api")
assert manager is not None
snapshot = Backups.back_up(manager)
Backups.force_snapshot_cache_reload()
ids = [snap.id for snap in Backups.get_all_snapshots()]
assert snapshot.id in ids
def test_service_manager_backs_up_without_crashing(
backups, generic_userdata, dkim_file, dummy_service
):
"""
Service manager is special and needs testing.
"""
manager = ServiceManager.get_service_by_id("api")
assert manager is not None
snapshot = Backups.back_up(manager)
Backups.restore_snapshot(snapshot)
def test_backup_all_restore_all(
backups,
generic_userdata,
dkim_file,
only_dummy_service_and_api,
catch_nixos_rebuild_calls,
):
dummy_service = only_dummy_service_and_api
fp = catch_nixos_rebuild_calls
fp.pass_command(["restic", fp.any()])
fp.keep_last_process(True)
backup_job = add_total_backup_job()
total_backup(backup_job)
assert len(Backups.get_all_snapshots()) == 2
restore_job = add_total_restore_job()
do_full_restore(restore_job)
# TODO: maybe test integrity.
# since it is standard restore operations they would have failed if something went wrong
# however, one still needs to check that all of the required restores have even started