refactor: Changes to reflect the new NixOS config structure

This commit is contained in:
Inex Code 2023-11-21 23:14:42 +03:00
parent d3873119b0
commit 22f9d2e9df
18 changed files with 139 additions and 752 deletions

View file

@ -31,7 +31,7 @@ def get_ssh_settings() -> UserdataSshSettings:
if "enable" not in data["ssh"]:
data["ssh"]["enable"] = True
if "passwordAuthentication" not in data["ssh"]:
data["ssh"]["passwordAuthentication"] = True
data["ssh"]["passwordAuthentication"] = False
if "rootKeys" not in data["ssh"]:
data["ssh"]["rootKeys"] = []
return UserdataSshSettings(**data["ssh"])

View file

@ -13,7 +13,7 @@ def get_timezone() -> str:
with ReadUserData() as user_data:
if "timezone" in user_data:
return user_data["timezone"]
return "Europe/Uzhgorod"
return "Etc/UTC"
class InvalidTimezone(Exception):

View file

@ -8,35 +8,36 @@ at api.skippedMigrations in userdata.json and populating it
with IDs of the migrations to skip.
Adding DISABLE_ALL to that array disables the migrations module entirely.
"""
from selfprivacy_api.migrations.check_for_failed_binds_migration import (
CheckForFailedBindsMigration,
)
from selfprivacy_api.utils import ReadUserData
from selfprivacy_api.migrations.fix_nixos_config_branch import FixNixosConfigBranch
from selfprivacy_api.migrations.create_tokens_json import CreateTokensJson
from selfprivacy_api.migrations.migrate_to_selfprivacy_channel import (
MigrateToSelfprivacyChannel,
)
from selfprivacy_api.migrations.mount_volume import MountVolume
from selfprivacy_api.migrations.providers import CreateProviderFields
from selfprivacy_api.migrations.prepare_for_nixos_2211 import (
MigrateToSelfprivacyChannelFrom2205,
)
from selfprivacy_api.migrations.prepare_for_nixos_2305 import (
MigrateToSelfprivacyChannelFrom2211,
)
from selfprivacy_api.migrations.redis_tokens import LoadTokensToRedis
# from selfprivacy_api.migrations.check_for_failed_binds_migration import (
# CheckForFailedBindsMigration,
# )
from selfprivacy_api.utils import ReadUserData, UserDataFiles
# from selfprivacy_api.migrations.fix_nixos_config_branch import FixNixosConfigBranch
# from selfprivacy_api.migrations.create_tokens_json import CreateTokensJson
# from selfprivacy_api.migrations.migrate_to_selfprivacy_channel import (
# MigrateToSelfprivacyChannel,
# )
# from selfprivacy_api.migrations.mount_volume import MountVolume
# from selfprivacy_api.migrations.providers import CreateProviderFields
# from selfprivacy_api.migrations.prepare_for_nixos_2211 import (
# MigrateToSelfprivacyChannelFrom2205,
# )
# from selfprivacy_api.migrations.prepare_for_nixos_2305 import (
# MigrateToSelfprivacyChannelFrom2211,
# )
# from selfprivacy_api.migrations.redis_tokens import LoadTokensToRedis
migrations = [
FixNixosConfigBranch(),
CreateTokensJson(),
MigrateToSelfprivacyChannel(),
MountVolume(),
CheckForFailedBindsMigration(),
CreateProviderFields(),
MigrateToSelfprivacyChannelFrom2205(),
MigrateToSelfprivacyChannelFrom2211(),
LoadTokensToRedis(),
# FixNixosConfigBranch(),
# CreateTokensJson(),
# MigrateToSelfprivacyChannel(),
# MountVolume(),
# CheckForFailedBindsMigration(),
# CreateProviderFields(),
# MigrateToSelfprivacyChannelFrom2205(),
# MigrateToSelfprivacyChannelFrom2211(),
# LoadTokensToRedis(),
]
@ -45,7 +46,7 @@ def run_migrations():
Go over all migrations. If they are not skipped in userdata file, run them
if the migration needed.
"""
with ReadUserData() as data:
with ReadUserData(UserDataFiles.SECRETS) as data:
if "api" not in data:
skipped_migrations = []
elif "skippedMigrations" not in data["api"]:

View file

