feature(jobs): job update generator

This commit is contained in:
Houkime 2024-05-06 14:54:13 +00:00 committed by Inex Code
parent 6510d4cac6
commit 9bfffcd820
3 changed files with 102 additions and 15 deletions

View file

@ -15,6 +15,7 @@ A job is a dictionary with the following keys:
- result: result of the job - result: result of the job
""" """
import typing import typing
import asyncio
import datetime import datetime
from uuid import UUID from uuid import UUID
import uuid import uuid
@ -23,6 +24,7 @@ from enum import Enum
from pydantic import BaseModel from pydantic import BaseModel
from selfprivacy_api.utils.redis_pool import RedisPool from selfprivacy_api.utils.redis_pool import RedisPool
from selfprivacy_api.utils.redis_model_storage import store_model_as_hash
JOB_EXPIRATION_SECONDS = 10 * 24 * 60 * 60 # ten days JOB_EXPIRATION_SECONDS = 10 * 24 * 60 * 60 # ten days
@ -102,7 +104,7 @@ class Jobs:
result=None, result=None,
) )
redis = RedisPool().get_connection() redis = RedisPool().get_connection()
_store_job_as_hash(redis, _redis_key_from_uuid(job.uid), job) store_model_as_hash(redis, _redis_key_from_uuid(job.uid), job)
return job return job
@staticmethod @staticmethod
@ -218,7 +220,7 @@ class Jobs:
redis = RedisPool().get_connection() redis = RedisPool().get_connection()
key = _redis_key_from_uuid(job.uid) key = _redis_key_from_uuid(job.uid)
if redis.exists(key): if redis.exists(key):
_store_job_as_hash(redis, key, job) store_model_as_hash(redis, key, job)
if status in (JobStatus.FINISHED, JobStatus.ERROR): if status in (JobStatus.FINISHED, JobStatus.ERROR):
redis.expire(key, JOB_EXPIRATION_SECONDS) redis.expire(key, JOB_EXPIRATION_SECONDS)
@ -294,17 +296,6 @@ def _progress_log_key_from_uuid(uuid_string) -> str:
return PROGRESS_LOGS_PREFIX + str(uuid_string) return PROGRESS_LOGS_PREFIX + str(uuid_string)
def _store_job_as_hash(redis, redis_key, model) -> None:
for key, value in model.dict().items():
if isinstance(value, uuid.UUID):
value = str(value)
if isinstance(value, datetime.datetime):
value = value.isoformat()
if isinstance(value, JobStatus):
value = value.value
redis.hset(redis_key, key, str(value))
def _job_from_hash(redis, redis_key) -> typing.Optional[Job]: def _job_from_hash(redis, redis_key) -> typing.Optional[Job]:
if redis.exists(redis_key): if redis.exists(redis_key):
job_dict = redis.hgetall(redis_key) job_dict = redis.hgetall(redis_key)
@ -321,3 +312,15 @@ def _job_from_hash(redis, redis_key) -> typing.Optional[Job]:
return Job(**job_dict) return Job(**job_dict)
return None return None
async def job_notifications() -> typing.AsyncGenerator[dict, None]:
    """Yield raw Redis keyspace-notification messages for job updates.

    Subscribes to the ``jobs:*`` key pattern and yields each pubsub
    message dict as it arrives.  Runs until the consumer closes the
    generator; the pubsub channel is released on exit.
    """
    channel = await RedisPool().subscribe_to_keys("jobs:*")
    # The async context manager guarantees the subscription is cleaned up
    # even when the consumer abandons the generator (GeneratorExit is
    # raised at the yield and propagates through the `with` body).
    async with channel:
        while True:
            # we cannot timeout here because we do not know when the next
            # message is supposed to arrive
            message: dict = await channel.get_message(
                ignore_subscribe_messages=True, timeout=None
            )  # type: ignore
            if message is not None:
                yield message

View file

