mirror of
https://git.selfprivacy.org/SelfPrivacy/selfprivacy-rest-api.git
synced 2024-11-17 08:02:36 +00:00
test(services): refactor dummy service creation so that we can test restores more easily
This commit is contained in:
parent
ca86e4fcc0
commit
39312a0937
|
@ -34,6 +34,12 @@ class DummyService(Service):
|
|||
cls.folders = folders
|
||||
|
||||
def __init__(self):
|
||||
# Maybe init it with some dummy files right here
|
||||
# currently it is done in a fixture but if we do it here
|
||||
# then we can do some convenience methods of writing and reading
|
||||
# from test files so that
|
||||
# we can easily check integrity in numerous restore tests
|
||||
|
||||
super().__init__()
|
||||
with open(self.status_file(), "w") as file:
|
||||
file.write(ServiceStatus.ACTIVE.value)
|
||||
|
|
|
@ -25,9 +25,11 @@ from selfprivacy_api.repositories.tokens.redis_tokens_repository import (
|
|||
RedisTokensRepository,
|
||||
)
|
||||
|
||||
API_REBUILD_SYSTEM_UNIT = "sp-nixos-rebuild.service"
|
||||
API_UPGRADE_SYSTEM_UNIT = "sp-nixos-upgrade.service"
|
||||
|
||||
TESTFILE_BODY = "testytest!"
|
||||
TESTFILE_2_BODY = "testissimo!"
|
||||
TESTFILE2_BODY = "testissimo!"
|
||||
|
||||
TOKENS_FILE_CONTENTS = {
|
||||
"tokens": [
|
||||
|
@ -149,6 +151,43 @@ def volume_folders(tmpdir, mocker):
|
|||
mock = mocker.patch("selfprivacy_api.services.owned_path.VOLUMES_PATH", volumes_dir)
|
||||
|
||||
|
||||
TESTFILE_NAME = "testfile.txt"
|
||||
TESTFILE2_NAME = "testfile2.txt"
|
||||
|
||||
from typing import List
|
||||
from os import listdir
|
||||
|
||||
|
||||
def testfile_paths(service_dirs: List[str]) -> List[str]:
|
||||
testfile_path_1 = path.join(service_dirs[0], TESTFILE_NAME)
|
||||
testfile_path_2 = path.join(service_dirs[1], TESTFILE2_NAME)
|
||||
return [testfile_path_1, testfile_path_2]
|
||||
|
||||
|
||||
def write_testfile_bodies(service: DummyService, bodies: List[str]):
|
||||
# Convenience for restore tests
|
||||
paths = testfile_paths(service.get_folders())
|
||||
for p, body in zip(paths, bodies):
|
||||
with open(p, "w") as file:
|
||||
file.write(body)
|
||||
|
||||
|
||||
def get_testfile_bodies(service: DummyService):
|
||||
# Convenience for restore tests
|
||||
testfiles: List[str] = []
|
||||
for folder in service.get_folders():
|
||||
files = listdir(folder)
|
||||
testfiles.extend(files)
|
||||
bodies = []
|
||||
for f in testfiles:
|
||||
pass
|
||||
|
||||
with open("/usr/tsr.txt") as file:
|
||||
content = file.read()
|
||||
bodies.append(content)
|
||||
return bodies
|
||||
|
||||
|
||||
@pytest.fixture()
|
||||
def raw_dummy_service(tmpdir) -> DummyService:
|
||||
dirnames = ["test_service", "also_test_service"]
|
||||
|
@ -158,13 +197,13 @@ def raw_dummy_service(tmpdir) -> DummyService:
|
|||
makedirs(service_dir)
|
||||
service_dirs.append(service_dir)
|
||||
|
||||
testfile_path_1 = path.join(service_dirs[0], "testfile.txt")
|
||||
with open(testfile_path_1, "w") as file:
|
||||
file.write(TESTFILE_BODY)
|
||||
bodies = [TESTFILE_BODY, TESTFILE2_BODY]
|
||||
|
||||
testfile_path_2 = path.join(service_dirs[1], "testfile2.txt")
|
||||
with open(testfile_path_2, "w") as file:
|
||||
file.write(TESTFILE_2_BODY)
|
||||
for fullpath, body in zip(testfile_paths(service_dirs), bodies):
|
||||
with open(fullpath, "w") as file:
|
||||
file.write(TESTFILE2_BODY)
|
||||
|
||||
paths = testfile_paths
|
||||
|
||||
# we need this to not change get_folders() much
|
||||
class TestDummyService(DummyService, folders=service_dirs):
|
||||
|
@ -222,3 +261,46 @@ def dummy_service(
|
|||
# Some tests may remove it from the list intentionally, this is fine
|
||||
if service in services.services:
|
||||
services.services.remove(service)
|
||||
|
||||
|
||||
def prepare_nixos_rebuild_calls(fp, unit_name):
|
||||
# Start the unit
|
||||
fp.register(["systemctl", "start", unit_name])
|
||||
|
||||
# Wait for it to start
|
||||
fp.register(["systemctl", "show", unit_name], stdout="ActiveState=inactive")
|
||||
fp.register(["systemctl", "show", unit_name], stdout="ActiveState=inactive")
|
||||
fp.register(["systemctl", "show", unit_name], stdout="ActiveState=active")
|
||||
|
||||
# Check its exectution
|
||||
fp.register(["systemctl", "show", unit_name], stdout="ActiveState=active")
|
||||
fp.register(
|
||||
["journalctl", "-u", unit_name, "-n", "1", "-o", "cat"],
|
||||
stdout="Starting rebuild...",
|
||||
)
|
||||
|
||||
fp.register(["systemctl", "show", unit_name], stdout="ActiveState=active")
|
||||
fp.register(
|
||||
["journalctl", "-u", unit_name, "-n", "1", "-o", "cat"], stdout="Rebuilding..."
|
||||
)
|
||||
|
||||
fp.register(["systemctl", "show", unit_name], stdout="ActiveState=inactive")
|
||||
|
||||
|
||||
# My best-effort attempt at making tests involving rebuild friendlier
|
||||
@pytest.fixture()
|
||||
def catch_nixos_rebuild_calls(fp):
|
||||
# A helper function to be used in tests of all systems that requires
|
||||
# rebuilds
|
||||
prepare_nixos_rebuild_calls(fp, API_REBUILD_SYSTEM_UNIT)
|
||||
|
||||
|
||||
def assert_rebuild_was_made(fp, unit_name):
|
||||
# You call it after you have done the operation that
|
||||
# calls a rebuild
|
||||
assert_rebuild_or_upgrade_was_made(fp, API_REBUILD_SYSTEM_UNIT)
|
||||
|
||||
|
||||
def assert_rebuild_or_upgrade_was_made(fp, unit_name):
|
||||
assert fp.call_count(["systemctl", "start", unit_name]) == 1
|
||||
assert fp.call_count(["systemctl", "show", unit_name]) == 6
|
||||
|
|
|
@ -38,14 +38,24 @@ from selfprivacy_api.backup.tasks import (
|
|||
start_backup,
|
||||
restore_snapshot,
|
||||
reload_snapshot_cache,
|
||||
total_backup,
|
||||
do_full_restore,
|
||||
)
|
||||
from selfprivacy_api.backup.storage import Storage
|
||||
from selfprivacy_api.backup.local_secret import LocalBackupSecret
|
||||
from selfprivacy_api.backup.jobs import get_backup_fail
|
||||
from selfprivacy_api.backup.jobs import (
|
||||
get_backup_fail,
|
||||
add_total_backup_job,
|
||||
add_total_restore_job,
|
||||
)
|
||||
|
||||
from tests.common import assert_job_errored
|
||||
from tests.test_dkim import dkim_file
|
||||
|
||||
from tests.test_graphql.test_services import (
|
||||
only_dummy_service_and_api,
|
||||
only_dummy_service,
|
||||
)
|
||||
|
||||
REPO_NAME = "test_backup"
|
||||
|
||||
|
@ -819,3 +829,19 @@ def test_service_manager_backs_up_without_crashing(
|
|||
|
||||
snapshot = Backups.back_up(manager)
|
||||
Backups.restore_snapshot(snapshot)
|
||||
|
||||
|
||||
# def test_backup_all_restore_all(backups, generic_userdata, dkim_file, only_dummy_service_and_api, catch_nixos_rebuild_calls):
|
||||
# dummy_service = only_dummy_service_and_api
|
||||
# fp = catch_nixos_rebuild_calls
|
||||
# backup_job = add_total_backup_job()
|
||||
# total_backup(backup_job)
|
||||
# assert len(Backups.get_all_snapshots()) == 2
|
||||
|
||||
# restore_job = add_total_restore_job()
|
||||
|
||||
# do_full_restore(restore_job)
|
||||
|
||||
# # TODO: maybe test integrity.
|
||||
# # since it is standard restore operations they would have failed if something went wrong
|
||||
# # however, one still needs to check that all of the required restores have even started
|
||||
|
|
|
@ -17,6 +17,7 @@ from selfprivacy_api.backup.storage import Storage
|
|||
from selfprivacy_api.backup.local_secret import LocalBackupSecret
|
||||
|
||||
from tests.test_graphql.test_services import (
|
||||
# TODO: shuffle them to conftest
|
||||
only_dummy_service_and_api,
|
||||
only_dummy_service,
|
||||
dkim_file,
|
||||
|
@ -43,6 +44,24 @@ mutation TestForcedAutobackup {
|
|||
success
|
||||
message
|
||||
code
|
||||
job{
|
||||
uid
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
"""
|
||||
|
||||
API_TOTAL_RESTORE = """
|
||||
mutation TestTotalRestore {
|
||||
backup {
|
||||
fullRestore{
|
||||
success
|
||||
message
|
||||
code
|
||||
job{
|
||||
uid
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
|
|
@ -6,6 +6,13 @@ import pytest
|
|||
from selfprivacy_api.jobs import JobStatus, Jobs
|
||||
from tests.test_graphql.common import assert_empty, assert_ok, get_data
|
||||
|
||||
from tests.conftest import (
|
||||
API_REBUILD_SYSTEM_UNIT,
|
||||
API_UPGRADE_SYSTEM_UNIT,
|
||||
assert_rebuild_or_upgrade_was_made,
|
||||
prepare_nixos_rebuild_calls,
|
||||
)
|
||||
|
||||
|
||||
class ProcessMock:
|
||||
"""Mock subprocess.Popen"""
|
||||
|
@ -97,39 +104,17 @@ def test_graphql_system_rebuild_unauthorized(client, fp, action):
|
|||
assert fp.call_count([fp.any()]) == 0
|
||||
|
||||
|
||||
def prepare_nixos_rebuild_calls(fp, unit_name):
|
||||
# Start the unit
|
||||
fp.register(["systemctl", "start", unit_name])
|
||||
|
||||
# Wait for it to start
|
||||
fp.register(["systemctl", "show", unit_name], stdout="ActiveState=inactive")
|
||||
fp.register(["systemctl", "show", unit_name], stdout="ActiveState=inactive")
|
||||
fp.register(["systemctl", "show", unit_name], stdout="ActiveState=active")
|
||||
|
||||
# Check its exectution
|
||||
fp.register(["systemctl", "show", unit_name], stdout="ActiveState=active")
|
||||
fp.register(
|
||||
["journalctl", "-u", unit_name, "-n", "1", "-o", "cat"],
|
||||
stdout="Starting rebuild...",
|
||||
)
|
||||
|
||||
fp.register(["systemctl", "show", unit_name], stdout="ActiveState=active")
|
||||
fp.register(
|
||||
["journalctl", "-u", unit_name, "-n", "1", "-o", "cat"], stdout="Rebuilding..."
|
||||
)
|
||||
|
||||
fp.register(["systemctl", "show", unit_name], stdout="ActiveState=inactive")
|
||||
|
||||
|
||||
@pytest.mark.parametrize("action", ["rebuild", "upgrade"])
|
||||
def test_graphql_system_rebuild(authorized_client, fp, action, mock_sleep_intervals):
|
||||
"""Test system rebuild"""
|
||||
unit_name = f"sp-nixos-{action}.service"
|
||||
query = (
|
||||
API_REBUILD_SYSTEM_MUTATION
|
||||
if action == "rebuild"
|
||||
else API_UPGRADE_SYSTEM_MUTATION
|
||||
)
|
||||
unit_name = (
|
||||
API_REBUILD_SYSTEM_UNIT if action == "rebuild" else API_UPGRADE_SYSTEM_UNIT
|
||||
)
|
||||
|
||||
prepare_nixos_rebuild_calls(fp, unit_name)
|
||||
|
||||
|
@ -142,8 +127,7 @@ def test_graphql_system_rebuild(authorized_client, fp, action, mock_sleep_interv
|
|||
data = get_data(response)["system"][f"runSystem{action.capitalize()}"]
|
||||
assert_ok(data)
|
||||
|
||||
assert fp.call_count(["systemctl", "start", unit_name]) == 1
|
||||
assert fp.call_count(["systemctl", "show", unit_name]) == 6
|
||||
assert_rebuild_or_upgrade_was_made(fp, unit_name)
|
||||
|
||||
job_id = response.json()["data"]["system"][f"runSystem{action.capitalize()}"][
|
||||
"job"
|
||||
|
|
Loading…
Reference in a new issue