@ -0,0 +1,56 @@
from selfprivacy_api.migrations.migration import Migration
from selfprivacy_api.repositories.tokens.redis_tokens_repository import (
RedisTokensRepository,
)
from selfprivacy_api.repositories.tokens.abstract_tokens_repository import (
AbstractTokensRepository,
)
from selfprivacy_api.utils import ReadUserData, UserDataFiles
class WriteTokenToRedis(Migration):
"""Load Json tokens into Redis"""
def get_migration_name(self):
return "write_token_to_redis"
def get_migration_description(self):
return "Loads the initial token into redis token storage"
def is_repo_empty(self, repo: AbstractTokensRepository) -> bool:
if repo.get_tokens() != []:
return False
return True
def get_token_from_json(self):
try:
with ReadUserData(UserDataFiles.SECRETS) as userdata:
return userdata["api"]["token"]
except Exception as e:
print(e)
return None
def is_migration_needed(self):
try:
if self.get_token_from_json() is not None and self.is_repo_empty(
RedisTokensRepository()
):
return True
except Exception as e:
print(e)
return False
def migrate(self):
# Write info about providers to userdata.json
try:
token = self.get_token_from_json()
if token is None:
print("No token found in tokens.json")
return
RedisTokensRepository()._store_token(token)
print("Done")
except Exception as e:
print(e)
print("Error migrating access tokens from json to redis")

View file

@ -1,8 +0,0 @@
from selfprivacy_api.repositories.tokens.abstract_tokens_repository import (
AbstractTokensRepository,
)
from selfprivacy_api.repositories.tokens.json_tokens_repository import (
JsonTokensRepository,
)
repository = JsonTokensRepository()

View file

@ -1,153 +0,0 @@
"""
temporary legacy
"""
from typing import Optional
from datetime import datetime, timezone
from selfprivacy_api.utils import UserDataFiles, WriteUserData, ReadUserData
from selfprivacy_api.models.tokens.token import Token
from selfprivacy_api.models.tokens.recovery_key import RecoveryKey
from selfprivacy_api.models.tokens.new_device_key import NewDeviceKey
from selfprivacy_api.repositories.tokens.exceptions import (
TokenNotFound,
)
from selfprivacy_api.repositories.tokens.abstract_tokens_repository import (
AbstractTokensRepository,
)
DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%f"
class JsonTokensRepository(AbstractTokensRepository):
def get_tokens(self) -> list[Token]:
"""Get the tokens"""
tokens_list = []
with ReadUserData(UserDataFiles.TOKENS) as tokens_file:
for userdata_token in tokens_file["tokens"]:
tokens_list.append(
Token(
token=userdata_token["token"],
device_name=userdata_token["name"],
created_at=userdata_token["date"],
)
)
return tokens_list
def _store_token(self, new_token: Token):
"""Store a token directly"""
with WriteUserData(UserDataFiles.TOKENS) as tokens_file:
tokens_file["tokens"].append(
{
"token": new_token.token,
"name": new_token.device_name,
"date": new_token.created_at.strftime(DATETIME_FORMAT),
}
)
def delete_token(self, input_token: Token) -> None:
"""Delete the token"""
with WriteUserData(UserDataFiles.TOKENS) as tokens_file:
for userdata_token in tokens_file["tokens"]:
if userdata_token["token"] == input_token.token:
tokens_file["tokens"].remove(userdata_token)
return
raise TokenNotFound("Token not found!")
def __key_date_from_str(self, date_string: str) -> datetime:
if date_string is None or date_string == "":
return None
# we assume that we store dates in json as naive utc
utc_no_tz = datetime.fromisoformat(date_string)
utc_with_tz = utc_no_tz.replace(tzinfo=timezone.utc)
return utc_with_tz
def __date_from_tokens_file(
self, tokens_file: object, tokenfield: str, datefield: str
):
date_string = tokens_file[tokenfield].get(datefield)
return self.__key_date_from_str(date_string)
def get_recovery_key(self) -> Optional[RecoveryKey]:
"""Get the recovery key"""
with ReadUserData(UserDataFiles.TOKENS) as tokens_file:
if (
"recovery_token" not in tokens_file
or tokens_file["recovery_token"] is None
):
return
recovery_key = RecoveryKey(
key=tokens_file["recovery_token"].get("token"),
created_at=self.__date_from_tokens_file(
tokens_file, "recovery_token", "date"
),
expires_at=self.__date_from_tokens_file(
tokens_file, "recovery_token", "expiration"
),
uses_left=tokens_file["recovery_token"].get("uses_left"),
)
return recovery_key
def _store_recovery_key(self, recovery_key: RecoveryKey) -> None:
with WriteUserData(UserDataFiles.TOKENS) as tokens_file:
key_expiration: Optional[str] = None
if recovery_key.expires_at is not None:
key_expiration = recovery_key.expires_at.strftime(DATETIME_FORMAT)
tokens_file["recovery_token"] = {
"token": recovery_key.key,
"date": recovery_key.created_at.strftime(DATETIME_FORMAT),
"expiration": key_expiration,
"uses_left": recovery_key.uses_left,
}
def _decrement_recovery_token(self):
"""Decrement recovery key use count by one"""
if self.is_recovery_key_valid():
with WriteUserData(UserDataFiles.TOKENS) as tokens:
if tokens["recovery_token"]["uses_left"] is not None:
tokens["recovery_token"]["uses_left"] -= 1
def _delete_recovery_key(self) -> None:
"""Delete the recovery key"""
with WriteUserData(UserDataFiles.TOKENS) as tokens_file:
if "recovery_token" in tokens_file:
del tokens_file["recovery_token"]
return
def _store_new_device_key(self, new_device_key: NewDeviceKey) -> None:
with WriteUserData(UserDataFiles.TOKENS) as tokens_file:
tokens_file["new_device"] = {
"token": new_device_key.key,
"date": new_device_key.created_at.strftime(DATETIME_FORMAT),
"expiration": new_device_key.expires_at.strftime(DATETIME_FORMAT),
}
def delete_new_device_key(self) -> None:
"""Delete the new device key"""
with WriteUserData(UserDataFiles.TOKENS) as tokens_file:
if "new_device" in tokens_file:
del tokens_file["new_device"]
return
def _get_stored_new_device_key(self) -> Optional[NewDeviceKey]:
"""Retrieves new device key that is already stored."""
with ReadUserData(UserDataFiles.TOKENS) as tokens_file:
if "new_device" not in tokens_file or tokens_file["new_device"] is None:
return
new_device_key = NewDeviceKey(
key=tokens_file["new_device"]["token"],
created_at=self.__date_from_tokens_file(
tokens_file, "new_device", "date"
),
expires_at=self.__date_from_tokens_file(
tokens_file, "new_device", "expiration"
),
)
return new_device_key

