selfprivacy-rest-api/selfprivacy_api/repositories/tokens/redis_tokens_repository.py

164 lines
5.8 KiB
Python
Raw Normal View History

"""
Token repository using Redis as backend.
"""
from typing import Optional
from datetime import datetime, timezone
2023-06-21 12:15:33 +00:00
from hashlib import md5
from selfprivacy_api.repositories.tokens.abstract_tokens_repository import (
AbstractTokensRepository,
)
from selfprivacy_api.utils.redis_pool import RedisPool
from selfprivacy_api.models.tokens.token import Token
from selfprivacy_api.models.tokens.recovery_key import RecoveryKey
from selfprivacy_api.models.tokens.new_device_key import NewDeviceKey
2022-12-14 17:31:32 +00:00
from selfprivacy_api.repositories.tokens.exceptions import TokenNotFound
# Redis key layout used by the token repository.
# Each device token is stored as a hash under TOKENS_PREFIX followed by a
# digest derived from the device name.
TOKENS_PREFIX = "token_repo:tokens:"
# Single hash holding the current new-device key.
NEW_DEVICE_KEY_REDIS_KEY = "token_repo:new_device_key"
# Single hash holding the current recovery key.
RECOVERY_KEY_REDIS_KEY = "token_repo:recovery_key"
class RedisTokensRepository(AbstractTokensRepository):
    """
    Token repository using Redis as a backend.

    Every model (device token, recovery key, new device key) is stored as a
    Redis hash whose fields are the model's attributes rendered with ``str()``.
    ``None`` is therefore stored as the literal string ``"None"`` and datetimes
    as ISO-8601 strings; both are converted back when a hash is loaded.
    """

    def __init__(self):
        self.connection = RedisPool().get_connection()

    @staticmethod
    def token_key_for_device(device_name: str) -> str:
        """Return the Redis key under which the token of *device_name* lives.

        The key is ``TOKENS_PREFIX`` followed by the hex MD5 digest of the
        UTF-8 encoded device name. The digest is only used to derive a key
        name, not for any security purpose.
        """
        digest = md5(device_name.encode("utf-8")).hexdigest()
        return TOKENS_PREFIX + digest

    def get_tokens(self) -> list[Token]:
        """Get the tokens"""
        # NOTE(review): KEYS walks the whole keyspace; kept because switching
        # to SCAN would change ordering/duplication guarantees.
        token_keys = self.connection.keys(TOKENS_PREFIX + "*")
        loaded = (self._token_from_hash(key) for key in token_keys)
        # A key may vanish between listing and loading; skip those.
        return [token for token in loaded if token is not None]

    def _discover_token_key(self, input_token: Token) -> Optional[str]:
        """Brute-force search for the Redis key holding *input_token*.

        Every stored token is loaded and compared with *input_token*, so the
        token is found even if its key does not match the key derived from
        its device name.

        Returns:
            The matching Redis key, or ``None`` when no stored token equals
            *input_token*.
        """
        for key in self.connection.keys(TOKENS_PREFIX + "*"):
            if self._token_from_hash(key) == input_token:
                return key
        return None

    def delete_token(self, input_token: Token) -> None:
        """Delete the token

        Raises:
            TokenNotFound: if no stored token equals *input_token*.
        """
        key = self._discover_token_key(input_token)
        if key is None:
            raise TokenNotFound
        self.connection.delete(key)

    def get_recovery_key(self) -> Optional[RecoveryKey]:
        """Get the recovery key, or ``None`` if none is stored."""
        # The loader already returns None for a missing key, so no separate
        # EXISTS round trip is needed.
        return self._recovery_key_from_hash(RECOVERY_KEY_REDIS_KEY)

    def _store_recovery_key(self, recovery_key: RecoveryKey) -> None:
        """Store the recovery key directly"""
        self._store_model_as_hash(RECOVERY_KEY_REDIS_KEY, recovery_key)

    def _delete_recovery_key(self) -> None:
        """Delete the recovery key"""
        self.connection.delete(RECOVERY_KEY_REDIS_KEY)

    def _store_new_device_key(self, new_device_key: NewDeviceKey) -> None:
        """Store new device key directly"""
        self._store_model_as_hash(NEW_DEVICE_KEY_REDIS_KEY, new_device_key)

    def delete_new_device_key(self) -> None:
        """Delete the new device key"""
        self.connection.delete(NEW_DEVICE_KEY_REDIS_KEY)

    @staticmethod
    def _token_redis_key(token: Token) -> str:
        """Return the Redis key derived from the token's device name."""
        return RedisTokensRepository.token_key_for_device(token.device_name)

    def _store_token(self, new_token: Token):
        """Store a token directly"""
        key = RedisTokensRepository._token_redis_key(new_token)
        self._store_model_as_hash(key, new_token)

    def _decrement_recovery_token(self):
        """Decrement recovery key use count by one

        Does nothing when the recovery key is not valid, is not stored, or
        has no use counter (``uses_left`` is ``None``).
        """
        # is_recovery_key_valid is not defined in this class; presumably
        # inherited from AbstractTokensRepository.
        if not self.is_recovery_key_valid():
            return
        recovery_key = self.get_recovery_key()
        if recovery_key is None:
            return
        uses_left = recovery_key.uses_left
        if uses_left is None:
            return
        # NOTE(review): read-then-write, so two concurrent callers can both
        # write the same value; HINCRBY would make this atomic.
        self.connection.hset(RECOVERY_KEY_REDIS_KEY, "uses_left", uses_left - 1)

    def _get_stored_new_device_key(self) -> Optional[NewDeviceKey]:
        """Retrieves new device key that is already stored."""
        return self._new_device_key_from_hash(NEW_DEVICE_KEY_REDIS_KEY)

    @staticmethod
    def _is_date_key(key: str) -> bool:
        """Tell whether hash field *key* holds a datetime."""
        return key in ("created_at", "expires_at")

    @staticmethod
    def _prepare_model_dict(d: dict) -> None:
        """Convert a raw hash (string values) into model kwargs, in place.

        The string ``"None"`` becomes ``None``; any other value of a date
        field is parsed with ``datetime.fromisoformat``. All other values are
        left untouched.
        """
        # Iterate over a snapshot so assigning into ``d`` never interferes
        # with the iteration.
        for key, value in list(d.items()):
            if value == "None":
                d[key] = None
            elif RedisTokensRepository._is_date_key(key):
                d[key] = datetime.fromisoformat(value)

    def _model_dict_from_hash(self, redis_key: str) -> Optional[dict]:
        """Load the hash at *redis_key* as model kwargs.

        Returns:
            The converted field dict, or ``None`` when the key does not exist.
        """
        # A single HGETALL instead of EXISTS + HGETALL: Redis never keeps an
        # empty hash, so an empty reply means the key is absent. This also
        # avoids building a model from ``{}`` if the key is deleted between
        # the two calls.
        token_dict = self.connection.hgetall(redis_key)
        if not token_dict:
            return None
        RedisTokensRepository._prepare_model_dict(token_dict)
        return token_dict

    def _hash_as_model(self, redis_key: str, model_class):
        """Build *model_class* from the hash at *redis_key*.

        Returns ``None`` when the key does not exist.
        """
        token_dict = self._model_dict_from_hash(redis_key)
        if token_dict is None:
            return None
        return model_class(**token_dict)

    def _token_from_hash(self, redis_key: str) -> Optional[Token]:
        """Load a ``Token`` from *redis_key*, or ``None`` if it is absent."""
        token = self._hash_as_model(redis_key, Token)
        if token is not None:
            # Drops the tzinfo without shifting the clock value, so the token
            # is handed out with a naive ``created_at``.
            # NOTE(review): this assumes the stored offset is UTC, which is
            # what _store_model_as_hash writes for naive values - confirm for
            # values stored with another offset.
            token.created_at = token.created_at.replace(tzinfo=None)
        return token

    def _recovery_key_from_hash(self, redis_key: str) -> Optional[RecoveryKey]:
        """Load a ``RecoveryKey`` from *redis_key*, or ``None`` if absent."""
        return self._hash_as_model(redis_key, RecoveryKey)

    def _new_device_key_from_hash(self, redis_key: str) -> Optional[NewDeviceKey]:
        """Load a ``NewDeviceKey`` from *redis_key*, or ``None`` if absent."""
        return self._hash_as_model(redis_key, NewDeviceKey)

    def _store_model_as_hash(self, redis_key, model):
        """Write every field of *model* into the hash at *redis_key*.

        Datetimes are stored as ISO-8601 strings; naive datetimes are tagged
        as UTC first. Every value is stored via ``str()``, so ``None``
        becomes ``"None"``. Fields already present in the hash but absent
        from the model are not removed.
        """
        fields = {}
        for key, value in model.dict().items():
            if isinstance(value, datetime):
                if value.tzinfo is None:
                    value = value.replace(tzinfo=timezone.utc)
                value = value.isoformat()
            fields[key] = str(value)
        # One HSET for all fields: a single round trip, and readers never
        # observe a partially written hash. redis-py rejects an empty
        # mapping, so skip the call when the model has no fields.
        if fields:
            self.connection.hset(redis_key, mapping=fields)