diff --git a/selfprivacy_api/actions/ssh.py b/selfprivacy_api/actions/ssh.py index 8a92735..0c529ef 100644 --- a/selfprivacy_api/actions/ssh.py +++ b/selfprivacy_api/actions/ssh.py @@ -31,7 +31,7 @@ def get_ssh_settings() -> UserdataSshSettings: if "enable" not in data["ssh"]: data["ssh"]["enable"] = True if "passwordAuthentication" not in data["ssh"]: - data["ssh"]["passwordAuthentication"] = True + data["ssh"]["passwordAuthentication"] = False if "rootKeys" not in data["ssh"]: data["ssh"]["rootKeys"] = [] return UserdataSshSettings(**data["ssh"]) diff --git a/selfprivacy_api/actions/system.py b/selfprivacy_api/actions/system.py index 853662f..e9ac6b4 100644 --- a/selfprivacy_api/actions/system.py +++ b/selfprivacy_api/actions/system.py @@ -13,7 +13,7 @@ def get_timezone() -> str: with ReadUserData() as user_data: if "timezone" in user_data: return user_data["timezone"] - return "Europe/Uzhgorod" + return "Etc/UTC" class InvalidTimezone(Exception): diff --git a/selfprivacy_api/migrations/__init__.py b/selfprivacy_api/migrations/__init__.py index 4aa932c..e342edc 100644 --- a/selfprivacy_api/migrations/__init__.py +++ b/selfprivacy_api/migrations/__init__.py @@ -8,35 +8,36 @@ at api.skippedMigrations in userdata.json and populating it with IDs of the migrations to skip. Adding DISABLE_ALL to that array disables the migrations module entirely. """ -from selfprivacy_api.migrations.check_for_failed_binds_migration import ( - CheckForFailedBindsMigration, -) -from selfprivacy_api.utils import ReadUserData -from selfprivacy_api.migrations.fix_nixos_config_branch import FixNixosConfigBranch -from selfprivacy_api.migrations.create_tokens_json import CreateTokensJson -from selfprivacy_api.migrations.migrate_to_selfprivacy_channel import ( - MigrateToSelfprivacyChannel, -) -from selfprivacy_api.migrations.mount_volume import MountVolume -from selfprivacy_api.migrations.providers import CreateProviderFields -from selfprivacy_api.migrations.prepare_for_nixos_2211 import ( - MigrateToSelfprivacyChannelFrom2205, -) -from selfprivacy_api.migrations.prepare_for_nixos_2305 import ( - MigrateToSelfprivacyChannelFrom2211, -) -from selfprivacy_api.migrations.redis_tokens import LoadTokensToRedis +# from selfprivacy_api.migrations.check_for_failed_binds_migration import ( +# CheckForFailedBindsMigration, +# ) +from selfprivacy_api.utils import ReadUserData, UserDataFiles + +# from selfprivacy_api.migrations.fix_nixos_config_branch import FixNixosConfigBranch +# from selfprivacy_api.migrations.create_tokens_json import CreateTokensJson +# from selfprivacy_api.migrations.migrate_to_selfprivacy_channel import ( +# MigrateToSelfprivacyChannel, +# ) +# from selfprivacy_api.migrations.mount_volume import MountVolume +# from selfprivacy_api.migrations.providers import CreateProviderFields +# from selfprivacy_api.migrations.prepare_for_nixos_2211 import ( +# MigrateToSelfprivacyChannelFrom2205, +# ) +# from selfprivacy_api.migrations.prepare_for_nixos_2305 import ( +# MigrateToSelfprivacyChannelFrom2211, +# ) +# from selfprivacy_api.migrations.redis_tokens import LoadTokensToRedis migrations = [ - FixNixosConfigBranch(), - CreateTokensJson(), - MigrateToSelfprivacyChannel(), - MountVolume(), - CheckForFailedBindsMigration(), - CreateProviderFields(), - MigrateToSelfprivacyChannelFrom2205(), - MigrateToSelfprivacyChannelFrom2211(), - LoadTokensToRedis(), + # FixNixosConfigBranch(), + # CreateTokensJson(), + # MigrateToSelfprivacyChannel(), + # MountVolume(), + # CheckForFailedBindsMigration(), + # CreateProviderFields(), + # MigrateToSelfprivacyChannelFrom2205(), + # MigrateToSelfprivacyChannelFrom2211(), + # LoadTokensToRedis(), ] @@ -45,7 +46,7 @@ def run_migrations(): Go over all migrations. If they are not skipped in userdata file, run them if the migration needed. """ - with ReadUserData() as data: + with ReadUserData(UserDataFiles.SECRETS) as data: if "api" not in data: skipped_migrations = [] elif "skippedMigrations" not in data["api"]: diff --git a/selfprivacy_api/migrations/write_token_to_redis.py b/selfprivacy_api/migrations/write_token_to_redis.py new file mode 100644 index 0000000..e83eaf1 --- /dev/null +++ b/selfprivacy_api/migrations/write_token_to_redis.py @@ -0,0 +1,56 @@ +from selfprivacy_api.migrations.migration import Migration + +from selfprivacy_api.repositories.tokens.redis_tokens_repository import ( + RedisTokensRepository, +) +from selfprivacy_api.repositories.tokens.abstract_tokens_repository import ( + AbstractTokensRepository, +) +from selfprivacy_api.utils import ReadUserData, UserDataFiles + + +class WriteTokenToRedis(Migration): + """Load Json tokens into Redis""" + + def get_migration_name(self): + return "write_token_to_redis" + + def get_migration_description(self): + return "Loads the initial token into redis token storage" + + def is_repo_empty(self, repo: AbstractTokensRepository) -> bool: + if repo.get_tokens() != []: + return False + return True + + def get_token_from_json(self): + try: + with ReadUserData(UserDataFiles.SECRETS) as userdata: + return userdata["api"]["token"] + except Exception as e: + print(e) + return None + + def is_migration_needed(self): + try: + if self.get_token_from_json() is not None and self.is_repo_empty( + RedisTokensRepository() + ): + return True + except Exception as e: + print(e) + return False + + def migrate(self): + # Write info about providers to userdata.json + try: + token = self.get_token_from_json() + if token is None: + print("No token found in tokens.json") + return + RedisTokensRepository()._store_token(token) + + print("Done") + except Exception as e: + print(e) + print("Error migrating access tokens from json to redis") diff --git a/selfprivacy_api/repositories/tokens/__init__.py b/selfprivacy_api/repositories/tokens/__init__.py index 9941bdc..e69de29 100644 --- a/selfprivacy_api/repositories/tokens/__init__.py +++ b/selfprivacy_api/repositories/tokens/__init__.py @@ -1,8 +0,0 @@ -from selfprivacy_api.repositories.tokens.abstract_tokens_repository import ( - AbstractTokensRepository, -) -from selfprivacy_api.repositories.tokens.json_tokens_repository import ( - JsonTokensRepository, -) - -repository = JsonTokensRepository() diff --git a/selfprivacy_api/repositories/tokens/json_tokens_repository.py b/selfprivacy_api/repositories/tokens/json_tokens_repository.py deleted file mode 100644 index be753ea..0000000 --- a/selfprivacy_api/repositories/tokens/json_tokens_repository.py +++ /dev/null @@ -1,153 +0,0 @@ -""" -temporary legacy -""" -from typing import Optional -from datetime import datetime, timezone - -from selfprivacy_api.utils import UserDataFiles, WriteUserData, ReadUserData -from selfprivacy_api.models.tokens.token import Token -from selfprivacy_api.models.tokens.recovery_key import RecoveryKey -from selfprivacy_api.models.tokens.new_device_key import NewDeviceKey -from selfprivacy_api.repositories.tokens.exceptions import ( - TokenNotFound, -) -from selfprivacy_api.repositories.tokens.abstract_tokens_repository import ( - AbstractTokensRepository, -) - - -DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%f" - - -class JsonTokensRepository(AbstractTokensRepository): - def get_tokens(self) -> list[Token]: - """Get the tokens""" - tokens_list = [] - - with ReadUserData(UserDataFiles.TOKENS) as tokens_file: - for userdata_token in tokens_file["tokens"]: - tokens_list.append( - Token( - token=userdata_token["token"], - device_name=userdata_token["name"], - created_at=userdata_token["date"], - ) - ) - - return tokens_list - - def _store_token(self, new_token: Token): - """Store a token directly""" - with WriteUserData(UserDataFiles.TOKENS) as tokens_file: - tokens_file["tokens"].append( - { - "token": new_token.token, - "name": new_token.device_name, - "date": new_token.created_at.strftime(DATETIME_FORMAT), - } - ) - - def delete_token(self, input_token: Token) -> None: - """Delete the token""" - with WriteUserData(UserDataFiles.TOKENS) as tokens_file: - for userdata_token in tokens_file["tokens"]: - if userdata_token["token"] == input_token.token: - tokens_file["tokens"].remove(userdata_token) - return - - raise TokenNotFound("Token not found!") - - def __key_date_from_str(self, date_string: str) -> datetime: - if date_string is None or date_string == "": - return None - # we assume that we store dates in json as naive utc - utc_no_tz = datetime.fromisoformat(date_string) - utc_with_tz = utc_no_tz.replace(tzinfo=timezone.utc) - return utc_with_tz - - def __date_from_tokens_file( - self, tokens_file: object, tokenfield: str, datefield: str - ): - date_string = tokens_file[tokenfield].get(datefield) - return self.__key_date_from_str(date_string) - - def get_recovery_key(self) -> Optional[RecoveryKey]: - """Get the recovery key""" - with ReadUserData(UserDataFiles.TOKENS) as tokens_file: - - if ( - "recovery_token" not in tokens_file - or tokens_file["recovery_token"] is None - ): - return - - recovery_key = RecoveryKey( - key=tokens_file["recovery_token"].get("token"), - created_at=self.__date_from_tokens_file( - tokens_file, "recovery_token", "date" - ), - expires_at=self.__date_from_tokens_file( - tokens_file, "recovery_token", "expiration" - ), - uses_left=tokens_file["recovery_token"].get("uses_left"), - ) - - return recovery_key - - def _store_recovery_key(self, recovery_key: RecoveryKey) -> None: - with WriteUserData(UserDataFiles.TOKENS) as tokens_file: - key_expiration: Optional[str] = None - if recovery_key.expires_at is not None: - key_expiration = recovery_key.expires_at.strftime(DATETIME_FORMAT) - tokens_file["recovery_token"] = { - "token": recovery_key.key, - "date": recovery_key.created_at.strftime(DATETIME_FORMAT), - "expiration": key_expiration, - "uses_left": recovery_key.uses_left, - } - - def _decrement_recovery_token(self): - """Decrement recovery key use count by one""" - if self.is_recovery_key_valid(): - with WriteUserData(UserDataFiles.TOKENS) as tokens: - if tokens["recovery_token"]["uses_left"] is not None: - tokens["recovery_token"]["uses_left"] -= 1 - - def _delete_recovery_key(self) -> None: - """Delete the recovery key""" - with WriteUserData(UserDataFiles.TOKENS) as tokens_file: - if "recovery_token" in tokens_file: - del tokens_file["recovery_token"] - return - - def _store_new_device_key(self, new_device_key: NewDeviceKey) -> None: - with WriteUserData(UserDataFiles.TOKENS) as tokens_file: - tokens_file["new_device"] = { - "token": new_device_key.key, - "date": new_device_key.created_at.strftime(DATETIME_FORMAT), - "expiration": new_device_key.expires_at.strftime(DATETIME_FORMAT), - } - - def delete_new_device_key(self) -> None: - """Delete the new device key""" - with WriteUserData(UserDataFiles.TOKENS) as tokens_file: - if "new_device" in tokens_file: - del tokens_file["new_device"] - return - - def _get_stored_new_device_key(self) -> Optional[NewDeviceKey]: - """Retrieves new device key that is already stored.""" - with ReadUserData(UserDataFiles.TOKENS) as tokens_file: - if "new_device" not in tokens_file or tokens_file["new_device"] is None: - return - - new_device_key = NewDeviceKey( - key=tokens_file["new_device"]["token"], - created_at=self.__date_from_tokens_file( - tokens_file, "new_device", "date" - ), - expires_at=self.__date_from_tokens_file( - tokens_file, "new_device", "expiration" - ), - ) - return new_device_key diff --git a/selfprivacy_api/services/nextcloud/__init__.py b/selfprivacy_api/services/nextcloud/__init__.py index 4ac01af..27b72f2 100644 --- a/selfprivacy_api/services/nextcloud/__init__.py +++ b/selfprivacy_api/services/nextcloud/__init__.py @@ -56,7 +56,9 @@ class Nextcloud(Service): @staticmethod def is_enabled() -> bool: with ReadUserData() as user_data: - return user_data.get("nextcloud", {}).get("enable", False) + return ( + user_data.get("modules", {}).get("nextcloud", {}).get("enable", False) + ) @staticmethod def get_status() -> ServiceStatus: @@ -75,6 +77,8 @@ class Nextcloud(Service): def enable(): """Enable Nextcloud service.""" with WriteUserData() as user_data: + if "modules" not in user_data: + user_data["modules"] = {} if "nextcloud" not in user_data: user_data["nextcloud"] = {} user_data["nextcloud"]["enable"] = True @@ -83,6 +87,8 @@ class Nextcloud(Service): def disable(): """Disable Nextcloud service.""" with WriteUserData() as user_data: + if "modules" not in user_data: + user_data["modules"] = {} if "nextcloud" not in user_data: user_data["nextcloud"] = {} user_data["nextcloud"]["enable"] = False diff --git a/selfprivacy_api/utils/__init__.py b/selfprivacy_api/utils/__init__.py index cb384f9..230d5d1 100644 --- a/selfprivacy_api/utils/__init__.py +++ b/selfprivacy_api/utils/__init__.py @@ -9,9 +9,7 @@ import portalocker USERDATA_FILE = "/etc/nixos/userdata.json" -# TODO SECRETS_FILE = "/etc/selfprivacy/secrets.json" -TOKENS_FILE = "/etc/nixos/userdata/tokens.json" -JOBS_FILE = "/etc/nixos/userdata/jobs.json" +SECRETS_FILE = "/etc/selfprivacy/secrets.json" DOMAIN_FILE = "/var/domain" @@ -19,8 +17,7 @@ class UserDataFiles(Enum): """Enum for userdata files""" USERDATA = 0 - TOKENS = 1 - JOBS = 2 + SECRETS = 3 def get_domain(): @@ -36,14 +33,12 @@ class WriteUserData(object): def __init__(self, file_type=UserDataFiles.USERDATA): if file_type == UserDataFiles.USERDATA: self.userdata_file = open(USERDATA_FILE, "r+", encoding="utf-8") - elif file_type == UserDataFiles.TOKENS: - self.userdata_file = open(TOKENS_FILE, "r+", encoding="utf-8") - elif file_type == UserDataFiles.JOBS: + elif file_type == UserDataFiles.SECRETS: # Make sure file exists - if not os.path.exists(JOBS_FILE): - with open(JOBS_FILE, "w", encoding="utf-8") as jobs_file: - jobs_file.write("{}") - self.userdata_file = open(JOBS_FILE, "r+", encoding="utf-8") + if not os.path.exists(SECRETS_FILE): + with open(SECRETS_FILE, "w", encoding="utf-8") as secrets_file: + secrets_file.write("{}") + self.userdata_file = open(SECRETS_FILE, "r+", encoding="utf-8") else: raise ValueError("Unknown file type") portalocker.lock(self.userdata_file, portalocker.LOCK_EX) @@ -67,14 +62,11 @@ class ReadUserData(object): def __init__(self, file_type=UserDataFiles.USERDATA): if file_type == UserDataFiles.USERDATA: self.userdata_file = open(USERDATA_FILE, "r", encoding="utf-8") - elif file_type == UserDataFiles.TOKENS: - self.userdata_file = open(TOKENS_FILE, "r", encoding="utf-8") - elif file_type == UserDataFiles.JOBS: - # Make sure file exists - if not os.path.exists(JOBS_FILE): - with open(JOBS_FILE, "w", encoding="utf-8") as jobs_file: - jobs_file.write("{}") - self.userdata_file = open(JOBS_FILE, "r", encoding="utf-8") + elif file_type == UserDataFiles.SECRETS: + if not os.path.exists(SECRETS_FILE): + with open(SECRETS_FILE, "w", encoding="utf-8") as secrets_file: + secrets_file.write("{}") + self.userdata_file = open(SECRETS_FILE, "r", encoding="utf-8") else: raise ValueError("Unknown file type") portalocker.lock(self.userdata_file, portalocker.LOCK_SH) diff --git a/selfprivacy_api/utils/huey.py b/selfprivacy_api/utils/huey.py index a7ff492..7f36ee2 100644 --- a/selfprivacy_api/utils/huey.py +++ b/selfprivacy_api/utils/huey.py @@ -2,7 +2,7 @@ import os from huey import SqliteHuey -HUEY_DATABASE = "/etc/nixos/userdata/tasks.db" +HUEY_DATABASE = "/etc/selfprivacy/tasks.db" # Singleton instance containing the huey database. diff --git a/tests/conftest.py b/tests/conftest.py index f058997..b1ffea6 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -10,9 +10,6 @@ import os.path as path import datetime from selfprivacy_api.models.tokens.token import Token -from selfprivacy_api.repositories.tokens.json_tokens_repository import ( - JsonTokensRepository, -) from selfprivacy_api.repositories.tokens.redis_tokens_repository import ( RedisTokensRepository, ) @@ -48,25 +45,6 @@ def global_data_dir(): return path.join(path.dirname(__file__), "data") -@pytest.fixture -def empty_tokens(mocker, tmpdir): - tokenfile = tmpdir / "empty_tokens.json" - with open(tokenfile, "w") as file: - file.write(EMPTY_TOKENS_JSON) - mocker.patch("selfprivacy_api.utils.TOKENS_FILE", new=tokenfile) - assert read_json(tokenfile)["tokens"] == [] - return tmpdir - - -@pytest.fixture -def empty_json_repo(empty_tokens): - repo = JsonTokensRepository() - for token in repo.get_tokens(): - repo.delete_token(token) - assert repo.get_tokens() == [] - return repo - - @pytest.fixture def empty_redis_repo(): repo = RedisTokensRepository() @@ -90,13 +68,6 @@ def tokens_file(empty_redis_repo, tmpdir): return repo -@pytest.fixture -def jobs_file(mocker, shared_datadir): - """Mock tokens file.""" - mock = mocker.patch("selfprivacy_api.utils.JOBS_FILE", shared_datadir / "jobs.json") - return mock - - @pytest.fixture def generic_userdata(mocker, tmpdir): filename = "turned_on.json" @@ -121,14 +92,14 @@ def huey_database(mocker, shared_datadir): @pytest.fixture -def client(tokens_file, huey_database, jobs_file): +def client(tokens_file, huey_database): from selfprivacy_api.app import app return TestClient(app) @pytest.fixture -def authorized_client(tokens_file, huey_database, jobs_file): +def authorized_client(tokens_file, huey_database): """Authorized test client fixture.""" from selfprivacy_api.app import app @@ -140,7 +111,7 @@ def authorized_client(tokens_file, huey_database, jobs_file): @pytest.fixture -def wrong_auth_client(tokens_file, huey_database, jobs_file): +def wrong_auth_client(tokens_file, huey_database): """Wrong token test client fixture.""" from selfprivacy_api.app import app diff --git a/tests/test_graphql/test_repository/test_json_tokens_repository.py b/tests/test_graphql/test_repository/test_json_tokens_repository.py deleted file mode 100644 index 23df9df..0000000 --- a/tests/test_graphql/test_repository/test_json_tokens_repository.py +++ /dev/null @@ -1,245 +0,0 @@ -# pylint: disable=redefined-outer-name -# pylint: disable=unused-argument -# pylint: disable=missing-function-docstring -""" -tests that restrict json token repository implementation -""" - -import pytest - - -from datetime import datetime - -from selfprivacy_api.models.tokens.token import Token -from selfprivacy_api.repositories.tokens.exceptions import ( - TokenNotFound, - RecoveryKeyNotFound, - NewDeviceKeyNotFound, -) -from selfprivacy_api.repositories.tokens.json_tokens_repository import ( - JsonTokensRepository, -) - -from tests.common import read_json -from test_tokens_repository import ( - mock_recovery_key_generate, - mock_generate_token, - mock_new_device_key_generate, -) - -ORIGINAL_TOKEN_CONTENT = [ - { - "token": "KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI", - "name": "primary_token", - "date": "2022-07-15 17:41:31.675698", - }, - { - "token": "3JKgLOtFu6ZHgE4OU-R-VdW47IKpg-YQL0c6n7bol68", - "name": "second_token", - "date": "2022-07-15 17:41:31.675698Z", - }, - { - "token": "LYiwFDekvALKTQSjk7vtMQuNP_6wqKuV-9AyMKytI_8", - "name": "third_token", - "date": "2022-07-15T17:41:31.675698Z", - }, - { - "token": "dD3CFPcEZvapscgzWb7JZTLog7OMkP7NzJeu2fAazXM", - "name": "forth_token", - "date": "2022-07-15T17:41:31.675698", - }, -] - -EMPTY_KEYS_JSON = """ -{ - "tokens": [ - { - "token": "KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI", - "name": "primary_token", - "date": "2022-07-15 17:41:31.675698" - } - ] -} -""" - - -@pytest.fixture -def tokens(mocker, datadir): - mocker.patch("selfprivacy_api.utils.TOKENS_FILE", new=datadir / "tokens.json") - assert read_json(datadir / "tokens.json")["tokens"] == ORIGINAL_TOKEN_CONTENT - return datadir - - -@pytest.fixture -def empty_keys(mocker, tmpdir): - tokens_file = tmpdir / "empty_keys.json" - with open(tokens_file, "w") as file: - file.write(EMPTY_KEYS_JSON) - mocker.patch("selfprivacy_api.utils.TOKENS_FILE", new=tokens_file) - assert read_json(tokens_file)["tokens"] == [ - { - "token": "KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI", - "name": "primary_token", - "date": "2022-07-15 17:41:31.675698", - } - ] - return tmpdir - - -@pytest.fixture -def null_keys(mocker, datadir): - mocker.patch("selfprivacy_api.utils.TOKENS_FILE", new=datadir / "null_keys.json") - assert read_json(datadir / "null_keys.json")["recovery_token"] is None - assert read_json(datadir / "null_keys.json")["new_device"] is None - return datadir - - -def test_delete_token(tokens): - repo = JsonTokensRepository() - input_token = Token( - token="KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI", - device_name="primary_token", - created_at=datetime(2022, 7, 15, 17, 41, 31, 675698), - ) - - repo.delete_token(input_token) - assert read_json(tokens / "tokens.json")["tokens"] == [ - { - "token": "3JKgLOtFu6ZHgE4OU-R-VdW47IKpg-YQL0c6n7bol68", - "name": "second_token", - "date": "2022-07-15 17:41:31.675698Z", - }, - { - "token": "LYiwFDekvALKTQSjk7vtMQuNP_6wqKuV-9AyMKytI_8", - "name": "third_token", - "date": "2022-07-15T17:41:31.675698Z", - }, - { - "token": "dD3CFPcEZvapscgzWb7JZTLog7OMkP7NzJeu2fAazXM", - "name": "forth_token", - "date": "2022-07-15T17:41:31.675698", - }, - ] - - -def test_delete_not_found_token(tokens): - repo = JsonTokensRepository() - input_token = Token( - token="imbadtoken", - device_name="primary_token", - created_at=datetime(2022, 7, 15, 17, 41, 31, 675698), - ) - with pytest.raises(TokenNotFound): - assert repo.delete_token(input_token) is None - - assert read_json(tokens / "tokens.json")["tokens"] == ORIGINAL_TOKEN_CONTENT - - -def test_create_recovery_key(tokens, mock_recovery_key_generate): - repo = JsonTokensRepository() - - assert repo.create_recovery_key(uses_left=1, expiration=None) is not None - assert read_json(tokens / "tokens.json")["recovery_token"] == { - "token": "889bf49c1d3199d71a2e704718772bd53a422020334db051", - "date": "2022-07-15T17:41:31.675698", - "expiration": None, - "uses_left": 1, - } - - -def test_use_mnemonic_recovery_key_when_null(null_keys): - repo = JsonTokensRepository() - - with pytest.raises(RecoveryKeyNotFound): - assert ( - repo.use_mnemonic_recovery_key( - mnemonic_phrase="captain ribbon toddler settle symbol minute step broccoli bless universe divide bulb", - device_name="primary_token", - ) - is None - ) - - -def test_use_mnemonic_recovery_key(tokens, mock_generate_token): - repo = JsonTokensRepository() - - assert repo.use_mnemonic_recovery_key( - mnemonic_phrase="uniform clarify napkin bid dress search input armor police cross salon because myself uphold slice bamboo hungry park", - device_name="newdevice", - ) == Token( - token="ur71mC4aiI6FIYAN--cTL-38rPHS5D6NuB1bgN_qKF4", - device_name="newdevice", - created_at=datetime(2022, 11, 14, 6, 6, 32, 777123), - ) - - assert read_json(tokens / "tokens.json")["tokens"] == [ - { - "date": "2022-07-15 17:41:31.675698", - "name": "primary_token", - "token": "KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI", - }, - { - "token": "3JKgLOtFu6ZHgE4OU-R-VdW47IKpg-YQL0c6n7bol68", - "name": "second_token", - "date": "2022-07-15 17:41:31.675698Z", - }, - { - "token": "LYiwFDekvALKTQSjk7vtMQuNP_6wqKuV-9AyMKytI_8", - "name": "third_token", - "date": "2022-07-15T17:41:31.675698Z", - }, - { - "token": "dD3CFPcEZvapscgzWb7JZTLog7OMkP7NzJeu2fAazXM", - "name": "forth_token", - "date": "2022-07-15T17:41:31.675698", - }, - { - "date": "2022-11-14T06:06:32.777123", - "name": "newdevice", - "token": "ur71mC4aiI6FIYAN--cTL-38rPHS5D6NuB1bgN_qKF4", - }, - ] - assert read_json(tokens / "tokens.json")["recovery_token"] == { - "date": "2022-11-11T11:48:54.228038", - "expiration": None, - "token": "ed653e4b8b042b841d285fa7a682fa09e925ddb2d8906f54", - "uses_left": 1, - } - - -def test_get_new_device_key(tokens, mock_new_device_key_generate): - repo = JsonTokensRepository() - - assert repo.get_new_device_key() is not None - assert read_json(tokens / "tokens.json")["new_device"] == { - "date": "2022-07-15T17:41:31.675698", - "expiration": "2022-07-15T17:41:31.675698", - "token": "43478d05b35e4781598acd76e33832bb", - } - - -def test_delete_new_device_key(tokens): - repo = JsonTokensRepository() - - assert repo.delete_new_device_key() is None - assert "new_device" not in read_json(tokens / "tokens.json") - - -def test_delete_new_device_key_when_empty(empty_keys): - repo = JsonTokensRepository() - - repo.delete_new_device_key() - assert "new_device" not in read_json(empty_keys / "empty_keys.json") - - -def test_use_mnemonic_new_device_key_when_null(null_keys): - repo = JsonTokensRepository() - - with pytest.raises(NewDeviceKeyNotFound): - assert ( - repo.use_mnemonic_new_device_key( - device_name="imnew", - mnemonic_phrase="captain ribbon toddler settle symbol minute step broccoli bless universe divide bulb", - ) - is None - ) diff --git a/tests/test_graphql/test_repository/test_tokens_repository.py b/tests/test_graphql/test_repository/test_tokens_repository.py index eb5e7cb..0c4784c 100644 --- a/tests/test_graphql/test_repository/test_tokens_repository.py +++ b/tests/test_graphql/test_repository/test_tokens_repository.py @@ -17,9 +17,6 @@ from selfprivacy_api.repositories.tokens.exceptions import ( NewDeviceKeyNotFound, ) -from selfprivacy_api.repositories.tokens.json_tokens_repository import ( - JsonTokensRepository, -) from selfprivacy_api.repositories.tokens.redis_tokens_repository import ( RedisTokensRepository, ) @@ -133,10 +130,8 @@ def mock_recovery_key_generate(mocker): return mock -@pytest.fixture(params=["json", "redis"]) -def empty_repo(request, empty_json_repo, empty_redis_repo): - if request.param == "json": - return empty_json_repo +@pytest.fixture(params=["redis"]) +def empty_repo(request, empty_redis_repo): if request.param == "redis": return empty_redis_repo # return empty_json_repo @@ -584,22 +579,22 @@ def assert_identical( assert repo_a._get_stored_new_device_key() == repo_b._get_stored_new_device_key() -def clone_to_redis(repo: JsonTokensRepository): - other_repo = RedisTokensRepository() - other_repo.clone(repo) - assert_identical(repo, other_repo) +# def clone_to_redis(repo: JsonTokensRepository): +# other_repo = RedisTokensRepository() +# other_repo.clone(repo) +# assert_identical(repo, other_repo) -# we cannot easily parametrize this unfortunately, since some_tokens and empty_repo cannot coexist -def test_clone_json_to_redis_empty(empty_repo): - repo = empty_repo - if isinstance(repo, JsonTokensRepository): - clone_to_redis(repo) +# # we cannot easily parametrize this unfortunately, since some_tokens and empty_repo cannot coexist +# def test_clone_json_to_redis_empty(empty_repo): +# repo = empty_repo +# if isinstance(repo, JsonTokensRepository): +# clone_to_redis(repo) -def test_clone_json_to_redis_full(some_tokens_repo): - repo = some_tokens_repo - if isinstance(repo, JsonTokensRepository): - repo.get_new_device_key() - repo.create_recovery_key(five_minutes_into_future(), 2) - clone_to_redis(repo) +# def test_clone_json_to_redis_full(some_tokens_repo): +# repo = some_tokens_repo +# if isinstance(repo, JsonTokensRepository): +# repo.get_new_device_key() +# repo.create_recovery_key(five_minutes_into_future(), 2) +# clone_to_redis(repo) diff --git a/tests/test_graphql/test_system.py b/tests/test_graphql/test_system.py index ed00268..34a07a9 100644 --- a/tests/test_graphql/test_system.py +++ b/tests/test_graphql/test_system.py @@ -273,6 +273,8 @@ def test_graphql_get_domain( assert is_dns_record_in_array( dns_records, dns_record(name="api", record_type="AAAA") ) + print(dns_records) + print(dns_record(name="cloud")) assert is_dns_record_in_array(dns_records, dns_record(name="cloud")) assert is_dns_record_in_array( dns_records, dns_record(name="cloud", record_type="AAAA") @@ -373,9 +375,7 @@ def test_graphql_get_timezone_on_undefined(authorized_client, undefined_config): ) assert response.status_code == 200 assert response.json().get("data") is not None - assert ( - response.json()["data"]["system"]["settings"]["timezone"] == "Europe/Uzhgorod" - ) + assert response.json()["data"]["system"]["settings"]["timezone"] == "Etc/UTC" API_CHANGE_TIMEZONE_MUTATION = """ diff --git a/tests/test_graphql/test_system/turned_on.json b/tests/test_graphql/test_system/turned_on.json index c6b758b..caf2658 100644 --- a/tests/test_graphql/test_system/turned_on.json +++ b/tests/test_graphql/test_system/turned_on.json @@ -10,11 +10,6 @@ "domain": "test.tld", "hashedMasterPassword": "HASHED_PASSWORD", "hostname": "test-instance", - "nextcloud": { - "adminPassword": "ADMIN", - "databasePassword": "ADMIN", - "enable": true - }, "resticPassword": "PASS", "ssh": { "enable": true, @@ -51,10 +46,12 @@ "server": { "provider": "HETZNER" }, - "backup": { - "provider": "BACKBLAZE", - "accountId": "ID", - "accountKey": "KEY", - "bucket": "selfprivacy" + "modules": { + "nextcloud": { + "enable": true + }, + "simple-nixos-mailserver": { + "enable": true + } } } diff --git a/tests/test_rest_endpoints/services/test_mailserver.py b/tests/test_rest_endpoints/services/test_mailserver.py deleted file mode 100644 index 2803683..0000000 --- a/tests/test_rest_endpoints/services/test_mailserver.py +++ /dev/null @@ -1,102 +0,0 @@ -import base64 -import json -import pytest - -from selfprivacy_api.utils import get_dkim_key - -############################################################################### - - -class ProcessMock: - """Mock subprocess.Popen""" - - def __init__(self, args, **kwargs): - self.args = args - self.kwargs = kwargs - - def communicate(): - return ( - b'selector._domainkey\tIN\tTXT\t( "v=DKIM1; k=rsa; "\n\t "p=MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDNn/IhEz1SxgHxxxI8vlPYC2dNueiLe1GC4SYz8uHimC8SDkMvAwm7rqi2SimbFgGB5nccCNOqCkrIqJTCB9vufqBnVKAjshHqpOr5hk4JJ1T/AGQKWinstmDbfTLPYTbU8ijZrwwGeqQLlnXR5nSN0GB9GazheA9zaPsT6PV+aQIDAQAB" ) ; ----- DKIM key selector for example.com\n', - None, - ) - - -class NoFileMock(ProcessMock): - def communicate(): - return (b"", None) - - -@pytest.fixture -def mock_subproccess_popen(mocker): - mock = mocker.patch("subprocess.Popen", autospec=True, return_value=ProcessMock) - mocker.patch( - "selfprivacy_api.rest.services.get_domain", - autospec=True, - return_value="example.com", - ) - mocker.patch("os.path.exists", autospec=True, return_value=True) - return mock - - -@pytest.fixture -def mock_no_file(mocker): - mock = mocker.patch("subprocess.Popen", autospec=True, return_value=NoFileMock) - mocker.patch( - "selfprivacy_api.rest.services.get_domain", - autospec=True, - return_value="example.com", - ) - mocker.patch("os.path.exists", autospec=True, return_value=False) - return mock - - -############################################################################### - - -def test_unauthorized(client, mock_subproccess_popen): - """Test unauthorized""" - response = client.get("/services/mailserver/dkim") - assert response.status_code == 401 - - -def test_illegal_methods(authorized_client, mock_subproccess_popen): - response = authorized_client.post("/services/mailserver/dkim") - assert response.status_code == 405 - response = authorized_client.put("/services/mailserver/dkim") - assert response.status_code == 405 - response = authorized_client.delete("/services/mailserver/dkim") - assert response.status_code == 405 - - -def test_get_dkim_key(mock_subproccess_popen): - """Test DKIM key""" - dkim_key = get_dkim_key("example.com") - assert ( - dkim_key - == "v=DKIM1; k=rsa; p=MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDNn/IhEz1SxgHxxxI8vlPYC2dNueiLe1GC4SYz8uHimC8SDkMvAwm7rqi2SimbFgGB5nccCNOqCkrIqJTCB9vufqBnVKAjshHqpOr5hk4JJ1T/AGQKWinstmDbfTLPYTbU8ijZrwwGeqQLlnXR5nSN0GB9GazheA9zaPsT6PV+aQIDAQAB" - ) - assert mock_subproccess_popen.call_args[0][0] == [ - "cat", - "/var/dkim/example.com.selector.txt", - ] - - -def test_dkim_key(authorized_client, mock_subproccess_popen): - """Test old REST DKIM key endpoint""" - response = authorized_client.get("/services/mailserver/dkim") - assert response.status_code == 200 - assert ( - base64.b64decode(response.text) - == b'selector._domainkey\tIN\tTXT\t( "v=DKIM1; k=rsa; "\n\t "p=MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDNn/IhEz1SxgHxxxI8vlPYC2dNueiLe1GC4SYz8uHimC8SDkMvAwm7rqi2SimbFgGB5nccCNOqCkrIqJTCB9vufqBnVKAjshHqpOr5hk4JJ1T/AGQKWinstmDbfTLPYTbU8ijZrwwGeqQLlnXR5nSN0GB9GazheA9zaPsT6PV+aQIDAQAB" ) ; ----- DKIM key selector for example.com\n' - ) - assert mock_subproccess_popen.call_args[0][0] == [ - "cat", - "/var/dkim/example.com.selector.txt", - ] - - -def test_no_dkim_key(authorized_client, mock_no_file): - """Test no DKIM key""" - response = authorized_client.get("/services/mailserver/dkim") - assert response.status_code == 404 - assert mock_no_file.called == False diff --git a/tests/test_rest_endpoints/services/test_nextcloud.py b/tests/test_rest_endpoints/services/test_nextcloud.py deleted file mode 100644 index b05c363..0000000 --- a/tests/test_rest_endpoints/services/test_nextcloud.py +++ /dev/null @@ -1,123 +0,0 @@ -import json -import pytest - - -def read_json(file_path): - with open(file_path, "r") as f: - return json.load(f) - - -############################################################################### - - -@pytest.fixture -def nextcloud_off(mocker, datadir): - mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "turned_off.json") - assert read_json(datadir / "turned_off.json")["nextcloud"]["enable"] == False - return datadir - - -@pytest.fixture -def nextcloud_on(mocker, datadir): - mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "turned_on.json") - assert read_json(datadir / "turned_on.json")["nextcloud"]["enable"] == True - return datadir - - -@pytest.fixture -def nextcloud_enable_undefined(mocker, datadir): - mocker.patch( - "selfprivacy_api.utils.USERDATA_FILE", new=datadir / "enable_undefined.json" - ) - assert "enable" not in read_json(datadir / "enable_undefined.json")["nextcloud"] - return datadir - - -@pytest.fixture -def nextcloud_undefined(mocker, datadir): - mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "undefined.json") - assert "nextcloud" not in read_json(datadir / "undefined.json") - return datadir - - -############################################################################### - - -@pytest.mark.parametrize("endpoint", ["enable", "disable"]) -def test_unauthorized(client, nextcloud_off, endpoint): - response = client.post(f"/services/nextcloud/{endpoint}") - assert response.status_code == 401 - - -@pytest.mark.parametrize("endpoint", ["enable", "disable"]) -def test_illegal_methods(authorized_client, nextcloud_off, endpoint): - response = authorized_client.get(f"/services/nextcloud/{endpoint}") - assert response.status_code == 405 - response = authorized_client.put(f"/services/nextcloud/{endpoint}") - assert response.status_code == 405 - response = authorized_client.delete(f"/services/nextcloud/{endpoint}") - assert response.status_code == 405 - - -@pytest.mark.parametrize( - "endpoint,target_file", - [("enable", "turned_on.json"), ("disable", "turned_off.json")], -) -def test_switch_from_off(authorized_client, nextcloud_off, endpoint, target_file): - response = authorized_client.post(f"/services/nextcloud/{endpoint}") - assert response.status_code == 200 - assert read_json(nextcloud_off / "turned_off.json") == read_json( - nextcloud_off / target_file - ) - - -@pytest.mark.parametrize( - "endpoint,target_file", - [("enable", "turned_on.json"), ("disable", "turned_off.json")], -) -def test_switch_from_on(authorized_client, nextcloud_on, endpoint, target_file): - response = authorized_client.post(f"/services/nextcloud/{endpoint}") - assert response.status_code == 200 - assert read_json(nextcloud_on / "turned_on.json") == read_json( - nextcloud_on / target_file - ) - - -@pytest.mark.parametrize( - "endpoint,target_file", - [("enable", "turned_on.json"), ("disable", "turned_off.json")], -) -def test_switch_twice(authorized_client, nextcloud_off, endpoint, target_file): - response = authorized_client.post(f"/services/nextcloud/{endpoint}") - assert response.status_code == 200 - response = authorized_client.post(f"/services/nextcloud/{endpoint}") - assert response.status_code == 200 - assert read_json(nextcloud_off / "turned_off.json") == read_json( - nextcloud_off / target_file - ) - - -@pytest.mark.parametrize( - "endpoint,target_file", - [("enable", "turned_on.json"), ("disable", "turned_off.json")], -) -def test_on_attribute_deleted( - authorized_client, nextcloud_enable_undefined, endpoint, target_file -): - response = authorized_client.post(f"/services/nextcloud/{endpoint}") - assert response.status_code == 200 - assert read_json(nextcloud_enable_undefined / "enable_undefined.json") == read_json( - nextcloud_enable_undefined / target_file - ) - - -@pytest.mark.parametrize("endpoint,target", [("enable", True), ("disable", False)]) -def test_on_nextcloud_undefined( - authorized_client, nextcloud_undefined, endpoint, target -): - response = authorized_client.post(f"/services/nextcloud/{endpoint}") - assert response.status_code == 200 - assert ( - read_json(nextcloud_undefined / "undefined.json")["nextcloud"]["enable"] - == target - ) diff --git a/tests/test_rest_endpoints/services/test_ssh.py b/tests/test_rest_endpoints/services/test_ssh.py index a17bdab..b1c1ab8 100644 --- a/tests/test_rest_endpoints/services/test_ssh.py +++ b/tests/test_rest_endpoints/services/test_ssh.py @@ -161,7 +161,7 @@ def test_get_current_settings_undefined(authorized_client, undefined_settings): def test_get_current_settings_mostly_undefined(authorized_client, undefined_values): response = authorized_client.get("/services/ssh") assert response.status_code == 200 - assert response.json() == {"enable": True, "passwordAuthentication": True} + assert response.json() == {"enable": True, "passwordAuthentication": False} ## PUT ON /ssh ###################################################### diff --git a/tests/test_rest_endpoints/test_system.py b/tests/test_rest_endpoints/test_system.py index 90c1499..39cb0ae 100644 --- a/tests/test_rest_endpoints/test_system.py +++ b/tests/test_rest_endpoints/test_system.py @@ -129,7 +129,7 @@ def test_get_timezone(authorized_client, turned_on): def test_get_timezone_on_undefined(authorized_client, undefined_config): response = authorized_client.get("/system/configuration/timezone") assert response.status_code == 200 - assert response.json() == "Europe/Uzhgorod" + assert response.json() == "Etc/UTC" def test_put_timezone_unauthorized(client, turned_on):