Merge pull request 'feat: graphql endpoint to fetch system logs' (#116) from nhnn/selfprivacy-rest-api:api-logs into master

Reviewed-on: https://git.selfprivacy.org/SelfPrivacy/selfprivacy-rest-api/pulls/116
Reviewed-by: Inex Code <inex.code@selfprivacy.org>
Inex Code 2024-07-15 16:23:30 +03:00
commit 16d1f9f21a
8 changed files with 371 additions and 9 deletions

View file

@@ -14,6 +14,7 @@ pythonPackages.buildPythonPackage rec {
pydantic
pytz
redis
+ systemd
setuptools
strawberry-graphql
typing-extensions

flake.lock · View file

@@ -2,11 +2,11 @@
"nodes": {
"nixpkgs": {
"locked": {
"lastModified": 1709677081,
"narHash": "sha256-tix36Y7u0rkn6mTm0lA45b45oab2cFLqAzDbJxeXS+c=",
"lastModified": 1719957072,
"narHash": "sha256-gvFhEf5nszouwLAkT9nWsDzocUTqLWHuL++dvNjMp9I=",
"owner": "nixos",
"repo": "nixpkgs",
"rev": "880992dcc006a5e00dd0591446fdf723e6a51a64",
"rev": "7144d6241f02d171d25fba3edeaf15e0f2592105",
"type": "github"
},
"original": {

selfprivacy_api/graphql/queries/logs.py · View file

@@ -0,0 +1,88 @@
"""System logs"""
from datetime import datetime
import typing
import strawberry
from selfprivacy_api.utils.systemd_journal import get_paginated_logs
@strawberry.type
class LogEntry:
message: str = strawberry.field()
timestamp: datetime = strawberry.field()
priority: typing.Optional[int] = strawberry.field()
systemd_unit: typing.Optional[str] = strawberry.field()
systemd_slice: typing.Optional[str] = strawberry.field()
def __init__(self, journal_entry: typing.Dict):
self.entry = journal_entry
self.message = journal_entry["MESSAGE"]
self.timestamp = journal_entry["__REALTIME_TIMESTAMP"]
self.priority = journal_entry.get("PRIORITY")
self.systemd_unit = journal_entry.get("_SYSTEMD_UNIT")
self.systemd_slice = journal_entry.get("_SYSTEMD_SLICE")
@strawberry.field()
def cursor(self) -> str:
return self.entry["__CURSOR"]
@strawberry.type
class LogsPageMeta:
up_cursor: typing.Optional[str] = strawberry.field()
down_cursor: typing.Optional[str] = strawberry.field()
def __init__(
self, up_cursor: typing.Optional[str], down_cursor: typing.Optional[str]
):
self.up_cursor = up_cursor
self.down_cursor = down_cursor
@strawberry.type
class PaginatedEntries:
page_meta: LogsPageMeta = strawberry.field(
description="Metadata to aid in pagination."
)
entries: typing.List[LogEntry] = strawberry.field(
description="The list of log entries."
)
def __init__(self, meta: LogsPageMeta, entries: typing.List[LogEntry]):
self.page_meta = meta
self.entries = entries
@staticmethod
def from_entries(entries: typing.List[LogEntry]):
if entries == []:
return PaginatedEntries(LogsPageMeta(None, None), [])
return PaginatedEntries(
LogsPageMeta(
entries[0].cursor(),
entries[-1].cursor(),
),
entries,
)
@strawberry.type
class Logs:
@strawberry.field()
def paginated(
self,
limit: int = 20,
# All entries returned will be less than this cursor. Sets the upper bound on results.
up_cursor: str | None = None,
# All entries returned will be greater than this cursor. Sets the lower bound on results.
down_cursor: str | None = None,
) -> PaginatedEntries:
if limit > 50:
raise Exception("You can't fetch more than 50 entries via single request.")
return PaginatedEntries.from_entries(
list(
map(
lambda x: LogEntry(x),
get_paginated_logs(limit, up_cursor, down_cursor),
)
)
)
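With the resolver above in place, a client can page through recent logs with a request like the sketch below. `client` stands in for an authenticated GraphQL client such as the authorized_client fixture used in the tests further down in this diff, and the camelCased field names follow Strawberry's default naming of the types defined above.

# Sketch only: assumes an authenticated HTTP client posting to /graphql,
# like the authorized_client fixture in the tests below.
LOGS_QUERY = """
query {
  logs {
    paginated(limit: 10) {
      pageMeta { upCursor downCursor }
      entries { message timestamp priority systemdUnit systemdSlice }
    }
  }
}
"""

def fetch_recent_logs(client):
    response = client.post("/graphql", json={"query": LOGS_QUERY})
    return response.json()["data"]["logs"]["paginated"]["entries"]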

View file

@@ -4,6 +4,7 @@
import asyncio
from typing import AsyncGenerator, List
import strawberry
+ from strawberry.types import Info
from selfprivacy_api.graphql import IsAuthenticated
from selfprivacy_api.graphql.mutations.deprecated_mutations import (
@@ -25,6 +26,7 @@ from selfprivacy_api.graphql.mutations.backup_mutations import BackupMutations
from selfprivacy_api.graphql.queries.api_queries import Api
from selfprivacy_api.graphql.queries.backup import Backup
from selfprivacy_api.graphql.queries.jobs import Job
+ from selfprivacy_api.graphql.queries.logs import LogEntry, Logs
from selfprivacy_api.graphql.queries.services import Services
from selfprivacy_api.graphql.queries.storage import Storage
from selfprivacy_api.graphql.queries.system import System
@@ -33,6 +35,7 @@ from selfprivacy_api.graphql.subscriptions.jobs import ApiJob
from selfprivacy_api.graphql.subscriptions.jobs import (
job_updates as job_update_generator,
)
+ from selfprivacy_api.graphql.subscriptions.logs import log_stream
from selfprivacy_api.graphql.mutations.users_mutations import UsersMutations
from selfprivacy_api.graphql.queries.users import Users
@@ -53,6 +56,11 @@ class Query:
"""System queries"""
return System()
+ @strawberry.field(permission_classes=[IsAuthenticated])
+ def logs(self) -> Logs:
+ """Log queries"""
+ return Logs()
@strawberry.field(permission_classes=[IsAuthenticated])
def users(self) -> Users:
"""Users queries"""
@@ -137,11 +145,11 @@ class Mutation(
# A cruft for Websockets
- def authenticated(info: strawberry.types.Info) -> bool:
+ def authenticated(info: Info) -> bool:
return IsAuthenticated().has_permission(source=None, info=info)
- def reject_if_unauthenticated(info: strawberry.types.Info):
+ def reject_if_unauthenticated(info: Info):
if not authenticated(info):
raise Exception(IsAuthenticated().message)
@@ -154,20 +162,23 @@ class Subscription:
demands it while the spec is vague in this area."""
@strawberry.subscription
- async def job_updates(
- self, info: strawberry.types.Info
- ) -> AsyncGenerator[List[ApiJob], None]:
+ async def job_updates(self, info: Info) -> AsyncGenerator[List[ApiJob], None]:
reject_if_unauthenticated(info)
return job_update_generator()
@strawberry.subscription
# Used for testing, consider deletion to shrink attack surface
- async def count(self, info: strawberry.types.Info) -> AsyncGenerator[int, None]:
+ async def count(self, info: Info) -> AsyncGenerator[int, None]:
reject_if_unauthenticated(info)
for i in range(10):
yield i
await asyncio.sleep(0.5)
+ @strawberry.subscription
+ async def log_entries(self, info: Info) -> AsyncGenerator[LogEntry, None]:
+ reject_if_unauthenticated(info)
+ return log_stream()
schema = strawberry.Schema(
query=Query,

selfprivacy_api/graphql/subscriptions/logs.py · View file

@@ -0,0 +1,31 @@
from typing import AsyncGenerator
from systemd import journal
import asyncio
from selfprivacy_api.graphql.queries.logs import LogEntry
async def log_stream() -> AsyncGenerator[LogEntry, None]:
j = journal.Reader()
j.seek_tail()
j.get_previous()
queue = asyncio.Queue()
async def callback():
if j.process() != journal.APPEND:
return
for entry in j:
await queue.put(entry)
asyncio.get_event_loop().add_reader(j, lambda: asyncio.ensure_future(callback()))
while True:
entry = await queue.get()
try:
yield LogEntry(entry)
except Exception:
asyncio.get_event_loop().remove_reader(j)
return
queue.task_done()
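Outside of GraphQL, the generator above can also be consumed directly with async for. The snippet below is only a sketch of that behaviour (it assumes it runs on a host with a readable systemd journal) and is not part of the change.

import asyncio
from selfprivacy_api.graphql.subscriptions.logs import log_stream

async def tail_logs():
    # Each iteration waits for the next journal record, delivered as a LogEntry.
    async for entry in log_stream():
        print(entry.timestamp, entry.message)

# asyncio.run(tail_logs())  # runs until interrupted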

selfprivacy_api/utils/systemd_journal.py · View file

@@ -0,0 +1,55 @@
import typing
from systemd import journal
def get_events_from_journal(
j: journal.Reader, limit: int, next: typing.Callable[[journal.Reader], typing.Dict]
):
events = []
i = 0
while i < limit:
entry = next(j)
if entry is None or entry == dict():
break
if entry["MESSAGE"] != "":
events.append(entry)
i += 1
return events
def get_paginated_logs(
limit: int = 20,
# All entries returned will be less than this cursor. Sets the upper bound on results.
up_cursor: str | None = None,
# All entries returned will be greater than this cursor. Sets the lower bound on results.
down_cursor: str | None = None,
):
j = journal.Reader()
if up_cursor is None and down_cursor is None:
j.seek_tail()
events = get_events_from_journal(j, limit, lambda j: j.get_previous())
events.reverse()
return events
elif up_cursor is None and down_cursor is not None:
j.seek_cursor(down_cursor)
j.get_previous() # pagination is exclusive
events = get_events_from_journal(j, limit, lambda j: j.get_previous())
events.reverse()
return events
elif up_cursor is not None and down_cursor is None:
j.seek_cursor(up_cursor)
j.get_next() # pagination is exclusive
events = get_events_from_journal(j, limit, lambda j: j.get_next())
return events
else:
raise NotImplementedError(
"Pagination by both up_cursor and down_cursor is not implemented"
)
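The cursors returned by this function compose into a simple backwards walk through the journal. The loop below is a usage sketch, with __CURSOR read from the raw journal entries the function returns.

from selfprivacy_api.utils.systemd_journal import get_paginated_logs

page = get_paginated_logs(limit=20)  # newest 20 entries, oldest first within the page
while page:
    oldest_cursor = page[0]["__CURSOR"]
    # Fetch entries strictly older than the oldest entry of the previous page.
    page = get_paginated_logs(limit=20, up_cursor=oldest_cursor)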

View file

@@ -81,6 +81,10 @@ def generate_service_query(query_array):
return "query TestService {\n services {" + "\n".join(query_array) + "}\n}"
+ def generate_logs_query(query_array):
+ return "query TestService {\n logs {" + "\n".join(query_array) + "}\n}"
def mnemonic_to_hex(mnemonic):
return Mnemonic(language="english").to_entropy(mnemonic).hex()
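Like generate_service_query above it, the new generate_logs_query helper only wraps its arguments in a logs { ... } block; a usage sketch with an example selection set:

query = generate_logs_query(["paginated(limit: 5) { entries { message timestamp } }"])
# The resulting string is posted to /graphql like the other generated test queries.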

View file

@@ -0,0 +1,172 @@
import asyncio
import pytest
from datetime import datetime
from systemd import journal
from tests.test_graphql.test_websocket import init_graphql
def assert_log_entry_equals_to_journal_entry(api_entry, journal_entry):
assert api_entry["message"] == journal_entry["MESSAGE"]
assert (
datetime.fromisoformat(api_entry["timestamp"])
== journal_entry["__REALTIME_TIMESTAMP"]
)
assert api_entry.get("priority") == journal_entry.get("PRIORITY")
assert api_entry.get("systemdUnit") == journal_entry.get("_SYSTEMD_UNIT")
assert api_entry.get("systemdSlice") == journal_entry.get("_SYSTEMD_SLICE")
def take_from_journal(j, limit, next):
entries = []
for _ in range(0, limit):
entry = next(j)
if entry["MESSAGE"] != "":
entries.append(entry)
return entries
API_GET_LOGS_WITH_UP_BORDER = """
query TestQuery($upCursor: String) {
logs {
paginated(limit: 4, upCursor: $upCursor) {
pageMeta {
upCursor
downCursor
}
entries {
message
timestamp
priority
systemdUnit
systemdSlice
}
}
}
}
"""
API_GET_LOGS_WITH_DOWN_BORDER = """
query TestQuery($downCursor: String) {
logs {
paginated(limit: 4, downCursor: $downCursor) {
pageMeta {
upCursor
downCursor
}
entries {
message
timestamp
priority
systemdUnit
systemdSlice
}
}
}
}
"""
def test_graphql_get_logs_with_up_border(authorized_client):
j = journal.Reader()
j.seek_tail()
# < - cursor
# <- - log entry will be returned by API call.
# ...
# log <
# log <-
# log <-
# log <-
# log <-
# log
expected_entries = take_from_journal(j, 6, lambda j: j.get_previous())
expected_entries.reverse()
response = authorized_client.post(
"/graphql",
json={
"query": API_GET_LOGS_WITH_UP_BORDER,
"variables": {"upCursor": expected_entries[0]["__CURSOR"]},
},
)
assert response.status_code == 200
expected_entries = expected_entries[1:-1]
returned_entries = response.json()["data"]["logs"]["paginated"]["entries"]
assert len(returned_entries) == len(expected_entries)
for api_entry, journal_entry in zip(returned_entries, expected_entries):
assert_log_entry_equals_to_journal_entry(api_entry, journal_entry)
def test_graphql_get_logs_with_down_border(authorized_client):
j = journal.Reader()
j.seek_head()
j.get_next()
# < - cursor
# <- - log entry will be returned by API call.
# log
# log <-
# log <-
# log <-
# log <-
# log <
# ...
expected_entries = take_from_journal(j, 5, lambda j: j.get_next())
response = authorized_client.post(
"/graphql",
json={
"query": API_GET_LOGS_WITH_DOWN_BORDER,
"variables": {"downCursor": expected_entries[-1]["__CURSOR"]},
},
)
assert response.status_code == 200
expected_entries = expected_entries[:-1]
returned_entries = response.json()["data"]["logs"]["paginated"]["entries"]
assert len(returned_entries) == len(expected_entries)
for api_entry, journal_entry in zip(returned_entries, expected_entries):
assert_log_entry_equals_to_journal_entry(api_entry, journal_entry)
@pytest.mark.asyncio
async def test_websocket_subscription_for_logs(authorized_client):
with authorized_client.websocket_connect(
"/graphql", subprotocols=["graphql-transport-ws"]
) as websocket:
init_graphql(websocket)
websocket.send_json(
{
"id": "3aaa2445",
"type": "subscribe",
"payload": {
"query": "subscription TestSubscription { logEntries { message } }",
},
}
)
await asyncio.sleep(1)
def read_until(message, limit=5):
i = 0
while i < limit:
msg = websocket.receive_json()["payload"]["data"]["logEntries"][
"message"
]
if msg == message:
return
else:
i += 1
continue
raise Exception("Failed to read websocket data, timeout")
for i in range(0, 10):
journal.send(f"Lorem ipsum number {i}")
read_until(f"Lorem ipsum number {i}")
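For reference, each record pushed by the subscription arrives as a graphql-transport-ws "next" frame. The dict below sketches one frame as websocket.receive_json() returns it in the test above; the id echoes the subscribe message and the payload is shaped the way read_until indexes into it.

# Sketch of a single frame read by read_until() above (values are illustrative).
{
    "id": "3aaa2445",
    "type": "next",
    "payload": {"data": {"logEntries": {"message": "Lorem ipsum number 0"}}},
}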