Migrate Jobs to redis

Jobs API shall now use redis to store and retrieve jobs. This will make
it possible to add pubsub for jobs updates. For now it uses blocking api
of redis.
This commit is contained in:
Houkime 2022-11-23 12:32:46 +00:00
parent f7b7e5a0be
commit 5afa2338ca
2 changed files with 10 additions and 16 deletions

View file

@ -23,10 +23,6 @@ from enum import Enum
from pydantic import BaseModel from pydantic import BaseModel
from selfprivacy_api.utils.redis_pool import RedisPool from selfprivacy_api.utils.redis_pool import RedisPool
import asyncio
loop = asyncio.get_event_loop()
class JobStatus(Enum): class JobStatus(Enum):
@ -72,8 +68,8 @@ class Jobs:
r = RedisPool().get_connection() r = RedisPool().get_connection()
jobs = Jobs.get_jobs() jobs = Jobs.get_jobs()
for job in jobs: for job in jobs:
loop.run_until_complete(r.delete(redis_key_from_uuid(job.uid))) r.delete(redis_key_from_uuid(job.uid))
loop.run_until_complete(r.delete("jobs")) r.delete("jobs")
@staticmethod @staticmethod
def add( def add(
@ -103,8 +99,7 @@ class Jobs:
) )
r = RedisPool().get_connection() r = RedisPool().get_connection()
store_job_as_hash(r, redis_key_from_uuid(job.uid), job) store_job_as_hash(r, redis_key_from_uuid(job.uid), job)
coroutine = r.lpush("jobs", redis_key_from_uuid(job.uid)) r.lpush("jobs", redis_key_from_uuid(job.uid))
loop.run_until_complete(coroutine)
return job return job
@staticmethod @staticmethod
@ -121,8 +116,8 @@ class Jobs:
""" """
r = RedisPool().get_connection() r = RedisPool().get_connection()
key = redis_key_from_uuid(job_uuid) key = redis_key_from_uuid(job_uuid)
loop.run_until_complete(r.delete(key)) r.delete(key)
loop.run_until_complete(r.lrem("jobs", 0, key)) r.lrem("jobs", 0, key)
return False return False
@staticmethod @staticmethod
@ -178,7 +173,7 @@ class Jobs:
Get the jobs list. Get the jobs list.
""" """
r = RedisPool().get_connection() r = RedisPool().get_connection()
jobs = loop.run_until_complete(r.lrange("jobs", 0, -1)) jobs = r.lrange("jobs", 0, -1)
return [job_from_hash(r, job_key) for job_key in jobs] return [job_from_hash(r, job_key) for job_key in jobs]
@staticmethod @staticmethod
@ -204,13 +199,12 @@ def store_job_as_hash(r, redis_key, model):
value = value.isoformat() value = value.isoformat()
if isinstance(value, JobStatus): if isinstance(value, JobStatus):
value = value.value value = value.value
coroutine = r.hset(redis_key, key, str(value)) r.hset(redis_key, key, str(value))
loop.run_until_complete(coroutine)
def job_from_hash(r, redis_key): def job_from_hash(r, redis_key):
if exists_sync(r, redis_key): if exists_sync(r, redis_key):
job_dict = loop.run_until_complete(r.hgetall(redis_key)) job_dict = r.hgetall(redis_key)
for date in [ for date in [
"created_at", "created_at",
"updated_at", "updated_at",
@ -227,4 +221,4 @@ def job_from_hash(r, redis_key):
def exists_sync(r, key): def exists_sync(r, key):
return loop.run_until_complete(r.exists(key)) return r.exists(key)

View file

@ -1,7 +1,7 @@
""" """
Redis pool module for selfprivacy_api Redis pool module for selfprivacy_api
""" """
import redis.asyncio as redis import redis
from selfprivacy_api.utils.singleton_metaclass import SingletonMetaclass from selfprivacy_api.utils.singleton_metaclass import SingletonMetaclass
REDIS_SOCKET = "/run/redis-sp-api/redis.sock" REDIS_SOCKET = "/run/redis-sp-api/redis.sock"