selfprivacy-rest-api/selfprivacy_api/utils/redis_pool.py
2024-07-04 17:19:25 +03:00

51 lines
1.4 KiB
Python

"""
Synchronous and asynchronous Redis connection pools for selfprivacy_api
"""
import redis
import redis.asyncio as redis_async
from selfprivacy_api.utils.singleton_metaclass import SingletonMetaclass
REDIS_SOCKET = "/run/redis-sp-api/redis.sock"
class RedisPool(metaclass=SingletonMetaclass):
    """
    Singleton holder of the Redis connection pools.

    On construction two pools are created against database 0 of the
    unix socket named by ``REDIS_SOCKET``: a synchronous pool and an
    asyncio pool. Both are configured with ``decode_responses=True``.
    """

    def __init__(self):
        # One URL and one set of options are shared by both pools.
        db_zero_url = RedisPool.connection_url(dbnumber=0)
        pool_options = {"decode_responses": True}

        # Synchronous pool: without it the whole API would have to
        # become async.
        self._pool = redis.ConnectionPool.from_url(db_zero_url, **pool_options)

        # Asynchronous pool: required for pubsub.
        self._async_pool = redis_async.ConnectionPool.from_url(
            db_zero_url, **pool_options
        )

    @staticmethod
    def connection_url(dbnumber: int) -> str:
        """
        Build the unix-socket URL selecting database ``dbnumber``.

        URL formats, for reference:
        redis://[[username]:[password]]@localhost:6379/0
        unix://[username@]/path/to/socket.sock?db=0[&password=password]
        """
        socket_url = f"unix://{REDIS_SOCKET}?db={dbnumber}"
        return socket_url

    def get_connection(self):
        """
        Return a synchronous Redis client bound to the sync pool.
        """
        sync_client = redis.Redis(connection_pool=self._pool)
        return sync_client

    def get_connection_async(self) -> redis_async.Redis:
        """
        Return an asyncio Redis client bound to the async pool.

        Async connections allow pubsub.
        """
        async_client = redis_async.Redis(connection_pool=self._async_pool)
        return async_client