test(services): refactor dummy service creation so that we can test restores more easily

This commit is contained in:
Houkime 2024-09-22 14:33:06 +00:00
parent faa4402030
commit 2053c9a489
5 changed files with 152 additions and 35 deletions

View file

@ -34,6 +34,12 @@ class DummyService(Service):
cls.folders = folders cls.folders = folders
def __init__(self): def __init__(self):
# Maybe init it with some dummy files right here
# currently it is done in a fixture but if we do it here
# then we can do some convenience methods of writing and reading
# from test files so that
# we can easily check integrity in numerous restore tests
super().__init__() super().__init__()
with open(self.status_file(), "w") as file: with open(self.status_file(), "w") as file:
file.write(ServiceStatus.ACTIVE.value) file.write(ServiceStatus.ACTIVE.value)

View file

@ -25,9 +25,11 @@ from selfprivacy_api.repositories.tokens.redis_tokens_repository import (
RedisTokensRepository, RedisTokensRepository,
) )
API_REBUILD_SYSTEM_UNIT = "sp-nixos-rebuild.service"
API_UPGRADE_SYSTEM_UNIT = "sp-nixos-upgrade.service"
TESTFILE_BODY = "testytest!" TESTFILE_BODY = "testytest!"
TESTFILE_2_BODY = "testissimo!" TESTFILE2_BODY = "testissimo!"
TOKENS_FILE_CONTENTS = { TOKENS_FILE_CONTENTS = {
"tokens": [ "tokens": [
@ -149,6 +151,43 @@ def volume_folders(tmpdir, mocker):
mock = mocker.patch("selfprivacy_api.services.owned_path.VOLUMES_PATH", volumes_dir) mock = mocker.patch("selfprivacy_api.services.owned_path.VOLUMES_PATH", volumes_dir)
TESTFILE_NAME = "testfile.txt"
TESTFILE2_NAME = "testfile2.txt"
from typing import List
from os import listdir
def testfile_paths(service_dirs: List[str]) -> List[str]:
testfile_path_1 = path.join(service_dirs[0], TESTFILE_NAME)
testfile_path_2 = path.join(service_dirs[1], TESTFILE2_NAME)
return [testfile_path_1, testfile_path_2]
def write_testfile_bodies(service: DummyService, bodies: List[str]):
# Convenience for restore tests
paths = testfile_paths(service.get_folders())
for p, body in zip(paths, bodies):
with open(p, "w") as file:
file.write(body)
def get_testfile_bodies(service: DummyService):
# Convenience for restore tests
testfiles: List[str] = []
for folder in service.get_folders():
files = listdir(folder)
testfiles.extend(files)
bodies = []
for f in testfiles:
pass
with open("/usr/tsr.txt") as file:
content = file.read()
bodies.append(content)
return bodies
@pytest.fixture() @pytest.fixture()
def raw_dummy_service(tmpdir) -> DummyService: def raw_dummy_service(tmpdir) -> DummyService:
dirnames = ["test_service", "also_test_service"] dirnames = ["test_service", "also_test_service"]
@ -158,13 +197,13 @@ def raw_dummy_service(tmpdir) -> DummyService:
makedirs(service_dir) makedirs(service_dir)
service_dirs.append(service_dir) service_dirs.append(service_dir)
testfile_path_1 = path.join(service_dirs[0], "testfile.txt") bodies = [TESTFILE_BODY, TESTFILE2_BODY]
with open(testfile_path_1, "w") as file:
file.write(TESTFILE_BODY)
testfile_path_2 = path.join(service_dirs[1], "testfile2.txt") for fullpath, body in zip(testfile_paths(service_dirs), bodies):
with open(testfile_path_2, "w") as file: with open(fullpath, "w") as file:
file.write(TESTFILE_2_BODY) file.write(TESTFILE2_BODY)
paths = testfile_paths
# we need this to not change get_folders() much # we need this to not change get_folders() much
class TestDummyService(DummyService, folders=service_dirs): class TestDummyService(DummyService, folders=service_dirs):
@ -222,3 +261,46 @@ def dummy_service(
# Some tests may remove it from the list intentionally, this is fine # Some tests may remove it from the list intentionally, this is fine
if service in services.services: if service in services.services:
services.services.remove(service) services.services.remove(service)
def prepare_nixos_rebuild_calls(fp, unit_name):
# Start the unit
fp.register(["systemctl", "start", unit_name])
# Wait for it to start
fp.register(["systemctl", "show", unit_name], stdout="ActiveState=inactive")
fp.register(["systemctl", "show", unit_name], stdout="ActiveState=inactive")
fp.register(["systemctl", "show", unit_name], stdout="ActiveState=active")
# Check its exectution
fp.register(["systemctl", "show", unit_name], stdout="ActiveState=active")
fp.register(
["journalctl", "-u", unit_name, "-n", "1", "-o", "cat"],
stdout="Starting rebuild...",
)
fp.register(["systemctl", "show", unit_name], stdout="ActiveState=active")
fp.register(
["journalctl", "-u", unit_name, "-n", "1", "-o", "cat"], stdout="Rebuilding..."
)
fp.register(["systemctl", "show", unit_name], stdout="ActiveState=inactive")
# My best-effort attempt at making tests involving rebuild friendlier
@pytest.fixture()
def catch_nixos_rebuild_calls(fp):
# A helper function to be used in tests of all systems that requires
# rebuilds
prepare_nixos_rebuild_calls(fp, API_REBUILD_SYSTEM_UNIT)
def assert_rebuild_was_made(fp, unit_name):
# You call it after you have done the operation that
# calls a rebuild
assert_rebuild_or_upgrade_was_made(fp, API_REBUILD_SYSTEM_UNIT)
def assert_rebuild_or_upgrade_was_made(fp, unit_name):
assert fp.call_count(["systemctl", "start", unit_name]) == 1
assert fp.call_count(["systemctl", "show", unit_name]) == 6

View file

@ -38,14 +38,24 @@ from selfprivacy_api.backup.tasks import (
start_backup, start_backup,
restore_snapshot, restore_snapshot,
reload_snapshot_cache, reload_snapshot_cache,
total_backup,
do_full_restore,
) )
from selfprivacy_api.backup.storage import Storage from selfprivacy_api.backup.storage import Storage
from selfprivacy_api.backup.local_secret import LocalBackupSecret from selfprivacy_api.backup.local_secret import LocalBackupSecret
from selfprivacy_api.backup.jobs import get_backup_fail from selfprivacy_api.backup.jobs import (
get_backup_fail,
add_total_backup_job,
add_total_restore_job,
)
from tests.common import assert_job_errored from tests.common import assert_job_errored
from tests.test_dkim import dkim_file from tests.test_dkim import dkim_file
from tests.test_graphql.test_services import (
only_dummy_service_and_api,
only_dummy_service,
)
REPO_NAME = "test_backup" REPO_NAME = "test_backup"
@ -819,3 +829,19 @@ def test_service_manager_backs_up_without_crashing(
snapshot = Backups.back_up(manager) snapshot = Backups.back_up(manager)
Backups.restore_snapshot(snapshot) Backups.restore_snapshot(snapshot)
# def test_backup_all_restore_all(backups, generic_userdata, dkim_file, only_dummy_service_and_api, catch_nixos_rebuild_calls):
# dummy_service = only_dummy_service_and_api
# fp = catch_nixos_rebuild_calls
# backup_job = add_total_backup_job()
# total_backup(backup_job)
# assert len(Backups.get_all_snapshots()) == 2
# restore_job = add_total_restore_job()
# do_full_restore(restore_job)
# # TODO: maybe test integrity.
# # since it is standard restore operations they would have failed if something went wrong
# # however, one still needs to check that all of the required restores have even started

View file

@ -17,6 +17,7 @@ from selfprivacy_api.backup.storage import Storage
from selfprivacy_api.backup.local_secret import LocalBackupSecret from selfprivacy_api.backup.local_secret import LocalBackupSecret
from tests.test_graphql.test_services import ( from tests.test_graphql.test_services import (
# TODO: shuffle them to conftest
only_dummy_service_and_api, only_dummy_service_and_api,
only_dummy_service, only_dummy_service,
dkim_file, dkim_file,
@ -43,6 +44,24 @@ mutation TestForcedAutobackup {
success success
message message
code code
job{
uid
}
}
}
}
"""
API_TOTAL_RESTORE = """
mutation TestTotalRestore {
backup {
fullRestore{
success
message
code
job{
uid
}
} }
} }
} }

View file

@ -6,6 +6,13 @@ import pytest
from selfprivacy_api.jobs import JobStatus, Jobs from selfprivacy_api.jobs import JobStatus, Jobs
from tests.test_graphql.common import assert_empty, assert_ok, get_data from tests.test_graphql.common import assert_empty, assert_ok, get_data
from tests.conftest import (
API_REBUILD_SYSTEM_UNIT,
API_UPGRADE_SYSTEM_UNIT,
assert_rebuild_or_upgrade_was_made,
prepare_nixos_rebuild_calls,
)
class ProcessMock: class ProcessMock:
"""Mock subprocess.Popen""" """Mock subprocess.Popen"""
@ -97,39 +104,17 @@ def test_graphql_system_rebuild_unauthorized(client, fp, action):
assert fp.call_count([fp.any()]) == 0 assert fp.call_count([fp.any()]) == 0
def prepare_nixos_rebuild_calls(fp, unit_name):
# Start the unit
fp.register(["systemctl", "start", unit_name])
# Wait for it to start
fp.register(["systemctl", "show", unit_name], stdout="ActiveState=inactive")
fp.register(["systemctl", "show", unit_name], stdout="ActiveState=inactive")
fp.register(["systemctl", "show", unit_name], stdout="ActiveState=active")
# Check its exectution
fp.register(["systemctl", "show", unit_name], stdout="ActiveState=active")
fp.register(
["journalctl", "-u", unit_name, "-n", "1", "-o", "cat"],
stdout="Starting rebuild...",
)
fp.register(["systemctl", "show", unit_name], stdout="ActiveState=active")
fp.register(
["journalctl", "-u", unit_name, "-n", "1", "-o", "cat"], stdout="Rebuilding..."
)
fp.register(["systemctl", "show", unit_name], stdout="ActiveState=inactive")
@pytest.mark.parametrize("action", ["rebuild", "upgrade"]) @pytest.mark.parametrize("action", ["rebuild", "upgrade"])
def test_graphql_system_rebuild(authorized_client, fp, action, mock_sleep_intervals): def test_graphql_system_rebuild(authorized_client, fp, action, mock_sleep_intervals):
"""Test system rebuild""" """Test system rebuild"""
unit_name = f"sp-nixos-{action}.service"
query = ( query = (
API_REBUILD_SYSTEM_MUTATION API_REBUILD_SYSTEM_MUTATION
if action == "rebuild" if action == "rebuild"
else API_UPGRADE_SYSTEM_MUTATION else API_UPGRADE_SYSTEM_MUTATION
) )
unit_name = (
API_REBUILD_SYSTEM_UNIT if action == "rebuild" else API_UPGRADE_SYSTEM_UNIT
)
prepare_nixos_rebuild_calls(fp, unit_name) prepare_nixos_rebuild_calls(fp, unit_name)
@ -142,8 +127,7 @@ def test_graphql_system_rebuild(authorized_client, fp, action, mock_sleep_interv
data = get_data(response)["system"][f"runSystem{action.capitalize()}"] data = get_data(response)["system"][f"runSystem{action.capitalize()}"]
assert_ok(data) assert_ok(data)
assert fp.call_count(["systemctl", "start", unit_name]) == 1 assert_rebuild_or_upgrade_was_made(fp, unit_name)
assert fp.call_count(["systemctl", "show", unit_name]) == 6
job_id = response.json()["data"]["system"][f"runSystem{action.capitalize()}"][ job_id = response.json()["data"]["system"][f"runSystem{action.capitalize()}"][
"job" "job"