selfprivacy-rest-api/selfprivacy_api/actions/api_tokens.py

183 lines
5.6 KiB
Python
Raw Permalink Normal View History

"""
App tokens actions.
The only actions on tokens that are accessible from APIs.
"""
2024-07-26 19:59:44 +00:00
from datetime import datetime, timezone
from typing import Optional
from pydantic import BaseModel
from mnemonic import Mnemonic
2023-11-10 17:10:01 +00:00
from selfprivacy_api.utils.timeutils import ensure_tz_aware, ensure_tz_aware_strict
from selfprivacy_api.repositories.tokens.redis_tokens_repository import (
RedisTokensRepository,
)
from selfprivacy_api.repositories.tokens.exceptions import (
TokenNotFound,
RecoveryKeyNotFound,
InvalidMnemonic,
NewDeviceKeyNotFound,
)
# Module-level RedisTokensRepository instance; every action in this module
# goes through it. It is constructed once, when the module is imported.
TOKEN_REPO = RedisTokensRepository()
class TokenInfoWithIsCaller(BaseModel):
    """Summary of one API token, plus a flag marking the caller's token."""

    # Filled from the token's device_name.
    name: str
    # Filled from the token's created_at.
    date: datetime
    # True when this entry's device name equals the caller's device name.
    is_caller: bool
2023-06-14 11:01:15 +00:00
def _naive(date_time: datetime) -> datetime:
if date_time is None:
return None
if date_time.tzinfo is not None:
date_time.astimezone(timezone.utc)
return date_time.replace(tzinfo=None)
2023-06-14 11:01:15 +00:00
def get_api_tokens_with_caller_flag(caller_token: str) -> list[TokenInfoWithIsCaller]:
    """List every stored token, marking the one(s) that belong to the caller.

    Args:
        caller_token: Token string of the caller; it is resolved to a device
            name through TOKEN_REPO.

    Returns:
        One TokenInfoWithIsCaller per token returned by TOKEN_REPO.get_tokens().
        An entry is flagged as the caller when its device name equals the
        caller's device name.
    """
    caller = TOKEN_REPO.get_token_by_token_string(caller_token)
    own_device = caller.device_name

    listing = []
    for stored in TOKEN_REPO.get_tokens():
        listing.append(
            TokenInfoWithIsCaller(
                name=stored.device_name,
                date=stored.created_at,
                is_caller=stored.device_name == own_device,
            )
        )
    return listing
def is_token_valid(token) -> bool:
    """Report whether TOKEN_REPO considers *token* valid."""
    verdict = TOKEN_REPO.is_token_valid(token)
    return verdict
class NotFoundException(Exception):
    """Raised by the token actions when the requested token is not found."""
class CannotDeleteCallerException(Exception):
    """Raised when a caller tries to delete the token it is calling with."""
def delete_api_token(caller_token: str, token_name: str) -> None:
    """Remove the token called *token_name*, unless it is the caller's own.

    Args:
        caller_token: Token string the caller is authenticated with.
        token_name: Name of the token to delete.

    Raises:
        CannotDeleteCallerException: TOKEN_REPO reports that *token_name*
            and *caller_token* form a valid name/token pair.
        NotFoundException: No token with *token_name* exists.
    """
    targets_caller = TOKEN_REPO.is_token_name_pair_valid(token_name, caller_token)
    if targets_caller:
        raise CannotDeleteCallerException("Cannot delete caller's token")

    name_known = TOKEN_REPO.is_token_name_exists(token_name)
    if not name_known:
        raise NotFoundException("Token not found")

    doomed = TOKEN_REPO.get_token_by_name(token_name)
    TOKEN_REPO.delete_token(doomed)
def refresh_api_token(caller_token: str) -> str:
    """Replace the caller's token with a refreshed one.

    Args:
        caller_token: Token string the caller is authenticated with.

    Returns:
        The token string of the refreshed token.

    Raises:
        NotFoundException: The repository raised TokenNotFound while looking
            up or refreshing the token. The original error is attached as
            the exception's cause.
    """
    try:
        old_token = TOKEN_REPO.get_token_by_token_string(caller_token)
        new_token = TOKEN_REPO.refresh_token(old_token)
    except TokenNotFound as error:
        # Chain explicitly so the traceback shows the repository error as the
        # direct cause rather than as an error raised during handling.
        raise NotFoundException("Token not found") from error
    return new_token.token
class RecoveryTokenStatus(BaseModel):
    """State of the recovery key as reported to API clients."""

    # Whether a recovery key is stored at all.
    exists: bool
    # Whether the stored recovery key is still valid.
    valid: bool
    # The three fields below default to None.
    date: Optional[datetime] = None
    expiration: Optional[datetime] = None
    uses_left: Optional[int] = None