View file

@ -56,7 +56,9 @@ class Nextcloud(Service):
@staticmethod
def is_enabled() -> bool:
with ReadUserData() as user_data:
return user_data.get("nextcloud", {}).get("enable", False)
return (
user_data.get("modules", {}).get("nextcloud", {}).get("enable", False)
)
@staticmethod
def get_status() -> ServiceStatus:
@ -75,6 +77,8 @@ class Nextcloud(Service):
def enable():
"""Enable Nextcloud service."""
with WriteUserData() as user_data:
if "modules" not in user_data:
user_data["modules"] = {}
if "nextcloud" not in user_data:
user_data["nextcloud"] = {}
user_data["nextcloud"]["enable"] = True
@ -83,6 +87,8 @@ class Nextcloud(Service):
def disable():
"""Disable Nextcloud service."""
with WriteUserData() as user_data:
if "modules" not in user_data:
user_data["modules"] = {}
if "nextcloud" not in user_data:
user_data["nextcloud"] = {}
user_data["nextcloud"]["enable"] = False

View file

@ -9,9 +9,7 @@ import portalocker
USERDATA_FILE = "/etc/nixos/userdata.json"
# TODO SECRETS_FILE = "/etc/selfprivacy/secrets.json"
TOKENS_FILE = "/etc/nixos/userdata/tokens.json"
JOBS_FILE = "/etc/nixos/userdata/jobs.json"
SECRETS_FILE = "/etc/selfprivacy/secrets.json"
DOMAIN_FILE = "/var/domain"
@ -19,8 +17,7 @@ class UserDataFiles(Enum):
"""Enum for userdata files"""
USERDATA = 0
TOKENS = 1
JOBS = 2
SECRETS = 3
def get_domain():
@ -36,14 +33,12 @@ class WriteUserData(object):
def __init__(self, file_type=UserDataFiles.USERDATA):
if file_type == UserDataFiles.USERDATA:
self.userdata_file = open(USERDATA_FILE, "r+", encoding="utf-8")
elif file_type == UserDataFiles.TOKENS:
self.userdata_file = open(TOKENS_FILE, "r+", encoding="utf-8")
elif file_type == UserDataFiles.JOBS:
elif file_type == UserDataFiles.SECRETS:
# Make sure file exists
if not os.path.exists(JOBS_FILE):
with open(JOBS_FILE, "w", encoding="utf-8") as jobs_file:
jobs_file.write("{}")
self.userdata_file = open(JOBS_FILE, "r+", encoding="utf-8")
if not os.path.exists(SECRETS_FILE):
with open(SECRETS_FILE, "w", encoding="utf-8") as secrets_file:
secrets_file.write("{}")
self.userdata_file = open(SECRETS_FILE, "r+", encoding="utf-8")
else:
raise ValueError("Unknown file type")
portalocker.lock(self.userdata_file, portalocker.LOCK_EX)
@ -67,14 +62,11 @@ class ReadUserData(object):
def __init__(self, file_type=UserDataFiles.USERDATA):
if file_type == UserDataFiles.USERDATA:
self.userdata_file = open(USERDATA_FILE, "r", encoding="utf-8")
elif file_type == UserDataFiles.TOKENS:
self.userdata_file = open(TOKENS_FILE, "r", encoding="utf-8")
elif file_type == UserDataFiles.JOBS:
# Make sure file exists
if not os.path.exists(JOBS_FILE):
with open(JOBS_FILE, "w", encoding="utf-8") as jobs_file:
jobs_file.write("{}")
self.userdata_file = open(JOBS_FILE, "r", encoding="utf-8")
elif file_type == UserDataFiles.SECRETS:
if not os.path.exists(SECRETS_FILE):
with open(SECRETS_FILE, "w", encoding="utf-8") as secrets_file:
secrets_file.write("{}")
self.userdata_file = open(SECRETS_FILE, "r", encoding="utf-8")
else:
raise ValueError("Unknown file type")
portalocker.lock(self.userdata_file, portalocker.LOCK_SH)