@ -1,15 +1,23 @@
import uuid
from datetime import datetime from datetime import datetime
from typing import Optional from typing import Optional
from enum import Enum from enum import Enum
def store_model_as_hash(redis, redis_key, model):
    """Persist a pydantic model as a Redis hash of stringified fields."""

    def stringify(raw):
        # Flatten rich types to their canonical string form first,
        # then str() everything — Redis hash values are strings.
        if isinstance(raw, uuid.UUID):
            raw = str(raw)
        if isinstance(raw, datetime):
            raw = raw.isoformat()
        if isinstance(raw, Enum):
            raw = raw.value
        return str(raw)

    serialized = {field: stringify(raw) for field, raw in model.dict().items()}
    redis.hset(redis_key, mapping=serialized)
def hash_as_model(redis, redis_key: str, model_class): def hash_as_model(redis, redis_key: str, model_class):

View file

@ -7,6 +7,8 @@ from typing import List
from selfprivacy_api.utils.redis_pool import RedisPool from selfprivacy_api.utils.redis_pool import RedisPool
from selfprivacy_api.jobs import Jobs, job_notifications
TEST_KEY = "test:test" TEST_KEY = "test:test"
STOPWORD = "STOP" STOPWORD = "STOP"
@ -140,3 +142,77 @@ async def test_keyspace_notifications(empty_redis, event_loop):
"pattern": f"__keyspace@0__:{TEST_KEY}", "pattern": f"__keyspace@0__:{TEST_KEY}",
"type": "pmessage", "type": "pmessage",
} }
@pytest.mark.asyncio
async def test_keyspace_notifications_patterns(empty_redis, event_loop):
    """A glob subscription must receive keyspace events for matching keys."""
    key_glob = "test*"
    pubsub = await RedisPool().subscribe_to_keys(key_glob)
    async with pubsub:
        pending = asyncio.create_task(channel_reader_onemessage(pubsub))
        empty_redis.set(TEST_KEY, "I am set!")
        event = await pending
        assert event is not None
        assert event["data"] is not None
        expected = {
            "channel": f"__keyspace@0__:{TEST_KEY}",
            "data": "set",
            "pattern": f"__keyspace@0__:{key_glob}",
            "type": "pmessage",
        }
        assert event == expected
@pytest.mark.asyncio
async def test_keyspace_notifications_jobs(empty_redis, event_loop):
    """Adding a job must emit a keyspace event on the jobs:* pattern."""
    pattern = "jobs:*"
    pubsub = await RedisPool().subscribe_to_keys(pattern)
    async with pubsub:
        pending = asyncio.create_task(channel_reader_onemessage(pubsub))
        Jobs.add("testjob1", "test.test", "Testing aaaalll day")
        event = await pending
        assert event is not None
        assert event["data"] is not None
        # Job creation writes the hash, so the keyspace event is "hset".
        assert event["data"] == "hset"
async def reader_of_jobs() -> List[dict]:
    """
    Reads 3 job updates and exits
    """
    collected: List[dict] = []
    async for notification in job_notifications():
        collected.append(notification)
        if len(collected) == 3:
            break
    return collected
@pytest.mark.asyncio
async def test_jobs_generator(empty_redis, event_loop):
    """The notification generator yields one message per job insertion."""
    # Start the listener first; it stops after exactly 3 messages.
    pending_reads = asyncio.create_task(reader_of_jobs())
    await asyncio.sleep(1)

    for job_name in ("testjob1", "testjob2", "testjob3", "testjob4"):
        Jobs.add(job_name, "test.test", "Testing aaaalll day")

    assert len(Jobs.get_jobs()) == 4
    redis = RedisPool().get_connection()
    assert len(redis.keys("jobs:*")) == 4

    messages = await pending_reads
    assert len(messages) == 3

    seen_channels = " ".join(message["channel"] for message in messages)
    # Every observed operation is a hash write.
    assert {message["data"] for message in messages} == {"hset"}

    # Each of the first three jobs must appear among the seen channels.
    wanted_names = ["testjob1", "testjob2", "testjob3"]
    wanted_ids = [
        str(job.uid) for job in Jobs.get_jobs() if job.name in wanted_names
    ]
    for job_id in wanted_ids:
        assert job_id in seen_channels

    # The reader finished before the fourth job, so it never shows up.
    assert "testjob4" not in seen_channels