Merge remote-tracking branch 'origin/master' into flake

This commit is contained in:
Alexander Tomokhov 2023-11-16 04:39:33 +04:00
commit 6f7613dedb
36 changed files with 1244 additions and 1313 deletions

View file

@ -1,11 +1,15 @@
"""App tokens actions""" """
from datetime import datetime App tokens actions.
The only actions on tokens that are accessible from APIs
"""
from datetime import datetime, timezone
from typing import Optional from typing import Optional
from pydantic import BaseModel from pydantic import BaseModel
from mnemonic import Mnemonic from mnemonic import Mnemonic
from selfprivacy_api.repositories.tokens.json_tokens_repository import ( from selfprivacy_api.utils.timeutils import ensure_tz_aware, ensure_tz_aware_strict
JsonTokensRepository, from selfprivacy_api.repositories.tokens.redis_tokens_repository import (
RedisTokensRepository,
) )
from selfprivacy_api.repositories.tokens.exceptions import ( from selfprivacy_api.repositories.tokens.exceptions import (
TokenNotFound, TokenNotFound,
@ -14,7 +18,7 @@ from selfprivacy_api.repositories.tokens.exceptions import (
NewDeviceKeyNotFound, NewDeviceKeyNotFound,
) )
TOKEN_REPO = JsonTokensRepository() TOKEN_REPO = RedisTokensRepository()
class TokenInfoWithIsCaller(BaseModel): class TokenInfoWithIsCaller(BaseModel):
@ -25,6 +29,14 @@ class TokenInfoWithIsCaller(BaseModel):
is_caller: bool is_caller: bool
def _naive(date_time: datetime) -> datetime:
if date_time is None:
return None
if date_time.tzinfo is not None:
date_time.astimezone(timezone.utc)
return date_time.replace(tzinfo=None)
def get_api_tokens_with_caller_flag(caller_token: str) -> list[TokenInfoWithIsCaller]: def get_api_tokens_with_caller_flag(caller_token: str) -> list[TokenInfoWithIsCaller]:
"""Get the tokens info""" """Get the tokens info"""
caller_name = TOKEN_REPO.get_token_by_token_string(caller_token).device_name caller_name = TOKEN_REPO.get_token_by_token_string(caller_token).device_name
@ -83,16 +95,22 @@ class RecoveryTokenStatus(BaseModel):
def get_api_recovery_token_status() -> RecoveryTokenStatus: def get_api_recovery_token_status() -> RecoveryTokenStatus:
"""Get the recovery token status""" """Get the recovery token status, timezone-aware"""
token = TOKEN_REPO.get_recovery_key() token = TOKEN_REPO.get_recovery_key()
if token is None: if token is None:
return RecoveryTokenStatus(exists=False, valid=False) return RecoveryTokenStatus(exists=False, valid=False)
is_valid = TOKEN_REPO.is_recovery_key_valid() is_valid = TOKEN_REPO.is_recovery_key_valid()
# New tokens are tz-aware, but older ones might not be
expiry_date = token.expires_at
if expiry_date is not None:
expiry_date = ensure_tz_aware_strict(expiry_date)
return RecoveryTokenStatus( return RecoveryTokenStatus(
exists=True, exists=True,
valid=is_valid, valid=is_valid,
date=token.created_at, date=ensure_tz_aware_strict(token.created_at),
expiration=token.expires_at, expiration=expiry_date,
uses_left=token.uses_left, uses_left=token.uses_left,
) )
@ -110,8 +128,9 @@ def get_new_api_recovery_key(
) -> str: ) -> str:
"""Get new recovery key""" """Get new recovery key"""
if expiration_date is not None: if expiration_date is not None:
current_time = datetime.now().timestamp() expiration_date = ensure_tz_aware(expiration_date)
if expiration_date.timestamp() < current_time: current_time = datetime.now(timezone.utc)
if expiration_date < current_time:
raise InvalidExpirationDate("Expiration date is in the past") raise InvalidExpirationDate("Expiration date is in the past")
if uses_left is not None: if uses_left is not None:
if uses_left <= 0: if uses_left <= 0:

View file

