migrate Jobs to redis

This commit is contained in:
Houkime 2022-11-16 13:54:54 +00:00
parent 9ee0240bbd
commit f7b7e5a0be
2 changed files with 71 additions and 46 deletions

View file

@ -17,13 +17,16 @@ A job is a dictionary with the following keys:
import typing import typing
import datetime import datetime
from uuid import UUID from uuid import UUID
import json
import uuid import uuid
from enum import Enum from enum import Enum
from pydantic import BaseModel from pydantic import BaseModel
from selfprivacy_api.utils import ReadUserData, UserDataFiles, WriteUserData from selfprivacy_api.utils.redis_pool import RedisPool
import asyncio
loop = asyncio.get_event_loop()
class JobStatus(Enum): class JobStatus(Enum):
@ -66,8 +69,11 @@ class Jobs:
""" """
Reset the jobs list. Reset the jobs list.
""" """
with WriteUserData(UserDataFiles.JOBS) as user_data: r = RedisPool().get_connection()
user_data["jobs"] = [] jobs = Jobs.get_jobs()
for job in jobs:
loop.run_until_complete(r.delete(redis_key_from_uuid(job.uid)))
loop.run_until_complete(r.delete("jobs"))
@staticmethod @staticmethod
def add( def add(
@ -95,13 +101,10 @@ class Jobs:
error=None, error=None,
result=None, result=None,
) )
with WriteUserData(UserDataFiles.JOBS) as user_data: r = RedisPool().get_connection()
try: store_job_as_hash(r, redis_key_from_uuid(job.uid), job)
if "jobs" not in user_data: coroutine = r.lpush("jobs", redis_key_from_uuid(job.uid))
user_data["jobs"] = [] loop.run_until_complete(coroutine)
user_data["jobs"].append(json.loads(job.json()))
except json.decoder.JSONDecodeError:
user_data["jobs"] = [json.loads(job.json())]
return job return job
@staticmethod @staticmethod
@ -116,13 +119,10 @@ class Jobs:
""" """
Remove a job from the jobs list. Remove a job from the jobs list.
""" """
with WriteUserData(UserDataFiles.JOBS) as user_data: r = RedisPool().get_connection()
if "jobs" not in user_data: key = redis_key_from_uuid(job_uuid)
user_data["jobs"] = [] loop.run_until_complete(r.delete(key))
for i, j in enumerate(user_data["jobs"]): loop.run_until_complete(r.lrem("jobs", 0, key))
if j["uid"] == job_uuid:
del user_data["jobs"][i]
return True
return False return False
@staticmethod @staticmethod
@ -154,13 +154,10 @@ class Jobs:
if status in (JobStatus.FINISHED, JobStatus.ERROR): if status in (JobStatus.FINISHED, JobStatus.ERROR):
job.finished_at = datetime.datetime.now() job.finished_at = datetime.datetime.now()
with WriteUserData(UserDataFiles.JOBS) as user_data: r = RedisPool().get_connection()
if "jobs" not in user_data: key = redis_key_from_uuid(job.uid)
user_data["jobs"] = [] if exists_sync(r, key):
for i, j in enumerate(user_data["jobs"]): store_job_as_hash(r, key, job)
if j["uid"] == str(job.uid):
user_data["jobs"][i] = json.loads(job.json())
break
return job return job
@ -169,12 +166,10 @@ class Jobs:
""" """
Get a job from the jobs list. Get a job from the jobs list.
""" """
with ReadUserData(UserDataFiles.JOBS) as user_data: r = RedisPool().get_connection()
if "jobs" not in user_data: key = redis_key_from_uuid(uid)
user_data["jobs"] = [] if exists_sync(r, key):
for job in user_data["jobs"]: return job_from_hash(r, key)
if job["uid"] == uid:
return Job(**job)
return None return None
@staticmethod @staticmethod
@ -182,23 +177,54 @@ class Jobs:
""" """
Get the jobs list. Get the jobs list.
""" """
with ReadUserData(UserDataFiles.JOBS) as user_data: r = RedisPool().get_connection()
try: jobs = loop.run_until_complete(r.lrange("jobs", 0, -1))
if "jobs" not in user_data: return [job_from_hash(r, job_key) for job_key in jobs]
user_data["jobs"] = []
return [Job(**job) for job in user_data["jobs"]]
except json.decoder.JSONDecodeError:
return []
@staticmethod @staticmethod
def is_busy() -> bool: def is_busy() -> bool:
""" """
Check if there is a job running. Check if there is a job running.
""" """
with ReadUserData(UserDataFiles.JOBS) as user_data: for job in Jobs.get_jobs():
if "jobs" not in user_data: if job["status"] == JobStatus.RUNNING.value:
user_data["jobs"] = [] return True
for job in user_data["jobs"]:
if job["status"] == JobStatus.RUNNING.value:
return True
return False return False
def redis_key_from_uuid(uuid):
return "jobs:" + str(uuid)
def store_job_as_hash(r, redis_key, model):
for key, value in model.dict().items():
if isinstance(value, uuid.UUID):
value = str(value)
if isinstance(value, datetime.datetime):
value = value.isoformat()
if isinstance(value, JobStatus):
value = value.value
coroutine = r.hset(redis_key, key, str(value))
loop.run_until_complete(coroutine)
def job_from_hash(r, redis_key):
if exists_sync(r, redis_key):
job_dict = loop.run_until_complete(r.hgetall(redis_key))
for date in [
"created_at",
"updated_at",
"finished_at",
]:
if job_dict[date] != "None":
job_dict[date] = datetime.datetime.fromisoformat(job_dict[date])
for key in job_dict.keys():
if job_dict[key] == "None":
job_dict[key] = None
return Job(**job_dict)
return None
def exists_sync(r, key):
return loop.run_until_complete(r.exists(key))

View file

@ -1,14 +1,13 @@
# pylint: disable=redefined-outer-name # pylint: disable=redefined-outer-name
# pylint: disable=unused-argument # pylint: disable=unused-argument
import json
import pytest import pytest
from selfprivacy_api.utils import WriteUserData, ReadUserData
from selfprivacy_api.jobs import Jobs, JobStatus from selfprivacy_api.jobs import Jobs, JobStatus
def test_jobs(authorized_client, jobs_file, shared_datadir): def test_jobs(authorized_client, jobs_file, shared_datadir):
jobs = Jobs() jobs = Jobs()
jobs.reset()
assert jobs.get_jobs() == [] assert jobs.get_jobs() == []
test_job = jobs.add( test_job = jobs.add(