Test of the redis subscription

This commit is contained in:
inexcode 2022-10-24 03:55:48 +03:00
parent 0a09a338b8
commit 3ba94e02da
2 changed files with 21 additions and 3 deletions

View file

@ -2,6 +2,9 @@
# pylint: disable=too-few-public-methods
import asyncio
import async_timeout
import redis.asyncio as redis
from typing import AsyncGenerator
import strawberry
from selfprivacy_api.graphql import IsAuthenticated
@ -90,9 +93,22 @@ class Subscription:
@strawberry.subscription(permission_classes=[IsAuthenticated])
async def count(self, target: int = 100) -> AsyncGenerator[int, None]:
for i in range(target):
yield i
await asyncio.sleep(0.5)
r = redis.from_url('unix:///run/redis-sp-api/redis.sock')
pubsub = r.pubsub()
await pubsub.psubscribe("__keyspace@0__:api_test")
while True:
try:
async with async_timeout.timeout(1):
message = await pubsub.get_message()
if message:
if message['data'] == 'set':
await r.get('api_test')
yield int(await r.get('api_test'))
else:
await asyncio.sleep(0.01)
except asyncio.TimeoutError:
pass
schema = strawberry.Schema(query=Query, mutation=Mutation, subscription=Subscription)

View file

@ -16,6 +16,7 @@ let
typing-extensions
psutil
black
redis
fastapi
uvicorn
(buildPythonPackage rec {
@ -33,6 +34,7 @@ let
pydantic
pygments
poetry
redis
# flask-cors
(buildPythonPackage rec {
pname = "graphql-core";