feat: streaming of journald entries via graphql subscription

This commit is contained in:
nhnn 2024-05-30 10:05:36 +03:00
parent fc2ac0fe6d
commit 3d2c79ecb1
5 changed files with 76 additions and 4 deletions

View file

@ -19,6 +19,7 @@ pythonPackages.buildPythonPackage rec {
strawberry-graphql
typing-extensions
uvicorn
websockets
];
pythonImportsCheck = [ "selfprivacy_api" ];
doCheck = false;

View file

@ -26,7 +26,7 @@ def get_events_from_journal(
class LogEntry:
message: str = strawberry.field()
timestamp: datetime = strawberry.field()
priority: int = strawberry.field()
priority: typing.Optional[int] = strawberry.field()
systemd_unit: typing.Optional[str] = strawberry.field()
systemd_slice: typing.Optional[str] = strawberry.field()
@ -34,7 +34,7 @@ class LogEntry:
self.entry = journal_entry
self.message = journal_entry["MESSAGE"]
self.timestamp = journal_entry["__REALTIME_TIMESTAMP"]
self.priority = journal_entry["PRIORITY"]
self.priority = journal_entry.get("PRIORITY")
self.systemd_unit = journal_entry.get("_SYSTEMD_UNIT")
self.systemd_slice = journal_entry.get("_SYSTEMD_SLICE")

View file

@ -25,7 +25,7 @@ from selfprivacy_api.graphql.mutations.backup_mutations import BackupMutations
from selfprivacy_api.graphql.queries.api_queries import Api
from selfprivacy_api.graphql.queries.backup import Backup
from selfprivacy_api.graphql.queries.jobs import Job
from selfprivacy_api.graphql.queries.logs import Logs
from selfprivacy_api.graphql.queries.logs import LogEntry, Logs
from selfprivacy_api.graphql.queries.services import Services
from selfprivacy_api.graphql.queries.storage import Storage
from selfprivacy_api.graphql.queries.system import System
@ -34,6 +34,7 @@ from selfprivacy_api.graphql.subscriptions.jobs import ApiJob
from selfprivacy_api.graphql.subscriptions.jobs import (
job_updates as job_update_generator,
)
from selfprivacy_api.graphql.subscriptions.logs import log_stream
from selfprivacy_api.graphql.mutations.users_mutations import UsersMutations
from selfprivacy_api.graphql.queries.users import Users
@ -174,6 +175,11 @@ class Subscription:
yield i
await asyncio.sleep(0.5)
@strawberry.subscription
async def log_entries(self, info: strawberry.types.Info) -> AsyncGenerator[LogEntry, None]:
reject_if_unauthenticated(info)
return log_stream()
schema = strawberry.Schema(
query=Query,

View file

@ -0,0 +1,31 @@
from typing import AsyncGenerator, List
from systemd import journal
import asyncio
from selfprivacy_api.graphql.queries.logs import LogEntry
async def log_stream() -> AsyncGenerator[LogEntry, None]:
j = journal.Reader()
j.seek_tail()
j.get_previous()
queue = asyncio.Queue()
async def callback():
if j.process() != journal.APPEND:
return
for entry in j:
await queue.put(entry)
asyncio.get_event_loop().add_reader(j, lambda: asyncio.ensure_future(callback()))
while True:
entry = await queue.get()
try:
yield LogEntry(entry)
except:
asyncio.get_event_loop().remove_reader(j)
return
queue.task_done()

View file

@ -1,6 +1,8 @@
from datetime import datetime
from systemd import journal
from tests.test_graphql.test_websocket import init_graphql
def assert_log_entry_equals_to_journal_entry(api_entry, journal_entry):
assert api_entry["message"] == journal_entry["MESSAGE"]
@ -8,7 +10,7 @@ def assert_log_entry_equals_to_journal_entry(api_entry, journal_entry):
datetime.fromisoformat(api_entry["timestamp"])
== journal_entry["__REALTIME_TIMESTAMP"]
)
assert api_entry["priority"] == journal_entry["PRIORITY"]
assert api_entry.get("priority") == journal_entry.get("PRIORITY")
assert api_entry.get("systemdUnit") == journal_entry.get("_SYSTEMD_UNIT")
assert api_entry.get("systemdSlice") == journal_entry.get("_SYSTEMD_SLICE")
@ -131,3 +133,35 @@ def test_graphql_get_logs_with_down_border(authorized_client):
for api_entry, journal_entry in zip(returned_entries, expected_entries):
assert_log_entry_equals_to_journal_entry(api_entry, journal_entry)
def test_websocket_subscription_for_logs(authorized_client):
with authorized_client.websocket_connect(
"/graphql", subprotocols=["graphql-transport-ws"]
) as websocket:
init_graphql(websocket)
websocket.send_json(
{
"id": "3aaa2445",
"type": "subscribe",
"payload": {
"query": "subscription TestSubscription { logEntries { message } }",
},
}
)
def read_until(message, limit=5):
i = 0
while i < limit:
msg = websocket.receive_json()["payload"]["data"]["logEntries"][
"message"
]
if msg == message:
return
else:
continue
raise Exception("Failed to read websocket data, timeout")
for i in range(0, 10):
journal.send(f"Lorem ipsum number {i}")
read_until(f"Lorem ipsum number {i}")