refactor: Add more validation to server configuration

This commit is contained in:
Inex Code 2024-07-23 17:05:29 +03:00
parent 0a112f9c0a
commit 969b3b1417
16 changed files with 270 additions and 115 deletions

View file

@ -10,12 +10,6 @@ from selfprivacy_api.services import get_service_by_id, get_services_by_location
from selfprivacy_api.services import Service as ServiceInterface
from selfprivacy_api.services import ServiceDnsRecord
from selfprivacy_api.services.config_item import (
ServiceConfigItem,
StringServiceConfigItem,
BoolServiceConfigItem,
EnumServiceConfigItem,
)
from selfprivacy_api.utils.block_devices import BlockDevices
from selfprivacy_api.utils.network import get_ip4, get_ip6
@ -147,7 +141,7 @@ def config_item_to_graphql(item: dict) -> ConfigItem:
type=item_type,
value=item["value"],
default_value=item["default_value"],
regex=item.get("regex")
regex=item.get("regex"),
)
elif item_type == "bool":
return BoolConfigItem(

View file

@ -1,9 +1,7 @@
"""Class representing Bitwarden service"""
import base64
import subprocess
from typing import Optional, List
from selfprivacy_api.utils import get_domain
from typing import List
from selfprivacy_api.utils.systemd import get_service_status
from selfprivacy_api.services.service import Service, ServiceStatus
@ -37,16 +35,6 @@ class Bitwarden(Service):
def get_user() -> str:
return "vaultwarden"
@classmethod
def get_url(cls) -> Optional[str]:
"""Return service url."""
domain = get_domain()
return f"https://password.{domain}"
@classmethod
def get_subdomain(cls) -> Optional[str]:
return "password"
@staticmethod
def is_movable() -> bool:
return True

View file

@ -2,6 +2,8 @@ from abc import ABC, abstractmethod
import re
from typing import Optional
from selfprivacy_api.utils import check_if_subdomain_is_taken
class ServiceConfigItem(ABC):
id: str
@ -17,6 +19,10 @@ class ServiceConfigItem(ABC):
def set_value(self, value, service_options):
pass
@abstractmethod
def validate_value(self, value):
return True
def as_dict(self, service_options):
return {
"id": self.id,
@ -35,18 +41,24 @@ class StringServiceConfigItem(ServiceConfigItem):
description: str,
regex: Optional[str] = None,
widget: Optional[str] = None,
allow_empty: bool = False,
):
if widget == "subdomain" and not regex:
raise ValueError("Subdomain widget requires regex")
self.id = id
self.type = "string"
self.default_value = default_value
self.description = description
self.regex = re.compile(regex) if regex else None
self.widget = widget if widget else "text"
self.allow_empty = allow_empty
def get_value(self, service_options):
return service_options.get(self.id, self.default_value)
def set_value(self, value, service_options):
if not self.validate_value(value):
raise ValueError(f"Value {value} is not valid")
if self.regex and not self.regex.match(value):
raise ValueError(f"Value {value} does not match regex {self.regex}")
service_options[self.id] = value
@ -62,6 +74,18 @@ class StringServiceConfigItem(ServiceConfigItem):
"regex": self.regex.pattern if self.regex else None,
}
def validate_value(self, value):
if not isinstance(value, str):
return False
if not self.allow_empty and not value:
return False
if self.regex and not self.regex.match(value):
return False
if self.widget == "subdomain":
if check_if_subdomain_is_taken(value):
return False
return True
class BoolServiceConfigItem(ServiceConfigItem):
def __init__(
@ -81,6 +105,8 @@ class BoolServiceConfigItem(ServiceConfigItem):
return service_options.get(self.id, self.default_value)
def set_value(self, value, service_options):
if not isinstance(value, bool):
raise ValueError(f"Value {value} is not a boolean")
service_options[self.id] = value
def as_dict(self, service_options):
@ -93,6 +119,9 @@ class BoolServiceConfigItem(ServiceConfigItem):
"default_value": self.default_value,
}
def validate_value(self, value):
return isinstance(value, bool)
class EnumServiceConfigItem(ServiceConfigItem):
def __init__(
@ -114,6 +143,8 @@ class EnumServiceConfigItem(ServiceConfigItem):
return service_options.get(self.id, self.default_value)
def set_value(self, value, service_options):
if not self.validate_value(value):
raise ValueError(f"Value {value} is not valid")
if value not in self.options:
raise ValueError(f"Value {value} not in options {self.options}")
service_options[self.id] = value
@ -129,6 +160,11 @@ class EnumServiceConfigItem(ServiceConfigItem):
"options": self.options,
}
def validate_value(self, value):
if not isinstance(value, str):
return False
return value in self.options
# TODO: unused for now
class IntServiceConfigItem(ServiceConfigItem):
@ -156,7 +192,9 @@ class IntServiceConfigItem(ServiceConfigItem):
if self.min_value is not None and value < self.min_value:
raise ValueError(f"Value {value} is less than min_value {self.min_value}")
if self.max_value is not None and value > self.max_value:
raise ValueError(f"Value {value} is greater than max_value {self.max_value}")
raise ValueError(
f"Value {value} is greater than max_value {self.max_value}"
)
service_options[self.id] = value
def as_dict(self, service_options):
@ -170,3 +208,10 @@ class IntServiceConfigItem(ServiceConfigItem):
"min_value": self.min_value,
"max_value": self.max_value,
}
def validate_value(self, value):
if not isinstance(value, int):
return False
return (self.min_value is None or value >= self.min_value) and (
self.max_value is None or value <= self.max_value
)

