From 442538ee4361b4cf045e0b0b9673f05d1985f35d Mon Sep 17 00:00:00 2001 From: Houkime <> Date: Wed, 22 May 2024 11:04:37 +0000 Subject: [PATCH] feature(jobs): UNSAFE endpoint to get job updates --- selfprivacy_api/graphql/queries/jobs.py | 7 +- selfprivacy_api/graphql/schema.py | 19 +++-- selfprivacy_api/graphql/subscriptions/jobs.py | 14 +--- tests/test_graphql/test_websocket.py | 81 ++++++++++++++----- 4 files changed, 83 insertions(+), 38 deletions(-) diff --git a/selfprivacy_api/graphql/queries/jobs.py b/selfprivacy_api/graphql/queries/jobs.py index 3cc3bf7..6a12838 100644 --- a/selfprivacy_api/graphql/queries/jobs.py +++ b/selfprivacy_api/graphql/queries/jobs.py @@ -12,9 +12,10 @@ from selfprivacy_api.graphql.common_types.jobs import ( def get_all_jobs() -> List[ApiJob]: - Jobs.get_jobs() - - return [job_to_api_job(job) for job in Jobs.get_jobs()] + jobs = Jobs.get_jobs() + api_jobs = [job_to_api_job(job) for job in jobs] + assert api_jobs is not None + return api_jobs @strawberry.type diff --git a/selfprivacy_api/graphql/schema.py b/selfprivacy_api/graphql/schema.py index 078ee3d..b8ed4e2 100644 --- a/selfprivacy_api/graphql/schema.py +++ b/selfprivacy_api/graphql/schema.py @@ -2,7 +2,7 @@ # pylint: disable=too-few-public-methods import asyncio -from typing import AsyncGenerator +from typing import AsyncGenerator, List import strawberry from selfprivacy_api.graphql import IsAuthenticated from selfprivacy_api.graphql.mutations.deprecated_mutations import ( @@ -28,7 +28,9 @@ from selfprivacy_api.graphql.queries.services import Services from selfprivacy_api.graphql.queries.storage import Storage from selfprivacy_api.graphql.queries.system import System -from selfprivacy_api.graphql.subscriptions.jobs import JobSubscriptions +from selfprivacy_api.graphql.subscriptions.jobs import ApiJob +from selfprivacy_api.jobs import job_notifications +from selfprivacy_api.graphql.queries.jobs import get_all_jobs from selfprivacy_api.graphql.mutations.users_mutations import UsersMutations from selfprivacy_api.graphql.queries.users import Users @@ -136,10 +138,15 @@ class Mutation( class Subscription: """Root schema for subscriptions""" - @strawberry.field(permission_classes=[IsAuthenticated]) - def jobs(self) -> JobSubscriptions: - """Jobs subscriptions""" - return JobSubscriptions() + @strawberry.subscription + async def job_updates(self) -> AsyncGenerator[List[ApiJob], None]: + # Send the complete list of jobs every time anything gets updated + async for notification in job_notifications(): + yield get_all_jobs() + + # @strawberry.subscription + # async def job_updates(self) -> AsyncGenerator[List[ApiJob], None]: + # return job_updates() @strawberry.subscription async def count(self) -> AsyncGenerator[int, None]: diff --git a/selfprivacy_api/graphql/subscriptions/jobs.py b/selfprivacy_api/graphql/subscriptions/jobs.py index 380badb..11d6263 100644 --- a/selfprivacy_api/graphql/subscriptions/jobs.py +++ b/selfprivacy_api/graphql/subscriptions/jobs.py @@ -1,5 +1,4 @@ # pylint: disable=too-few-public-methods -import strawberry from typing import AsyncGenerator, List @@ -9,12 +8,7 @@ from selfprivacy_api.graphql.common_types.jobs import ApiJob from selfprivacy_api.graphql.queries.jobs import get_all_jobs -@strawberry.type -class JobSubscriptions: - """Subscriptions related to jobs""" - - @strawberry.subscription - async def job_updates(self) -> AsyncGenerator[List[ApiJob], None]: - # Send the complete list of jobs every time anything gets updated - async for notification in job_notifications(): - yield get_all_jobs() +async def job_updates() -> AsyncGenerator[List[ApiJob], None]: + # Send the complete list of jobs every time anything gets updated + async for notification in job_notifications(): + yield get_all_jobs() diff --git a/tests/test_graphql/test_websocket.py b/tests/test_graphql/test_websocket.py index 58681e0..ee33262 100644 --- a/tests/test_graphql/test_websocket.py +++ b/tests/test_graphql/test_websocket.py @@ -1,6 +1,13 @@ from tests.common import generate_jobs_subscription -from selfprivacy_api.graphql.queries.jobs import Job as _Job + +# from selfprivacy_api.graphql.subscriptions.jobs import JobSubscriptions +import pytest +import asyncio + from selfprivacy_api.jobs import Jobs +from time import sleep + +from tests.test_redis import empty_redis JOBS_SUBSCRIPTION = """ jobUpdates { @@ -91,21 +98,57 @@ def test_websocket_subscription_minimal(authorized_client): } -# def test_websocket_subscription(authorized_client): -# client = authorized_client -# with client.websocket_connect( -# "/graphql", subprotocols=["graphql-transport-ws"] -# ) as websocket: -# init_graphql(websocket) -# websocket.send_json( -# { -# "id": "3aaa2445", -# "type": "subscribe", -# "payload": { -# "query": generate_jobs_subscription([JOBS_SUBSCRIPTION]), -# }, -# } -# ) -# Jobs.add("bogus", "bogus.bogus", "yyyaaaaayy") -# response = websocket.receive_json() -# raise NotImplementedError(response) +async def read_one_job(websocket): + # bug? We only get them starting from the second job update + # that's why we receive two jobs in the list them + # the first update gets lost somewhere + response = websocket.receive_json() + return response + + +@pytest.mark.asyncio +async def test_websocket_subscription(authorized_client, empty_redis, event_loop): + client = authorized_client + with client.websocket_connect( + "/graphql", subprotocols=["graphql-transport-ws"] + ) as websocket: + init_graphql(websocket) + websocket.send_json( + { + "id": "3aaa2445", + "type": "subscribe", + "payload": { + "query": "subscription TestSubscription {" + + JOBS_SUBSCRIPTION + + "}", + }, + } + ) + future = asyncio.create_task(read_one_job(websocket)) + jobs = [] + jobs.append(Jobs.add("bogus", "bogus.bogus", "yyyaaaaayy it works")) + sleep(0.5) + jobs.append(Jobs.add("bogus2", "bogus.bogus", "yyyaaaaayy it works")) + + response = await future + data = response["payload"]["data"] + jobs_received = data["jobUpdates"] + received_names = [job["name"] for job in jobs_received] + for job in jobs: + assert job.name in received_names + + for job in jobs: + for api_job in jobs_received: + if (job.name) == api_job["name"]: + assert api_job["uid"] == str(job.uid) + assert api_job["typeId"] == job.type_id + assert api_job["name"] == job.name + assert api_job["description"] == job.description + assert api_job["status"] == job.status + assert api_job["statusText"] == job.status_text + assert api_job["progress"] == job.progress + assert api_job["createdAt"] == job.created_at.isoformat() + assert api_job["updatedAt"] == job.updated_at.isoformat() + assert api_job["finishedAt"] == None + assert api_job["error"] == None + assert api_job["result"] == None