fix validation

This commit is contained in:
Inex Code 2024-07-23 19:50:00 +03:00
parent 9fb47272f8
commit 0384d7585a
4 changed files with 68 additions and 77 deletions

View file

@ -24,7 +24,6 @@ from selfprivacy_api.actions.services import (
) )
from selfprivacy_api.services import get_service_by_id from selfprivacy_api.services import get_service_by_id
from selfprivacy_api.utils import write_to_log
@strawberry.type @strawberry.type
@ -173,8 +172,6 @@ class ServicesMutations:
self, input: SetServiceConfigurationInput self, input: SetServiceConfigurationInput
) -> ServiceMutationReturn: ) -> ServiceMutationReturn:
"""Set the new configuration values""" """Set the new configuration values"""
write_to_log('set_service_configuration')
write_to_log(f"{input=}")
service = get_service_by_id(input.service_id) service = get_service_by_id(input.service_id)
if service is None: if service is None:
return ServiceMutationReturn( return ServiceMutationReturn(
@ -183,9 +180,7 @@ class ServicesMutations:
code=404, code=404,
) )
try: try:
write_to_log('Got service by id.')
service.set_configuration(input.configuration) service.set_configuration(input.configuration)
write_to_log('Configuration set.')
return ServiceMutationReturn( return ServiceMutationReturn(
success=True, success=True,
message="Service configuration updated.", message="Service configuration updated.",

View file

@ -2,7 +2,11 @@ from abc import ABC, abstractmethod
import re import re
from typing import Optional from typing import Optional
from selfprivacy_api.utils import check_if_subdomain_is_taken, write_to_log from selfprivacy_api.utils import (
ReadUserData,
WriteUserData,
check_if_subdomain_is_taken,
)
class ServiceConfigItem(ABC): class ServiceConfigItem(ABC):
@ -12,11 +16,11 @@ class ServiceConfigItem(ABC):
type: str type: str
@abstractmethod @abstractmethod
def get_value(self, service_options): def get_value(self, service_id):
pass pass
@abstractmethod @abstractmethod
def set_value(self, value, service_options): def set_value(self, value, service_id):
pass pass
@abstractmethod @abstractmethod
@ -53,17 +57,21 @@ class StringServiceConfigItem(ServiceConfigItem):
self.widget = widget if widget else "text" self.widget = widget if widget else "text"
self.allow_empty = allow_empty self.allow_empty = allow_empty
def get_value(self, service_options): def get_value(self, service_id):
return service_options.get(self.id, self.default_value) with ReadUserData() as user_data:
if "modules" in user_data and service_id in user_data["modules"]:
return user_data["modules"][service_id].get(self.id, self.default_value)
return self.default_value
def set_value(self, value, service_options): def set_value(self, value, service_id):
write_to_log('set_value called')
if not self.validate_value(value): if not self.validate_value(value):
raise ValueError(f"Value {value} is not valid") raise ValueError(f"Value {value} is not valid")
if self.regex and not self.regex.match(value): with WriteUserData() as user_data:
raise ValueError(f"Value {value} does not match regex {self.regex}") if "modules" not in user_data:
write_to_log('seting actual value') user_data["modules"] = {}
service_options[self.id] = value if service_id not in user_data["modules"]:
user_data["modules"][service_id] = {}
user_data["modules"][service_id][self.id] = value
def as_dict(self, service_options): def as_dict(self, service_options):
return { return {
@ -77,18 +85,13 @@ class StringServiceConfigItem(ServiceConfigItem):
} }
def validate_value(self, value): def validate_value(self, value):
write_to_log('validate_value called')
if not isinstance(value, str): if not isinstance(value, str):
return False return False
write_to_log('value is string')
if not self.allow_empty and not value: if not self.allow_empty and not value:
return False return False
write_to_log('value is not empty')
if self.regex and not self.regex.match(value): if self.regex and not self.regex.match(value):
return False return False
write_to_log('regex match')
if self.widget == "subdomain": if self.widget == "subdomain":
write_to_log('subdomain widget')
if check_if_subdomain_is_taken(value): if check_if_subdomain_is_taken(value):
return False return False
return True return True
@ -108,13 +111,21 @@ class BoolServiceConfigItem(ServiceConfigItem):
self.description = description self.description = description
self.widget = widget if widget else "switch" self.widget = widget if widget else "switch"
def get_value(self, service_options): def get_value(self, service_id):
return service_options.get(self.id, self.default_value) with ReadUserData() as user_data:
if "modules" in user_data and service_id in user_data["modules"]:
return user_data["modules"][service_id].get(self.id, self.default_value)
return self.default_value
def set_value(self, value, service_options): def set_value(self, value, service_id):
if not isinstance(value, bool): if not self.validate_value(value):
raise ValueError(f"Value {value} is not a boolean") raise ValueError(f"Value {value} is not a boolean")
service_options[self.id] = value with WriteUserData() as user_data:
if "modules" not in user_data:
user_data["modules"] = {}
if service_id not in user_data["modules"]:
user_data["modules"][service_id] = {}
user_data["modules"][service_id][self.id] = value
def as_dict(self, service_options): def as_dict(self, service_options):
return { return {
@ -146,15 +157,21 @@ class EnumServiceConfigItem(ServiceConfigItem):
self.options = options self.options = options
self.widget = widget if widget else "select" self.widget = widget if widget else "select"
def get_value(self, service_options): def get_value(self, service_id):
return service_options.get(self.id, self.default_value) with ReadUserData() as user_data:
if "modules" in user_data and service_id in user_data["modules"]:
return user_data["modules"][service_id].get(self.id, self.default_value)
return self.default_value
def set_value(self, value, service_options): def set_value(self, value, service_id):
if not self.validate_value(value): if not self.validate_value(value):
raise ValueError(f"Value {value} is not valid") raise ValueError(f"Value {value} is not in options")
if value not in self.options: with WriteUserData() as user_data:
raise ValueError(f"Value {value} not in options {self.options}") if "modules" not in user_data:
service_options[self.id] = value user_data["modules"] = {}
if service_id not in user_data["modules"]:
user_data["modules"][service_id] = {}
user_data["modules"][service_id][self.id] = value
def as_dict(self, service_options): def as_dict(self, service_options):
return { return {
@ -192,17 +209,21 @@ class IntServiceConfigItem(ServiceConfigItem):
self.min_value = min_value self.min_value = min_value
self.max_value = max_value self.max_value = max_value
def get_value(self, service_options): def get_value(self, service_id):
return service_options.get(self.id, self.default_value) with ReadUserData() as user_data:
if "modules" in user_data and service_id in user_data["modules"]:
return user_data["modules"][service_id].get(self.id, self.default_value)
return self.default_value
def set_value(self, value, service_options): def set_value(self, value, service_id):
if self.min_value is not None and value < self.min_value: if not self.validate_value(value):
raise ValueError(f"Value {value} is less than min_value {self.min_value}") raise ValueError(f"Value {value} is not valid")
if self.max_value is not None and value > self.max_value: with WriteUserData() as user_data:
raise ValueError( if "modules" not in user_data:
f"Value {value} is greater than max_value {self.max_value}" user_data["modules"] = {}
) if service_id not in user_data["modules"]:
service_options[self.id] = value user_data["modules"][service_id] = {}
user_data["modules"][service_id][self.id] = value
def as_dict(self, service_options): def as_dict(self, service_options):
return { return {

View file

@ -5,7 +5,7 @@ from typing import List, Optional
from selfprivacy_api import utils from selfprivacy_api import utils
from selfprivacy_api.services.config_item import ServiceConfigItem from selfprivacy_api.services.config_item import ServiceConfigItem
from selfprivacy_api.utils.default_subdomains import DEFAULT_SUBDOMAINS from selfprivacy_api.utils.default_subdomains import DEFAULT_SUBDOMAINS
from selfprivacy_api.utils import ReadUserData, WriteUserData, get_domain, write_to_log from selfprivacy_api.utils import ReadUserData, WriteUserData, get_domain
from selfprivacy_api.utils.waitloop import wait_until_true from selfprivacy_api.utils.waitloop import wait_until_true
from selfprivacy_api.utils.block_devices import BlockDevice, BlockDevices from selfprivacy_api.utils.block_devices import BlockDevice, BlockDevices
@ -192,44 +192,22 @@ class Service(ABC):
@classmethod @classmethod
def get_configuration(cls): def get_configuration(cls):
with ReadUserData() as user_data: return {
return { key: cls.config_items[key].as_dict(cls.get_id()) for key in cls.config_items
key: cls.config_items[key].as_dict( }
user_data.get("modules", {}).get(cls.get_id(), {})
)
for key in cls.config_items
}
@classmethod @classmethod
def set_configuration(cls, config_items): def set_configuration(cls, config_items):
write_to_log('set_configuration')
write_to_log(f'{config_items=}')
write_to_log('Starting pre-check for config items')
for key, value in config_items.items(): for key, value in config_items.items():
write_to_log(f'{key=}')
write_to_log(f'{value=}')
if key not in cls.config_items: if key not in cls.config_items:
raise ValueError(f"Key {key} is not valid for {cls.get_id()}") raise ValueError(f"Key {key} is not valid for {cls.get_id()}")
write_to_log('key in cls.config_items')
if cls.config_items[key].validate_value(value) is False: if cls.config_items[key].validate_value(value) is False:
raise ValueError(f"Value {value} is not valid for {key}") raise ValueError(f"Value {value} is not valid for {key}")
write_to_log('value is valid') for key, value in config_items.items():
with WriteUserData() as user_data: cls.config_items[key].set_value(
write_to_log('Writing to user_data') value,
if "modules" not in user_data: cls.get_id(),
write_to_log('modules not in user_data') )
user_data["modules"] = {}
if cls.get_id() not in user_data["modules"]:
write_to_log('cls.get_id() not in user_data["modules"]')
user_data["modules"][cls.get_id()] = {}
for key, value in config_items.items():
write_to_log('Starting writing')
write_to_log(f'{key=}')
write_to_log(f'{value=}')
cls.config_items[key].set_value(
value,
user_data["modules"][cls.get_id()],
)
@staticmethod @staticmethod
@abstractmethod @abstractmethod

View file

@ -140,12 +140,9 @@ def is_username_forbidden(username):
def check_if_subdomain_is_taken(subdomain: str) -> bool: def check_if_subdomain_is_taken(subdomain: str) -> bool:
"""Check if subdomain is already taken or reserved""" """Check if subdomain is already taken or reserved"""
write_to_log(f"Checking if subdomain {subdomain} is taken")
if subdomain in RESERVED_SUBDOMAINS: if subdomain in RESERVED_SUBDOMAINS:
return True return True
write_to_log(f"Subdomain {subdomain} is not reserved")
with ReadUserData() as data: with ReadUserData() as data:
write_to_log("entered with ReadUserData")
for module in data["modules"]: for module in data["modules"]:
if ( if (
data["modules"][module].get("subdomain", DEFAULT_SUBDOMAINS[module]) data["modules"][module].get("subdomain", DEFAULT_SUBDOMAINS[module])