mirror of
https://git.selfprivacy.org/SelfPrivacy/selfprivacy-rest-api.git
synced 2024-11-22 12:11:26 +00:00
test(redis): test key event notifications
This commit is contained in:
parent
4d60b7264a
commit
5bf5e7462f
|
@ -15,6 +15,8 @@ STOPWORD = "STOP"
|
||||||
def empty_redis(event_loop):
|
def empty_redis(event_loop):
|
||||||
r = RedisPool().get_connection()
|
r = RedisPool().get_connection()
|
||||||
r.flushdb()
|
r.flushdb()
|
||||||
|
r.config_set("notify-keyspace-events", "KEA")
|
||||||
|
assert r.config_get("notify-keyspace-events")["notify-keyspace-events"] == "AKE"
|
||||||
yield r
|
yield r
|
||||||
r.flushdb()
|
r.flushdb()
|
||||||
|
|
||||||
|
@ -51,6 +53,15 @@ async def channel_reader(channel: redis.client.PubSub) -> List[dict]:
|
||||||
return result
|
return result
|
||||||
|
|
||||||
|
|
||||||
|
async def channel_reader_onemessage(channel: redis.client.PubSub) -> dict:
|
||||||
|
while True:
|
||||||
|
# Mypy cannot correctly detect that it is a coroutine
|
||||||
|
# But it is
|
||||||
|
message: dict = await channel.get_message(ignore_subscribe_messages=True, timeout=None) # type: ignore
|
||||||
|
if message is not None:
|
||||||
|
return message
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.asyncio
|
@pytest.mark.asyncio
|
||||||
async def test_pubsub(empty_redis, event_loop):
|
async def test_pubsub(empty_redis, event_loop):
|
||||||
# Adapted from :
|
# Adapted from :
|
||||||
|
@ -93,3 +104,40 @@ async def test_pubsub(empty_redis, event_loop):
|
||||||
assert message["data"] == STOPWORD
|
assert message["data"] == STOPWORD
|
||||||
|
|
||||||
await r.close()
|
await r.close()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_keyspace_notifications_simple(empty_redis, event_loop):
|
||||||
|
r = RedisPool().get_connection_async()
|
||||||
|
await r.set(TEST_KEY, "I am not empty")
|
||||||
|
async with r.pubsub() as pubsub:
|
||||||
|
await pubsub.subscribe("__keyspace@0__:" + TEST_KEY)
|
||||||
|
|
||||||
|
future_message = asyncio.create_task(channel_reader_onemessage(pubsub))
|
||||||
|
empty_redis.set(TEST_KEY, "I am set!")
|
||||||
|
message = await future_message
|
||||||
|
assert message is not None
|
||||||
|
assert message["data"] is not None
|
||||||
|
assert message == {
|
||||||
|
"channel": f"__keyspace@0__:{TEST_KEY}",
|
||||||
|
"data": "set",
|
||||||
|
"pattern": None,
|
||||||
|
"type": "message",
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.asyncio
|
||||||
|
async def test_keyspace_notifications(empty_redis, event_loop):
|
||||||
|
pubsub = await RedisPool().subscribe_to_keys(TEST_KEY)
|
||||||
|
async with pubsub:
|
||||||
|
future_message = asyncio.create_task(channel_reader_onemessage(pubsub))
|
||||||
|
empty_redis.set(TEST_KEY, "I am set!")
|
||||||
|
message = await future_message
|
||||||
|
assert message is not None
|
||||||
|
assert message["data"] is not None
|
||||||
|
assert message == {
|
||||||
|
"channel": f"__keyspace@0__:{TEST_KEY}",
|
||||||
|
"data": "set",
|
||||||
|
"pattern": f"__keyspace@0__:{TEST_KEY}",
|
||||||
|
"type": "pmessage",
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in a new issue