refactor(repository): Tokens repository JSON backend (#18)

Co-authored-by: def <dettlaff@riseup.net>
Co-authored-by: Inex Code <inex.code@selfprivacy.org>
Reviewed-on: https://git.selfprivacy.org/SelfPrivacy/selfprivacy-rest-api/pulls/18
Co-authored-by: def <dettlaff@riseup.net>
Co-committed-by: def <dettlaff@riseup.net>
This commit is contained in:
def 2022-11-16 19:12:38 +02:00 committed by Inex Code
parent 0a09a338b8
commit e130d37033
16 changed files with 1158 additions and 2 deletions

View file

@ -8,7 +8,9 @@ at api.skippedMigrations in userdata.json and populating it
with IDs of the migrations to skip.
Adding DISABLE_ALL to that array disables the migrations module entirely.
"""
from selfprivacy_api.migrations.check_for_failed_binds_migration import CheckForFailedBindsMigration
from selfprivacy_api.migrations.check_for_failed_binds_migration import (
CheckForFailedBindsMigration,
)
from selfprivacy_api.utils import ReadUserData
from selfprivacy_api.migrations.fix_nixos_config_branch import FixNixosConfigBranch
from selfprivacy_api.migrations.create_tokens_json import CreateTokensJson

View file

View file

@ -0,0 +1,46 @@
"""
New device key used to obtain access token.
"""
from datetime import datetime, timedelta
import secrets
from pydantic import BaseModel
from mnemonic import Mnemonic
class NewDeviceKey(BaseModel):
"""
Recovery key used to obtain access token.
Recovery key has a key string, date of creation, date of expiration.
"""
key: str
created_at: datetime
expires_at: datetime
def is_valid(self) -> bool:
"""
Check if the recovery key is valid.
"""
if self.expires_at < datetime.now():
return False
return True
def as_mnemonic(self) -> str:
"""
Get the recovery key as a mnemonic.
"""
return Mnemonic(language="english").to_mnemonic(bytes.fromhex(self.key))
@staticmethod
def generate() -> "NewDeviceKey":
"""
Factory to generate a random token.
"""
creation_date = datetime.now()
key = secrets.token_bytes(16).hex()
return NewDeviceKey(
key=key,
created_at=creation_date,
expires_at=datetime.now() + timedelta(minutes=10),
)

View file

@ -0,0 +1,56 @@
"""
Recovery key used to obtain access token.
Recovery key has a token string, date of creation, optional date of expiration and optional count of uses left.
"""
from datetime import datetime
import secrets
from typing import Optional
from pydantic import BaseModel
from mnemonic import Mnemonic
class RecoveryKey(BaseModel):
"""
Recovery key used to obtain access token.
Recovery key has a key string, date of creation, optional date of expiration and optional count of uses left.
"""
key: str
created_at: datetime
expires_at: Optional[datetime]
uses_left: Optional[int]
def is_valid(self) -> bool:
"""
Check if the recovery key is valid.
"""
if self.expires_at is not None and self.expires_at < datetime.now():
return False
if self.uses_left is not None and self.uses_left <= 0:
return False
return True
def as_mnemonic(self) -> str:
"""
Get the recovery key as a mnemonic.
"""
return Mnemonic(language="english").to_mnemonic(bytes.fromhex(self.key))
@staticmethod
def generate(
expiration: Optional[datetime],
uses_left: Optional[int],
) -> "RecoveryKey":
"""
Factory to generate a random token.
"""
creation_date = datetime.now()
key = secrets.token_bytes(24).hex()
return RecoveryKey(
key=key,
created_at=creation_date,
expires_at=expiration,
uses_left=uses_left,
)

View file

@ -0,0 +1,33 @@
"""
Model of the access token.
Access token has a token string, device name and date of creation.
"""
from datetime import datetime
import secrets
from pydantic import BaseModel
class Token(BaseModel):
"""
Model of the access token.
Access token has a token string, device name and date of creation.
"""
token: str
device_name: str
created_at: datetime
@staticmethod
def generate(device_name: str) -> "Token":
"""
Factory to generate a random token.
"""
creation_date = datetime.now()
token = secrets.token_urlsafe(32)
return Token(
token=token,
device_name=device_name,
created_at=creation_date,
)

View file

View file

@ -0,0 +1,8 @@
from selfprivacy_api.repositories.tokens.abstract_tokens_repository import (
AbstractTokensRepository,
)
from selfprivacy_api.repositories.tokens.json_tokens_repository import (
JsonTokensRepository,
)
repository = JsonTokensRepository()

View file

@ -0,0 +1,93 @@
from abc import ABC, abstractmethod
from datetime import datetime
from typing import Optional
from selfprivacy_api.models.tokens.token import Token
from selfprivacy_api.models.tokens.recovery_key import RecoveryKey
from selfprivacy_api.models.tokens.new_device_key import NewDeviceKey
class AbstractTokensRepository(ABC):
@abstractmethod
def get_token_by_token_string(self, token_string: str) -> Optional[Token]:
"""Get the token by token"""
@abstractmethod
def get_token_by_name(self, token_name: str) -> Optional[Token]:
"""Get the token by name"""
@abstractmethod
def get_tokens(self) -> list[Token]:
"""Get the tokens"""
@abstractmethod
def create_token(self, device_name: str) -> Token:
"""Create new token"""
@abstractmethod
def delete_token(self, input_token: Token) -> None:
"""Delete the token"""
@abstractmethod
def refresh_token(self, input_token: Token) -> Token:
"""Refresh the token"""
def is_token_valid(self, token_string: str) -> bool:
"""Check if the token is valid"""
token = self.get_token_by_token_string(token_string)
if token is None:
return False
return True
def is_token_name_exists(self, token_name: str) -> bool:
"""Check if the token name exists"""
token = self.get_token_by_name(token_name)
if token is None:
return False
return True
def is_token_name_pair_valid(self, token_name: str, token_string: str) -> bool:
"""Check if the token name and token are valid"""
token = self.get_token_by_name(token_name)
if token is None:
return False
return token.token == token_string
@abstractmethod
def get_recovery_key(self) -> Optional[RecoveryKey]:
"""Get the recovery key"""
@abstractmethod
def create_recovery_key(
self,
expiration: Optional[datetime],
uses_left: Optional[int],
) -> RecoveryKey:
"""Create the recovery key"""
@abstractmethod
def use_mnemonic_recovery_key(
self, mnemonic_phrase: str, device_name: str
) -> Token:
"""Use the mnemonic recovery key and create a new token with the given name"""
def is_recovery_key_valid(self) -> bool:
"""Check if the recovery key is valid"""
recovery_key = self.get_recovery_key()
if recovery_key is None:
return False
return recovery_key.is_valid()
@abstractmethod
def get_new_device_key(self) -> NewDeviceKey:
"""Creates and returns the new device key"""
@abstractmethod
def delete_new_device_key(self) -> None:
"""Delete the new device key"""
@abstractmethod
def use_mnemonic_new_device_key(
self, mnemonic_phrase: str, device_name: str
) -> Token:
"""Use the mnemonic new device key"""

View file

@ -0,0 +1,14 @@
class TokenNotFound(Exception):
"""Token not found!"""
class RecoveryKeyNotFound(Exception):
"""Recovery key not found!"""
class InvalidMnemonic(Exception):
"""Phrase is not mnemonic!"""
class NewDeviceKeyNotFound(Exception):
"""New device key not found!"""

View file

@ -0,0 +1,238 @@
"""
temporary legacy
"""
from typing import Optional
from datetime import datetime
from mnemonic import Mnemonic
from selfprivacy_api.utils import UserDataFiles, WriteUserData, ReadUserData
from selfprivacy_api.models.tokens.token import Token
from selfprivacy_api.models.tokens.recovery_key import RecoveryKey
from selfprivacy_api.models.tokens.new_device_key import NewDeviceKey
from selfprivacy_api.repositories.tokens.exceptions import (
TokenNotFound,
RecoveryKeyNotFound,
InvalidMnemonic,
NewDeviceKeyNotFound,
)
from selfprivacy_api.repositories.tokens.abstract_tokens_repository import (
AbstractTokensRepository,
)
DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%f"
class JsonTokensRepository(AbstractTokensRepository):
def get_token_by_token_string(self, token_string: str) -> Optional[Token]:
"""Get the token by token"""
with ReadUserData(UserDataFiles.TOKENS) as tokens_file:
for userdata_token in tokens_file["tokens"]:
if userdata_token["token"] == token_string:
return Token(
token=token_string,
device_name=userdata_token["name"],
created_at=userdata_token["date"],
)
raise TokenNotFound("Token not found!")
def get_token_by_name(self, token_name: str) -> Optional[Token]:
"""Get the token by name"""
with ReadUserData(UserDataFiles.TOKENS) as tokens_file:
for userdata_token in tokens_file["tokens"]:
if userdata_token["name"] == token_name:
return Token(
token=userdata_token["token"],
device_name=token_name,
created_at=userdata_token["date"],
)
raise TokenNotFound("Token not found!")
def get_tokens(self) -> list[Token]:
"""Get the tokens"""
tokens_list = []
with ReadUserData(UserDataFiles.TOKENS) as tokens_file:
for userdata_token in tokens_file["tokens"]:
tokens_list.append(
Token(
token=userdata_token["token"],
device_name=userdata_token["name"],
created_at=userdata_token["date"],
)
)
return tokens_list
def create_token(self, device_name: str) -> Token:
"""Create new token"""
new_token = Token.generate(device_name)
with WriteUserData(UserDataFiles.TOKENS) as tokens_file:
tokens_file["tokens"].append(
{
"token": new_token.token,
"name": new_token.device_name,
"date": new_token.created_at.strftime(DATETIME_FORMAT),
}
)
return new_token
def delete_token(self, input_token: Token) -> None:
"""Delete the token"""
with WriteUserData(UserDataFiles.TOKENS) as tokens_file:
for userdata_token in tokens_file["tokens"]:
if userdata_token["token"] == input_token.token:
tokens_file["tokens"].remove(userdata_token)
return
raise TokenNotFound("Token not found!")
def refresh_token(self, input_token: Token) -> Token:
"""Change the token field of the existing token"""
new_token = Token.generate(device_name=input_token.device_name)
with WriteUserData(UserDataFiles.TOKENS) as tokens_file:
for userdata_token in tokens_file["tokens"]:
if userdata_token["name"] == input_token.device_name:
userdata_token["token"] = new_token.token
userdata_token["date"] = (
new_token.created_at.strftime(DATETIME_FORMAT),
)
return new_token
raise TokenNotFound("Token not found!")
def get_recovery_key(self) -> Optional[RecoveryKey]:
"""Get the recovery key"""
with ReadUserData(UserDataFiles.TOKENS) as tokens_file:
if (
"recovery_token" not in tokens_file
or tokens_file["recovery_token"] is None
):
return
recovery_key = RecoveryKey(
key=tokens_file["recovery_token"].get("token"),
created_at=tokens_file["recovery_token"].get("date"),
expires_at=tokens_file["recovery_token"].get("expitation"),
uses_left=tokens_file["recovery_token"].get("uses_left"),
)
return recovery_key
def create_recovery_key(
self,
expiration: Optional[datetime],
uses_left: Optional[int],
) -> RecoveryKey:
"""Create the recovery key"""
recovery_key = RecoveryKey.generate(expiration, uses_left)
with WriteUserData(UserDataFiles.TOKENS) as tokens_file:
tokens_file["recovery_token"] = {
"token": recovery_key.key,
"date": recovery_key.created_at.strftime(DATETIME_FORMAT),
"expiration": recovery_key.expires_at,
"uses_left": recovery_key.uses_left,
}
return recovery_key
def use_mnemonic_recovery_key(
self, mnemonic_phrase: str, device_name: str
) -> Token:
"""Use the mnemonic recovery key and create a new token with the given name"""
recovery_key = self.get_recovery_key()
if recovery_key is None:
raise RecoveryKeyNotFound("Recovery key not found")
if not recovery_key.is_valid():
raise RecoveryKeyNotFound("Recovery key not found")
recovery_token = bytes.fromhex(recovery_key.key)
if not Mnemonic(language="english").check(mnemonic_phrase):
raise InvalidMnemonic("Phrase is not mnemonic!")
phrase_bytes = Mnemonic(language="english").to_entropy(mnemonic_phrase)
if phrase_bytes != recovery_token:
raise RecoveryKeyNotFound("Recovery key not found")
new_token = Token.generate(device_name=device_name)
with WriteUserData(UserDataFiles.TOKENS) as tokens:
tokens["tokens"].append(
{
"token": new_token.token,
"name": new_token.device_name,
"date": new_token.created_at.strftime(DATETIME_FORMAT),
}
)
if "recovery_token" in tokens:
if (
"uses_left" in tokens["recovery_token"]
and tokens["recovery_token"]["uses_left"] is not None
):
tokens["recovery_token"]["uses_left"] -= 1
return new_token
def get_new_device_key(self) -> NewDeviceKey:
"""Creates and returns the new device key"""
new_device_key = NewDeviceKey.generate()
with WriteUserData(UserDataFiles.TOKENS) as tokens_file:
tokens_file["new_device"] = {
"token": new_device_key.key,
"date": new_device_key.created_at.strftime(DATETIME_FORMAT),
"expiration": new_device_key.expires_at.strftime(DATETIME_FORMAT),
}
return new_device_key
def delete_new_device_key(self) -> None:
"""Delete the new device key"""
with WriteUserData(UserDataFiles.TOKENS) as tokens_file:
if "new_device" in tokens_file:
del tokens_file["new_device"]
return
def use_mnemonic_new_device_key(
self, mnemonic_phrase: str, device_name: str
) -> Token:
"""Use the mnemonic new device key"""
with ReadUserData(UserDataFiles.TOKENS) as tokens_file:
if "new_device" not in tokens_file or tokens_file["new_device"] is None:
raise NewDeviceKeyNotFound("New device key not found")
new_device_key = NewDeviceKey(
key=tokens_file["new_device"]["token"],
created_at=tokens_file["new_device"]["date"],
expires_at=tokens_file["new_device"]["expiration"],
)
token = bytes.fromhex(new_device_key.key)
if not Mnemonic(language="english").check(mnemonic_phrase):
raise InvalidMnemonic("Phrase is not mnemonic!")
phrase_bytes = Mnemonic(language="english").to_entropy(mnemonic_phrase)
if bytes(phrase_bytes) != bytes(token):
raise NewDeviceKeyNotFound("Phrase is not token!")
new_token = Token.generate(device_name=device_name)
with WriteUserData(UserDataFiles.TOKENS) as tokens:
if "new_device" in tokens:
del tokens["new_device"]
return new_token

View file

@ -0,0 +1,15 @@
"""
Token repository using Redis as backend.
"""
from selfprivacy_api.repositories.tokens.abstract_tokens_repository import (
AbstractTokensRepository,
)
class RedisTokensRepository(AbstractTokensRepository):
"""
Token repository using Redis as a backend
"""
def __init__(self) -> None:
raise NotImplementedError

View file

@ -0,0 +1,582 @@
# pylint: disable=redefined-outer-name
# pylint: disable=unused-argument
# pylint: disable=missing-function-docstring
from datetime import datetime, timezone
import pytest
from selfprivacy_api.models.tokens.new_device_key import NewDeviceKey
from selfprivacy_api.models.tokens.recovery_key import RecoveryKey
from selfprivacy_api.models.tokens.token import Token
from selfprivacy_api.repositories.tokens.exceptions import (
InvalidMnemonic,
RecoveryKeyNotFound,
TokenNotFound,
NewDeviceKeyNotFound,
)
from selfprivacy_api.repositories.tokens.json_tokens_repository import (
JsonTokensRepository,
)
from tests.common import read_json
ORIGINAL_TOKEN_CONTENT = [
{
"token": "KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI",
"name": "primary_token",
"date": "2022-07-15 17:41:31.675698",
},
{
"token": "3JKgLOtFu6ZHgE4OU-R-VdW47IKpg-YQL0c6n7bol68",
"name": "second_token",
"date": "2022-07-15 17:41:31.675698Z",
},
{
"token": "LYiwFDekvALKTQSjk7vtMQuNP_6wqKuV-9AyMKytI_8",
"name": "third_token",
"date": "2022-07-15T17:41:31.675698Z",
},
{
"token": "dD3CFPcEZvapscgzWb7JZTLog7OMkP7NzJeu2fAazXM",
"name": "forth_token",
"date": "2022-07-15T17:41:31.675698",
},
]
@pytest.fixture
def tokens(mocker, datadir):
mocker.patch("selfprivacy_api.utils.TOKENS_FILE", new=datadir / "tokens.json")
assert read_json(datadir / "tokens.json")["tokens"] == ORIGINAL_TOKEN_CONTENT
return datadir
@pytest.fixture
def empty_keys(mocker, datadir):
mocker.patch("selfprivacy_api.utils.TOKENS_FILE", new=datadir / "empty_keys.json")
assert read_json(datadir / "empty_keys.json")["tokens"] == [
{
"token": "KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI",
"name": "primary_token",
"date": "2022-07-15 17:41:31.675698",
}
]
return datadir
@pytest.fixture
def null_keys(mocker, datadir):
mocker.patch("selfprivacy_api.utils.TOKENS_FILE", new=datadir / "null_keys.json")
assert read_json(datadir / "null_keys.json")["recovery_token"] is None
assert read_json(datadir / "null_keys.json")["new_device"] is None
return datadir
class RecoveryKeyMockReturnNotValid:
def is_valid() -> bool:
return False
@pytest.fixture
def mock_new_device_key_generate(mocker):
mock = mocker.patch(
"selfprivacy_api.repositories.tokens.json_tokens_repository.NewDeviceKey.generate",
autospec=True,
return_value=NewDeviceKey(
key="43478d05b35e4781598acd76e33832bb",
created_at=datetime(2022, 7, 15, 17, 41, 31, 675698),
expires_at=datetime(2022, 7, 15, 17, 41, 31, 675698),
),
)
return mock
@pytest.fixture
def mock_generate_token(mocker):
mock = mocker.patch(
"selfprivacy_api.repositories.tokens.json_tokens_repository.Token.generate",
autospec=True,
return_value=Token(
token="ur71mC4aiI6FIYAN--cTL-38rPHS5D6NuB1bgN_qKF4",
device_name="newdevice",
created_at=datetime(2022, 11, 14, 6, 6, 32, 777123),
),
)
return mock
@pytest.fixture
def mock_get_recovery_key_return_not_valid(mocker):
mock = mocker.patch(
"selfprivacy_api.repositories.tokens.json_tokens_repository.JsonTokensRepository.get_recovery_key",
autospec=True,
return_value=RecoveryKeyMockReturnNotValid,
)
return mock
@pytest.fixture
def mock_token_generate(mocker):
mock = mocker.patch(
"selfprivacy_api.repositories.tokens.json_tokens_repository.Token.generate",
autospec=True,
return_value=Token(
token="ZuLNKtnxDeq6w2dpOJhbB3iat_sJLPTPl_rN5uc5MvM",
device_name="IamNewDevice",
created_at=datetime(2022, 7, 15, 17, 41, 31, 675698),
),
)
return mock
@pytest.fixture
def mock_recovery_key_generate(mocker):
mock = mocker.patch(
"selfprivacy_api.repositories.tokens.json_tokens_repository.RecoveryKey.generate",
autospec=True,
return_value=RecoveryKey(
key="889bf49c1d3199d71a2e704718772bd53a422020334db051",
created_at=datetime(2022, 7, 15, 17, 41, 31, 675698),
expires_at=None,
uses_left=1,
),
)
return mock
###############
# Test tokens #
###############
def test_get_token_by_token_string(tokens):
repo = JsonTokensRepository()
assert repo.get_token_by_token_string(
token_string="KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI"
) == Token(
token="KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI",
device_name="primary_token",
created_at=datetime(2022, 7, 15, 17, 41, 31, 675698),
)
def test_get_token_by_non_existent_token_string(tokens):
repo = JsonTokensRepository()
with pytest.raises(TokenNotFound):
assert repo.get_token_by_token_string(token_string="iamBadtoken") is None
def test_get_token_by_name(tokens):
repo = JsonTokensRepository()
assert repo.get_token_by_name(token_name="primary_token") is not None
assert repo.get_token_by_name(token_name="primary_token") == Token(
token="KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI",
device_name="primary_token",
created_at=datetime(2022, 7, 15, 17, 41, 31, 675698),
)
def test_get_token_by_non_existent_name(tokens):
repo = JsonTokensRepository()
with pytest.raises(TokenNotFound):
assert repo.get_token_by_name(token_name="badname") is None
def test_get_tokens(tokens):
repo = JsonTokensRepository()
assert repo.get_tokens() == [
Token(
token="KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI",
device_name="primary_token",
created_at=datetime(2022, 7, 15, 17, 41, 31, 675698),
),
Token(
token="3JKgLOtFu6ZHgE4OU-R-VdW47IKpg-YQL0c6n7bol68",
device_name="second_token",
created_at=datetime(2022, 7, 15, 17, 41, 31, 675698, tzinfo=timezone.utc),
),
Token(
token="LYiwFDekvALKTQSjk7vtMQuNP_6wqKuV-9AyMKytI_8",
device_name="third_token",
created_at=datetime(2022, 7, 15, 17, 41, 31, 675698, tzinfo=timezone.utc),
),
Token(
token="dD3CFPcEZvapscgzWb7JZTLog7OMkP7NzJeu2fAazXM",
device_name="forth_token",
created_at=datetime(2022, 7, 15, 17, 41, 31, 675698),
),
]
def test_get_tokens_when_one(empty_keys):
repo = JsonTokensRepository()
assert repo.get_tokens() == [
Token(
token="KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI",
device_name="primary_token",
created_at=datetime(2022, 7, 15, 17, 41, 31, 675698),
)
]
def test_create_token(tokens, mock_token_generate):
repo = JsonTokensRepository()
assert repo.create_token(device_name="IamNewDevice") == Token(
token="ZuLNKtnxDeq6w2dpOJhbB3iat_sJLPTPl_rN5uc5MvM",
device_name="IamNewDevice",
created_at=datetime(2022, 7, 15, 17, 41, 31, 675698),
)
def test_delete_token(tokens):
repo = JsonTokensRepository()
input_token = Token(
token="KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI",
device_name="primary_token",
created_at=datetime(2022, 7, 15, 17, 41, 31, 675698),
)
repo.delete_token(input_token)
assert read_json(tokens / "tokens.json")["tokens"] == [
{
"token": "3JKgLOtFu6ZHgE4OU-R-VdW47IKpg-YQL0c6n7bol68",
"name": "second_token",
"date": "2022-07-15 17:41:31.675698Z",
},
{
"token": "LYiwFDekvALKTQSjk7vtMQuNP_6wqKuV-9AyMKytI_8",
"name": "third_token",
"date": "2022-07-15T17:41:31.675698Z",
},
{
"token": "dD3CFPcEZvapscgzWb7JZTLog7OMkP7NzJeu2fAazXM",
"name": "forth_token",
"date": "2022-07-15T17:41:31.675698",
},
]
def test_delete_not_found_token(tokens):
repo = JsonTokensRepository()
input_token = Token(
token="imbadtoken",
device_name="primary_token",
created_at=datetime(2022, 7, 15, 17, 41, 31, 675698),
)
with pytest.raises(TokenNotFound):
assert repo.delete_token(input_token) is None
assert read_json(tokens / "tokens.json")["tokens"] == ORIGINAL_TOKEN_CONTENT
def test_refresh_token(tokens, mock_token_generate):
repo = JsonTokensRepository()
input_token = Token(
token="KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI",
device_name="primary_token",
created_at=datetime(2022, 7, 15, 17, 41, 31, 675698),
)
assert repo.refresh_token(input_token) == Token(
token="ZuLNKtnxDeq6w2dpOJhbB3iat_sJLPTPl_rN5uc5MvM",
device_name="IamNewDevice",
created_at=datetime(2022, 7, 15, 17, 41, 31, 675698),
)
def test_refresh_not_found_token(tokens, mock_token_generate):
repo = JsonTokensRepository()
input_token = Token(
token="idontknowwhoiam",
device_name="tellmewhoiam?",
created_at=datetime(2022, 7, 15, 17, 41, 31, 675698),
)
with pytest.raises(TokenNotFound):
assert repo.refresh_token(input_token) is None
################
# Recovery key #
################
def test_get_recovery_key(tokens):
repo = JsonTokensRepository()
assert repo.get_recovery_key() == RecoveryKey(
key="ed653e4b8b042b841d285fa7a682fa09e925ddb2d8906f54",
created_at=datetime(2022, 11, 11, 11, 48, 54, 228038),
expires_at=None,
uses_left=2,
)
def test_get_recovery_key_when_empty(empty_keys):
repo = JsonTokensRepository()
assert repo.get_recovery_key() is None
def test_create_recovery_key(tokens, mock_recovery_key_generate):
repo = JsonTokensRepository()
assert repo.create_recovery_key(uses_left=1, expiration=None) is not None
assert read_json(tokens / "tokens.json")["recovery_token"] == {
"token": "889bf49c1d3199d71a2e704718772bd53a422020334db051",
"date": "2022-07-15T17:41:31.675698",
"expiration": None,
"uses_left": 1,
}
def test_use_mnemonic_recovery_key_when_empty(
empty_keys, mock_recovery_key_generate, mock_token_generate
):
repo = JsonTokensRepository()
with pytest.raises(RecoveryKeyNotFound):
assert (
repo.use_mnemonic_recovery_key(
mnemonic_phrase="captain ribbon toddler settle symbol minute step broccoli bless universe divide bulb",
device_name="primary_token",
)
is None
)
def test_use_mnemonic_not_valid_recovery_key(
tokens, mock_get_recovery_key_return_not_valid
):
repo = JsonTokensRepository()
with pytest.raises(RecoveryKeyNotFound):
assert (
repo.use_mnemonic_recovery_key(
mnemonic_phrase="captain ribbon toddler settle symbol minute step broccoli bless universe divide bulb",
device_name="primary_token",
)
is None
)
def test_use_mnemonic_not_mnemonic_recovery_key(tokens):
repo = JsonTokensRepository()
with pytest.raises(InvalidMnemonic):
assert (
repo.use_mnemonic_recovery_key(
mnemonic_phrase="sorry, it was joke",
device_name="primary_token",
)
is None
)
def test_use_not_mnemonic_recovery_key(tokens):
repo = JsonTokensRepository()
with pytest.raises(InvalidMnemonic):
assert (
repo.use_mnemonic_recovery_key(
mnemonic_phrase="please come back",
device_name="primary_token",
)
is None
)
def test_use_not_found_mnemonic_recovery_key(tokens):
repo = JsonTokensRepository()
with pytest.raises(RecoveryKeyNotFound):
assert (
repo.use_mnemonic_recovery_key(
mnemonic_phrase="captain ribbon toddler settle symbol minute step broccoli bless universe divide bulb",
device_name="primary_token",
)
is None
)
def test_use_menemonic_recovery_key_when_empty(empty_keys):
repo = JsonTokensRepository()
with pytest.raises(RecoveryKeyNotFound):
assert (
repo.use_mnemonic_recovery_key(
mnemonic_phrase="captain ribbon toddler settle symbol minute step broccoli bless universe divide bulb",
device_name="primary_token",
)
is None
)
def test_use_menemonic_recovery_key_when_null(null_keys):
repo = JsonTokensRepository()
with pytest.raises(RecoveryKeyNotFound):
assert (
repo.use_mnemonic_recovery_key(
mnemonic_phrase="captain ribbon toddler settle symbol minute step broccoli bless universe divide bulb",
device_name="primary_token",
)
is None
)
def test_use_mnemonic_recovery_key(tokens, mock_generate_token):
repo = JsonTokensRepository()
assert repo.use_mnemonic_recovery_key(
mnemonic_phrase="uniform clarify napkin bid dress search input armor police cross salon because myself uphold slice bamboo hungry park",
device_name="newdevice",
) == Token(
token="ur71mC4aiI6FIYAN--cTL-38rPHS5D6NuB1bgN_qKF4",
device_name="newdevice",
created_at=datetime(2022, 11, 14, 6, 6, 32, 777123),
)
assert read_json(tokens / "tokens.json")["tokens"] == [
{
"date": "2022-07-15 17:41:31.675698",
"name": "primary_token",
"token": "KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI",
},
{
"token": "3JKgLOtFu6ZHgE4OU-R-VdW47IKpg-YQL0c6n7bol68",
"name": "second_token",
"date": "2022-07-15 17:41:31.675698Z",
},
{
"token": "LYiwFDekvALKTQSjk7vtMQuNP_6wqKuV-9AyMKytI_8",
"name": "third_token",
"date": "2022-07-15T17:41:31.675698Z",
},
{
"token": "dD3CFPcEZvapscgzWb7JZTLog7OMkP7NzJeu2fAazXM",
"name": "forth_token",
"date": "2022-07-15T17:41:31.675698",
},
{
"date": "2022-11-14T06:06:32.777123",
"name": "newdevice",
"token": "ur71mC4aiI6FIYAN--cTL-38rPHS5D6NuB1bgN_qKF4",
},
]
assert read_json(tokens / "tokens.json")["recovery_token"] == {
"date": "2022-11-11T11:48:54.228038",
"expiration": None,
"token": "ed653e4b8b042b841d285fa7a682fa09e925ddb2d8906f54",
"uses_left": 1,
}
##################
# New device key #
##################
def test_get_new_device_key(tokens, mock_new_device_key_generate):
repo = JsonTokensRepository()
assert repo.get_new_device_key() is not None
assert read_json(tokens / "tokens.json")["new_device"] == {
"date": "2022-07-15T17:41:31.675698",
"expiration": "2022-07-15T17:41:31.675698",
"token": "43478d05b35e4781598acd76e33832bb",
}
def test_delete_new_device_key(tokens):
repo = JsonTokensRepository()
assert repo.delete_new_device_key() is None
assert "new_device" not in read_json(tokens / "tokens.json")
def test_delete_new_device_key_when_empty(empty_keys):
repo = JsonTokensRepository()
repo.delete_new_device_key()
assert "new_device" not in read_json(empty_keys / "empty_keys.json")
def test_use_invalid_mnemonic_new_device_key(
tokens, mock_new_device_key_generate, datadir, mock_token_generate
):
repo = JsonTokensRepository()
with pytest.raises(InvalidMnemonic):
assert (
repo.use_mnemonic_new_device_key(
device_name="imnew",
mnemonic_phrase="oh-no",
)
is None
)
def test_use_not_exists_mnemonic_new_device_key(
tokens, mock_new_device_key_generate, mock_token_generate
):
repo = JsonTokensRepository()
with pytest.raises(NewDeviceKeyNotFound):
assert (
repo.use_mnemonic_new_device_key(
device_name="imnew",
mnemonic_phrase="uniform clarify napkin bid dress search input armor police cross salon because myself uphold slice bamboo hungry park",
)
is None
)
def test_use_mnemonic_new_device_key(
tokens, mock_new_device_key_generate, mock_token_generate
):
repo = JsonTokensRepository()
assert (
repo.use_mnemonic_new_device_key(
device_name="imnew",
mnemonic_phrase="captain ribbon toddler settle symbol minute step broccoli bless universe divide bulb",
)
is not None
)
# assert read_json(datadir / "tokens.json")["new_device"] == []
def test_use_mnemonic_new_device_key_when_empty(empty_keys):
repo = JsonTokensRepository()
with pytest.raises(NewDeviceKeyNotFound):
assert (
repo.use_mnemonic_new_device_key(
device_name="imnew",
mnemonic_phrase="captain ribbon toddler settle symbol minute step broccoli bless universe divide bulb",
)
is None
)
def test_use_mnemonic_new_device_key_when_null(null_keys):
repo = JsonTokensRepository()
with pytest.raises(NewDeviceKeyNotFound):
assert (
repo.use_mnemonic_new_device_key(
device_name="imnew",
mnemonic_phrase="captain ribbon toddler settle symbol minute step broccoli bless universe divide bulb",
)
is None
)

View file

@ -0,0 +1,9 @@
{
"tokens": [
{
"token": "KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI",
"name": "primary_token",
"date": "2022-07-15 17:41:31.675698"
}
]
}

View file

@ -0,0 +1,26 @@
{
"tokens": [
{
"token": "KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI",
"name": "primary_token",
"date": "2022-07-15 17:41:31.675698"
},
{
"token": "3JKgLOtFu6ZHgE4OU-R-VdW47IKpg-YQL0c6n7bol68",
"name": "second_token",
"date": "2022-07-15 17:41:31.675698Z"
},
{
"token": "LYiwFDekvALKTQSjk7vtMQuNP_6wqKuV-9AyMKytI_8",
"name": "third_token",
"date": "2022-07-15T17:41:31.675698Z"
},
{
"token": "dD3CFPcEZvapscgzWb7JZTLog7OMkP7NzJeu2fAazXM",
"name": "forth_token",
"date": "2022-07-15T17:41:31.675698"
}
],
"recovery_token": null,
"new_device": null
}

View file

@ -0,0 +1,35 @@
{
"tokens": [
{
"token": "KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI",
"name": "primary_token",
"date": "2022-07-15 17:41:31.675698"
},
{
"token": "3JKgLOtFu6ZHgE4OU-R-VdW47IKpg-YQL0c6n7bol68",
"name": "second_token",
"date": "2022-07-15 17:41:31.675698Z"
},
{
"token": "LYiwFDekvALKTQSjk7vtMQuNP_6wqKuV-9AyMKytI_8",
"name": "third_token",
"date": "2022-07-15T17:41:31.675698Z"
},
{
"token": "dD3CFPcEZvapscgzWb7JZTLog7OMkP7NzJeu2fAazXM",
"name": "forth_token",
"date": "2022-07-15T17:41:31.675698"
}
],
"recovery_token": {
"token": "ed653e4b8b042b841d285fa7a682fa09e925ddb2d8906f54",
"date": "2022-11-11T11:48:54.228038",
"expiration": null,
"uses_left": 2
},
"new_device": {
"token": "2237238de23dc71ab558e317bdb8ff8e",
"date": "2022-10-26 20:50:47.973212",
"expiration": "2022-10-26 21:00:47.974153"
}
}

View file

@ -516,7 +516,6 @@ def test_graphql_add_long_username(authorized_client, one_user, mock_subprocess_
},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["createUser"]["message"] is not None