@ -1,7 +1,8 @@
""" """
This module contains the controller class for backups. This module contains the controller class for backups.
""" """
from datetime import datetime, timedelta from datetime import datetime, timedelta, timezone
import time
import os import os
from os import statvfs from os import statvfs
from typing import Callable, List, Optional from typing import Callable, List, Optional
@ -37,6 +38,7 @@ from selfprivacy_api.backup.providers import get_provider
from selfprivacy_api.backup.storage import Storage from selfprivacy_api.backup.storage import Storage
from selfprivacy_api.backup.jobs import ( from selfprivacy_api.backup.jobs import (
get_backup_job, get_backup_job,
get_backup_fail,
add_backup_job, add_backup_job,
get_restore_job, get_restore_job,
add_restore_job, add_restore_job,
@ -292,9 +294,9 @@ class Backups:
def back_up( def back_up(
service: Service, reason: BackupReason = BackupReason.EXPLICIT service: Service, reason: BackupReason = BackupReason.EXPLICIT
) -> Snapshot: ) -> Snapshot:
"""The top-level function to back up a service""" """The top-level function to back up a service
folders = service.get_folders() If it fails for any reason at all, it should both mark job as
service_name = service.get_id() errored and re-raise an error"""
job = get_backup_job(service) job = get_backup_job(service)
if job is None: if job is None:
@ -302,6 +304,10 @@ class Backups:
Jobs.update(job, status=JobStatus.RUNNING) Jobs.update(job, status=JobStatus.RUNNING)
try: try:
if service.can_be_backed_up() is False:
raise ValueError("cannot backup a non-backuppable service")
folders = service.get_folders()
service_name = service.get_id()
service.pre_backup() service.pre_backup()
snapshot = Backups.provider().backupper.start_backup( snapshot = Backups.provider().backupper.start_backup(
folders, folders,
@ -692,23 +698,45 @@ class Backups:
"""Get a timezone-aware time of the last backup of a service""" """Get a timezone-aware time of the last backup of a service"""
return Storage.get_last_backup_time(service.get_id()) return Storage.get_last_backup_time(service.get_id())
@staticmethod
def get_last_backup_error_time(service: Service) -> Optional[datetime]:
"""Get a timezone-aware time of the last backup of a service"""
job = get_backup_fail(service)
if job is not None:
datetime_created = job.created_at
if datetime_created.tzinfo is None:
# assume it is in localtime
offset = timedelta(seconds=time.localtime().tm_gmtoff)
datetime_created = datetime_created - offset
return datetime.combine(
datetime_created.date(), datetime_created.time(), timezone.utc
)
return datetime_created
return None
@staticmethod @staticmethod
def is_time_to_backup_service(service: Service, time: datetime): def is_time_to_backup_service(service: Service, time: datetime):
"""Returns True if it is time to back up a service""" """Returns True if it is time to back up a service"""
period = Backups.autobackup_period_minutes() period = Backups.autobackup_period_minutes()
service_id = service.get_id()
if not service.can_be_backed_up(): if not service.can_be_backed_up():
return False return False
if period is None: if period is None:
return False return False
last_backup = Storage.get_last_backup_time(service_id) last_error = Backups.get_last_backup_error_time(service)
if last_error is not None:
if time < last_error + timedelta(seconds=AUTOBACKUP_JOB_EXPIRATION_SECONDS):
return False
last_backup = Backups.get_last_backed_up(service)
if last_backup is None: if last_backup is None:
# queue a backup immediately if there are no previous backups # queue a backup immediately if there are no previous backups
return True return True
if time > last_backup + timedelta(minutes=period): if time > last_backup + timedelta(minutes=period):
return True return True
return False return False
# Helpers # Helpers

View file

@ -80,9 +80,19 @@ def get_job_by_type(type_id: str) -> Optional[Job]:
return job return job
def get_failed_job_by_type(type_id: str) -> Optional[Job]:
for job in Jobs.get_jobs():
if job.type_id == type_id and job.status == JobStatus.ERROR:
return job
def get_backup_job(service: Service) -> Optional[Job]: def get_backup_job(service: Service) -> Optional[Job]:
return get_job_by_type(backup_job_type(service)) return get_job_by_type(backup_job_type(service))
def get_backup_fail(service: Service) -> Optional[Job]:
return get_failed_job_by_type(backup_job_type(service))
def get_restore_job(service: Service) -> Optional[Job]: def get_restore_job(service: Service) -> Optional[Job]:
return get_job_by_type(restore_job_type(service)) return get_job_by_type(restore_job_type(service))

View file

@ -38,7 +38,7 @@ class ApiRecoveryKeyStatus:
def get_recovery_key_status() -> ApiRecoveryKeyStatus: def get_recovery_key_status() -> ApiRecoveryKeyStatus:
"""Get recovery key status""" """Get recovery key status, times are timezone-aware"""
status = get_api_recovery_token_status() status = get_api_recovery_token_status()
if status is None or not status.exists: if status is None or not status.exists:
return ApiRecoveryKeyStatus( return ApiRecoveryKeyStatus(

View file

@ -8,8 +8,8 @@ A job is a dictionary with the following keys:
- name: name of the job - name: name of the job
- description: description of the job - description: description of the job
- status: status of the job - status: status of the job
- created_at: date of creation of the job - created_at: date of creation of the job, naive localtime
- updated_at: date of last update of the job - updated_at: date of last update of the job, naive localtime
- finished_at: date of finish of the job - finished_at: date of finish of the job
- error: error message if the job failed - error: error message if the job failed
- result: result of the job - result: result of the job

View file

@ -25,6 +25,7 @@ from selfprivacy_api.migrations.prepare_for_nixos_2211 import (
from selfprivacy_api.migrations.prepare_for_nixos_2305 import ( from selfprivacy_api.migrations.prepare_for_nixos_2305 import (
MigrateToSelfprivacyChannelFrom2211, MigrateToSelfprivacyChannelFrom2211,
) )
from selfprivacy_api.migrations.redis_tokens import LoadTokensToRedis
migrations = [ migrations = [
FixNixosConfigBranch(), FixNixosConfigBranch(),
@ -35,6 +36,7 @@ migrations = [
CreateProviderFields(), CreateProviderFields(),
MigrateToSelfprivacyChannelFrom2205(), MigrateToSelfprivacyChannelFrom2205(),
MigrateToSelfprivacyChannelFrom2211(), MigrateToSelfprivacyChannelFrom2211(),
LoadTokensToRedis(),
] ]

View file

@ -0,0 +1,48 @@
from selfprivacy_api.migrations.migration import Migration
from selfprivacy_api.repositories.tokens.json_tokens_repository import (
JsonTokensRepository,
)
from selfprivacy_api.repositories.tokens.redis_tokens_repository import (
RedisTokensRepository,
)
from selfprivacy_api.repositories.tokens.abstract_tokens_repository import (
AbstractTokensRepository,
)
class LoadTokensToRedis(Migration):
"""Load Json tokens into Redis"""
def get_migration_name(self):
return "load_tokens_to_redis"
def get_migration_description(self):
return "Loads access tokens and recovery keys from legacy json file into redis token storage"
def is_repo_empty(self, repo: AbstractTokensRepository) -> bool:
if repo.get_tokens() != []:
return False
if repo.get_recovery_key() is not None:
return False
return True
def is_migration_needed(self):
try:
if not self.is_repo_empty(JsonTokensRepository()) and self.is_repo_empty(
RedisTokensRepository()
):
return True
except Exception as e:
print(e)
return False
def migrate(self):
# Write info about providers to userdata.json
try:
RedisTokensRepository().clone(JsonTokensRepository())
print("Done")
except Exception as e:
print(e)
print("Error migrating access tokens from json to redis")

View file

@ -1,11 +1,13 @@
""" """
New device key used to obtain access token. New device key used to obtain access token.
""" """
from datetime import datetime, timedelta from datetime import datetime, timedelta, timezone
import secrets import secrets
from pydantic import BaseModel from pydantic import BaseModel
from mnemonic import Mnemonic from mnemonic import Mnemonic
from selfprivacy_api.models.tokens.time import is_past
class NewDeviceKey(BaseModel): class NewDeviceKey(BaseModel):
""" """
@ -20,15 +22,15 @@ class NewDeviceKey(BaseModel):
def is_valid(self) -> bool: def is_valid(self) -> bool:
""" """
Check if the recovery key is valid. Check if key is valid.
""" """
if self.expires_at < datetime.now(): if is_past(self.expires_at):
return False return False
return True return True
def as_mnemonic(self) -> str: def as_mnemonic(self) -> str:
""" """
Get the recovery key as a mnemonic. Get the key as a mnemonic.
""" """
return Mnemonic(language="english").to_mnemonic(bytes.fromhex(self.key)) return Mnemonic(language="english").to_mnemonic(bytes.fromhex(self.key))
@ -37,10 +39,10 @@ class NewDeviceKey(BaseModel):
""" """
Factory to generate a random token. Factory to generate a random token.
""" """
creation_date = datetime.now() creation_date = datetime.now(timezone.utc)
key = secrets.token_bytes(16).hex() key = secrets.token_bytes(16).hex()
return NewDeviceKey( return NewDeviceKey(
key=key, key=key,
created_at=creation_date, created_at=creation_date,
expires_at=datetime.now() + timedelta(minutes=10), expires_at=creation_date + timedelta(minutes=10),
) )

View file

@ -3,12 +3,14 @@ Recovery key used to obtain access token.
Recovery key has a token string, date of creation, optional date of expiration and optional count of uses left. Recovery key has a token string, date of creation, optional date of expiration and optional count of uses left.
""" """
from datetime import datetime from datetime import datetime, timezone
import secrets import secrets
from typing import Optional from typing import Optional
from pydantic import BaseModel from pydantic import BaseModel
from mnemonic import Mnemonic from mnemonic import Mnemonic
from selfprivacy_api.models.tokens.time import is_past, ensure_timezone
class RecoveryKey(BaseModel): class RecoveryKey(BaseModel):
""" """
@ -26,7 +28,7 @@ class RecoveryKey(BaseModel):
""" """
Check if the recovery key is valid. Check if the recovery key is valid.
""" """
if self.expires_at is not None and self.expires_at < datetime.now(): if self.expires_at is not None and is_past(self.expires_at):
return False return False
if self.uses_left is not None and self.uses_left <= 0: if self.uses_left is not None and self.uses_left <= 0:
return False return False
@ -45,8 +47,11 @@ class RecoveryKey(BaseModel):
) -> "RecoveryKey": ) -> "RecoveryKey":
""" """
Factory to generate a random token. Factory to generate a random token.
If passed naive time as expiration, assumes utc
""" """
creation_date = datetime.now() creation_date = datetime.now(timezone.utc)
if expiration is not None:
expiration = ensure_timezone(expiration)
key = secrets.token_bytes(24).hex() key = secrets.token_bytes(24).hex()
return RecoveryKey( return RecoveryKey(
key=key, key=key,

View file

@ -0,0 +1,14 @@
from datetime import datetime, timezone
def is_past(dt: datetime) -> bool:
# we cannot compare a naive now()
# to dt which might be tz-aware or unaware
dt = ensure_timezone(dt)
return dt < datetime.now(timezone.utc)
def ensure_timezone(dt: datetime) -> datetime:
if dt.tzinfo is None or dt.tzinfo.utcoffset(None) is None:
dt = dt.replace(tzinfo=timezone.utc)
return dt

View file

@ -1,3 +1,5 @@
from __future__ import annotations
from abc import ABC, abstractmethod from abc import ABC, abstractmethod
from datetime import datetime from datetime import datetime
from typing import Optional from typing import Optional
@ -86,13 +88,15 @@ class AbstractTokensRepository(ABC):
def get_recovery_key(self) -> Optional[RecoveryKey]: def get_recovery_key(self) -> Optional[RecoveryKey]:
"""Get the recovery key""" """Get the recovery key"""
@abstractmethod
def create_recovery_key( def create_recovery_key(
self, self,
expiration: Optional[datetime], expiration: Optional[datetime],
uses_left: Optional[int], uses_left: Optional[int],
) -> RecoveryKey: ) -> RecoveryKey:
"""Create the recovery key""" """Create the recovery key"""
recovery_key = RecoveryKey.generate(expiration, uses_left)
self._store_recovery_key(recovery_key)
return recovery_key
def use_mnemonic_recovery_key( def use_mnemonic_recovery_key(
self, mnemonic_phrase: str, device_name: str self, mnemonic_phrase: str, device_name: str
@ -123,6 +127,14 @@ class AbstractTokensRepository(ABC):
return False return False
return recovery_key.is_valid() return recovery_key.is_valid()
@abstractmethod
def _store_recovery_key(self, recovery_key: RecoveryKey) -> None:
"""Store recovery key directly"""
@abstractmethod
def _delete_recovery_key(self) -> None:
"""Delete the recovery key"""
def get_new_device_key(self) -> NewDeviceKey: def get_new_device_key(self) -> NewDeviceKey:
"""Creates and returns the new device key""" """Creates and returns the new device key"""
new_device_key = NewDeviceKey.generate() new_device_key = NewDeviceKey.generate()
@ -156,6 +168,26 @@ class AbstractTokensRepository(ABC):
return new_token return new_token
def reset(self):
for token in self.get_tokens():
self.delete_token(token)
self.delete_new_device_key()
self._delete_recovery_key()
def clone(self, source: AbstractTokensRepository) -> None:
"""Clone the state of another repository to this one"""
self.reset()
for token in source.get_tokens():
self._store_token(token)
recovery_key = source.get_recovery_key()
if recovery_key is not None:
self._store_recovery_key(recovery_key)
new_device_key = source._get_stored_new_device_key()
if new_device_key is not None:
self._store_new_device_key(new_device_key)
@abstractmethod @abstractmethod
def _store_token(self, new_token: Token): def _store_token(self, new_token: Token):
"""Store a token directly""" """Store a token directly"""

View file

@ -2,7 +2,7 @@
temporary legacy temporary legacy
""" """
from typing import Optional from typing import Optional
from datetime import datetime from datetime import datetime, timezone
from selfprivacy_api.utils import UserDataFiles, WriteUserData, ReadUserData from selfprivacy_api.utils import UserDataFiles, WriteUserData, ReadUserData
from selfprivacy_api.models.tokens.token import Token from selfprivacy_api.models.tokens.token import Token
@ -15,6 +15,7 @@ from selfprivacy_api.repositories.tokens.abstract_tokens_repository import (
AbstractTokensRepository, AbstractTokensRepository,
) )
DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%f" DATETIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%f"
@ -56,6 +57,20 @@ class JsonTokensRepository(AbstractTokensRepository):
raise TokenNotFound("Token not found!") raise TokenNotFound("Token not found!")
def __key_date_from_str(self, date_string: str) -> datetime:
if date_string is None or date_string == "":
return None
# we assume that we store dates in json as naive utc
utc_no_tz = datetime.fromisoformat(date_string)
utc_with_tz = utc_no_tz.replace(tzinfo=timezone.utc)
return utc_with_tz
def __date_from_tokens_file(
self, tokens_file: object, tokenfield: str, datefield: str
):
date_string = tokens_file[tokenfield].get(datefield)
return self.__key_date_from_str(date_string)
def get_recovery_key(self) -> Optional[RecoveryKey]: def get_recovery_key(self) -> Optional[RecoveryKey]:
"""Get the recovery key""" """Get the recovery key"""
with ReadUserData(UserDataFiles.TOKENS) as tokens_file: with ReadUserData(UserDataFiles.TOKENS) as tokens_file:
@ -68,22 +83,18 @@ class JsonTokensRepository(AbstractTokensRepository):
recovery_key = RecoveryKey( recovery_key = RecoveryKey(
key=tokens_file["recovery_token"].get("token"), key=tokens_file["recovery_token"].get("token"),
created_at=tokens_file["recovery_token"].get("date"), created_at=self.__date_from_tokens_file(
expires_at=tokens_file["recovery_token"].get("expiration"), tokens_file, "recovery_token", "date"
),
expires_at=self.__date_from_tokens_file(
tokens_file, "recovery_token", "expiration"
),
uses_left=tokens_file["recovery_token"].get("uses_left"), uses_left=tokens_file["recovery_token"].get("uses_left"),
) )
return recovery_key return recovery_key
def create_recovery_key( def _store_recovery_key(self, recovery_key: RecoveryKey) -> None:
self,
expiration: Optional[datetime],
uses_left: Optional[int],
) -> RecoveryKey:
"""Create the recovery key"""
recovery_key = RecoveryKey.generate(expiration, uses_left)
with WriteUserData(UserDataFiles.TOKENS) as tokens_file: with WriteUserData(UserDataFiles.TOKENS) as tokens_file:
key_expiration: Optional[str] = None key_expiration: Optional[str] = None
if recovery_key.expires_at is not None: if recovery_key.expires_at is not None:
@ -95,8 +106,6 @@ class JsonTokensRepository(AbstractTokensRepository):
"uses_left": recovery_key.uses_left, "uses_left": recovery_key.uses_left,
} }
return recovery_key
def _decrement_recovery_token(self): def _decrement_recovery_token(self):
"""Decrement recovery key use count by one""" """Decrement recovery key use count by one"""
if self.is_recovery_key_valid(): if self.is_recovery_key_valid():
@ -104,6 +113,13 @@ class JsonTokensRepository(AbstractTokensRepository):
if tokens["recovery_token"]["uses_left"] is not None: if tokens["recovery_token"]["uses_left"] is not None:
tokens["recovery_token"]["uses_left"] -= 1 tokens["recovery_token"]["uses_left"] -= 1
def _delete_recovery_key(self) -> None:
"""Delete the recovery key"""
with WriteUserData(UserDataFiles.TOKENS) as tokens_file:
if "recovery_token" in tokens_file:
del tokens_file["recovery_token"]
return
def _store_new_device_key(self, new_device_key: NewDeviceKey) -> None: def _store_new_device_key(self, new_device_key: NewDeviceKey) -> None:
with WriteUserData(UserDataFiles.TOKENS) as tokens_file: with WriteUserData(UserDataFiles.TOKENS) as tokens_file:
tokens_file["new_device"] = { tokens_file["new_device"] = {
@ -127,7 +143,11 @@ class JsonTokensRepository(AbstractTokensRepository):
new_device_key = NewDeviceKey( new_device_key = NewDeviceKey(
key=tokens_file["new_device"]["token"], key=tokens_file["new_device"]["token"],
created_at=tokens_file["new_device"]["date"], created_at=self.__date_from_tokens_file(
expires_at=tokens_file["new_device"]["expiration"], tokens_file, "new_device", "date"
),
expires_at=self.__date_from_tokens_file(
tokens_file, "new_device", "expiration"
),
) )
return new_device_key return new_device_key

View file

@ -4,6 +4,7 @@ Token repository using Redis as backend.
from typing import Any, Optional from typing import Any, Optional
from datetime import datetime from datetime import datetime
from hashlib import md5 from hashlib import md5
from datetime import timezone
from selfprivacy_api.repositories.tokens.abstract_tokens_repository import ( from selfprivacy_api.repositories.tokens.abstract_tokens_repository import (
AbstractTokensRepository, AbstractTokensRepository,
@ -53,6 +54,7 @@ class RedisTokensRepository(AbstractTokensRepository):
token = self._token_from_hash(key) token = self._token_from_hash(key)
if token == input_token: if token == input_token:
return key return key
return None
def delete_token(self, input_token: Token) -> None: def delete_token(self, input_token: Token) -> None:
"""Delete the token""" """Delete the token"""
@ -62,13 +64,6 @@ class RedisTokensRepository(AbstractTokensRepository):
raise TokenNotFound raise TokenNotFound
redis.delete(key) redis.delete(key)
def reset(self):
for token in self.get_tokens():
self.delete_token(token)
self.delete_new_device_key()
redis = self.connection
redis.delete(RECOVERY_KEY_REDIS_KEY)
def get_recovery_key(self) -> Optional[RecoveryKey]: def get_recovery_key(self) -> Optional[RecoveryKey]:
"""Get the recovery key""" """Get the recovery key"""
redis = self.connection redis = self.connection
@ -76,15 +71,13 @@ class RedisTokensRepository(AbstractTokensRepository):
return self._recovery_key_from_hash(RECOVERY_KEY_REDIS_KEY) return self._recovery_key_from_hash(RECOVERY_KEY_REDIS_KEY)
return None return None
def create_recovery_key( def _store_recovery_key(self, recovery_key: RecoveryKey) -> None:
self,
expiration: Optional[datetime],
uses_left: Optional[int],
) -> RecoveryKey:
"""Create the recovery key"""
recovery_key = RecoveryKey.generate(expiration=expiration, uses_left=uses_left)
self._store_model_as_hash(RECOVERY_KEY_REDIS_KEY, recovery_key) self._store_model_as_hash(RECOVERY_KEY_REDIS_KEY, recovery_key)
return recovery_key
def _delete_recovery_key(self) -> None:
"""Delete the recovery key"""
redis = self.connection
redis.delete(RECOVERY_KEY_REDIS_KEY)
def _store_new_device_key(self, new_device_key: NewDeviceKey) -> None: def _store_new_device_key(self, new_device_key: NewDeviceKey) -> None:
"""Store new device key directly""" """Store new device key directly"""
@ -157,6 +150,7 @@ class RedisTokensRepository(AbstractTokensRepository):
if token is not None: if token is not None:
token.created_at = token.created_at.replace(tzinfo=None) token.created_at = token.created_at.replace(tzinfo=None)
return token return token
return None
def _recovery_key_from_hash(self, redis_key: str) -> Optional[RecoveryKey]: def _recovery_key_from_hash(self, redis_key: str) -> Optional[RecoveryKey]:
return self._hash_as_model(redis_key, RecoveryKey) return self._hash_as_model(redis_key, RecoveryKey)
@ -168,5 +162,7 @@ class RedisTokensRepository(AbstractTokensRepository):
redis = self.connection redis = self.connection
for key, value in model.dict().items(): for key, value in model.dict().items():
if isinstance(value, datetime): if isinstance(value, datetime):
if value.tzinfo is None:
value = value.replace(tzinfo=timezone.utc)
value = value.isoformat() value = value.isoformat()
redis.hset(redis_key, key, str(value)) redis.hset(redis_key, key, str(value))

View file

@ -0,0 +1,52 @@
from datetime import datetime, timezone
def ensure_tz_aware(dt: datetime) -> datetime:
"""
returns timezone-aware datetime
assumes utc on naive datetime input
"""
if dt.tzinfo is None:
# astimezone() is dangerous, it makes an implicit assumption that
# the time is localtime
dt = dt.replace(tzinfo=timezone.utc)
return dt
def ensure_tz_aware_strict(dt: datetime) -> datetime:
"""
returns timezone-aware datetime
raises error if input is a naive datetime
"""
if dt.tzinfo is None:
raise ValueError(
"no timezone in datetime (tz-aware datetime is required for this operation)",
dt,
)
return dt
def tzaware_parse_time(iso_timestamp: str) -> datetime:
"""
parse an iso8601 timestamp into timezone-aware datetime
assume utc if no timezone in stamp
example of timestamp:
2023-11-10T12:07:47.868788+00:00
"""
dt = datetime.fromisoformat(iso_timestamp)
dt = ensure_tz_aware(dt)
return dt
def tzaware_parse_time_strict(iso_timestamp: str) -> datetime:
"""
parse an iso8601 timestamp into timezone-aware datetime
raise an error if no timezone in stamp
example of timestamp:
2023-11-10T12:07:47.868788+00:00
"""
dt = datetime.fromisoformat(iso_timestamp)
dt = ensure_tz_aware_strict(dt)
return dt

View file

@ -1,6 +1,45 @@
import json import json
from datetime import datetime, timezone, timedelta
from mnemonic import Mnemonic from mnemonic import Mnemonic
# for expiration tests. If headache, consider freezegun
RECOVERY_KEY_VALIDATION_DATETIME = "selfprivacy_api.models.tokens.time.datetime"
DEVICE_KEY_VALIDATION_DATETIME = RECOVERY_KEY_VALIDATION_DATETIME
def five_minutes_into_future_naive():
return datetime.now() + timedelta(minutes=5)
def five_minutes_into_future_naive_utc():
return datetime.utcnow() + timedelta(minutes=5)
def five_minutes_into_future():
return datetime.now(timezone.utc) + timedelta(minutes=5)
def five_minutes_into_past_naive():
return datetime.now() - timedelta(minutes=5)
def five_minutes_into_past_naive_utc():
return datetime.utcnow() - timedelta(minutes=5)
def five_minutes_into_past():
return datetime.now(timezone.utc) - timedelta(minutes=5)
class NearFuture(datetime):
@classmethod
def now(cls, tz=None):
return datetime.now(tz) + timedelta(minutes=13)
@classmethod
def utcnow(cls):
return datetime.utcnow() + timedelta(minutes=13)
def read_json(file_path): def read_json(file_path):
with open(file_path, "r", encoding="utf-8") as file: with open(file_path, "r", encoding="utf-8") as file:
@ -30,3 +69,9 @@ def generate_backup_query(query_array):
def mnemonic_to_hex(mnemonic): def mnemonic_to_hex(mnemonic):
return Mnemonic(language="english").to_entropy(mnemonic).hex() return Mnemonic(language="english").to_entropy(mnemonic).hex()
def assert_recovery_recent(time_generated: str):
assert datetime.fromisoformat(time_generated) - timedelta(seconds=5) < datetime.now(
timezone.utc
)

View file

@ -6,6 +6,38 @@ import pytest
from os import path from os import path
from fastapi.testclient import TestClient from fastapi.testclient import TestClient
import os.path as path
import datetime
from selfprivacy_api.models.tokens.token import Token
from selfprivacy_api.repositories.tokens.json_tokens_repository import (
JsonTokensRepository,
)
from selfprivacy_api.repositories.tokens.redis_tokens_repository import (
RedisTokensRepository,
)
from tests.common import read_json
EMPTY_TOKENS_JSON = ' {"tokens": []}'
TOKENS_FILE_CONTENTS = {
"tokens": [
{
"token": "TEST_TOKEN",
"name": "test_token",
"date": datetime.datetime(2022, 1, 14, 8, 31, 10, 789314),
},
{
"token": "TEST_TOKEN2",
"name": "test_token2",
"date": datetime.datetime(2022, 1, 14, 8, 31, 10, 789314),
},
]
}
DEVICE_WE_AUTH_TESTS_WITH = TOKENS_FILE_CONTENTS["tokens"][0]
def pytest_generate_tests(metafunc): def pytest_generate_tests(metafunc):
@ -17,12 +49,45 @@ def global_data_dir():
@pytest.fixture @pytest.fixture
def tokens_file(mocker, shared_datadir): def empty_tokens(mocker, tmpdir):
"""Mock tokens file.""" tokenfile = tmpdir / "empty_tokens.json"
mock = mocker.patch( with open(tokenfile, "w") as file:
"selfprivacy_api.utils.TOKENS_FILE", shared_datadir / "tokens.json" file.write(EMPTY_TOKENS_JSON)
mocker.patch("selfprivacy_api.utils.TOKENS_FILE", new=tokenfile)
assert read_json(tokenfile)["tokens"] == []
return tmpdir
@pytest.fixture
def empty_json_repo(empty_tokens):
repo = JsonTokensRepository()
for token in repo.get_tokens():
repo.delete_token(token)
assert repo.get_tokens() == []
return repo
@pytest.fixture
def empty_redis_repo():
repo = RedisTokensRepository()
repo.reset()
assert repo.get_tokens() == []
return repo
@pytest.fixture
def tokens_file(empty_redis_repo, tmpdir):
"""A state with tokens"""
repo = empty_redis_repo
for token in TOKENS_FILE_CONTENTS["tokens"]:
repo._store_token(
Token(
token=token["token"],
device_name=token["name"],
created_at=token["date"],
) )
return mock )
return repo
@pytest.fixture @pytest.fixture
@ -68,7 +133,9 @@ def authorized_client(tokens_file, huey_database, jobs_file):
from selfprivacy_api.app import app from selfprivacy_api.app import app
client = TestClient(app) client = TestClient(app)
client.headers.update({"Authorization": "Bearer TEST_TOKEN"}) client.headers.update(
{"Authorization": "Bearer " + DEVICE_WE_AUTH_TESTS_WITH["token"]}
)
return client return client

View file

@ -0,0 +1,89 @@
from tests.common import generate_api_query
from tests.conftest import TOKENS_FILE_CONTENTS, DEVICE_WE_AUTH_TESTS_WITH
ORIGINAL_DEVICES = TOKENS_FILE_CONTENTS["tokens"]
def assert_ok(response, request):
data = assert_data(response)
assert data[request]["success"] is True
assert data[request]["message"] is not None
assert data[request]["code"] == 200
def assert_errorcode(response, request, code):
data = assert_data(response)
assert data[request]["success"] is False
assert data[request]["message"] is not None
assert data[request]["code"] == code
def assert_empty(response):
assert response.status_code == 200
assert response.json().get("data") is None
def assert_data(response):
assert response.status_code == 200
data = response.json().get("data")
assert data is not None
assert "api" in data.keys()
return data["api"]
API_DEVICES_QUERY = """
devices {
creationDate
isCaller
name
}
"""
def request_devices(client):
return client.post(
"/graphql",
json={"query": generate_api_query([API_DEVICES_QUERY])},
)
def graphql_get_devices(client):
response = request_devices(client)
data = assert_data(response)
devices = data["devices"]
assert devices is not None
return devices
def set_client_token(client, token):
client.headers.update({"Authorization": "Bearer " + token})
def assert_token_valid(client, token):
set_client_token(client, token)
assert graphql_get_devices(client) is not None
def assert_same(graphql_devices, abstract_devices):
"""Orderless comparison"""
assert len(graphql_devices) == len(abstract_devices)
for original_device in abstract_devices:
assert original_device["name"] in [device["name"] for device in graphql_devices]
for device in graphql_devices:
if device["name"] == original_device["name"]:
assert device["creationDate"] == original_device["date"].isoformat()
def assert_original(client):
devices = graphql_get_devices(client)
assert_original_devices(devices)
def assert_original_devices(devices):
assert_same(devices, ORIGINAL_DEVICES)
for device in devices:
if device["name"] == DEVICE_WE_AUTH_TESTS_WITH["name"]:
assert device["isCaller"] is True
else:
assert device["isCaller"] is False

View file

@ -0,0 +1,88 @@
from tests.common import generate_api_query
from tests.conftest import TOKENS_FILE_CONTENTS, DEVICE_WE_AUTH_TESTS_WITH
ORIGINAL_DEVICES = TOKENS_FILE_CONTENTS["tokens"]
def assert_ok(response, request):
data = assert_data(response)
data[request]["success"] is True
data[request]["message"] is not None
data[request]["code"] == 200
def assert_errorcode(response, request, code):
data = assert_data(response)
data[request]["success"] is False
data[request]["message"] is not None
data[request]["code"] == code
def assert_empty(response):
assert response.status_code == 200
assert response.json().get("data") is None
def assert_data(response):
assert response.status_code == 200
data = response.json().get("data")
assert data is not None
return data
API_DEVICES_QUERY = """
devices {
creationDate
isCaller
name
}
"""
def request_devices(client):
return client.post(
"/graphql",
json={"query": generate_api_query([API_DEVICES_QUERY])},
)
def graphql_get_devices(client):
response = request_devices(client)
data = assert_data(response)
devices = data["api"]["devices"]
assert devices is not None
return devices
def set_client_token(client, token):
client.headers.update({"Authorization": "Bearer " + token})
def assert_token_valid(client, token):
set_client_token(client, token)
assert graphql_get_devices(client) is not None
def assert_same(graphql_devices, abstract_devices):
"""Orderless comparison"""
assert len(graphql_devices) == len(abstract_devices)
for original_device in abstract_devices:
assert original_device["name"] in [device["name"] for device in graphql_devices]
for device in graphql_devices:
if device["name"] == original_device["name"]:
assert device["creationDate"] == original_device["date"].isoformat()
def assert_original(client):
devices = graphql_get_devices(client)
assert_original_devices(devices)
def assert_original_devices(devices):
assert_same(devices, ORIGINAL_DEVICES)
for device in devices:
if device["name"] == DEVICE_WE_AUTH_TESTS_WITH["name"]:
assert device["isCaller"] is True
else:
assert device["isCaller"] is False

View file

View file

@ -1,14 +0,0 @@
{
"tokens": [
{
"token": "TEST_TOKEN",
"name": "test_token",
"date": "2022-01-14 08:31:10.789314"
},
{
"token": "TEST_TOKEN2",
"name": "test_token2",
"date": "2022-01-14 08:31:10.789314"
}
]
}

View file

@ -3,25 +3,11 @@
# pylint: disable=missing-function-docstring # pylint: disable=missing-function-docstring
from tests.common import generate_api_query from tests.common import generate_api_query
from tests.test_graphql.common import assert_original_devices
from tests.test_graphql.test_api_devices import API_DEVICES_QUERY from tests.test_graphql.test_api_devices import API_DEVICES_QUERY
from tests.test_graphql.test_api_recovery import API_RECOVERY_QUERY from tests.test_graphql.test_api_recovery import API_RECOVERY_QUERY
from tests.test_graphql.test_api_version import API_VERSION_QUERY from tests.test_graphql.test_api_version import API_VERSION_QUERY
TOKENS_FILE_CONTETS = {
"tokens": [
{
"token": "TEST_TOKEN",
"name": "test_token",
"date": "2022-01-14 08:31:10.789314",
},
{
"token": "TEST_TOKEN2",
"name": "test_token2",
"date": "2022-01-14 08:31:10.789314",
},
]
}
def test_graphql_get_entire_api_data(authorized_client, tokens_file): def test_graphql_get_entire_api_data(authorized_client, tokens_file):
response = authorized_client.post( response = authorized_client.post(
@ -35,20 +21,11 @@ def test_graphql_get_entire_api_data(authorized_client, tokens_file):
assert response.status_code == 200 assert response.status_code == 200
assert response.json().get("data") is not None assert response.json().get("data") is not None
assert "version" in response.json()["data"]["api"] assert "version" in response.json()["data"]["api"]
assert response.json()["data"]["api"]["devices"] is not None
assert len(response.json()["data"]["api"]["devices"]) == 2 devices = response.json()["data"]["api"]["devices"]
assert ( assert devices is not None
response.json()["data"]["api"]["devices"][0]["creationDate"] assert_original_devices(devices)
== "2022-01-14T08:31:10.789314"
)
assert response.json()["data"]["api"]["devices"][0]["isCaller"] is True
assert response.json()["data"]["api"]["devices"][0]["name"] == "test_token"
assert (
response.json()["data"]["api"]["devices"][1]["creationDate"]
== "2022-01-14T08:31:10.789314"
)
assert response.json()["data"]["api"]["devices"][1]["isCaller"] is False
assert response.json()["data"]["api"]["devices"][1]["name"] == "test_token2"
assert response.json()["data"]["api"]["recoveryKey"] is not None assert response.json()["data"]["api"]["recoveryKey"] is not None
assert response.json()["data"]["api"]["recoveryKey"]["exists"] is False assert response.json()["data"]["api"]["recoveryKey"]["exists"] is False
assert response.json()["data"]["api"]["recoveryKey"]["valid"] is False assert response.json()["data"]["api"]["recoveryKey"]["valid"] is False

View file

@ -1,76 +1,77 @@
# pylint: disable=redefined-outer-name # pylint: disable=redefined-outer-name
# pylint: disable=unused-argument # pylint: disable=unused-argument
# pylint: disable=missing-function-docstring # pylint: disable=missing-function-docstring
import datetime from tests.common import (
import pytest RECOVERY_KEY_VALIDATION_DATETIME,
from mnemonic import Mnemonic DEVICE_KEY_VALIDATION_DATETIME,
NearFuture,
from selfprivacy_api.repositories.tokens.json_tokens_repository import ( generate_api_query,
JsonTokensRepository, )
from tests.conftest import DEVICE_WE_AUTH_TESTS_WITH, TOKENS_FILE_CONTENTS
from tests.test_graphql.api_common import (
assert_data,
assert_empty,
assert_ok,
assert_errorcode,
assert_token_valid,
assert_original,
assert_same,
graphql_get_devices,
request_devices,
set_client_token,
API_DEVICES_QUERY,
ORIGINAL_DEVICES,
) )
from selfprivacy_api.models.tokens.token import Token
from tests.common import generate_api_query, read_json, write_json
TOKENS_FILE_CONTETS = { def graphql_get_caller_token_info(client):
"tokens": [ devices = graphql_get_devices(client)
{ for device in devices:
"token": "TEST_TOKEN", if device["isCaller"] is True:
"name": "test_token", return device
"date": "2022-01-14 08:31:10.789314",
},
{ def graphql_get_new_device_key(authorized_client) -> str:
"token": "TEST_TOKEN2", response = authorized_client.post(
"name": "test_token2", "/graphql",
"date": "2022-01-14 08:31:10.789314", json={"query": NEW_DEVICE_KEY_MUTATION},
}, )
] assert_ok(response, "getNewDeviceApiKey")
key = response.json()["data"]["api"]["getNewDeviceApiKey"]["key"]
assert key.split(" ").__len__() == 12
return key
def graphql_try_auth_new_device(client, mnemonic_key, device_name):
return client.post(
"/graphql",
json={
"query": AUTHORIZE_WITH_NEW_DEVICE_KEY_MUTATION,
"variables": {
"input": {
"key": mnemonic_key,
"deviceName": device_name,
} }
},
API_DEVICES_QUERY = """ },
devices { )
creationDate
isCaller
name
}
"""
@pytest.fixture def graphql_authorize_new_device(client, mnemonic_key, device_name) -> str:
def token_repo(): response = graphql_try_auth_new_device(client, mnemonic_key, "new_device")
return JsonTokensRepository() assert_ok(response, "authorizeWithNewDeviceApiKey")
token = response.json()["data"]["api"]["authorizeWithNewDeviceApiKey"]["token"]
assert_token_valid(client, token)
def test_graphql_tokens_info(authorized_client, tokens_file): def test_graphql_tokens_info(authorized_client, tokens_file):
response = authorized_client.post( assert_original(authorized_client)
"/graphql",
json={"query": generate_api_query([API_DEVICES_QUERY])},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["api"]["devices"] is not None
assert len(response.json()["data"]["api"]["devices"]) == 2
assert (
response.json()["data"]["api"]["devices"][0]["creationDate"]
== "2022-01-14T08:31:10.789314"
)
assert response.json()["data"]["api"]["devices"][0]["isCaller"] is True
assert response.json()["data"]["api"]["devices"][0]["name"] == "test_token"
assert (
response.json()["data"]["api"]["devices"][1]["creationDate"]
== "2022-01-14T08:31:10.789314"
)
assert response.json()["data"]["api"]["devices"][1]["isCaller"] is False
assert response.json()["data"]["api"]["devices"][1]["name"] == "test_token2"
def test_graphql_tokens_info_unauthorized(client, tokens_file): def test_graphql_tokens_info_unauthorized(client, tokens_file):
response = client.post( response = request_devices(client)
"/graphql", assert_empty(response)
json={"query": generate_api_query([API_DEVICES_QUERY])},
)
assert response.status_code == 200
assert response.json()["data"] is None
DELETE_TOKEN_MUTATION = """ DELETE_TOKEN_MUTATION = """
@ -96,34 +97,27 @@ def test_graphql_delete_token_unauthorized(client, tokens_file):
}, },
}, },
) )
assert response.status_code == 200 assert_empty(response)
assert response.json()["data"] is None
def test_graphql_delete_token(authorized_client, tokens_file): def test_graphql_delete_token(authorized_client, tokens_file):
test_devices = ORIGINAL_DEVICES.copy()
device_to_delete = test_devices.pop(1)
assert device_to_delete != DEVICE_WE_AUTH_TESTS_WITH
response = authorized_client.post( response = authorized_client.post(
"/graphql", "/graphql",
json={ json={
"query": DELETE_TOKEN_MUTATION, "query": DELETE_TOKEN_MUTATION,
"variables": { "variables": {
"device": "test_token2", "device": device_to_delete["name"],
}, },
}, },
) )
assert response.status_code == 200 assert_ok(response, "deleteDeviceApiToken")
assert response.json().get("data") is not None
assert response.json()["data"]["api"]["deleteDeviceApiToken"]["success"] is True devices = graphql_get_devices(authorized_client)
assert response.json()["data"]["api"]["deleteDeviceApiToken"]["message"] is not None assert_same(devices, test_devices)
assert response.json()["data"]["api"]["deleteDeviceApiToken"]["code"] == 200
assert read_json(tokens_file) == {
"tokens": [
{
"token": "TEST_TOKEN",
"name": "test_token",
"date": "2022-01-14 08:31:10.789314",
}
]
}
def test_graphql_delete_self_token(authorized_client, tokens_file): def test_graphql_delete_self_token(authorized_client, tokens_file):
@ -132,16 +126,12 @@ def test_graphql_delete_self_token(authorized_client, tokens_file):
json={ json={
"query": DELETE_TOKEN_MUTATION, "query": DELETE_TOKEN_MUTATION,
"variables": { "variables": {
"device": "test_token", "device": DEVICE_WE_AUTH_TESTS_WITH["name"],
}, },
}, },
) )
assert response.status_code == 200 assert_errorcode(response, "deleteDeviceApiToken", 400)
assert response.json().get("data") is not None assert_original(authorized_client)
assert response.json()["data"]["api"]["deleteDeviceApiToken"]["success"] is False
assert response.json()["data"]["api"]["deleteDeviceApiToken"]["message"] is not None
assert response.json()["data"]["api"]["deleteDeviceApiToken"]["code"] == 400
assert read_json(tokens_file) == TOKENS_FILE_CONTETS
def test_graphql_delete_nonexistent_token( def test_graphql_delete_nonexistent_token(
@ -157,12 +147,9 @@ def test_graphql_delete_nonexistent_token(
}, },
}, },
) )
assert response.status_code == 200 assert_errorcode(response, "deleteDeviceApiToken", 404)
assert response.json().get("data") is not None
assert response.json()["data"]["api"]["deleteDeviceApiToken"]["success"] is False assert_original(authorized_client)
assert response.json()["data"]["api"]["deleteDeviceApiToken"]["message"] is not None
assert response.json()["data"]["api"]["deleteDeviceApiToken"]["code"] == 404
assert read_json(tokens_file) == TOKENS_FILE_CONTETS
REFRESH_TOKEN_MUTATION = """ REFRESH_TOKEN_MUTATION = """
@ -184,32 +171,22 @@ def test_graphql_refresh_token_unauthorized(client, tokens_file):
"/graphql", "/graphql",
json={"query": REFRESH_TOKEN_MUTATION}, json={"query": REFRESH_TOKEN_MUTATION},
) )
assert response.status_code == 200 assert_empty(response)
assert response.json()["data"] is None
def test_graphql_refresh_token( def test_graphql_refresh_token(authorized_client, client, tokens_file):
authorized_client, caller_name_and_date = graphql_get_caller_token_info(authorized_client)
tokens_file,
token_repo,
):
response = authorized_client.post( response = authorized_client.post(
"/graphql", "/graphql",
json={"query": REFRESH_TOKEN_MUTATION}, json={"query": REFRESH_TOKEN_MUTATION},
) )
assert response.status_code == 200 assert_ok(response, "refreshDeviceApiToken")
assert response.json().get("data") is not None
assert response.json()["data"]["api"]["refreshDeviceApiToken"]["success"] is True new_token = response.json()["data"]["api"]["refreshDeviceApiToken"]["token"]
assert ( assert_token_valid(client, new_token)
response.json()["data"]["api"]["refreshDeviceApiToken"]["message"] is not None
) set_client_token(client, new_token)
assert response.json()["data"]["api"]["refreshDeviceApiToken"]["code"] == 200 assert graphql_get_caller_token_info(client) == caller_name_and_date
token = token_repo.get_token_by_name("test_token")
assert token == Token(
token=response.json()["data"]["api"]["refreshDeviceApiToken"]["token"],
device_name="test_token",
created_at=datetime.datetime(2022, 1, 14, 8, 31, 10, 789314),
)
NEW_DEVICE_KEY_MUTATION = """ NEW_DEVICE_KEY_MUTATION = """
@ -234,33 +211,7 @@ def test_graphql_get_new_device_auth_key_unauthorized(
"/graphql", "/graphql",
json={"query": NEW_DEVICE_KEY_MUTATION}, json={"query": NEW_DEVICE_KEY_MUTATION},
) )
assert response.status_code == 200 assert_empty(response)
assert response.json()["data"] is None
def test_graphql_get_new_device_auth_key(
authorized_client,
tokens_file,
):
response = authorized_client.post(
"/graphql",
json={"query": NEW_DEVICE_KEY_MUTATION},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["api"]["getNewDeviceApiKey"]["success"] is True
assert response.json()["data"]["api"]["getNewDeviceApiKey"]["message"] is not None
assert response.json()["data"]["api"]["getNewDeviceApiKey"]["code"] == 200
assert (
response.json()["data"]["api"]["getNewDeviceApiKey"]["key"].split(" ").__len__()
== 12
)
token = (
Mnemonic(language="english")
.to_entropy(response.json()["data"]["api"]["getNewDeviceApiKey"]["key"])
.hex()
)
assert read_json(tokens_file)["new_device"]["token"] == token
INVALIDATE_NEW_DEVICE_KEY_MUTATION = """ INVALIDATE_NEW_DEVICE_KEY_MUTATION = """
@ -289,48 +240,20 @@ def test_graphql_invalidate_new_device_token_unauthorized(
}, },
}, },
) )
assert response.status_code == 200 assert_empty(response)
assert response.json()["data"] is None
def test_graphql_get_and_delete_new_device_key( def test_graphql_get_and_delete_new_device_key(client, authorized_client, tokens_file):
authorized_client, mnemonic_key = graphql_get_new_device_key(authorized_client)
tokens_file,
):
response = authorized_client.post(
"/graphql",
json={"query": NEW_DEVICE_KEY_MUTATION},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["api"]["getNewDeviceApiKey"]["success"] is True
assert response.json()["data"]["api"]["getNewDeviceApiKey"]["message"] is not None
assert response.json()["data"]["api"]["getNewDeviceApiKey"]["code"] == 200
assert (
response.json()["data"]["api"]["getNewDeviceApiKey"]["key"].split(" ").__len__()
== 12
)
token = (
Mnemonic(language="english")
.to_entropy(response.json()["data"]["api"]["getNewDeviceApiKey"]["key"])
.hex()
)
assert read_json(tokens_file)["new_device"]["token"] == token
response = authorized_client.post( response = authorized_client.post(
"/graphql", "/graphql",
json={"query": INVALIDATE_NEW_DEVICE_KEY_MUTATION}, json={"query": INVALIDATE_NEW_DEVICE_KEY_MUTATION},
) )
assert response.status_code == 200 assert_ok(response, "invalidateNewDeviceApiKey")
assert response.json().get("data") is not None
assert ( response = graphql_try_auth_new_device(client, mnemonic_key, "new_device")
response.json()["data"]["api"]["invalidateNewDeviceApiKey"]["success"] is True assert_errorcode(response, "authorizeWithNewDeviceApiKey", 404)
)
assert (
response.json()["data"]["api"]["invalidateNewDeviceApiKey"]["message"]
is not None
)
assert response.json()["data"]["api"]["invalidateNewDeviceApiKey"]["code"] == 200
assert read_json(tokens_file) == TOKENS_FILE_CONTETS
AUTHORIZE_WITH_NEW_DEVICE_KEY_MUTATION = """ AUTHORIZE_WITH_NEW_DEVICE_KEY_MUTATION = """
@ -347,209 +270,46 @@ mutation AuthorizeWithNewDeviceKey($input: UseNewDeviceKeyInput!) {
""" """
def test_graphql_get_and_authorize_new_device( def test_graphql_get_and_authorize_new_device(client, authorized_client, tokens_file):
client, mnemonic_key = graphql_get_new_device_key(authorized_client)
authorized_client, old_devices = graphql_get_devices(authorized_client)
tokens_file,
): graphql_authorize_new_device(client, mnemonic_key, "new_device")
response = authorized_client.post( new_devices = graphql_get_devices(authorized_client)
"/graphql",
json={"query": NEW_DEVICE_KEY_MUTATION}, assert len(new_devices) == len(old_devices) + 1
) assert "new_device" in [device["name"] for device in new_devices]
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["api"]["getNewDeviceApiKey"]["success"] is True
assert response.json()["data"]["api"]["getNewDeviceApiKey"]["message"] is not None
assert response.json()["data"]["api"]["getNewDeviceApiKey"]["code"] == 200
mnemonic_key = response.json()["data"]["api"]["getNewDeviceApiKey"]["key"]
assert mnemonic_key.split(" ").__len__() == 12
key = Mnemonic(language="english").to_entropy(mnemonic_key).hex()
assert read_json(tokens_file)["new_device"]["token"] == key
response = client.post(
"/graphql",
json={
"query": AUTHORIZE_WITH_NEW_DEVICE_KEY_MUTATION,
"variables": {
"input": {
"key": mnemonic_key,
"deviceName": "new_device",
}
},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert (
response.json()["data"]["api"]["authorizeWithNewDeviceApiKey"]["success"]
is True
)
assert (
response.json()["data"]["api"]["authorizeWithNewDeviceApiKey"]["message"]
is not None
)
assert response.json()["data"]["api"]["authorizeWithNewDeviceApiKey"]["code"] == 200
token = response.json()["data"]["api"]["authorizeWithNewDeviceApiKey"]["token"]
assert read_json(tokens_file)["tokens"][2]["token"] == token
assert read_json(tokens_file)["tokens"][2]["name"] == "new_device"
def test_graphql_authorize_new_device_with_invalid_key( def test_graphql_authorize_new_device_with_invalid_key(
client, client, authorized_client, tokens_file
tokens_file,
): ):
response = client.post( response = graphql_try_auth_new_device(client, "invalid_token", "new_device")
"/graphql", assert_errorcode(response, "authorizeWithNewDeviceApiKey", 404)
json={
"query": AUTHORIZE_WITH_NEW_DEVICE_KEY_MUTATION, assert_original(authorized_client)
"variables": {
"input": {
"key": "invalid_token",
"deviceName": "test_token",
}
},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert (
response.json()["data"]["api"]["authorizeWithNewDeviceApiKey"]["success"]
is False
)
assert (
response.json()["data"]["api"]["authorizeWithNewDeviceApiKey"]["message"]
is not None
)
assert response.json()["data"]["api"]["authorizeWithNewDeviceApiKey"]["code"] == 404
assert read_json(tokens_file) == TOKENS_FILE_CONTETS
def test_graphql_get_and_authorize_used_key( def test_graphql_get_and_authorize_used_key(client, authorized_client, tokens_file):
client, mnemonic_key = graphql_get_new_device_key(authorized_client)
authorized_client,
tokens_file,
):
response = authorized_client.post(
"/graphql",
json={"query": NEW_DEVICE_KEY_MUTATION},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["api"]["getNewDeviceApiKey"]["success"] is True
assert response.json()["data"]["api"]["getNewDeviceApiKey"]["message"] is not None
assert response.json()["data"]["api"]["getNewDeviceApiKey"]["code"] == 200
mnemonic_key = response.json()["data"]["api"]["getNewDeviceApiKey"]["key"]
assert mnemonic_key.split(" ").__len__() == 12
key = Mnemonic(language="english").to_entropy(mnemonic_key).hex()
assert read_json(tokens_file)["new_device"]["token"] == key
response = client.post(
"/graphql",
json={
"query": AUTHORIZE_WITH_NEW_DEVICE_KEY_MUTATION,
"variables": {
"input": {
"key": mnemonic_key,
"deviceName": "new_token",
}
},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert (
response.json()["data"]["api"]["authorizeWithNewDeviceApiKey"]["success"]
is True
)
assert (
response.json()["data"]["api"]["authorizeWithNewDeviceApiKey"]["message"]
is not None
)
assert response.json()["data"]["api"]["authorizeWithNewDeviceApiKey"]["code"] == 200
assert (
read_json(tokens_file)["tokens"][2]["token"]
== response.json()["data"]["api"]["authorizeWithNewDeviceApiKey"]["token"]
)
assert read_json(tokens_file)["tokens"][2]["name"] == "new_token"
response = client.post( graphql_authorize_new_device(client, mnemonic_key, "new_device")
"/graphql", devices = graphql_get_devices(authorized_client)
json={
"query": AUTHORIZE_WITH_NEW_DEVICE_KEY_MUTATION, response = graphql_try_auth_new_device(client, mnemonic_key, "new_device2")
"variables": { assert_errorcode(response, "authorizeWithNewDeviceApiKey", 404)
"input": {
"key": NEW_DEVICE_KEY_MUTATION, assert graphql_get_devices(authorized_client) == devices
"deviceName": "test_token2",
}
},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert (
response.json()["data"]["api"]["authorizeWithNewDeviceApiKey"]["success"]
is False
)
assert (
response.json()["data"]["api"]["authorizeWithNewDeviceApiKey"]["message"]
is not None
)
assert response.json()["data"]["api"]["authorizeWithNewDeviceApiKey"]["code"] == 404
assert read_json(tokens_file)["tokens"].__len__() == 3
def test_graphql_get_and_authorize_key_after_12_minutes( def test_graphql_get_and_authorize_key_after_12_minutes(
client, client, authorized_client, tokens_file, mocker
authorized_client,
tokens_file,
): ):
response = authorized_client.post( mnemonic_key = graphql_get_new_device_key(authorized_client)
"/graphql", mock = mocker.patch(DEVICE_KEY_VALIDATION_DATETIME, NearFuture)
json={"query": NEW_DEVICE_KEY_MUTATION},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["api"]["getNewDeviceApiKey"]["success"] is True
assert response.json()["data"]["api"]["getNewDeviceApiKey"]["message"] is not None
assert response.json()["data"]["api"]["getNewDeviceApiKey"]["code"] == 200
assert (
response.json()["data"]["api"]["getNewDeviceApiKey"]["key"].split(" ").__len__()
== 12
)
key = (
Mnemonic(language="english")
.to_entropy(response.json()["data"]["api"]["getNewDeviceApiKey"]["key"])
.hex()
)
assert read_json(tokens_file)["new_device"]["token"] == key
file_data = read_json(tokens_file) response = graphql_try_auth_new_device(client, mnemonic_key, "new_device")
file_data["new_device"]["expiration"] = str( assert_errorcode(response, "authorizeWithNewDeviceApiKey", 404)
datetime.datetime.now() - datetime.timedelta(minutes=13)
)
write_json(tokens_file, file_data)
response = client.post(
"/graphql",
json={
"query": AUTHORIZE_WITH_NEW_DEVICE_KEY_MUTATION,
"variables": {
"input": {
"key": key,
"deviceName": "test_token",
}
},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert (
response.json()["data"]["api"]["authorizeWithNewDeviceApiKey"]["success"]
is False
)
assert (
response.json()["data"]["api"]["authorizeWithNewDeviceApiKey"]["message"]
is not None
)
assert response.json()["data"]["api"]["authorizeWithNewDeviceApiKey"]["code"] == 404
def test_graphql_authorize_without_token( def test_graphql_authorize_without_token(
@ -567,5 +327,4 @@ def test_graphql_authorize_without_token(
}, },
}, },
) )
assert response.status_code == 200 assert_empty(response)
assert response.json().get("data") is None

View file

@ -1,24 +1,33 @@
# pylint: disable=redefined-outer-name # pylint: disable=redefined-outer-name
# pylint: disable=unused-argument # pylint: disable=unused-argument
# pylint: disable=missing-function-docstring # pylint: disable=missing-function-docstring
import datetime
from tests.common import generate_api_query, mnemonic_to_hex, read_json, write_json import pytest
TOKENS_FILE_CONTETS = { from datetime import datetime, timezone
"tokens": [
{ from tests.common import (
"token": "TEST_TOKEN", generate_api_query,
"name": "test_token", assert_recovery_recent,
"date": "2022-01-14 08:31:10.789314", NearFuture,
}, RECOVERY_KEY_VALIDATION_DATETIME,
{ )
"token": "TEST_TOKEN2",
"name": "test_token2", # Graphql API's output should be timezone-naive
"date": "2022-01-14 08:31:10.789314", from tests.common import five_minutes_into_future_naive_utc as five_minutes_into_future
}, from tests.common import five_minutes_into_future as five_minutes_into_future_tz
] from tests.common import five_minutes_into_past_naive_utc as five_minutes_into_past
}
from tests.test_graphql.api_common import (
assert_empty,
assert_data,
assert_ok,
assert_errorcode,
assert_token_valid,
assert_original,
graphql_get_devices,
set_client_token,
)
API_RECOVERY_QUERY = """ API_RECOVERY_QUERY = """
recoveryKey { recoveryKey {
@ -31,28 +40,85 @@ recoveryKey {
""" """
def test_graphql_recovery_key_status_unauthorized(client, tokens_file): def request_recovery_status(client):
response = client.post( return client.post(
"/graphql", "/graphql",
json={"query": generate_api_query([API_RECOVERY_QUERY])}, json={"query": generate_api_query([API_RECOVERY_QUERY])},
) )
assert response.status_code == 200
assert response.json().get("data") is None
def graphql_recovery_status(client):
response = request_recovery_status(client)
data = assert_data(response)
status = data["recoveryKey"]
assert status is not None
return status
def request_make_new_recovery_key(client, expires_at=None, uses=None):
json = {"query": API_RECOVERY_KEY_GENERATE_MUTATION}
limits = {}
if expires_at is not None:
limits["expirationDate"] = expires_at.isoformat()
if uses is not None:
limits["uses"] = uses
if limits != {}:
json["variables"] = {"limits": limits}
response = client.post("/graphql", json=json)
return response
def graphql_make_new_recovery_key(client, expires_at=None, uses=None):
response = request_make_new_recovery_key(client, expires_at, uses)
assert_ok(response, "getNewRecoveryApiKey")
key = response.json()["data"]["api"]["getNewRecoveryApiKey"]["key"]
assert key is not None
assert key.split(" ").__len__() == 18
return key
def request_recovery_auth(client, key, device_name):
return client.post(
"/graphql",
json={
"query": API_RECOVERY_KEY_USE_MUTATION,
"variables": {
"input": {
"key": key,
"deviceName": device_name,
},
},
},
)
def graphql_use_recovery_key(client, key, device_name):
response = request_recovery_auth(client, key, device_name)
assert_ok(response, "useRecoveryApiKey")
token = response.json()["data"]["api"]["useRecoveryApiKey"]["token"]
assert token is not None
assert_token_valid(client, token)
set_client_token(client, token)
assert device_name in [device["name"] for device in graphql_get_devices(client)]
return token
def test_graphql_recovery_key_status_unauthorized(client, tokens_file):
response = request_recovery_status(client)
assert_empty(response)
def test_graphql_recovery_key_status_when_none_exists(authorized_client, tokens_file): def test_graphql_recovery_key_status_when_none_exists(authorized_client, tokens_file):
response = authorized_client.post( status = graphql_recovery_status(authorized_client)
"/graphql", assert status["exists"] is False
json={"query": generate_api_query([API_RECOVERY_QUERY])}, assert status["valid"] is False
) assert status["creationDate"] is None
assert response.status_code == 200 assert status["expirationDate"] is None
assert response.json().get("data") is not None assert status["usesLeft"] is None
assert response.json()["data"]["api"]["recoveryKey"] is not None
assert response.json()["data"]["api"]["recoveryKey"]["exists"] is False
assert response.json()["data"]["api"]["recoveryKey"]["valid"] is False
assert response.json()["data"]["api"]["recoveryKey"]["creationDate"] is None
assert response.json()["data"]["api"]["recoveryKey"]["expirationDate"] is None
assert response.json()["data"]["api"]["recoveryKey"]["usesLeft"] is None
API_RECOVERY_KEY_GENERATE_MUTATION = """ API_RECOVERY_KEY_GENERATE_MUTATION = """
@ -83,281 +149,82 @@ mutation TestUseRecoveryKey($input: UseRecoveryKeyInput!) {
def test_graphql_generate_recovery_key(client, authorized_client, tokens_file): def test_graphql_generate_recovery_key(client, authorized_client, tokens_file):
response = authorized_client.post( key = graphql_make_new_recovery_key(authorized_client)
"/graphql",
json={
"query": API_RECOVERY_KEY_GENERATE_MUTATION,
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["api"]["getNewRecoveryApiKey"]["success"] is True
assert response.json()["data"]["api"]["getNewRecoveryApiKey"]["message"] is not None
assert response.json()["data"]["api"]["getNewRecoveryApiKey"]["code"] == 200
assert response.json()["data"]["api"]["getNewRecoveryApiKey"]["key"] is not None
assert (
response.json()["data"]["api"]["getNewRecoveryApiKey"]["key"]
.split(" ")
.__len__()
== 18
)
assert read_json(tokens_file)["recovery_token"] is not None
time_generated = read_json(tokens_file)["recovery_token"]["date"]
assert time_generated is not None
key = response.json()["data"]["api"]["getNewRecoveryApiKey"]["key"]
assert (
datetime.datetime.strptime(time_generated, "%Y-%m-%dT%H:%M:%S.%f")
- datetime.timedelta(seconds=5)
< datetime.datetime.now()
)
# Try to get token status status = graphql_recovery_status(authorized_client)
response = authorized_client.post( assert status["exists"] is True
"/graphql", assert status["valid"] is True
json={"query": generate_api_query([API_RECOVERY_QUERY])}, assert_recovery_recent(status["creationDate"])
) assert status["expirationDate"] is None
assert response.status_code == 200 assert status["usesLeft"] is None
assert response.json().get("data") is not None
assert response.json()["data"]["api"]["recoveryKey"] is not None
assert response.json()["data"]["api"]["recoveryKey"]["exists"] is True
assert response.json()["data"]["api"]["recoveryKey"]["valid"] is True
assert response.json()["data"]["api"]["recoveryKey"][
"creationDate"
] == time_generated.replace("Z", "")
assert response.json()["data"]["api"]["recoveryKey"]["expirationDate"] is None
assert response.json()["data"]["api"]["recoveryKey"]["usesLeft"] is None
# Try to use token graphql_use_recovery_key(client, key, "new_test_token")
response = client.post( # And again
"/graphql", graphql_use_recovery_key(client, key, "new_test_token2")
json={
"query": API_RECOVERY_KEY_USE_MUTATION,
"variables": {
"input": {
"key": key,
"deviceName": "new_test_token",
},
},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["api"]["useRecoveryApiKey"]["success"] is True
assert response.json()["data"]["api"]["useRecoveryApiKey"]["message"] is not None
assert response.json()["data"]["api"]["useRecoveryApiKey"]["code"] == 200
assert response.json()["data"]["api"]["useRecoveryApiKey"]["token"] is not None
assert (
response.json()["data"]["api"]["useRecoveryApiKey"]["token"]
== read_json(tokens_file)["tokens"][2]["token"]
)
assert read_json(tokens_file)["tokens"][2]["name"] == "new_test_token"
# Try to use token again
response = client.post(
"/graphql",
json={
"query": API_RECOVERY_KEY_USE_MUTATION,
"variables": {
"input": {
"key": key,
"deviceName": "new_test_token2",
},
},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["api"]["useRecoveryApiKey"]["success"] is True
assert response.json()["data"]["api"]["useRecoveryApiKey"]["message"] is not None
assert response.json()["data"]["api"]["useRecoveryApiKey"]["code"] == 200
assert response.json()["data"]["api"]["useRecoveryApiKey"]["token"] is not None
assert (
response.json()["data"]["api"]["useRecoveryApiKey"]["token"]
== read_json(tokens_file)["tokens"][3]["token"]
)
assert read_json(tokens_file)["tokens"][3]["name"] == "new_test_token2"
@pytest.mark.parametrize(
"expiration_date", [five_minutes_into_future(), five_minutes_into_future_tz()]
)
def test_graphql_generate_recovery_key_with_expiration_date( def test_graphql_generate_recovery_key_with_expiration_date(
client, authorized_client, tokens_file client, authorized_client, tokens_file, expiration_date: datetime
): ):
expiration_date = datetime.datetime.now() + datetime.timedelta(minutes=5) key = graphql_make_new_recovery_key(authorized_client, expires_at=expiration_date)
expiration_date_str = expiration_date.strftime("%Y-%m-%dT%H:%M:%S.%f")
response = authorized_client.post(
"/graphql",
json={
"query": API_RECOVERY_KEY_GENERATE_MUTATION,
"variables": {
"limits": {
"expirationDate": expiration_date_str,
},
},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["api"]["getNewRecoveryApiKey"]["success"] is True
assert response.json()["data"]["api"]["getNewRecoveryApiKey"]["message"] is not None
assert response.json()["data"]["api"]["getNewRecoveryApiKey"]["code"] == 200
assert response.json()["data"]["api"]["getNewRecoveryApiKey"]["key"] is not None
assert (
response.json()["data"]["api"]["getNewRecoveryApiKey"]["key"]
.split(" ")
.__len__()
== 18
)
assert read_json(tokens_file)["recovery_token"] is not None
key = response.json()["data"]["api"]["getNewRecoveryApiKey"]["key"] status = graphql_recovery_status(authorized_client)
assert read_json(tokens_file)["recovery_token"]["expiration"] == expiration_date_str assert status["exists"] is True
assert read_json(tokens_file)["recovery_token"]["token"] == mnemonic_to_hex(key) assert status["valid"] is True
assert_recovery_recent(status["creationDate"])
time_generated = read_json(tokens_file)["recovery_token"]["date"] # timezone-aware comparison. Should pass regardless of server's tz
assert time_generated is not None assert datetime.fromisoformat(status["expirationDate"]) == expiration_date.replace(
assert ( tzinfo=timezone.utc
datetime.datetime.strptime(time_generated, "%Y-%m-%dT%H:%M:%S.%f")
- datetime.timedelta(seconds=5)
< datetime.datetime.now()
) )
# Try to get token status assert status["usesLeft"] is None
response = authorized_client.post(
"/graphql",
json={"query": generate_api_query([API_RECOVERY_QUERY])},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["api"]["recoveryKey"] is not None
assert response.json()["data"]["api"]["recoveryKey"]["exists"] is True
assert response.json()["data"]["api"]["recoveryKey"]["valid"] is True
assert response.json()["data"]["api"]["recoveryKey"][
"creationDate"
] == time_generated.replace("Z", "")
assert (
response.json()["data"]["api"]["recoveryKey"]["expirationDate"]
== expiration_date_str
)
assert response.json()["data"]["api"]["recoveryKey"]["usesLeft"] is None
# Try to use token graphql_use_recovery_key(client, key, "new_test_token")
response = authorized_client.post( # And again
"/graphql", graphql_use_recovery_key(client, key, "new_test_token2")
json={
"query": API_RECOVERY_KEY_USE_MUTATION,
"variables": {
"input": {
"key": key,
"deviceName": "new_test_token",
},
},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["api"]["useRecoveryApiKey"]["success"] is True
assert response.json()["data"]["api"]["useRecoveryApiKey"]["message"] is not None
assert response.json()["data"]["api"]["useRecoveryApiKey"]["code"] == 200
assert response.json()["data"]["api"]["useRecoveryApiKey"]["token"] is not None
assert (
response.json()["data"]["api"]["useRecoveryApiKey"]["token"]
== read_json(tokens_file)["tokens"][2]["token"]
)
# Try to use token again
response = authorized_client.post(
"/graphql",
json={
"query": API_RECOVERY_KEY_USE_MUTATION,
"variables": {
"input": {
"key": key,
"deviceName": "new_test_token2",
},
},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["api"]["useRecoveryApiKey"]["success"] is True
assert response.json()["data"]["api"]["useRecoveryApiKey"]["message"] is not None
assert response.json()["data"]["api"]["useRecoveryApiKey"]["code"] == 200
assert response.json()["data"]["api"]["useRecoveryApiKey"]["token"] is not None
assert (
response.json()["data"]["api"]["useRecoveryApiKey"]["token"]
== read_json(tokens_file)["tokens"][3]["token"]
)
# Try to use token after expiration date def test_graphql_use_recovery_key_after_expiration(
new_data = read_json(tokens_file) client, authorized_client, tokens_file, mocker
new_data["recovery_token"]["expiration"] = ( ):
datetime.datetime.now() - datetime.timedelta(minutes=5) expiration_date = five_minutes_into_future()
).strftime("%Y-%m-%dT%H:%M:%S.%f") key = graphql_make_new_recovery_key(authorized_client, expires_at=expiration_date)
write_json(tokens_file, new_data)
response = authorized_client.post( # Timewarp to after it expires
"/graphql", mock = mocker.patch(RECOVERY_KEY_VALIDATION_DATETIME, NearFuture)
json={
"query": API_RECOVERY_KEY_USE_MUTATION, response = request_recovery_auth(client, key, "new_test_token3")
"variables": { assert_errorcode(response, "useRecoveryApiKey", 404)
"input": {
"key": key,
"deviceName": "new_test_token3",
},
},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["api"]["useRecoveryApiKey"]["success"] is False
assert response.json()["data"]["api"]["useRecoveryApiKey"]["message"] is not None
assert response.json()["data"]["api"]["useRecoveryApiKey"]["code"] == 404
assert response.json()["data"]["api"]["useRecoveryApiKey"]["token"] is None assert response.json()["data"]["api"]["useRecoveryApiKey"]["token"] is None
assert_original(authorized_client)
assert read_json(tokens_file)["tokens"] == new_data["tokens"] status = graphql_recovery_status(authorized_client)
assert status["exists"] is True
assert status["valid"] is False
assert_recovery_recent(status["creationDate"])
# Try to get token status # timezone-aware comparison. Should pass regardless of server's tz
response = authorized_client.post( assert datetime.fromisoformat(status["expirationDate"]) == expiration_date.replace(
"/graphql", tzinfo=timezone.utc
json={"query": generate_api_query([API_RECOVERY_QUERY])},
) )
assert response.status_code == 200 assert status["usesLeft"] is None
assert response.json().get("data") is not None
assert response.json()["data"]["api"]["recoveryKey"] is not None
assert response.json()["data"]["api"]["recoveryKey"]["exists"] is True
assert response.json()["data"]["api"]["recoveryKey"]["valid"] is False
assert (
response.json()["data"]["api"]["recoveryKey"]["creationDate"] == time_generated
)
assert (
response.json()["data"]["api"]["recoveryKey"]["expirationDate"]
== new_data["recovery_token"]["expiration"]
)
assert response.json()["data"]["api"]["recoveryKey"]["usesLeft"] is None
def test_graphql_generate_recovery_key_with_expiration_in_the_past( def test_graphql_generate_recovery_key_with_expiration_in_the_past(
authorized_client, tokens_file authorized_client, tokens_file
): ):
expiration_date = datetime.datetime.now() - datetime.timedelta(minutes=5) expiration_date = five_minutes_into_past()
expiration_date_str = expiration_date.strftime("%Y-%m-%dT%H:%M:%S.%f") response = request_make_new_recovery_key(
authorized_client, expires_at=expiration_date
response = authorized_client.post(
"/graphql",
json={
"query": API_RECOVERY_KEY_GENERATE_MUTATION,
"variables": {
"limits": {
"expirationDate": expiration_date_str,
},
},
},
) )
assert response.status_code == 200
assert response.json().get("data") is not None assert_errorcode(response, "getNewRecoveryApiKey", 400)
assert response.json()["data"]["api"]["getNewRecoveryApiKey"]["success"] is False
assert response.json()["data"]["api"]["getNewRecoveryApiKey"]["message"] is not None
assert response.json()["data"]["api"]["getNewRecoveryApiKey"]["code"] == 400
assert response.json()["data"]["api"]["getNewRecoveryApiKey"]["key"] is None assert response.json()["data"]["api"]["getNewRecoveryApiKey"]["key"] is None
assert "recovery_token" not in read_json(tokens_file) assert graphql_recovery_status(authorized_client)["exists"] is False
def test_graphql_generate_recovery_key_with_invalid_time_format( def test_graphql_generate_recovery_key_with_invalid_time_format(
@ -377,183 +244,57 @@ def test_graphql_generate_recovery_key_with_invalid_time_format(
}, },
}, },
) )
assert response.status_code == 200 assert_empty(response)
assert response.json().get("data") is None assert graphql_recovery_status(authorized_client)["exists"] is False
assert "recovery_token" not in read_json(tokens_file)
def test_graphql_generate_recovery_key_with_limited_uses( def test_graphql_generate_recovery_key_with_limited_uses(
authorized_client, tokens_file authorized_client, client, tokens_file
): ):
response = authorized_client.post( mnemonic_key = graphql_make_new_recovery_key(authorized_client, uses=2)
"/graphql",
json={
"query": API_RECOVERY_KEY_GENERATE_MUTATION,
"variables": {
"limits": {
"expirationDate": None,
"uses": 2,
},
},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["api"]["getNewRecoveryApiKey"]["success"] is True
assert response.json()["data"]["api"]["getNewRecoveryApiKey"]["message"] is not None
assert response.json()["data"]["api"]["getNewRecoveryApiKey"]["code"] == 200
assert response.json()["data"]["api"]["getNewRecoveryApiKey"]["key"] is not None
mnemonic_key = response.json()["data"]["api"]["getNewRecoveryApiKey"]["key"] status = graphql_recovery_status(authorized_client)
key = mnemonic_to_hex(mnemonic_key) assert status["exists"] is True
assert status["valid"] is True
assert status["creationDate"] is not None
assert status["expirationDate"] is None
assert status["usesLeft"] == 2
assert read_json(tokens_file)["recovery_token"]["token"] == key graphql_use_recovery_key(client, mnemonic_key, "new_test_token1")
assert read_json(tokens_file)["recovery_token"]["uses_left"] == 2
# Try to get token status status = graphql_recovery_status(authorized_client)
response = authorized_client.post( assert status["exists"] is True
"/graphql", assert status["valid"] is True
json={"query": generate_api_query([API_RECOVERY_QUERY])}, assert status["creationDate"] is not None
) assert status["expirationDate"] is None
assert response.status_code == 200 assert status["usesLeft"] == 1
assert response.json().get("data") is not None
assert response.json()["data"]["api"]["recoveryKey"] is not None
assert response.json()["data"]["api"]["recoveryKey"]["exists"] is True
assert response.json()["data"]["api"]["recoveryKey"]["valid"] is True
assert response.json()["data"]["api"]["recoveryKey"]["creationDate"] is not None
assert response.json()["data"]["api"]["recoveryKey"]["expirationDate"] is None
assert response.json()["data"]["api"]["recoveryKey"]["usesLeft"] == 2
# Try to use token graphql_use_recovery_key(client, mnemonic_key, "new_test_token2")
response = authorized_client.post(
"/graphql",
json={
"query": API_RECOVERY_KEY_USE_MUTATION,
"variables": {
"input": {
"key": mnemonic_key,
"deviceName": "test_token1",
},
},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["api"]["useRecoveryApiKey"]["success"] is True
assert response.json()["data"]["api"]["useRecoveryApiKey"]["message"] is not None
assert response.json()["data"]["api"]["useRecoveryApiKey"]["code"] == 200
assert response.json()["data"]["api"]["useRecoveryApiKey"]["token"] is not None
# Try to get token status status = graphql_recovery_status(authorized_client)
response = authorized_client.post( assert status["exists"] is True
"/graphql", assert status["valid"] is False
json={"query": generate_api_query([API_RECOVERY_QUERY])}, assert status["creationDate"] is not None
) assert status["expirationDate"] is None
assert response.status_code == 200 assert status["usesLeft"] == 0
assert response.json().get("data") is not None
assert response.json()["data"]["api"]["recoveryKey"] is not None
assert response.json()["data"]["api"]["recoveryKey"]["exists"] is True
assert response.json()["data"]["api"]["recoveryKey"]["valid"] is True
assert response.json()["data"]["api"]["recoveryKey"]["creationDate"] is not None
assert response.json()["data"]["api"]["recoveryKey"]["expirationDate"] is None
assert response.json()["data"]["api"]["recoveryKey"]["usesLeft"] == 1
# Try to use token response = request_recovery_auth(client, mnemonic_key, "new_test_token3")
response = authorized_client.post( assert_errorcode(response, "useRecoveryApiKey", 404)
"/graphql",
json={
"query": API_RECOVERY_KEY_USE_MUTATION,
"variables": {
"input": {
"key": mnemonic_key,
"deviceName": "test_token2",
},
},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["api"]["useRecoveryApiKey"]["success"] is True
assert response.json()["data"]["api"]["useRecoveryApiKey"]["message"] is not None
assert response.json()["data"]["api"]["useRecoveryApiKey"]["code"] == 200
assert response.json()["data"]["api"]["useRecoveryApiKey"]["token"] is not None
# Try to get token status
response = authorized_client.post(
"/graphql",
json={"query": generate_api_query([API_RECOVERY_QUERY])},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["api"]["recoveryKey"] is not None
assert response.json()["data"]["api"]["recoveryKey"]["exists"] is True
assert response.json()["data"]["api"]["recoveryKey"]["valid"] is False
assert response.json()["data"]["api"]["recoveryKey"]["creationDate"] is not None
assert response.json()["data"]["api"]["recoveryKey"]["expirationDate"] is None
assert response.json()["data"]["api"]["recoveryKey"]["usesLeft"] == 0
# Try to use token
response = authorized_client.post(
"/graphql",
json={
"query": API_RECOVERY_KEY_USE_MUTATION,
"variables": {
"input": {
"key": mnemonic_key,
"deviceName": "test_token3",
},
},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["api"]["useRecoveryApiKey"]["success"] is False
assert response.json()["data"]["api"]["useRecoveryApiKey"]["message"] is not None
assert response.json()["data"]["api"]["useRecoveryApiKey"]["code"] == 404
assert response.json()["data"]["api"]["useRecoveryApiKey"]["token"] is None
def test_graphql_generate_recovery_key_with_negative_uses( def test_graphql_generate_recovery_key_with_negative_uses(
authorized_client, tokens_file authorized_client, tokens_file
): ):
# Try to get token status response = request_make_new_recovery_key(authorized_client, uses=-1)
response = authorized_client.post(
"/graphql", assert_errorcode(response, "getNewRecoveryApiKey", 400)
json={
"query": API_RECOVERY_KEY_GENERATE_MUTATION,
"variables": {
"limits": {
"uses": -1,
},
},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["api"]["getNewRecoveryApiKey"]["success"] is False
assert response.json()["data"]["api"]["getNewRecoveryApiKey"]["message"] is not None
assert response.json()["data"]["api"]["getNewRecoveryApiKey"]["code"] == 400
assert response.json()["data"]["api"]["getNewRecoveryApiKey"]["key"] is None assert response.json()["data"]["api"]["getNewRecoveryApiKey"]["key"] is None
def test_graphql_generate_recovery_key_with_zero_uses(authorized_client, tokens_file): def test_graphql_generate_recovery_key_with_zero_uses(authorized_client, tokens_file):
# Try to get token status response = request_make_new_recovery_key(authorized_client, uses=0)
response = authorized_client.post(
"/graphql", assert_errorcode(response, "getNewRecoveryApiKey", 400)
json={
"query": API_RECOVERY_KEY_GENERATE_MUTATION,
"variables": {
"limits": {
"uses": 0,
},
},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["api"]["getNewRecoveryApiKey"]["success"] is False
assert response.json()["data"]["api"]["getNewRecoveryApiKey"]["message"] is not None
assert response.json()["data"]["api"]["getNewRecoveryApiKey"]["code"] == 400
assert response.json()["data"]["api"]["getNewRecoveryApiKey"]["key"] is None assert response.json()["data"]["api"]["getNewRecoveryApiKey"]["key"] is None
assert graphql_recovery_status(authorized_client)["exists"] is False

View file

@ -14,6 +14,8 @@ import secrets
import tempfile import tempfile
from selfprivacy_api.utils.huey import huey
import selfprivacy_api.services as services import selfprivacy_api.services as services
from selfprivacy_api.services import Service, get_all_services from selfprivacy_api.services import Service, get_all_services
from selfprivacy_api.services.service import ServiceStatus from selfprivacy_api.services.service import ServiceStatus
@ -119,6 +121,10 @@ def dummy_service(tmpdir, backups, raw_dummy_service) -> Service:
# register our service # register our service
services.services.append(service) services.services.append(service)
# make sure we are in immediate mode because this thing is non pickleable to store on queue.
huey.immediate = True
assert huey.immediate is True
assert get_service_by_id(service.get_id()) is not None assert get_service_by_id(service.get_id()) is not None
yield service yield service
@ -996,6 +1002,32 @@ def test_autobackup_timing(backups, dummy_service):
assert Backups.is_time_to_backup_service(dummy_service, future) assert Backups.is_time_to_backup_service(dummy_service, future)
def test_backup_unbackuppable(backups, dummy_service):
dummy_service.set_backuppable(False)
assert dummy_service.can_be_backed_up() is False
with pytest.raises(ValueError):
Backups.back_up(dummy_service)
def test_failed_autoback_prevents_more_autobackup(backups, dummy_service):
backup_period = 13 # minutes
now = datetime.now(timezone.utc)
Backups.set_autobackup_period_minutes(backup_period)
assert Backups.is_time_to_backup_service(dummy_service, now)
# artificially making an errored out backup job
dummy_service.set_backuppable(False)
with pytest.raises(ValueError):
Backups.back_up(dummy_service)
dummy_service.set_backuppable(True)
assert Backups.get_last_backed_up(dummy_service) is None
assert Backups.get_last_backup_error_time(dummy_service) is not None
assert Backups.is_time_to_backup_service(dummy_service, now) is False
# Storage # Storage
def test_snapshots_caching(backups, dummy_service): def test_snapshots_caching(backups, dummy_service):
Backups.back_up(dummy_service) Backups.back_up(dummy_service)

View file

@ -25,7 +25,6 @@ from test_tokens_repository import (
mock_recovery_key_generate, mock_recovery_key_generate,
mock_generate_token, mock_generate_token,
mock_new_device_key_generate, mock_new_device_key_generate,
empty_keys,
) )
ORIGINAL_TOKEN_CONTENT = [ ORIGINAL_TOKEN_CONTENT = [
@ -51,6 +50,18 @@ ORIGINAL_TOKEN_CONTENT = [
}, },
] ]
EMPTY_KEYS_JSON = """
{
"tokens": [
{
"token": "KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI",
"name": "primary_token",
"date": "2022-07-15 17:41:31.675698"
}
]
}
"""
@pytest.fixture @pytest.fixture
def tokens(mocker, datadir): def tokens(mocker, datadir):
@ -59,6 +70,22 @@ def tokens(mocker, datadir):
return datadir return datadir
@pytest.fixture
def empty_keys(mocker, tmpdir):
tokens_file = tmpdir / "empty_keys.json"
with open(tokens_file, "w") as file:
file.write(EMPTY_KEYS_JSON)
mocker.patch("selfprivacy_api.utils.TOKENS_FILE", new=tokens_file)
assert read_json(tokens_file)["tokens"] == [
{
"token": "KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI",
"name": "primary_token",
"date": "2022-07-15 17:41:31.675698",
}
]
return tmpdir
@pytest.fixture @pytest.fixture
def null_keys(mocker, datadir): def null_keys(mocker, datadir):
mocker.patch("selfprivacy_api.utils.TOKENS_FILE", new=datadir / "null_keys.json") mocker.patch("selfprivacy_api.utils.TOKENS_FILE", new=datadir / "null_keys.json")

View file

@ -1,9 +0,0 @@
{
"tokens": [
{
"token": "KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI",
"name": "primary_token",
"date": "2022-07-15 17:41:31.675698"
}
]
}

View file

@ -2,7 +2,7 @@
# pylint: disable=unused-argument # pylint: disable=unused-argument
# pylint: disable=missing-function-docstring # pylint: disable=missing-function-docstring
from datetime import datetime, timedelta from datetime import datetime, timezone
from mnemonic import Mnemonic from mnemonic import Mnemonic
import pytest import pytest
@ -16,13 +16,18 @@ from selfprivacy_api.repositories.tokens.exceptions import (
TokenNotFound, TokenNotFound,
NewDeviceKeyNotFound, NewDeviceKeyNotFound,
) )
from selfprivacy_api.repositories.tokens.json_tokens_repository import ( from selfprivacy_api.repositories.tokens.json_tokens_repository import (
JsonTokensRepository, JsonTokensRepository,
) )
from selfprivacy_api.repositories.tokens.redis_tokens_repository import ( from selfprivacy_api.repositories.tokens.redis_tokens_repository import (
RedisTokensRepository, RedisTokensRepository,
) )
from tests.common import read_json from selfprivacy_api.repositories.tokens.abstract_tokens_repository import (
AbstractTokensRepository,
)
from tests.common import five_minutes_into_past, five_minutes_into_future
ORIGINAL_DEVICE_NAMES = [ ORIGINAL_DEVICE_NAMES = [
@ -32,24 +37,15 @@ ORIGINAL_DEVICE_NAMES = [
"forth_token", "forth_token",
] ]
TEST_DATE = datetime(2022, 7, 15, 17, 41, 31, 675698, timezone.utc)
# tokens are not tz-aware
TOKEN_TEST_DATE = datetime(2022, 7, 15, 17, 41, 31, 675698)
def mnemonic_from_hex(hexkey): def mnemonic_from_hex(hexkey):
return Mnemonic(language="english").to_mnemonic(bytes.fromhex(hexkey)) return Mnemonic(language="english").to_mnemonic(bytes.fromhex(hexkey))
@pytest.fixture
def empty_keys(mocker, datadir):
mocker.patch("selfprivacy_api.utils.TOKENS_FILE", new=datadir / "empty_keys.json")
assert read_json(datadir / "empty_keys.json")["tokens"] == [
{
"token": "KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI",
"name": "primary_token",
"date": "2022-07-15 17:41:31.675698",
}
]
return datadir
@pytest.fixture @pytest.fixture
def mock_new_device_key_generate(mocker): def mock_new_device_key_generate(mocker):
mock = mocker.patch( mock = mocker.patch(
@ -57,8 +53,8 @@ def mock_new_device_key_generate(mocker):
autospec=True, autospec=True,
return_value=NewDeviceKey( return_value=NewDeviceKey(
key="43478d05b35e4781598acd76e33832bb", key="43478d05b35e4781598acd76e33832bb",
created_at=datetime(2022, 7, 15, 17, 41, 31, 675698), created_at=TEST_DATE,
expires_at=datetime(2022, 7, 15, 17, 41, 31, 675698), expires_at=TEST_DATE,
), ),
) )
return mock return mock
@ -72,8 +68,8 @@ def mock_new_device_key_generate_for_mnemonic(mocker):
autospec=True, autospec=True,
return_value=NewDeviceKey( return_value=NewDeviceKey(
key="2237238de23dc71ab558e317bdb8ff8e", key="2237238de23dc71ab558e317bdb8ff8e",
created_at=datetime(2022, 7, 15, 17, 41, 31, 675698), created_at=TEST_DATE,
expires_at=datetime(2022, 7, 15, 17, 41, 31, 675698), expires_at=TEST_DATE,
), ),
) )
return mock return mock
@ -100,7 +96,7 @@ def mock_recovery_key_generate_invalid(mocker):
autospec=True, autospec=True,
return_value=RecoveryKey( return_value=RecoveryKey(
key="889bf49c1d3199d71a2e704718772bd53a422020334db051", key="889bf49c1d3199d71a2e704718772bd53a422020334db051",
created_at=datetime(2022, 7, 15, 17, 41, 31, 675698), created_at=TEST_DATE,
expires_at=None, expires_at=None,
uses_left=0, uses_left=0,
), ),
@ -116,7 +112,7 @@ def mock_token_generate(mocker):
return_value=Token( return_value=Token(
token="ZuLNKtnxDeq6w2dpOJhbB3iat_sJLPTPl_rN5uc5MvM", token="ZuLNKtnxDeq6w2dpOJhbB3iat_sJLPTPl_rN5uc5MvM",
device_name="IamNewDevice", device_name="IamNewDevice",
created_at=datetime(2022, 7, 15, 17, 41, 31, 675698), created_at=TOKEN_TEST_DATE,
), ),
) )
return mock return mock
@ -129,7 +125,7 @@ def mock_recovery_key_generate(mocker):
autospec=True, autospec=True,
return_value=RecoveryKey( return_value=RecoveryKey(
key="889bf49c1d3199d71a2e704718772bd53a422020334db051", key="889bf49c1d3199d71a2e704718772bd53a422020334db051",
created_at=datetime(2022, 7, 15, 17, 41, 31, 675698), created_at=TEST_DATE,
expires_at=None, expires_at=None,
uses_left=1, uses_left=1,
), ),
@ -137,23 +133,6 @@ def mock_recovery_key_generate(mocker):
return mock return mock
@pytest.fixture
def empty_json_repo(empty_keys):
repo = JsonTokensRepository()
for token in repo.get_tokens():
repo.delete_token(token)
assert repo.get_tokens() == []
return repo
@pytest.fixture
def empty_redis_repo():
repo = RedisTokensRepository()
repo.reset()
assert repo.get_tokens() == []
return repo
@pytest.fixture(params=["json", "redis"]) @pytest.fixture(params=["json", "redis"])
def empty_repo(request, empty_json_repo, empty_redis_repo): def empty_repo(request, empty_json_repo, empty_redis_repo):
if request.param == "json": if request.param == "json":
@ -250,13 +229,13 @@ def test_create_token(empty_repo, mock_token_generate):
assert repo.create_token(device_name="IamNewDevice") == Token( assert repo.create_token(device_name="IamNewDevice") == Token(
token="ZuLNKtnxDeq6w2dpOJhbB3iat_sJLPTPl_rN5uc5MvM", token="ZuLNKtnxDeq6w2dpOJhbB3iat_sJLPTPl_rN5uc5MvM",
device_name="IamNewDevice", device_name="IamNewDevice",
created_at=datetime(2022, 7, 15, 17, 41, 31, 675698), created_at=TOKEN_TEST_DATE,
) )
assert repo.get_tokens() == [ assert repo.get_tokens() == [
Token( Token(
token="ZuLNKtnxDeq6w2dpOJhbB3iat_sJLPTPl_rN5uc5MvM", token="ZuLNKtnxDeq6w2dpOJhbB3iat_sJLPTPl_rN5uc5MvM",
device_name="IamNewDevice", device_name="IamNewDevice",
created_at=datetime(2022, 7, 15, 17, 41, 31, 675698), created_at=TOKEN_TEST_DATE,
) )
] ]
@ -292,7 +271,7 @@ def test_delete_not_found_token(some_tokens_repo):
input_token = Token( input_token = Token(
token="imbadtoken", token="imbadtoken",
device_name="primary_token", device_name="primary_token",
created_at=datetime(2022, 7, 15, 17, 41, 31, 675698), created_at=TEST_DATE,
) )
with pytest.raises(TokenNotFound): with pytest.raises(TokenNotFound):
assert repo.delete_token(input_token) is None assert repo.delete_token(input_token) is None
@ -321,7 +300,7 @@ def test_refresh_not_found_token(some_tokens_repo, mock_token_generate):
input_token = Token( input_token = Token(
token="idontknowwhoiam", token="idontknowwhoiam",
device_name="tellmewhoiam?", device_name="tellmewhoiam?",
created_at=datetime(2022, 7, 15, 17, 41, 31, 675698), created_at=TEST_DATE,
) )
with pytest.raises(TokenNotFound): with pytest.raises(TokenNotFound):
@ -345,7 +324,7 @@ def test_create_get_recovery_key(some_tokens_repo, mock_recovery_key_generate):
assert repo.create_recovery_key(uses_left=1, expiration=None) is not None assert repo.create_recovery_key(uses_left=1, expiration=None) is not None
assert repo.get_recovery_key() == RecoveryKey( assert repo.get_recovery_key() == RecoveryKey(
key="889bf49c1d3199d71a2e704718772bd53a422020334db051", key="889bf49c1d3199d71a2e704718772bd53a422020334db051",
created_at=datetime(2022, 7, 15, 17, 41, 31, 675698), created_at=TEST_DATE,
expires_at=None, expires_at=None,
uses_left=1, uses_left=1,
) )
@ -384,10 +363,13 @@ def test_use_mnemonic_expired_recovery_key(
some_tokens_repo, some_tokens_repo,
): ):
repo = some_tokens_repo repo = some_tokens_repo
expiration = datetime.now() - timedelta(minutes=5) expiration = five_minutes_into_past()
assert repo.create_recovery_key(uses_left=2, expiration=expiration) is not None assert repo.create_recovery_key(uses_left=2, expiration=expiration) is not None
recovery_key = repo.get_recovery_key() recovery_key = repo.get_recovery_key()
assert recovery_key.expires_at == expiration # TODO: do not ignore timezone once json backend is deleted
assert recovery_key.expires_at.replace(tzinfo=None) == expiration.replace(
tzinfo=None
)
assert not repo.is_recovery_key_valid() assert not repo.is_recovery_key_valid()
with pytest.raises(RecoveryKeyNotFound): with pytest.raises(RecoveryKeyNotFound):
@ -484,8 +466,8 @@ def test_get_new_device_key(some_tokens_repo, mock_new_device_key_generate):
assert repo.get_new_device_key() == NewDeviceKey( assert repo.get_new_device_key() == NewDeviceKey(
key="43478d05b35e4781598acd76e33832bb", key="43478d05b35e4781598acd76e33832bb",
created_at=datetime(2022, 7, 15, 17, 41, 31, 675698), created_at=TEST_DATE,
expires_at=datetime(2022, 7, 15, 17, 41, 31, 675698), expires_at=TEST_DATE,
) )
@ -561,7 +543,7 @@ def test_use_mnemonic_expired_new_device_key(
some_tokens_repo, some_tokens_repo,
): ):
repo = some_tokens_repo repo = some_tokens_repo
expiration = datetime.now() - timedelta(minutes=5) expiration = five_minutes_into_past()
key = repo.get_new_device_key() key = repo.get_new_device_key()
assert key is not None assert key is not None
@ -588,3 +570,36 @@ def test_use_mnemonic_new_device_key_when_empty(empty_repo):
) )
is None is None
) )
def assert_identical(
repo_a: AbstractTokensRepository, repo_b: AbstractTokensRepository
):
tokens_a = repo_a.get_tokens()
tokens_b = repo_b.get_tokens()
assert len(tokens_a) == len(tokens_b)
for token in tokens_a:
assert token in tokens_b
assert repo_a.get_recovery_key() == repo_b.get_recovery_key()
assert repo_a._get_stored_new_device_key() == repo_b._get_stored_new_device_key()
def clone_to_redis(repo: JsonTokensRepository):
other_repo = RedisTokensRepository()
other_repo.clone(repo)
assert_identical(repo, other_repo)
# we cannot easily parametrize this unfortunately, since some_tokens and empty_repo cannot coexist
def test_clone_json_to_redis_empty(empty_repo):
repo = empty_repo
if isinstance(repo, JsonTokensRepository):
clone_to_redis(repo)
def test_clone_json_to_redis_full(some_tokens_repo):
repo = some_tokens_repo
if isinstance(repo, JsonTokensRepository):
repo.get_new_device_key()
repo.create_recovery_key(five_minutes_into_future(), 2)
clone_to_redis(repo)

View file

@ -1,9 +0,0 @@
{
"tokens": [
{
"token": "KG9ni-B-CMPk327Zv1qC7YBQaUGaBUcgdkvMvQ2atFI",
"name": "primary_token",
"date": "2022-07-15 17:41:31.675698"
}
]
}

View file

@ -3,6 +3,7 @@
import pytest import pytest
from tests.common import read_json from tests.common import read_json
from tests.test_graphql.common import assert_empty
class ProcessMock: class ProcessMock:
@ -72,8 +73,7 @@ def test_graphql_add_ssh_key_unauthorized(client, some_users, mock_subprocess_po
}, },
}, },
) )
assert response.status_code == 200 assert_empty(response)
assert response.json().get("data") is None
def test_graphql_add_ssh_key(authorized_client, some_users, mock_subprocess_popen): def test_graphql_add_ssh_key(authorized_client, some_users, mock_subprocess_popen):
@ -231,8 +231,7 @@ def test_graphql_remove_ssh_key_unauthorized(client, some_users, mock_subprocess
}, },
}, },
) )
assert response.status_code == 200 assert_empty(response)
assert response.json().get("data") is None
def test_graphql_remove_ssh_key(authorized_client, some_users, mock_subprocess_popen): def test_graphql_remove_ssh_key(authorized_client, some_users, mock_subprocess_popen):

View file

@ -5,6 +5,7 @@ import os
import pytest import pytest
from tests.common import generate_system_query, read_json from tests.common import generate_system_query, read_json
from tests.test_graphql.common import assert_empty
@pytest.fixture @pytest.fixture
@ -144,8 +145,7 @@ def test_graphql_get_python_version_wrong_auth(
"query": generate_system_query([API_PYTHON_VERSION_INFO]), "query": generate_system_query([API_PYTHON_VERSION_INFO]),
}, },
) )
assert response.status_code == 200 assert_empty(response)
assert response.json().get("data") is None
def test_graphql_get_python_version(authorized_client, mock_subprocess_check_output): def test_graphql_get_python_version(authorized_client, mock_subprocess_check_output):
@ -181,8 +181,7 @@ def test_graphql_get_system_version_unauthorized(
}, },
) )
assert response.status_code == 200 assert_empty(response)
assert response.json().get("data") is None
assert mock_subprocess_check_output.call_count == 0 assert mock_subprocess_check_output.call_count == 0
@ -348,8 +347,7 @@ def test_graphql_get_timezone_unauthorized(client, turned_on):
"query": generate_system_query([API_GET_TIMEZONE]), "query": generate_system_query([API_GET_TIMEZONE]),
}, },
) )
assert response.status_code == 200 assert_empty(response)
assert response.json().get("data") is None
def test_graphql_get_timezone(authorized_client, turned_on): def test_graphql_get_timezone(authorized_client, turned_on):
@ -405,8 +403,7 @@ def test_graphql_change_timezone_unauthorized(client, turned_on):
}, },
}, },
) )
assert response.status_code == 200 assert_empty(response)
assert response.json().get("data") is None
def test_graphql_change_timezone(authorized_client, turned_on): def test_graphql_change_timezone(authorized_client, turned_on):
@ -515,8 +512,7 @@ def test_graphql_get_auto_upgrade_unauthorized(client, turned_on):
"query": generate_system_query([API_GET_AUTO_UPGRADE_SETTINGS_QUERY]), "query": generate_system_query([API_GET_AUTO_UPGRADE_SETTINGS_QUERY]),
}, },
) )
assert response.status_code == 200 assert_empty(response)
assert response.json().get("data") is None
def test_graphql_get_auto_upgrade(authorized_client, turned_on): def test_graphql_get_auto_upgrade(authorized_client, turned_on):
@ -624,8 +620,7 @@ def test_graphql_change_auto_upgrade_unauthorized(client, turned_on):
}, },
}, },
) )
assert response.status_code == 200 assert_empty(response)
assert response.json().get("data") is None
def test_graphql_change_auto_upgrade(authorized_client, turned_on): def test_graphql_change_auto_upgrade(authorized_client, turned_on):
@ -932,8 +927,7 @@ def test_graphql_pull_system_configuration_unauthorized(client, mock_subprocess_
}, },
) )
assert response.status_code == 200 assert_empty(response)
assert response.json().get("data") is None
assert mock_subprocess_popen.call_count == 0 assert mock_subprocess_popen.call_count == 0

