From e130d37033c98de92bfe5df3c191fba7f63ed702 Mon Sep 17 00:00:00 2001 From: def Date: Wed, 16 Nov 2022 19:12:38 +0200 Subject: [PATCH] refactor(repository): Tokens repository JSON backend (#18) Co-authored-by: def Co-authored-by: Inex Code Reviewed-on: https://git.selfprivacy.org/SelfPrivacy/selfprivacy-rest-api/pulls/18 Co-authored-by: def Co-committed-by: def --- selfprivacy_api/migrations/__init__.py | 4 +- selfprivacy_api/models/__init__.py | 0 .../models/tokens/new_device_key.py | 46 ++ selfprivacy_api/models/tokens/recovery_key.py | 56 ++ selfprivacy_api/models/tokens/token.py | 33 + selfprivacy_api/repositories/__init__.py | 0 .../repositories/tokens/__init__.py | 8 + .../tokens/abstract_tokens_repository.py | 93 +++ .../repositories/tokens/exceptions.py | 14 + .../tokens/json_tokens_repository.py | 238 +++++++ .../tokens/redis_tokens_repository.py | 15 + .../test_repository/test_tokens_repository.py | 582 ++++++++++++++++++ .../test_tokens_repository/empty_keys.json | 9 + .../test_tokens_repository/null_keys.json | 26 + .../test_tokens_repository/tokens.json | 35 ++ tests/test_graphql/test_users.py | 1 - 16 files changed, 1158 insertions(+), 2 deletions(-) create mode 100644 selfprivacy_api/models/__init__.py create mode 100644 selfprivacy_api/models/tokens/new_device_key.py create mode 100644 selfprivacy_api/models/tokens/recovery_key.py create mode 100644 selfprivacy_api/models/tokens/token.py create mode 100644 selfprivacy_api/repositories/__init__.py create mode 100644 selfprivacy_api/repositories/tokens/__init__.py create mode 100644 selfprivacy_api/repositories/tokens/abstract_tokens_repository.py create mode 100644 selfprivacy_api/repositories/tokens/exceptions.py create mode 100644 selfprivacy_api/repositories/tokens/json_tokens_repository.py create mode 100644 selfprivacy_api/repositories/tokens/redis_tokens_repository.py create mode 100644 tests/test_graphql/test_repository/test_tokens_repository.py create mode 100644 tests/test_graphql/test_repository/test_tokens_repository/empty_keys.json create mode 100644 tests/test_graphql/test_repository/test_tokens_repository/null_keys.json create mode 100644 tests/test_graphql/test_repository/test_tokens_repository/tokens.json diff --git a/selfprivacy_api/migrations/__init__.py b/selfprivacy_api/migrations/__init__.py index 8209198..b051f04 100644 --- a/selfprivacy_api/migrations/__init__.py +++ b/selfprivacy_api/migrations/__init__.py @@ -8,7 +8,9 @@ at api.skippedMigrations in userdata.json and populating it with IDs of the migrations to skip. Adding DISABLE_ALL to that array disables the migrations module entirely. """ -from selfprivacy_api.migrations.check_for_failed_binds_migration import CheckForFailedBindsMigration +from selfprivacy_api.migrations.check_for_failed_binds_migration import ( + CheckForFailedBindsMigration, +) from selfprivacy_api.utils import ReadUserData from selfprivacy_api.migrations.fix_nixos_config_branch import FixNixosConfigBranch from selfprivacy_api.migrations.create_tokens_json import CreateTokensJson diff --git a/selfprivacy_api/models/__init__.py b/selfprivacy_api/models/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/selfprivacy_api/models/tokens/new_device_key.py b/selfprivacy_api/models/tokens/new_device_key.py new file mode 100644 index 0000000..dda926c --- /dev/null +++ b/selfprivacy_api/models/tokens/new_device_key.py @@ -0,0 +1,46 @@ +""" +New device key used to obtain access token. +""" +from datetime import datetime, timedelta +import secrets +from pydantic import BaseModel +from mnemonic import Mnemonic + + +class NewDeviceKey(BaseModel): + """ + Recovery key used to obtain access token. + + Recovery key has a key string, date of creation, date of expiration. + """ + + key: str + created_at: datetime + expires_at: datetime + + def is_valid(self) -> bool: + """ + Check if the recovery key is valid. + """ + if self.expires_at < datetime.now(): + return False + return True + + def as_mnemonic(self) -> str: + """ + Get the recovery key as a mnemonic. + """ + return Mnemonic(language="english").to_mnemonic(bytes.fromhex(self.key)) + + @staticmethod + def generate() -> "NewDeviceKey": + """ + Factory to generate a random token. + """ + creation_date = datetime.now() + key = secrets.token_bytes(16).hex() + return NewDeviceKey( + key=key, + created_at=creation_date, + expires_at=datetime.now() + timedelta(minutes=10), + ) diff --git a/selfprivacy_api/models/tokens/recovery_key.py b/selfprivacy_api/models/tokens/recovery_key.py new file mode 100644 index 0000000..098aceb --- /dev/null +++ b/selfprivacy_api/models/tokens/recovery_key.py @@ -0,0 +1,56 @@ +""" +Recovery key used to obtain access token. + +Recovery key has a token string, date of creation, optional date of expiration and optional count of uses left. +""" +from datetime import datetime +import secrets +from typing import Optional +from pydantic import BaseModel +from mnemonic import Mnemonic + + +class RecoveryKey(BaseModel): + """ + Recovery key used to obtain access token. + + Recovery key has a key string, date of creation, optional date of expiration and optional count of uses left. + """ + + key: str + created_at: datetime + expires_at: Optional[datetime] + uses_left: Optional[int] + + def is_valid(self) -> bool: + """ + Check if the recovery key is valid. + """ + if self.expires_at is not None and self.expires_at < datetime.now(): + return False + if self.uses_left is not None and self.uses_left <= 0: + return False + return True + + def as_mnemonic(self) -> str: + """ + Get the recovery key as a mnemonic. + """ + return Mnemonic(language="english").to_mnemonic(bytes.fromhex(self.key)) + + @staticmethod + def generate( + expiration: Optional[datetime], + uses_left: Optional[int], + ) -> "RecoveryKey": + """ + Factory to generate a random token. + """ + creation_date = datetime.now() + key = secrets.token_bytes(24).hex() + return RecoveryKey( + key=key, + created_at=creation_date, + expires_at=expiration, + uses_left=uses_left, + ) diff --git a/selfprivacy_api/models/tokens/token.py b/selfprivacy_api/models/tokens/token.py new file mode 100644 index 0000000..4c34f58 --- /dev/null +++ b/selfprivacy_api/models/tokens/token.py @@ -0,0 +1,33 @@ +""" +Model of the access token. + +Access token has a token string, device name and date of creation. +""" +from datetime import datetime +import secrets +from pydantic import BaseModel + + +class Token(BaseModel): + """ + Model of the access token. + + Access token has a token string, device name and date of creation. + """ + + token: str + device_name: str + created_at: datetime + + @staticmethod + def generate(device_name: str) -> "Token": + """ + Factory to generate a random token. + """ + creation_date = datetime.now() + token = secrets.token_urlsafe(32) + return Token( + token=token, + device_name=device_name, + created_at=creation_date, + ) diff --git a/selfprivacy_api/repositories/__init__.py b/selfprivacy_api/repositories/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/selfprivacy_api/repositories/tokens/__init__.py b/selfprivacy_api/repositories/tokens/__init__.py new file mode 100644 index 0000000..9941bdc --- /dev/null +++ b/selfprivacy_api/repositories/tokens/__init__.py @@ -0,0 +1,8 @@ +from selfprivacy_api.repositories.tokens.abstract_tokens_repository import ( + AbstractTokensRepository, +) +from selfprivacy_api.repositories.tokens.json_tokens_repository import ( + JsonTokensRepository, +) + +repository = JsonTokensRepository() diff --git a/selfprivacy_api/repositories/tokens/abstract_tokens_repository.py b/selfprivacy_api/repositories/tokens/abstract_tokens_repository.py new file mode 100644 index 0000000..3cf6e1d --- /dev/null +++ b/selfprivacy_api/repositories/tokens/abstract_tokens_repository.py @@ -0,0 +1,93 @@ +from abc import ABC, abstractmethod +from datetime import datetime +from typing import Optional + +from selfprivacy_api.models.tokens.token import Token +from selfprivacy_api.models.tokens.recovery_key import RecoveryKey +from selfprivacy_api.models.tokens.new_device_key import NewDeviceKey + + +class AbstractTokensRepository(ABC): + @abstractmethod + def get_token_by_token_string(self, token_string: str) -> Optional[Token]: + """Get the token by token""" + + @abstractmethod + def get_token_by_name(self, token_name: str) -> Optional[Token]: + """Get the token by name""" + + @abstractmethod + def get_tokens(self) -> list[Token]: + """Get the tokens""" + + @abstractmethod + def create_token(self, device_name: str) -> Token: + """Create new token""" + + @abstractmethod + def delete_token(self, input_token: Token) -> None: + """Delete the token""" + + @abstractmethod + def refresh_token(self, input_token: Token) -> Token: + """Refresh the token""" + + def is_token_valid(self, token_string: str) -> bool: + """Check if the token is valid""" + token = self.get_token_by_token_string(token_string) + if token is None: + return False + return True + + def is_token_name_exists(self, token_name: str) -> bool: + """Check if the token name exists""" + token = self.get_token_by_name(token_name) + if token is None: + return False + return True + + def is_token_name_pair_valid(self, token_name: str, token_string: str) -> bool: + """Check if the token name and token are valid""" + token = self.get_token_by_name(token_name) + if token is None: + return False + return token.token == token_string + + @abstractmethod + def get_recovery_key(self) -> Optional[RecoveryKey]: + """Get the recovery key""" + + @abstractmethod + def create_recovery_key( + self, + expiration: Optional[datetime], + uses_left: Optional[int], + ) -> RecoveryKey: + """Create the recovery key""" + + @abstractmethod + def use_mnemonic_recovery_key( + self, mnemonic_phrase: str, device_name: str + ) -> Token: + """Use the mnemonic recovery key and create a new token with the given name""" + + def is_recovery_key_valid(self) -> bool: + """Check if the recovery key is valid""" + recovery_key = self.get_recovery_key() + if recovery_key is None: + return False + return recovery_key.is_valid() + + @abstractmethod + def get_new_device_key(self) -> NewDeviceKey: + """Creates and returns the new device key""" + + @abstractmethod + def delete_new_device_key(self) -> None: + """Delete the new device key""" + + @abstractmethod + def use_mnemonic_new_device_key( + self, mnemonic_phrase: str, device_name: str + ) -> Token: + """Use the mnemonic new device key""" diff --git a/selfprivacy_api/repositories/tokens/exceptions.py b/selfprivacy_api/repositories/tokens/exceptions.py new file mode 100644 index 0000000..6b419c7 --- /dev/null +++ b/selfprivacy_api/repositories/tokens/exceptions.py @@ -0,0 +1,14 @@ +class TokenNotFound(Exception): + """Token not found!""" + + +class RecoveryKeyNotFound(Exception): + """Recovery key not found!""" + + +class InvalidMnemonic(Exception): + """Phrase is not mnemonic!""" + + +class NewDeviceKeyNotFound(Exception): + """New device key not found!""" diff --git a/selfprivacy_api/repositories/tokens/json_tokens_repository.py b/selfprivacy_api/repositories/tokens/json_tokens_repository.py new file mode 100644 index 0000000..aad3158 --- /dev/null +++ b/selfprivacy_api/repositories/tokens/json_tokens_repository.py @@ -0,0 +1,238 @@ +""" +temporary legacy +""" +from typing import Optional +from datetime import datetime +from mnemonic import Mnemonic + +from selfprivacy_api.utils import UserDataFiles, WriteUserData, ReadUserData +from selfprivacy_api.models.tokens.token import Token +from selfprivacy_api.models.tokens.recovery_key import RecoveryKey +from selfprivacy_api.models.tokens.new_device_key import NewDeviceKey +from selfprivacy_api.repositories.tokens.exceptions import ( + TokenNotFound, + RecoveryKeyNotFound, + InvalidMnemonic, + NewDeviceKeyNotFound, +) +from selfprivacy_api.repositories.tokens.abstract_tokens_repository import ( + AbstractTokensRepository, +) + +DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%f" + + +class JsonTokensRepository(AbstractTokensRepository): + def get_token_by_token_string(self, token_string: str) -> Optional[Token]: + """Get the token by token""" + with ReadUserData(UserDataFiles.TOKENS) as tokens_file: + for userdata_token in tokens_file["tokens"]: + if userdata_token["token"] == token_string: + + return Token( + token=token_string, + device_name=userdata_token["name"], + created_at=userdata_token["date"], + ) + + raise TokenNotFound("Token not found!") + + def get_token_by_name(self, token_name: str) -> Optional[Token]: + """Get the token by name""" + with ReadUserData(UserDataFiles.TOKENS) as tokens_file: + for userdata_token in tokens_file["tokens"]: + if userdata_token["name"] == token_name: + + return Token( + token=userdata_token["token"], + device_name=token_name, + created_at=userdata_token["date"], + ) + + raise TokenNotFound("Token not found!") + + def get_tokens(self) -> list[Token]: + """Get the tokens""" + tokens_list = [] + + with ReadUserData(UserDataFiles.TOKENS) as tokens_file: + for userdata_token in tokens_file["tokens"]: + tokens_list.append( + Token( + token=userdata_token["token"], + device_name=userdata_token["name"], + created_at=userdata_token["date"], + ) + ) + + return tokens_list + + def create_token(self, device_name: str) -> Token: + """Create new token""" + new_token = Token.generate(device_name) + + with WriteUserData(UserDataFiles.TOKENS) as tokens_file: + tokens_file["tokens"].append( + { + "token": new_token.token, + "name": new_token.device_name, + "date": new_token.created_at.strftime(DATETIME_FORMAT), + } + ) + return new_token + + def delete_token(self, input_token: Token) -> None: + """Delete the token""" + with WriteUserData(UserDataFiles.TOKENS) as tokens_file: + for userdata_token in tokens_file["tokens"]: + if userdata_token["token"] == input_token.token: + tokens_file["tokens"].remove(userdata_token) + return + + raise TokenNotFound("Token not found!") + + def refresh_token(self, input_token: Token) -> Token: + """Change the token field of the existing token""" + new_token = Token.generate(device_name=input_token.device_name) + + with WriteUserData(UserDataFiles.TOKENS) as tokens_file: + for userdata_token in tokens_file["tokens"]: + + if userdata_token["name"] == input_token.device_name: + userdata_token["token"] = new_token.token + userdata_token["date"] = ( + new_token.created_at.strftime(DATETIME_FORMAT), + ) + + return new_token + + raise TokenNotFound("Token not found!") + + def get_recovery_key(self) -> Optional[RecoveryKey]: + """Get the recovery key""" + with ReadUserData(UserDataFiles.TOKENS) as tokens_file: + + if ( + "recovery_token" not in tokens_file + or tokens_file["recovery_token"] is None + ): + return + + recovery_key = RecoveryKey( + key=tokens_file["recovery_token"].get("token"), + created_at=tokens_file["recovery_token"].get("date"), + expires_at=tokens_file["recovery_token"].get("expitation"), + uses_left=tokens_file["recovery_token"].get("uses_left"), + ) + + return recovery_key + + def create_recovery_key( + self, + expiration: Optional[datetime], + uses_left: Optional[int], + ) -> RecoveryKey: + """Create the recovery key""" + + recovery_key = RecoveryKey.generate(expiration, uses_left) + + with WriteUserData(UserDataFiles.TOKENS) as tokens_file: + tokens_file["recovery_token"] = { + "token": recovery_key.key, + "date": recovery_key.created_at.strftime(DATETIME_FORMAT), + "expiration": recovery_key.expires_at, + "uses_left": recovery_key.uses_left, + } + + return recovery_key + + def use_mnemonic_recovery_key( + self, mnemonic_phrase: str, device_name: str + ) -> Token: + """Use the mnemonic recovery key and create a new token with the given name""" + recovery_key = self.get_recovery_key() + + if recovery_key is None: + raise RecoveryKeyNotFound("Recovery key not found") + + if not recovery_key.is_valid(): + raise RecoveryKeyNotFound("Recovery key not found") + + recovery_token = bytes.fromhex(recovery_key.key) + + if not Mnemonic(language="english").check(mnemonic_phrase): + raise InvalidMnemonic("Phrase is not mnemonic!") + + phrase_bytes = Mnemonic(language="english").to_entropy(mnemonic_phrase) + if phrase_bytes != recovery_token: + raise RecoveryKeyNotFound("Recovery key not found") + + new_token = Token.generate(device_name=device_name) + + with WriteUserData(UserDataFiles.TOKENS) as tokens: + tokens["tokens"].append( + { + "token": new_token.token, + "name": new_token.device_name, + "date": new_token.created_at.strftime(DATETIME_FORMAT), + } + ) + + if "recovery_token" in tokens: + if ( + "uses_left" in tokens["recovery_token"] + and tokens["recovery_token"]["uses_left"] is not None + ): + tokens["recovery_token"]["uses_left"] -= 1 + return new_token + + def get_new_device_key(self) -> NewDeviceKey: + """Creates and returns the new device key""" + new_device_key = NewDeviceKey.generate() + + with WriteUserData(UserDataFiles.TOKENS) as tokens_file: + tokens_file["new_device"] = { + "token": new_device_key.key, + "date": new_device_key.created_at.strftime(DATETIME_FORMAT), + "expiration": new_device_key.expires_at.strftime(DATETIME_FORMAT), + } + + return new_device_key + + def delete_new_device_key(self) -> None: + """Delete the new device key""" + with WriteUserData(UserDataFiles.TOKENS) as tokens_file: + if "new_device" in tokens_file: + del tokens_file["new_device"] + return + + def use_mnemonic_new_device_key( + self, mnemonic_phrase: str, device_name: str + ) -> Token: + """Use the mnemonic new device key""" + + with ReadUserData(UserDataFiles.TOKENS) as tokens_file: + if "new_device" not in tokens_file or tokens_file["new_device"] is None: + raise NewDeviceKeyNotFound("New device key not found") + + new_device_key = NewDeviceKey( + key=tokens_file["new_device"]["token"], + created_at=tokens_file["new_device"]["date"], + expires_at=tokens_file["new_device"]["expiration"], + ) + + token = bytes.fromhex(new_device_key.key) + + if not Mnemonic(language="english").check(mnemonic_phrase): + raise InvalidMnemonic("Phrase is not mnemonic!") + + phrase_bytes = Mnemonic(language="english").to_entropy(mnemonic_phrase) + if bytes(phrase_bytes) != bytes(token): + raise NewDeviceKeyNotFound("Phrase is not token!") + + new_token = Token.generate(device_name=device_name) + with WriteUserData(UserDataFiles.TOKENS) as tokens: + if "new_device" in tokens: + del tokens["new_device"] + + return new_token diff --git a/selfprivacy_api/repositories/tokens/redis_tokens_repository.py b/selfprivacy_api/repositories/tokens/redis_tokens_repository.py new file mode 100644 index 0000000..0186c11 --- /dev/null +++ b/selfprivacy_api/repositories/tokens/redis_tokens_repository.py @@ -0,0 +1,15 @@ +""" +Token repository using Redis as backend. +""" +from selfprivacy_api.repositories.tokens.abstract_tokens_repository import ( + AbstractTokensRepository, +) + + +class RedisTokensRepository(AbstractTokensRepository): + """ + Token repository using Redis as a backend + """ + + def __init__(self) -> None: + raise NotImplementedError diff --git a/tests/test_graphql/test_repository/test_tokens_repository.py b/tests/test_graphql/test_repository/test_tokens_repository.py new file mode 100644 index 0000000..878e242 --- /dev/null +++ b/tests/test_graphql/test_repository/test_tokens_repository.py @@ -0,0 +1,582 @@ +# pylint: disable=redefined-outer-name +# pylint: disable=unused-argument +# pylint: disable=missing-function-docstring + +from datetime import datetime, timezone + +import pytest + +from selfprivacy_api.models.tokens.new_device_key import NewDeviceKey +from selfprivacy_api.models.tokens.recovery_key import RecoveryKey +from selfprivacy_api.models.tokens.token import Token +from selfprivacy_api.repositories.tokens.exceptions import ( + InvalidMnemonic, + RecoveryKeyNotFound, + TokenNotFound, + NewDeviceKeyNotFound, +) +from selfprivacy_api.repositories.tokens.json_tokens_repository import ( + JsonTokensRepository, +) +from tests.common import read_json + + +ORIGINAL_TOKEN_CONTENT = [ + { + "token": "KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI", + "name": "primary_token", + "date": "2022-07-15 17:41:31.675698", + }, + { + "token": "3JKgLOtFu6ZHgE4OU-R-VdW47IKpg-YQL0c6n7bol68", + "name": "second_token", + "date": "2022-07-15 17:41:31.675698Z", + }, + { + "token": "LYiwFDekvALKTQSjk7vtMQuNP_6wqKuV-9AyMKytI_8", + "name": "third_token", + "date": "2022-07-15T17:41:31.675698Z", + }, + { + "token": "dD3CFPcEZvapscgzWb7JZTLog7OMkP7NzJeu2fAazXM", + "name": "forth_token", + "date": "2022-07-15T17:41:31.675698", + }, +] + + +@pytest.fixture +def tokens(mocker, datadir): + mocker.patch("selfprivacy_api.utils.TOKENS_FILE", new=datadir / "tokens.json") + assert read_json(datadir / "tokens.json")["tokens"] == ORIGINAL_TOKEN_CONTENT + return datadir + + +@pytest.fixture +def empty_keys(mocker, datadir): + mocker.patch("selfprivacy_api.utils.TOKENS_FILE", new=datadir / "empty_keys.json") + assert read_json(datadir / "empty_keys.json")["tokens"] == [ + { + "token": "KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI", + "name": "primary_token", + "date": "2022-07-15 17:41:31.675698", + } + ] + return datadir + + +@pytest.fixture +def null_keys(mocker, datadir): + mocker.patch("selfprivacy_api.utils.TOKENS_FILE", new=datadir / "null_keys.json") + assert read_json(datadir / "null_keys.json")["recovery_token"] is None + assert read_json(datadir / "null_keys.json")["new_device"] is None + return datadir + + +class RecoveryKeyMockReturnNotValid: + def is_valid() -> bool: + return False + + +@pytest.fixture +def mock_new_device_key_generate(mocker): + mock = mocker.patch( + "selfprivacy_api.repositories.tokens.json_tokens_repository.NewDeviceKey.generate", + autospec=True, + return_value=NewDeviceKey( + key="43478d05b35e4781598acd76e33832bb", + created_at=datetime(2022, 7, 15, 17, 41, 31, 675698), + expires_at=datetime(2022, 7, 15, 17, 41, 31, 675698), + ), + ) + return mock + + +@pytest.fixture +def mock_generate_token(mocker): + mock = mocker.patch( + "selfprivacy_api.repositories.tokens.json_tokens_repository.Token.generate", + autospec=True, + return_value=Token( + token="ur71mC4aiI6FIYAN--cTL-38rPHS5D6NuB1bgN_qKF4", + device_name="newdevice", + created_at=datetime(2022, 11, 14, 6, 6, 32, 777123), + ), + ) + return mock + + +@pytest.fixture +def mock_get_recovery_key_return_not_valid(mocker): + mock = mocker.patch( + "selfprivacy_api.repositories.tokens.json_tokens_repository.JsonTokensRepository.get_recovery_key", + autospec=True, + return_value=RecoveryKeyMockReturnNotValid, + ) + return mock + + +@pytest.fixture +def mock_token_generate(mocker): + mock = mocker.patch( + "selfprivacy_api.repositories.tokens.json_tokens_repository.Token.generate", + autospec=True, + return_value=Token( + token="ZuLNKtnxDeq6w2dpOJhbB3iat_sJLPTPl_rN5uc5MvM", + device_name="IamNewDevice", + created_at=datetime(2022, 7, 15, 17, 41, 31, 675698), + ), + ) + return mock + + +@pytest.fixture +def mock_recovery_key_generate(mocker): + mock = mocker.patch( + "selfprivacy_api.repositories.tokens.json_tokens_repository.RecoveryKey.generate", + autospec=True, + return_value=RecoveryKey( + key="889bf49c1d3199d71a2e704718772bd53a422020334db051", + created_at=datetime(2022, 7, 15, 17, 41, 31, 675698), + expires_at=None, + uses_left=1, + ), + ) + return mock + + +############### +# Test tokens # +############### + + +def test_get_token_by_token_string(tokens): + repo = JsonTokensRepository() + + assert repo.get_token_by_token_string( + token_string="KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI" + ) == Token( + token="KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI", + device_name="primary_token", + created_at=datetime(2022, 7, 15, 17, 41, 31, 675698), + ) + + +def test_get_token_by_non_existent_token_string(tokens): + repo = JsonTokensRepository() + + with pytest.raises(TokenNotFound): + assert repo.get_token_by_token_string(token_string="iamBadtoken") is None + + +def test_get_token_by_name(tokens): + repo = JsonTokensRepository() + + assert repo.get_token_by_name(token_name="primary_token") is not None + assert repo.get_token_by_name(token_name="primary_token") == Token( + token="KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI", + device_name="primary_token", + created_at=datetime(2022, 7, 15, 17, 41, 31, 675698), + ) + + +def test_get_token_by_non_existent_name(tokens): + repo = JsonTokensRepository() + + with pytest.raises(TokenNotFound): + assert repo.get_token_by_name(token_name="badname") is None + + +def test_get_tokens(tokens): + repo = JsonTokensRepository() + + assert repo.get_tokens() == [ + Token( + token="KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI", + device_name="primary_token", + created_at=datetime(2022, 7, 15, 17, 41, 31, 675698), + ), + Token( + token="3JKgLOtFu6ZHgE4OU-R-VdW47IKpg-YQL0c6n7bol68", + device_name="second_token", + created_at=datetime(2022, 7, 15, 17, 41, 31, 675698, tzinfo=timezone.utc), + ), + Token( + token="LYiwFDekvALKTQSjk7vtMQuNP_6wqKuV-9AyMKytI_8", + device_name="third_token", + created_at=datetime(2022, 7, 15, 17, 41, 31, 675698, tzinfo=timezone.utc), + ), + Token( + token="dD3CFPcEZvapscgzWb7JZTLog7OMkP7NzJeu2fAazXM", + device_name="forth_token", + created_at=datetime(2022, 7, 15, 17, 41, 31, 675698), + ), + ] + + +def test_get_tokens_when_one(empty_keys): + repo = JsonTokensRepository() + + assert repo.get_tokens() == [ + Token( + token="KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI", + device_name="primary_token", + created_at=datetime(2022, 7, 15, 17, 41, 31, 675698), + ) + ] + + +def test_create_token(tokens, mock_token_generate): + repo = JsonTokensRepository() + + assert repo.create_token(device_name="IamNewDevice") == Token( + token="ZuLNKtnxDeq6w2dpOJhbB3iat_sJLPTPl_rN5uc5MvM", + device_name="IamNewDevice", + created_at=datetime(2022, 7, 15, 17, 41, 31, 675698), + ) + + +def test_delete_token(tokens): + repo = JsonTokensRepository() + input_token = Token( + token="KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI", + device_name="primary_token", + created_at=datetime(2022, 7, 15, 17, 41, 31, 675698), + ) + + repo.delete_token(input_token) + assert read_json(tokens / "tokens.json")["tokens"] == [ + { + "token": "3JKgLOtFu6ZHgE4OU-R-VdW47IKpg-YQL0c6n7bol68", + "name": "second_token", + "date": "2022-07-15 17:41:31.675698Z", + }, + { + "token": "LYiwFDekvALKTQSjk7vtMQuNP_6wqKuV-9AyMKytI_8", + "name": "third_token", + "date": "2022-07-15T17:41:31.675698Z", + }, + { + "token": "dD3CFPcEZvapscgzWb7JZTLog7OMkP7NzJeu2fAazXM", + "name": "forth_token", + "date": "2022-07-15T17:41:31.675698", + }, + ] + + +def test_delete_not_found_token(tokens): + repo = JsonTokensRepository() + input_token = Token( + token="imbadtoken", + device_name="primary_token", + created_at=datetime(2022, 7, 15, 17, 41, 31, 675698), + ) + with pytest.raises(TokenNotFound): + assert repo.delete_token(input_token) is None + + assert read_json(tokens / "tokens.json")["tokens"] == ORIGINAL_TOKEN_CONTENT + + +def test_refresh_token(tokens, mock_token_generate): + repo = JsonTokensRepository() + input_token = Token( + token="KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI", + device_name="primary_token", + created_at=datetime(2022, 7, 15, 17, 41, 31, 675698), + ) + + assert repo.refresh_token(input_token) == Token( + token="ZuLNKtnxDeq6w2dpOJhbB3iat_sJLPTPl_rN5uc5MvM", + device_name="IamNewDevice", + created_at=datetime(2022, 7, 15, 17, 41, 31, 675698), + ) + + +def test_refresh_not_found_token(tokens, mock_token_generate): + repo = JsonTokensRepository() + input_token = Token( + token="idontknowwhoiam", + device_name="tellmewhoiam?", + created_at=datetime(2022, 7, 15, 17, 41, 31, 675698), + ) + + with pytest.raises(TokenNotFound): + assert repo.refresh_token(input_token) is None + + +################ +# Recovery key # +################ + + +def test_get_recovery_key(tokens): + repo = JsonTokensRepository() + + assert repo.get_recovery_key() == RecoveryKey( + key="ed653e4b8b042b841d285fa7a682fa09e925ddb2d8906f54", + created_at=datetime(2022, 11, 11, 11, 48, 54, 228038), + expires_at=None, + uses_left=2, + ) + + +def test_get_recovery_key_when_empty(empty_keys): + repo = JsonTokensRepository() + + assert repo.get_recovery_key() is None + + +def test_create_recovery_key(tokens, mock_recovery_key_generate): + repo = JsonTokensRepository() + + assert repo.create_recovery_key(uses_left=1, expiration=None) is not None + assert read_json(tokens / "tokens.json")["recovery_token"] == { + "token": "889bf49c1d3199d71a2e704718772bd53a422020334db051", + "date": "2022-07-15T17:41:31.675698", + "expiration": None, + "uses_left": 1, + } + + +def test_use_mnemonic_recovery_key_when_empty( + empty_keys, mock_recovery_key_generate, mock_token_generate +): + repo = JsonTokensRepository() + + with pytest.raises(RecoveryKeyNotFound): + assert ( + repo.use_mnemonic_recovery_key( + mnemonic_phrase="captain ribbon toddler settle symbol minute step broccoli bless universe divide bulb", + device_name="primary_token", + ) + is None + ) + + +def test_use_mnemonic_not_valid_recovery_key( + tokens, mock_get_recovery_key_return_not_valid +): + repo = JsonTokensRepository() + + with pytest.raises(RecoveryKeyNotFound): + assert ( + repo.use_mnemonic_recovery_key( + mnemonic_phrase="captain ribbon toddler settle symbol minute step broccoli bless universe divide bulb", + device_name="primary_token", + ) + is None + ) + + +def test_use_mnemonic_not_mnemonic_recovery_key(tokens): + repo = JsonTokensRepository() + + with pytest.raises(InvalidMnemonic): + assert ( + repo.use_mnemonic_recovery_key( + mnemonic_phrase="sorry, it was joke", + device_name="primary_token", + ) + is None + ) + + +def test_use_not_mnemonic_recovery_key(tokens): + repo = JsonTokensRepository() + + with pytest.raises(InvalidMnemonic): + assert ( + repo.use_mnemonic_recovery_key( + mnemonic_phrase="please come back", + device_name="primary_token", + ) + is None + ) + + +def test_use_not_found_mnemonic_recovery_key(tokens): + repo = JsonTokensRepository() + + with pytest.raises(RecoveryKeyNotFound): + assert ( + repo.use_mnemonic_recovery_key( + mnemonic_phrase="captain ribbon toddler settle symbol minute step broccoli bless universe divide bulb", + device_name="primary_token", + ) + is None + ) + + +def test_use_menemonic_recovery_key_when_empty(empty_keys): + repo = JsonTokensRepository() + + with pytest.raises(RecoveryKeyNotFound): + assert ( + repo.use_mnemonic_recovery_key( + mnemonic_phrase="captain ribbon toddler settle symbol minute step broccoli bless universe divide bulb", + device_name="primary_token", + ) + is None + ) + + +def test_use_menemonic_recovery_key_when_null(null_keys): + repo = JsonTokensRepository() + + with pytest.raises(RecoveryKeyNotFound): + assert ( + repo.use_mnemonic_recovery_key( + mnemonic_phrase="captain ribbon toddler settle symbol minute step broccoli bless universe divide bulb", + device_name="primary_token", + ) + is None + ) + + +def test_use_mnemonic_recovery_key(tokens, mock_generate_token): + repo = JsonTokensRepository() + + assert repo.use_mnemonic_recovery_key( + mnemonic_phrase="uniform clarify napkin bid dress search input armor police cross salon because myself uphold slice bamboo hungry park", + device_name="newdevice", + ) == Token( + token="ur71mC4aiI6FIYAN--cTL-38rPHS5D6NuB1bgN_qKF4", + device_name="newdevice", + created_at=datetime(2022, 11, 14, 6, 6, 32, 777123), + ) + + assert read_json(tokens / "tokens.json")["tokens"] == [ + { + "date": "2022-07-15 17:41:31.675698", + "name": "primary_token", + "token": "KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI", + }, + { + "token": "3JKgLOtFu6ZHgE4OU-R-VdW47IKpg-YQL0c6n7bol68", + "name": "second_token", + "date": "2022-07-15 17:41:31.675698Z", + }, + { + "token": "LYiwFDekvALKTQSjk7vtMQuNP_6wqKuV-9AyMKytI_8", + "name": "third_token", + "date": "2022-07-15T17:41:31.675698Z", + }, + { + "token": "dD3CFPcEZvapscgzWb7JZTLog7OMkP7NzJeu2fAazXM", + "name": "forth_token", + "date": "2022-07-15T17:41:31.675698", + }, + { + "date": "2022-11-14T06:06:32.777123", + "name": "newdevice", + "token": "ur71mC4aiI6FIYAN--cTL-38rPHS5D6NuB1bgN_qKF4", + }, + ] + + assert read_json(tokens / "tokens.json")["recovery_token"] == { + "date": "2022-11-11T11:48:54.228038", + "expiration": None, + "token": "ed653e4b8b042b841d285fa7a682fa09e925ddb2d8906f54", + "uses_left": 1, + } + + +################## +# New device key # +################## + + +def test_get_new_device_key(tokens, mock_new_device_key_generate): + repo = JsonTokensRepository() + + assert repo.get_new_device_key() is not None + assert read_json(tokens / "tokens.json")["new_device"] == { + "date": "2022-07-15T17:41:31.675698", + "expiration": "2022-07-15T17:41:31.675698", + "token": "43478d05b35e4781598acd76e33832bb", + } + + +def test_delete_new_device_key(tokens): + repo = JsonTokensRepository() + + assert repo.delete_new_device_key() is None + assert "new_device" not in read_json(tokens / "tokens.json") + + +def test_delete_new_device_key_when_empty(empty_keys): + repo = JsonTokensRepository() + + repo.delete_new_device_key() + assert "new_device" not in read_json(empty_keys / "empty_keys.json") + + +def test_use_invalid_mnemonic_new_device_key( + tokens, mock_new_device_key_generate, datadir, mock_token_generate +): + repo = JsonTokensRepository() + + with pytest.raises(InvalidMnemonic): + assert ( + repo.use_mnemonic_new_device_key( + device_name="imnew", + mnemonic_phrase="oh-no", + ) + is None + ) + + +def test_use_not_exists_mnemonic_new_device_key( + tokens, mock_new_device_key_generate, mock_token_generate +): + repo = JsonTokensRepository() + + with pytest.raises(NewDeviceKeyNotFound): + assert ( + repo.use_mnemonic_new_device_key( + device_name="imnew", + mnemonic_phrase="uniform clarify napkin bid dress search input armor police cross salon because myself uphold slice bamboo hungry park", + ) + is None + ) + + +def test_use_mnemonic_new_device_key( + tokens, mock_new_device_key_generate, mock_token_generate +): + repo = JsonTokensRepository() + + assert ( + repo.use_mnemonic_new_device_key( + device_name="imnew", + mnemonic_phrase="captain ribbon toddler settle symbol minute step broccoli bless universe divide bulb", + ) + is not None + ) + # assert read_json(datadir / "tokens.json")["new_device"] == [] + + +def test_use_mnemonic_new_device_key_when_empty(empty_keys): + repo = JsonTokensRepository() + + with pytest.raises(NewDeviceKeyNotFound): + assert ( + repo.use_mnemonic_new_device_key( + device_name="imnew", + mnemonic_phrase="captain ribbon toddler settle symbol minute step broccoli bless universe divide bulb", + ) + is None + ) + + +def test_use_mnemonic_new_device_key_when_null(null_keys): + repo = JsonTokensRepository() + + with pytest.raises(NewDeviceKeyNotFound): + assert ( + repo.use_mnemonic_new_device_key( + device_name="imnew", + mnemonic_phrase="captain ribbon toddler settle symbol minute step broccoli bless universe divide bulb", + ) + is None + ) diff --git a/tests/test_graphql/test_repository/test_tokens_repository/empty_keys.json b/tests/test_graphql/test_repository/test_tokens_repository/empty_keys.json new file mode 100644 index 0000000..2131ddf --- /dev/null +++ b/tests/test_graphql/test_repository/test_tokens_repository/empty_keys.json @@ -0,0 +1,9 @@ +{ + "tokens": [ + { + "token": "KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI", + "name": "primary_token", + "date": "2022-07-15 17:41:31.675698" + } + ] +} diff --git a/tests/test_graphql/test_repository/test_tokens_repository/null_keys.json b/tests/test_graphql/test_repository/test_tokens_repository/null_keys.json new file mode 100644 index 0000000..45e6f90 --- /dev/null +++ b/tests/test_graphql/test_repository/test_tokens_repository/null_keys.json @@ -0,0 +1,26 @@ +{ + "tokens": [ + { + "token": "KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI", + "name": "primary_token", + "date": "2022-07-15 17:41:31.675698" + }, + { + "token": "3JKgLOtFu6ZHgE4OU-R-VdW47IKpg-YQL0c6n7bol68", + "name": "second_token", + "date": "2022-07-15 17:41:31.675698Z" + }, + { + "token": "LYiwFDekvALKTQSjk7vtMQuNP_6wqKuV-9AyMKytI_8", + "name": "third_token", + "date": "2022-07-15T17:41:31.675698Z" + }, + { + "token": "dD3CFPcEZvapscgzWb7JZTLog7OMkP7NzJeu2fAazXM", + "name": "forth_token", + "date": "2022-07-15T17:41:31.675698" + } + ], + "recovery_token": null, + "new_device": null +} diff --git a/tests/test_graphql/test_repository/test_tokens_repository/tokens.json b/tests/test_graphql/test_repository/test_tokens_repository/tokens.json new file mode 100644 index 0000000..bb1805c --- /dev/null +++ b/tests/test_graphql/test_repository/test_tokens_repository/tokens.json @@ -0,0 +1,35 @@ +{ + "tokens": [ + { + "token": "KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI", + "name": "primary_token", + "date": "2022-07-15 17:41:31.675698" + }, + { + "token": "3JKgLOtFu6ZHgE4OU-R-VdW47IKpg-YQL0c6n7bol68", + "name": "second_token", + "date": "2022-07-15 17:41:31.675698Z" + }, + { + "token": "LYiwFDekvALKTQSjk7vtMQuNP_6wqKuV-9AyMKytI_8", + "name": "third_token", + "date": "2022-07-15T17:41:31.675698Z" + }, + { + "token": "dD3CFPcEZvapscgzWb7JZTLog7OMkP7NzJeu2fAazXM", + "name": "forth_token", + "date": "2022-07-15T17:41:31.675698" + } + ], + "recovery_token": { + "token": "ed653e4b8b042b841d285fa7a682fa09e925ddb2d8906f54", + "date": "2022-11-11T11:48:54.228038", + "expiration": null, + "uses_left": 2 + }, + "new_device": { + "token": "2237238de23dc71ab558e317bdb8ff8e", + "date": "2022-10-26 20:50:47.973212", + "expiration": "2022-10-26 21:00:47.974153" + } +} diff --git a/tests/test_graphql/test_users.py b/tests/test_graphql/test_users.py index c36dcb2..7a65736 100644 --- a/tests/test_graphql/test_users.py +++ b/tests/test_graphql/test_users.py @@ -516,7 +516,6 @@ def test_graphql_add_long_username(authorized_client, one_user, mock_subprocess_ }, }, ) - assert response.status_code == 200 assert response.json().get("data") is not None assert response.json()["data"]["createUser"]["message"] is not None