selfprivacy-rest-api/selfprivacy_api/graphql/subscriptions/jobs.py
2024-07-04 17:19:25 +03:00

21 lines
622 B
Python

# pylint: disable=too-few-public-methods
import strawberry
from typing import AsyncGenerator, List
from selfprivacy_api.jobs import job_notifications
from selfprivacy_api.graphql.common_types.jobs import ApiJob
from selfprivacy_api.graphql.queries.jobs import get_all_jobs
@strawberry.type
class JobSubscriptions:
    """GraphQL subscription fields that expose job state."""

    @strawberry.subscription
    async def job_updates(self) -> AsyncGenerator[List[ApiJob], None]:
        """Stream the full job list, once per job notification.

        Each item produced by ``job_notifications()`` triggers one yield of
        whatever ``get_all_jobs()`` returns at that moment. The notification
        item itself is not inspected.
        """
        notification_stream = job_notifications()
        # The notification only acts as a wake-up signal; subscribers always
        # receive the whole list rather than a per-job delta.
        async for _ in notification_stream:
            yield get_all_jobs()