View file

@ -6,6 +6,7 @@ from tests.common import (
generate_users_query, generate_users_query,
read_json, read_json,
) )
from tests.test_graphql.common import assert_empty
invalid_usernames = [ invalid_usernames = [
"messagebus", "messagebus",
@ -125,8 +126,7 @@ def test_graphql_get_users_unauthorized(client, some_users, mock_subprocess_pope
"query": generate_users_query([API_USERS_INFO]), "query": generate_users_query([API_USERS_INFO]),
}, },
) )
assert response.status_code == 200 assert_empty(response)
assert response.json().get("data") is None
def test_graphql_get_some_users(authorized_client, some_users, mock_subprocess_popen): def test_graphql_get_some_users(authorized_client, some_users, mock_subprocess_popen):
@ -192,8 +192,7 @@ def test_graphql_get_one_user_unauthorized(client, one_user, mock_subprocess_pop
}, },
}, },
) )
assert response.status_code == 200 assert_empty(response)
assert response.json().get("data") is None
def test_graphql_get_one_user(authorized_client, one_user, mock_subprocess_popen): def test_graphql_get_one_user(authorized_client, one_user, mock_subprocess_popen):
@ -323,8 +322,7 @@ def test_graphql_add_user_unauthorize(client, one_user, mock_subprocess_popen):
}, },
}, },
) )
assert response.status_code == 200 assert_empty(response)
assert response.json().get("data") is None
def test_graphql_add_user(authorized_client, one_user, mock_subprocess_popen): def test_graphql_add_user(authorized_client, one_user, mock_subprocess_popen):
@ -576,8 +574,7 @@ def test_graphql_delete_user_unauthorized(client, some_users, mock_subprocess_po
"variables": {"username": "user1"}, "variables": {"username": "user1"},
}, },
) )
assert response.status_code == 200 assert_empty(response)
assert response.json().get("data") is None
def test_graphql_delete_user(authorized_client, some_users, mock_subprocess_popen): def test_graphql_delete_user(authorized_client, some_users, mock_subprocess_popen):
@ -683,8 +680,7 @@ def test_graphql_update_user_unauthorized(client, some_users, mock_subprocess_po
}, },
}, },
) )
assert response.status_code == 200 assert_empty(response)
assert response.json().get("data") is None
def test_graphql_update_user(authorized_client, some_users, mock_subprocess_popen): def test_graphql_update_user(authorized_client, some_users, mock_subprocess_popen):

