refactor(services): migrate service management to a special service

This commit is contained in:
Houkime 2024-07-24 15:15:31 +00:00
parent 2ef674a037
commit d4998ded46
17 changed files with 268 additions and 69 deletions

View file

@ -1,7 +1,7 @@
from selfprivacy_api.utils.block_devices import BlockDevices from selfprivacy_api.utils.block_devices import BlockDevices
from selfprivacy_api.jobs import Jobs, Job from selfprivacy_api.jobs import Jobs, Job
from selfprivacy_api.services import get_service_by_id from selfprivacy_api.services import ServiceManager
from selfprivacy_api.services.tasks import move_service as move_service_task from selfprivacy_api.services.tasks import move_service as move_service_task
@ -14,7 +14,7 @@ class VolumeNotFoundError(Exception):
def move_service(service_id: str, volume_name: str) -> Job: def move_service(service_id: str, volume_name: str) -> Job:
service = get_service_by_id(service_id) service = ServiceManager.get_service_by_id(service_id)
if service is None: if service is None:
raise ServiceNotFoundError(f"No such service:{service_id}") raise ServiceNotFoundError(f"No such service:{service_id}")

View file

@ -18,7 +18,7 @@ from selfprivacy_api.backup.util import output_yielder, sync
from selfprivacy_api.backup.backuppers import AbstractBackupper from selfprivacy_api.backup.backuppers import AbstractBackupper
from selfprivacy_api.models.backup.snapshot import Snapshot from selfprivacy_api.models.backup.snapshot import Snapshot
from selfprivacy_api.backup.jobs import get_backup_job from selfprivacy_api.backup.jobs import get_backup_job
from selfprivacy_api.services import get_service_by_id from selfprivacy_api.services import ServiceManager
from selfprivacy_api.jobs import Jobs, JobStatus, Job from selfprivacy_api.jobs import Jobs, JobStatus, Job
from selfprivacy_api.backup.local_secret import LocalBackupSecret from selfprivacy_api.backup.local_secret import LocalBackupSecret
@ -191,7 +191,7 @@ class ResticBackupper(AbstractBackupper):
@staticmethod @staticmethod
def _get_backup_job(service_name: str) -> Optional[Job]: def _get_backup_job(service_name: str) -> Optional[Job]:
service = get_service_by_id(service_name) service = ServiceManager.get_service_by_id(service_name)
if service is None: if service is None:
raise ValueError("No service with id ", service_name) raise ValueError("No service with id ", service_name)

View file

