diff --git a/selfprivacy_api/graphql/subscriptions/logs.py b/selfprivacy_api/graphql/subscriptions/logs.py index 1e47dba..dc7a65b 100644 --- a/selfprivacy_api/graphql/subscriptions/logs.py +++ b/selfprivacy_api/graphql/subscriptions/logs.py @@ -21,11 +21,17 @@ async def log_stream() -> AsyncGenerator[LogEntry, None]: asyncio.get_event_loop().add_reader(j, lambda: asyncio.ensure_future(callback())) - while True: - entry = await queue.get() - try: - yield LogEntry(entry) - except Exception: - asyncio.get_event_loop().remove_reader(j) - return - queue.task_done() + try: + while True: + entry = await queue.get() + try: + yield LogEntry(entry) + except Exception: + asyncio.get_event_loop().remove_reader(j) + j.close() + return + queue.task_done() + except asyncio.CancelledError: + asyncio.get_event_loop().remove_reader(j) + j.close() + return diff --git a/selfprivacy_api/utils/systemd_journal.py b/selfprivacy_api/utils/systemd_journal.py index cbb953f..e9992ac 100644 --- a/selfprivacy_api/utils/systemd_journal.py +++ b/selfprivacy_api/utils/systemd_journal.py @@ -42,6 +42,8 @@ def get_paginated_logs( events = get_events_from_journal(j, limit, lambda j: j.get_previous()) events.reverse() + j.close() + return events elif up_cursor is None and down_cursor is not None: j.seek_cursor(down_cursor) @@ -50,6 +52,8 @@ def get_paginated_logs( events = get_events_from_journal(j, limit, lambda j: j.get_previous()) events.reverse() + j.close() + return events elif up_cursor is not None and down_cursor is None: j.seek_cursor(up_cursor) @@ -57,8 +61,12 @@ def get_paginated_logs( events = get_events_from_journal(j, limit, lambda j: j.get_next()) + j.close() + return events else: + j.close() + raise NotImplementedError( "Pagination by both up_cursor and down_cursor is not implemented" )