mirror of
https://git.selfprivacy.org/SelfPrivacy/selfprivacy-rest-api.git
synced 2025-01-11 18:39:30 +00:00
644 lines
19 KiB
Python
644 lines
19 KiB
Python
import pytest
|
|
import os.path as path
|
|
from os import makedirs
|
|
from os import remove
|
|
from os import listdir
|
|
from os import urandom
|
|
from datetime import datetime, timedelta, timezone
|
|
from subprocess import Popen
|
|
|
|
import selfprivacy_api.services as services
|
|
from selfprivacy_api.services import Service, get_all_services
|
|
|
|
from selfprivacy_api.services import get_service_by_id
|
|
from selfprivacy_api.services.test_service import DummyService
|
|
from selfprivacy_api.graphql.queries.providers import BackupProvider
|
|
from selfprivacy_api.graphql.common_types.backup import RestoreStrategy
|
|
from selfprivacy_api.jobs import Jobs, JobStatus
|
|
|
|
from selfprivacy_api.models.backup.snapshot import Snapshot
|
|
|
|
from selfprivacy_api.backup import Backups
|
|
import selfprivacy_api.backup.providers as providers
|
|
from selfprivacy_api.backup.providers import AbstractBackupProvider
|
|
from selfprivacy_api.backup.providers.backblaze import Backblaze
|
|
from selfprivacy_api.backup.util import sync
|
|
from selfprivacy_api.backup.backuppers.restic_backupper import ResticBackupper
|
|
from selfprivacy_api.backup.jobs import add_backup_job, add_restore_job
|
|
|
|
|
|
from selfprivacy_api.backup.tasks import start_backup, restore_snapshot
|
|
from selfprivacy_api.backup.storage import Storage
|
|
from selfprivacy_api.backup.jobs import get_backup_job
|
|
|
|
|
|
TESTFILE_BODY = "testytest!"
|
|
TESTFILE_2_BODY = "testissimo!"
|
|
REPO_NAME = "test_backup"
|
|
|
|
|
|
@pytest.fixture(scope="function")
def backups(tmpdir):
    """Reset backup state, select a local-file repo under ``tmpdir``, then reset jobs."""
    Backups.reset()
    # The repo location is a directory name inside the per-test tmpdir.
    Backups.set_localfile_repo(path.join(tmpdir, "totallyunrelated"))
    Jobs.reset()
|
|
|
|
|
|
@pytest.fixture()
def backups_backblaze(generic_userdata):
    """Reset ``Backups`` while leaving JSON untouched (``reset_json=False``).

    ``generic_userdata`` is requested only as a dependency; it is defined
    outside this file (presumably it provides the userdata the provider is
    loaded from -- confirm against conftest).
    """
    reset_options = {"reset_json": False}
    Backups.reset(**reset_options)
|
|
|
|
|
|
@pytest.fixture()
def raw_dummy_service(tmpdir):
    """Build a ``DummyService`` subclass instance backed by two folders in ``tmpdir``.

    Creates the folders ``test_service`` and ``also_test_service``, writes one
    small text file into each, and returns an instance of a ``DummyService``
    subclass declared with ``folders=`` set to those two directories.
    """
    service_dirs = [
        path.join(tmpdir, dirname) for dirname in ("test_service", "also_test_service")
    ]
    for service_dir in service_dirs:
        makedirs(service_dir)

    # (folder, file name, file body) for the seed file of each folder.
    seed_files = (
        (service_dirs[0], "testfile.txt", TESTFILE_BODY),
        (service_dirs[1], "testfile2.txt", TESTFILE_2_BODY),
    )
    for folder, filename, body in seed_files:
        with open(path.join(folder, filename), "w") as seed:
            seed.write(body)

    # Subclassing with folders= lets us avoid changing get_folders() much.
    class TestDummyService(DummyService, folders=service_dirs):
        pass

    return TestDummyService()
