selfprivacy-rest-api/selfprivacy_api/services/templated_service.py
2024-12-23 20:58:00 +03:00

533 lines
19 KiB
Python

"""A Service implementation that loads all needed data from a JSON file"""
import base64
import json
import logging
import shutil
import subprocess
from enum import Enum
from os import mkdir, rmdir
from os.path import join, exists
from typing import List, Optional

from pydantic import BaseModel, ConfigDict
from pydantic.alias_generators import to_camel

from selfprivacy_api.backup.postgres import PostgresDumper
from selfprivacy_api.jobs import Job, JobStatus, Jobs
from selfprivacy_api.models.services import ServiceDnsRecord, ServiceStatus
from selfprivacy_api.services.config_item import (
    ServiceConfigItem,
    StringServiceConfigItem,
    BoolServiceConfigItem,
    EnumServiceConfigItem,
    IntServiceConfigItem,
)
from selfprivacy_api.services.flake_service_manager import FlakeServiceManager
from selfprivacy_api.services.generic_size_counter import get_storage_usage
from selfprivacy_api.services.owned_path import OwnedPath
from selfprivacy_api.services.service import Service
from selfprivacy_api.utils import ReadUserData, WriteUserData, get_domain
from selfprivacy_api.utils.block_devices import BlockDevice, BlockDevices
from selfprivacy_api.utils.systemd import get_service_status_from_several_units
SP_MODULES_DEFENITIONS_PATH = "/etc/sp-modules"
logger = logging.getLogger(__name__)
class SupportLevel(Enum):
    """Enum representing the support level of a service."""

    NORMAL = "normal"
    EXPERIMENTAL = "experimental"
    DEPRECATED = "deprecated"
    UNKNOWN = "unknown"

    @classmethod
    def from_str(cls, support_level: str) -> "SupportLevel":
        """Return the SupportLevel for a raw string, UNKNOWN if unrecognised."""
        try:
            # Enum lookup by value covers every declared level in one step.
            return cls(support_level)
        except ValueError:
            return cls.UNKNOWN
def config_item_from_json(json_data: dict) -> Optional[ServiceConfigItem]:
    """Create a ServiceConfigItem from JSON data.

    Returns None for the pseudo-options "enable" and "location", which are
    managed by the service itself rather than exposed as config items.

    Raises:
        ValueError: if the option's meta type is not recognised.
    """
    meta = json_data["meta"]
    item_type = meta["type"]
    # Weight controls the display order of the option.
    weight = meta.get("weight", 50)
    if item_type in ("enable", "location"):
        return None
    if item_type == "string":
        return StringServiceConfigItem(
            id=json_data["name"],
            default_value=json_data["default"],
            description=json_data["description"],
            regex=meta.get("regex"),
            widget=meta.get("widget"),
            allow_empty=meta.get("allowEmpty", False),
            weight=weight,
        )
    if item_type == "bool":
        return BoolServiceConfigItem(
            id=json_data["name"],
            default_value=json_data["default"],
            description=json_data["description"],
            widget=meta.get("widget"),
            weight=weight,
        )
    if item_type == "enum":
        return EnumServiceConfigItem(
            id=json_data["name"],
            default_value=json_data["default"],
            description=json_data["description"],
            options=meta["options"],
            widget=meta.get("widget"),
            weight=weight,
        )
    if item_type == "int":
        return IntServiceConfigItem(
            id=json_data["name"],
            default_value=json_data["default"],
            description=json_data["description"],
            widget=meta.get("widget"),
            min_value=meta.get("minValue"),
            max_value=meta.get("maxValue"),
            weight=weight,
        )
    # Include the offending type so a bad definition is diagnosable.
    raise ValueError(f"Unknown config item type: {item_type}")
class BaseSchema(BaseModel):
    """Shared pydantic base: snake_case attributes with camelCase aliases."""

    model_config = ConfigDict(
        from_attributes=True,
        populate_by_name=True,
        alias_generator=to_camel,
    )
class License(BaseSchema):
    """Model representing a license."""

    # Field set presumably mirrors nixpkgs-style license metadata
    # (spdx_id, redistributable, ...) — TODO confirm against the
    # module definitions that feed this schema.
    # JSON keys are camelCase (fullName, spdxId, ...) via BaseSchema.
    deprecated: bool
    free: bool
    full_name: str
    redistributable: bool
    short_name: str
    spdx_id: str
    url: str
