feature(system): dns migration

This commit is contained in:
Houkime 2024-08-28 15:18:10 +00:00 committed by Inex Code
parent ac07090784
commit 77fb99d84e
6 changed files with 86 additions and 13 deletions

View file

@ -5,10 +5,14 @@ import subprocess
import pytz import pytz
from typing import Optional, List from typing import Optional, List
from pydantic import BaseModel from pydantic import BaseModel
from selfprivacy_api.jobs import Job, JobStatus, Jobs from selfprivacy_api.jobs import Job, JobStatus, Jobs
from selfprivacy_api.jobs.upgrade_system import rebuild_system_task from selfprivacy_api.jobs.upgrade_system import rebuild_system_task
from selfprivacy_api.utils import WriteUserData, ReadUserData from selfprivacy_api.utils import WriteUserData, ReadUserData
from selfprivacy_api.utils import UserDataFiles
from selfprivacy_api.graphql.queries.providers import DnsProvider
def get_timezone() -> str: def get_timezone() -> str:
@ -40,6 +44,18 @@ class UserDataAutoUpgradeSettings(BaseModel):
allowReboot: bool = False allowReboot: bool = False
def set_dns_provider(provider: DnsProvider, token: str):
with WriteUserData() as user_data:
if not "dns" in user_data.keys():
user_data["dns"] = {}
user_data["dns"]["provider"] = provider.value
with WriteUserData(file_type=UserDataFiles.SECRETS) as secrets:
if not "dns" in secrets.keys():
secrets["dns"] = {}
secrets["dns"]["apiKey"] = token
def get_auto_upgrade_settings() -> UserDataAutoUpgradeSettings: def get_auto_upgrade_settings() -> UserDataAutoUpgradeSettings:
"""Get the auto-upgrade settings""" """Get the auto-upgrade settings"""
with ReadUserData() as user_data: with ReadUserData() as user_data:
@ -49,14 +65,14 @@ def get_auto_upgrade_settings() -> UserDataAutoUpgradeSettings:
def set_auto_upgrade_settings( def set_auto_upgrade_settings(
enalbe: Optional[bool] = None, allowReboot: Optional[bool] = None enable: Optional[bool] = None, allowReboot: Optional[bool] = None
) -> None: ) -> None:
"""Set the auto-upgrade settings""" """Set the auto-upgrade settings"""
with WriteUserData() as user_data: with WriteUserData() as user_data:
if "autoUpgrade" not in user_data: if "autoUpgrade" not in user_data:
user_data["autoUpgrade"] = {} user_data["autoUpgrade"] = {}
if enalbe is not None: if enable is not None:
user_data["autoUpgrade"]["enable"] = enalbe user_data["autoUpgrade"]["enable"] = enable
if allowReboot is not None: if allowReboot is not None:
user_data["autoUpgrade"]["allowReboot"] = allowReboot user_data["autoUpgrade"]["allowReboot"] = allowReboot

View file

@ -3,11 +3,13 @@
# pylint: disable=too-few-public-methods # pylint: disable=too-few-public-methods
import typing import typing
import strawberry import strawberry
from selfprivacy_api.utils import pretty_error
from selfprivacy_api.graphql import IsAuthenticated from selfprivacy_api.graphql import IsAuthenticated
from selfprivacy_api.graphql.common_types.jobs import job_to_api_job from selfprivacy_api.graphql.common_types.jobs import job_to_api_job
from selfprivacy_api.jobs import JobStatus from selfprivacy_api.jobs import JobStatus
from traceback import format_tb as format_traceback
from selfprivacy_api.graphql.mutations.mutation_interface import ( from selfprivacy_api.graphql.mutations.mutation_interface import (
GenericJobMutationReturn, GenericJobMutationReturn,
@ -291,8 +293,3 @@ class ServicesMutations:
service=service_to_graphql_service(service), service=service_to_graphql_service(service),
job=job_to_api_job(job), job=job_to_api_job(job),
) )
def pretty_error(e: Exception) -> str:
traceback = "/r".join(format_traceback(e.__traceback__))
return type(e).__name__ + ": " + str(e) + ": " + traceback

View file