def get_api_recovery_token_status() -> RecoveryTokenStatus:
    """Describe the stored recovery key with timezone-aware dates.

    Returns:
        RecoveryTokenStatus(exists=False, valid=False) when no recovery key
        is stored. Otherwise a status carrying the key's validity, creation
        date, expiration date (None if the key has none) and uses_left.
    """
    recovery_key = TOKEN_REPO.get_recovery_key()
    if recovery_key is None:
        return RecoveryTokenStatus(exists=False, valid=False)

    still_valid = TOKEN_REPO.is_recovery_key_valid()

    # New tokens are tz-aware, but older ones might not be
    raw_expiry = recovery_key.expires_at
    expires = None if raw_expiry is None else ensure_tz_aware_strict(raw_expiry)
    created = ensure_tz_aware_strict(recovery_key.created_at)

    return RecoveryTokenStatus(
        exists=True,
        valid=still_valid,
        date=created,
        expiration=expires,
        uses_left=recovery_key.uses_left,
    )
class InvalidExpirationDate(Exception):
    """Raised when a requested recovery key expiration date is in the past."""
class InvalidUsesLeft(Exception):
    """Raised when a requested recovery key use count is zero or negative."""
def get_new_api_recovery_key(
    expiration_date: Optional[datetime] = None, uses_left: Optional[int] = None
) -> str:
    """Create a recovery key and return it as an English mnemonic phrase.

    Args:
        expiration_date: Optional expiration moment. It is passed through
            ensure_tz_aware before being compared with the current UTC time.
        uses_left: Optional use limit; must be greater than 0 when given.

    Returns:
        The mnemonic phrase encoding the new key's hex value.

    Raises:
        InvalidExpirationDate: *expiration_date* is earlier than now.
        InvalidUsesLeft: *uses_left* is zero or negative.
    """
    if expiration_date is not None:
        expiration_date = ensure_tz_aware(expiration_date)
        if expiration_date < datetime.now(timezone.utc):
            raise InvalidExpirationDate("Expiration date is in the past")

    if uses_left is not None and uses_left <= 0:
        raise InvalidUsesLeft("Uses must be greater than 0")

    recovery_key = TOKEN_REPO.create_recovery_key(expiration_date, uses_left)
    encoder = Mnemonic(language="english")
    key_bytes = bytes.fromhex(recovery_key.key)
    return encoder.to_mnemonic(key_bytes)
def use_mnemonic_recovery_token(mnemonic_phrase, name) -> Optional[str]:
    """Exchange a recovery-key mnemonic phrase for a new device token.

    The phrase is handed to TOKEN_REPO.use_mnemonic_recovery_key, which does
    the actual checking. Any bookkeeping on the recovery key, such as
    decrementing uses_left, is left to that repository call.

    Args:
        mnemonic_phrase: String representation of the mnemonic word list.
        name: Name passed to the repository for the new device token.

    Returns:
        The new device token string, or None when the repository raises
        RecoveryKeyNotFound or InvalidMnemonic.
    """
    try:
        token = TOKEN_REPO.use_mnemonic_recovery_key(mnemonic_phrase, name)
    except (RecoveryKeyNotFound, InvalidMnemonic):
        return None
    return token.token
def delete_new_device_auth_token() -> None:
    """Ask TOKEN_REPO to delete the new device key."""
    TOKEN_REPO.delete_new_device_key()
def get_new_device_auth_token() -> str:
    """Obtain a new device key and return it as an English mnemonic phrase.

    The key comes from TOKEN_REPO.get_new_device_key(). The original
    documentation states that it is valid for 10 minutes; that lifetime is
    not enforced in this function.
    """
    device_key = TOKEN_REPO.get_new_device_key()
    key_bytes = bytes.fromhex(device_key.key)
    encoder = Mnemonic(language="english")
    return encoder.to_mnemonic(key_bytes)
2022-12-30 18:06:16 +00:00
def use_new_device_auth_token(mnemonic_phrase, name) -> Optional[str]:
    """Exchange a new-device mnemonic phrase for a device token.

    The phrase is handed to TOKEN_REPO.use_mnemonic_new_device_key. Per the
    original documentation the new device key must be deleted once used;
    this function does not delete it itself.

    Returns:
        The device token string, or None when the repository raises
        NewDeviceKeyNotFound or InvalidMnemonic.
    """
    device_token = None
    try:
        issued = TOKEN_REPO.use_mnemonic_new_device_key(mnemonic_phrase, name)
        device_token = issued.token
    except (NewDeviceKeyNotFound, InvalidMnemonic):
        return None
    return device_token