|
|
|
|
|
|
@pytest.fixture()
def dummy_service(tmpdir, backups, raw_dummy_service) -> Service:
    """Yield the raw dummy service registered in the global service list.

    Initialises the backup repo, appends the service to ``services.services``
    for the duration of the test and removes it again afterwards.  The removal
    is placed in ``finally`` so the service is also unregistered when the
    post-registration sanity check fails.
    """
    service = raw_dummy_service
    repo_path = path.join(tmpdir, "test_repo")
    assert not path.exists(repo_path)

    Backups.init_repo()

    # Register our service.
    services.services.append(service)
    try:
        assert get_service_by_id(service.get_id()) is not None
        yield service
    finally:
        # Cleanup because apparently it matters wrt tasks.
        services.services.remove(service)
|
|
|
|
|
|
@pytest.fixture()
def memory_backup() -> AbstractBackupProvider:
    """Return an instance of the provider class for ``BackupProvider.MEMORY``.

    The provider is constructed with an empty login and an empty key.
    """
    provider_class = providers.get_provider(BackupProvider.MEMORY)
    assert provider_class is not None

    instance = provider_class(login="", key="")
    assert instance is not None
    return instance
|
|
|
|
|
|
@pytest.fixture()
def file_backup(tmpdir) -> AbstractBackupProvider:
    """Return an instance of the provider class for ``BackupProvider.FILE``.

    The provider's ``location`` is ``<tmpdir>/test_repo``.
    """
    repo_location = path.join(tmpdir, "test_repo")

    provider_class = providers.get_provider(BackupProvider.FILE)
    assert provider_class is not None

    instance = provider_class(location=repo_location)
    assert instance is not None
    return instance
|
|
|
|
|
|
def test_config_load(generic_userdata):
    """After a reset that keeps JSON, the provider is Backblaze with the test credentials."""
    Backups.reset(reset_json=False)
    loaded = Backups.provider()

    assert loaded is not None
    assert isinstance(loaded, Backblaze)

    # Values exposed directly on the provider.
    assert loaded.login == "ID"
    assert loaded.key == "KEY"
    assert loaded.location == "selfprivacy"

    # The same credentials must be visible on the provider's backupper.
    backupper = loaded.backupper
    assert backupper.account == "ID"
    assert backupper.key == "KEY"
|
|
|
|
|
|
def test_json_reset(generic_userdata):
    """A default ``Backups.reset()`` leaves a provider whose fields are all empty strings."""
    Backups.reset(reset_json=False)
    before = Backups.provider()
    assert before is not None
    assert isinstance(before, Backblaze)
    assert before.login == "ID"
    assert before.key == "KEY"
    assert before.location == "selfprivacy"

    # Reset with default arguments this time.
    Backups.reset()
    after = Backups.provider()
    assert after is not None
    assert isinstance(after, AbstractBackupProvider)
    assert after.login == ""
    assert after.key == ""
    assert after.location == ""
    assert after.repo_id == ""
|
|
|
|
|
|
def test_select_backend():
    """``get_provider(BackupProvider.BACKBLAZE)`` resolves to the ``Backblaze`` class."""
    resolved_class = providers.get_provider(BackupProvider.BACKBLAZE)
    assert resolved_class is not None
    assert resolved_class == Backblaze
|
|
|
|
|
|
def test_file_backend_init(file_backup):
    """Calling ``init()`` on the file provider's backupper must not raise."""
    backupper = file_backup.backupper
    backupper.init()
|
|
|
|
|
|
def test_backup_simple_file(raw_dummy_service, file_backup):
    """Smoke-test the fixtures and backupper initialisation (temporarily incomplete)."""
    assert raw_dummy_service is not None
    assert file_backup is not None

    # The id is fetched but not used yet; the test stops after init().
    _service_id = raw_dummy_service.get_id()
    file_backup.backupper.init()
|
|
|
|
|
|
def test_backup_service(dummy_service, backups):
    """Backing up records a recent last-backup time and exactly one finished backup job."""
    backup_job_type = f"services.{dummy_service.get_id()}.backup"

    # Nothing has happened yet: no finished job, no last-backup timestamp.
    assert_job_finished(backup_job_type, count=0)
    assert Backups.get_last_backed_up(dummy_service) is None

    Backups.back_up(dummy_service)

    checked_at = datetime.now(timezone.utc)
    backed_up_at = Backups.get_last_backed_up(dummy_service)
    assert backed_up_at is not None
    # The timestamp lies in the past, but by less than a minute.
    assert checked_at > backed_up_at
    assert checked_at - backed_up_at < timedelta(minutes=1)

    assert_job_finished(backup_job_type, count=1)
