From fc2ac0fe6d0e4a834922bdf818328b9c46882302 Mon Sep 17 00:00:00 2001 From: nhnn Date: Mon, 27 May 2024 12:59:43 +0300 Subject: [PATCH] feat: graphql endpoint to fetch system logs from journald --- default.nix | 1 + selfprivacy_api/graphql/queries/logs.py | 123 ++++++++++++++++++++++ selfprivacy_api/graphql/schema.py | 6 ++ tests/common.py | 4 + tests/test_graphql/test_api_logs.py | 133 ++++++++++++++++++++++++ 5 files changed, 267 insertions(+) create mode 100644 selfprivacy_api/graphql/queries/logs.py create mode 100644 tests/test_graphql/test_api_logs.py diff --git a/default.nix b/default.nix index e7e6fcf..8d47c4d 100644 --- a/default.nix +++ b/default.nix @@ -14,6 +14,7 @@ pythonPackages.buildPythonPackage rec { pydantic pytz redis + systemd setuptools strawberry-graphql typing-extensions diff --git a/selfprivacy_api/graphql/queries/logs.py b/selfprivacy_api/graphql/queries/logs.py new file mode 100644 index 0000000..c16c950 --- /dev/null +++ b/selfprivacy_api/graphql/queries/logs.py @@ -0,0 +1,123 @@ +"""System logs""" +from datetime import datetime +import os +import typing +import strawberry +from systemd import journal + + +def get_events_from_journal( + j: journal.Reader, limit: int, next: typing.Callable[[journal.Reader], typing.Dict] +): + events = [] + i = 0 + while i < limit: + entry = next(j) + if entry == None or entry == dict(): + break + if entry["MESSAGE"] != "": + events.append(LogEntry(entry)) + i += 1 + + return events + + +@strawberry.type +class LogEntry: + message: str = strawberry.field() + timestamp: datetime = strawberry.field() + priority: int = strawberry.field() + systemd_unit: typing.Optional[str] = strawberry.field() + systemd_slice: typing.Optional[str] = strawberry.field() + + def __init__(self, journal_entry: typing.Dict): + self.entry = journal_entry + self.message = journal_entry["MESSAGE"] + self.timestamp = journal_entry["__REALTIME_TIMESTAMP"] + self.priority = journal_entry["PRIORITY"] + self.systemd_unit = journal_entry.get("_SYSTEMD_UNIT") + self.systemd_slice = journal_entry.get("_SYSTEMD_SLICE") + + @strawberry.field() + def cursor(self) -> str: + return self.entry["__CURSOR"] + + +@strawberry.type +class PageMeta: + up_cursor: typing.Optional[str] = strawberry.field() + down_cursor: typing.Optional[str] = strawberry.field() + + def __init__( + self, up_cursor: typing.Optional[str], down_cursor: typing.Optional[str] + ): + self.up_cursor = up_cursor + self.down_cursor = down_cursor + + +@strawberry.type +class PaginatedEntries: + page_meta: PageMeta = strawberry.field(description="Metadata to aid in pagination.") + entries: typing.List[LogEntry] = strawberry.field( + description="The list of log entries." + ) + + def __init__(self, meta: PageMeta, entries: typing.List[LogEntry]): + self.page_meta = meta + self.entries = entries + + @staticmethod + def from_entries(entries: typing.List[LogEntry]): + if entries == []: + return PaginatedEntries(PageMeta(None, None), []) + + return PaginatedEntries( + PageMeta( + entries[0].cursor(), + entries[-1].cursor(), + ), + entries, + ) + + +@strawberry.type +class Logs: + @strawberry.field() + def paginated( + self, + limit: int = 20, + up_cursor: str + | None = None, # All entries returned will be lesser than this cursor. Sets upper bound on results. + down_cursor: str + | None = None, # All entries returned will be greater than this cursor. Sets lower bound on results. + ) -> PaginatedEntries: + if limit > 50: + raise Exception("You can't fetch more than 50 entries via single request.") + j = journal.Reader() + + if up_cursor == None and down_cursor == None: + j.seek_tail() + + events = get_events_from_journal(j, limit, lambda j: j.get_previous()) + events.reverse() + + return PaginatedEntries.from_entries(events) + elif up_cursor == None and down_cursor != None: + j.seek_cursor(down_cursor) + j.get_previous() # pagination is exclusive + + events = get_events_from_journal(j, limit, lambda j: j.get_previous()) + events.reverse() + + return PaginatedEntries.from_entries(events) + elif up_cursor != None and down_cursor == None: + j.seek_cursor(up_cursor) + j.get_next() # pagination is exclusive + + events = get_events_from_journal(j, limit, lambda j: j.get_next()) + + return PaginatedEntries.from_entries(events) + else: + raise NotImplemented( + "Pagination by both up_cursor and down_cursor is not implemented" + ) diff --git a/selfprivacy_api/graphql/schema.py b/selfprivacy_api/graphql/schema.py index 05e6bf9..c65e233 100644 --- a/selfprivacy_api/graphql/schema.py +++ b/selfprivacy_api/graphql/schema.py @@ -25,6 +25,7 @@ from selfprivacy_api.graphql.mutations.backup_mutations import BackupMutations from selfprivacy_api.graphql.queries.api_queries import Api from selfprivacy_api.graphql.queries.backup import Backup from selfprivacy_api.graphql.queries.jobs import Job +from selfprivacy_api.graphql.queries.logs import Logs from selfprivacy_api.graphql.queries.services import Services from selfprivacy_api.graphql.queries.storage import Storage from selfprivacy_api.graphql.queries.system import System @@ -53,6 +54,11 @@ class Query: """System queries""" return System() + @strawberry.field(permission_classes=[IsAuthenticated]) + def logs(self) -> Logs: + """Log queries""" + return Logs() + @strawberry.field(permission_classes=[IsAuthenticated]) def users(self) -> Users: """Users queries""" diff --git a/tests/common.py b/tests/common.py index 3c05033..1de3893 100644 --- a/tests/common.py +++ b/tests/common.py @@ -81,6 +81,10 @@ def generate_service_query(query_array): return "query TestService {\n services {" + "\n".join(query_array) + "}\n}" +def generate_logs_query(query_array): + return "query TestService {\n logs {" + "\n".join(query_array) + "}\n}" + + def mnemonic_to_hex(mnemonic): return Mnemonic(language="english").to_entropy(mnemonic).hex() diff --git a/tests/test_graphql/test_api_logs.py b/tests/test_graphql/test_api_logs.py new file mode 100644 index 0000000..587b6c1 --- /dev/null +++ b/tests/test_graphql/test_api_logs.py @@ -0,0 +1,133 @@ +from datetime import datetime +from systemd import journal + + +def assert_log_entry_equals_to_journal_entry(api_entry, journal_entry): + assert api_entry["message"] == journal_entry["MESSAGE"] + assert ( + datetime.fromisoformat(api_entry["timestamp"]) + == journal_entry["__REALTIME_TIMESTAMP"] + ) + assert api_entry["priority"] == journal_entry["PRIORITY"] + assert api_entry.get("systemdUnit") == journal_entry.get("_SYSTEMD_UNIT") + assert api_entry.get("systemdSlice") == journal_entry.get("_SYSTEMD_SLICE") + + +def take_from_journal(j, limit, next): + entries = [] + for _ in range(0, limit): + entry = next(j) + if entry["MESSAGE"] != "": + entries.append(entry) + return entries + + +API_GET_LOGS_WITH_UP_BORDER = """ +query TestQuery($upCursor: String) { + logs { + paginated(limit: 4, upCursor: $upCursor) { + pageMeta { + upCursor + downCursor + } + entries { + message + timestamp + priority + systemdUnit + systemdSlice + } + } + } +} +""" + +API_GET_LOGS_WITH_DOWN_BORDER = """ +query TestQuery($downCursor: String) { + logs { + paginated(limit: 4, downCursor: $downCursor) { + pageMeta { + upCursor + downCursor + } + entries { + message + timestamp + priority + systemdUnit + systemdSlice + } + } + } +} +""" + + +def test_graphql_get_logs_with_up_border(authorized_client): + j = journal.Reader() + j.seek_tail() + + # < - cursor + # <- - log entry will be returned by API call. + # ... + # log < + # log <- + # log <- + # log <- + # log <- + # log + + expected_entries = take_from_journal(j, 6, lambda j: j.get_previous()) + expected_entries.reverse() + + response = authorized_client.post( + "/graphql", + json={ + "query": API_GET_LOGS_WITH_UP_BORDER, + "variables": {"upCursor": expected_entries[0]["__CURSOR"]}, + }, + ) + assert response.status_code == 200 + + expected_entries = expected_entries[1:-1] + returned_entries = response.json()["data"]["logs"]["paginated"]["entries"] + + assert len(returned_entries) == len(expected_entries) + + for api_entry, journal_entry in zip(returned_entries, expected_entries): + assert_log_entry_equals_to_journal_entry(api_entry, journal_entry) + + +def test_graphql_get_logs_with_down_border(authorized_client): + j = journal.Reader() + j.seek_head() + j.get_next() + + # < - cursor + # <- - log entry will be returned by API call. + # log + # log <- + # log <- + # log <- + # log <- + # log < + # ... + + expected_entries = take_from_journal(j, 5, lambda j: j.get_next()) + + response = authorized_client.post( + "/graphql", + json={ + "query": API_GET_LOGS_WITH_DOWN_BORDER, + "variables": {"downCursor": expected_entries[-1]["__CURSOR"]}, + }, + ) + assert response.status_code == 200 + + expected_entries = expected_entries[:-1] + returned_entries = response.json()["data"]["logs"]["paginated"]["entries"] + + assert len(returned_entries) == len(expected_entries) + + for api_entry, journal_entry in zip(returned_entries, expected_entries): + assert_log_entry_equals_to_journal_entry(api_entry, journal_entry)