View file

@ -1,18 +1,25 @@
import pytest import pytest
from datetime import datetime, timedelta from datetime import datetime, timedelta, timezone
from selfprivacy_api.models.tokens.recovery_key import RecoveryKey from selfprivacy_api.models.tokens.recovery_key import RecoveryKey
from selfprivacy_api.models.tokens.new_device_key import NewDeviceKey from selfprivacy_api.models.tokens.new_device_key import NewDeviceKey
def test_recovery_key_expired(): def test_recovery_key_expired_utcnaive():
expiration = datetime.now() - timedelta(minutes=5) expiration = datetime.utcnow() - timedelta(minutes=5)
key = RecoveryKey.generate(expiration=expiration, uses_left=2)
assert not key.is_valid()
def test_recovery_key_expired_tzaware():
expiration = datetime.now(timezone.utc) - timedelta(minutes=5)
key = RecoveryKey.generate(expiration=expiration, uses_left=2) key = RecoveryKey.generate(expiration=expiration, uses_left=2)
assert not key.is_valid() assert not key.is_valid()
def test_new_device_key_expired(): def test_new_device_key_expired():
expiration = datetime.now() - timedelta(minutes=5) # key is supposed to be tzaware
expiration = datetime.now(timezone.utc) - timedelta(minutes=5)
key = NewDeviceKey.generate() key = NewDeviceKey.generate()
key.expires_at = expiration key.expires_at = expiration
assert not key.is_valid() assert not key.is_valid()