|
|
|
|
|
|
def test_no_repo(memory_backup):
    """Listing snapshots on the memory provider's backupper must raise ``ValueError``.

    No repo initialisation is performed in this test or in the
    ``memory_backup`` fixture before the call.  The call is not wrapped in an
    ``assert``: any return value at all means the expected exception was not
    raised, so comparing the result to ``[]`` could never make the test pass.
    """
    with pytest.raises(ValueError):
        memory_backup.backupper.get_snapshots()
|
|
|
|
|
|
def test_one_snapshot(backups, dummy_service):
    """One backup yields exactly one snapshot, labelled with the service id."""
    Backups.back_up(dummy_service)

    listed = Backups.get_snapshots(dummy_service)
    assert len(listed) == 1

    only_snapshot = listed[0]
    assert only_snapshot.service_name == dummy_service.get_id()
|
|
|
|
|
|
def test_backup_returns_snapshot(backups, dummy_service):
    """``backupper.start_backup`` returns a snapshot with id, service name and creation time."""
    folders = dummy_service.get_folders()
    backupper = Backups.provider().backupper
    service_id = dummy_service.get_id()

    result = backupper.start_backup(folders, service_id)

    assert result.id is not None
    assert result.service_name == service_id
    assert result.created_at is not None
|
|
|
|
|
|
def folder_files(folder):
    """Return the paths of all entries directly inside ``folder``.

    Each name reported by ``listdir(folder)`` is joined onto ``folder``.
    Subdirectories are not descended into.  ``listdir`` never yields ``None``,
    so no filtering of the names is needed.
    """
    return [path.join(folder, filename) for filename in listdir(folder)]
|
|
|
|
|
|
def service_files(service):
    """Return the entries of every folder in ``service.get_folders()`` as one flat list.

    Folders are visited in the order ``get_folders()`` returns them and each
    folder's entries come from ``folder_files``.
    """
    return [
        entry
        for service_folder in service.get_folders()
        for entry in folder_files(service_folder)
    ]
|
|
|
|
|
|
def test_restore(backups, dummy_service):
    """Files deleted after a backup come back with identical content on restore."""

    def read_text(file_path):
        with open(file_path, "r") as reader:
            return reader.read()

    victims = service_files(dummy_service)
    # Remember what every file contained before the backup.
    expected_bodies = [read_text(victim) for victim in victims]

    Backups.back_up(dummy_service)
    snapshot = Backups.get_snapshots(dummy_service)[0]
    assert snapshot is not None

    # Delete every service file, checking each deletion took effect.
    for victim in victims:
        assert path.exists(victim)
        remove(victim)
        assert not path.exists(victim)

    Backups._restore_service_from_snapshot(dummy_service, snapshot.id)

    for victim, expected in zip(victims, expected_bodies):
        assert path.exists(victim)
        with open(victim, "r") as restored:
            assert restored.read() == expected
|
|
|
|
|
|
def test_sizing(backups, dummy_service):
    """The restored size reported for a fresh snapshot is a positive number."""
    Backups.back_up(dummy_service)
    first_snapshot = Backups.get_snapshots(dummy_service)[0]

    restored_size = Backups.snapshot_restored_size(first_snapshot.id)
    assert restored_size is not None
    assert restored_size > 0
|
|
|
|
|
|
def test_init_tracking(backups, raw_dummy_service):
    """``Backups.is_initted()`` flips from False to True across ``init_repo()``."""
    initted_before = Backups.is_initted()
    assert initted_before is False

    Backups.init_repo()

    initted_after = Backups.is_initted()
    assert initted_after is True
|
|
|
|
|
|
def finished_jobs():
    """Return the jobs from ``Jobs.get_jobs()`` whose status is ``JobStatus.FINISHED``."""
    done = []
    for candidate in Jobs.get_jobs():
        if candidate.status is JobStatus.FINISHED:
            done.append(candidate)
    return done
|
|
|
|
|
|
def assert_job_finished(job_type, count):
    """Assert that exactly ``count`` finished jobs have ``type_id == job_type``."""
    matching = sum(1 for job in finished_jobs() if job.type_id == job_type)
    assert matching == count