View file

@ -2,7 +2,7 @@
import os
from huey import SqliteHuey
HUEY_DATABASE = "/etc/nixos/userdata/tasks.db"
HUEY_DATABASE = "/etc/selfprivacy/tasks.db"
# Singleton instance containing the huey database.

View file

@ -10,9 +10,6 @@ import os.path as path
import datetime
from selfprivacy_api.models.tokens.token import Token
from selfprivacy_api.repositories.tokens.json_tokens_repository import (
JsonTokensRepository,
)
from selfprivacy_api.repositories.tokens.redis_tokens_repository import (
RedisTokensRepository,
)
@ -48,25 +45,6 @@ def global_data_dir():
return path.join(path.dirname(__file__), "data")
@pytest.fixture
def empty_tokens(mocker, tmpdir):
tokenfile = tmpdir / "empty_tokens.json"
with open(tokenfile, "w") as file:
file.write(EMPTY_TOKENS_JSON)
mocker.patch("selfprivacy_api.utils.TOKENS_FILE", new=tokenfile)
assert read_json(tokenfile)["tokens"] == []
return tmpdir
@pytest.fixture
def empty_json_repo(empty_tokens):
repo = JsonTokensRepository()
for token in repo.get_tokens():
repo.delete_token(token)
assert repo.get_tokens() == []
return repo
@pytest.fixture
def empty_redis_repo():
repo = RedisTokensRepository()
@ -90,13 +68,6 @@ def tokens_file(empty_redis_repo, tmpdir):
return repo
@pytest.fixture
def jobs_file(mocker, shared_datadir):
"""Mock tokens file."""
mock = mocker.patch("selfprivacy_api.utils.JOBS_FILE", shared_datadir / "jobs.json")
return mock
@pytest.fixture
def generic_userdata(mocker, tmpdir):
filename = "turned_on.json"
@ -121,14 +92,14 @@ def huey_database(mocker, shared_datadir):
@pytest.fixture
def client(tokens_file, huey_database, jobs_file):
def client(tokens_file, huey_database):
from selfprivacy_api.app import app
return TestClient(app)
@pytest.fixture
def authorized_client(tokens_file, huey_database, jobs_file):
def authorized_client(tokens_file, huey_database):
"""Authorized test client fixture."""
from selfprivacy_api.app import app
@ -140,7 +111,7 @@ def authorized_client(tokens_file, huey_database, jobs_file):
@pytest.fixture
def wrong_auth_client(tokens_file, huey_database, jobs_file):
def wrong_auth_client(tokens_file, huey_database):
"""Wrong token test client fixture."""
from selfprivacy_api.app import app

View file

