From 43980f16ea43d302fd6628de1b841ac81c5523a1 Mon Sep 17 00:00:00 2001 From: Houkime <> Date: Mon, 6 May 2024 14:54:13 +0000 Subject: [PATCH] feature(jobs): job update generator --- selfprivacy_api/jobs/__init__.py | 29 ++++---- selfprivacy_api/utils/redis_model_storage.py | 12 +++- tests/test_redis.py | 76 ++++++++++++++++++++ 3 files changed, 102 insertions(+), 15 deletions(-) diff --git a/selfprivacy_api/jobs/__init__.py b/selfprivacy_api/jobs/__init__.py index 4649bb0..3dd48c4 100644 --- a/selfprivacy_api/jobs/__init__.py +++ b/selfprivacy_api/jobs/__init__.py @@ -15,6 +15,7 @@ A job is a dictionary with the following keys: - result: result of the job """ import typing +import asyncio import datetime from uuid import UUID import uuid @@ -23,6 +24,7 @@ from enum import Enum from pydantic import BaseModel from selfprivacy_api.utils.redis_pool import RedisPool +from selfprivacy_api.utils.redis_model_storage import store_model_as_hash JOB_EXPIRATION_SECONDS = 10 * 24 * 60 * 60 # ten days @@ -102,7 +104,7 @@ class Jobs: result=None, ) redis = RedisPool().get_connection() - _store_job_as_hash(redis, _redis_key_from_uuid(job.uid), job) + store_model_as_hash(redis, _redis_key_from_uuid(job.uid), job) return job @staticmethod @@ -218,7 +220,7 @@ class Jobs: redis = RedisPool().get_connection() key = _redis_key_from_uuid(job.uid) if redis.exists(key): - _store_job_as_hash(redis, key, job) + store_model_as_hash(redis, key, job) if status in (JobStatus.FINISHED, JobStatus.ERROR): redis.expire(key, JOB_EXPIRATION_SECONDS) @@ -294,17 +296,6 @@ def _progress_log_key_from_uuid(uuid_string) -> str: return PROGRESS_LOGS_PREFIX + str(uuid_string) -def _store_job_as_hash(redis, redis_key, model) -> None: - for key, value in model.dict().items(): - if isinstance(value, uuid.UUID): - value = str(value) - if isinstance(value, datetime.datetime): - value = value.isoformat() - if isinstance(value, JobStatus): - value = value.value - redis.hset(redis_key, key, str(value)) - - def _job_from_hash(redis, redis_key) -> typing.Optional[Job]: if redis.exists(redis_key): job_dict = redis.hgetall(redis_key) @@ -321,3 +312,15 @@ def _job_from_hash(redis, redis_key) -> typing.Optional[Job]: return Job(**job_dict) return None + + +async def job_notifications() -> typing.AsyncGenerator[dict, None]: + channel = await RedisPool().subscribe_to_keys("jobs:*") + while True: + try: + # we cannot timeout here because we do not know when the next message is supposed to arrive + message: dict = await channel.get_message(ignore_subscribe_messages=True, timeout=None) # type: ignore + if message is not None: + yield message + except GeneratorExit: + break diff --git a/selfprivacy_api/utils/redis_model_storage.py b/selfprivacy_api/utils/redis_model_storage.py index 06dfe8c..7d84210 100644 --- a/selfprivacy_api/utils/redis_model_storage.py +++ b/selfprivacy_api/utils/redis_model_storage.py @@ -1,15 +1,23 @@ +import uuid + from datetime import datetime from typing import Optional from enum import Enum def store_model_as_hash(redis, redis_key, model): - for key, value in model.dict().items(): + model_dict = model.dict() + for key, value in model_dict.items(): + if isinstance(value, uuid.UUID): + value = str(value) if isinstance(value, datetime): value = value.isoformat() if isinstance(value, Enum): value = value.value - redis.hset(redis_key, key, str(value)) + value = str(value) + model_dict[key] = value + + redis.hset(redis_key, mapping=model_dict) def hash_as_model(redis, redis_key: str, model_class): diff --git a/tests/test_redis.py b/tests/test_redis.py index 70ef43a..02dfb21 100644 --- a/tests/test_redis.py +++ b/tests/test_redis.py @@ -7,6 +7,8 @@ from typing import List from selfprivacy_api.utils.redis_pool import RedisPool +from selfprivacy_api.jobs import Jobs, job_notifications + TEST_KEY = "test:test" STOPWORD = "STOP" @@ -140,3 +142,77 @@ async def test_keyspace_notifications(empty_redis, event_loop): "pattern": f"__keyspace@0__:{TEST_KEY}", "type": "pmessage", } + + +@pytest.mark.asyncio +async def test_keyspace_notifications_patterns(empty_redis, event_loop): + pattern = "test*" + pubsub = await RedisPool().subscribe_to_keys(pattern) + async with pubsub: + future_message = asyncio.create_task(channel_reader_onemessage(pubsub)) + empty_redis.set(TEST_KEY, "I am set!") + message = await future_message + assert message is not None + assert message["data"] is not None + assert message == { + "channel": f"__keyspace@0__:{TEST_KEY}", + "data": "set", + "pattern": f"__keyspace@0__:{pattern}", + "type": "pmessage", + } + + +@pytest.mark.asyncio +async def test_keyspace_notifications_jobs(empty_redis, event_loop): + pattern = "jobs:*" + pubsub = await RedisPool().subscribe_to_keys(pattern) + async with pubsub: + future_message = asyncio.create_task(channel_reader_onemessage(pubsub)) + Jobs.add("testjob1", "test.test", "Testing aaaalll day") + message = await future_message + assert message is not None + assert message["data"] is not None + assert message["data"] == "hset" + + +async def reader_of_jobs() -> List[dict]: + """ + Reads 3 job updates and exits + """ + result: List[dict] = [] + async for message in job_notifications(): + result.append(message) + if len(result) >= 3: + break + return result + + +@pytest.mark.asyncio +async def test_jobs_generator(empty_redis, event_loop): + # Will read exactly 3 job messages + future_messages = asyncio.create_task(reader_of_jobs()) + await asyncio.sleep(1) + + Jobs.add("testjob1", "test.test", "Testing aaaalll day") + Jobs.add("testjob2", "test.test", "Testing aaaalll day") + Jobs.add("testjob3", "test.test", "Testing aaaalll day") + Jobs.add("testjob4", "test.test", "Testing aaaalll day") + + assert len(Jobs.get_jobs()) == 4 + r = RedisPool().get_connection() + assert len(r.keys("jobs:*")) == 4 + + messages = await future_messages + assert len(messages) == 3 + channels = [message["channel"] for message in messages] + operations = [message["data"] for message in messages] + assert set(operations) == set(["hset"]) # all of them are hsets + + # Asserting that all of jobs emitted exactly one message + jobs = Jobs.get_jobs() + names = ["testjob1", "testjob2", "testjob3"] + ids = [str(job.uid) for job in jobs if job.name in names] + for id in ids: + assert id in " ".join(channels) + # Asserting that they came in order + assert "testjob4" not in " ".join(channels)