|
|
|
|
|
|
def assert_job_has_run(job_type):
    """Assert the first finished job of ``job_type`` has ``RUNNING`` among its status updates.

    Raises ``IndexError`` when no finished job of that type exists.
    """
    candidates = [job for job in finished_jobs() if job.type_id == job_type]
    first_match = candidates[0]
    recorded_statuses = Jobs.status_updates(first_match)
    assert JobStatus.RUNNING in recorded_statuses
|
|
|
|
|
|
def job_progress_updates(job_type):
    """Return ``Jobs.progress_updates`` for the first finished job of ``job_type``.

    Raises ``IndexError`` when no finished job of that type exists.
    """
    candidates = [job for job in finished_jobs() if job.type_id == job_type]
    first_match = candidates[0]
    return Jobs.progress_updates(first_match)
|
|
|
|
|
|
def assert_job_had_progress(job_type):
    """Assert that at least one progress update exists for the finished job of ``job_type``."""
    recorded = job_progress_updates(job_type)
    assert len(recorded) > 0
|
|
|
|
|
|
def make_large_file(path: str, bytes: int):
    """Create (or overwrite) the file at ``path`` with ``bytes`` bytes from ``urandom``."""
    payload = urandom(bytes)
    with open(path, "wb") as sink:
        sink.write(payload)
|
|
|
|
|
|
def test_snapshots_by_id(backups, dummy_service):
    """After three backups, the middle snapshot can be looked up by its id."""
    taken = [Backups.back_up(dummy_service) for _ in range(3)]
    middle = taken[1]

    assert middle.id is not None
    assert middle.id != ""

    assert len(Backups.get_snapshots(dummy_service)) == 3
    looked_up = Backups.get_snapshot_by_id(middle.id)
    assert looked_up.id == middle.id
|
|
|
|
|
|
@pytest.fixture(params=["instant_server_stop", "delayed_server_stop"])
def simulated_service_stopping_delay(request) -> float:
    """Parametrised delay value: ``0.0`` for ``instant_server_stop``, otherwise ``0.3``."""
    is_instant = request.param == "instant_server_stop"
    return 0.0 if is_instant else 0.3
|
|
|
|
|
|
def test_backup_service_task(backups, dummy_service, simulated_service_stopping_delay):
    """Running the backup task yields one snapshot and a finished job that ran with progress."""
    dummy_service.set_delay(simulated_service_stopping_delay)

    # start_backup returns a handle; calling it with blocking=True is awaited here.
    task_handle = start_backup(dummy_service)
    task_handle(blocking=True)

    assert len(Backups.get_snapshots(dummy_service)) == 1

    backup_job_type = f"services.{dummy_service.get_id()}.backup"
    assert_job_finished(backup_job_type, count=1)
    assert_job_has_run(backup_job_type)
    assert_job_had_progress(backup_job_type)
|
|
|
|
|
|
def test_forget_snapshot(backups, dummy_service):
    """Forgetting snapshots removes them one by one, also after a cache reload."""

    def snapshot_count():
        return len(Backups.get_snapshots(dummy_service))

    older = Backups.back_up(dummy_service)
    newer = Backups.back_up(dummy_service)
    assert snapshot_count() == 2

    Backups.forget_snapshot(newer)
    assert snapshot_count() == 1
    # The count must survive a forced reload of the snapshot cache.
    Backups.force_snapshot_cache_reload()
    assert snapshot_count() == 1

    survivor = Backups.get_snapshots(dummy_service)[0]
    assert survivor.id == older.id

    Backups.forget_snapshot(older)
    assert snapshot_count() == 0
|
|
|
|
|
|
def test_forget_nonexistent_snapshot(backups, dummy_service):
    """Forgetting a snapshot that was never created raises ``ValueError``."""
    made_up_time = datetime.now(timezone.utc)
    fake_snapshot = Snapshot(
        id="gibberjibber",
        service_name="nohoho",
        created_at=made_up_time,
    )

    with pytest.raises(ValueError):
        Backups.forget_snapshot(fake_snapshot)
