# selfprivacy-rest-api/selfprivacy_api/utils/redis_model_storage.py
import uuid
from datetime import datetime
from typing import Optional
from enum import Enum
def store_model_as_hash(redis, redis_key, model):
    """Write the fields of ``model`` into a Redis hash as string values.

    Each value returned by ``model.dict()`` is converted to text and the
    whole mapping is written with a single ``hset`` call:

    * ``datetime`` values are written via ``isoformat()``;
    * ``Enum`` members are written as ``str()`` of their ``value``;
    * everything else (``uuid.UUID``, numbers, ``None``, ...) is written
      via ``str()``, so ``None`` ends up as the literal text ``"None"``.

    Args:
        redis: Client object exposing ``hset(name, mapping=...)``.
        redis_key: Name of the hash to write.
        model: Object exposing ``dict()`` that returns a field/value mapping.
    """
    # Build a fresh mapping instead of rewriting the dict being iterated.
    serialized = {}
    for field, value in model.dict().items():
        if isinstance(value, datetime):
            text = value.isoformat()
        elif isinstance(value, Enum):
            text = str(value.value)
        else:
            # uuid.UUID needs no special case: str() already yields its
            # canonical hyphenated form.
            text = str(value)
        serialized[field] = text

    redis.hset(redis_key, mapping=serialized)
def hash_as_model(redis, redis_key: str, model_class):
    """Load the hash at ``redis_key`` and build a ``model_class`` from it.

    Args:
        redis: Client object passed through to ``_model_dict_from_hash``.
        redis_key: Name of the hash to read.
        model_class: Callable invoked with the hash fields as keyword
            arguments.

    Returns:
        The result of ``model_class(**fields)``, or ``None`` when
        ``_model_dict_from_hash`` returns ``None`` for this key.
    """
    fields = _model_dict_from_hash(redis, redis_key)
    if fields is None:
        return None
    return model_class(**fields)
def _prepare_model_dict(d: dict) -> None:
    """Replace every value equal to the string ``"None"`` with ``None``.

    The dictionary is modified in place; nothing is returned. Only an
    exact ``"None"`` match is replaced, other values are left untouched.

    Args:
        d: Dictionary to normalise in place.
    """
    # Re-assigning an existing key does not change the dict's size, so it is
    # safe to do while iterating over items().
    for key, value in d.items():
        if value == "None":
            d[key] = None
def _model_dict_from_hash(redis, redis_key: str) -> Optional[dict]:
    """Read the hash at ``redis_key`` and return its fields as a dict.

    Values equal to the string ``"None"`` are converted to ``None`` by
    ``_prepare_model_dict`` before the dict is returned.

    Args:
        redis: Client object exposing ``hgetall(name)``.
        redis_key: Name of the hash to read.

    Returns:
        The field dictionary, or ``None`` when the hash has no fields
        (which is how Redis reports a key that does not exist).
    """
    # A single HGETALL replaces the former EXISTS + HGETALL pair: a missing
    # key comes back as an empty mapping, so this saves a round trip and
    # closes the window in which the key could disappear between the calls.
    token_dict = redis.hgetall(redis_key)
    if not token_dict:
        return None
    _prepare_model_dict(token_dict)
    return token_dict