fix: extract business logic to utils/systemd_journal.py

This commit is contained in:
nhnn 2024-07-12 20:50:43 +03:00
parent 859ac4dbc6
commit 94b0276f74
3 changed files with 64 additions and 47 deletions

View file

@ -1,25 +1,8 @@
"""System logs""" """System logs"""
from datetime import datetime from datetime import datetime
import os
import typing import typing
import strawberry import strawberry
from systemd import journal from selfprivacy_api.utils.systemd_journal import get_paginated_logs
def get_events_from_journal(
j: journal.Reader, limit: int, next: typing.Callable[[journal.Reader], typing.Dict]
):
events = []
i = 0
while i < limit:
entry = next(j)
if entry == None or entry == dict():
break
if entry["MESSAGE"] != "":
events.append(LogEntry(entry))
i += 1
return events
@strawberry.type @strawberry.type
@ -95,31 +78,11 @@ class Logs:
) -> PaginatedEntries: ) -> PaginatedEntries:
if limit > 50: if limit > 50:
raise Exception("You can't fetch more than 50 entries via single request.") raise Exception("You can't fetch more than 50 entries via single request.")
j = journal.Reader() return PaginatedEntries.from_entries(
list(
if up_cursor == None and down_cursor == None: map(
j.seek_tail() lambda x: LogEntry(x),
get_paginated_logs(limit, up_cursor, down_cursor),
events = get_events_from_journal(j, limit, lambda j: j.get_previous()) )
events.reverse() )
return PaginatedEntries.from_entries(events)
elif up_cursor == None and down_cursor != None:
j.seek_cursor(down_cursor)
j.get_previous() # pagination is exclusive
events = get_events_from_journal(j, limit, lambda j: j.get_previous())
events.reverse()
return PaginatedEntries.from_entries(events)
elif up_cursor != None and down_cursor == None:
j.seek_cursor(up_cursor)
j.get_next() # pagination is exclusive
events = get_events_from_journal(j, limit, lambda j: j.get_next())
return PaginatedEntries.from_entries(events)
else:
raise NotImplemented(
"Pagination by both up_cursor and down_cursor is not implemented"
) )

View file

@ -177,8 +177,7 @@ class Subscription:
@strawberry.subscription @strawberry.subscription
async def log_entries( async def log_entries(
self, self, info: strawberry.types.Info
info: strawberry.types.Info,
) -> AsyncGenerator[LogEntry, None]: ) -> AsyncGenerator[LogEntry, None]:
reject_if_unauthenticated(info) reject_if_unauthenticated(info)
return log_stream() return log_stream()

View file

@ -0,0 +1,55 @@
import typing
from systemd import journal
def get_events_from_journal(
j: journal.Reader, limit: int, next: typing.Callable[[journal.Reader], typing.Dict]
):
events = []
i = 0
while i < limit:
entry = next(j)
if entry is None or entry == dict():
break
if entry["MESSAGE"] != "":
events.append(entry)
i += 1
return events
def get_paginated_logs(
limit: int = 20,
up_cursor: str
| None = None, # All entries returned will be lesser than this cursor. Sets upper bound on results.
down_cursor: str
| None = None, # All entries returned will be greater than this cursor. Sets lower bound on results.
):
j = journal.Reader()
if up_cursor is None and down_cursor is None:
j.seek_tail()
events = get_events_from_journal(j, limit, lambda j: j.get_previous())
events.reverse()
return events
elif up_cursor is None and down_cursor is not None:
j.seek_cursor(down_cursor)
j.get_previous() # pagination is exclusive
events = get_events_from_journal(j, limit, lambda j: j.get_previous())
events.reverse()
return events
elif up_cursor is not None and down_cursor is None:
j.seek_cursor(up_cursor)
j.get_next() # pagination is exclusive
events = get_events_from_journal(j, limit, lambda j: j.get_next())
return events
else:
raise NotImplementedError(
"Pagination by both up_cursor and down_cursor is not implemented"
)