"""Abstract class for a service running on a server""" from abc import ABC, abstractmethod from typing import Any, List, Optional from selfprivacy_api import utils from selfprivacy_api.utils import ReadUserData, WriteUserData from selfprivacy_api.utils.waitloop import wait_until_true from selfprivacy_api.utils.block_devices import BlockDevice, BlockDevices from selfprivacy_api.jobs import Job, Jobs, JobStatus, report_progress from selfprivacy_api.jobs.upgrade_system import rebuild_system from selfprivacy_api.models.services import ServiceStatus, ServiceDnsRecord from selfprivacy_api.services.generic_size_counter import get_storage_usage from selfprivacy_api.services.owned_path import OwnedPath, Bind from selfprivacy_api.services.moving import ( check_binds, check_volume, unbind_folders, bind_folders, ensure_folder_ownership, MoveError, move_data_to_volume, ) DEFAULT_START_STOP_TIMEOUT = 5 * 60 class Service(ABC): """ Service here is some software that is hosted on the server and can be installed, configured and used by a user. """ @staticmethod @abstractmethod def get_id() -> str: """ The unique id of the service. """ pass @staticmethod @abstractmethod def get_display_name() -> str: """ The name of the service that is shown to the user. """ pass @staticmethod @abstractmethod def get_description() -> str: """ The description of the service that is shown to the user. """ pass @staticmethod @abstractmethod def get_svg_icon() -> str: """ The monochrome svg icon of the service. """ pass @classmethod @abstractmethod def get_url(cls) -> Optional[str]: """ The url of the service if it is accessible from the internet browser. """ pass @classmethod @abstractmethod def get_subdomain(cls) -> Optional[str]: """ The assigned primary subdomain for this service. """ pass @classmethod def get_user(cls) -> Optional[str]: """ The user that owns the service's files. Defaults to the service's id. """ return cls.get_id() @classmethod def get_group(cls) -> Optional[str]: """ The group that owns the service's files. Defaults to the service's user. """ return cls.get_user() @staticmethod @abstractmethod def is_movable() -> bool: """`True` if the service can be moved to the non-system volume.""" pass @staticmethod @abstractmethod def is_required() -> bool: """`True` if the service is required for the server to function.""" pass @staticmethod def can_be_backed_up() -> bool: """`True` if the service can be backed up.""" return True @staticmethod @abstractmethod def get_backup_description() -> str: """ The text shown to the user that exlplains what data will be backed up. """ pass @classmethod def is_enabled(cls) -> bool: """ `True` if the service is enabled. `False` if it is not enabled or not defined in file If there is nothing in the file, this is equivalent to False because NixOS won't enable it then. """ name = cls.get_id() with ReadUserData() as user_data: return user_data.get("modules", {}).get(name, {}).get("enable", False) @staticmethod @abstractmethod def get_status() -> ServiceStatus: """The status of the service, reported by systemd.""" pass @classmethod def _set_enable(cls, enable: bool): name = cls.get_id() with WriteUserData() as user_data: if "modules" not in user_data: user_data["modules"] = {} if name not in user_data["modules"]: user_data["modules"][name] = {} user_data["modules"][name]["enable"] = enable @classmethod def enable(cls): """Enable the service. Usually this means enabling systemd unit.""" cls._set_enable(True) @classmethod def disable(cls): """Disable the service. Usually this means disabling systemd unit.""" cls._set_enable(False) @staticmethod @abstractmethod def stop(): """Stop the service. Usually this means stopping systemd unit.""" pass @staticmethod @abstractmethod def start(): """Start the service. Usually this means starting systemd unit.""" pass @staticmethod @abstractmethod def restart(): """Restart the service. Usually this means restarting systemd unit.""" pass @classmethod @abstractmethod def get_configuration(cls) -> dict: pass @classmethod @abstractmethod def set_configuration(cls, config_items): pass @staticmethod @abstractmethod def get_logs(): pass @classmethod def get_storage_usage(cls) -> int: """ Calculate the real storage usage of folders occupied by service Calculate using pathlib. Do not follow symlinks. """ storage_used = 0 for folder in cls.get_folders(): storage_used += get_storage_usage(folder) return storage_used @classmethod def get_dns_records(cls, ip4: str, ip6: Optional[str]) -> List[ServiceDnsRecord]: subdomain = cls.get_subdomain() display_name = cls.get_display_name() if subdomain is None: return [] dns_records = [ ServiceDnsRecord( type="A", name=subdomain, content=ip4, ttl=3600, display_name=display_name, ) ] if ip6 is not None: dns_records.append( ServiceDnsRecord( type="AAAA", name=subdomain, content=ip6, ttl=3600, display_name=f"{display_name} (IPv6)", ) ) return dns_records @classmethod def get_drive(cls) -> str: """ Get the name of the drive/volume where the service is located. Example values are `sda1`, `vda`, `sdb`. """ root_device: str = BlockDevices().get_root_block_device().name if not cls.is_movable(): return root_device with utils.ReadUserData() as userdata: if userdata.get("useBinds", False): return ( userdata.get("modules", {}) .get(cls.get_id(), {}) .get( "location", root_device, ) ) else: return root_device @classmethod def get_folders(cls) -> List[str]: """ get a plain list of occupied directories Default extracts info from overriden get_owned_folders() """ if cls.get_owned_folders == Service.get_owned_folders: raise NotImplementedError( "you need to implement at least one of get_folders() or get_owned_folders()" ) return [owned_folder.path for owned_folder in cls.get_owned_folders()] @classmethod def get_owned_folders(cls) -> List[OwnedPath]: """ Get a list of occupied directories with ownership info Default extracts info from overriden get_folders() """ if cls.get_folders == Service.get_folders: raise NotImplementedError( "you need to implement at least one of get_folders() or get_owned_folders()" ) return [cls.owned_path(path) for path in cls.get_folders()] @staticmethod def get_foldername(path: str) -> str: return path.split("/")[-1] # TODO: with better json utils, it can be one line, and not a separate function @classmethod def set_location(cls, volume: BlockDevice): """ Only changes userdata """ service_id = cls.get_id() with WriteUserData() as user_data: if "modules" not in user_data: user_data["modules"] = {} if service_id not in user_data["modules"]: user_data["modules"][service_id] = {} user_data["modules"][service_id]["location"] = volume.name def binds(self) -> List[Bind]: owned_folders = self.get_owned_folders() return [ Bind.from_owned_path(folder, self.get_drive()) for folder in owned_folders ] def assert_can_move(self, new_volume): """ Checks if the service can be moved to new volume Raises errors if it cannot """ service_name = self.get_display_name() if not self.is_movable(): raise MoveError(f"{service_name} is not movable") with ReadUserData() as user_data: if not user_data.get("useBinds", False): raise MoveError("Server is not using binds.") current_volume_name = self.get_drive() if current_volume_name == new_volume.name: raise MoveError(f"{service_name} is already on volume {new_volume}") check_volume(new_volume, space_needed=self.get_storage_usage()) binds = self.binds() if binds == []: raise MoveError("nothing to move") check_binds(current_volume_name, binds) def do_move_to_volume( self, new_volume: BlockDevice, job: Job, ): """ Move a service to another volume. Note: It may be much simpler to write it per bind, but a bit less safe? """ service_name = self.get_display_name() binds = self.binds() report_progress(10, job, "Unmounting folders from old volume...") unbind_folders(binds) report_progress(20, job, "Moving data to new volume...") binds = move_data_to_volume(binds, new_volume, job) report_progress(70, job, f"Making sure {service_name} owns its files...") try: ensure_folder_ownership(binds) except Exception as error: # We have logged it via print and we additionally log it here in the error field # We are continuing anyway but Job has no warning field Jobs.update( job, JobStatus.RUNNING, error=f"Service {service_name} will not be able to write files: " + str(error), ) report_progress(90, job, f"Mounting {service_name} data...") bind_folders(binds) report_progress(95, job, f"Finishing moving {service_name}...") self.set_location(new_volume) def move_to_volume(self, volume: BlockDevice, job: Job) -> Job: service_name = self.get_display_name() report_progress(0, job, "Performing pre-move checks...") self.assert_can_move(volume) report_progress(5, job, f"Stopping {service_name}...") assert self is not None with StoppedService(self): report_progress(9, job, "Stopped service, starting the move...") self.do_move_to_volume(volume, job) report_progress(98, job, "Move complete, rebuilding...") rebuild_system(job, upgrade=False) Jobs.update( job=job, status=JobStatus.FINISHED, result=f"{service_name} moved successfully.", status_text=f"Starting {service_name}...", progress=100, ) return job @classmethod def owned_path(cls, path: str): """Default folder ownership""" service_name = cls.get_display_name() try: owner = cls.get_user() if owner is None: # TODO: assume root? # (if we do not want to do assumptions, maybe not declare user optional?) raise LookupError(f"no user for service: {service_name}") group = cls.get_group() if group is None: raise LookupError(f"no group for service: {service_name}") except Exception as error: raise LookupError( f"when deciding a bind for folder {path} of service {service_name}, error: {str(error)}" ) return OwnedPath( path=path, owner=owner, group=group, ) def pre_backup(self): pass def post_restore(self): pass class StoppedService: """ A context manager that stops the service if needed and reactivates it after you are done if it was active Example: ``` assert service.get_status() == ServiceStatus.ACTIVE with StoppedService(service) [as stopped_service]: assert service.get_status() == ServiceStatus.INACTIVE ``` """ def __init__(self, service: Service): self.service = service self.original_status = service.get_status() def __enter__(self) -> Service: self.original_status = self.service.get_status() if self.original_status not in [ServiceStatus.INACTIVE, ServiceStatus.FAILED]: try: self.service.stop() wait_until_true( lambda: self.service.get_status() == ServiceStatus.INACTIVE, timeout_sec=DEFAULT_START_STOP_TIMEOUT, ) except TimeoutError as error: raise TimeoutError( f"timed out waiting for {self.service.get_display_name()} to stop" ) from error return self.service def __exit__(self, type, value, traceback): if self.original_status in [ServiceStatus.ACTIVATING, ServiceStatus.ACTIVE]: try: self.service.start() wait_until_true( lambda: self.service.get_status() == ServiceStatus.ACTIVE, timeout_sec=DEFAULT_START_STOP_TIMEOUT, ) except TimeoutError as error: raise TimeoutError( f"timed out waiting for {self.service.get_display_name()} to start" ) from error