diff --git a/default.nix b/default.nix index 8d47c4d..1af935e 100644 --- a/default.nix +++ b/default.nix @@ -19,6 +19,7 @@ pythonPackages.buildPythonPackage rec { strawberry-graphql typing-extensions uvicorn + websockets ]; pythonImportsCheck = [ "selfprivacy_api" ]; doCheck = false; diff --git a/selfprivacy_api/graphql/queries/logs.py b/selfprivacy_api/graphql/queries/logs.py index c16c950..b9e4af2 100644 --- a/selfprivacy_api/graphql/queries/logs.py +++ b/selfprivacy_api/graphql/queries/logs.py @@ -26,7 +26,7 @@ def get_events_from_journal( class LogEntry: message: str = strawberry.field() timestamp: datetime = strawberry.field() - priority: int = strawberry.field() + priority: typing.Optional[int] = strawberry.field() systemd_unit: typing.Optional[str] = strawberry.field() systemd_slice: typing.Optional[str] = strawberry.field() @@ -34,7 +34,7 @@ class LogEntry: self.entry = journal_entry self.message = journal_entry["MESSAGE"] self.timestamp = journal_entry["__REALTIME_TIMESTAMP"] - self.priority = journal_entry["PRIORITY"] + self.priority = journal_entry.get("PRIORITY") self.systemd_unit = journal_entry.get("_SYSTEMD_UNIT") self.systemd_slice = journal_entry.get("_SYSTEMD_SLICE") diff --git a/selfprivacy_api/graphql/schema.py b/selfprivacy_api/graphql/schema.py index c65e233..f0d5a11 100644 --- a/selfprivacy_api/graphql/schema.py +++ b/selfprivacy_api/graphql/schema.py @@ -25,7 +25,7 @@ from selfprivacy_api.graphql.mutations.backup_mutations import BackupMutations from selfprivacy_api.graphql.queries.api_queries import Api from selfprivacy_api.graphql.queries.backup import Backup from selfprivacy_api.graphql.queries.jobs import Job -from selfprivacy_api.graphql.queries.logs import Logs +from selfprivacy_api.graphql.queries.logs import LogEntry, Logs from selfprivacy_api.graphql.queries.services import Services from selfprivacy_api.graphql.queries.storage import Storage from selfprivacy_api.graphql.queries.system import System @@ -34,6 +34,7 @@ from selfprivacy_api.graphql.subscriptions.jobs import ApiJob from selfprivacy_api.graphql.subscriptions.jobs import ( job_updates as job_update_generator, ) +from selfprivacy_api.graphql.subscriptions.logs import log_stream from selfprivacy_api.graphql.mutations.users_mutations import UsersMutations from selfprivacy_api.graphql.queries.users import Users @@ -174,6 +175,11 @@ class Subscription: yield i await asyncio.sleep(0.5) + @strawberry.subscription + async def log_entries(self, info: strawberry.types.Info) -> AsyncGenerator[LogEntry, None]: + reject_if_unauthenticated(info) + return log_stream() + schema = strawberry.Schema( query=Query, diff --git a/selfprivacy_api/graphql/subscriptions/logs.py b/selfprivacy_api/graphql/subscriptions/logs.py new file mode 100644 index 0000000..be8a004 --- /dev/null +++ b/selfprivacy_api/graphql/subscriptions/logs.py @@ -0,0 +1,31 @@ +from typing import AsyncGenerator, List +from systemd import journal +import asyncio + +from selfprivacy_api.graphql.queries.logs import LogEntry + + +async def log_stream() -> AsyncGenerator[LogEntry, None]: + j = journal.Reader() + + j.seek_tail() + j.get_previous() + + queue = asyncio.Queue() + + async def callback(): + if j.process() != journal.APPEND: + return + for entry in j: + await queue.put(entry) + + asyncio.get_event_loop().add_reader(j, lambda: asyncio.ensure_future(callback())) + + while True: + entry = await queue.get() + try: + yield LogEntry(entry) + except: + asyncio.get_event_loop().remove_reader(j) + return + queue.task_done() diff --git a/tests/test_graphql/test_api_logs.py b/tests/test_graphql/test_api_logs.py index 587b6c1..18f4d32 100644 --- a/tests/test_graphql/test_api_logs.py +++ b/tests/test_graphql/test_api_logs.py @@ -1,6 +1,8 @@ from datetime import datetime from systemd import journal +from tests.test_graphql.test_websocket import init_graphql + def assert_log_entry_equals_to_journal_entry(api_entry, journal_entry): assert api_entry["message"] == journal_entry["MESSAGE"] @@ -8,7 +10,7 @@ def assert_log_entry_equals_to_journal_entry(api_entry, journal_entry): datetime.fromisoformat(api_entry["timestamp"]) == journal_entry["__REALTIME_TIMESTAMP"] ) - assert api_entry["priority"] == journal_entry["PRIORITY"] + assert api_entry.get("priority") == journal_entry.get("PRIORITY") assert api_entry.get("systemdUnit") == journal_entry.get("_SYSTEMD_UNIT") assert api_entry.get("systemdSlice") == journal_entry.get("_SYSTEMD_SLICE") @@ -131,3 +133,35 @@ def test_graphql_get_logs_with_down_border(authorized_client): for api_entry, journal_entry in zip(returned_entries, expected_entries): assert_log_entry_equals_to_journal_entry(api_entry, journal_entry) + + +def test_websocket_subscription_for_logs(authorized_client): + with authorized_client.websocket_connect( + "/graphql", subprotocols=["graphql-transport-ws"] + ) as websocket: + init_graphql(websocket) + websocket.send_json( + { + "id": "3aaa2445", + "type": "subscribe", + "payload": { + "query": "subscription TestSubscription { logEntries { message } }", + }, + } + ) + + def read_until(message, limit=5): + i = 0 + while i < limit: + msg = websocket.receive_json()["payload"]["data"]["logEntries"][ + "message" + ] + if msg == message: + return + else: + continue + raise Exception("Failed to read websocket data, timeout") + + for i in range(0, 10): + journal.send(f"Lorem ipsum number {i}") + read_until(f"Lorem ipsum number {i}")