@ -3,8 +3,14 @@
# pylint: disable=too-few-public-methods # pylint: disable=too-few-public-methods
import typing import typing
import strawberry import strawberry
from selfprivacy_api.utils import pretty_error
from selfprivacy_api.jobs.nix_collect_garbage import start_nix_collect_garbage
from selfprivacy_api.graphql import IsAuthenticated from selfprivacy_api.graphql import IsAuthenticated
from selfprivacy_api.graphql.common_types.jobs import job_to_api_job from selfprivacy_api.graphql.common_types.jobs import job_to_api_job
from selfprivacy_api.graphql.queries.providers import DnsProvider
from selfprivacy_api.graphql.mutations.mutation_interface import ( from selfprivacy_api.graphql.mutations.mutation_interface import (
GenericJobMutationReturn, GenericJobMutationReturn,
GenericMutationReturn, GenericMutationReturn,
@ -13,9 +19,8 @@ from selfprivacy_api.graphql.mutations.mutation_interface import (
) )
import selfprivacy_api.actions.system as system_actions import selfprivacy_api.actions.system as system_actions
from selfprivacy_api.graphql.common_types.jobs import job_to_api_job
from selfprivacy_api.jobs.nix_collect_garbage import start_nix_collect_garbage
import selfprivacy_api.actions.ssh as ssh_actions import selfprivacy_api.actions.ssh as ssh_actions
from selfprivacy_api.actions.system import set_dns_provider
@strawberry.type @strawberry.type
@ -49,6 +54,14 @@ class SSHSettingsInput:
password_authentication: bool password_authentication: bool
@strawberry.input
class SetDNSProviderInput:
"""Input type to set the provider"""
provider: DnsProvider
api_token: str
@strawberry.input @strawberry.input
class AutoUpgradeSettingsInput: class AutoUpgradeSettingsInput:
"""Input type for auto upgrade settings""" """Input type for auto upgrade settings"""
@ -210,3 +223,20 @@ class SystemMutations:
message="Garbage collector started...", message="Garbage collector started...",
job=job_to_api_job(job), job=job_to_api_job(job),
) )
@strawberry.mutation(permission_classes=[IsAuthenticated])
def set_dns_provider(self, input: SetDNSProviderInput) -> GenericMutationReturn:
try:
set_dns_provider(input.provider, input.api_token)
return GenericMutationReturn(
success=True,
code=200,
message="Provider set",
)
except Exception as e:
return GenericMutationReturn(
success=False,
code=400,
message=pretty_error(e),
)

View file

@ -8,6 +8,8 @@ import subprocess
import portalocker import portalocker
import typing import typing
from traceback import format_tb as format_traceback
from selfprivacy_api.utils.default_subdomains import ( from selfprivacy_api.utils.default_subdomains import (
DEFAULT_SUBDOMAINS, DEFAULT_SUBDOMAINS,
RESERVED_SUBDOMAINS, RESERVED_SUBDOMAINS,
@ -227,3 +229,8 @@ def write_to_log(message):
log.write(f"{datetime.datetime.now()} {message}\n") log.write(f"{datetime.datetime.now()} {message}\n")
log.flush() log.flush()
os.fsync(log.fileno()) os.fsync(log.fileno())
def pretty_error(e: Exception) -> str:
traceback = "/r".join(format_traceback(e.__traceback__))
return type(e).__name__ + ": " + str(e) + ": " + traceback

View file

@ -88,14 +88,22 @@ def redis_repo_with_tokens():
@pytest.fixture @pytest.fixture
def generic_userdata(mocker, tmpdir): def generic_userdata(mocker, tmpdir):
filename = "turned_on.json" filename = "turned_on.json"
secrets_filename = "secrets.json"
source_path = path.join(global_data_dir(), filename) source_path = path.join(global_data_dir(), filename)
userdata_path = path.join(tmpdir, filename) userdata_path = path.join(tmpdir, filename)
secrets_path = path.join(tmpdir, secrets_filename)
with open(secrets_path, "w") as file:
file.write("{}")
with open(userdata_path, "w") as file: with open(userdata_path, "w") as file:
with open(source_path, "r") as source: with open(source_path, "r") as source:
file.write(source.read()) file.write(source.read())
mock = mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=userdata_path) mock = mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=userdata_path)
mocker.patch("selfprivacy_api.utils.SECRETS_FILE", new=secrets_path)
return mock return mock

View file

@ -1,9 +1,24 @@
import pytest import pytest
from selfprivacy_api.utils import ReadUserData, UserDataFiles
from selfprivacy_api.actions.system import run_blocking, ShellException from selfprivacy_api.actions.system import run_blocking, ShellException
from selfprivacy_api.actions.system import set_dns_provider
from selfprivacy_api.graphql.queries.providers import DnsProvider
def test_set_dns(generic_userdata):
token = "testytesty"
provider = DnsProvider.DESEC
set_dns_provider(provider, token)
with ReadUserData() as user_data:
assert user_data["dns"]["provider"] == "DESEC"
with ReadUserData(file_type=UserDataFiles.SECRETS) as secrets:
assert secrets["dns"]["apiKey"] == token
# uname is just an arbitrary command expected to be everywhere we care # uname is just an arbitrary command expected to be everywhere we care
def test_uname(): def test_uname():
output = run_blocking(["uname"]) output = run_blocking(["uname"])
assert output is not None assert output is not None