feature(jobs): UNSAFE endpoint to get job updates

This commit is contained in:
Houkime 2024-05-22 11:04:37 +00:00
parent 3910e416db
commit 6d2fdab071
4 changed files with 83 additions and 38 deletions

View file

@ -12,9 +12,10 @@ from selfprivacy_api.graphql.common_types.jobs import (
def get_all_jobs() -> List[ApiJob]: def get_all_jobs() -> List[ApiJob]:
Jobs.get_jobs() jobs = Jobs.get_jobs()
api_jobs = [job_to_api_job(job) for job in jobs]
return [job_to_api_job(job) for job in Jobs.get_jobs()] assert api_jobs is not None
return api_jobs
@strawberry.type @strawberry.type

View file

@ -2,7 +2,7 @@
# pylint: disable=too-few-public-methods # pylint: disable=too-few-public-methods
import asyncio import asyncio
from typing import AsyncGenerator from typing import AsyncGenerator, List
import strawberry import strawberry
from selfprivacy_api.graphql import IsAuthenticated from selfprivacy_api.graphql import IsAuthenticated
from selfprivacy_api.graphql.mutations.deprecated_mutations import ( from selfprivacy_api.graphql.mutations.deprecated_mutations import (
@ -28,7 +28,9 @@ from selfprivacy_api.graphql.queries.services import Services
from selfprivacy_api.graphql.queries.storage import Storage from selfprivacy_api.graphql.queries.storage import Storage
from selfprivacy_api.graphql.queries.system import System from selfprivacy_api.graphql.queries.system import System
from selfprivacy_api.graphql.subscriptions.jobs import JobSubscriptions from selfprivacy_api.graphql.subscriptions.jobs import ApiJob
from selfprivacy_api.jobs import job_notifications
from selfprivacy_api.graphql.queries.jobs import get_all_jobs
from selfprivacy_api.graphql.mutations.users_mutations import UsersMutations from selfprivacy_api.graphql.mutations.users_mutations import UsersMutations
from selfprivacy_api.graphql.queries.users import Users from selfprivacy_api.graphql.queries.users import Users
@ -136,10 +138,15 @@ class Mutation(
class Subscription: class Subscription:
"""Root schema for subscriptions""" """Root schema for subscriptions"""
@strawberry.field(permission_classes=[IsAuthenticated]) @strawberry.subscription
def jobs(self) -> JobSubscriptions: async def job_updates(self) -> AsyncGenerator[List[ApiJob], None]:
"""Jobs subscriptions""" # Send the complete list of jobs every time anything gets updated
return JobSubscriptions() async for notification in job_notifications():
yield get_all_jobs()
# @strawberry.subscription
# async def job_updates(self) -> AsyncGenerator[List[ApiJob], None]:
# return job_updates()
@strawberry.subscription @strawberry.subscription
async def count(self) -> AsyncGenerator[int, None]: async def count(self) -> AsyncGenerator[int, None]:

View file

@ -1,5 +1,4 @@
# pylint: disable=too-few-public-methods # pylint: disable=too-few-public-methods
import strawberry
from typing import AsyncGenerator, List from typing import AsyncGenerator, List
@ -9,12 +8,7 @@ from selfprivacy_api.graphql.common_types.jobs import ApiJob
from selfprivacy_api.graphql.queries.jobs import get_all_jobs from selfprivacy_api.graphql.queries.jobs import get_all_jobs
@strawberry.type async def job_updates() -> AsyncGenerator[List[ApiJob], None]:
class JobSubscriptions:
"""Subscriptions related to jobs"""
@strawberry.subscription
async def job_updates(self) -> AsyncGenerator[List[ApiJob], None]:
# Send the complete list of jobs every time anything gets updated # Send the complete list of jobs every time anything gets updated
async for notification in job_notifications(): async for notification in job_notifications():
yield get_all_jobs() yield get_all_jobs()

View file

@ -1,6 +1,13 @@
from tests.common import generate_jobs_subscription from tests.common import generate_jobs_subscription
from selfprivacy_api.graphql.queries.jobs import Job as _Job
# from selfprivacy_api.graphql.subscriptions.jobs import JobSubscriptions
import pytest
import asyncio
from selfprivacy_api.jobs import Jobs from selfprivacy_api.jobs import Jobs
from time import sleep
from tests.test_redis import empty_redis
JOBS_SUBSCRIPTION = """ JOBS_SUBSCRIPTION = """
jobUpdates { jobUpdates {
@ -91,21 +98,57 @@ def test_websocket_subscription_minimal(authorized_client):
} }
# def test_websocket_subscription(authorized_client): async def read_one_job(websocket):
# client = authorized_client # bug? We only get them starting from the second job update
# with client.websocket_connect( # that's why we receive two jobs in the list them
# "/graphql", subprotocols=["graphql-transport-ws"] # the first update gets lost somewhere
# ) as websocket: response = websocket.receive_json()
# init_graphql(websocket) return response
# websocket.send_json(
# {
# "id": "3aaa2445", @pytest.mark.asyncio
# "type": "subscribe", async def test_websocket_subscription(authorized_client, empty_redis, event_loop):
# "payload": { client = authorized_client
# "query": generate_jobs_subscription([JOBS_SUBSCRIPTION]), with client.websocket_connect(
# }, "/graphql", subprotocols=["graphql-transport-ws"]
# } ) as websocket:
# ) init_graphql(websocket)
# Jobs.add("bogus", "bogus.bogus", "yyyaaaaayy") websocket.send_json(
# response = websocket.receive_json() {
# raise NotImplementedError(response) "id": "3aaa2445",
"type": "subscribe",
"payload": {
"query": "subscription TestSubscription {"
+ JOBS_SUBSCRIPTION
+ "}",
},
}
)
future = asyncio.create_task(read_one_job(websocket))
jobs = []
jobs.append(Jobs.add("bogus", "bogus.bogus", "yyyaaaaayy it works"))
sleep(0.5)
jobs.append(Jobs.add("bogus2", "bogus.bogus", "yyyaaaaayy it works"))
response = await future
data = response["payload"]["data"]
jobs_received = data["jobUpdates"]
received_names = [job["name"] for job in jobs_received]
for job in jobs:
assert job.name in received_names
for job in jobs:
for api_job in jobs_received:
if (job.name) == api_job["name"]:
assert api_job["uid"] == str(job.uid)
assert api_job["typeId"] == job.type_id
assert api_job["name"] == job.name
assert api_job["description"] == job.description
assert api_job["status"] == job.status
assert api_job["statusText"] == job.status_text
assert api_job["progress"] == job.progress
assert api_job["createdAt"] == job.created_at.isoformat()
assert api_job["updatedAt"] == job.updated_at.isoformat()
assert api_job["finishedAt"] == None
assert api_job["error"] == None
assert api_job["result"] == None