mirror of
https://git.selfprivacy.org/SelfPrivacy/selfprivacy-rest-api.git
synced 2025-01-24 17:56:44 +00:00
test(redis): test key event notifications
This commit is contained in:
parent
f08dc3ad23
commit
5558577927
|
@ -15,6 +15,8 @@ STOPWORD = "STOP"
|
|||
def empty_redis(event_loop):
|
||||
r = RedisPool().get_connection()
|
||||
r.flushdb()
|
||||
r.config_set("notify-keyspace-events", "KEA")
|
||||
assert r.config_get("notify-keyspace-events")["notify-keyspace-events"] == "AKE"
|
||||
yield r
|
||||
r.flushdb()
|
||||
|
||||
|
@ -51,6 +53,15 @@ async def channel_reader(channel: redis.client.PubSub) -> List[dict]:
|
|||
return result
|
||||
|
||||
|
||||
async def channel_reader_onemessage(channel: redis.client.PubSub) -> dict:
|
||||
while True:
|
||||
# Mypy cannot correctly detect that it is a coroutine
|
||||
# But it is
|
||||
message: dict = await channel.get_message(ignore_subscribe_messages=True, timeout=None) # type: ignore
|
||||
if message is not None:
|
||||
return message
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_pubsub(empty_redis, event_loop):
|
||||
# Adapted from :
|
||||
|
@ -93,3 +104,40 @@ async def test_pubsub(empty_redis, event_loop):
|
|||
assert message["data"] == STOPWORD
|
||||
|
||||
await r.close()
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_keyspace_notifications_simple(empty_redis, event_loop):
|
||||
r = RedisPool().get_connection_async()
|
||||
await r.set(TEST_KEY, "I am not empty")
|
||||
async with r.pubsub() as pubsub:
|
||||
await pubsub.subscribe("__keyspace@0__:" + TEST_KEY)
|
||||
|
||||
future_message = asyncio.create_task(channel_reader_onemessage(pubsub))
|
||||
empty_redis.set(TEST_KEY, "I am set!")
|
||||
message = await future_message
|
||||
assert message is not None
|
||||
assert message["data"] is not None
|
||||
assert message == {
|
||||
"channel": f"__keyspace@0__:{TEST_KEY}",
|
||||
"data": "set",
|
||||
"pattern": None,
|
||||
"type": "message",
|
||||
}
|
||||
|
||||
|
||||
@pytest.mark.asyncio
|
||||
async def test_keyspace_notifications(empty_redis, event_loop):
|
||||
pubsub = await RedisPool().subscribe_to_keys(TEST_KEY)
|
||||
async with pubsub:
|
||||
future_message = asyncio.create_task(channel_reader_onemessage(pubsub))
|
||||
empty_redis.set(TEST_KEY, "I am set!")
|
||||
message = await future_message
|
||||
assert message is not None
|
||||
assert message["data"] is not None
|
||||
assert message == {
|
||||
"channel": f"__keyspace@0__:{TEST_KEY}",
|
||||
"data": "set",
|
||||
"pattern": f"__keyspace@0__:{TEST_KEY}",
|
||||
"type": "pmessage",
|
||||
}
|
||||
|
|
Loading…
Reference in a new issue