feat: graphql endpoint to fetch system logs from journald

nhnn 2024-05-27 12:59:43 +03:00
parent cb2a1421bf
commit fc2ac0fe6d
5 changed files with 267 additions and 0 deletions
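
The new logs.paginated GraphQL query exposes journald entries with cursor-based pagination. A minimal example query against the new endpoint (requires an authenticated client; field names are taken from the schema added below, cursor values are placeholders):

    query {
        logs {
            paginated(limit: 10) {
                pageMeta {
                    upCursor
                    downCursor
                }
                entries {
                    cursor
                    message
                    timestamp
                    priority
                    systemdUnit
                    systemdSlice
                }
            }
        }
    }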


@@ -14,6 +14,7 @@ pythonPackages.buildPythonPackage rec {
     pydantic
     pytz
     redis
+    systemd
     setuptools
     strawberry-graphql
     typing-extensions
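
(The systemd entry here is presumably nixpkgs' Python binding for systemd, python-systemd, which provides the systemd.journal reader used by the new query module below.)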


@@ -0,0 +1,123 @@
"""System logs"""
from datetime import datetime
import typing

import strawberry
from systemd import journal


def get_events_from_journal(
    j: journal.Reader, limit: int, next: typing.Callable[[journal.Reader], typing.Dict]
):
    events = []
    i = 0
    while i < limit:
        entry = next(j)
        if entry is None or entry == dict():
            break
        if entry["MESSAGE"] != "":
            events.append(LogEntry(entry))
            i += 1
    return events


@strawberry.type
class LogEntry:
    message: str = strawberry.field()
    timestamp: datetime = strawberry.field()
    priority: int = strawberry.field()
    systemd_unit: typing.Optional[str] = strawberry.field()
    systemd_slice: typing.Optional[str] = strawberry.field()

    def __init__(self, journal_entry: typing.Dict):
        self.entry = journal_entry
        self.message = journal_entry["MESSAGE"]
        self.timestamp = journal_entry["__REALTIME_TIMESTAMP"]
        self.priority = journal_entry["PRIORITY"]
        self.systemd_unit = journal_entry.get("_SYSTEMD_UNIT")
        self.systemd_slice = journal_entry.get("_SYSTEMD_SLICE")

    @strawberry.field()
    def cursor(self) -> str:
        return self.entry["__CURSOR"]


@strawberry.type
class PageMeta:
    up_cursor: typing.Optional[str] = strawberry.field()
    down_cursor: typing.Optional[str] = strawberry.field()

    def __init__(
        self, up_cursor: typing.Optional[str], down_cursor: typing.Optional[str]
    ):
        self.up_cursor = up_cursor
        self.down_cursor = down_cursor


@strawberry.type
class PaginatedEntries:
    page_meta: PageMeta = strawberry.field(description="Metadata to aid in pagination.")
    entries: typing.List[LogEntry] = strawberry.field(
        description="The list of log entries."
    )

    def __init__(self, meta: PageMeta, entries: typing.List[LogEntry]):
        self.page_meta = meta
        self.entries = entries

    @staticmethod
    def from_entries(entries: typing.List[LogEntry]):
        if entries == []:
            return PaginatedEntries(PageMeta(None, None), [])

        return PaginatedEntries(
            PageMeta(
                entries[0].cursor(),
                entries[-1].cursor(),
            ),
            entries,
        )


@strawberry.type
class Logs:
    @strawberry.field()
    def paginated(
        self,
        limit: int = 20,
        # Return entries that come after this cursor in the journal;
        # the cursor entry itself is excluded.
        up_cursor: str | None = None,
        # Return entries that come before this cursor in the journal;
        # the cursor entry itself is excluded.
        down_cursor: str | None = None,
    ) -> PaginatedEntries:
        if limit > 50:
            raise Exception("You can't fetch more than 50 entries via a single request.")
        j = journal.Reader()
        if up_cursor is None and down_cursor is None:
            j.seek_tail()
            events = get_events_from_journal(j, limit, lambda j: j.get_previous())
            events.reverse()
            return PaginatedEntries.from_entries(events)
        elif up_cursor is None and down_cursor is not None:
            j.seek_cursor(down_cursor)
            j.get_previous()  # pagination is exclusive
            events = get_events_from_journal(j, limit, lambda j: j.get_previous())
            events.reverse()
            return PaginatedEntries.from_entries(events)
        elif up_cursor is not None and down_cursor is None:
            j.seek_cursor(up_cursor)
            j.get_next()  # pagination is exclusive
            events = get_events_from_journal(j, limit, lambda j: j.get_next())
            return PaginatedEntries.from_entries(events)
        else:
            raise NotImplementedError(
                "Pagination by both up_cursor and down_cursor is not implemented"
            )
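
For context, a minimal standalone sketch of the journald cursor mechanics this resolver relies on (python-systemd's journal.Reader; assumes the process has permission to read the journal):

    from systemd import journal

    # Read the newest entry and remember where we were.
    j = journal.Reader()
    j.seek_tail()
    newest = j.get_previous()
    cursor = newest["__CURSOR"]

    # Later: resume from the saved cursor. seek_cursor() positions the
    # reader at the saved entry, and the first read returns that entry
    # again, which is why paginated() above skips one entry to keep
    # pagination exclusive.
    j2 = journal.Reader()
    j2.seek_cursor(cursor)
    j2.get_previous()          # skip the cursor entry itself
    older = j2.get_previous()  # the entry just before the saved one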


@@ -25,6 +25,7 @@ from selfprivacy_api.graphql.mutations.backup_mutations import BackupMutations
 from selfprivacy_api.graphql.queries.api_queries import Api
 from selfprivacy_api.graphql.queries.backup import Backup
 from selfprivacy_api.graphql.queries.jobs import Job
+from selfprivacy_api.graphql.queries.logs import Logs
 from selfprivacy_api.graphql.queries.services import Services
 from selfprivacy_api.graphql.queries.storage import Storage
 from selfprivacy_api.graphql.queries.system import System
@@ -53,6 +54,11 @@ class Query:
         """System queries"""
         return System()

+    @strawberry.field(permission_classes=[IsAuthenticated])
+    def logs(self) -> Logs:
+        """Log queries"""
+        return Logs()
+
     @strawberry.field(permission_classes=[IsAuthenticated])
     def users(self) -> Users:
         """Users queries"""


@@ -81,6 +81,10 @@ def generate_service_query(query_array):
     return "query TestService {\n services {" + "\n".join(query_array) + "}\n}"


+def generate_logs_query(query_array):
+    return "query TestService {\n logs {" + "\n".join(query_array) + "}\n}"
+
+
 def mnemonic_to_hex(mnemonic):
     return Mnemonic(language="english").to_entropy(mnemonic).hex()
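
For illustration, a hypothetical call to the new helper and the query string it builds:

    # generate_logs_query(["paginated(limit: 4) { entries { message } }"])
    # produces:
    # 'query TestService {\n logs {paginated(limit: 4) { entries { message } }}\n}'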


@@ -0,0 +1,133 @@
from datetime import datetime

from systemd import journal


def assert_log_entry_equals_to_journal_entry(api_entry, journal_entry):
    assert api_entry["message"] == journal_entry["MESSAGE"]
    assert (
        datetime.fromisoformat(api_entry["timestamp"])
        == journal_entry["__REALTIME_TIMESTAMP"]
    )
    assert api_entry["priority"] == journal_entry["PRIORITY"]
    assert api_entry.get("systemdUnit") == journal_entry.get("_SYSTEMD_UNIT")
    assert api_entry.get("systemdSlice") == journal_entry.get("_SYSTEMD_SLICE")


def take_from_journal(j, limit, next):
    entries = []
    for _ in range(0, limit):
        entry = next(j)
        if entry["MESSAGE"] != "":
            entries.append(entry)
    return entries


API_GET_LOGS_WITH_UP_BORDER = """
query TestQuery($upCursor: String) {
    logs {
        paginated(limit: 4, upCursor: $upCursor) {
            pageMeta {
                upCursor
                downCursor
            }
            entries {
                message
                timestamp
                priority
                systemdUnit
                systemdSlice
            }
        }
    }
}
"""

API_GET_LOGS_WITH_DOWN_BORDER = """
query TestQuery($downCursor: String) {
    logs {
        paginated(limit: 4, downCursor: $downCursor) {
            pageMeta {
                upCursor
                downCursor
            }
            entries {
                message
                timestamp
                priority
                systemdUnit
                systemdSlice
            }
        }
    }
}
"""


def test_graphql_get_logs_with_up_border(authorized_client):
    j = journal.Reader()
    j.seek_tail()

    # <  - the cursor entry (not returned: pagination is exclusive)
    # <- - an entry the API call is expected to return
    #
    # ...
    # log <
    # log <-
    # log <-
    # log <-
    # log <-
    # log
    expected_entries = take_from_journal(j, 6, lambda j: j.get_previous())
    expected_entries.reverse()

    response = authorized_client.post(
        "/graphql",
        json={
            "query": API_GET_LOGS_WITH_UP_BORDER,
            "variables": {"upCursor": expected_entries[0]["__CURSOR"]},
        },
    )
    assert response.status_code == 200

    expected_entries = expected_entries[1:-1]
    returned_entries = response.json()["data"]["logs"]["paginated"]["entries"]
    assert len(returned_entries) == len(expected_entries)

    for api_entry, journal_entry in zip(returned_entries, expected_entries):
        assert_log_entry_equals_to_journal_entry(api_entry, journal_entry)


def test_graphql_get_logs_with_down_border(authorized_client):
    j = journal.Reader()
    j.seek_head()
    j.get_next()

    # <  - the cursor entry (not returned: pagination is exclusive)
    # <- - an entry the API call is expected to return
    #
    # log
    # log <-
    # log <-
    # log <-
    # log <-
    # log <
    # ...
    expected_entries = take_from_journal(j, 5, lambda j: j.get_next())

    response = authorized_client.post(
        "/graphql",
        json={
            "query": API_GET_LOGS_WITH_DOWN_BORDER,
            "variables": {"downCursor": expected_entries[-1]["__CURSOR"]},
        },
    )
    assert response.status_code == 200

    expected_entries = expected_entries[:-1]
    returned_entries = response.json()["data"]["logs"]["paginated"]["entries"]
    assert len(returned_entries) == len(expected_entries)

    for api_entry, journal_entry in zip(returned_entries, expected_entries):
        assert_log_entry_equals_to_journal_entry(api_entry, journal_entry)
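
Taken together, the two cursors let a client walk the whole journal page by page. A hedged client-side sketch (the helpers fetch_logs_page and walk_history are hypothetical, reusing the test's query string and client; the cursor wiring follows the resolver above: a page's pageMeta.upCursor is its oldest entry, and passing it as downCursor fetches the next-older page):

    def fetch_logs_page(client, down_cursor=None):
        # Hypothetical helper: fetch one page of entries older than
        # down_cursor, or the newest page when down_cursor is None.
        response = client.post(
            "/graphql",
            json={
                "query": API_GET_LOGS_WITH_DOWN_BORDER,
                "variables": {"downCursor": down_cursor},
            },
        )
        return response.json()["data"]["logs"]["paginated"]


    def walk_history(client):
        # Start from the newest page, then keep stepping towards older entries.
        page = fetch_logs_page(client)
        while page["entries"]:
            yield from reversed(page["entries"])  # newest-first within the walk
            # pageMeta.upCursor is the oldest entry of this page; entries
            # before it (exclusive) form the next-older page.
            page = fetch_logs_page(client, page["pageMeta"]["upCursor"])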