mirror of
https://git.selfprivacy.org/SelfPrivacy/selfprivacy-rest-api.git
synced 2025-01-09 09:31:04 +00:00
feature(jobs): job update generator
This commit is contained in:
parent
b204d4a9b3
commit
43980f16ea
|
@ -15,6 +15,7 @@ A job is a dictionary with the following keys:
|
|||
- result: result of the job
|
||||
"""
|
||||
import typing
|
||||
import asyncio
|
||||
import datetime
|
||||
from uuid import UUID
|
||||
import uuid
|
||||
|
@ -23,6 +24,7 @@ from enum import Enum
|
|||
from pydantic import BaseModel
|
||||
|
||||
from selfprivacy_api.utils.redis_pool import RedisPool
|
||||
from selfprivacy_api.utils.redis_model_storage import store_model_as_hash
|
||||
|
||||
JOB_EXPIRATION_SECONDS = 10 * 24 * 60 * 60 # ten days
|
||||
|
||||
|
@ -102,7 +104,7 @@ class Jobs:
|
|||
result=None,
|
||||
)
|
||||
redis = RedisPool().get_connection()
|
||||
_store_job_as_hash(redis, _redis_key_from_uuid(job.uid), job)
|
||||
store_model_as_hash(redis, _redis_key_from_uuid(job.uid), job)
|
||||
return job
|
||||
|
||||
@staticmethod
|
||||
|
@ -218,7 +220,7 @@ class Jobs:
|
|||
redis = RedisPool().get_connection()
|
||||
key = _redis_key_from_uuid(job.uid)
|
||||
if redis.exists(key):
|
||||
_store_job_as_hash(redis, key, job)
|
||||
store_model_as_hash(redis, key, job)
|
||||
if status in (JobStatus.FINISHED, JobStatus.ERROR):
|
||||
redis.expire(key, JOB_EXPIRATION_SECONDS)
|
||||
|
||||
|
@ -294,17 +296,6 @@ def _progress_log_key_from_uuid(uuid_string) -> str:
|
|||
return PROGRESS_LOGS_PREFIX + str(uuid_string)
|
||||
|
||||
|
||||
def _store_job_as_hash(redis, redis_key, model) -> None:
|
||||
for key, value in model.dict().items():
|
||||
if isinstance(value, uuid.UUID):
|
||||
value = str(value)
|
||||
if isinstance(value, datetime.datetime):
|
||||
value = value.isoformat()
|
||||
if isinstance(value, JobStatus):
|
||||
value = value.value
|
||||
redis.hset(redis_key, key, str(value))
|
||||
|
||||
|
||||
def _job_from_hash(redis, redis_key) -> typing.Optional[Job]:
|
||||
if redis.exists(redis_key):
|
||||
job_dict = redis.hgetall(redis_key)
|
||||
|
@ -321,3 +312,15 @@ def _job_from_hash(redis, redis_key) -> typing.Optional[Job]:
|
|||
|
||||
return Job(**job_dict)
|
||||
return None
|
||||
|
||||
|
||||
async def job_notifications() -> typing.AsyncGenerator[dict, None]:
|
||||
channel = await RedisPool().subscribe_to_keys("jobs:*")
|
||||
while True:
|
||||
try:
|
||||
# we cannot timeout here because we do not know when the next message is supposed to arrive
|
||||
message: dict = await channel.get_message(ignore_subscribe_messages=True, timeout=None) # type: ignore
|
||||
if message is not None:
|
||||
yield message
|
||||
except GeneratorExit:
|
||||
break
|
||||
|
|
|
@ -1,15 +1,23 @@
|
|||
import uuid
|
||||
|
||||
from datetime import datetime
|
||||
from typing import Optional
|
||||
from enum import Enum
|
||||
|
||||
|
||||
def store_model_as_hash(redis, redis_key, model):
|
||||
for key, value in model.dict().items():
|
||||
model_dict = model.dict()
|
||||
for key, value in model_dict.items():
|
||||
if isinstance(value, uuid.UUID):
|
||||
value = str(value)
|
||||
if isinstance(value, datetime):
|
||||
value = value.isoformat()
|
||||
if isinstance(value, Enum):
|
||||
value = value.value
|
||||
redis.hset(redis_key, key, str(value))
|
||||
value = str(value)
|
||||
model_dict[key] = value
|
||||
|
||||
redis.hset(redis_key, mapping=model_dict)
|
||||
|
||||
|
||||
def hash_as_model(redis, redis_key: str, model_class):
|
||||
|
|
|
@ -7,6 +7,8 @@ from typing import List
|
|||
|
||||
from selfprivacy_api.utils.redis_pool import RedisPool
|
||||
|
||||
from selfprivacy_api.jobs import Jobs, job_notifications
|
||||
|
||||
TEST_KEY = "test:test"
|
||||
STOPWORD = "STOP"
|
||||
|
||||
|
@ -140,3 +142,77 @@ async def test_keyspace_notifications(empty_redis, event_loop):
|
|||
"pattern": f"__keyspace@0__:{TEST_KEY}",
|
||||
"type": "pmessage",
|
||||
}
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_keyspace_notifications_patterns(empty_redis, event_loop):
|
||||
pattern = "test*"
|
||||
pubsub = await RedisPool().subscribe_to_keys(pattern)
|
||||
async with pubsub:
|
||||
future_message = asyncio.create_task(channel_reader_onemessage(pubsub))
|
||||
empty_redis.set(TEST_KEY, "I am set!")
|
||||
message = await future_message
|
||||
assert message is not None
|
||||
assert message["data"] is not None
|
||||
assert message == {
|
||||
"channel": f"__keyspace@0__:{TEST_KEY}",
|
||||
"data": "set",
|
||||
"pattern": f"__keyspace@0__:{pattern}",
|
||||
"type": "pmessage",
|
||||
}
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_keyspace_notifications_jobs(empty_redis, event_loop):
|
||||
pattern = "jobs:*"
|
||||
pubsub = await RedisPool().subscribe_to_keys(pattern)
|
||||
async with pubsub:
|
||||
future_message = asyncio.create_task(channel_reader_onemessage(pubsub))
|
||||
Jobs.add("testjob1", "test.test", "Testing aaaalll day")
|
||||
message = await future_message
|
||||
assert message is not None
|
||||
assert message["data"] is not None
|
||||
assert message["data"] == "hset"
|
||||
|
||||
|
||||
async def reader_of_jobs() -> List[dict]:
|
||||
"""
|
||||
Reads 3 job updates and exits
|
||||
"""
|
||||
result: List[dict] = []
|
||||
async for message in job_notifications():
|
||||
result.append(message)
|
||||
if len(result) >= 3:
|
||||
break
|
||||
return result
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_jobs_generator(empty_redis, event_loop):
|
||||
# Will read exactly 3 job messages
|
||||
future_messages = asyncio.create_task(reader_of_jobs())
|
||||
await asyncio.sleep(1)
|
||||
|
||||
Jobs.add("testjob1", "test.test", "Testing aaaalll day")
|
||||
Jobs.add("testjob2", "test.test", "Testing aaaalll day")
|
||||
Jobs.add("testjob3", "test.test", "Testing aaaalll day")
|
||||
Jobs.add("testjob4", "test.test", "Testing aaaalll day")
|
||||
|
||||
assert len(Jobs.get_jobs()) == 4
|
||||
r = RedisPool().get_connection()
|
||||
assert len(r.keys("jobs:*")) == 4
|
||||
|
||||
messages = await future_messages
|
||||
assert len(messages) == 3
|
||||
channels = [message["channel"] for message in messages]
|
||||
operations = [message["data"] for message in messages]
|
||||
assert set(operations) == set(["hset"]) # all of them are hsets
|
||||
|
||||
# Asserting that all of jobs emitted exactly one message
|
||||
jobs = Jobs.get_jobs()
|
||||
names = ["testjob1", "testjob2", "testjob3"]
|
||||
ids = [str(job.uid) for job in jobs if job.name in names]
|
||||
for id in ids:
|
||||
assert id in " ".join(channels)
|
||||
# Asserting that they came in order
|
||||
assert "testjob4" not in " ".join(channels)
|
||||
|
|
Loading…
Reference in a new issue