|
|
|
|
|
|
def test_backup_larger_file(backups, dummy_service):
    """Backing up a 100 MiB file produces several progress updates.

    Results will be slightly different on different machines.  If someone has
    troubles with it on their machine, consider dropping this test.

    The large file is removed in ``finally`` so that a failing assertion does
    not leave it behind.
    """
    large_file = path.join(dummy_service.get_folders()[0], "LARGEFILE")
    mega = 2**20
    make_large_file(large_file, 100 * mega)

    try:
        handle = start_backup(dummy_service)
        handle(blocking=True)

        job_type_id = f"services.{dummy_service.get_id()}.backup"
        assert_job_finished(job_type_id, count=1)
        assert_job_has_run(job_type_id)

        updates = job_progress_updates(job_type_id)
        assert len(updates) > 3
        # The update in the middle of the list must already exceed 10.
        assert updates[(len(updates) - 1) // 2] > 10
    finally:
        # Clean up a bit.
        remove(large_file)
|
|
|
|
|
|
@pytest.fixture(params=["verify", "inplace"])
def restore_strategy(request) -> RestoreStrategy:
    """Parametrised strategy: ``DOWNLOAD_VERIFY_OVERWRITE`` for ``verify``, otherwise ``INPLACE``."""
    wants_verify = request.param == "verify"
    return (
        RestoreStrategy.DOWNLOAD_VERIFY_OVERWRITE
        if wants_verify
        else RestoreStrategy.INPLACE
    )
|
|
|
|
|
|
def test_restore_snapshot_task(
    backups, dummy_service, restore_strategy, simulated_service_stopping_delay
):
    """The restore task brings deleted files back; INPLACE leaves one extra snapshot."""

    def read_text(file_path):
        with open(file_path, "r") as reader:
            return reader.read()

    dummy_service.set_delay(simulated_service_stopping_delay)

    Backups.back_up(dummy_service)
    taken = Backups.get_snapshots(dummy_service)
    assert len(taken) == 1

    victims = service_files(dummy_service)
    # Record every file body first, then delete all the files.
    expected_bodies = [read_text(victim) for victim in victims]
    for victim in victims:
        remove(victim)

    task_handle = restore_snapshot(taken[0], restore_strategy)
    task_handle(blocking=True)

    for victim, expected in zip(victims, expected_bodies):
        assert path.exists(victim)
        with open(victim, "r") as restored:
            assert restored.read() == expected

    remaining = Backups.get_snapshots(dummy_service)
    # Two snapshots are expected after an INPLACE restore, one otherwise.
    expected_count = 2 if restore_strategy == RestoreStrategy.INPLACE else 1
    assert len(remaining) == expected_count
|
|
|
|
|
|
def test_set_autobackup_period(backups):
    """The autobackup period can be set, and is cleared by disabling or by a value of 0 or -1."""
    assert Backups.autobackup_period_minutes() is None

    Backups.set_autobackup_period_minutes(2)
    assert Backups.autobackup_period_minutes() == 2

    Backups.disable_all_autobackup()
    assert Backups.autobackup_period_minutes() is None

    # Both zero and a negative value must clear a previously set period.
    for clearing_value in (0, -1):
        Backups.set_autobackup_period_minutes(3)
        assert Backups.autobackup_period_minutes() == 3

        Backups.set_autobackup_period_minutes(clearing_value)
        assert Backups.autobackup_period_minutes() is None
|
|
|
|
|
|
def test_no_default_autobackup(backups, dummy_service):
    """With no period configured, it is never time to back up."""
    moment = datetime.now(timezone.utc)

    service_due = Backups.is_time_to_backup_service(dummy_service, moment)
    assert not service_due

    anything_due = Backups.is_time_to_backup(moment)
    assert not anything_due
|
|
|
|
|
|
def backuppable_services() -> list[Service]:
    """Return the services from ``get_all_services()`` for which ``can_be_backed_up()`` is truthy."""
    eligible = []
    for candidate in get_all_services():
        if candidate.can_be_backed_up():
            eligible.append(candidate)
    return eligible
|
|
|
|
|
|
def test_services_to_back_up(backups, dummy_service):
    """``services_to_back_up`` is empty until a period is set, then covers all backuppable services."""
    period_minutes = 13
    moment = datetime.now(timezone.utc)

    # Not backuppable, no period set.
    dummy_service.set_backuppable(False)
    due = Backups.services_to_back_up(moment)
    assert len(due) == 0

    # Backuppable, but still no period set.
    dummy_service.set_backuppable(True)
    due = Backups.services_to_back_up(moment)
    assert len(due) == 0

    Backups.set_autobackup_period_minutes(period_minutes)

    due = Backups.services_to_back_up(moment)
    assert len(due) == len(backuppable_services())
    # Membership is checked against backuppable_services(), not against `due`.
    backuppable_ids = [service.get_id() for service in backuppable_services()]
    assert dummy_service.get_id() in backuppable_ids
|
|
|
|
|
|
def test_autobackup_timer_periods(backups, dummy_service):
    """Backup becomes due when a period is set and stops being due when it is set to 0."""
    moment = datetime.now(timezone.utc)
    period_minutes = 13

    def service_due():
        return Backups.is_time_to_backup_service(dummy_service, moment)

    def anything_due():
        return Backups.is_time_to_backup(moment)

    assert not service_due()
    assert not anything_due()

    Backups.set_autobackup_period_minutes(period_minutes)
    assert service_due()
    assert anything_due()

    Backups.set_autobackup_period_minutes(0)
    assert not service_due()
    assert not anything_due()
|
|
|
|
|
|
def test_autobackup_timer_enabling(backups, dummy_service):
    """A service is only due for backup when it is backuppable and a period is set."""
    moment = datetime.now(timezone.utc)
    period_minutes = 13

    def service_due():
        return Backups.is_time_to_backup_service(dummy_service, moment)

    dummy_service.set_backuppable(False)
    Backups.set_autobackup_period_minutes(period_minutes)

    # There are other services too, not just our dummy.
    assert Backups.is_time_to_backup(moment)

    # A non-backuppable service is not due even if the period is set.
    assert not service_due()

    dummy_service.set_backuppable(True)
    assert dummy_service.can_be_backed_up()
    assert service_due()

    Backups.disable_all_autobackup()
    assert not service_due()
    assert not Backups.is_time_to_backup(moment)
|
|
|
|
|
|
def test_autobackup_timing(backups, dummy_service):
    """After a backup the service is not due again until more than one period has passed."""
    period_minutes = 13
    before_backup = datetime.now(timezone.utc)

    Backups.set_autobackup_period_minutes(period_minutes)
    assert Backups.is_time_to_backup_service(dummy_service, before_backup)
    assert Backups.is_time_to_backup(before_backup)

    Backups.back_up(dummy_service)

    # Right after the backup: not due.
    after_backup = datetime.now(timezone.utc)
    assert not Backups.is_time_to_backup_service(dummy_service, after_backup)

    # One minute in the past: not due either.
    a_minute_ago = datetime.now(timezone.utc) - timedelta(minutes=1)
    assert not Backups.is_time_to_backup_service(dummy_service, a_minute_ago)

    # Two minutes beyond a full period from now: due again.
    beyond_period = datetime.now(timezone.utc) + timedelta(minutes=period_minutes + 2)
    assert Backups.is_time_to_backup_service(dummy_service, beyond_period)
|
|
|
|
|
|
# Storage
|
|
def test_snapshots_caching(backups, dummy_service):
    """Snapshot listing is served from the cache and the cache is repopulated on demand.

    Elapsed time is measured with ``time.monotonic()`` rather than wall-clock
    ``datetime.now()``, so a system clock adjustment during the test cannot
    skew the measurement.
    """
    from time import monotonic

    Backups.back_up(dummy_service)

    # We test indirectly that we do redis calls instead of shell calls:
    # ten listings have to complete within half a second.
    started = monotonic()
    for _ in range(10):
        snapshots = Backups.get_snapshots(dummy_service)
        assert len(snapshots) == 1
    assert monotonic() - started < 0.5

    cached_snapshots = Storage.get_cached_snapshots()
    assert len(cached_snapshots) == 1

    Storage.delete_cached_snapshot(cached_snapshots[0])
    cached_snapshots = Storage.get_cached_snapshots()
    assert len(cached_snapshots) == 0

    # Listing again must find the snapshot and put it back into the cache.
    snapshots = Backups.get_snapshots(dummy_service)
    assert len(snapshots) == 1
    cached_snapshots = Storage.get_cached_snapshots()
    assert len(cached_snapshots) == 1
|
|
|
|
|
|
# Storage
|
|
def test_init_tracking_caching(backups, raw_dummy_service):
    """Setting the init mark in ``Storage`` alone makes ``Backups.is_initted()`` true."""
    mark_before = Storage.has_init_mark()
    assert mark_before is False

    Storage.mark_as_init()

    mark_after = Storage.has_init_mark()
    assert mark_after is True
    assert Backups.is_initted() is True
|
|
|
|
|
|
# Storage
|
|
def test_init_tracking_caching2(backups, raw_dummy_service):
    """``Backups.init_repo()`` sets the init mark in ``Storage``."""
    mark_before = Storage.has_init_mark()
    assert mark_before is False

    Backups.init_repo()

    mark_after = Storage.has_init_mark()
    assert mark_after is True
|
|
|
|
|
|
# Storage
|
|
def test_provider_storage(backups_backblaze):
    """A provider stored via ``Storage`` is loaded back as Backblaze with the same credentials."""

    def check_backblaze_credentials(candidate):
        assert isinstance(candidate, Backblaze)
        assert candidate.login == "ID"
        assert candidate.key == "KEY"

    current = Backups.provider()
    assert current is not None
    check_backblaze_credentials(current)

    Storage.store_provider(current)
    reloaded = Backups._load_provider_redis()
    check_backblaze_credentials(reloaded)
|
|
|
|
|
|
def test_sync(dummy_service):
    """``sync(src, dst)`` leaves ``src`` unchanged and gives ``dst`` the same file names."""
    source_dir = dummy_service.get_folders()[0]
    target_dir = dummy_service.get_folders()[1]

    source_before = set(listdir(source_dir))
    target_before = set(listdir(target_dir))
    # The two folders must differ beforehand, otherwise the test proves nothing.
    assert source_before != target_before

    sync(source_dir, target_dir)

    source_after = set(listdir(source_dir))
    target_after = set(listdir(target_dir))
    assert source_after == source_before
    assert target_after == source_after
|
|
|
|
|
|
def test_sync_nonexistent_src(dummy_service):
    """``sync`` raises ``ValueError`` for the given (expected to be missing) source path."""
    missing_source = "/var/lib/nonexistentFluffyBunniesOfUnix"
    target_dir = dummy_service.get_folders()[1]

    with pytest.raises(ValueError):
        sync(missing_source, target_dir)
|
|
|
|
|
|
# Restic lowlevel
|
|
def test_mount_umount(backups, dummy_service, tmpdir):
    """Mounting the repo populates the mountpoint; unmounting empties it again.

    The unmount is placed in ``finally`` so a failing "mountpoint is populated"
    check does not leave the repo mounted inside ``tmpdir``.
    """
    Backups.back_up(dummy_service)
    backupper = Backups.provider().backupper
    assert isinstance(backupper, ResticBackupper)

    mountpoint = tmpdir / "mount"
    makedirs(mountpoint)
    assert path.exists(mountpoint)
    assert len(listdir(mountpoint)) == 0

    # The returned handle is kept referenced until the end of the test but is
    # otherwise unused (presumably it is the mount process -- confirm against
    # ResticBackupper.mount_repo).
    handle = backupper.mount_repo(mountpoint)
    try:
        assert len(listdir(mountpoint)) != 0
    finally:
        backupper.unmount_repo(mountpoint)

    assert len(listdir(mountpoint)) == 0
|
|
|
|
|
|
def test_move_blocks_backups(backups, dummy_service, restore_strategy):
    """While a move job for the service is RUNNING, backup and restore raise ``ValueError``."""
    snap = Backups.back_up(dummy_service)

    # Register a RUNNING move job for the service.  The returned job object is
    # not needed; only the job's existence matters for the checks below.
    Jobs.add(
        type_id=f"services.{dummy_service.get_id()}.move",
        name="Move Dummy",
        description="Moving Dummy data to the Rainbow Land",
        status=JobStatus.RUNNING,
    )

    with pytest.raises(ValueError):
        Backups.back_up(dummy_service)

    with pytest.raises(ValueError):
        Backups.restore_snapshot(snap, restore_strategy)
|