class ServiceMetaData(BaseSchema):
    """Model representing the meta data of a service."""

    # Identity and presentation
    id: str
    name: str
    description: str = "No description found!"
    svg_icon: str = ""  # raw SVG markup; base64-encoded by get_svg_icon()
    # NOTE(review): camelCase name is inconsistent with the snake_case
    # fields above; renaming would break `self.meta.showUrl` call sites.
    showUrl: bool = True
    primary_subdomain: Optional[str] = None
    # Capabilities
    is_movable: bool = False
    is_required: bool = False
    can_be_backed_up: bool = True
    backup_description: str = "No backup description found!"
    # Runtime integration
    systemd_services: List[str]
    user: Optional[str] = None  # unix user; service id is used when None
    group: Optional[str] = None  # unix group; user is used when None
    # Storage. NOTE: mutable [] defaults are safe on pydantic models —
    # pydantic copies field defaults per instance.
    folders: List[str] = []
    owned_folders: List[OwnedPath] = []
    postgresql_databases: List[str] = []
    # Misc metadata
    license: List[License] = []
    homepage: Optional[str] = None
    source_page: Optional[str] = None
    support_level: SupportLevel = SupportLevel.UNKNOWN
class TemplatedService(Service):
    """Class representing a dynamically loaded service."""

    def __init__(self, service_id: str, source_data: Optional[str] = None) -> None:
        """Load and validate a service definition.

        Args:
            service_id: module id; also the definition file name under
                SP_MODULES_DEFENITIONS_PATH when source_data is not given.
            source_data: optional JSON string with the definition; when
                provided, nothing is read from disk.

        Raises:
            FileNotFoundError: no definition file exists for service_id.
            ValueError: the definition lacks "meta" or "options", is
                declared movable without a "location" option, or contains
                an option of unknown type.
        """
        if source_data:
            self.definition_data = json.loads(source_data)
        else:
            # Check if the service exists
            if not exists(join(SP_MODULES_DEFENITIONS_PATH, service_id)):
                raise FileNotFoundError(f"Service {service_id} not found")
            # Load the service
            with open(join(SP_MODULES_DEFENITIONS_PATH, service_id)) as file:
                self.definition_data = json.load(file)
        # Check if required fields are present
        if "meta" not in self.definition_data:
            raise ValueError("meta not found in service definition")
        if "options" not in self.definition_data:
            raise ValueError("options not found in service definition")
        # Load the meta data
        self.meta = ServiceMetaData(**self.definition_data["meta"])
        # Load the options
        self.options = self.definition_data["options"]
        # Load the config items ("enable"/"location" map to None and are skipped)
        self.config_items = {}
        for option in self.options.values():
            config_item = config_item_from_json(option)
            if config_item:
                self.config_items[config_item.id] = config_item
        # If it is movable, check for the location option
        if self.meta.is_movable and "location" not in self.options:
            raise ValueError("Service is movable but does not have a location option")
        # Load all subdomains via options with "subdomain" widget
        self.subdomain_options: List[str] = []
        for option in self.options.values():
            if option.get("meta", {}).get("widget") == "subdomain":
                self.subdomain_options.append(option["name"])