@ -1,245 +0,0 @@
# pylint: disable=redefined-outer-name
# pylint: disable=unused-argument
# pylint: disable=missing-function-docstring
"""
tests that restrict json token repository implementation
"""
import pytest
from datetime import datetime
from selfprivacy_api.models.tokens.token import Token
from selfprivacy_api.repositories.tokens.exceptions import (
TokenNotFound,
RecoveryKeyNotFound,
NewDeviceKeyNotFound,
)
from selfprivacy_api.repositories.tokens.json_tokens_repository import (
JsonTokensRepository,
)
from tests.common import read_json
from test_tokens_repository import (
mock_recovery_key_generate,
mock_generate_token,
mock_new_device_key_generate,
)
ORIGINAL_TOKEN_CONTENT = [
{
"token": "KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI",
"name": "primary_token",
"date": "2022-07-15 17:41:31.675698",
},
{
"token": "3JKgLOtFu6ZHgE4OU-R-VdW47IKpg-YQL0c6n7bol68",
"name": "second_token",
"date": "2022-07-15 17:41:31.675698Z",
},
{
"token": "LYiwFDekvALKTQSjk7vtMQuNP_6wqKuV-9AyMKytI_8",
"name": "third_token",
"date": "2022-07-15T17:41:31.675698Z",
},
{
"token": "dD3CFPcEZvapscgzWb7JZTLog7OMkP7NzJeu2fAazXM",
"name": "forth_token",
"date": "2022-07-15T17:41:31.675698",
},
]
EMPTY_KEYS_JSON = """
{
"tokens": [
{
"token": "KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI",
"name": "primary_token",
"date": "2022-07-15 17:41:31.675698"
}
]
}
"""
@pytest.fixture
def tokens(mocker, datadir):
mocker.patch("selfprivacy_api.utils.TOKENS_FILE", new=datadir / "tokens.json")
assert read_json(datadir / "tokens.json")["tokens"] == ORIGINAL_TOKEN_CONTENT
return datadir
@pytest.fixture
def empty_keys(mocker, tmpdir):
tokens_file = tmpdir / "empty_keys.json"
with open(tokens_file, "w") as file:
file.write(EMPTY_KEYS_JSON)
mocker.patch("selfprivacy_api.utils.TOKENS_FILE", new=tokens_file)
assert read_json(tokens_file)["tokens"] == [
{
"token": "KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI",
"name": "primary_token",
"date": "2022-07-15 17:41:31.675698",
}
]
return tmpdir
@pytest.fixture
def null_keys(mocker, datadir):
mocker.patch("selfprivacy_api.utils.TOKENS_FILE", new=datadir / "null_keys.json")
assert read_json(datadir / "null_keys.json")["recovery_token"] is None
assert read_json(datadir / "null_keys.json")["new_device"] is None
return datadir
def test_delete_token(tokens):
repo = JsonTokensRepository()
input_token = Token(
token="KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI",
device_name="primary_token",
created_at=datetime(2022, 7, 15, 17, 41, 31, 675698),
)
repo.delete_token(input_token)
assert read_json(tokens / "tokens.json")["tokens"] == [
{
"token": "3JKgLOtFu6ZHgE4OU-R-VdW47IKpg-YQL0c6n7bol68",
"name": "second_token",
"date": "2022-07-15 17:41:31.675698Z",
},
{
"token": "LYiwFDekvALKTQSjk7vtMQuNP_6wqKuV-9AyMKytI_8",
"name": "third_token",
"date": "2022-07-15T17:41:31.675698Z",
},
{
"token": "dD3CFPcEZvapscgzWb7JZTLog7OMkP7NzJeu2fAazXM",
"name": "forth_token",
"date": "2022-07-15T17:41:31.675698",
},
]
def test_delete_not_found_token(tokens):
repo = JsonTokensRepository()
input_token = Token(
token="imbadtoken",
device_name="primary_token",
created_at=datetime(2022, 7, 15, 17, 41, 31, 675698),
)
with pytest.raises(TokenNotFound):
assert repo.delete_token(input_token) is None
assert read_json(tokens / "tokens.json")["tokens"] == ORIGINAL_TOKEN_CONTENT
def test_create_recovery_key(tokens, mock_recovery_key_generate):
repo = JsonTokensRepository()
assert repo.create_recovery_key(uses_left=1, expiration=None) is not None
assert read_json(tokens / "tokens.json")["recovery_token"] == {
"token": "889bf49c1d3199d71a2e704718772bd53a422020334db051",
"date": "2022-07-15T17:41:31.675698",
"expiration": None,
"uses_left": 1,
}
def test_use_mnemonic_recovery_key_when_null(null_keys):
repo = JsonTokensRepository()
with pytest.raises(RecoveryKeyNotFound):
assert (
repo.use_mnemonic_recovery_key(
mnemonic_phrase="captain ribbon toddler settle symbol minute step broccoli bless universe divide bulb",
device_name="primary_token",
)
is None
)
def test_use_mnemonic_recovery_key(tokens, mock_generate_token):
repo = JsonTokensRepository()
assert repo.use_mnemonic_recovery_key(
mnemonic_phrase="uniform clarify napkin bid dress search input armor police cross salon because myself uphold slice bamboo hungry park",
device_name="newdevice",
) == Token(
token="ur71mC4aiI6FIYAN--cTL-38rPHS5D6NuB1bgN_qKF4",
device_name="newdevice",
created_at=datetime(2022, 11, 14, 6, 6, 32, 777123),
)
assert read_json(tokens / "tokens.json")["tokens"] == [
{
"date": "2022-07-15 17:41:31.675698",
"name": "primary_token",
"token": "KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI",
},
{
"token": "3JKgLOtFu6ZHgE4OU-R-VdW47IKpg-YQL0c6n7bol68",
"name": "second_token",
"date": "2022-07-15 17:41:31.675698Z",
},
{
"token": "LYiwFDekvALKTQSjk7vtMQuNP_6wqKuV-9AyMKytI_8",
"name": "third_token",
"date": "2022-07-15T17:41:31.675698Z",
},
{
"token": "dD3CFPcEZvapscgzWb7JZTLog7OMkP7NzJeu2fAazXM",
"name": "forth_token",
"date": "2022-07-15T17:41:31.675698",
},
{
"date": "2022-11-14T06:06:32.777123",
"name": "newdevice",
"token": "ur71mC4aiI6FIYAN--cTL-38rPHS5D6NuB1bgN_qKF4",
},
]
assert read_json(tokens / "tokens.json")["recovery_token"] == {
"date": "2022-11-11T11:48:54.228038",
"expiration": None,
"token": "ed653e4b8b042b841d285fa7a682fa09e925ddb2d8906f54",
"uses_left": 1,
}
def test_get_new_device_key(tokens, mock_new_device_key_generate):
repo = JsonTokensRepository()
assert repo.get_new_device_key() is not None
assert read_json(tokens / "tokens.json")["new_device"] == {
"date": "2022-07-15T17:41:31.675698",
"expiration": "2022-07-15T17:41:31.675698",
"token": "43478d05b35e4781598acd76e33832bb",
}
def test_delete_new_device_key(tokens):
repo = JsonTokensRepository()
assert repo.delete_new_device_key() is None
assert "new_device" not in read_json(tokens / "tokens.json")
def test_delete_new_device_key_when_empty(empty_keys):
repo = JsonTokensRepository()
repo.delete_new_device_key()
assert "new_device" not in read_json(empty_keys / "empty_keys.json")
def test_use_mnemonic_new_device_key_when_null(null_keys):
repo = JsonTokensRepository()
with pytest.raises(NewDeviceKeyNotFound):
assert (
repo.use_mnemonic_new_device_key(
device_name="imnew",
mnemonic_phrase="captain ribbon toddler settle symbol minute step broccoli bless universe divide bulb",
)
is None
)