View file

@ -1,9 +1,9 @@
"""Class representing Bitwarden service"""
import base64
import subprocess
from typing import Optional, List
from typing import List
from selfprivacy_api.utils import get_domain, ReadUserData, WriteUserData
from selfprivacy_api.utils import ReadUserData, WriteUserData
from selfprivacy_api.utils.systemd import get_service_status
from selfprivacy_api.services.service import Service, ServiceStatus
@ -14,6 +14,7 @@ from selfprivacy_api.services.config_item import (
EnumServiceConfigItem,
ServiceConfigItem,
)
from selfprivacy_api.utils.regex_strings import SUBDOMAIN_REGEX
class Forgejo(Service):
@ -27,7 +28,7 @@ class Forgejo(Service):
id="subdomain",
default_value="git",
description="Subdomain",
regex=r"^[A-Za-z0-9][A-Za-z0-9\-]{0,61}[A-Za-z0-9]$",
regex=SUBDOMAIN_REGEX,
widget="subdomain",
),
"appName": StringServiceConfigItem(
@ -90,21 +91,6 @@ class Forgejo(Service):
"""Read SVG icon from file and return it as base64 encoded string."""
return base64.b64encode(FORGEJO_ICON.encode("utf-8")).decode("utf-8")
@classmethod
def get_url(cls) -> Optional[str]:
"""Return service url."""
domain = get_domain()
subdomain = cls.get_subdomain()
return f"https://{subdomain}.{domain}"
@classmethod
def get_subdomain(cls) -> Optional[str]:
with ReadUserData() as data:
if "gitea" in data["modules"]:
return data["modules"]["gitea"]["subdomain"]
return "git"
@staticmethod
def is_movable() -> bool:
return True

View file

@ -1,14 +1,13 @@
"""Class representing Jitsi Meet service"""
import base64
import subprocess
from typing import Optional, List
from typing import List
from selfprivacy_api.jobs import Job
from selfprivacy_api.utils.systemd import (
get_service_status_from_several_units,
)
from selfprivacy_api.services.service import Service, ServiceStatus
from selfprivacy_api.utils import get_domain
from selfprivacy_api.utils.block_devices import BlockDevice
from selfprivacy_api.services.jitsimeet.icon import JITSI_ICON
@ -36,16 +35,6 @@ class JitsiMeet(Service):
"""Read SVG icon from file and return it as base64 encoded string."""
return base64.b64encode(JITSI_ICON.encode("utf-8")).decode("utf-8")
@classmethod
def get_url(cls) -> Optional[str]:
"""Return service url."""
domain = get_domain()
return f"https://meet.{domain}"
@classmethod
def get_subdomain(cls) -> Optional[str]:
return "meet"
@staticmethod
def is_movable() -> bool:
return False

View file

@ -1,9 +1,7 @@
"""Class representing Nextcloud service."""
import base64
import subprocess
from typing import Optional, List
from selfprivacy_api.utils import get_domain
from typing import List
from selfprivacy_api.utils.systemd import get_service_status
from selfprivacy_api.services.service import Service, ServiceStatus
@ -34,16 +32,6 @@ class Nextcloud(Service):
"""Read SVG icon from file and return it as base64 encoded string."""
return base64.b64encode(NEXTCLOUD_ICON.encode("utf-8")).decode("utf-8")
@classmethod
def get_url(cls) -> Optional[str]:
"""Return service url."""
domain = get_domain()
return f"https://cloud.{domain}"
@classmethod
def get_subdomain(cls) -> Optional[str]:
return "cloud"
@staticmethod
def is_movable() -> bool:
return True

View file

@ -33,10 +33,6 @@ class Ocserv(Service):
"""Return service url."""
return None
@classmethod
def get_subdomain(cls) -> typing.Optional[str]:
return "vpn"
@staticmethod
def is_movable() -> bool:
return False

View file

@ -1,9 +1,7 @@
"""Class representing Nextcloud service."""
import base64
import subprocess
from typing import Optional, List
from selfprivacy_api.utils import get_domain
from typing import List
from selfprivacy_api.services.owned_path import OwnedPath
from selfprivacy_api.utils.systemd import get_service_status
@ -31,16 +29,6 @@ class Pleroma(Service):
def get_svg_icon() -> str:
return base64.b64encode(PLEROMA_ICON.encode("utf-8")).decode("utf-8")
@classmethod
def get_url(cls) -> Optional[str]:
"""Return service url."""
domain = get_domain()
return f"https://social.{domain}"
@classmethod
def get_subdomain(cls) -> Optional[str]:
return "social"
@staticmethod
def is_movable() -> bool:
return True

View file

@ -2,14 +2,13 @@
import base64
import subprocess
from typing import List, Optional
from typing import List
from selfprivacy_api.jobs import Job
from selfprivacy_api.utils.systemd import (
get_service_status_from_several_units,
)
from selfprivacy_api.services.service import Service, ServiceStatus
from selfprivacy_api.utils import ReadUserData, get_domain
from selfprivacy_api.utils.block_devices import BlockDevice
from selfprivacy_api.services.roundcube.icon import ROUNDCUBE_ICON
@ -37,21 +36,6 @@ class Roundcube(Service):
"""Read SVG icon from file and return it as base64 encoded string."""
return base64.b64encode(ROUNDCUBE_ICON.encode("utf-8")).decode("utf-8")
@classmethod
def get_url(cls) -> Optional[str]:
"""Return service url."""
domain = get_domain()
subdomain = cls.get_subdomain()
return f"https://{subdomain}.{domain}"
@classmethod
def get_subdomain(cls) -> Optional[str]:
with ReadUserData() as data:
if "roundcube" in data["modules"]:
return data["modules"]["roundcube"]["subdomain"]
return "roundcube"
@staticmethod
def is_movable() -> bool:
return False

View file

@ -1,9 +1,10 @@
"""Abstract class for a service running on a server"""
from abc import ABC, abstractmethod
from typing import Any, List, Optional
from typing import List, Optional
from selfprivacy_api import utils
from selfprivacy_api.utils import ReadUserData, WriteUserData
from selfprivacy_api.utils.default_subdomains import DEFAULT_SUBDOMAINS
from selfprivacy_api.utils import ReadUserData, WriteUserData, get_domain
from selfprivacy_api.utils.waitloop import wait_until_true
from selfprivacy_api.utils.block_devices import BlockDevice, BlockDevices
@ -66,20 +67,27 @@ class Service(ABC):
pass
@classmethod
@abstractmethod
def get_url(cls) -> Optional[str]:
"""
The url of the service if it is accessible from the internet browser.
"""
pass
domain = get_domain()
subdomain = cls.get_subdomain()
return f"https://{subdomain}.{domain}"
@classmethod
@abstractmethod
def get_subdomain(cls) -> Optional[str]:
"""
The assigned primary subdomain for this service.
"""
pass
name = cls.get_id()
with ReadUserData() as user_data:
if "modules" in user_data:
if name in user_data["modules"]:
if "subdomain" in user_data["modules"][name]:
return user_data["modules"][name]["subdomain"]
return DEFAULT_SUBDOMAINS.get(name)
@classmethod
def get_user(cls) -> Optional[str]:

View file

@ -1,6 +1,5 @@
"""Class representing Bitwarden service"""
import base64
import typing
import subprocess
from typing import List
@ -57,16 +56,6 @@ class DummyService(Service):
# return ""
return base64.b64encode(BITWARDEN_ICON.encode("utf-8")).decode("utf-8")
@classmethod
def get_url(cls) -> typing.Optional[str]:
"""Return service url."""
domain = "test.com"
return f"https://password.{domain}"
@classmethod
def get_subdomain(cls) -> typing.Optional[str]:
return "password"
@classmethod
def is_movable(cls) -> bool:
return cls.movable

View file

@ -8,6 +8,11 @@ import subprocess
import portalocker
import typing
from selfprivacy_api.utils.default_subdomains import (
DEFAULT_SUBDOMAINS,
RESERVED_SUBDOMAINS,
)
USERDATA_FILE = "/etc/nixos/userdata.json"
SECRETS_FILE = "/etc/selfprivacy/secrets.json"
@ -133,6 +138,20 @@ def is_username_forbidden(username):
return False
def check_if_subdomain_is_taken(subdomain: str) -> bool:
"""Check if subdomain is already taken or reserved"""
if subdomain in RESERVED_SUBDOMAINS:
return True
with ReadUserData() as data:
for module in data["modules"]:
if (
data["modules"][module].get("subdomain", DEFAULT_SUBDOMAINS[module])
== subdomain
):
return True
return False
def parse_date(date_str: str) -> datetime.datetime:
"""Parse date string which can be in one of these formats:
- %Y-%m-%dT%H:%M:%S.%fZ

View file

@ -0,0 +1,21 @@
DEFAULT_SUBDOMAINS = {
"bitwarden": "password",
"gitea": "git",
"jitsi-meet": "meet",
"simple-nixos-mailserver": "",
"nextcloud": "cloud",
"ocserv": "vpn",
"pleroma": "social",
"roundcube": "roundcube",
"testservice": "test",
}
RESERVED_SUBDOMAINS = [
"admin",
"administrator",
"api",
"auth",
"user",
"users",
"ntfy",
]

View file

@ -0,0 +1 @@
SUBDOMAIN_REGEX = r"^[A-Za-z0-9][A-Za-z0-9\-]{0,61}[A-Za-z0-9]$"

157
tests/test_config_item.py Normal file
View file

@ -0,0 +1,157 @@
import pytest
from selfprivacy_api.services.config_item import (
StringServiceConfigItem,
BoolServiceConfigItem,
EnumServiceConfigItem,
)
from selfprivacy_api.utils.regex_strings import SUBDOMAIN_REGEX
@pytest.fixture
def service_options():
return {}
def test_string_service_config_item(service_options):
item = StringServiceConfigItem(
id="test_string",
default_value="1337",
description="Test digits string",
regex=r"^\d+$",
widget="text",
allow_empty=False,
)
assert item.get_value(service_options) == "1337"
item.set_value("123", service_options)
assert item.get_value(service_options) == "123"
with pytest.raises(ValueError):
item.set_value("abc", service_options)
assert item.validate_value("123") is True
assert item.validate_value("abc") is False
assert item.validate_value("123abc") is False
assert item.validate_value("") is False
assert item.validate_value(None) is False
assert item.validate_value(123) is False
assert item.validate_value("123.0") is False
assert item.validate_value(True) is False
def test_string_service_config_item_allows_empty(service_options):
item = StringServiceConfigItem(
id="test_string",
default_value="1337",
description="Test digits string",
widget="text",
allow_empty=True,
)
assert item.get_value(service_options) == "1337"
item.set_value("", service_options)
assert item.get_value(service_options) == ""
assert item.validate_value("") is True
assert item.validate_value(None) is False
assert item.validate_value(123) is False
assert item.validate_value("123") is True
assert item.validate_value("abc") is True
assert item.validate_value("123abc") is True
assert item.validate_value("123.0") is True
assert item.validate_value(True) is False
def test_string_service_config_item_not_allows_empty(service_options):
item = StringServiceConfigItem(
id="test_string",
default_value="1337",
description="Test digits string",
widget="text",
)
assert item.get_value(service_options) == "1337"
with pytest.raises(ValueError):
item.set_value("", service_options)
assert item.get_value(service_options) == "1337"
assert item.validate_value("") is False
assert item.validate_value(None) is False
assert item.validate_value(123) is False
assert item.validate_value("123") is True
assert item.validate_value("abc") is True
assert item.validate_value("123abc") is True
assert item.validate_value("123.0") is True
assert item.validate_value(True) is False
def test_bool_service_config_item(service_options):
item = BoolServiceConfigItem(
id="test_bool",
default_value=True,
description="Test bool",
widget="switch",
)
assert item.get_value(service_options) is True
item.set_value(False, service_options)
assert item.get_value(service_options) is False
assert item.validate_value(True) is True
assert item.validate_value(False) is True
assert item.validate_value("True") is False
assert item.validate_value("False") is False
assert item.validate_value(1) is False
assert item.validate_value(0) is False
assert item.validate_value("1") is False
def test_enum_service_config_item(service_options):
item = EnumServiceConfigItem(
id="test_enum",
default_value="option1",
description="Test enum",
options=["option1", "option2", "option3"],
widget="select",
)
assert item.get_value(service_options) == "option1"
item.set_value("option2", service_options)
assert item.get_value(service_options) == "option2"
with pytest.raises(ValueError):
item.set_value("option4", service_options)
assert item.validate_value("option1") is True
assert item.validate_value("option4") is False
assert item.validate_value("option2") is True
assert item.validate_value(1) is False
assert item.validate_value("1") is False
assert item.validate_value(True) is False
def test_string_service_config_item_subdomain(service_options, dummy_service):
item = StringServiceConfigItem(
id="test_subdomain",
default_value="example",
description="Test subdomain string",
widget="subdomain",
allow_empty=False,
regex=SUBDOMAIN_REGEX,
)
assert item.get_value(service_options) == "example"
item.set_value("subdomain", service_options)
assert item.get_value(service_options) == "subdomain"
with pytest.raises(ValueError):
item.set_value(
"invalid-subdomain-because-it-is-very-very-very-very-very-very-long",
service_options,
)
assert item.validate_value("subdomain") is True
assert (
item.validate_value(
"invalid-subdomain-because-it-is-very-very-very-very-very-very-long"
)
is False
)
assert item.validate_value("api") is False
assert item.validate_value("auth") is False
assert item.validate_value("user") is False
assert item.validate_value("users") is False
assert item.validate_value("ntfy") is False
assert item.validate_value("") is False
assert item.validate_value(None) is False
assert item.validate_value(123) is False
assert item.validate_value("123") is True
assert item.validate_value("abc") is True
assert item.validate_value("123abc") is True
assert item.validate_value("123.0") is False
assert item.validate_value(True) is False

View file

@ -188,6 +188,7 @@ allServices {
id
status
isEnabled
url
}
"""
@ -347,6 +348,7 @@ def test_get_services(authorized_client, only_dummy_service):
assert api_dummy_service["id"] == "testservice"
assert api_dummy_service["status"] == ServiceStatus.ACTIVE.value
assert api_dummy_service["isEnabled"] is True
assert api_dummy_service["url"] == "https://test.test-domain.tld"
def test_enable_return_value(authorized_client, only_dummy_service):