def get_id(self) -> str:
return self.meta.id
def get_display_name(self) -> str:
return self.meta.name
def get_description(self) -> str:
return self.meta.description
def get_svg_icon(self) -> str:
return base64.b64encode(self.meta.svg_icon.encode("utf-8")).decode("utf-8")
def get_subdomain(self) -> Optional[str]:
# If there are no subdomain options, return None
if not self.subdomain_options:
return None
# If primary_subdomain is set, try to find it in the options
if (
self.meta.primary_subdomain
and self.meta.primary_subdomain in self.subdomain_options
):
option_name = self.meta.primary_subdomain
# Otherwise, use the first subdomain option
else:
option_name = self.subdomain_options[0]
# Now, read the value from the userdata
name = self.get_id()
with ReadUserData() as user_data:
if "modules" in user_data:
if name in user_data["modules"]:
if option_name in user_data["modules"][name]:
return user_data["modules"][name][option_name]
# Otherwise, return default value for the option
return self.options[option_name].get("default")
def get_subdomains(self) -> List[str]:
# Return a current subdomain for every subdomain option
subdomains = []
with ReadUserData() as user_data:
for option in self.subdomain_options:
if "modules" in user_data:
if self.get_id() in user_data["modules"]:
if option in user_data["modules"][self.get_id()]:
subdomains.append(
user_data["modules"][self.get_id()][option]
)
continue
subdomains.append(self.options[option]["default"])
return subdomains
def get_url(self) -> Optional[str]:
if not self.meta.showUrl:
return None
subdomain = self.get_subdomain()
if not subdomain:
return None
return f"https://{subdomain}.{get_domain()}"
def get_user(self) -> Optional[str]:
if not self.meta.user:
return self.get_id()
return self.meta.user
def get_group(self) -> Optional[str]:
if not self.meta.group:
return self.get_user()
return self.meta.group
def is_movable(self) -> bool:
return self.meta.is_movable
def is_required(self) -> bool:
return self.meta.is_required
def can_be_backed_up(self) -> bool:
return self.meta.can_be_backed_up
def get_backup_description(self) -> str:
return self.meta.backup_description
def is_enabled(self) -> bool:
name = self.get_id()
with ReadUserData() as user_data:
return user_data.get("modules", {}).get(name, {}).get("enable", False)
def is_installed(self) -> bool:
name = self.get_id()
with FlakeServiceManager() as service_manager:
return name in service_manager.services
def get_status(self) -> ServiceStatus:
if not self.meta.systemd_services:
return ServiceStatus.INACTIVE
return get_service_status_from_several_units(self.meta.systemd_services)
def _set_enable(self, enable: bool):
name = self.get_id()
with WriteUserData() as user_data:
if "modules" not in user_data:
user_data["modules"] = {}
if name not in user_data["modules"]:
user_data["modules"][name] = {}
user_data["modules"][name]["enable"] = enable
def enable(self):
"""Enable the service. Usually this means enabling systemd unit."""
self._set_enable(True)
def disable(self):
"""Disable the service. Usually this means disabling systemd unit."""
self._set_enable(False)
def start(self):
"""Start the systemd units"""
for unit in self.meta.systemd_services:
subprocess.run(["systemctl", "start", unit], check=False)
def stop(self):
"""Stop the systemd units"""
for unit in self.meta.systemd_services:
subprocess.run(["systemctl", "stop", unit], check=False)
def restart(self):
"""Restart the systemd units"""
for unit in self.meta.systemd_services:
subprocess.run(["systemctl", "restart", unit], check=False)
def get_configuration(self) -> dict:
# If there are no options, return an empty dict
if not self.config_items:
return {}
return {
key: self.config_items[key].as_dict(self.get_id())
for key in self.config_items
}
def set_configuration(self, config_items):
for key, value in config_items.items():
if key not in self.config_items:
raise ValueError(f"Key {key} is not valid for {self.get_id()}")
if self.config_items[key].validate_value(value) is False:
raise ValueError(f"Value {value} is not valid for {key}")
for key, value in config_items.items():
self.config_items[key].set_value(
value,
self.get_id(),
)
def get_storage_usage(self) -> int:
"""
Calculate the real storage usage of folders occupied by service
Calculate using pathlib.
Do not follow symlinks.
"""
storage_used = 0
for folder in self.get_folders():
storage_used += get_storage_usage(folder)
return storage_used
def has_folders(self) -> int:
"""
If there are no folders on disk, moving is noop
"""
for folder in self.get_folders():
if exists(folder):
return True
return False
def get_dns_records(self, ip4: str, ip6: Optional[str]) -> List[ServiceDnsRecord]:
display_name = self.get_display_name()
subdomains = self.get_subdomains()
# Generate records for every subdomain
records: List[ServiceDnsRecord] = []
for subdomain in subdomains:
if not subdomain:
continue
records.append(
ServiceDnsRecord(
type="A",
name=subdomain,
content=ip4,
ttl=3600,
display_name=display_name,
)
)
if ip6:
records.append(
ServiceDnsRecord(
type="AAAA",
name=subdomain,
content=ip6,
ttl=3600,
display_name=display_name,
)
)
return records
def get_drive(self) -> str:
"""
Get the name of the drive/volume where the service is located.
Example values are `sda1`, `vda`, `sdb`.
"""
root_device: str = BlockDevices().get_root_block_device().name
if not self.is_movable():
return root_device
with ReadUserData() as userdata:
if userdata.get("useBinds", False):
return (
userdata.get("modules", {})
.get(self.get_id(), {})
.get(
"location",
root_device,
)
)
else:
return root_device
def _get_db_dumps_folder(self) -> str:
# Get the drive where the service is located and append the folder name
return join("/", "volumes", self.get_drive(), f"db_dumps_{self.get_id()}")
def get_folders(self) -> List[str]:
folders = self.meta.folders
owned_folders = self.meta.owned_folders
# Include the contents of folders list
resulting_folders = folders.copy()
for folder in owned_folders:
resulting_folders.append(folder.path)
if self.get_postgresql_databases():
resulting_folders.append(self._get_db_dumps_folder())
return folders
def get_owned_folders(self) -> List[OwnedPath]:
folders = self.meta.folders
owned_folders = self.meta.owned_folders
resulting_folders = owned_folders.copy()
for folder in folders:
resulting_folders.append(self.owned_path(folder))
if self.get_postgresql_databases():
resulting_folders.append(
OwnedPath(
path=self._get_db_dumps_folder(),
owner="selfprivacy-api",
group="selfprivacy-api",
)
)
return resulting_folders
def set_location(self, volume: BlockDevice):
"""
Only changes userdata
"""
service_id = self.get_id()
with WriteUserData() as user_data:
if "modules" not in user_data:
user_data["modules"] = {}
if service_id not in user_data["modules"]:
user_data["modules"][service_id] = {}
user_data["modules"][service_id]["location"] = volume.name
def get_postgresql_databases(self) -> List[str]:
return self.meta.postgresql_databases
def owned_path(self, path: str):
"""Default folder ownership"""
service_name = self.get_display_name()
try:
owner = self.get_user()
if owner is None:
# TODO: assume root?
# (if we do not want to do assumptions, maybe not declare user optional?)
raise LookupError(f"no user for service: {service_name}")
group = self.get_group()
if group is None:
raise LookupError(f"no group for service: {service_name}")
except Exception as error:
raise LookupError(
f"when deciding a bind for folder {path} of service {service_name}, error: {str(error)}"
)
return OwnedPath(
path=path,
owner=owner,
group=group,
)
def pre_backup(self, job: Job):
logger.warning("Pre backup")
if self.get_postgresql_databases():
logger.warning("Pre backup: postgresql databases")
# Create the folder for the database dumps
db_dumps_folder = self._get_db_dumps_folder()
logger.warning(f"Pre backup: db_dumps_folder: {db_dumps_folder}")
if not exists(db_dumps_folder):
logger.warning("Pre backup: db_dumps_folder does not exist")
mkdir(db_dumps_folder)
# Dump the databases
for db_name in self.get_postgresql_databases():
logger.warning(f"Pre backup: db_name: {db_name}")
if job is not None:
Jobs.update(
job,
status_text=f"Creating a dump of database {db_name}",
status=JobStatus.RUNNING,
)
db_dumper = PostgresDumper(db_name)
backup_file = join(db_dumps_folder, f"{db_name}.sql.gz")
logger.warning(f"Pre backup: backup_file: {backup_file}")
db_dumper.backup_database(backup_file)
def post_backup(self, job: Job):
if self.get_postgresql_databases():
# Remove the folder for the database dumps
db_dumps_folder = self._get_db_dumps_folder()
if exists(db_dumps_folder):
rmdir(db_dumps_folder)
def pre_restore(self, job: Job):
if self.get_postgresql_databases():
# Create the folder for the database dumps
db_dumps_folder = self._get_db_dumps_folder()
if not exists(db_dumps_folder):
mkdir(db_dumps_folder)
def post_restore(self, job: Job):
if self.get_postgresql_databases():
# Recover the databases
db_dumps_folder = self._get_db_dumps_folder()
for db_name in self.get_postgresql_databases():
if exists(join(db_dumps_folder, f"{db_name}.sql.gz")):
if job is not None:
Jobs.update(
job,
status_text=f"Restoring database {db_name}",
status=JobStatus.RUNNING,
)
db_dumper = PostgresDumper(db_name)
backup_file = join(db_dumps_folder, f"{db_name}.sql.gz")
db_dumper.restore_database(backup_file)
else:
logger.error(f"Database dump for {db_name} not found")
raise FileNotFoundError(f"Database dump for {db_name} not found")