From 862f85b8fd47f0f372eadf32deb0db5626922e04 Mon Sep 17 00:00:00 2001 From: Houkime <> Date: Mon, 1 Apr 2024 20:12:02 +0000 Subject: [PATCH 01/34] feature(redis): async connections --- selfprivacy_api/utils/redis_pool.py | 19 ++++++++++++----- tests/test_redis.py | 33 +++++++++++++++++++++++++++++ 2 files changed, 47 insertions(+), 5 deletions(-) create mode 100644 tests/test_redis.py diff --git a/selfprivacy_api/utils/redis_pool.py b/selfprivacy_api/utils/redis_pool.py index 3d35f01..04ccb51 100644 --- a/selfprivacy_api/utils/redis_pool.py +++ b/selfprivacy_api/utils/redis_pool.py @@ -2,6 +2,7 @@ Redis pool module for selfprivacy_api """ import redis +import redis.asyncio as redis_async from selfprivacy_api.utils.singleton_metaclass import SingletonMetaclass @@ -14,11 +15,18 @@ class RedisPool(metaclass=SingletonMetaclass): """ def __init__(self): + url = RedisPool.connection_url(dbnumber=0) + # We need a normal sync pool because otherwise + # our whole API will need to be async self._pool = redis.ConnectionPool.from_url( - RedisPool.connection_url(dbnumber=0), + url, + decode_responses=True, + ) + # We need an async pool for pubsub + self._async_pool = redis_async.ConnectionPool.from_url( + url, decode_responses=True, ) - self._pubsub_connection = self.get_connection() @staticmethod def connection_url(dbnumber: int) -> str: @@ -34,8 +42,9 @@ class RedisPool(metaclass=SingletonMetaclass): """ return redis.Redis(connection_pool=self._pool) - def get_pubsub(self): + def get_connection_async(self) -> redis_async.Redis: """ - Get a pubsub connection from the pool. + Get an async connection from the pool. + Async connections allow pubsub. """ - return self._pubsub_connection.pubsub() + return redis_async.Redis(connection_pool=self._async_pool) diff --git a/tests/test_redis.py b/tests/test_redis.py new file mode 100644 index 0000000..48ec56e --- /dev/null +++ b/tests/test_redis.py @@ -0,0 +1,33 @@ +import asyncio +import pytest + +from selfprivacy_api.utils.redis_pool import RedisPool + +TEST_KEY = "test:test" + + +@pytest.fixture() +def empty_redis(): + r = RedisPool().get_connection() + r.flushdb() + yield r + r.flushdb() + + +async def write_to_test_key(): + r = RedisPool().get_connection_async() + async with r.pipeline(transaction=True) as pipe: + ok1, ok2 = await pipe.set(TEST_KEY, "value1").set(TEST_KEY, "value2").execute() + assert ok1 + assert ok2 + assert await r.get(TEST_KEY) == "value2" + await r.close() + + +def test_async_connection(empty_redis): + r = RedisPool().get_connection() + assert not r.exists(TEST_KEY) + # It _will_ report an error if it arises + asyncio.run(write_to_test_key()) + # Confirming that we can read result from sync connection too + assert r.get(TEST_KEY) == "value2" From 996cde15e14aa3a66e64e24d23aad86e95ed7742 Mon Sep 17 00:00:00 2001 From: Houkime <> Date: Mon, 15 Apr 2024 13:35:44 +0000 Subject: [PATCH 02/34] chore(nixos): add pytest-asyncio --- flake.nix | 1 + 1 file changed, 1 insertion(+) diff --git a/flake.nix b/flake.nix index f8b81aa..ab969a4 100644 --- a/flake.nix +++ b/flake.nix @@ -20,6 +20,7 @@ pytest-datadir pytest-mock pytest-subprocess + pytest-asyncio black mypy pylsp-mypy From 4d60b7264abdf3dd2af93ae7ad9128f020578e64 Mon Sep 17 00:00:00 2001 From: Houkime <> Date: Mon, 15 Apr 2024 13:37:04 +0000 Subject: [PATCH 03/34] test(async): pubsub --- selfprivacy_api/utils/redis_pool.py | 12 +++++- tests/test_redis.py | 64 ++++++++++++++++++++++++++++- 2 files changed, 73 insertions(+), 3 deletions(-) diff --git a/selfprivacy_api/utils/redis_pool.py 
b/selfprivacy_api/utils/redis_pool.py index 04ccb51..ea827d1 100644 --- a/selfprivacy_api/utils/redis_pool.py +++ b/selfprivacy_api/utils/redis_pool.py @@ -9,13 +9,15 @@ from selfprivacy_api.utils.singleton_metaclass import SingletonMetaclass REDIS_SOCKET = "/run/redis-sp-api/redis.sock" -class RedisPool(metaclass=SingletonMetaclass): +# class RedisPool(metaclass=SingletonMetaclass): +class RedisPool: """ Redis connection pool singleton. """ def __init__(self): - url = RedisPool.connection_url(dbnumber=0) + self._dbnumber = 0 + url = RedisPool.connection_url(dbnumber=self._dbnumber) # We need a normal sync pool because otherwise # our whole API will need to be async self._pool = redis.ConnectionPool.from_url( @@ -48,3 +50,9 @@ class RedisPool(metaclass=SingletonMetaclass): Async connections allow pubsub. """ return redis_async.Redis(connection_pool=self._async_pool) + + async def subscribe_to_keys(self, pattern: str) -> redis_async.client.PubSub: + async_redis = self.get_connection_async() + pubsub = async_redis.pubsub() + await pubsub.psubscribe(f"__keyspace@{self._dbnumber}__:" + pattern) + return pubsub diff --git a/tests/test_redis.py b/tests/test_redis.py index 48ec56e..2def280 100644 --- a/tests/test_redis.py +++ b/tests/test_redis.py @@ -1,13 +1,18 @@ import asyncio import pytest +import pytest_asyncio +from asyncio import streams +import redis +from typing import List from selfprivacy_api.utils.redis_pool import RedisPool TEST_KEY = "test:test" +STOPWORD = "STOP" @pytest.fixture() -def empty_redis(): +def empty_redis(event_loop): r = RedisPool().get_connection() r.flushdb() yield r @@ -31,3 +36,60 @@ def test_async_connection(empty_redis): asyncio.run(write_to_test_key()) # Confirming that we can read result from sync connection too assert r.get(TEST_KEY) == "value2" + + +async def channel_reader(channel: redis.client.PubSub) -> List[dict]: + result: List[dict] = [] + while True: + # Mypy cannot correctly detect that it is a coroutine + # But it is + message: dict = await channel.get_message(ignore_subscribe_messages=True, timeout=None) # type: ignore + if message is not None: + result.append(message) + if message["data"] == STOPWORD: + break + return result + + +@pytest.mark.asyncio +async def test_pubsub(empty_redis, event_loop): + # Adapted from : + # https://redis.readthedocs.io/en/stable/examples/asyncio_examples.html + # Sanity checking because of previous event loop bugs + assert event_loop == asyncio.get_event_loop() + assert event_loop == asyncio.events.get_event_loop() + assert event_loop == asyncio.events._get_event_loop() + assert event_loop == asyncio.events.get_running_loop() + + reader = streams.StreamReader(34) + assert event_loop == reader._loop + f = reader._loop.create_future() + f.set_result(3) + await f + + r = RedisPool().get_connection_async() + async with r.pubsub() as pubsub: + await pubsub.subscribe("channel:1") + future = asyncio.create_task(channel_reader(pubsub)) + + await r.publish("channel:1", "Hello") + # message: dict = await pubsub.get_message(ignore_subscribe_messages=True, timeout=5.0) # type: ignore + # raise ValueError(message) + await r.publish("channel:1", "World") + await r.publish("channel:1", STOPWORD) + + messages = await future + + assert len(messages) == 3 + + message = messages[0] + assert "data" in message.keys() + assert message["data"] == "Hello" + message = messages[1] + assert "data" in message.keys() + assert message["data"] == "World" + message = messages[2] + assert "data" in message.keys() + assert message["data"] == STOPWORD 
+ + await r.close() From 5bf5e7462f24a8882d04cdc5d723115415d5b5ff Mon Sep 17 00:00:00 2001 From: Houkime <> Date: Mon, 22 Apr 2024 14:40:55 +0000 Subject: [PATCH 04/34] test(redis): test key event notifications --- tests/test_redis.py | 48 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 48 insertions(+) diff --git a/tests/test_redis.py b/tests/test_redis.py index 2def280..181d325 100644 --- a/tests/test_redis.py +++ b/tests/test_redis.py @@ -15,6 +15,8 @@ STOPWORD = "STOP" def empty_redis(event_loop): r = RedisPool().get_connection() r.flushdb() + r.config_set("notify-keyspace-events", "KEA") + assert r.config_get("notify-keyspace-events")["notify-keyspace-events"] == "AKE" yield r r.flushdb() @@ -51,6 +53,15 @@ async def channel_reader(channel: redis.client.PubSub) -> List[dict]: return result +async def channel_reader_onemessage(channel: redis.client.PubSub) -> dict: + while True: + # Mypy cannot correctly detect that it is a coroutine + # But it is + message: dict = await channel.get_message(ignore_subscribe_messages=True, timeout=None) # type: ignore + if message is not None: + return message + + @pytest.mark.asyncio async def test_pubsub(empty_redis, event_loop): # Adapted from : @@ -93,3 +104,40 @@ async def test_pubsub(empty_redis, event_loop): assert message["data"] == STOPWORD await r.close() + + +@pytest.mark.asyncio +async def test_keyspace_notifications_simple(empty_redis, event_loop): + r = RedisPool().get_connection_async() + await r.set(TEST_KEY, "I am not empty") + async with r.pubsub() as pubsub: + await pubsub.subscribe("__keyspace@0__:" + TEST_KEY) + + future_message = asyncio.create_task(channel_reader_onemessage(pubsub)) + empty_redis.set(TEST_KEY, "I am set!") + message = await future_message + assert message is not None + assert message["data"] is not None + assert message == { + "channel": f"__keyspace@0__:{TEST_KEY}", + "data": "set", + "pattern": None, + "type": "message", + } + + +@pytest.mark.asyncio +async def test_keyspace_notifications(empty_redis, event_loop): + pubsub = await RedisPool().subscribe_to_keys(TEST_KEY) + async with pubsub: + future_message = asyncio.create_task(channel_reader_onemessage(pubsub)) + empty_redis.set(TEST_KEY, "I am set!") + message = await future_message + assert message is not None + assert message["data"] is not None + assert message == { + "channel": f"__keyspace@0__:{TEST_KEY}", + "data": "set", + "pattern": f"__keyspace@0__:{TEST_KEY}", + "type": "pmessage", + } From 8d099c9a225ed6281df39aad32a97827072950d9 Mon Sep 17 00:00:00 2001 From: Houkime <> Date: Mon, 22 Apr 2024 14:41:56 +0000 Subject: [PATCH 05/34] refactoring(jobs): break out a function returning all jobs --- selfprivacy_api/graphql/queries/jobs.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/selfprivacy_api/graphql/queries/jobs.py b/selfprivacy_api/graphql/queries/jobs.py index e7b99e6..337382a 100644 --- a/selfprivacy_api/graphql/queries/jobs.py +++ b/selfprivacy_api/graphql/queries/jobs.py @@ -11,13 +11,17 @@ from selfprivacy_api.graphql.common_types.jobs import ( from selfprivacy_api.jobs import Jobs +def get_all_jobs() -> typing.List[ApiJob]: + Jobs.get_jobs() + + return [job_to_api_job(job) for job in Jobs.get_jobs()] + + @strawberry.type class Job: @strawberry.field def get_jobs(self) -> typing.List[ApiJob]: - Jobs.get_jobs() - - return [job_to_api_job(job) for job in Jobs.get_jobs()] + return get_all_jobs() @strawberry.field def get_job(self, job_id: str) -> typing.Optional[ApiJob]: From 
b204d4a9b3cc8b6a8dbf044f3d1dc9de665349d6 Mon Sep 17 00:00:00 2001 From: Houkime <> Date: Mon, 22 Apr 2024 14:50:08 +0000 Subject: [PATCH 06/34] feature(redis): enable key space notifications by default --- selfprivacy_api/utils/redis_pool.py | 2 ++ tests/test_redis.py | 1 - 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/selfprivacy_api/utils/redis_pool.py b/selfprivacy_api/utils/redis_pool.py index ea827d1..39c536f 100644 --- a/selfprivacy_api/utils/redis_pool.py +++ b/selfprivacy_api/utils/redis_pool.py @@ -29,6 +29,8 @@ class RedisPool: url, decode_responses=True, ) + # TODO: inefficient, this is probably done each time we connect + self.get_connection().config_set("notify-keyspace-events", "KEA") @staticmethod def connection_url(dbnumber: int) -> str: diff --git a/tests/test_redis.py b/tests/test_redis.py index 181d325..70ef43a 100644 --- a/tests/test_redis.py +++ b/tests/test_redis.py @@ -15,7 +15,6 @@ STOPWORD = "STOP" def empty_redis(event_loop): r = RedisPool().get_connection() r.flushdb() - r.config_set("notify-keyspace-events", "KEA") assert r.config_get("notify-keyspace-events")["notify-keyspace-events"] == "AKE" yield r r.flushdb() From 43980f16ea43d302fd6628de1b841ac81c5523a1 Mon Sep 17 00:00:00 2001 From: Houkime <> Date: Mon, 6 May 2024 14:54:13 +0000 Subject: [PATCH 07/34] feature(jobs): job update generator --- selfprivacy_api/jobs/__init__.py | 29 ++++---- selfprivacy_api/utils/redis_model_storage.py | 12 +++- tests/test_redis.py | 76 ++++++++++++++++++++ 3 files changed, 102 insertions(+), 15 deletions(-) diff --git a/selfprivacy_api/jobs/__init__.py b/selfprivacy_api/jobs/__init__.py index 4649bb0..3dd48c4 100644 --- a/selfprivacy_api/jobs/__init__.py +++ b/selfprivacy_api/jobs/__init__.py @@ -15,6 +15,7 @@ A job is a dictionary with the following keys: - result: result of the job """ import typing +import asyncio import datetime from uuid import UUID import uuid @@ -23,6 +24,7 @@ from enum import Enum from pydantic import BaseModel from selfprivacy_api.utils.redis_pool import RedisPool +from selfprivacy_api.utils.redis_model_storage import store_model_as_hash JOB_EXPIRATION_SECONDS = 10 * 24 * 60 * 60 # ten days @@ -102,7 +104,7 @@ class Jobs: result=None, ) redis = RedisPool().get_connection() - _store_job_as_hash(redis, _redis_key_from_uuid(job.uid), job) + store_model_as_hash(redis, _redis_key_from_uuid(job.uid), job) return job @staticmethod @@ -218,7 +220,7 @@ class Jobs: redis = RedisPool().get_connection() key = _redis_key_from_uuid(job.uid) if redis.exists(key): - _store_job_as_hash(redis, key, job) + store_model_as_hash(redis, key, job) if status in (JobStatus.FINISHED, JobStatus.ERROR): redis.expire(key, JOB_EXPIRATION_SECONDS) @@ -294,17 +296,6 @@ def _progress_log_key_from_uuid(uuid_string) -> str: return PROGRESS_LOGS_PREFIX + str(uuid_string) -def _store_job_as_hash(redis, redis_key, model) -> None: - for key, value in model.dict().items(): - if isinstance(value, uuid.UUID): - value = str(value) - if isinstance(value, datetime.datetime): - value = value.isoformat() - if isinstance(value, JobStatus): - value = value.value - redis.hset(redis_key, key, str(value)) - - def _job_from_hash(redis, redis_key) -> typing.Optional[Job]: if redis.exists(redis_key): job_dict = redis.hgetall(redis_key) @@ -321,3 +312,15 @@ def _job_from_hash(redis, redis_key) -> typing.Optional[Job]: return Job(**job_dict) return None + + +async def job_notifications() -> typing.AsyncGenerator[dict, None]: + channel = await RedisPool().subscribe_to_keys("jobs:*") + while 
True: + try: + # we cannot timeout here because we do not know when the next message is supposed to arrive + message: dict = await channel.get_message(ignore_subscribe_messages=True, timeout=None) # type: ignore + if message is not None: + yield message + except GeneratorExit: + break diff --git a/selfprivacy_api/utils/redis_model_storage.py b/selfprivacy_api/utils/redis_model_storage.py index 06dfe8c..7d84210 100644 --- a/selfprivacy_api/utils/redis_model_storage.py +++ b/selfprivacy_api/utils/redis_model_storage.py @@ -1,15 +1,23 @@ +import uuid + from datetime import datetime from typing import Optional from enum import Enum def store_model_as_hash(redis, redis_key, model): - for key, value in model.dict().items(): + model_dict = model.dict() + for key, value in model_dict.items(): + if isinstance(value, uuid.UUID): + value = str(value) if isinstance(value, datetime): value = value.isoformat() if isinstance(value, Enum): value = value.value - redis.hset(redis_key, key, str(value)) + value = str(value) + model_dict[key] = value + + redis.hset(redis_key, mapping=model_dict) def hash_as_model(redis, redis_key: str, model_class): diff --git a/tests/test_redis.py b/tests/test_redis.py index 70ef43a..02dfb21 100644 --- a/tests/test_redis.py +++ b/tests/test_redis.py @@ -7,6 +7,8 @@ from typing import List from selfprivacy_api.utils.redis_pool import RedisPool +from selfprivacy_api.jobs import Jobs, job_notifications + TEST_KEY = "test:test" STOPWORD = "STOP" @@ -140,3 +142,77 @@ async def test_keyspace_notifications(empty_redis, event_loop): "pattern": f"__keyspace@0__:{TEST_KEY}", "type": "pmessage", } + + +@pytest.mark.asyncio +async def test_keyspace_notifications_patterns(empty_redis, event_loop): + pattern = "test*" + pubsub = await RedisPool().subscribe_to_keys(pattern) + async with pubsub: + future_message = asyncio.create_task(channel_reader_onemessage(pubsub)) + empty_redis.set(TEST_KEY, "I am set!") + message = await future_message + assert message is not None + assert message["data"] is not None + assert message == { + "channel": f"__keyspace@0__:{TEST_KEY}", + "data": "set", + "pattern": f"__keyspace@0__:{pattern}", + "type": "pmessage", + } + + +@pytest.mark.asyncio +async def test_keyspace_notifications_jobs(empty_redis, event_loop): + pattern = "jobs:*" + pubsub = await RedisPool().subscribe_to_keys(pattern) + async with pubsub: + future_message = asyncio.create_task(channel_reader_onemessage(pubsub)) + Jobs.add("testjob1", "test.test", "Testing aaaalll day") + message = await future_message + assert message is not None + assert message["data"] is not None + assert message["data"] == "hset" + + +async def reader_of_jobs() -> List[dict]: + """ + Reads 3 job updates and exits + """ + result: List[dict] = [] + async for message in job_notifications(): + result.append(message) + if len(result) >= 3: + break + return result + + +@pytest.mark.asyncio +async def test_jobs_generator(empty_redis, event_loop): + # Will read exactly 3 job messages + future_messages = asyncio.create_task(reader_of_jobs()) + await asyncio.sleep(1) + + Jobs.add("testjob1", "test.test", "Testing aaaalll day") + Jobs.add("testjob2", "test.test", "Testing aaaalll day") + Jobs.add("testjob3", "test.test", "Testing aaaalll day") + Jobs.add("testjob4", "test.test", "Testing aaaalll day") + + assert len(Jobs.get_jobs()) == 4 + r = RedisPool().get_connection() + assert len(r.keys("jobs:*")) == 4 + + messages = await future_messages + assert len(messages) == 3 + channels = [message["channel"] for message in messages] 
+ operations = [message["data"] for message in messages] + assert set(operations) == set(["hset"]) # all of them are hsets + + # Asserting that all of jobs emitted exactly one message + jobs = Jobs.get_jobs() + names = ["testjob1", "testjob2", "testjob3"] + ids = [str(job.uid) for job in jobs if job.name in names] + for id in ids: + assert id in " ".join(channels) + # Asserting that they came in order + assert "testjob4" not in " ".join(channels) From 4b1becb4e22275ef61b9011698b4102038f270d1 Mon Sep 17 00:00:00 2001 From: Houkime <> Date: Wed, 15 May 2024 11:29:20 +0000 Subject: [PATCH 08/34] feature(jobs): websocket connection --- selfprivacy_api/app.py | 7 ++++++- tests/test_graphql/test_websocket.py | 6 ++++++ 2 files changed, 12 insertions(+), 1 deletion(-) create mode 100644 tests/test_graphql/test_websocket.py diff --git a/selfprivacy_api/app.py b/selfprivacy_api/app.py index 64ca85a..2f7e2f7 100644 --- a/selfprivacy_api/app.py +++ b/selfprivacy_api/app.py @@ -3,6 +3,7 @@ from fastapi import FastAPI from fastapi.middleware.cors import CORSMiddleware from strawberry.fastapi import GraphQLRouter +from strawberry.subscriptions import GRAPHQL_TRANSPORT_WS_PROTOCOL, GRAPHQL_WS_PROTOCOL import uvicorn @@ -13,8 +14,12 @@ from selfprivacy_api.migrations import run_migrations app = FastAPI() -graphql_app = GraphQLRouter( +graphql_app: GraphQLRouter = GraphQLRouter( schema, + subscription_protocols=[ + GRAPHQL_TRANSPORT_WS_PROTOCOL, + GRAPHQL_WS_PROTOCOL, + ], ) app.add_middleware( diff --git a/tests/test_graphql/test_websocket.py b/tests/test_graphql/test_websocket.py new file mode 100644 index 0000000..fb2ac33 --- /dev/null +++ b/tests/test_graphql/test_websocket.py @@ -0,0 +1,6 @@ + +def test_websocket_connection_bare(authorized_client): + client =authorized_client + with client.websocket_connect('/graphql', subprotocols=[ "graphql-transport-ws","graphql-ws"] ) as websocket: + assert websocket is not None + assert websocket.scope is not None From 1fadf0214bf3edabedea83e1cb74f4c3d9f29bfb Mon Sep 17 00:00:00 2001 From: Houkime <> Date: Wed, 15 May 2024 13:01:07 +0000 Subject: [PATCH 09/34] test(jobs): test Graphql job getting --- tests/common.py | 4 +++ tests/test_graphql/test_jobs.py | 48 +++++++++++++++++++++++++++++++++ 2 files changed, 52 insertions(+) create mode 100644 tests/test_graphql/test_jobs.py diff --git a/tests/common.py b/tests/common.py index 5f69f3f..8c81f48 100644 --- a/tests/common.py +++ b/tests/common.py @@ -69,6 +69,10 @@ def generate_backup_query(query_array): return "query TestBackup {\n backup {" + "\n".join(query_array) + "}\n}" +def generate_jobs_query(query_array): + return "query TestJobs {\n jobs {" + "\n".join(query_array) + "}\n}" + + def generate_service_query(query_array): return "query TestService {\n services {" + "\n".join(query_array) + "}\n}" diff --git a/tests/test_graphql/test_jobs.py b/tests/test_graphql/test_jobs.py new file mode 100644 index 0000000..8dfb102 --- /dev/null +++ b/tests/test_graphql/test_jobs.py @@ -0,0 +1,48 @@ +from tests.common import generate_jobs_query +from tests.test_graphql.common import ( + assert_ok, + assert_empty, + assert_errorcode, + get_data, +) + +API_JOBS_QUERY = """ +getJobs { + uid + typeId + name + description + status + statusText + progress + createdAt + updatedAt + finishedAt + error + result +} +""" + + +def graphql_send_query(client, query: str, variables: dict = {}): + return client.post("/graphql", json={"query": query, "variables": variables}) + + +def api_jobs(authorized_client): + response = 
graphql_send_query( + authorized_client, generate_jobs_query([API_JOBS_QUERY]) + ) + data = get_data(response) + result = data["jobs"]["getJobs"] + assert result is not None + return result + + +def test_all_jobs_unauthorized(client): + response = graphql_send_query(client, generate_jobs_query([API_JOBS_QUERY])) + assert_empty(response) + + +def test_all_jobs_when_none(authorized_client): + output = api_jobs(authorized_client) + assert output == [] From 4306c942311fb8ba1baba6197e09611298a55004 Mon Sep 17 00:00:00 2001 From: Houkime <> Date: Wed, 15 May 2024 13:42:17 +0000 Subject: [PATCH 10/34] test(jobs) test API job format --- tests/test_graphql/test_jobs.py | 26 ++++++++++++++++++++++++++ 1 file changed, 26 insertions(+) diff --git a/tests/test_graphql/test_jobs.py b/tests/test_graphql/test_jobs.py index 8dfb102..68a6d20 100644 --- a/tests/test_graphql/test_jobs.py +++ b/tests/test_graphql/test_jobs.py @@ -1,4 +1,6 @@ from tests.common import generate_jobs_query +import tests.test_graphql.test_api_backup + from tests.test_graphql.common import ( assert_ok, assert_empty, @@ -6,6 +8,8 @@ from tests.test_graphql.common import ( get_data, ) +from selfprivacy_api.jobs import Jobs + API_JOBS_QUERY = """ getJobs { uid @@ -46,3 +50,25 @@ def test_all_jobs_unauthorized(client): def test_all_jobs_when_none(authorized_client): output = api_jobs(authorized_client) assert output == [] + + +def test_all_jobs_when_some(authorized_client): + # We cannot make new jobs via API, at least directly + job = Jobs.add("bogus", "bogus.bogus", "fungus") + output = api_jobs(authorized_client) + + len(output) == 1 + api_job = output[0] + + assert api_job["uid"] == str(job.uid) + assert api_job["typeId"] == job.type_id + assert api_job["name"] == job.name + assert api_job["description"] == job.description + assert api_job["status"] == job.status + assert api_job["statusText"] == job.status_text + assert api_job["progress"] == job.progress + assert api_job["createdAt"] == job.created_at.isoformat() + assert api_job["updatedAt"] == job.updated_at.isoformat() + assert api_job["finishedAt"] == None + assert api_job["error"] == None + assert api_job["result"] == None From 098abd51499276bcb6aa7d2ff5be00bc6bc71c53 Mon Sep 17 00:00:00 2001 From: Houkime <> Date: Wed, 15 May 2024 18:14:14 +0000 Subject: [PATCH 11/34] test(jobs): subscription query generating function --- tests/common.py | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/tests/common.py b/tests/common.py index 8c81f48..3c05033 100644 --- a/tests/common.py +++ b/tests/common.py @@ -73,6 +73,10 @@ def generate_jobs_query(query_array): return "query TestJobs {\n jobs {" + "\n".join(query_array) + "}\n}" +def generate_jobs_subscription(query_array): + return "subscription TestSubscription {\n jobs {" + "\n".join(query_array) + "}\n}" + + def generate_service_query(query_array): return "query TestService {\n services {" + "\n".join(query_array) + "}\n}" From c19fa227c96f38b8d49ea4f671041f34e4dc2e49 Mon Sep 17 00:00:00 2001 From: Houkime <> Date: Wed, 15 May 2024 18:15:16 +0000 Subject: [PATCH 12/34] test(websocket) test connection init --- tests/test_graphql/test_websocket.py | 48 ++++++++++++++++++++++++++-- 1 file changed, 46 insertions(+), 2 deletions(-) diff --git a/tests/test_graphql/test_websocket.py b/tests/test_graphql/test_websocket.py index fb2ac33..2431285 100644 --- a/tests/test_graphql/test_websocket.py +++ b/tests/test_graphql/test_websocket.py @@ -1,6 +1,50 @@ +from tests.common import generate_jobs_subscription +from 
selfprivacy_api.graphql.queries.jobs import Job as _Job +from selfprivacy_api.jobs import Jobs + +# JOBS_SUBSCRIPTION = """ +# jobUpdates { +# uid +# typeId +# name +# description +# status +# statusText +# progress +# createdAt +# updatedAt +# finishedAt +# error +# result +# } +# """ + def test_websocket_connection_bare(authorized_client): - client =authorized_client - with client.websocket_connect('/graphql', subprotocols=[ "graphql-transport-ws","graphql-ws"] ) as websocket: + client = authorized_client + with client.websocket_connect( + "/graphql", subprotocols=["graphql-transport-ws", "graphql-ws"] + ) as websocket: assert websocket is not None assert websocket.scope is not None + + +def test_websocket_graphql_init(authorized_client): + client = authorized_client + with client.websocket_connect( + "/graphql", subprotocols=["graphql-transport-ws"] + ) as websocket: + websocket.send_json({"type": "connection_init", "payload": {}}) + ack = websocket.receive_json() + assert ack == {"type": "connection_ack"} + + +# def test_websocket_subscription(authorized_client): +# client = authorized_client +# with client.websocket_connect( +# "/graphql", subprotocols=["graphql-transport-ws", "graphql-ws"] +# ) as websocket: +# websocket.send(generate_jobs_subscription([JOBS_SUBSCRIPTION])) +# Jobs.add("bogus","bogus.bogus", "yyyaaaaayy") +# joblist = websocket.receive_json() +# raise NotImplementedError(joblist) From 02d337c3f0a0fbe6def1ca97d2b744c64a4b1d9a Mon Sep 17 00:00:00 2001 From: Houkime <> Date: Wed, 15 May 2024 18:31:16 +0000 Subject: [PATCH 13/34] test(websocket): ping pong test --- tests/test_graphql/test_websocket.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/tests/test_graphql/test_websocket.py b/tests/test_graphql/test_websocket.py index 2431285..ef71312 100644 --- a/tests/test_graphql/test_websocket.py +++ b/tests/test_graphql/test_websocket.py @@ -29,7 +29,7 @@ def test_websocket_connection_bare(authorized_client): assert websocket.scope is not None -def test_websocket_graphql_init(authorized_client): +def test_websocket_graphql_ping(authorized_client): client = authorized_client with client.websocket_connect( "/graphql", subprotocols=["graphql-transport-ws"] @@ -38,6 +38,11 @@ def test_websocket_graphql_init(authorized_client): ack = websocket.receive_json() assert ack == {"type": "connection_ack"} + # https://github.com/enisdenjo/graphql-ws/blob/master/PROTOCOL.md#ping + websocket.send_json({"type": "ping", "payload": {}}) + pong = websocket.receive_json() + assert pong == {"type": "pong"} + # def test_websocket_subscription(authorized_client): # client = authorized_client From 8348f11fafcc86280737666197acd1e26a9915dc Mon Sep 17 00:00:00 2001 From: Houkime <> Date: Wed, 15 May 2024 18:36:17 +0000 Subject: [PATCH 14/34] test(websocket): separate ping and init --- tests/test_graphql/test_websocket.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/tests/test_graphql/test_websocket.py b/tests/test_graphql/test_websocket.py index ef71312..d534269 100644 --- a/tests/test_graphql/test_websocket.py +++ b/tests/test_graphql/test_websocket.py @@ -29,7 +29,7 @@ def test_websocket_connection_bare(authorized_client): assert websocket.scope is not None -def test_websocket_graphql_ping(authorized_client): +def test_websocket_graphql_init(authorized_client): client = authorized_client with client.websocket_connect( "/graphql", subprotocols=["graphql-transport-ws"] @@ -38,6 +38,12 @@ def test_websocket_graphql_ping(authorized_client): ack = 
websocket.receive_json() assert ack == {"type": "connection_ack"} + +def test_websocket_graphql_ping(authorized_client): + client = authorized_client + with client.websocket_connect( + "/graphql", subprotocols=["graphql-transport-ws"] + ) as websocket: # https://github.com/enisdenjo/graphql-ws/blob/master/PROTOCOL.md#ping websocket.send_json({"type": "ping", "payload": {}}) pong = websocket.receive_json() From 3b0600efb61bb0fcffce7d5059ef029a9a3926af Mon Sep 17 00:00:00 2001 From: Houkime <> Date: Wed, 15 May 2024 20:41:36 +0000 Subject: [PATCH 15/34] feature(jobs): add subscription endpoint --- .../graphql/subscriptions/__init__.py | 0 selfprivacy_api/graphql/subscriptions/jobs.py | 20 +++++++++++++++++++ 2 files changed, 20 insertions(+) create mode 100644 selfprivacy_api/graphql/subscriptions/__init__.py create mode 100644 selfprivacy_api/graphql/subscriptions/jobs.py diff --git a/selfprivacy_api/graphql/subscriptions/__init__.py b/selfprivacy_api/graphql/subscriptions/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/selfprivacy_api/graphql/subscriptions/jobs.py b/selfprivacy_api/graphql/subscriptions/jobs.py new file mode 100644 index 0000000..380badb --- /dev/null +++ b/selfprivacy_api/graphql/subscriptions/jobs.py @@ -0,0 +1,20 @@ +# pylint: disable=too-few-public-methods +import strawberry + +from typing import AsyncGenerator, List + +from selfprivacy_api.jobs import job_notifications + +from selfprivacy_api.graphql.common_types.jobs import ApiJob +from selfprivacy_api.graphql.queries.jobs import get_all_jobs + + +@strawberry.type +class JobSubscriptions: + """Subscriptions related to jobs""" + + @strawberry.subscription + async def job_updates(self) -> AsyncGenerator[List[ApiJob], None]: + # Send the complete list of jobs every time anything gets updated + async for notification in job_notifications(): + yield get_all_jobs() From 967e59271ff1f6242198e6e2add9468882164893 Mon Sep 17 00:00:00 2001 From: Houkime <> Date: Wed, 15 May 2024 20:41:48 +0000 Subject: [PATCH 16/34] chore(jobs): shorter typehints and import sorting --- selfprivacy_api/graphql/queries/jobs.py | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/selfprivacy_api/graphql/queries/jobs.py b/selfprivacy_api/graphql/queries/jobs.py index 337382a..3cc3bf7 100644 --- a/selfprivacy_api/graphql/queries/jobs.py +++ b/selfprivacy_api/graphql/queries/jobs.py @@ -1,17 +1,17 @@ """Jobs status""" # pylint: disable=too-few-public-methods -import typing import strawberry +from typing import List, Optional + +from selfprivacy_api.jobs import Jobs from selfprivacy_api.graphql.common_types.jobs import ( ApiJob, get_api_job_by_id, job_to_api_job, ) -from selfprivacy_api.jobs import Jobs - -def get_all_jobs() -> typing.List[ApiJob]: +def get_all_jobs() -> List[ApiJob]: Jobs.get_jobs() return [job_to_api_job(job) for job in Jobs.get_jobs()] @@ -20,9 +20,9 @@ def get_all_jobs() -> typing.List[ApiJob]: @strawberry.type class Job: @strawberry.field - def get_jobs(self) -> typing.List[ApiJob]: + def get_jobs(self) -> List[ApiJob]: return get_all_jobs() @strawberry.field - def get_job(self, job_id: str) -> typing.Optional[ApiJob]: + def get_job(self, job_id: str) -> Optional[ApiJob]: return get_api_job_by_id(job_id) From 3910e416db564c248276527bd200cbb5f476cd79 Mon Sep 17 00:00:00 2001 From: Houkime <> Date: Wed, 15 May 2024 20:43:17 +0000 Subject: [PATCH 17/34] test(jobs): test simple counting --- selfprivacy_api/graphql/schema.py | 15 +++-- tests/test_graphql/test_websocket.py | 92 
+++++++++++++++++++++------- 2 files changed, 81 insertions(+), 26 deletions(-) diff --git a/selfprivacy_api/graphql/schema.py b/selfprivacy_api/graphql/schema.py index e4e7264..078ee3d 100644 --- a/selfprivacy_api/graphql/schema.py +++ b/selfprivacy_api/graphql/schema.py @@ -28,6 +28,8 @@ from selfprivacy_api.graphql.queries.services import Services from selfprivacy_api.graphql.queries.storage import Storage from selfprivacy_api.graphql.queries.system import System +from selfprivacy_api.graphql.subscriptions.jobs import JobSubscriptions + from selfprivacy_api.graphql.mutations.users_mutations import UsersMutations from selfprivacy_api.graphql.queries.users import Users from selfprivacy_api.jobs.test import test_job @@ -129,16 +131,19 @@ class Mutation( code=200, ) - pass - @strawberry.type class Subscription: """Root schema for subscriptions""" - @strawberry.subscription(permission_classes=[IsAuthenticated]) - async def count(self, target: int = 100) -> AsyncGenerator[int, None]: - for i in range(target): + @strawberry.field(permission_classes=[IsAuthenticated]) + def jobs(self) -> JobSubscriptions: + """Jobs subscriptions""" + return JobSubscriptions() + + @strawberry.subscription + async def count(self) -> AsyncGenerator[int, None]: + for i in range(10): yield i await asyncio.sleep(0.5) diff --git a/tests/test_graphql/test_websocket.py b/tests/test_graphql/test_websocket.py index d534269..58681e0 100644 --- a/tests/test_graphql/test_websocket.py +++ b/tests/test_graphql/test_websocket.py @@ -2,22 +2,22 @@ from tests.common import generate_jobs_subscription from selfprivacy_api.graphql.queries.jobs import Job as _Job from selfprivacy_api.jobs import Jobs -# JOBS_SUBSCRIPTION = """ -# jobUpdates { -# uid -# typeId -# name -# description -# status -# statusText -# progress -# createdAt -# updatedAt -# finishedAt -# error -# result -# } -# """ +JOBS_SUBSCRIPTION = """ +jobUpdates { + uid + typeId + name + description + status + statusText + progress + createdAt + updatedAt + finishedAt + error + result +} +""" def test_websocket_connection_bare(authorized_client): @@ -50,12 +50,62 @@ def test_websocket_graphql_ping(authorized_client): assert pong == {"type": "pong"} +def init_graphql(websocket): + websocket.send_json({"type": "connection_init", "payload": {}}) + ack = websocket.receive_json() + assert ack == {"type": "connection_ack"} + + +def test_websocket_subscription_minimal(authorized_client): + client = authorized_client + with client.websocket_connect( + "/graphql", subprotocols=["graphql-transport-ws"] + ) as websocket: + init_graphql(websocket) + websocket.send_json( + { + "id": "3aaa2445", + "type": "subscribe", + "payload": { + "query": "subscription TestSubscription {count}", + }, + } + ) + response = websocket.receive_json() + assert response == { + "id": "3aaa2445", + "payload": {"data": {"count": 0}}, + "type": "next", + } + response = websocket.receive_json() + assert response == { + "id": "3aaa2445", + "payload": {"data": {"count": 1}}, + "type": "next", + } + response = websocket.receive_json() + assert response == { + "id": "3aaa2445", + "payload": {"data": {"count": 2}}, + "type": "next", + } + + # def test_websocket_subscription(authorized_client): # client = authorized_client # with client.websocket_connect( -# "/graphql", subprotocols=["graphql-transport-ws", "graphql-ws"] +# "/graphql", subprotocols=["graphql-transport-ws"] # ) as websocket: -# websocket.send(generate_jobs_subscription([JOBS_SUBSCRIPTION])) -# Jobs.add("bogus","bogus.bogus", "yyyaaaaayy") -# joblist 
= websocket.receive_json() -# raise NotImplementedError(joblist) +# init_graphql(websocket) +# websocket.send_json( +# { +# "id": "3aaa2445", +# "type": "subscribe", +# "payload": { +# "query": generate_jobs_subscription([JOBS_SUBSCRIPTION]), +# }, +# } +# ) +# Jobs.add("bogus", "bogus.bogus", "yyyaaaaayy") +# response = websocket.receive_json() +# raise NotImplementedError(response) From 6d2fdab07180f09b5e43ef04640a415b3c8e17a0 Mon Sep 17 00:00:00 2001 From: Houkime <> Date: Wed, 22 May 2024 11:04:37 +0000 Subject: [PATCH 18/34] feature(jobs): UNSAFE endpoint to get job updates --- selfprivacy_api/graphql/queries/jobs.py | 7 +- selfprivacy_api/graphql/schema.py | 19 +++-- selfprivacy_api/graphql/subscriptions/jobs.py | 14 +--- tests/test_graphql/test_websocket.py | 81 ++++++++++++++----- 4 files changed, 83 insertions(+), 38 deletions(-) diff --git a/selfprivacy_api/graphql/queries/jobs.py b/selfprivacy_api/graphql/queries/jobs.py index 3cc3bf7..6a12838 100644 --- a/selfprivacy_api/graphql/queries/jobs.py +++ b/selfprivacy_api/graphql/queries/jobs.py @@ -12,9 +12,10 @@ from selfprivacy_api.graphql.common_types.jobs import ( def get_all_jobs() -> List[ApiJob]: - Jobs.get_jobs() - - return [job_to_api_job(job) for job in Jobs.get_jobs()] + jobs = Jobs.get_jobs() + api_jobs = [job_to_api_job(job) for job in jobs] + assert api_jobs is not None + return api_jobs @strawberry.type diff --git a/selfprivacy_api/graphql/schema.py b/selfprivacy_api/graphql/schema.py index 078ee3d..b8ed4e2 100644 --- a/selfprivacy_api/graphql/schema.py +++ b/selfprivacy_api/graphql/schema.py @@ -2,7 +2,7 @@ # pylint: disable=too-few-public-methods import asyncio -from typing import AsyncGenerator +from typing import AsyncGenerator, List import strawberry from selfprivacy_api.graphql import IsAuthenticated from selfprivacy_api.graphql.mutations.deprecated_mutations import ( @@ -28,7 +28,9 @@ from selfprivacy_api.graphql.queries.services import Services from selfprivacy_api.graphql.queries.storage import Storage from selfprivacy_api.graphql.queries.system import System -from selfprivacy_api.graphql.subscriptions.jobs import JobSubscriptions +from selfprivacy_api.graphql.subscriptions.jobs import ApiJob +from selfprivacy_api.jobs import job_notifications +from selfprivacy_api.graphql.queries.jobs import get_all_jobs from selfprivacy_api.graphql.mutations.users_mutations import UsersMutations from selfprivacy_api.graphql.queries.users import Users @@ -136,10 +138,15 @@ class Mutation( class Subscription: """Root schema for subscriptions""" - @strawberry.field(permission_classes=[IsAuthenticated]) - def jobs(self) -> JobSubscriptions: - """Jobs subscriptions""" - return JobSubscriptions() + @strawberry.subscription + async def job_updates(self) -> AsyncGenerator[List[ApiJob], None]: + # Send the complete list of jobs every time anything gets updated + async for notification in job_notifications(): + yield get_all_jobs() + + # @strawberry.subscription + # async def job_updates(self) -> AsyncGenerator[List[ApiJob], None]: + # return job_updates() @strawberry.subscription async def count(self) -> AsyncGenerator[int, None]: diff --git a/selfprivacy_api/graphql/subscriptions/jobs.py b/selfprivacy_api/graphql/subscriptions/jobs.py index 380badb..11d6263 100644 --- a/selfprivacy_api/graphql/subscriptions/jobs.py +++ b/selfprivacy_api/graphql/subscriptions/jobs.py @@ -1,5 +1,4 @@ # pylint: disable=too-few-public-methods -import strawberry from typing import AsyncGenerator, List @@ -9,12 +8,7 @@ from 
selfprivacy_api.graphql.common_types.jobs import ApiJob from selfprivacy_api.graphql.queries.jobs import get_all_jobs -@strawberry.type -class JobSubscriptions: - """Subscriptions related to jobs""" - - @strawberry.subscription - async def job_updates(self) -> AsyncGenerator[List[ApiJob], None]: - # Send the complete list of jobs every time anything gets updated - async for notification in job_notifications(): - yield get_all_jobs() +async def job_updates() -> AsyncGenerator[List[ApiJob], None]: + # Send the complete list of jobs every time anything gets updated + async for notification in job_notifications(): + yield get_all_jobs() diff --git a/tests/test_graphql/test_websocket.py b/tests/test_graphql/test_websocket.py index 58681e0..ee33262 100644 --- a/tests/test_graphql/test_websocket.py +++ b/tests/test_graphql/test_websocket.py @@ -1,6 +1,13 @@ from tests.common import generate_jobs_subscription -from selfprivacy_api.graphql.queries.jobs import Job as _Job + +# from selfprivacy_api.graphql.subscriptions.jobs import JobSubscriptions +import pytest +import asyncio + from selfprivacy_api.jobs import Jobs +from time import sleep + +from tests.test_redis import empty_redis JOBS_SUBSCRIPTION = """ jobUpdates { @@ -91,21 +98,57 @@ def test_websocket_subscription_minimal(authorized_client): } -# def test_websocket_subscription(authorized_client): -# client = authorized_client -# with client.websocket_connect( -# "/graphql", subprotocols=["graphql-transport-ws"] -# ) as websocket: -# init_graphql(websocket) -# websocket.send_json( -# { -# "id": "3aaa2445", -# "type": "subscribe", -# "payload": { -# "query": generate_jobs_subscription([JOBS_SUBSCRIPTION]), -# }, -# } -# ) -# Jobs.add("bogus", "bogus.bogus", "yyyaaaaayy") -# response = websocket.receive_json() -# raise NotImplementedError(response) +async def read_one_job(websocket): + # bug? 
We only get them starting from the second job update + # that's why we receive two jobs in the list them + # the first update gets lost somewhere + response = websocket.receive_json() + return response + + +@pytest.mark.asyncio +async def test_websocket_subscription(authorized_client, empty_redis, event_loop): + client = authorized_client + with client.websocket_connect( + "/graphql", subprotocols=["graphql-transport-ws"] + ) as websocket: + init_graphql(websocket) + websocket.send_json( + { + "id": "3aaa2445", + "type": "subscribe", + "payload": { + "query": "subscription TestSubscription {" + + JOBS_SUBSCRIPTION + + "}", + }, + } + ) + future = asyncio.create_task(read_one_job(websocket)) + jobs = [] + jobs.append(Jobs.add("bogus", "bogus.bogus", "yyyaaaaayy it works")) + sleep(0.5) + jobs.append(Jobs.add("bogus2", "bogus.bogus", "yyyaaaaayy it works")) + + response = await future + data = response["payload"]["data"] + jobs_received = data["jobUpdates"] + received_names = [job["name"] for job in jobs_received] + for job in jobs: + assert job.name in received_names + + for job in jobs: + for api_job in jobs_received: + if (job.name) == api_job["name"]: + assert api_job["uid"] == str(job.uid) + assert api_job["typeId"] == job.type_id + assert api_job["name"] == job.name + assert api_job["description"] == job.description + assert api_job["status"] == job.status + assert api_job["statusText"] == job.status_text + assert api_job["progress"] == job.progress + assert api_job["createdAt"] == job.created_at.isoformat() + assert api_job["updatedAt"] == job.updated_at.isoformat() + assert api_job["finishedAt"] == None + assert api_job["error"] == None + assert api_job["result"] == None From 39f584ad5c64f0423bef9a6fe1a0d6928dd34547 Mon Sep 17 00:00:00 2001 From: Houkime <> Date: Mon, 27 May 2024 18:22:20 +0000 Subject: [PATCH 19/34] test(devices): provide devices for a service test to fix conditional test fail. 
--- tests/test_graphql/test_services.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_graphql/test_services.py b/tests/test_graphql/test_services.py index 6e8dcf6..b7faf3d 100644 --- a/tests/test_graphql/test_services.py +++ b/tests/test_graphql/test_services.py @@ -543,8 +543,8 @@ def test_disable_enable(authorized_client, only_dummy_service): assert api_dummy_service["status"] == ServiceStatus.ACTIVE.value -def test_move_immovable(authorized_client, only_dummy_service): - dummy_service = only_dummy_service +def test_move_immovable(authorized_client, dummy_service_with_binds): + dummy_service = dummy_service_with_binds dummy_service.set_movable(False) root = BlockDevices().get_root_block_device() mutation_response = api_move(authorized_client, dummy_service, root.name) From 8fd12a1775a969be56bd0f3c310c90e0df57e09b Mon Sep 17 00:00:00 2001 From: Houkime <> Date: Mon, 27 May 2024 20:21:11 +0000 Subject: [PATCH 20/34] feature(websocket): add auth --- selfprivacy_api/graphql/schema.py | 18 ++- tests/test_graphql/test_websocket.py | 169 ++++++++++++++++++--------- 2 files changed, 131 insertions(+), 56 deletions(-) diff --git a/selfprivacy_api/graphql/schema.py b/selfprivacy_api/graphql/schema.py index b8ed4e2..c6cf46b 100644 --- a/selfprivacy_api/graphql/schema.py +++ b/selfprivacy_api/graphql/schema.py @@ -4,6 +4,7 @@ import asyncio from typing import AsyncGenerator, List import strawberry + from selfprivacy_api.graphql import IsAuthenticated from selfprivacy_api.graphql.mutations.deprecated_mutations import ( DeprecatedApiMutations, @@ -134,12 +135,25 @@ class Mutation( ) +# A cruft for Websockets +def authenticated(info) -> bool: + return IsAuthenticated().has_permission(source=None, info=info) + + @strawberry.type class Subscription: - """Root schema for subscriptions""" + """Root schema for subscriptions. 
+ Every field here should be an AsyncIterator or AsyncGenerator + It is not a part of the spec but graphql-core (dep of strawberryql) + demands it while the spec is vague in this area.""" @strawberry.subscription - async def job_updates(self) -> AsyncGenerator[List[ApiJob], None]: + async def job_updates( + self, info: strawberry.types.Info + ) -> AsyncGenerator[List[ApiJob], None]: + if not authenticated(info): + raise Exception(IsAuthenticated().message) + # Send the complete list of jobs every time anything gets updated async for notification in job_notifications(): yield get_all_jobs() diff --git a/tests/test_graphql/test_websocket.py b/tests/test_graphql/test_websocket.py index ee33262..5a92416 100644 --- a/tests/test_graphql/test_websocket.py +++ b/tests/test_graphql/test_websocket.py @@ -1,13 +1,20 @@ -from tests.common import generate_jobs_subscription - # from selfprivacy_api.graphql.subscriptions.jobs import JobSubscriptions import pytest import asyncio - -from selfprivacy_api.jobs import Jobs +from typing import Generator from time import sleep -from tests.test_redis import empty_redis +from starlette.testclient import WebSocketTestSession + +from selfprivacy_api.jobs import Jobs +from selfprivacy_api.actions.api_tokens import TOKEN_REPO +from selfprivacy_api.graphql import IsAuthenticated + +from tests.conftest import DEVICE_WE_AUTH_TESTS_WITH +from tests.test_jobs import jobs as empty_jobs + +# We do not iterate through them yet +TESTED_SUBPROTOCOLS = ["graphql-transport-ws"] JOBS_SUBSCRIPTION = """ jobUpdates { @@ -27,6 +34,48 @@ jobUpdates { """ +def connect_ws_authenticated(authorized_client) -> WebSocketTestSession: + token = "Bearer " + str(DEVICE_WE_AUTH_TESTS_WITH["token"]) + return authorized_client.websocket_connect( + "/graphql", + subprotocols=TESTED_SUBPROTOCOLS, + params={"token": token}, + ) + + +def connect_ws_not_authenticated(client) -> WebSocketTestSession: + return client.websocket_connect( + "/graphql", + subprotocols=TESTED_SUBPROTOCOLS, + params={"token": "I like vegan icecream but it is not a valid token"}, + ) + + +def init_graphql(websocket): + websocket.send_json({"type": "connection_init", "payload": {}}) + ack = websocket.receive_json() + assert ack == {"type": "connection_ack"} + + +@pytest.fixture +def authenticated_websocket( + authorized_client, +) -> Generator[WebSocketTestSession, None, None]: + # We use authorized_client only tohave token in the repo, this client by itself is not enough to authorize websocket + + ValueError(TOKEN_REPO.get_tokens()) + with connect_ws_authenticated(authorized_client) as websocket: + yield websocket + sleep(1) + + +@pytest.fixture +def unauthenticated_websocket(client) -> Generator[WebSocketTestSession, None, None]: + with connect_ws_not_authenticated(client) as websocket: + yield websocket + sleep(1) + + def test_websocket_connection_bare(authorized_client): client = authorized_client with client.websocket_connect( @@ -57,12 +106,6 @@ def test_websocket_graphql_ping(authorized_client): assert pong == {"type": "pong"} -def init_graphql(websocket): - websocket.send_json({"type": "connection_init", "payload": {}}) - ack = websocket.receive_json() - assert ack == {"type": "connection_ack"} - - def test_websocket_subscription_minimal(authorized_client): client = authorized_client with client.websocket_connect( @@ -107,48 +150,66 @@ async def read_one_job(websocket): @pytest.mark.asyncio -async def test_websocket_subscription(authorized_client, empty_redis, event_loop): - client = authorized_client - with 
client.websocket_connect( - "/graphql", subprotocols=["graphql-transport-ws"] - ) as websocket: - init_graphql(websocket) - websocket.send_json( - { - "id": "3aaa2445", - "type": "subscribe", - "payload": { - "query": "subscription TestSubscription {" - + JOBS_SUBSCRIPTION - + "}", - }, - } - ) - future = asyncio.create_task(read_one_job(websocket)) - jobs = [] - jobs.append(Jobs.add("bogus", "bogus.bogus", "yyyaaaaayy it works")) - sleep(0.5) - jobs.append(Jobs.add("bogus2", "bogus.bogus", "yyyaaaaayy it works")) +async def test_websocket_subscription(authenticated_websocket, event_loop, empty_jobs): + websocket = authenticated_websocket + init_graphql(websocket) + websocket.send_json( + { + "id": "3aaa2445", + "type": "subscribe", + "payload": { + "query": "subscription TestSubscription {" + JOBS_SUBSCRIPTION + "}", + }, + } + ) + future = asyncio.create_task(read_one_job(websocket)) + jobs = [] + jobs.append(Jobs.add("bogus", "bogus.bogus", "yyyaaaaayy it works")) + sleep(0.5) + jobs.append(Jobs.add("bogus2", "bogus.bogus", "yyyaaaaayy it works")) - response = await future - data = response["payload"]["data"] - jobs_received = data["jobUpdates"] - received_names = [job["name"] for job in jobs_received] - for job in jobs: - assert job.name in received_names + response = await future + data = response["payload"]["data"] + jobs_received = data["jobUpdates"] + received_names = [job["name"] for job in jobs_received] + for job in jobs: + assert job.name in received_names - for job in jobs: - for api_job in jobs_received: - if (job.name) == api_job["name"]: - assert api_job["uid"] == str(job.uid) - assert api_job["typeId"] == job.type_id - assert api_job["name"] == job.name - assert api_job["description"] == job.description - assert api_job["status"] == job.status - assert api_job["statusText"] == job.status_text - assert api_job["progress"] == job.progress - assert api_job["createdAt"] == job.created_at.isoformat() - assert api_job["updatedAt"] == job.updated_at.isoformat() - assert api_job["finishedAt"] == None - assert api_job["error"] == None - assert api_job["result"] == None + assert len(jobs_received) == 2 + + for job in jobs: + for api_job in jobs_received: + if (job.name) == api_job["name"]: + assert api_job["uid"] == str(job.uid) + assert api_job["typeId"] == job.type_id + assert api_job["name"] == job.name + assert api_job["description"] == job.description + assert api_job["status"] == job.status + assert api_job["statusText"] == job.status_text + assert api_job["progress"] == job.progress + assert api_job["createdAt"] == job.created_at.isoformat() + assert api_job["updatedAt"] == job.updated_at.isoformat() + assert api_job["finishedAt"] == None + assert api_job["error"] == None + assert api_job["result"] == None + + +def test_websocket_subscription_unauthorized(unauthenticated_websocket): + websocket = unauthenticated_websocket + init_graphql(websocket) + websocket.send_json( + { + "id": "3aaa2445", + "type": "subscribe", + "payload": { + "query": "subscription TestSubscription {" + JOBS_SUBSCRIPTION + "}", + }, + } + ) + + response = websocket.receive_json() + assert response == { + "id": "3aaa2445", + "payload": [{"message": IsAuthenticated.message}], + "type": "error", + } From 950093a3b1bc46eddcc768031c51d052e1f8e3a0 Mon Sep 17 00:00:00 2001 From: Houkime <> Date: Mon, 27 May 2024 20:38:51 +0000 Subject: [PATCH 21/34] feature(websocket): add auth to counter too --- selfprivacy_api/graphql/schema.py | 17 +++---- tests/test_graphql/test_websocket.py | 69 
+++++++++++++++------------- 2 files changed, 47 insertions(+), 39 deletions(-) diff --git a/selfprivacy_api/graphql/schema.py b/selfprivacy_api/graphql/schema.py index c6cf46b..3280396 100644 --- a/selfprivacy_api/graphql/schema.py +++ b/selfprivacy_api/graphql/schema.py @@ -136,10 +136,15 @@ class Mutation( # A cruft for Websockets -def authenticated(info) -> bool: +def authenticated(info: strawberry.types.Info) -> bool: return IsAuthenticated().has_permission(source=None, info=info) +def reject_if_unauthenticated(info: strawberry.types.Info): + if not authenticated(info): + raise Exception(IsAuthenticated().message) + + @strawberry.type class Subscription: """Root schema for subscriptions. @@ -151,19 +156,15 @@ class Subscription: async def job_updates( self, info: strawberry.types.Info ) -> AsyncGenerator[List[ApiJob], None]: - if not authenticated(info): - raise Exception(IsAuthenticated().message) + reject_if_unauthenticated(info) # Send the complete list of jobs every time anything gets updated async for notification in job_notifications(): yield get_all_jobs() - # @strawberry.subscription - # async def job_updates(self) -> AsyncGenerator[List[ApiJob], None]: - # return job_updates() - @strawberry.subscription - async def count(self) -> AsyncGenerator[int, None]: + async def count(self, info: strawberry.types.Info) -> AsyncGenerator[int, None]: + reject_if_unauthenticated(info) for i in range(10): yield i await asyncio.sleep(0.5) diff --git a/tests/test_graphql/test_websocket.py b/tests/test_graphql/test_websocket.py index 5a92416..49cc944 100644 --- a/tests/test_graphql/test_websocket.py +++ b/tests/test_graphql/test_websocket.py @@ -106,41 +106,61 @@ def test_websocket_graphql_ping(authorized_client): assert pong == {"type": "pong"} +def api_subscribe(websocket, id, subscription): + websocket.send_json( + { + "id": id, + "type": "subscribe", + "payload": { + "query": "subscription TestSubscription {" + subscription + "}", + }, + } + ) + + def test_websocket_subscription_minimal(authorized_client): + # Test a small endpoint that exists specifically for tests client = authorized_client with client.websocket_connect( "/graphql", subprotocols=["graphql-transport-ws"] ) as websocket: init_graphql(websocket) - websocket.send_json( - { - "id": "3aaa2445", - "type": "subscribe", - "payload": { - "query": "subscription TestSubscription {count}", - }, - } - ) + arbitrary_id = "3aaa2445" + api_subscribe(websocket, arbitrary_id, "count") response = websocket.receive_json() assert response == { - "id": "3aaa2445", + "id": arbitrary_id, "payload": {"data": {"count": 0}}, "type": "next", } response = websocket.receive_json() assert response == { - "id": "3aaa2445", + "id": arbitrary_id, "payload": {"data": {"count": 1}}, "type": "next", } response = websocket.receive_json() assert response == { - "id": "3aaa2445", + "id": arbitrary_id, "payload": {"data": {"count": 2}}, "type": "next", } +def test_websocket_subscription_minimal_unauthorized(unauthenticated_websocket): + websocket = unauthenticated_websocket + init_graphql(websocket) + arbitrary_id = "3aaa2445" + api_subscribe(websocket, arbitrary_id, "count") + + response = websocket.receive_json() + assert response == { + "id": arbitrary_id, + "payload": [{"message": IsAuthenticated.message}], + "type": "error", + } + + async def read_one_job(websocket): # bug? 
We only get them starting from the second job update # that's why we receive two jobs in the list them @@ -153,15 +173,9 @@ async def read_one_job(websocket): async def test_websocket_subscription(authenticated_websocket, event_loop, empty_jobs): websocket = authenticated_websocket init_graphql(websocket) - websocket.send_json( - { - "id": "3aaa2445", - "type": "subscribe", - "payload": { - "query": "subscription TestSubscription {" + JOBS_SUBSCRIPTION + "}", - }, - } - ) + arbitrary_id = "3aaa2445" + api_subscribe(websocket, arbitrary_id, JOBS_SUBSCRIPTION) + future = asyncio.create_task(read_one_job(websocket)) jobs = [] jobs.append(Jobs.add("bogus", "bogus.bogus", "yyyaaaaayy it works")) @@ -197,19 +211,12 @@ async def test_websocket_subscription(authenticated_websocket, event_loop, empty def test_websocket_subscription_unauthorized(unauthenticated_websocket): websocket = unauthenticated_websocket init_graphql(websocket) - websocket.send_json( - { - "id": "3aaa2445", - "type": "subscribe", - "payload": { - "query": "subscription TestSubscription {" + JOBS_SUBSCRIPTION + "}", - }, - } - ) + id = "3aaa2445" + api_subscribe(websocket, id, JOBS_SUBSCRIPTION) response = websocket.receive_json() assert response == { - "id": "3aaa2445", + "id": id, "payload": [{"message": IsAuthenticated.message}], "type": "error", } From f772005b1703c47c90f3b20dbdaecb541dc7cc07 Mon Sep 17 00:00:00 2001 From: Houkime <> Date: Mon, 27 May 2024 21:13:57 +0000 Subject: [PATCH 22/34] refactor(jobs): offload job subscription logic to a separate file --- selfprivacy_api/graphql/schema.py | 11 +++++------ tests/test_graphql/test_websocket.py | 16 ++++++++++++---- 2 files changed, 17 insertions(+), 10 deletions(-) diff --git a/selfprivacy_api/graphql/schema.py b/selfprivacy_api/graphql/schema.py index 3280396..05e6bf9 100644 --- a/selfprivacy_api/graphql/schema.py +++ b/selfprivacy_api/graphql/schema.py @@ -30,8 +30,9 @@ from selfprivacy_api.graphql.queries.storage import Storage from selfprivacy_api.graphql.queries.system import System from selfprivacy_api.graphql.subscriptions.jobs import ApiJob -from selfprivacy_api.jobs import job_notifications -from selfprivacy_api.graphql.queries.jobs import get_all_jobs +from selfprivacy_api.graphql.subscriptions.jobs import ( + job_updates as job_update_generator, +) from selfprivacy_api.graphql.mutations.users_mutations import UsersMutations from selfprivacy_api.graphql.queries.users import Users @@ -157,12 +158,10 @@ class Subscription: self, info: strawberry.types.Info ) -> AsyncGenerator[List[ApiJob], None]: reject_if_unauthenticated(info) - - # Send the complete list of jobs every time anything gets updated - async for notification in job_notifications(): - yield get_all_jobs() + return job_update_generator() @strawberry.subscription + # Used for testing, consider deletion to shrink attack surface async def count(self, info: strawberry.types.Info) -> AsyncGenerator[int, None]: reject_if_unauthenticated(info) for i in range(10): diff --git a/tests/test_graphql/test_websocket.py b/tests/test_graphql/test_websocket.py index 49cc944..d538ca1 100644 --- a/tests/test_graphql/test_websocket.py +++ b/tests/test_graphql/test_websocket.py @@ -162,9 +162,9 @@ def test_websocket_subscription_minimal_unauthorized(unauthenticated_websocket): async def read_one_job(websocket): - # bug? We only get them starting from the second job update - # that's why we receive two jobs in the list them - # the first update gets lost somewhere + # Bug? 
We only get them starting from the second job update + # That's why we receive two jobs in the list them + # The first update gets lost somewhere response = websocket.receive_json() return response @@ -215,8 +215,16 @@ def test_websocket_subscription_unauthorized(unauthenticated_websocket): api_subscribe(websocket, id, JOBS_SUBSCRIPTION) response = websocket.receive_json() + # I do not really know why strawberry gives more info on this + # One versus the counter + payload = response["payload"][0] + assert isinstance(payload, dict) + assert "locations" in payload.keys() + # It looks like this 'locations': [{'column': 32, 'line': 1}] + # We cannot test locations feasibly + del payload["locations"] assert response == { "id": id, - "payload": [{"message": IsAuthenticated.message}], + "payload": [{"message": IsAuthenticated.message, "path": ["jobUpdates"]}], "type": "error", } From 17ae1621562f59733960b4a2a0c20cd1f82af47d Mon Sep 17 00:00:00 2001 From: Houkime <> Date: Mon, 27 May 2024 21:15:47 +0000 Subject: [PATCH 23/34] test(websocket): remove excessive sleeping --- tests/test_graphql/test_websocket.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/tests/test_graphql/test_websocket.py b/tests/test_graphql/test_websocket.py index d538ca1..27cfd55 100644 --- a/tests/test_graphql/test_websocket.py +++ b/tests/test_graphql/test_websocket.py @@ -66,14 +66,12 @@ def authenticated_websocket( ValueError(TOKEN_REPO.get_tokens()) with connect_ws_authenticated(authorized_client) as websocket: yield websocket - sleep(1) @pytest.fixture def unauthenticated_websocket(client) -> Generator[WebSocketTestSession, None, None]: with connect_ws_not_authenticated(client) as websocket: yield websocket - sleep(1) def test_websocket_connection_bare(authorized_client): From cb2a1421bf25f759e9fcea596efb2125a699d30e Mon Sep 17 00:00:00 2001 From: Houkime <> Date: Mon, 27 May 2024 21:28:29 +0000 Subject: [PATCH 24/34] test(websocket): remove some duplication --- tests/test_graphql/test_websocket.py | 75 +++++++++++++--------------- 1 file changed, 36 insertions(+), 39 deletions(-) diff --git a/tests/test_graphql/test_websocket.py b/tests/test_graphql/test_websocket.py index 27cfd55..754fbbf 100644 --- a/tests/test_graphql/test_websocket.py +++ b/tests/test_graphql/test_websocket.py @@ -34,6 +34,18 @@ jobUpdates { """ +def api_subscribe(websocket, id, subscription): + websocket.send_json( + { + "id": id, + "type": "subscribe", + "payload": { + "query": "subscription TestSubscription {" + subscription + "}", + }, + } + ) + + def connect_ws_authenticated(authorized_client) -> WebSocketTestSession: token = "Bearer " + str(DEVICE_WE_AUTH_TESTS_WITH["token"]) return authorized_client.websocket_connect( @@ -61,7 +73,7 @@ def init_graphql(websocket): def authenticated_websocket( authorized_client, ) -> Generator[WebSocketTestSession, None, None]: - # We use authorized_client only tohave token in the repo, this client by itself is not enough to authorize websocket + # We use authorized_client only to have token in the repo, this client by itself is not enough to authorize websocket ValueError(TOKEN_REPO.get_tokens()) with connect_ws_authenticated(authorized_client) as websocket: @@ -104,45 +116,30 @@ def test_websocket_graphql_ping(authorized_client): assert pong == {"type": "pong"} -def api_subscribe(websocket, id, subscription): - websocket.send_json( - { - "id": id, - "type": "subscribe", - "payload": { - "query": "subscription TestSubscription {" + subscription + "}", - }, - } - ) - - -def 
test_websocket_subscription_minimal(authorized_client): +def test_websocket_subscription_minimal(authorized_client, authenticated_websocket): # Test a small endpoint that exists specifically for tests - client = authorized_client - with client.websocket_connect( - "/graphql", subprotocols=["graphql-transport-ws"] - ) as websocket: - init_graphql(websocket) - arbitrary_id = "3aaa2445" - api_subscribe(websocket, arbitrary_id, "count") - response = websocket.receive_json() - assert response == { - "id": arbitrary_id, - "payload": {"data": {"count": 0}}, - "type": "next", - } - response = websocket.receive_json() - assert response == { - "id": arbitrary_id, - "payload": {"data": {"count": 1}}, - "type": "next", - } - response = websocket.receive_json() - assert response == { - "id": arbitrary_id, - "payload": {"data": {"count": 2}}, - "type": "next", - } + websocket = authenticated_websocket + init_graphql(websocket) + arbitrary_id = "3aaa2445" + api_subscribe(websocket, arbitrary_id, "count") + response = websocket.receive_json() + assert response == { + "id": arbitrary_id, + "payload": {"data": {"count": 0}}, + "type": "next", + } + response = websocket.receive_json() + assert response == { + "id": arbitrary_id, + "payload": {"data": {"count": 1}}, + "type": "next", + } + response = websocket.receive_json() + assert response == { + "id": arbitrary_id, + "payload": {"data": {"count": 2}}, + "type": "next", + } def test_websocket_subscription_minimal_unauthorized(unauthenticated_websocket): From fc2ac0fe6d0e4a834922bdf818328b9c46882302 Mon Sep 17 00:00:00 2001 From: nhnn Date: Mon, 27 May 2024 12:59:43 +0300 Subject: [PATCH 25/34] feat: graphql endpoint to fetch system logs from journald --- default.nix | 1 + selfprivacy_api/graphql/queries/logs.py | 123 ++++++++++++++++++++++ selfprivacy_api/graphql/schema.py | 6 ++ tests/common.py | 4 + tests/test_graphql/test_api_logs.py | 133 ++++++++++++++++++++++++ 5 files changed, 267 insertions(+) create mode 100644 selfprivacy_api/graphql/queries/logs.py create mode 100644 tests/test_graphql/test_api_logs.py diff --git a/default.nix b/default.nix index e7e6fcf..8d47c4d 100644 --- a/default.nix +++ b/default.nix @@ -14,6 +14,7 @@ pythonPackages.buildPythonPackage rec { pydantic pytz redis + systemd setuptools strawberry-graphql typing-extensions diff --git a/selfprivacy_api/graphql/queries/logs.py b/selfprivacy_api/graphql/queries/logs.py new file mode 100644 index 0000000..c16c950 --- /dev/null +++ b/selfprivacy_api/graphql/queries/logs.py @@ -0,0 +1,123 @@ +"""System logs""" +from datetime import datetime +import os +import typing +import strawberry +from systemd import journal + + +def get_events_from_journal( + j: journal.Reader, limit: int, next: typing.Callable[[journal.Reader], typing.Dict] +): + events = [] + i = 0 + while i < limit: + entry = next(j) + if entry == None or entry == dict(): + break + if entry["MESSAGE"] != "": + events.append(LogEntry(entry)) + i += 1 + + return events + + +@strawberry.type +class LogEntry: + message: str = strawberry.field() + timestamp: datetime = strawberry.field() + priority: int = strawberry.field() + systemd_unit: typing.Optional[str] = strawberry.field() + systemd_slice: typing.Optional[str] = strawberry.field() + + def __init__(self, journal_entry: typing.Dict): + self.entry = journal_entry + self.message = journal_entry["MESSAGE"] + self.timestamp = journal_entry["__REALTIME_TIMESTAMP"] + self.priority = journal_entry["PRIORITY"] + self.systemd_unit = journal_entry.get("_SYSTEMD_UNIT") + 
self.systemd_slice = journal_entry.get("_SYSTEMD_SLICE") + + @strawberry.field() + def cursor(self) -> str: + return self.entry["__CURSOR"] + + +@strawberry.type +class PageMeta: + up_cursor: typing.Optional[str] = strawberry.field() + down_cursor: typing.Optional[str] = strawberry.field() + + def __init__( + self, up_cursor: typing.Optional[str], down_cursor: typing.Optional[str] + ): + self.up_cursor = up_cursor + self.down_cursor = down_cursor + + +@strawberry.type +class PaginatedEntries: + page_meta: PageMeta = strawberry.field(description="Metadata to aid in pagination.") + entries: typing.List[LogEntry] = strawberry.field( + description="The list of log entries." + ) + + def __init__(self, meta: PageMeta, entries: typing.List[LogEntry]): + self.page_meta = meta + self.entries = entries + + @staticmethod + def from_entries(entries: typing.List[LogEntry]): + if entries == []: + return PaginatedEntries(PageMeta(None, None), []) + + return PaginatedEntries( + PageMeta( + entries[0].cursor(), + entries[-1].cursor(), + ), + entries, + ) + + +@strawberry.type +class Logs: + @strawberry.field() + def paginated( + self, + limit: int = 20, + up_cursor: str + | None = None, # All entries returned will be lesser than this cursor. Sets upper bound on results. + down_cursor: str + | None = None, # All entries returned will be greater than this cursor. Sets lower bound on results. + ) -> PaginatedEntries: + if limit > 50: + raise Exception("You can't fetch more than 50 entries via single request.") + j = journal.Reader() + + if up_cursor == None and down_cursor == None: + j.seek_tail() + + events = get_events_from_journal(j, limit, lambda j: j.get_previous()) + events.reverse() + + return PaginatedEntries.from_entries(events) + elif up_cursor == None and down_cursor != None: + j.seek_cursor(down_cursor) + j.get_previous() # pagination is exclusive + + events = get_events_from_journal(j, limit, lambda j: j.get_previous()) + events.reverse() + + return PaginatedEntries.from_entries(events) + elif up_cursor != None and down_cursor == None: + j.seek_cursor(up_cursor) + j.get_next() # pagination is exclusive + + events = get_events_from_journal(j, limit, lambda j: j.get_next()) + + return PaginatedEntries.from_entries(events) + else: + raise NotImplemented( + "Pagination by both up_cursor and down_cursor is not implemented" + ) diff --git a/selfprivacy_api/graphql/schema.py b/selfprivacy_api/graphql/schema.py index 05e6bf9..c65e233 100644 --- a/selfprivacy_api/graphql/schema.py +++ b/selfprivacy_api/graphql/schema.py @@ -25,6 +25,7 @@ from selfprivacy_api.graphql.mutations.backup_mutations import BackupMutations from selfprivacy_api.graphql.queries.api_queries import Api from selfprivacy_api.graphql.queries.backup import Backup from selfprivacy_api.graphql.queries.jobs import Job +from selfprivacy_api.graphql.queries.logs import Logs from selfprivacy_api.graphql.queries.services import Services from selfprivacy_api.graphql.queries.storage import Storage from selfprivacy_api.graphql.queries.system import System @@ -53,6 +54,11 @@ class Query: """System queries""" return System() + @strawberry.field(permission_classes=[IsAuthenticated]) + def logs(self) -> Logs: + """Log queries""" + return Logs() + @strawberry.field(permission_classes=[IsAuthenticated]) def users(self) -> Users: """Users queries""" diff --git a/tests/common.py b/tests/common.py index 3c05033..1de3893 100644 --- a/tests/common.py +++ b/tests/common.py @@ -81,6 +81,10 @@ def generate_service_query(query_array): return "query TestService 
{\n services {" + "\n".join(query_array) + "}\n}" +def generate_logs_query(query_array): + return "query TestService {\n logs {" + "\n".join(query_array) + "}\n}" + + def mnemonic_to_hex(mnemonic): return Mnemonic(language="english").to_entropy(mnemonic).hex() diff --git a/tests/test_graphql/test_api_logs.py b/tests/test_graphql/test_api_logs.py new file mode 100644 index 0000000..587b6c1 --- /dev/null +++ b/tests/test_graphql/test_api_logs.py @@ -0,0 +1,133 @@ +from datetime import datetime +from systemd import journal + + +def assert_log_entry_equals_to_journal_entry(api_entry, journal_entry): + assert api_entry["message"] == journal_entry["MESSAGE"] + assert ( + datetime.fromisoformat(api_entry["timestamp"]) + == journal_entry["__REALTIME_TIMESTAMP"] + ) + assert api_entry["priority"] == journal_entry["PRIORITY"] + assert api_entry.get("systemdUnit") == journal_entry.get("_SYSTEMD_UNIT") + assert api_entry.get("systemdSlice") == journal_entry.get("_SYSTEMD_SLICE") + + +def take_from_journal(j, limit, next): + entries = [] + for _ in range(0, limit): + entry = next(j) + if entry["MESSAGE"] != "": + entries.append(entry) + return entries + + +API_GET_LOGS_WITH_UP_BORDER = """ +query TestQuery($upCursor: String) { + logs { + paginated(limit: 4, upCursor: $upCursor) { + pageMeta { + upCursor + downCursor + } + entries { + message + timestamp + priority + systemdUnit + systemdSlice + } + } + } +} +""" + +API_GET_LOGS_WITH_DOWN_BORDER = """ +query TestQuery($downCursor: String) { + logs { + paginated(limit: 4, downCursor: $downCursor) { + pageMeta { + upCursor + downCursor + } + entries { + message + timestamp + priority + systemdUnit + systemdSlice + } + } + } +} +""" + + +def test_graphql_get_logs_with_up_border(authorized_client): + j = journal.Reader() + j.seek_tail() + + # < - cursor + # <- - log entry will be returned by API call. + # ... + # log < + # log <- + # log <- + # log <- + # log <- + # log + + expected_entries = take_from_journal(j, 6, lambda j: j.get_previous()) + expected_entries.reverse() + + response = authorized_client.post( + "/graphql", + json={ + "query": API_GET_LOGS_WITH_UP_BORDER, + "variables": {"upCursor": expected_entries[0]["__CURSOR"]}, + }, + ) + assert response.status_code == 200 + + expected_entries = expected_entries[1:-1] + returned_entries = response.json()["data"]["logs"]["paginated"]["entries"] + + assert len(returned_entries) == len(expected_entries) + + for api_entry, journal_entry in zip(returned_entries, expected_entries): + assert_log_entry_equals_to_journal_entry(api_entry, journal_entry) + + +def test_graphql_get_logs_with_down_border(authorized_client): + j = journal.Reader() + j.seek_head() + j.get_next() + + # < - cursor + # <- - log entry will be returned by API call. + # log + # log <- + # log <- + # log <- + # log <- + # log < + # ... 
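+    # In other words: seek to the cursor, step over it once (pagination
+    # is exclusive, so the entry under the cursor is never returned),
+    # then read up to `limit` entries in the chosen direction.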
+ + expected_entries = take_from_journal(j, 5, lambda j: j.get_next()) + + response = authorized_client.post( + "/graphql", + json={ + "query": API_GET_LOGS_WITH_DOWN_BORDER, + "variables": {"downCursor": expected_entries[-1]["__CURSOR"]}, + }, + ) + assert response.status_code == 200 + + expected_entries = expected_entries[:-1] + returned_entries = response.json()["data"]["logs"]["paginated"]["entries"] + + assert len(returned_entries) == len(expected_entries) + + for api_entry, journal_entry in zip(returned_entries, expected_entries): + assert_log_entry_equals_to_journal_entry(api_entry, journal_entry) From 3d2c79ecb1065d1db78fb1be0990175ca8b80743 Mon Sep 17 00:00:00 2001 From: nhnn Date: Thu, 30 May 2024 10:05:36 +0300 Subject: [PATCH 26/34] feat: streaming of journald entries via graphql subscription --- default.nix | 1 + selfprivacy_api/graphql/queries/logs.py | 4 +-- selfprivacy_api/graphql/schema.py | 8 ++++- selfprivacy_api/graphql/subscriptions/logs.py | 31 ++++++++++++++++ tests/test_graphql/test_api_logs.py | 36 ++++++++++++++++++- 5 files changed, 76 insertions(+), 4 deletions(-) create mode 100644 selfprivacy_api/graphql/subscriptions/logs.py diff --git a/default.nix b/default.nix index 8d47c4d..1af935e 100644 --- a/default.nix +++ b/default.nix @@ -19,6 +19,7 @@ pythonPackages.buildPythonPackage rec { strawberry-graphql typing-extensions uvicorn + websockets ]; pythonImportsCheck = [ "selfprivacy_api" ]; doCheck = false; diff --git a/selfprivacy_api/graphql/queries/logs.py b/selfprivacy_api/graphql/queries/logs.py index c16c950..b9e4af2 100644 --- a/selfprivacy_api/graphql/queries/logs.py +++ b/selfprivacy_api/graphql/queries/logs.py @@ -26,7 +26,7 @@ def get_events_from_journal( class LogEntry: message: str = strawberry.field() timestamp: datetime = strawberry.field() - priority: int = strawberry.field() + priority: typing.Optional[int] = strawberry.field() systemd_unit: typing.Optional[str] = strawberry.field() systemd_slice: typing.Optional[str] = strawberry.field() @@ -34,7 +34,7 @@ class LogEntry: self.entry = journal_entry self.message = journal_entry["MESSAGE"] self.timestamp = journal_entry["__REALTIME_TIMESTAMP"] - self.priority = journal_entry["PRIORITY"] + self.priority = journal_entry.get("PRIORITY") self.systemd_unit = journal_entry.get("_SYSTEMD_UNIT") self.systemd_slice = journal_entry.get("_SYSTEMD_SLICE") diff --git a/selfprivacy_api/graphql/schema.py b/selfprivacy_api/graphql/schema.py index c65e233..f0d5a11 100644 --- a/selfprivacy_api/graphql/schema.py +++ b/selfprivacy_api/graphql/schema.py @@ -25,7 +25,7 @@ from selfprivacy_api.graphql.mutations.backup_mutations import BackupMutations from selfprivacy_api.graphql.queries.api_queries import Api from selfprivacy_api.graphql.queries.backup import Backup from selfprivacy_api.graphql.queries.jobs import Job -from selfprivacy_api.graphql.queries.logs import Logs +from selfprivacy_api.graphql.queries.logs import LogEntry, Logs from selfprivacy_api.graphql.queries.services import Services from selfprivacy_api.graphql.queries.storage import Storage from selfprivacy_api.graphql.queries.system import System @@ -34,6 +34,7 @@ from selfprivacy_api.graphql.subscriptions.jobs import ApiJob from selfprivacy_api.graphql.subscriptions.jobs import ( job_updates as job_update_generator, ) +from selfprivacy_api.graphql.subscriptions.logs import log_stream from selfprivacy_api.graphql.mutations.users_mutations import UsersMutations from selfprivacy_api.graphql.queries.users import Users @@ -174,6 +175,11 @@ class 
Subscription: yield i await asyncio.sleep(0.5) + @strawberry.subscription + async def log_entries(self, info: strawberry.types.Info) -> AsyncGenerator[LogEntry, None]: + reject_if_unauthenticated(info) + return log_stream() + schema = strawberry.Schema( query=Query, diff --git a/selfprivacy_api/graphql/subscriptions/logs.py b/selfprivacy_api/graphql/subscriptions/logs.py new file mode 100644 index 0000000..be8a004 --- /dev/null +++ b/selfprivacy_api/graphql/subscriptions/logs.py @@ -0,0 +1,31 @@ +from typing import AsyncGenerator, List +from systemd import journal +import asyncio + +from selfprivacy_api.graphql.queries.logs import LogEntry + + +async def log_stream() -> AsyncGenerator[LogEntry, None]: + j = journal.Reader() + + j.seek_tail() + j.get_previous() + + queue = asyncio.Queue() + + async def callback(): + if j.process() != journal.APPEND: + return + for entry in j: + await queue.put(entry) + + asyncio.get_event_loop().add_reader(j, lambda: asyncio.ensure_future(callback())) + + while True: + entry = await queue.get() + try: + yield LogEntry(entry) + except: + asyncio.get_event_loop().remove_reader(j) + return + queue.task_done() diff --git a/tests/test_graphql/test_api_logs.py b/tests/test_graphql/test_api_logs.py index 587b6c1..18f4d32 100644 --- a/tests/test_graphql/test_api_logs.py +++ b/tests/test_graphql/test_api_logs.py @@ -1,6 +1,8 @@ from datetime import datetime from systemd import journal +from tests.test_graphql.test_websocket import init_graphql + def assert_log_entry_equals_to_journal_entry(api_entry, journal_entry): assert api_entry["message"] == journal_entry["MESSAGE"] @@ -8,7 +10,7 @@ def assert_log_entry_equals_to_journal_entry(api_entry, journal_entry): datetime.fromisoformat(api_entry["timestamp"]) == journal_entry["__REALTIME_TIMESTAMP"] ) - assert api_entry["priority"] == journal_entry["PRIORITY"] + assert api_entry.get("priority") == journal_entry.get("PRIORITY") assert api_entry.get("systemdUnit") == journal_entry.get("_SYSTEMD_UNIT") assert api_entry.get("systemdSlice") == journal_entry.get("_SYSTEMD_SLICE") @@ -131,3 +133,35 @@ def test_graphql_get_logs_with_down_border(authorized_client): for api_entry, journal_entry in zip(returned_entries, expected_entries): assert_log_entry_equals_to_journal_entry(api_entry, journal_entry) + + +def test_websocket_subscription_for_logs(authorized_client): + with authorized_client.websocket_connect( + "/graphql", subprotocols=["graphql-transport-ws"] + ) as websocket: + init_graphql(websocket) + websocket.send_json( + { + "id": "3aaa2445", + "type": "subscribe", + "payload": { + "query": "subscription TestSubscription { logEntries { message } }", + }, + } + ) + + def read_until(message, limit=5): + i = 0 + while i < limit: + msg = websocket.receive_json()["payload"]["data"]["logEntries"][ + "message" + ] + if msg == message: + return + else: + continue + raise Exception("Failed to read websocket data, timeout") + + for i in range(0, 10): + journal.send(f"Lorem ipsum number {i}") + read_until(f"Lorem ipsum number {i}") From 8b2e4666dd28f53151a47467f5cccd564927d2b0 Mon Sep 17 00:00:00 2001 From: nhnn Date: Tue, 11 Jun 2024 12:36:42 +0300 Subject: [PATCH 27/34] fix: rename PageMeta to LogsPageMeta --- selfprivacy_api/graphql/queries/logs.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/selfprivacy_api/graphql/queries/logs.py b/selfprivacy_api/graphql/queries/logs.py index b9e4af2..52204cf 100644 --- a/selfprivacy_api/graphql/queries/logs.py +++ b/selfprivacy_api/graphql/queries/logs.py @@ 
-44,7 +44,7 @@ class LogEntry: @strawberry.type -class PageMeta: +class LogsPageMeta: up_cursor: typing.Optional[str] = strawberry.field() down_cursor: typing.Optional[str] = strawberry.field() @@ -57,22 +57,22 @@ class PageMeta: @strawberry.type class PaginatedEntries: - page_meta: PageMeta = strawberry.field(description="Metadata to aid in pagination.") + page_meta: LogsPageMeta = strawberry.field(description="Metadata to aid in pagination.") entries: typing.List[LogEntry] = strawberry.field( description="The list of log entries." ) - def __init__(self, meta: PageMeta, entries: typing.List[LogEntry]): + def __init__(self, meta: LogsPageMeta, entries: typing.List[LogEntry]): self.page_meta = meta self.entries = entries @staticmethod def from_entries(entries: typing.List[LogEntry]): if entries == []: - return PaginatedEntries(PageMeta(None, None), []) + return PaginatedEntries(LogsPageMeta(None, None), []) return PaginatedEntries( - PageMeta( + LogsPageMeta( entries[0].cursor(), entries[-1].cursor(), ), From 5f3fc0d96e37aed82aebc59424f461c5e5d312f1 Mon Sep 17 00:00:00 2001 From: Inex Code Date: Wed, 10 Jul 2024 19:18:22 +0400 Subject: [PATCH 28/34] chore: formatting --- selfprivacy_api/graphql/queries/logs.py | 4 +++- selfprivacy_api/graphql/schema.py | 5 ++++- 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/selfprivacy_api/graphql/queries/logs.py b/selfprivacy_api/graphql/queries/logs.py index 52204cf..6aa864f 100644 --- a/selfprivacy_api/graphql/queries/logs.py +++ b/selfprivacy_api/graphql/queries/logs.py @@ -57,7 +57,9 @@ class LogsPageMeta: @strawberry.type class PaginatedEntries: - page_meta: LogsPageMeta = strawberry.field(description="Metadata to aid in pagination.") + page_meta: LogsPageMeta = strawberry.field( + description="Metadata to aid in pagination." + ) entries: typing.List[LogEntry] = strawberry.field( description="The list of log entries." 
) diff --git a/selfprivacy_api/graphql/schema.py b/selfprivacy_api/graphql/schema.py index f0d5a11..a515d0c 100644 --- a/selfprivacy_api/graphql/schema.py +++ b/selfprivacy_api/graphql/schema.py @@ -176,7 +176,10 @@ class Subscription: await asyncio.sleep(0.5) @strawberry.subscription - async def log_entries(self, info: strawberry.types.Info) -> AsyncGenerator[LogEntry, None]: + async def log_entries( + self, + info: strawberry.types.Info, + ) -> AsyncGenerator[LogEntry, None]: reject_if_unauthenticated(info) return log_stream() From 4ca9b9f54e0c3991eade23276ce924881ea1d976 Mon Sep 17 00:00:00 2001 From: Inex Code Date: Wed, 10 Jul 2024 21:46:14 +0400 Subject: [PATCH 29/34] fix: Wait for ws logs test to init --- tests/test_graphql/test_api_logs.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/tests/test_graphql/test_api_logs.py b/tests/test_graphql/test_api_logs.py index 18f4d32..6875531 100644 --- a/tests/test_graphql/test_api_logs.py +++ b/tests/test_graphql/test_api_logs.py @@ -1,3 +1,5 @@ +import asyncio +import pytest from datetime import datetime from systemd import journal @@ -135,7 +137,8 @@ def test_graphql_get_logs_with_down_border(authorized_client): assert_log_entry_equals_to_journal_entry(api_entry, journal_entry) -def test_websocket_subscription_for_logs(authorized_client): +@pytest.mark.asyncio +async def test_websocket_subscription_for_logs(authorized_client): with authorized_client.websocket_connect( "/graphql", subprotocols=["graphql-transport-ws"] ) as websocket: @@ -149,6 +152,7 @@ def test_websocket_subscription_for_logs(authorized_client): }, } ) + await asyncio.sleep(1) def read_until(message, limit=5): i = 0 @@ -159,6 +163,7 @@ def test_websocket_subscription_for_logs(authorized_client): if msg == message: return else: + i += 1 continue raise Exception("Failed to read websocket data, timeout") From 859ac4dbc6ba3664ac2e46cd12bd3fdbac9f6311 Mon Sep 17 00:00:00 2001 From: Inex Code Date: Thu, 11 Jul 2024 19:08:04 +0400 Subject: [PATCH 30/34] chore: Update nixpkgs --- flake.lock | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/flake.lock b/flake.lock index 1f52d36..ba47e51 100644 --- a/flake.lock +++ b/flake.lock @@ -2,11 +2,11 @@ "nodes": { "nixpkgs": { "locked": { - "lastModified": 1709677081, - "narHash": "sha256-tix36Y7u0rkn6mTm0lA45b45oab2cFLqAzDbJxeXS+c=", + "lastModified": 1719957072, + "narHash": "sha256-gvFhEf5nszouwLAkT9nWsDzocUTqLWHuL++dvNjMp9I=", "owner": "nixos", "repo": "nixpkgs", - "rev": "880992dcc006a5e00dd0591446fdf723e6a51a64", + "rev": "7144d6241f02d171d25fba3edeaf15e0f2592105", "type": "github" }, "original": { From 94b0276f743622873a59fe46e131f64b7286e196 Mon Sep 17 00:00:00 2001 From: nhnn Date: Fri, 12 Jul 2024 20:50:43 +0300 Subject: [PATCH 31/34] fix: extract business logic to utils/systemd_journal.py --- selfprivacy_api/graphql/queries/logs.py | 53 ++++------------------- selfprivacy_api/graphql/schema.py | 3 +- selfprivacy_api/utils/systemd_journal.py | 55 ++++++++++++++++++++++++ 3 files changed, 64 insertions(+), 47 deletions(-) create mode 100644 selfprivacy_api/utils/systemd_journal.py diff --git a/selfprivacy_api/graphql/queries/logs.py b/selfprivacy_api/graphql/queries/logs.py index 6aa864f..6841f30 100644 --- a/selfprivacy_api/graphql/queries/logs.py +++ b/selfprivacy_api/graphql/queries/logs.py @@ -1,25 +1,8 @@ """System logs""" from datetime import datetime -import os import typing import strawberry -from systemd import journal - - -def get_events_from_journal( - j: journal.Reader, 
limit: int, next: typing.Callable[[journal.Reader], typing.Dict] -): - events = [] - i = 0 - while i < limit: - entry = next(j) - if entry == None or entry == dict(): - break - if entry["MESSAGE"] != "": - events.append(LogEntry(entry)) - i += 1 - - return events +from selfprivacy_api.utils.systemd_journal import get_paginated_logs @strawberry.type @@ -95,31 +78,11 @@ class Logs: ) -> PaginatedEntries: if limit > 50: raise Exception("You can't fetch more than 50 entries via single request.") - j = journal.Reader() - - if up_cursor == None and down_cursor == None: - j.seek_tail() - - events = get_events_from_journal(j, limit, lambda j: j.get_previous()) - events.reverse() - - return PaginatedEntries.from_entries(events) - elif up_cursor == None and down_cursor != None: - j.seek_cursor(down_cursor) - j.get_previous() # pagination is exclusive - - events = get_events_from_journal(j, limit, lambda j: j.get_previous()) - events.reverse() - - return PaginatedEntries.from_entries(events) - elif up_cursor != None and down_cursor == None: - j.seek_cursor(up_cursor) - j.get_next() # pagination is exclusive - - events = get_events_from_journal(j, limit, lambda j: j.get_next()) - - return PaginatedEntries.from_entries(events) - else: - raise NotImplemented( - "Pagination by both up_cursor and down_cursor is not implemented" + return PaginatedEntries.from_entries( + list( + map( + lambda x: LogEntry(x), + get_paginated_logs(limit, up_cursor, down_cursor), + ) ) + ) diff --git a/selfprivacy_api/graphql/schema.py b/selfprivacy_api/graphql/schema.py index a515d0c..534bacf 100644 --- a/selfprivacy_api/graphql/schema.py +++ b/selfprivacy_api/graphql/schema.py @@ -177,8 +177,7 @@ class Subscription: @strawberry.subscription async def log_entries( - self, - info: strawberry.types.Info, + self, info: strawberry.types.Info ) -> AsyncGenerator[LogEntry, None]: reject_if_unauthenticated(info) return log_stream() diff --git a/selfprivacy_api/utils/systemd_journal.py b/selfprivacy_api/utils/systemd_journal.py new file mode 100644 index 0000000..6c03c93 --- /dev/null +++ b/selfprivacy_api/utils/systemd_journal.py @@ -0,0 +1,55 @@ +import typing +from systemd import journal + + +def get_events_from_journal( + j: journal.Reader, limit: int, next: typing.Callable[[journal.Reader], typing.Dict] +): + events = [] + i = 0 + while i < limit: + entry = next(j) + if entry is None or entry == dict(): + break + if entry["MESSAGE"] != "": + events.append(entry) + i += 1 + + return events + + +def get_paginated_logs( + limit: int = 20, + up_cursor: str + | None = None, # All entries returned will be lesser than this cursor. Sets upper bound on results. + down_cursor: str + | None = None, # All entries returned will be greater than this cursor. Sets lower bound on results. 
+): + j = journal.Reader() + + if up_cursor is None and down_cursor is None: + j.seek_tail() + + events = get_events_from_journal(j, limit, lambda j: j.get_previous()) + events.reverse() + + return events + elif up_cursor is None and down_cursor is not None: + j.seek_cursor(down_cursor) + j.get_previous() # pagination is exclusive + + events = get_events_from_journal(j, limit, lambda j: j.get_previous()) + events.reverse() + + return events + elif up_cursor is not None and down_cursor is None: + j.seek_cursor(up_cursor) + j.get_next() # pagination is exclusive + + events = get_events_from_journal(j, limit, lambda j: j.get_next()) + + return events + else: + raise NotImplementedError( + "Pagination by both up_cursor and down_cursor is not implemented" + ) From cc4b41165766a1f1a0b8981887ff127a65f3605e Mon Sep 17 00:00:00 2001 From: Inex Code Date: Mon, 15 Jul 2024 16:59:15 +0400 Subject: [PATCH 32/34] refactor: Replace strawberry.types.Info with just Info --- selfprivacy_api/graphql/schema.py | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/selfprivacy_api/graphql/schema.py b/selfprivacy_api/graphql/schema.py index 534bacf..b49a629 100644 --- a/selfprivacy_api/graphql/schema.py +++ b/selfprivacy_api/graphql/schema.py @@ -4,6 +4,7 @@ import asyncio from typing import AsyncGenerator, List import strawberry +from strawberry.types import Info from selfprivacy_api.graphql import IsAuthenticated from selfprivacy_api.graphql.mutations.deprecated_mutations import ( @@ -144,11 +145,11 @@ class Mutation( # A cruft for Websockets -def authenticated(info: strawberry.types.Info) -> bool: +def authenticated(info: Info) -> bool: return IsAuthenticated().has_permission(source=None, info=info) -def reject_if_unauthenticated(info: strawberry.types.Info): +def reject_if_unauthenticated(info: Info): if not authenticated(info): raise Exception(IsAuthenticated().message) @@ -161,24 +162,20 @@ class Subscription: demands it while the spec is vague in this area.""" @strawberry.subscription - async def job_updates( - self, info: strawberry.types.Info - ) -> AsyncGenerator[List[ApiJob], None]: + async def job_updates(self, info: Info) -> AsyncGenerator[List[ApiJob], None]: reject_if_unauthenticated(info) return job_update_generator() @strawberry.subscription # Used for testing, consider deletion to shrink attack surface - async def count(self, info: strawberry.types.Info) -> AsyncGenerator[int, None]: + async def count(self, info: Info) -> AsyncGenerator[int, None]: reject_if_unauthenticated(info) for i in range(10): yield i await asyncio.sleep(0.5) @strawberry.subscription - async def log_entries( - self, info: strawberry.types.Info - ) -> AsyncGenerator[LogEntry, None]: + async def log_entries(self, info: Info) -> AsyncGenerator[LogEntry, None]: reject_if_unauthenticated(info) return log_stream() From 5c5e098bab3a3d8ea7acf3a42893010464db247c Mon Sep 17 00:00:00 2001 From: Inex Code Date: Mon, 15 Jul 2024 17:01:33 +0400 Subject: [PATCH 33/34] style: do not break line before logic operator --- selfprivacy_api/graphql/queries/logs.py | 8 ++++---- selfprivacy_api/utils/systemd_journal.py | 8 ++++---- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/selfprivacy_api/graphql/queries/logs.py b/selfprivacy_api/graphql/queries/logs.py index 6841f30..cf8fe21 100644 --- a/selfprivacy_api/graphql/queries/logs.py +++ b/selfprivacy_api/graphql/queries/logs.py @@ -71,10 +71,10 @@ class Logs: def paginated( self, limit: int = 20, - up_cursor: str - | None = None, # All entries returned 
will be lesser than this cursor. Sets upper bound on results. - down_cursor: str - | None = None, # All entries returned will be greater than this cursor. Sets lower bound on results. + # All entries returned will be lesser than this cursor. Sets upper bound on results. + up_cursor: str | None = None, + # All entries returned will be greater than this cursor. Sets lower bound on results. + down_cursor: str | None = None, ) -> PaginatedEntries: if limit > 50: raise Exception("You can't fetch more than 50 entries via single request.") diff --git a/selfprivacy_api/utils/systemd_journal.py b/selfprivacy_api/utils/systemd_journal.py index 6c03c93..48e97b8 100644 --- a/selfprivacy_api/utils/systemd_journal.py +++ b/selfprivacy_api/utils/systemd_journal.py @@ -20,10 +20,10 @@ def get_events_from_journal( def get_paginated_logs( limit: int = 20, - up_cursor: str - | None = None, # All entries returned will be lesser than this cursor. Sets upper bound on results. - down_cursor: str - | None = None, # All entries returned will be greater than this cursor. Sets lower bound on results. + # All entries returned will be lesser than this cursor. Sets upper bound on results. + up_cursor: str | None = None, + # All entries returned will be greater than this cursor. Sets lower bound on results. + down_cursor: str | None = None, ): j = journal.Reader() From d8fe54e0e941bed8564ee0dab76cc4b5740c570d Mon Sep 17 00:00:00 2001 From: Inex Code Date: Mon, 15 Jul 2024 17:05:38 +0400 Subject: [PATCH 34/34] fix: do not use bare 'except' --- selfprivacy_api/graphql/subscriptions/logs.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/selfprivacy_api/graphql/subscriptions/logs.py b/selfprivacy_api/graphql/subscriptions/logs.py index be8a004..1e47dba 100644 --- a/selfprivacy_api/graphql/subscriptions/logs.py +++ b/selfprivacy_api/graphql/subscriptions/logs.py @@ -1,4 +1,4 @@ -from typing import AsyncGenerator, List +from typing import AsyncGenerator from systemd import journal import asyncio @@ -25,7 +25,7 @@ async def log_stream() -> AsyncGenerator[LogEntry, None]: entry = await queue.get() try: yield LogEntry(entry) - except: + except Exception: asyncio.get_event_loop().remove_reader(j) return queue.task_done()
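The log_stream generator above works by handing the journal.Reader's file descriptor to the event loop via add_reader and draining entries through an asyncio.Queue. Below is a minimal, self-contained sketch of that same pattern, with a plain pipe standing in for journal.Reader; the pipe, helper name, and payload are illustrative only and not part of the patch series.

import asyncio
import os


async def watch_fd() -> bytes:
    read_end, write_end = os.pipe()
    queue: asyncio.Queue = asyncio.Queue()
    loop = asyncio.get_running_loop()
    # Same trick as log_stream: wake up when the fd becomes readable
    # and hand the data to a queue that async code can await on.
    loop.add_reader(read_end, lambda: queue.put_nowait(os.read(read_end, 1024)))
    os.write(write_end, b"ping")
    data = await queue.get()
    loop.remove_reader(read_end)
    os.close(read_end)
    os.close(write_end)
    return data


print(asyncio.run(watch_fd()))  # prints b'ping'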
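End to end, the logEntries subscription can be exercised with the same websocket helpers the earlier patches introduced. A rough sketch, assuming the authorized_client fixture from the test suite; the function name and test message are illustrative.

from systemd import journal

from tests.test_graphql.test_websocket import api_subscribe, init_graphql


def demo_log_entries_subscription(authorized_client):
    with authorized_client.websocket_connect(
        "/graphql", subprotocols=["graphql-transport-ws"]
    ) as websocket:
        init_graphql(websocket)
        api_subscribe(websocket, "demo-id", "logEntries { message }")
        journal.send("hello from the sketch")
        # Other units may log concurrently, so only check the shape here;
        # test_websocket_subscription_for_logs polls until its own message.
        response = websocket.receive_json()
        assert response["type"] == "next"
        assert "message" in response["payload"]["data"]["logEntries"]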