From 969b3b1417f6112f102c00abc3995324244ae6cc Mon Sep 17 00:00:00 2001 From: Inex Code Date: Tue, 23 Jul 2024 17:05:29 +0300 Subject: [PATCH] refactor: Add more validation to server configuration --- .../graphql/common_types/service.py | 8 +- .../services/bitwarden/__init__.py | 14 +- selfprivacy_api/services/config_item.py | 47 +++++- selfprivacy_api/services/forgejo/__init__.py | 22 +-- .../services/jitsimeet/__init__.py | 13 +- .../services/nextcloud/__init__.py | 14 +- selfprivacy_api/services/ocserv/__init__.py | 4 - selfprivacy_api/services/pleroma/__init__.py | 14 +- .../services/roundcube/__init__.py | 18 +- selfprivacy_api/services/service.py | 20 ++- .../services/test_service/__init__.py | 11 -- selfprivacy_api/utils/__init__.py | 19 +++ selfprivacy_api/utils/default_subdomains.py | 21 +++ selfprivacy_api/utils/regex_strings.py | 1 + tests/test_config_item.py | 157 ++++++++++++++++++ tests/test_graphql/test_services.py | 2 + 16 files changed, 270 insertions(+), 115 deletions(-) create mode 100644 selfprivacy_api/utils/default_subdomains.py create mode 100644 selfprivacy_api/utils/regex_strings.py create mode 100644 tests/test_config_item.py diff --git a/selfprivacy_api/graphql/common_types/service.py b/selfprivacy_api/graphql/common_types/service.py index 680c2a7..5be6d3a 100644 --- a/selfprivacy_api/graphql/common_types/service.py +++ b/selfprivacy_api/graphql/common_types/service.py @@ -10,12 +10,6 @@ from selfprivacy_api.services import get_service_by_id, get_services_by_location from selfprivacy_api.services import Service as ServiceInterface from selfprivacy_api.services import ServiceDnsRecord -from selfprivacy_api.services.config_item import ( - ServiceConfigItem, - StringServiceConfigItem, - BoolServiceConfigItem, - EnumServiceConfigItem, -) from selfprivacy_api.utils.block_devices import BlockDevices from selfprivacy_api.utils.network import get_ip4, get_ip6 @@ -147,7 +141,7 @@ def config_item_to_graphql(item: dict) -> ConfigItem: type=item_type, value=item["value"], default_value=item["default_value"], - regex=item.get("regex") + regex=item.get("regex"), ) elif item_type == "bool": return BoolConfigItem( diff --git a/selfprivacy_api/services/bitwarden/__init__.py b/selfprivacy_api/services/bitwarden/__init__.py index fc8f309..addf928 100644 --- a/selfprivacy_api/services/bitwarden/__init__.py +++ b/selfprivacy_api/services/bitwarden/__init__.py @@ -1,9 +1,7 @@ """Class representing Bitwarden service""" import base64 import subprocess -from typing import Optional, List - -from selfprivacy_api.utils import get_domain +from typing import List from selfprivacy_api.utils.systemd import get_service_status from selfprivacy_api.services.service import Service, ServiceStatus @@ -37,16 +35,6 @@ class Bitwarden(Service): def get_user() -> str: return "vaultwarden" - @classmethod - def get_url(cls) -> Optional[str]: - """Return service url.""" - domain = get_domain() - return f"https://password.{domain}" - - @classmethod - def get_subdomain(cls) -> Optional[str]: - return "password" - @staticmethod def is_movable() -> bool: return True diff --git a/selfprivacy_api/services/config_item.py b/selfprivacy_api/services/config_item.py index 90f0ed7..abddd46 100644 --- a/selfprivacy_api/services/config_item.py +++ b/selfprivacy_api/services/config_item.py @@ -2,6 +2,8 @@ from abc import ABC, abstractmethod import re from typing import Optional +from selfprivacy_api.utils import check_if_subdomain_is_taken + class ServiceConfigItem(ABC): id: str @@ -17,6 +19,10 @@ class ServiceConfigItem(ABC): def set_value(self, value, service_options): pass + @abstractmethod + def validate_value(self, value): + return True + def as_dict(self, service_options): return { "id": self.id, @@ -35,18 +41,24 @@ class StringServiceConfigItem(ServiceConfigItem): description: str, regex: Optional[str] = None, widget: Optional[str] = None, + allow_empty: bool = False, ): + if widget == "subdomain" and not regex: + raise ValueError("Subdomain widget requires regex") self.id = id self.type = "string" self.default_value = default_value self.description = description self.regex = re.compile(regex) if regex else None self.widget = widget if widget else "text" + self.allow_empty = allow_empty def get_value(self, service_options): return service_options.get(self.id, self.default_value) def set_value(self, value, service_options): + if not self.validate_value(value): + raise ValueError(f"Value {value} is not valid") if self.regex and not self.regex.match(value): raise ValueError(f"Value {value} does not match regex {self.regex}") service_options[self.id] = value @@ -62,6 +74,18 @@ class StringServiceConfigItem(ServiceConfigItem): "regex": self.regex.pattern if self.regex else None, } + def validate_value(self, value): + if not isinstance(value, str): + return False + if not self.allow_empty and not value: + return False + if self.regex and not self.regex.match(value): + return False + if self.widget == "subdomain": + if check_if_subdomain_is_taken(value): + return False + return True + class BoolServiceConfigItem(ServiceConfigItem): def __init__( @@ -81,6 +105,8 @@ class BoolServiceConfigItem(ServiceConfigItem): return service_options.get(self.id, self.default_value) def set_value(self, value, service_options): + if not isinstance(value, bool): + raise ValueError(f"Value {value} is not a boolean") service_options[self.id] = value def as_dict(self, service_options): @@ -93,6 +119,9 @@ class BoolServiceConfigItem(ServiceConfigItem): "default_value": self.default_value, } + def validate_value(self, value): + return isinstance(value, bool) + class EnumServiceConfigItem(ServiceConfigItem): def __init__( @@ -114,6 +143,8 @@ class EnumServiceConfigItem(ServiceConfigItem): return service_options.get(self.id, self.default_value) def set_value(self, value, service_options): + if not self.validate_value(value): + raise ValueError(f"Value {value} is not valid") if value not in self.options: raise ValueError(f"Value {value} not in options {self.options}") service_options[self.id] = value @@ -129,6 +160,11 @@ class EnumServiceConfigItem(ServiceConfigItem): "options": self.options, } + def validate_value(self, value): + if not isinstance(value, str): + return False + return value in self.options + # TODO: unused for now class IntServiceConfigItem(ServiceConfigItem): @@ -156,7 +192,9 @@ class IntServiceConfigItem(ServiceConfigItem): if self.min_value is not None and value < self.min_value: raise ValueError(f"Value {value} is less than min_value {self.min_value}") if self.max_value is not None and value > self.max_value: - raise ValueError(f"Value {value} is greater than max_value {self.max_value}") + raise ValueError( + f"Value {value} is greater than max_value {self.max_value}" + ) service_options[self.id] = value def as_dict(self, service_options): @@ -170,3 +208,10 @@ class IntServiceConfigItem(ServiceConfigItem): "min_value": self.min_value, "max_value": self.max_value, } + + def validate_value(self, value): + if not isinstance(value, int): + return False + return (self.min_value is None or value >= self.min_value) and ( + self.max_value is None or value <= self.max_value + ) diff --git a/selfprivacy_api/services/forgejo/__init__.py b/selfprivacy_api/services/forgejo/__init__.py index ff6d12c..8c6ef4f 100644 --- a/selfprivacy_api/services/forgejo/__init__.py +++ b/selfprivacy_api/services/forgejo/__init__.py @@ -1,9 +1,9 @@ """Class representing Bitwarden service""" import base64 import subprocess -from typing import Optional, List +from typing import List -from selfprivacy_api.utils import get_domain, ReadUserData, WriteUserData +from selfprivacy_api.utils import ReadUserData, WriteUserData from selfprivacy_api.utils.systemd import get_service_status from selfprivacy_api.services.service import Service, ServiceStatus @@ -14,6 +14,7 @@ from selfprivacy_api.services.config_item import ( EnumServiceConfigItem, ServiceConfigItem, ) +from selfprivacy_api.utils.regex_strings import SUBDOMAIN_REGEX class Forgejo(Service): @@ -27,7 +28,7 @@ class Forgejo(Service): id="subdomain", default_value="git", description="Subdomain", - regex=r"^[A-Za-z0-9][A-Za-z0-9\-]{0,61}[A-Za-z0-9]$", + regex=SUBDOMAIN_REGEX, widget="subdomain", ), "appName": StringServiceConfigItem( @@ -90,21 +91,6 @@ class Forgejo(Service): """Read SVG icon from file and return it as base64 encoded string.""" return base64.b64encode(FORGEJO_ICON.encode("utf-8")).decode("utf-8") - @classmethod - def get_url(cls) -> Optional[str]: - """Return service url.""" - domain = get_domain() - subdomain = cls.get_subdomain() - return f"https://{subdomain}.{domain}" - - @classmethod - def get_subdomain(cls) -> Optional[str]: - with ReadUserData() as data: - if "gitea" in data["modules"]: - return data["modules"]["gitea"]["subdomain"] - - return "git" - @staticmethod def is_movable() -> bool: return True diff --git a/selfprivacy_api/services/jitsimeet/__init__.py b/selfprivacy_api/services/jitsimeet/__init__.py index 1df7c2a..08462e0 100644 --- a/selfprivacy_api/services/jitsimeet/__init__.py +++ b/selfprivacy_api/services/jitsimeet/__init__.py @@ -1,14 +1,13 @@ """Class representing Jitsi Meet service""" import base64 import subprocess -from typing import Optional, List +from typing import List from selfprivacy_api.jobs import Job from selfprivacy_api.utils.systemd import ( get_service_status_from_several_units, ) from selfprivacy_api.services.service import Service, ServiceStatus -from selfprivacy_api.utils import get_domain from selfprivacy_api.utils.block_devices import BlockDevice from selfprivacy_api.services.jitsimeet.icon import JITSI_ICON @@ -36,16 +35,6 @@ class JitsiMeet(Service): """Read SVG icon from file and return it as base64 encoded string.""" return base64.b64encode(JITSI_ICON.encode("utf-8")).decode("utf-8") - @classmethod - def get_url(cls) -> Optional[str]: - """Return service url.""" - domain = get_domain() - return f"https://meet.{domain}" - - @classmethod - def get_subdomain(cls) -> Optional[str]: - return "meet" - @staticmethod def is_movable() -> bool: return False diff --git a/selfprivacy_api/services/nextcloud/__init__.py b/selfprivacy_api/services/nextcloud/__init__.py index 43caecd..14ea078 100644 --- a/selfprivacy_api/services/nextcloud/__init__.py +++ b/selfprivacy_api/services/nextcloud/__init__.py @@ -1,9 +1,7 @@ """Class representing Nextcloud service.""" import base64 import subprocess -from typing import Optional, List - -from selfprivacy_api.utils import get_domain +from typing import List from selfprivacy_api.utils.systemd import get_service_status from selfprivacy_api.services.service import Service, ServiceStatus @@ -34,16 +32,6 @@ class Nextcloud(Service): """Read SVG icon from file and return it as base64 encoded string.""" return base64.b64encode(NEXTCLOUD_ICON.encode("utf-8")).decode("utf-8") - @classmethod - def get_url(cls) -> Optional[str]: - """Return service url.""" - domain = get_domain() - return f"https://cloud.{domain}" - - @classmethod - def get_subdomain(cls) -> Optional[str]: - return "cloud" - @staticmethod def is_movable() -> bool: return True diff --git a/selfprivacy_api/services/ocserv/__init__.py b/selfprivacy_api/services/ocserv/__init__.py index e7f3ffd..818b1d8 100644 --- a/selfprivacy_api/services/ocserv/__init__.py +++ b/selfprivacy_api/services/ocserv/__init__.py @@ -33,10 +33,6 @@ class Ocserv(Service): """Return service url.""" return None - @classmethod - def get_subdomain(cls) -> typing.Optional[str]: - return "vpn" - @staticmethod def is_movable() -> bool: return False diff --git a/selfprivacy_api/services/pleroma/__init__.py b/selfprivacy_api/services/pleroma/__init__.py index 8367901..e300844 100644 --- a/selfprivacy_api/services/pleroma/__init__.py +++ b/selfprivacy_api/services/pleroma/__init__.py @@ -1,9 +1,7 @@ """Class representing Nextcloud service.""" import base64 import subprocess -from typing import Optional, List - -from selfprivacy_api.utils import get_domain +from typing import List from selfprivacy_api.services.owned_path import OwnedPath from selfprivacy_api.utils.systemd import get_service_status @@ -31,16 +29,6 @@ class Pleroma(Service): def get_svg_icon() -> str: return base64.b64encode(PLEROMA_ICON.encode("utf-8")).decode("utf-8") - @classmethod - def get_url(cls) -> Optional[str]: - """Return service url.""" - domain = get_domain() - return f"https://social.{domain}" - - @classmethod - def get_subdomain(cls) -> Optional[str]: - return "social" - @staticmethod def is_movable() -> bool: return True diff --git a/selfprivacy_api/services/roundcube/__init__.py b/selfprivacy_api/services/roundcube/__init__.py index 22604f5..2493c87 100644 --- a/selfprivacy_api/services/roundcube/__init__.py +++ b/selfprivacy_api/services/roundcube/__init__.py @@ -2,14 +2,13 @@ import base64 import subprocess -from typing import List, Optional +from typing import List from selfprivacy_api.jobs import Job from selfprivacy_api.utils.systemd import ( get_service_status_from_several_units, ) from selfprivacy_api.services.service import Service, ServiceStatus -from selfprivacy_api.utils import ReadUserData, get_domain from selfprivacy_api.utils.block_devices import BlockDevice from selfprivacy_api.services.roundcube.icon import ROUNDCUBE_ICON @@ -37,21 +36,6 @@ class Roundcube(Service): """Read SVG icon from file and return it as base64 encoded string.""" return base64.b64encode(ROUNDCUBE_ICON.encode("utf-8")).decode("utf-8") - @classmethod - def get_url(cls) -> Optional[str]: - """Return service url.""" - domain = get_domain() - subdomain = cls.get_subdomain() - return f"https://{subdomain}.{domain}" - - @classmethod - def get_subdomain(cls) -> Optional[str]: - with ReadUserData() as data: - if "roundcube" in data["modules"]: - return data["modules"]["roundcube"]["subdomain"] - - return "roundcube" - @staticmethod def is_movable() -> bool: return False diff --git a/selfprivacy_api/services/service.py b/selfprivacy_api/services/service.py index e199346..ba1ba10 100644 --- a/selfprivacy_api/services/service.py +++ b/selfprivacy_api/services/service.py @@ -1,9 +1,10 @@ """Abstract class for a service running on a server""" from abc import ABC, abstractmethod -from typing import Any, List, Optional +from typing import List, Optional from selfprivacy_api import utils -from selfprivacy_api.utils import ReadUserData, WriteUserData +from selfprivacy_api.utils.default_subdomains import DEFAULT_SUBDOMAINS +from selfprivacy_api.utils import ReadUserData, WriteUserData, get_domain from selfprivacy_api.utils.waitloop import wait_until_true from selfprivacy_api.utils.block_devices import BlockDevice, BlockDevices @@ -66,20 +67,27 @@ class Service(ABC): pass @classmethod - @abstractmethod def get_url(cls) -> Optional[str]: """ The url of the service if it is accessible from the internet browser. """ - pass + domain = get_domain() + subdomain = cls.get_subdomain() + return f"https://{subdomain}.{domain}" @classmethod - @abstractmethod def get_subdomain(cls) -> Optional[str]: """ The assigned primary subdomain for this service. """ - pass + name = cls.get_id() + with ReadUserData() as user_data: + if "modules" in user_data: + if name in user_data["modules"]: + if "subdomain" in user_data["modules"][name]: + return user_data["modules"][name]["subdomain"] + + return DEFAULT_SUBDOMAINS.get(name) @classmethod def get_user(cls) -> Optional[str]: diff --git a/selfprivacy_api/services/test_service/__init__.py b/selfprivacy_api/services/test_service/__init__.py index b808ac6..1a2fd9d 100644 --- a/selfprivacy_api/services/test_service/__init__.py +++ b/selfprivacy_api/services/test_service/__init__.py @@ -1,6 +1,5 @@ """Class representing Bitwarden service""" import base64 -import typing import subprocess from typing import List @@ -57,16 +56,6 @@ class DummyService(Service): # return "" return base64.b64encode(BITWARDEN_ICON.encode("utf-8")).decode("utf-8") - @classmethod - def get_url(cls) -> typing.Optional[str]: - """Return service url.""" - domain = "test.com" - return f"https://password.{domain}" - - @classmethod - def get_subdomain(cls) -> typing.Optional[str]: - return "password" - @classmethod def is_movable(cls) -> bool: return cls.movable diff --git a/selfprivacy_api/utils/__init__.py b/selfprivacy_api/utils/__init__.py index 779bdf6..9972ff2 100644 --- a/selfprivacy_api/utils/__init__.py +++ b/selfprivacy_api/utils/__init__.py @@ -8,6 +8,11 @@ import subprocess import portalocker import typing +from selfprivacy_api.utils.default_subdomains import ( + DEFAULT_SUBDOMAINS, + RESERVED_SUBDOMAINS, +) + USERDATA_FILE = "/etc/nixos/userdata.json" SECRETS_FILE = "/etc/selfprivacy/secrets.json" @@ -133,6 +138,20 @@ def is_username_forbidden(username): return False +def check_if_subdomain_is_taken(subdomain: str) -> bool: + """Check if subdomain is already taken or reserved""" + if subdomain in RESERVED_SUBDOMAINS: + return True + with ReadUserData() as data: + for module in data["modules"]: + if ( + data["modules"][module].get("subdomain", DEFAULT_SUBDOMAINS[module]) + == subdomain + ): + return True + return False + + def parse_date(date_str: str) -> datetime.datetime: """Parse date string which can be in one of these formats: - %Y-%m-%dT%H:%M:%S.%fZ diff --git a/selfprivacy_api/utils/default_subdomains.py b/selfprivacy_api/utils/default_subdomains.py new file mode 100644 index 0000000..f6665fc --- /dev/null +++ b/selfprivacy_api/utils/default_subdomains.py @@ -0,0 +1,21 @@ +DEFAULT_SUBDOMAINS = { + "bitwarden": "password", + "gitea": "git", + "jitsi-meet": "meet", + "simple-nixos-mailserver": "", + "nextcloud": "cloud", + "ocserv": "vpn", + "pleroma": "social", + "roundcube": "roundcube", + "testservice": "test", +} + +RESERVED_SUBDOMAINS = [ + "admin", + "administrator", + "api", + "auth", + "user", + "users", + "ntfy", +] diff --git a/selfprivacy_api/utils/regex_strings.py b/selfprivacy_api/utils/regex_strings.py new file mode 100644 index 0000000..718e2ab --- /dev/null +++ b/selfprivacy_api/utils/regex_strings.py @@ -0,0 +1 @@ +SUBDOMAIN_REGEX = r"^[A-Za-z0-9][A-Za-z0-9\-]{0,61}[A-Za-z0-9]$" diff --git a/tests/test_config_item.py b/tests/test_config_item.py new file mode 100644 index 0000000..724c29e --- /dev/null +++ b/tests/test_config_item.py @@ -0,0 +1,157 @@ +import pytest +from selfprivacy_api.services.config_item import ( + StringServiceConfigItem, + BoolServiceConfigItem, + EnumServiceConfigItem, +) +from selfprivacy_api.utils.regex_strings import SUBDOMAIN_REGEX + + +@pytest.fixture +def service_options(): + return {} + + +def test_string_service_config_item(service_options): + item = StringServiceConfigItem( + id="test_string", + default_value="1337", + description="Test digits string", + regex=r"^\d+$", + widget="text", + allow_empty=False, + ) + assert item.get_value(service_options) == "1337" + item.set_value("123", service_options) + assert item.get_value(service_options) == "123" + with pytest.raises(ValueError): + item.set_value("abc", service_options) + assert item.validate_value("123") is True + assert item.validate_value("abc") is False + assert item.validate_value("123abc") is False + assert item.validate_value("") is False + assert item.validate_value(None) is False + assert item.validate_value(123) is False + assert item.validate_value("123.0") is False + assert item.validate_value(True) is False + + +def test_string_service_config_item_allows_empty(service_options): + item = StringServiceConfigItem( + id="test_string", + default_value="1337", + description="Test digits string", + widget="text", + allow_empty=True, + ) + assert item.get_value(service_options) == "1337" + item.set_value("", service_options) + assert item.get_value(service_options) == "" + assert item.validate_value("") is True + assert item.validate_value(None) is False + assert item.validate_value(123) is False + assert item.validate_value("123") is True + assert item.validate_value("abc") is True + assert item.validate_value("123abc") is True + assert item.validate_value("123.0") is True + assert item.validate_value(True) is False + + +def test_string_service_config_item_not_allows_empty(service_options): + item = StringServiceConfigItem( + id="test_string", + default_value="1337", + description="Test digits string", + widget="text", + ) + assert item.get_value(service_options) == "1337" + with pytest.raises(ValueError): + item.set_value("", service_options) + assert item.get_value(service_options) == "1337" + assert item.validate_value("") is False + assert item.validate_value(None) is False + assert item.validate_value(123) is False + assert item.validate_value("123") is True + assert item.validate_value("abc") is True + assert item.validate_value("123abc") is True + assert item.validate_value("123.0") is True + assert item.validate_value(True) is False + + +def test_bool_service_config_item(service_options): + item = BoolServiceConfigItem( + id="test_bool", + default_value=True, + description="Test bool", + widget="switch", + ) + assert item.get_value(service_options) is True + item.set_value(False, service_options) + assert item.get_value(service_options) is False + assert item.validate_value(True) is True + assert item.validate_value(False) is True + assert item.validate_value("True") is False + assert item.validate_value("False") is False + assert item.validate_value(1) is False + assert item.validate_value(0) is False + assert item.validate_value("1") is False + + +def test_enum_service_config_item(service_options): + item = EnumServiceConfigItem( + id="test_enum", + default_value="option1", + description="Test enum", + options=["option1", "option2", "option3"], + widget="select", + ) + assert item.get_value(service_options) == "option1" + item.set_value("option2", service_options) + assert item.get_value(service_options) == "option2" + with pytest.raises(ValueError): + item.set_value("option4", service_options) + assert item.validate_value("option1") is True + assert item.validate_value("option4") is False + assert item.validate_value("option2") is True + assert item.validate_value(1) is False + assert item.validate_value("1") is False + assert item.validate_value(True) is False + + +def test_string_service_config_item_subdomain(service_options, dummy_service): + item = StringServiceConfigItem( + id="test_subdomain", + default_value="example", + description="Test subdomain string", + widget="subdomain", + allow_empty=False, + regex=SUBDOMAIN_REGEX, + ) + assert item.get_value(service_options) == "example" + item.set_value("subdomain", service_options) + assert item.get_value(service_options) == "subdomain" + with pytest.raises(ValueError): + item.set_value( + "invalid-subdomain-because-it-is-very-very-very-very-very-very-long", + service_options, + ) + assert item.validate_value("subdomain") is True + assert ( + item.validate_value( + "invalid-subdomain-because-it-is-very-very-very-very-very-very-long" + ) + is False + ) + assert item.validate_value("api") is False + assert item.validate_value("auth") is False + assert item.validate_value("user") is False + assert item.validate_value("users") is False + assert item.validate_value("ntfy") is False + assert item.validate_value("") is False + assert item.validate_value(None) is False + assert item.validate_value(123) is False + assert item.validate_value("123") is True + assert item.validate_value("abc") is True + assert item.validate_value("123abc") is True + assert item.validate_value("123.0") is False + assert item.validate_value(True) is False diff --git a/tests/test_graphql/test_services.py b/tests/test_graphql/test_services.py index b7faf3d..b349f53 100644 --- a/tests/test_graphql/test_services.py +++ b/tests/test_graphql/test_services.py @@ -188,6 +188,7 @@ allServices { id status isEnabled + url } """ @@ -347,6 +348,7 @@ def test_get_services(authorized_client, only_dummy_service): assert api_dummy_service["id"] == "testservice" assert api_dummy_service["status"] == ServiceStatus.ACTIVE.value assert api_dummy_service["isEnabled"] is True + assert api_dummy_service["url"] == "https://test.test-domain.tld" def test_enable_return_value(authorized_client, only_dummy_service):