@ -3,7 +3,7 @@ from typing import Optional, List
from selfprivacy_api.models.backup.snapshot import Snapshot from selfprivacy_api.models.backup.snapshot import Snapshot
from selfprivacy_api.jobs import Jobs, Job, JobStatus from selfprivacy_api.jobs import Jobs, Job, JobStatus
from selfprivacy_api.services.service import Service from selfprivacy_api.services.service import Service
from selfprivacy_api.services import get_service_by_id from selfprivacy_api.services import ServiceManager
def job_type_prefix(service: Service) -> str: def job_type_prefix(service: Service) -> str:
@ -67,16 +67,37 @@ def add_backup_job(service: Service) -> Job:
return job return job
def complain_about_service_operation_running(service: Service) -> str:
message = (
f"Cannot start a restore of {service.get_id()}, another operation is running: "
+ get_jobs_by_service(service)[0].type_id
)
raise ValueError(message)
def add_total_restore_job() -> Job:
for service in ServiceManager.get_all_services():
if (
not isinstance(service, ServiceManager)
and is_something_running_for(service) is True
):
complain_about_service_operation_running(service)
display_name = service.get_display_name()
job = Jobs.add(
type_id="backups.total_restore",
name=f"Restore {display_name}",
description="restoring all the services",
)
return job
def add_restore_job(snapshot: Snapshot) -> Job: def add_restore_job(snapshot: Snapshot) -> Job:
service = get_service_by_id(snapshot.service_name) service = ServiceManager.get_service_by_id(snapshot.service_name)
if service is None: if service is None:
raise ValueError(f"no such service: {snapshot.service_name}") raise ValueError(f"no such service: {snapshot.service_name}")
if is_something_running_for(service): if is_something_running_for(service):
message = ( complain_about_service_operation_running(service)
f"Cannot start a restore of {service.get_id()}, another operation is running: "
+ get_jobs_by_service(service)[0].type_id
)
raise ValueError(message)
display_name = service.get_display_name() display_name = service.get_display_name()
job = Jobs.add( job = Jobs.add(
type_id=restore_job_type(service), type_id=restore_job_type(service),

View file

@ -13,7 +13,7 @@ from selfprivacy_api.models.backup.snapshot import Snapshot
from selfprivacy_api.utils.huey import huey from selfprivacy_api.utils.huey import huey
from huey import crontab from huey import crontab
from selfprivacy_api.services import get_service_by_id from selfprivacy_api.services import ServiceManager
from selfprivacy_api.backup import Backups from selfprivacy_api.backup import Backups
from selfprivacy_api.backup.jobs import add_autobackup_job from selfprivacy_api.backup.jobs import add_autobackup_job
from selfprivacy_api.jobs import Jobs, JobStatus, Job from selfprivacy_api.jobs import Jobs, JobStatus, Job
@ -38,7 +38,7 @@ def start_backup(service_id: str, reason: BackupReason = BackupReason.EXPLICIT)
""" """
The worker task that starts the backup process. The worker task that starts the backup process.
""" """
service = get_service_by_id(service_id) service = ServiceManager.get_service_by_id(service_id)
if service is None: if service is None:
raise ValueError(f"No such service: {service_id}") raise ValueError(f"No such service: {service_id}")
Backups.back_up(service, reason) Backups.back_up(service, reason)

View file

@ -6,7 +6,7 @@ import strawberry
from selfprivacy_api.graphql.common_types.backup import BackupReason from selfprivacy_api.graphql.common_types.backup import BackupReason
from selfprivacy_api.graphql.common_types.dns import DnsRecord from selfprivacy_api.graphql.common_types.dns import DnsRecord
from selfprivacy_api.services import get_service_by_id, get_services_by_location from selfprivacy_api.services import ServiceManager
from selfprivacy_api.services import Service as ServiceInterface from selfprivacy_api.services import Service as ServiceInterface
from selfprivacy_api.services import ServiceDnsRecord from selfprivacy_api.services import ServiceDnsRecord
@ -23,7 +23,7 @@ def get_usages(root: "StorageVolume") -> list["StorageUsageInterface"]:
used_space=str(service.get_storage_usage()), used_space=str(service.get_storage_usage()),
volume=get_volume_by_id(service.get_drive()), volume=get_volume_by_id(service.get_drive()),
) )
for service in get_services_by_location(root.name) for service in ServiceManager.get_services_by_location(root.name)
] ]
@ -73,7 +73,7 @@ class ServiceStatusEnum(Enum):
def get_storage_usage(root: "Service") -> ServiceStorageUsage: def get_storage_usage(root: "Service") -> ServiceStorageUsage:
"""Get storage usage for a service""" """Get storage usage for a service"""
service = get_service_by_id(root.id) service = ServiceManager.get_service_by_id(root.id)
if service is None: if service is None:
return ServiceStorageUsage( return ServiceStorageUsage(
service=service, service=service,
@ -183,7 +183,7 @@ class Service:
@strawberry.field @strawberry.field
def dns_records(self) -> Optional[List[DnsRecord]]: def dns_records(self) -> Optional[List[DnsRecord]]:
service = get_service_by_id(self.id) service = ServiceManager.get_service_by_id(self.id)
if service is None: if service is None:
raise LookupError(f"no service {self.id}. Should be unreachable") raise LookupError(f"no service {self.id}. Should be unreachable")

View file

@ -19,13 +19,17 @@ from selfprivacy_api.graphql.common_types.backup import (
) )
from selfprivacy_api.backup import Backups from selfprivacy_api.backup import Backups
from selfprivacy_api.services import get_service_by_id from selfprivacy_api.services import ServiceManager
from selfprivacy_api.backup.tasks import ( from selfprivacy_api.backup.tasks import (
start_backup, start_backup,
restore_snapshot, restore_snapshot,
prune_autobackup_snapshots, prune_autobackup_snapshots,
) )
from selfprivacy_api.backup.jobs import add_backup_job, add_restore_job from selfprivacy_api.backup.jobs import (
add_backup_job,
add_restore_job,
add_total_restore_job,
)
from selfprivacy_api.backup.local_secret import LocalBackupSecret from selfprivacy_api.backup.local_secret import LocalBackupSecret
@ -42,7 +46,7 @@ class InitializeRepositoryInput:
login: str login: str
password: str password: str
# For migration. If set, no new secret is generated # For migration. If set, no new secret is generated
local_secret: typing.Optional[str] local_secret: typing.Optional[str] = None
@strawberry.type @strawberry.type
@ -146,7 +150,7 @@ class BackupMutations:
def start_backup(self, service_id: str) -> GenericJobMutationReturn: def start_backup(self, service_id: str) -> GenericJobMutationReturn:
"""Start backup""" """Start backup"""
service = get_service_by_id(service_id) service = ServiceManager.get_service_by_id(service_id)
if service is None: if service is None:
return GenericJobMutationReturn( return GenericJobMutationReturn(
success=False, success=False,
@ -166,12 +170,20 @@ class BackupMutations:
) )
@strawberry.mutation(permission_classes=[IsAuthenticated]) @strawberry.mutation(permission_classes=[IsAuthenticated])
def restore_all(self): def restore_all(self) -> GenericJobMutationReturn:
""" """
Restore all restorable and enabled services according to last autobackup snapshots Restore all restorable and enabled services according to last autobackup snapshots
This happens in sync with partial merging of old configuration for compatibility This happens in sync with partial merging of old configuration for compatibility
""" """
pass
job = add_total_restore_job()
return GenericJobMutationReturn(
success=True,
code=200,
message="restore job created",
job=job_to_api_job(job),
)
@strawberry.mutation(permission_classes=[IsAuthenticated]) @strawberry.mutation(permission_classes=[IsAuthenticated])
def restore_backup( def restore_backup(
@ -189,7 +201,7 @@ class BackupMutations:
job=None, job=None,
) )
service = get_service_by_id(snap.service_name) service = ServiceManager.get_service_by_id(snap.service_name)
if service is None: if service is None:
return GenericJobMutationReturn( return GenericJobMutationReturn(
success=False, success=False,

View file

@ -26,7 +26,7 @@ from selfprivacy_api.actions.services import (
VolumeNotFoundError, VolumeNotFoundError,
) )
from selfprivacy_api.services import get_service_by_id from selfprivacy_api.services import ServiceManager
@strawberry.type @strawberry.type
@ -104,7 +104,7 @@ class ServicesMutations:
def enable_service(self, service_id: str) -> ServiceMutationReturn: def enable_service(self, service_id: str) -> ServiceMutationReturn:
"""Enable service.""" """Enable service."""
try: try:
service = get_service_by_id(service_id) service = ServiceManager.get_service_by_id(service_id)
if service is None: if service is None:
return ServiceMutationReturn( return ServiceMutationReturn(
success=False, success=False,
@ -130,7 +130,7 @@ class ServicesMutations:
def disable_service(self, service_id: str) -> ServiceMutationReturn: def disable_service(self, service_id: str) -> ServiceMutationReturn:
"""Disable service.""" """Disable service."""
try: try:
service = get_service_by_id(service_id) service = ServiceManager.get_service_by_id(service_id)
if service is None: if service is None:
return ServiceMutationReturn( return ServiceMutationReturn(
success=False, success=False,
@ -154,7 +154,7 @@ class ServicesMutations:
@strawberry.mutation(permission_classes=[IsAuthenticated]) @strawberry.mutation(permission_classes=[IsAuthenticated])
def stop_service(self, service_id: str) -> ServiceMutationReturn: def stop_service(self, service_id: str) -> ServiceMutationReturn:
"""Stop service.""" """Stop service."""
service = get_service_by_id(service_id) service = ServiceManager.get_service_by_id(service_id)
if service is None: if service is None:
return ServiceMutationReturn( return ServiceMutationReturn(
success=False, success=False,
@ -172,7 +172,7 @@ class ServicesMutations:
@strawberry.mutation(permission_classes=[IsAuthenticated]) @strawberry.mutation(permission_classes=[IsAuthenticated])
def start_service(self, service_id: str) -> ServiceMutationReturn: def start_service(self, service_id: str) -> ServiceMutationReturn:
"""Start service.""" """Start service."""
service = get_service_by_id(service_id) service = ServiceManager.get_service_by_id(service_id)
if service is None: if service is None:
return ServiceMutationReturn( return ServiceMutationReturn(
success=False, success=False,
@ -190,7 +190,7 @@ class ServicesMutations:
@strawberry.mutation(permission_classes=[IsAuthenticated]) @strawberry.mutation(permission_classes=[IsAuthenticated])
def restart_service(self, service_id: str) -> ServiceMutationReturn: def restart_service(self, service_id: str) -> ServiceMutationReturn:
"""Restart service.""" """Restart service."""
service = get_service_by_id(service_id) service = ServiceManager.get_service_by_id(service_id)
if service is None: if service is None:
return ServiceMutationReturn( return ServiceMutationReturn(
success=False, success=False,
@ -244,7 +244,7 @@ class ServicesMutations:
def move_service(self, input: MoveServiceInput) -> ServiceJobMutationReturn: def move_service(self, input: MoveServiceInput) -> ServiceJobMutationReturn:
"""Move service.""" """Move service."""
# We need a service instance for a reply later # We need a service instance for a reply later
service = get_service_by_id(input.service_id) service = ServiceManager.get_service_by_id(input.service_id)
if service is None: if service is None:
return ServiceJobMutationReturn( return ServiceJobMutationReturn(
success=False, success=False,

View file

@ -15,7 +15,7 @@ from selfprivacy_api.graphql.common_types.service import (
service_to_graphql_service, service_to_graphql_service,
) )
from selfprivacy_api.graphql.common_types.backup import AutobackupQuotas from selfprivacy_api.graphql.common_types.backup import AutobackupQuotas
from selfprivacy_api.services import get_service_by_id from selfprivacy_api.services import ServiceManager
@strawberry.type @strawberry.type
@ -76,7 +76,7 @@ class Backup:
snapshots = Backups.get_all_snapshots() snapshots = Backups.get_all_snapshots()
for snap in snapshots: for snap in snapshots:
api_service = None api_service = None
service = get_service_by_id(snap.service_name) service = ServiceManager.get_service_by_id(snap.service_name)
if service is None: if service is None:
api_service = tombstone_service(snap.service_name) api_service = tombstone_service(snap.service_name)

View file

@ -11,7 +11,7 @@ from selfprivacy_api.graphql.queries.common import Alert, Severity
from selfprivacy_api.graphql.queries.providers import DnsProvider, ServerProvider from selfprivacy_api.graphql.queries.providers import DnsProvider, ServerProvider
from selfprivacy_api.jobs import Jobs from selfprivacy_api.jobs import Jobs
from selfprivacy_api.jobs.migrate_to_binds import is_bind_migrated from selfprivacy_api.jobs.migrate_to_binds import is_bind_migrated
from selfprivacy_api.services import get_all_required_dns_records from selfprivacy_api.services import ServiceManager
from selfprivacy_api.utils import ReadUserData from selfprivacy_api.utils import ReadUserData
import selfprivacy_api.actions.system as system_actions import selfprivacy_api.actions.system as system_actions
import selfprivacy_api.actions.ssh as ssh_actions import selfprivacy_api.actions.ssh as ssh_actions
@ -37,7 +37,7 @@ class SystemDomainInfo:
priority=record.priority, priority=record.priority,
display_name=record.display_name, display_name=record.display_name,
) )
for record in get_all_required_dns_records() for record in ServiceManager.get_all_required_dns_records()
] ]

View file

@ -1,6 +1,11 @@
"""Services module.""" """Services module."""
import base64
import typing import typing
from typing import List
from os import path, mkdir
from pathlib import Path
from selfprivacy_api.services.bitwarden import Bitwarden from selfprivacy_api.services.bitwarden import Bitwarden
from selfprivacy_api.services.forgejo import Forgejo from selfprivacy_api.services.forgejo import Forgejo
from selfprivacy_api.services.jitsimeet import JitsiMeet from selfprivacy_api.services.jitsimeet import JitsiMeet
@ -10,23 +15,22 @@ from selfprivacy_api.services.mailserver import MailServer
from selfprivacy_api.services.nextcloud import Nextcloud from selfprivacy_api.services.nextcloud import Nextcloud
from selfprivacy_api.services.pleroma import Pleroma from selfprivacy_api.services.pleroma import Pleroma
from selfprivacy_api.services.ocserv import Ocserv from selfprivacy_api.services.ocserv import Ocserv
from selfprivacy_api.services.service import Service, ServiceDnsRecord from selfprivacy_api.services.service import Service, ServiceDnsRecord
from selfprivacy_api.services.service import ServiceStatus
import selfprivacy_api.utils.network as network_utils import selfprivacy_api.utils.network as network_utils
services: list[Service] = [ from selfprivacy_api.services.test_service.icon import BITWARDEN_ICON
Bitwarden(), from selfprivacy_api.utils import USERDATA_FILE, DKIM_DIR, SECRETS_FILE
Forgejo(), from selfprivacy_api.utils.block_devices import BlockDevices
MailServer(), from shutil import copyfile, copytree, rmtree
Nextcloud(),
Pleroma(), CONFIG_STASH_DIR = "/tmp/selfprivacy_config_dump"
Ocserv(),
JitsiMeet(),
Roundcube(),
Prometheus(),
]
class ServiceManager(Service): class ServiceManager(Service):
folders: List[str] = [CONFIG_STASH_DIR]
@staticmethod @staticmethod
def get_all_services() -> list[Service]: def get_all_services() -> list[Service]:
return services return services
@ -42,6 +46,7 @@ class ServiceManager(Service):
def get_enabled_services() -> list[Service]: def get_enabled_services() -> list[Service]:
return [service for service in services if service.is_enabled()] return [service for service in services if service.is_enabled()]
# This one is not currently used by any code.
@staticmethod @staticmethod
def get_disabled_services() -> list[Service]: def get_disabled_services() -> list[Service]:
return [service for service in services if not service.is_enabled()] return [service for service in services if not service.is_enabled()]
@ -74,6 +79,157 @@ class ServiceManager(Service):
display_name="SelfPrivacy API (IPv6)", display_name="SelfPrivacy API (IPv6)",
) )
) )
for service in get_enabled_services(): for service in ServiceManager.get_enabled_services():
dns_records += service.get_dns_records(ip4, ip6) dns_records += service.get_dns_records(ip4, ip6)
return dns_records return dns_records
@staticmethod
def get_id() -> str:
"""Return service id."""
return "api"
@staticmethod
def get_display_name() -> str:
"""Return service display name."""
return "Selfprivacy API"
@staticmethod
def get_description() -> str:
"""Return service description."""
return "A proto-service for API itself. Currently manages backups of settings."
@staticmethod
def get_svg_icon() -> str:
"""Read SVG icon from file and return it as base64 encoded string."""
# return ""
return base64.b64encode(BITWARDEN_ICON.encode("utf-8")).decode("utf-8")
@staticmethod
def get_url() -> typing.Optional[str]:
"""Return service url."""
# TODO : placeholder, get actual domain here
return f"https://domain"
@staticmethod
def get_subdomain() -> typing.Optional[str]:
return None
@classmethod
def is_movable(cls) -> bool:
return False
@staticmethod
def is_required() -> bool:
return True
@staticmethod
def is_enabled() -> bool:
return True
@staticmethod
def get_backup_description() -> str:
return "How did we get here?"
@classmethod
def status_file(cls) -> str:
dir = cls.folders[0]
# We do not want to store our state in our declared folders
# Because they are moved and tossed in tests wildly
parent = Path(dir).parent
return path.join(parent, "service_status")
@classmethod
def set_status(cls, status: ServiceStatus):
pass
@classmethod
def get_status(cls) -> ServiceStatus:
return ServiceStatus.ACTIVE
@classmethod
def can_be_backed_up(cls) -> bool:
"""`True` if the service can be backed up."""
return True
@staticmethod
def merge_settings(restored_settings_folder: str):
# For now we will just copy settings EXCEPT the locations of services
# Stash locations as they are set by user right now
locations = {}
for service in services:
locations[service.get_id()] = service.get_drive()
# Copy files
userdata_name = path.basename(USERDATA_FILE)
secretfile_name = path.basename(SECRETS_FILE)
dkim_dirname = path.basename(DKIM_DIR)
copyfile(path.join(restored_settings_folder, userdata_name), USERDATA_FILE)
copyfile(path.join(restored_settings_folder, secretfile_name), SECRETS_FILE)
copytree(path.join(restored_settings_folder, dkim_dirname), DKIM_DIR)
# Pop locations
for service in services:
device = BlockDevices().get_block_device(locations[service.get_id()])
if device is not None:
service.set_location(device.name)
@classmethod
def stop(cls):
# simulate a failing service unable to stop
if not cls.get_status() == ServiceStatus.FAILED:
cls.set_status(ServiceStatus.DEACTIVATING)
cls.change_status_with_async_delay(
ServiceStatus.INACTIVE, cls.startstop_delay
)
@classmethod
def start(cls):
pass
@classmethod
def restart(cls):
pass
@staticmethod
def get_logs():
return ""
@classmethod
def get_drive(cls) -> str:
return BlockDevices().get_root_block_device().name
@classmethod
def get_folders(cls) -> List[str]:
return cls.folders
@classmethod
def pre_backup(cls):
tempdir = cls.folders[0]
rmtree(tempdir, ignore_errors=True)
mkdir(tempdir)
copyfile(USERDATA_FILE, tempdir)
copyfile(SECRETS_FILE, tempdir)
copytree(DKIM_DIR, tempdir)
@classmethod
def post_restore(cls):
tempdir = cls.folders[0]
cls.merge_settings(tempdir)
rmtree(tempdir, ignore_errors=True)
services: list[Service] = [
Bitwarden(),
Forgejo(),
MailServer(),
Nextcloud(),
Pleroma(),
Ocserv(),
JitsiMeet(),
Roundcube(),
ServiceManager(),
Prometheus(),
]

View file

@ -5,7 +5,7 @@ import base64
import typing import typing
from typing import List from typing import List
from os import path,mkdir from os import path, mkdir
from pathlib import Path from pathlib import Path
# from enum import Enum # from enum import Enum
@ -27,16 +27,14 @@ from selfprivacy_api.services.pleroma import Pleroma
from selfprivacy_api.services.ocserv import Ocserv from selfprivacy_api.services.ocserv import Ocserv
CONFIG_STASH_DIR = "/tmp/selfprivacy_config_dump"
DEFAULT_DELAY = 0
CONFIG_STASH_DIR = "/tmp/selfprivacy_config_dump"
# it is too intimately tied to Services # it is too intimately tied to Services
# that's why it is so awkward. # that's why it is so awkward.
# service list is below # service list is below
class ConfigService(Service): class ConfigService(Service):
"""A fake service to store our configs""" """A fake service to store our configs"""
@ -130,8 +128,6 @@ class ConfigService(Service):
if device is not None: if device is not None:
service.set_location(device.name) service.set_location(device.name)
@classmethod @classmethod
def stop(cls): def stop(cls):
# simulate a failing service unable to stop # simulate a failing service unable to stop
@ -164,10 +160,9 @@ class ConfigService(Service):
@classmethod @classmethod
def pre_backup(cls): def pre_backup(cls):
tempdir = cls.folders[0] tempdir = cls.folders[0]
rmtree(tempdir,ignore_errors=True) rmtree(tempdir, ignore_errors=True)
mkdir(tempdir) mkdir(tempdir)
copyfile(USERDATA_FILE, tempdir) copyfile(USERDATA_FILE, tempdir)
copyfile(SECRETS_FILE, tempdir) copyfile(SECRETS_FILE, tempdir)
copytree(DKIM_DIR, tempdir) copytree(DKIM_DIR, tempdir)
@ -176,7 +171,8 @@ class ConfigService(Service):
def post_restore(cls): def post_restore(cls):
tempdir = cls.folders[0] tempdir = cls.folders[0]
cls.merge_settings(tempdir) cls.merge_settings(tempdir)
rmtree(tempdir,ignore_errors=True) rmtree(tempdir, ignore_errors=True)
# It is here because our thing needs to include itself # It is here because our thing needs to include itself
services: list[Service] = [ services: list[Service] = [

View file

@ -16,7 +16,7 @@ from selfprivacy_api.models.tokens.token import Token
from selfprivacy_api.utils.huey import huey from selfprivacy_api.utils.huey import huey
import selfprivacy_api.services as services import selfprivacy_api.services as services
from selfprivacy_api.services import get_service_by_id, Service from selfprivacy_api.services import Service, ServiceManager
from selfprivacy_api.services.test_service import DummyService from selfprivacy_api.services.test_service import DummyService
from selfprivacy_api.repositories.tokens.redis_tokens_repository import ( from selfprivacy_api.repositories.tokens.redis_tokens_repository import (
@ -213,7 +213,7 @@ def dummy_service(
huey.immediate = True huey.immediate = True
assert huey.immediate is True assert huey.immediate is True
assert get_service_by_id(service.get_id()) is not None assert ServiceManager.get_service_by_id(service.get_id()) is not None
service.enable() service.enable()
yield service yield service

View file

@ -173,7 +173,7 @@ def test_services_to_autobackup(backups, dummy_service):
Backups.set_autobackup_period_minutes(backup_period) Backups.set_autobackup_period_minutes(backup_period)
services = Backups.services_to_back_up(now) services = Backups.services_to_back_up(now)
assert len(services) == len(backuppable_services()) assert set(services) == set(backuppable_services())
assert dummy_service.get_id() in [ assert dummy_service.get_id() in [
service.get_id() for service in backuppable_services() service.get_id() for service in backuppable_services()
] ]
@ -207,6 +207,10 @@ def test_failed_autoback_prevents_more_autobackup(backups, dummy_service):
assert Backups.is_time_to_backup_service(dummy_service, now) is False assert Backups.is_time_to_backup_service(dummy_service, now) is False
def test_induced_autobackup(backups, dummy_service):
pass
# --------------------- Quotas and Pruning ------------------------- # --------------------- Quotas and Pruning -------------------------

View file

@ -13,6 +13,7 @@ from selfprivacy_api.utils.huey import huey
from selfprivacy_api.services.service import ServiceStatus from selfprivacy_api.services.service import ServiceStatus
from selfprivacy_api.services import ServiceManager
from selfprivacy_api.graphql.queries.providers import BackupProvider as ProviderEnum from selfprivacy_api.graphql.queries.providers import BackupProvider as ProviderEnum
from selfprivacy_api.graphql.common_types.backup import ( from selfprivacy_api.graphql.common_types.backup import (
@ -792,3 +793,12 @@ def test_cache_invalidaton_task(backups, dummy_service):
reload_snapshot_cache() reload_snapshot_cache()
assert len(Storage.get_cached_snapshots()) == 1 assert len(Storage.get_cached_snapshots()) == 1
# def test_service_manager_backs_up_without_crashing(backups):
# """
# Service manager is special and needs testing.
# """
# snapshot = Backups.back_up(ServiceManager.get_service_by_id("api"))
# Backups.restore_snapshot(snapshot)

View file

@ -4,7 +4,7 @@ from tests.common import generate_backup_query
import selfprivacy_api.services as all_services import selfprivacy_api.services as all_services
from selfprivacy_api.services import get_service_by_id from selfprivacy_api.services import ServiceManager
from selfprivacy_api.graphql.common_types.service import service_to_graphql_service from selfprivacy_api.graphql.common_types.service import service_to_graphql_service
from selfprivacy_api.graphql.common_types.backup import ( from selfprivacy_api.graphql.common_types.backup import (
_AutobackupQuotas, _AutobackupQuotas,
@ -315,7 +315,7 @@ def test_snapshots_orphaned_service(authorized_client, dummy_service, backups):
assert len(snaps) == 1 assert len(snaps) == 1
all_services.services.remove(dummy_service) all_services.services.remove(dummy_service)
assert get_service_by_id(dummy_service.get_id()) is None assert ServiceManager.get_service_by_id(dummy_service.get_id()) is None
snaps = api_snapshots(authorized_client) snaps = api_snapshots(authorized_client)
assert len(snaps) == 1 assert len(snaps) == 1

View file

@ -2,12 +2,12 @@ import pytest
import shutil import shutil
from typing import Generator from typing import Generator
from os import mkdir, rmdir from os import mkdir
from selfprivacy_api.utils.block_devices import BlockDevices from selfprivacy_api.utils.block_devices import BlockDevices
import selfprivacy_api.services as service_module import selfprivacy_api.services as service_module
from selfprivacy_api.services import get_service_by_id from selfprivacy_api.services import ServiceManager
from selfprivacy_api.services.service import Service, ServiceStatus from selfprivacy_api.services.service import Service, ServiceStatus
from selfprivacy_api.services.test_service import DummyService from selfprivacy_api.services.test_service import DummyService
@ -716,7 +716,7 @@ def test_graphql_move_service(
def test_mailservice_cannot_enable_disable(authorized_client): def test_mailservice_cannot_enable_disable(authorized_client):
mailservice = get_service_by_id("simple-nixos-mailserver") mailservice = ServiceManager.get_service_by_id("simple-nixos-mailserver")
mutation_response = api_enable(authorized_client, mailservice) mutation_response = api_enable(authorized_client, mailservice)
data = get_data(mutation_response)["services"]["enableService"] data = get_data(mutation_response)["services"]["enableService"]

View file

@ -17,7 +17,7 @@ from selfprivacy_api.services.owned_path import OwnedPath
from selfprivacy_api.services.test_service import DummyService from selfprivacy_api.services.test_service import DummyService
from selfprivacy_api.services.service import Service, ServiceStatus, StoppedService from selfprivacy_api.services.service import Service, ServiceStatus, StoppedService
from selfprivacy_api.services import get_enabled_services from selfprivacy_api.services import ServiceManager
from tests.test_dkim import dkim_file, no_dkim_file from tests.test_dkim import dkim_file, no_dkim_file
@ -164,4 +164,4 @@ def test_mailserver_with_no_dkim_returns_no_dns(no_dkim_file):
def test_services_enabled_by_default(generic_userdata): def test_services_enabled_by_default(generic_userdata):
assert set(get_enabled_services()) == set(services_module.services) assert set(ServiceManager.get_enabled_services()) == set(services_module.services)