View file

@ -1,14 +0,0 @@
{
"tokens": [
{
"token": "TEST_TOKEN",
"name": "test_token",
"date": "2022-01-14 08:31:10.789314"
},
{
"token": "TEST_TOKEN2",
"name": "test_token2",
"date": "2022-01-14 08:31:10.789314"
}
]
}

View file

@ -1,9 +0,0 @@
{
"tokens": [
{
"token": "TEST_TOKEN",
"name": "Test Token",
"date": "2022-01-14 08:31:10.789314"
}
]
}

View file

@ -2,32 +2,18 @@
# pylint: disable=unused-argument # pylint: disable=unused-argument
# pylint: disable=missing-function-docstring # pylint: disable=missing-function-docstring
import datetime import datetime
from datetime import timezone
import pytest import pytest
from mnemonic import Mnemonic
from selfprivacy_api.repositories.tokens.json_tokens_repository import ( from tests.conftest import TOKENS_FILE_CONTENTS
JsonTokensRepository, from tests.common import (
RECOVERY_KEY_VALIDATION_DATETIME,
DEVICE_KEY_VALIDATION_DATETIME,
NearFuture,
assert_recovery_recent,
) )
from tests.common import five_minutes_into_future_naive_utc as five_minutes_into_future
TOKEN_REPO = JsonTokensRepository() from tests.common import five_minutes_into_past_naive_utc as five_minutes_into_past
from tests.common import read_json, write_json
TOKENS_FILE_CONTETS = {
"tokens": [
{
"token": "TEST_TOKEN",
"name": "test_token",
"date": "2022-01-14 08:31:10.789314",
},
{
"token": "TEST_TOKEN2",
"name": "test_token2",
"date": "2022-01-14 08:31:10.789314",
},
]
}
DATE_FORMATS = [ DATE_FORMATS = [
"%Y-%m-%dT%H:%M:%S.%fZ", "%Y-%m-%dT%H:%M:%S.%fZ",
@ -37,10 +23,99 @@ DATE_FORMATS = [
] ]
def test_get_tokens_info(authorized_client, tokens_file): def assert_original(client):
response = authorized_client.get("/auth/tokens") new_tokens = rest_get_tokens_info(client)
for token in TOKENS_FILE_CONTENTS["tokens"]:
assert_token_valid(client, token["token"])
for new_token in new_tokens:
if new_token["name"] == token["name"]:
assert (
datetime.datetime.fromisoformat(new_token["date"]) == token["date"]
)
assert_no_recovery(client)
def assert_token_valid(client, token):
client.headers.update({"Authorization": "Bearer " + token})
assert rest_get_tokens_info(client) is not None
def rest_get_tokens_info(client):
response = client.get("/auth/tokens")
assert response.status_code == 200 assert response.status_code == 200
assert response.json() == [ return response.json()
def rest_try_authorize_new_device(client, token, device_name):
response = client.post(
"/auth/new_device/authorize",
json={
"token": token,
"device": device_name,
},
)
return response
def rest_make_recovery_token(client, expires_at=None, timeformat=None, uses=None):
json = {}
if expires_at is not None:
assert timeformat is not None
expires_at_str = expires_at.strftime(timeformat)
json["expiration"] = expires_at_str
if uses is not None:
json["uses"] = uses
if json == {}:
response = client.post("/auth/recovery_token")
else:
response = client.post(
"/auth/recovery_token",
json=json,
)
if not response.status_code == 200:
raise ValueError(response.reason, response.text, response.json()["detail"])
assert response.status_code == 200
assert "token" in response.json()
return response.json()["token"]
def rest_get_recovery_status(client):
response = client.get("/auth/recovery_token")
assert response.status_code == 200
return response.json()
def rest_get_recovery_date(client):
status = rest_get_recovery_status(client)
assert "date" in status
return status["date"]
def assert_no_recovery(client):
assert not rest_get_recovery_status(client)["exists"]
def rest_recover_with_mnemonic(client, mnemonic_token, device_name):
recovery_response = client.post(
"/auth/recovery_token/use",
json={"token": mnemonic_token, "device": device_name},
)
assert recovery_response.status_code == 200
new_token = recovery_response.json()["token"]
assert_token_valid(client, new_token)
return new_token
# Tokens
def test_get_tokens_info(authorized_client, tokens_file):
assert sorted(rest_get_tokens_info(authorized_client), key=lambda x: x["name"]) == [
{"name": "test_token", "date": "2022-01-14T08:31:10.789314", "is_caller": True}, {"name": "test_token", "date": "2022-01-14T08:31:10.789314", "is_caller": True},
{ {
"name": "test_token2", "name": "test_token2",
@ -55,10 +130,10 @@ def test_get_tokens_unauthorized(client, tokens_file):
assert response.status_code == 401 assert response.status_code == 401
def test_delete_token_unauthorized(client, tokens_file): def test_delete_token_unauthorized(client, authorized_client, tokens_file):
response = client.delete("/auth/tokens") response = client.delete("/auth/tokens")
assert response.status_code == 401 assert response.status_code == 401
assert read_json(tokens_file) == TOKENS_FILE_CONTETS assert_original(authorized_client)
def test_delete_token(authorized_client, tokens_file): def test_delete_token(authorized_client, tokens_file):
@ -66,15 +141,9 @@ def test_delete_token(authorized_client, tokens_file):
"/auth/tokens", json={"token_name": "test_token2"} "/auth/tokens", json={"token_name": "test_token2"}
) )
assert response.status_code == 200 assert response.status_code == 200
assert read_json(tokens_file) == { assert rest_get_tokens_info(authorized_client) == [
"tokens": [ {"name": "test_token", "date": "2022-01-14T08:31:10.789314", "is_caller": True}
{
"token": "TEST_TOKEN",
"name": "test_token",
"date": "2022-01-14 08:31:10.789314",
}
] ]
}
def test_delete_self_token(authorized_client, tokens_file): def test_delete_self_token(authorized_client, tokens_file):
@ -82,7 +151,7 @@ def test_delete_self_token(authorized_client, tokens_file):
"/auth/tokens", json={"token_name": "test_token"} "/auth/tokens", json={"token_name": "test_token"}
) )
assert response.status_code == 400 assert response.status_code == 400
assert read_json(tokens_file) == TOKENS_FILE_CONTETS assert_original(authorized_client)
def test_delete_nonexistent_token(authorized_client, tokens_file): def test_delete_nonexistent_token(authorized_client, tokens_file):
@ -90,131 +159,103 @@ def test_delete_nonexistent_token(authorized_client, tokens_file):
"/auth/tokens", json={"token_name": "test_token3"} "/auth/tokens", json={"token_name": "test_token3"}
) )
assert response.status_code == 404 assert response.status_code == 404
assert read_json(tokens_file) == TOKENS_FILE_CONTETS assert_original(authorized_client)
def test_refresh_token_unauthorized(client, tokens_file): def test_refresh_token_unauthorized(client, authorized_client, tokens_file):
response = client.post("/auth/tokens") response = client.post("/auth/tokens")
assert response.status_code == 401 assert response.status_code == 401
assert read_json(tokens_file) == TOKENS_FILE_CONTETS assert_original(authorized_client)
def test_refresh_token(authorized_client, tokens_file): def test_refresh_token(authorized_client, tokens_file):
response = authorized_client.post("/auth/tokens") response = authorized_client.post("/auth/tokens")
assert response.status_code == 200 assert response.status_code == 200
new_token = response.json()["token"] new_token = response.json()["token"]
assert TOKEN_REPO.get_token_by_token_string(new_token) is not None assert_token_valid(authorized_client, new_token)
# new device # New device
def test_get_new_device_auth_token_unauthorized(client, tokens_file): def test_get_new_device_auth_token_unauthorized(client, authorized_client, tokens_file):
response = client.post("/auth/new_device") response = client.post("/auth/new_device")
assert response.status_code == 401 assert response.status_code == 401
assert read_json(tokens_file) == TOKENS_FILE_CONTETS assert "token" not in response.json()
assert "detail" in response.json()
# We only can check existence of a token we know.
def test_get_new_device_auth_token(authorized_client, tokens_file): def test_get_and_delete_new_device_token(client, authorized_client, tokens_file):
response = authorized_client.post("/auth/new_device") token = rest_get_new_device_token(authorized_client)
response = authorized_client.delete("/auth/new_device", json={"token": token})
assert response.status_code == 200 assert response.status_code == 200
assert "token" in response.json() assert rest_try_authorize_new_device(client, token, "new_device").status_code == 404
token = Mnemonic(language="english").to_entropy(response.json()["token"]).hex()
assert read_json(tokens_file)["new_device"]["token"] == token
def test_get_and_delete_new_device_token(authorized_client, tokens_file): def test_delete_token_unauthenticated(client, authorized_client, tokens_file):
response = authorized_client.post("/auth/new_device") token = rest_get_new_device_token(authorized_client)
assert response.status_code == 200 response = client.delete("/auth/new_device", json={"token": token})
assert "token" in response.json()
token = Mnemonic(language="english").to_entropy(response.json()["token"]).hex()
assert read_json(tokens_file)["new_device"]["token"] == token
response = authorized_client.delete(
"/auth/new_device", json={"token": response.json()["token"]}
)
assert response.status_code == 200
assert read_json(tokens_file) == TOKENS_FILE_CONTETS
def test_delete_token_unauthenticated(client, tokens_file):
response = client.delete("/auth/new_device")
assert response.status_code == 401 assert response.status_code == 401
assert read_json(tokens_file) == TOKENS_FILE_CONTETS assert rest_try_authorize_new_device(client, token, "new_device").status_code == 200
def rest_get_new_device_token(client):
response = client.post("/auth/new_device")
assert response.status_code == 200
assert "token" in response.json()
return response.json()["token"]
def test_get_and_authorize_new_device(client, authorized_client, tokens_file): def test_get_and_authorize_new_device(client, authorized_client, tokens_file):
response = authorized_client.post("/auth/new_device") token = rest_get_new_device_token(authorized_client)
response = rest_try_authorize_new_device(client, token, "new_device")
assert response.status_code == 200 assert response.status_code == 200
assert "token" in response.json() assert_token_valid(authorized_client, response.json()["token"])
token = Mnemonic(language="english").to_entropy(response.json()["token"]).hex()
assert read_json(tokens_file)["new_device"]["token"] == token
response = client.post(
"/auth/new_device/authorize",
json={"token": response.json()["token"], "device": "new_device"},
)
assert response.status_code == 200
assert read_json(tokens_file)["tokens"][2]["token"] == response.json()["token"]
assert read_json(tokens_file)["tokens"][2]["name"] == "new_device"
def test_authorize_new_device_with_invalid_token(client, tokens_file): def test_authorize_new_device_with_invalid_token(
response = client.post( client, authorized_client, tokens_file
"/auth/new_device/authorize", ):
json={"token": "invalid_token", "device": "new_device"}, response = rest_try_authorize_new_device(client, "invalid_token", "new_device")
)
assert response.status_code == 404 assert response.status_code == 404
assert read_json(tokens_file) == TOKENS_FILE_CONTETS assert_original(authorized_client)
def test_get_and_authorize_used_token(client, authorized_client, tokens_file): def test_get_and_authorize_used_token(client, authorized_client, tokens_file):
response = authorized_client.post("/auth/new_device") token_to_be_used_2_times = rest_get_new_device_token(authorized_client)
assert response.status_code == 200 response = rest_try_authorize_new_device(
assert "token" in response.json() client, token_to_be_used_2_times, "new_device"
token = Mnemonic(language="english").to_entropy(response.json()["token"]).hex()
assert read_json(tokens_file)["new_device"]["token"] == token
response = client.post(
"/auth/new_device/authorize",
json={"token": response.json()["token"], "device": "new_device"},
) )
assert response.status_code == 200 assert response.status_code == 200
assert read_json(tokens_file)["tokens"][2]["token"] == response.json()["token"] assert_token_valid(authorized_client, response.json()["token"])
assert read_json(tokens_file)["tokens"][2]["name"] == "new_device"
response = client.post( response = rest_try_authorize_new_device(
"/auth/new_device/authorize", client, token_to_be_used_2_times, "new_device"
json={"token": response.json()["token"], "device": "new_device"},
) )
assert response.status_code == 404 assert response.status_code == 404
def test_get_and_authorize_token_after_12_minutes( def test_get_and_authorize_token_after_12_minutes(
client, authorized_client, tokens_file client, authorized_client, tokens_file, mocker
): ):
response = authorized_client.post("/auth/new_device") token = rest_get_new_device_token(authorized_client)
assert response.status_code == 200
assert "token" in response.json()
token = Mnemonic(language="english").to_entropy(response.json()["token"]).hex()
assert read_json(tokens_file)["new_device"]["token"] == token
file_data = read_json(tokens_file) # TARDIS sounds
file_data["new_device"]["expiration"] = str( mock = mocker.patch(DEVICE_KEY_VALIDATION_DATETIME, NearFuture)
datetime.datetime.now() - datetime.timedelta(minutes=13)
)
write_json(tokens_file, file_data)
response = client.post( response = rest_try_authorize_new_device(client, token, "new_device")
"/auth/new_device/authorize",
json={"token": response.json()["token"], "device": "new_device"},
)
assert response.status_code == 404 assert response.status_code == 404
assert_original(authorized_client)
def test_authorize_without_token(client, tokens_file): def test_authorize_without_token(client, authorized_client, tokens_file):
response = client.post( response = client.post(
"/auth/new_device/authorize", "/auth/new_device/authorize",
json={"device": "new_device"}, json={"device": "new_device"},
) )
assert response.status_code == 422 assert response.status_code == 422
assert read_json(tokens_file) == TOKENS_FILE_CONTETS assert_original(authorized_client)
# Recovery tokens # Recovery tokens
@ -240,10 +281,10 @@ def test_authorize_without_token(client, tokens_file):
# - if request is invalid, returns 400 # - if request is invalid, returns 400
def test_get_recovery_token_status_unauthorized(client, tokens_file): def test_get_recovery_token_status_unauthorized(client, authorized_client, tokens_file):
response = client.get("/auth/recovery_token") response = client.get("/auth/recovery_token")
assert response.status_code == 401 assert response.status_code == 401
assert read_json(tokens_file) == TOKENS_FILE_CONTETS assert_original(authorized_client)
def test_get_recovery_token_when_none_exists(authorized_client, tokens_file): def test_get_recovery_token_when_none_exists(authorized_client, tokens_file):
@ -256,31 +297,17 @@ def test_get_recovery_token_when_none_exists(authorized_client, tokens_file):
"expiration": None, "expiration": None,
"uses_left": None, "uses_left": None,
} }
assert read_json(tokens_file) == TOKENS_FILE_CONTETS assert_original(authorized_client)
def test_generate_recovery_token(authorized_client, client, tokens_file): def test_generate_recovery_token(authorized_client, client, tokens_file):
# Generate token without expiration and uses_left # Generate token without expiration and uses_left
response = authorized_client.post("/auth/recovery_token") mnemonic_token = rest_make_recovery_token(authorized_client)
assert response.status_code == 200
assert "token" in response.json()
mnemonic_token = response.json()["token"]
token = Mnemonic(language="english").to_entropy(mnemonic_token).hex()
assert read_json(tokens_file)["recovery_token"]["token"] == token
time_generated = read_json(tokens_file)["recovery_token"]["date"] time_generated = rest_get_recovery_date(authorized_client)
assert time_generated is not None assert_recovery_recent(time_generated)
# Assert that the token was generated near the current time
assert (
datetime.datetime.strptime(time_generated, "%Y-%m-%dT%H:%M:%S.%f")
- datetime.timedelta(seconds=5)
< datetime.datetime.now()
)
# Try to get token status assert rest_get_recovery_status(authorized_client) == {
response = authorized_client.get("/auth/recovery_token")
assert response.status_code == 200
assert response.json() == {
"exists": True, "exists": True,
"valid": True, "valid": True,
"date": time_generated, "date": time_generated,
@ -288,112 +315,49 @@ def test_generate_recovery_token(authorized_client, client, tokens_file):
"uses_left": None, "uses_left": None,
} }
# Try to use the token rest_recover_with_mnemonic(client, mnemonic_token, "recover_device")
recovery_response = client.post( # And again
"/auth/recovery_token/use", rest_recover_with_mnemonic(client, mnemonic_token, "recover_device2")
json={"token": mnemonic_token, "device": "recovery_device"},
)
assert recovery_response.status_code == 200
new_token = recovery_response.json()["token"]
assert read_json(tokens_file)["tokens"][2]["token"] == new_token
assert read_json(tokens_file)["tokens"][2]["name"] == "recovery_device"
# Try to use token again
recovery_response = client.post(
"/auth/recovery_token/use",
json={"token": mnemonic_token, "device": "recovery_device2"},
)
assert recovery_response.status_code == 200
new_token = recovery_response.json()["token"]
assert read_json(tokens_file)["tokens"][3]["token"] == new_token
assert read_json(tokens_file)["tokens"][3]["name"] == "recovery_device2"
@pytest.mark.parametrize("timeformat", DATE_FORMATS) @pytest.mark.parametrize("timeformat", DATE_FORMATS)
def test_generate_recovery_token_with_expiration_date( def test_generate_recovery_token_with_expiration_date(
authorized_client, client, tokens_file, timeformat authorized_client, client, tokens_file, timeformat, mocker
): ):
# Generate token with expiration date # Generate token with expiration date
# Generate expiration date in the future # Generate expiration date in the future
expiration_date = datetime.datetime.now() + datetime.timedelta(minutes=5) expiration_date = five_minutes_into_future()
expiration_date_str = expiration_date.strftime(timeformat) mnemonic_token = rest_make_recovery_token(
response = authorized_client.post( authorized_client, expires_at=expiration_date, timeformat=timeformat
"/auth/recovery_token",
json={"expiration": expiration_date_str},
)
assert response.status_code == 200
assert "token" in response.json()
mnemonic_token = response.json()["token"]
token = Mnemonic(language="english").to_entropy(mnemonic_token).hex()
assert read_json(tokens_file)["recovery_token"]["token"] == token
assert datetime.datetime.strptime(
read_json(tokens_file)["recovery_token"]["expiration"], "%Y-%m-%dT%H:%M:%S.%f"
) == datetime.datetime.strptime(expiration_date_str, timeformat)
time_generated = read_json(tokens_file)["recovery_token"]["date"]
assert time_generated is not None
# Assert that the token was generated near the current time
assert (
datetime.datetime.strptime(time_generated, "%Y-%m-%dT%H:%M:%S.%f")
- datetime.timedelta(seconds=5)
< datetime.datetime.now()
) )
# Try to get token status time_generated = rest_get_recovery_date(authorized_client)
response = authorized_client.get("/auth/recovery_token") assert_recovery_recent(time_generated)
assert response.status_code == 200
assert response.json() == { assert rest_get_recovery_status(authorized_client) == {
"exists": True, "exists": True,
"valid": True, "valid": True,
"date": time_generated, "date": time_generated,
"expiration": expiration_date.strftime("%Y-%m-%dT%H:%M:%S.%f"), "expiration": expiration_date.replace(tzinfo=timezone.utc).isoformat(),
"uses_left": None, "uses_left": None,
} }
# Try to use the token rest_recover_with_mnemonic(client, mnemonic_token, "recover_device")
recovery_response = client.post( # And again
"/auth/recovery_token/use", rest_recover_with_mnemonic(client, mnemonic_token, "recover_device2")
json={"token": mnemonic_token, "device": "recovery_device"},
)
assert recovery_response.status_code == 200
new_token = recovery_response.json()["token"]
assert read_json(tokens_file)["tokens"][2]["token"] == new_token
assert read_json(tokens_file)["tokens"][2]["name"] == "recovery_device"
# Try to use token again
recovery_response = client.post(
"/auth/recovery_token/use",
json={"token": mnemonic_token, "device": "recovery_device2"},
)
assert recovery_response.status_code == 200
new_token = recovery_response.json()["token"]
assert read_json(tokens_file)["tokens"][3]["token"] == new_token
assert read_json(tokens_file)["tokens"][3]["name"] == "recovery_device2"
# Try to use token after expiration date # Try to use token after expiration date
new_data = read_json(tokens_file) mock = mocker.patch(RECOVERY_KEY_VALIDATION_DATETIME, NearFuture)
new_data["recovery_token"]["expiration"] = datetime.datetime.now().strftime( device_name = "recovery_device3"
"%Y-%m-%dT%H:%M:%S.%f"
)
write_json(tokens_file, new_data)
recovery_response = client.post( recovery_response = client.post(
"/auth/recovery_token/use", "/auth/recovery_token/use",
json={"token": mnemonic_token, "device": "recovery_device3"}, json={"token": mnemonic_token, "device": device_name},
) )
assert recovery_response.status_code == 404 assert recovery_response.status_code == 404
# Assert that the token was not created in JSON # Assert that the token was not created
assert read_json(tokens_file)["tokens"] == new_data["tokens"] assert device_name not in [
token["name"] for token in rest_get_tokens_info(authorized_client)
# Get the status of the token ]
response = authorized_client.get("/auth/recovery_token")
assert response.status_code == 200
assert response.json() == {
"exists": True,
"valid": False,
"date": time_generated,
"expiration": new_data["recovery_token"]["expiration"],
"uses_left": None,
}
@pytest.mark.parametrize("timeformat", DATE_FORMATS) @pytest.mark.parametrize("timeformat", DATE_FORMATS)
@ -401,14 +365,14 @@ def test_generate_recovery_token_with_expiration_in_the_past(
authorized_client, tokens_file, timeformat authorized_client, tokens_file, timeformat
): ):
# Server must return 400 if expiration date is in the past # Server must return 400 if expiration date is in the past
expiration_date = datetime.datetime.utcnow() - datetime.timedelta(minutes=5) expiration_date = five_minutes_into_past()
expiration_date_str = expiration_date.strftime(timeformat) expiration_date_str = expiration_date.strftime(timeformat)
response = authorized_client.post( response = authorized_client.post(
"/auth/recovery_token", "/auth/recovery_token",
json={"expiration": expiration_date_str}, json={"expiration": expiration_date_str},
) )
assert response.status_code == 400 assert response.status_code == 400
assert "recovery_token" not in read_json(tokens_file) assert_no_recovery(authorized_client)
def test_generate_recovery_token_with_invalid_time_format( def test_generate_recovery_token_with_invalid_time_format(
@ -421,37 +385,19 @@ def test_generate_recovery_token_with_invalid_time_format(
json={"expiration": expiration_date}, json={"expiration": expiration_date},
) )
assert response.status_code == 422 assert response.status_code == 422
assert "recovery_token" not in read_json(tokens_file) assert_no_recovery(authorized_client)
def test_generate_recovery_token_with_limited_uses( def test_generate_recovery_token_with_limited_uses(
authorized_client, client, tokens_file authorized_client, client, tokens_file
): ):
# Generate token with limited uses # Generate token with limited uses
response = authorized_client.post( mnemonic_token = rest_make_recovery_token(authorized_client, uses=2)
"/auth/recovery_token",
json={"uses": 2},
)
assert response.status_code == 200
assert "token" in response.json()
mnemonic_token = response.json()["token"]
token = Mnemonic(language="english").to_entropy(mnemonic_token).hex()
assert read_json(tokens_file)["recovery_token"]["token"] == token
assert read_json(tokens_file)["recovery_token"]["uses_left"] == 2
# Get the date of the token time_generated = rest_get_recovery_date(authorized_client)
time_generated = read_json(tokens_file)["recovery_token"]["date"] assert_recovery_recent(time_generated)
assert time_generated is not None
assert (
datetime.datetime.strptime(time_generated, "%Y-%m-%dT%H:%M:%S.%f")
- datetime.timedelta(seconds=5)
< datetime.datetime.now()
)
# Try to get token status assert rest_get_recovery_status(authorized_client) == {
response = authorized_client.get("/auth/recovery_token")
assert response.status_code == 200
assert response.json() == {
"exists": True, "exists": True,
"valid": True, "valid": True,
"date": time_generated, "date": time_generated,
@ -460,21 +406,9 @@ def test_generate_recovery_token_with_limited_uses(
} }
# Try to use the token # Try to use the token
recovery_response = client.post( rest_recover_with_mnemonic(client, mnemonic_token, "recover_device")
"/auth/recovery_token/use",
json={"token": mnemonic_token, "device": "recovery_device"},
)
assert recovery_response.status_code == 200
new_token = recovery_response.json()["token"]
assert read_json(tokens_file)["tokens"][2]["token"] == new_token
assert read_json(tokens_file)["tokens"][2]["name"] == "recovery_device"
assert read_json(tokens_file)["recovery_token"]["uses_left"] == 1 assert rest_get_recovery_status(authorized_client) == {
# Get the status of the token
response = authorized_client.get("/auth/recovery_token")
assert response.status_code == 200
assert response.json() == {
"exists": True, "exists": True,
"valid": True, "valid": True,
"date": time_generated, "date": time_generated,
@ -483,19 +417,9 @@ def test_generate_recovery_token_with_limited_uses(
} }
# Try to use token again # Try to use token again
recovery_response = client.post( rest_recover_with_mnemonic(client, mnemonic_token, "recover_device2")
"/auth/recovery_token/use",
json={"token": mnemonic_token, "device": "recovery_device2"},
)
assert recovery_response.status_code == 200
new_token = recovery_response.json()["token"]
assert read_json(tokens_file)["tokens"][3]["token"] == new_token
assert read_json(tokens_file)["tokens"][3]["name"] == "recovery_device2"
# Get the status of the token assert rest_get_recovery_status(authorized_client) == {
response = authorized_client.get("/auth/recovery_token")
assert response.status_code == 200
assert response.json() == {
"exists": True, "exists": True,
"valid": False, "valid": False,
"date": time_generated, "date": time_generated,
@ -510,8 +434,6 @@ def test_generate_recovery_token_with_limited_uses(
) )
assert recovery_response.status_code == 404 assert recovery_response.status_code == 404
assert read_json(tokens_file)["recovery_token"]["uses_left"] == 0
def test_generate_recovery_token_with_negative_uses( def test_generate_recovery_token_with_negative_uses(
authorized_client, client, tokens_file authorized_client, client, tokens_file
@ -522,7 +444,7 @@ def test_generate_recovery_token_with_negative_uses(
json={"uses": -2}, json={"uses": -2},
) )
assert response.status_code == 400 assert response.status_code == 400
assert "recovery_token" not in read_json(tokens_file) assert_no_recovery(authorized_client)
def test_generate_recovery_token_with_zero_uses(authorized_client, client, tokens_file): def test_generate_recovery_token_with_zero_uses(authorized_client, client, tokens_file):
@ -532,4 +454,4 @@ def test_generate_recovery_token_with_zero_uses(authorized_client, client, token
json={"uses": 0}, json={"uses": 0},
) )
assert response.status_code == 400 assert response.status_code == 400
assert "recovery_token" not in read_json(tokens_file) assert_no_recovery(authorized_client)