View file

@ -17,9 +17,6 @@ from selfprivacy_api.repositories.tokens.exceptions import (
NewDeviceKeyNotFound,
)
from selfprivacy_api.repositories.tokens.json_tokens_repository import (
JsonTokensRepository,
)
from selfprivacy_api.repositories.tokens.redis_tokens_repository import (
RedisTokensRepository,
)
@ -133,10 +130,8 @@ def mock_recovery_key_generate(mocker):
return mock
@pytest.fixture(params=["json", "redis"])
def empty_repo(request, empty_json_repo, empty_redis_repo):
if request.param == "json":
return empty_json_repo
@pytest.fixture(params=["redis"])
def empty_repo(request, empty_redis_repo):
if request.param == "redis":
return empty_redis_repo
# return empty_json_repo
@ -584,22 +579,22 @@ def assert_identical(
assert repo_a._get_stored_new_device_key() == repo_b._get_stored_new_device_key()
def clone_to_redis(repo: JsonTokensRepository):
other_repo = RedisTokensRepository()
other_repo.clone(repo)
assert_identical(repo, other_repo)
# def clone_to_redis(repo: JsonTokensRepository):
# other_repo = RedisTokensRepository()
# other_repo.clone(repo)
# assert_identical(repo, other_repo)
# we cannot easily parametrize this unfortunately, since some_tokens and empty_repo cannot coexist
def test_clone_json_to_redis_empty(empty_repo):
repo = empty_repo
if isinstance(repo, JsonTokensRepository):
clone_to_redis(repo)
# # we cannot easily parametrize this unfortunately, since some_tokens and empty_repo cannot coexist
# def test_clone_json_to_redis_empty(empty_repo):
# repo = empty_repo
# if isinstance(repo, JsonTokensRepository):
# clone_to_redis(repo)
def test_clone_json_to_redis_full(some_tokens_repo):
repo = some_tokens_repo
if isinstance(repo, JsonTokensRepository):
repo.get_new_device_key()
repo.create_recovery_key(five_minutes_into_future(), 2)
clone_to_redis(repo)
# def test_clone_json_to_redis_full(some_tokens_repo):
# repo = some_tokens_repo
# if isinstance(repo, JsonTokensRepository):
# repo.get_new_device_key()
# repo.create_recovery_key(five_minutes_into_future(), 2)
# clone_to_redis(repo)

View file

@ -273,6 +273,8 @@ def test_graphql_get_domain(
assert is_dns_record_in_array(
dns_records, dns_record(name="api", record_type="AAAA")
)
print(dns_records)
print(dns_record(name="cloud"))
assert is_dns_record_in_array(dns_records, dns_record(name="cloud"))
assert is_dns_record_in_array(
dns_records, dns_record(name="cloud", record_type="AAAA")
@ -373,9 +375,7 @@ def test_graphql_get_timezone_on_undefined(authorized_client, undefined_config):
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert (
response.json()["data"]["system"]["settings"]["timezone"] == "Europe/Uzhgorod"
)
assert response.json()["data"]["system"]["settings"]["timezone"] == "Etc/UTC"
API_CHANGE_TIMEZONE_MUTATION = """

View file

@ -10,11 +10,6 @@
"domain": "test.tld",
"hashedMasterPassword": "HASHED_PASSWORD",
"hostname": "test-instance",
"nextcloud": {
"adminPassword": "ADMIN",
"databasePassword": "ADMIN",
"enable": true
},
"resticPassword": "PASS",
"ssh": {
"enable": true,
@ -51,10 +46,12 @@
"server": {
"provider": "HETZNER"
},
"backup": {
"provider": "BACKBLAZE",
"accountId": "ID",
"accountKey": "KEY",
"bucket": "selfprivacy"
"modules": {
"nextcloud": {
"enable": true
},
"simple-nixos-mailserver": {
"enable": true
}
}
}

View file

@ -1,102 +0,0 @@
import base64
import json
import pytest
from selfprivacy_api.utils import get_dkim_key
###############################################################################
class ProcessMock:
"""Mock subprocess.Popen"""
def __init__(self, args, **kwargs):
self.args = args
self.kwargs = kwargs
def communicate():
return (
b'selector._domainkey\tIN\tTXT\t( "v=DKIM1; k=rsa; "\n\t "p=MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDNn/IhEz1SxgHxxxI8vlPYC2dNueiLe1GC4SYz8uHimC8SDkMvAwm7rqi2SimbFgGB5nccCNOqCkrIqJTCB9vufqBnVKAjshHqpOr5hk4JJ1T/AGQKWinstmDbfTLPYTbU8ijZrwwGeqQLlnXR5nSN0GB9GazheA9zaPsT6PV+aQIDAQAB" ) ; ----- DKIM key selector for example.com\n',
None,
)
class NoFileMock(ProcessMock):
def communicate():
return (b"", None)
@pytest.fixture
def mock_subproccess_popen(mocker):
mock = mocker.patch("subprocess.Popen", autospec=True, return_value=ProcessMock)
mocker.patch(
"selfprivacy_api.rest.services.get_domain",
autospec=True,
return_value="example.com",
)
mocker.patch("os.path.exists", autospec=True, return_value=True)
return mock
@pytest.fixture
def mock_no_file(mocker):
mock = mocker.patch("subprocess.Popen", autospec=True, return_value=NoFileMock)
mocker.patch(
"selfprivacy_api.rest.services.get_domain",
autospec=True,
return_value="example.com",
)
mocker.patch("os.path.exists", autospec=True, return_value=False)
return mock
###############################################################################
def test_unauthorized(client, mock_subproccess_popen):
"""Test unauthorized"""
response = client.get("/services/mailserver/dkim")
assert response.status_code == 401
def test_illegal_methods(authorized_client, mock_subproccess_popen):
response = authorized_client.post("/services/mailserver/dkim")
assert response.status_code == 405
response = authorized_client.put("/services/mailserver/dkim")
assert response.status_code == 405
response = authorized_client.delete("/services/mailserver/dkim")
assert response.status_code == 405
def test_get_dkim_key(mock_subproccess_popen):
"""Test DKIM key"""
dkim_key = get_dkim_key("example.com")
assert (
dkim_key
== "v=DKIM1; k=rsa; p=MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDNn/IhEz1SxgHxxxI8vlPYC2dNueiLe1GC4SYz8uHimC8SDkMvAwm7rqi2SimbFgGB5nccCNOqCkrIqJTCB9vufqBnVKAjshHqpOr5hk4JJ1T/AGQKWinstmDbfTLPYTbU8ijZrwwGeqQLlnXR5nSN0GB9GazheA9zaPsT6PV+aQIDAQAB"
)
assert mock_subproccess_popen.call_args[0][0] == [
"cat",
"/var/dkim/example.com.selector.txt",
]
def test_dkim_key(authorized_client, mock_subproccess_popen):
"""Test old REST DKIM key endpoint"""
response = authorized_client.get("/services/mailserver/dkim")
assert response.status_code == 200
assert (
base64.b64decode(response.text)
== b'selector._domainkey\tIN\tTXT\t( "v=DKIM1; k=rsa; "\n\t "p=MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDNn/IhEz1SxgHxxxI8vlPYC2dNueiLe1GC4SYz8uHimC8SDkMvAwm7rqi2SimbFgGB5nccCNOqCkrIqJTCB9vufqBnVKAjshHqpOr5hk4JJ1T/AGQKWinstmDbfTLPYTbU8ijZrwwGeqQLlnXR5nSN0GB9GazheA9zaPsT6PV+aQIDAQAB" ) ; ----- DKIM key selector for example.com\n'
)
assert mock_subproccess_popen.call_args[0][0] == [
"cat",
"/var/dkim/example.com.selector.txt",
]
def test_no_dkim_key(authorized_client, mock_no_file):
"""Test no DKIM key"""
response = authorized_client.get("/services/mailserver/dkim")
assert response.status_code == 404
assert mock_no_file.called == False

View file

@ -1,123 +0,0 @@
import json
import pytest
def read_json(file_path):
with open(file_path, "r") as f:
return json.load(f)
###############################################################################
@pytest.fixture
def nextcloud_off(mocker, datadir):
mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "turned_off.json")
assert read_json(datadir / "turned_off.json")["nextcloud"]["enable"] == False
return datadir
@pytest.fixture
def nextcloud_on(mocker, datadir):
mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "turned_on.json")
assert read_json(datadir / "turned_on.json")["nextcloud"]["enable"] == True
return datadir
@pytest.fixture
def nextcloud_enable_undefined(mocker, datadir):
mocker.patch(
"selfprivacy_api.utils.USERDATA_FILE", new=datadir / "enable_undefined.json"
)
assert "enable" not in read_json(datadir / "enable_undefined.json")["nextcloud"]
return datadir
@pytest.fixture
def nextcloud_undefined(mocker, datadir):
mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "undefined.json")
assert "nextcloud" not in read_json(datadir / "undefined.json")
return datadir
###############################################################################
@pytest.mark.parametrize("endpoint", ["enable", "disable"])
def test_unauthorized(client, nextcloud_off, endpoint):
response = client.post(f"/services/nextcloud/{endpoint}")
assert response.status_code == 401
@pytest.mark.parametrize("endpoint", ["enable", "disable"])
def test_illegal_methods(authorized_client, nextcloud_off, endpoint):
response = authorized_client.get(f"/services/nextcloud/{endpoint}")
assert response.status_code == 405
response = authorized_client.put(f"/services/nextcloud/{endpoint}")
assert response.status_code == 405
response = authorized_client.delete(f"/services/nextcloud/{endpoint}")
assert response.status_code == 405
@pytest.mark.parametrize(
"endpoint,target_file",
[("enable", "turned_on.json"), ("disable", "turned_off.json")],
)
def test_switch_from_off(authorized_client, nextcloud_off, endpoint, target_file):
response = authorized_client.post(f"/services/nextcloud/{endpoint}")
assert response.status_code == 200
assert read_json(nextcloud_off / "turned_off.json") == read_json(
nextcloud_off / target_file
)
@pytest.mark.parametrize(
"endpoint,target_file",
[("enable", "turned_on.json"), ("disable", "turned_off.json")],
)
def test_switch_from_on(authorized_client, nextcloud_on, endpoint, target_file):
response = authorized_client.post(f"/services/nextcloud/{endpoint}")
assert response.status_code == 200
assert read_json(nextcloud_on / "turned_on.json") == read_json(
nextcloud_on / target_file
)
@pytest.mark.parametrize(
"endpoint,target_file",
[("enable", "turned_on.json"), ("disable", "turned_off.json")],
)
def test_switch_twice(authorized_client, nextcloud_off, endpoint, target_file):
response = authorized_client.post(f"/services/nextcloud/{endpoint}")
assert response.status_code == 200
response = authorized_client.post(f"/services/nextcloud/{endpoint}")
assert response.status_code == 200
assert read_json(nextcloud_off / "turned_off.json") == read_json(
nextcloud_off / target_file
)
@pytest.mark.parametrize(
"endpoint,target_file",
[("enable", "turned_on.json"), ("disable", "turned_off.json")],
)
def test_on_attribute_deleted(
authorized_client, nextcloud_enable_undefined, endpoint, target_file
):
response = authorized_client.post(f"/services/nextcloud/{endpoint}")
assert response.status_code == 200
assert read_json(nextcloud_enable_undefined / "enable_undefined.json") == read_json(
nextcloud_enable_undefined / target_file
)
@pytest.mark.parametrize("endpoint,target", [("enable", True), ("disable", False)])
def test_on_nextcloud_undefined(
authorized_client, nextcloud_undefined, endpoint, target
):
response = authorized_client.post(f"/services/nextcloud/{endpoint}")
assert response.status_code == 200
assert (
read_json(nextcloud_undefined / "undefined.json")["nextcloud"]["enable"]
== target
)

View file

@ -161,7 +161,7 @@ def test_get_current_settings_undefined(authorized_client, undefined_settings):
def test_get_current_settings_mostly_undefined(authorized_client, undefined_values):
response = authorized_client.get("/services/ssh")
assert response.status_code == 200
assert response.json() == {"enable": True, "passwordAuthentication": True}
assert response.json() == {"enable": True, "passwordAuthentication": False}
## PUT ON /ssh ######################################################

View file

@ -129,7 +129,7 @@ def test_get_timezone(authorized_client, turned_on):
def test_get_timezone_on_undefined(authorized_client, undefined_config):
response = authorized_client.get("/system/configuration/timezone")
assert response.status_code == 200
assert response.json() == "Europe/Uzhgorod"
assert response.json() == "Etc/UTC"
def test_put_timezone_unauthorized(client, turned_on):