Merge pull request 'migrate Jobs to redis' (#20) from redis/jobs into redis/connection-pool

Reviewed-on: https://git.selfprivacy.org/SelfPrivacy/selfprivacy-rest-api/pulls/20
This commit is contained in:
Inex Code 2022-11-28 14:37:45 +02:00
commit 3ce71b0993
3 changed files with 77 additions and 47 deletions

View file

@ -17,13 +17,14 @@ A job is a dictionary with the following keys:
import typing import typing
import datetime import datetime
from uuid import UUID from uuid import UUID
import json
import uuid import uuid
from enum import Enum from enum import Enum
from pydantic import BaseModel from pydantic import BaseModel
from selfprivacy_api.utils import ReadUserData, UserDataFiles, WriteUserData from selfprivacy_api.utils.redis_pool import RedisPool
JOB_EXPIRATION_SECONDS = 10 * 24 * 60 * 60 # ten days
class JobStatus(Enum): class JobStatus(Enum):
@ -66,8 +67,10 @@ class Jobs:
""" """
Reset the jobs list. Reset the jobs list.
""" """
with WriteUserData(UserDataFiles.JOBS) as user_data: r = RedisPool().get_connection()
user_data["jobs"] = [] jobs = Jobs.get_jobs()
for job in jobs:
r.delete(redis_key_from_uuid(job.uid))
@staticmethod @staticmethod
def add( def add(
@ -95,13 +98,8 @@ class Jobs:
error=None, error=None,
result=None, result=None,
) )
with WriteUserData(UserDataFiles.JOBS) as user_data: r = RedisPool().get_connection()
try: store_job_as_hash(r, redis_key_from_uuid(job.uid), job)
if "jobs" not in user_data:
user_data["jobs"] = []
user_data["jobs"].append(json.loads(job.json()))
except json.decoder.JSONDecodeError:
user_data["jobs"] = [json.loads(job.json())]
return job return job
@staticmethod @staticmethod
@ -116,13 +114,9 @@ class Jobs:
""" """
Remove a job from the jobs list. Remove a job from the jobs list.
""" """
with WriteUserData(UserDataFiles.JOBS) as user_data: r = RedisPool().get_connection()
if "jobs" not in user_data: key = redis_key_from_uuid(job_uuid)
user_data["jobs"] = [] r.delete(key)
for i, j in enumerate(user_data["jobs"]):
if j["uid"] == job_uuid:
del user_data["jobs"][i]
return True
return False return False
@staticmethod @staticmethod
@ -154,13 +148,12 @@ class Jobs:
if status in (JobStatus.FINISHED, JobStatus.ERROR): if status in (JobStatus.FINISHED, JobStatus.ERROR):
job.finished_at = datetime.datetime.now() job.finished_at = datetime.datetime.now()
with WriteUserData(UserDataFiles.JOBS) as user_data: r = RedisPool().get_connection()
if "jobs" not in user_data: key = redis_key_from_uuid(job.uid)
user_data["jobs"] = [] if r.exists(key):
for i, j in enumerate(user_data["jobs"]): store_job_as_hash(r, key, job)
if j["uid"] == str(job.uid): if status in (JobStatus.FINISHED, JobStatus.ERROR):
user_data["jobs"][i] = json.loads(job.json()) r.expire(key, JOB_EXPIRATION_SECONDS)
break
return job return job
@ -169,12 +162,10 @@ class Jobs:
""" """
Get a job from the jobs list. Get a job from the jobs list.
""" """
with ReadUserData(UserDataFiles.JOBS) as user_data: r = RedisPool().get_connection()
if "jobs" not in user_data: key = redis_key_from_uuid(uid)
user_data["jobs"] = [] if r.exists(key):
for job in user_data["jobs"]: return job_from_hash(r, key)
if job["uid"] == uid:
return Job(**job)
return None return None
@staticmethod @staticmethod
@ -182,23 +173,49 @@ class Jobs:
""" """
Get the jobs list. Get the jobs list.
""" """
with ReadUserData(UserDataFiles.JOBS) as user_data: r = RedisPool().get_connection()
try: jobs = r.keys("jobs:*")
if "jobs" not in user_data: return [job_from_hash(r, job_key) for job_key in jobs]
user_data["jobs"] = []
return [Job(**job) for job in user_data["jobs"]]
except json.decoder.JSONDecodeError:
return []
@staticmethod @staticmethod
def is_busy() -> bool: def is_busy() -> bool:
""" """
Check if there is a job running. Check if there is a job running.
""" """
with ReadUserData(UserDataFiles.JOBS) as user_data: for job in Jobs.get_jobs():
if "jobs" not in user_data: if job["status"] == JobStatus.RUNNING.value:
user_data["jobs"] = [] return True
for job in user_data["jobs"]:
if job["status"] == JobStatus.RUNNING.value:
return True
return False return False
def redis_key_from_uuid(uuid):
return "jobs:" + str(uuid)
def store_job_as_hash(r, redis_key, model):
for key, value in model.dict().items():
if isinstance(value, uuid.UUID):
value = str(value)
if isinstance(value, datetime.datetime):
value = value.isoformat()
if isinstance(value, JobStatus):
value = value.value
r.hset(redis_key, key, str(value))
def job_from_hash(r, redis_key):
if r.exists(redis_key):
job_dict = r.hgetall(redis_key)
for date in [
"created_at",
"updated_at",
"finished_at",
]:
if job_dict[date] != "None":
job_dict[date] = datetime.datetime.fromisoformat(job_dict[date])
for key in job_dict.keys():
if job_dict[key] == "None":
job_dict[key] = None
return Job(**job_dict)
return None

View file

@ -1,7 +1,7 @@
""" """
Redis pool module for selfprivacy_api Redis pool module for selfprivacy_api
""" """
import redis.asyncio as redis import redis
from selfprivacy_api.utils.singleton_metaclass import SingletonMetaclass from selfprivacy_api.utils.singleton_metaclass import SingletonMetaclass
REDIS_SOCKET = "/run/redis-sp-api/redis.sock" REDIS_SOCKET = "/run/redis-sp-api/redis.sock"

View file

@ -1,14 +1,14 @@
# pylint: disable=redefined-outer-name # pylint: disable=redefined-outer-name
# pylint: disable=unused-argument # pylint: disable=unused-argument
import json
import pytest import pytest
from selfprivacy_api.utils import WriteUserData, ReadUserData
from selfprivacy_api.jobs import Jobs, JobStatus from selfprivacy_api.jobs import Jobs, JobStatus
import selfprivacy_api.jobs as jobsmodule
def test_jobs(authorized_client, jobs_file, shared_datadir): def test_jobs(authorized_client, jobs_file, shared_datadir):
jobs = Jobs() jobs = Jobs()
jobs.reset()
assert jobs.get_jobs() == [] assert jobs.get_jobs() == []
test_job = jobs.add( test_job = jobs.add(
@ -31,6 +31,19 @@ def test_jobs(authorized_client, jobs_file, shared_datadir):
assert jobs.get_jobs() == [test_job] assert jobs.get_jobs() == [test_job]
backup = jobsmodule.JOB_EXPIRATION_SECONDS
jobsmodule.JOB_EXPIRATION_SECONDS = 0
jobs.update(
job=test_job,
status=JobStatus.FINISHED,
status_text="Yaaay!",
progress=100,
)
assert jobs.get_jobs() == []
jobsmodule.JOB_EXPIRATION_SECONDS = backup
@pytest.fixture @pytest.fixture
def mock_subprocess_run(mocker): def mock_subprocess_run(mocker):