mirror of
https://git.selfprivacy.org/SelfPrivacy/selfprivacy-rest-api.git
synced 2025-01-25 02:06:54 +00:00
test(websocket): remove some duplication
This commit is contained in:
parent
57378a7940
commit
41f6d8b6d2
|
@ -34,6 +34,18 @@ jobUpdates {
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
def api_subscribe(websocket, id, subscription):
|
||||||
|
websocket.send_json(
|
||||||
|
{
|
||||||
|
"id": id,
|
||||||
|
"type": "subscribe",
|
||||||
|
"payload": {
|
||||||
|
"query": "subscription TestSubscription {" + subscription + "}",
|
||||||
|
},
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
def connect_ws_authenticated(authorized_client) -> WebSocketTestSession:
|
def connect_ws_authenticated(authorized_client) -> WebSocketTestSession:
|
||||||
token = "Bearer " + str(DEVICE_WE_AUTH_TESTS_WITH["token"])
|
token = "Bearer " + str(DEVICE_WE_AUTH_TESTS_WITH["token"])
|
||||||
return authorized_client.websocket_connect(
|
return authorized_client.websocket_connect(
|
||||||
|
@ -61,7 +73,7 @@ def init_graphql(websocket):
|
||||||
def authenticated_websocket(
|
def authenticated_websocket(
|
||||||
authorized_client,
|
authorized_client,
|
||||||
) -> Generator[WebSocketTestSession, None, None]:
|
) -> Generator[WebSocketTestSession, None, None]:
|
||||||
# We use authorized_client only tohave token in the repo, this client by itself is not enough to authorize websocket
|
# We use authorized_client only to have token in the repo, this client by itself is not enough to authorize websocket
|
||||||
|
|
||||||
ValueError(TOKEN_REPO.get_tokens())
|
ValueError(TOKEN_REPO.get_tokens())
|
||||||
with connect_ws_authenticated(authorized_client) as websocket:
|
with connect_ws_authenticated(authorized_client) as websocket:
|
||||||
|
@ -104,45 +116,30 @@ def test_websocket_graphql_ping(authorized_client):
|
||||||
assert pong == {"type": "pong"}
|
assert pong == {"type": "pong"}
|
||||||
|
|
||||||
|
|
||||||
def api_subscribe(websocket, id, subscription):
|
def test_websocket_subscription_minimal(authorized_client, authenticated_websocket):
|
||||||
websocket.send_json(
|
|
||||||
{
|
|
||||||
"id": id,
|
|
||||||
"type": "subscribe",
|
|
||||||
"payload": {
|
|
||||||
"query": "subscription TestSubscription {" + subscription + "}",
|
|
||||||
},
|
|
||||||
}
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
def test_websocket_subscription_minimal(authorized_client):
|
|
||||||
# Test a small endpoint that exists specifically for tests
|
# Test a small endpoint that exists specifically for tests
|
||||||
client = authorized_client
|
websocket = authenticated_websocket
|
||||||
with client.websocket_connect(
|
init_graphql(websocket)
|
||||||
"/graphql", subprotocols=["graphql-transport-ws"]
|
arbitrary_id = "3aaa2445"
|
||||||
) as websocket:
|
api_subscribe(websocket, arbitrary_id, "count")
|
||||||
init_graphql(websocket)
|
response = websocket.receive_json()
|
||||||
arbitrary_id = "3aaa2445"
|
assert response == {
|
||||||
api_subscribe(websocket, arbitrary_id, "count")
|
"id": arbitrary_id,
|
||||||
response = websocket.receive_json()
|
"payload": {"data": {"count": 0}},
|
||||||
assert response == {
|
"type": "next",
|
||||||
"id": arbitrary_id,
|
}
|
||||||
"payload": {"data": {"count": 0}},
|
response = websocket.receive_json()
|
||||||
"type": "next",
|
assert response == {
|
||||||
}
|
"id": arbitrary_id,
|
||||||
response = websocket.receive_json()
|
"payload": {"data": {"count": 1}},
|
||||||
assert response == {
|
"type": "next",
|
||||||
"id": arbitrary_id,
|
}
|
||||||
"payload": {"data": {"count": 1}},
|
response = websocket.receive_json()
|
||||||
"type": "next",
|
assert response == {
|
||||||
}
|
"id": arbitrary_id,
|
||||||
response = websocket.receive_json()
|
"payload": {"data": {"count": 2}},
|
||||||
assert response == {
|
"type": "next",
|
||||||
"id": arbitrary_id,
|
}
|
||||||
"payload": {"data": {"count": 2}},
|
|
||||||
"type": "next",
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
def test_websocket_subscription_minimal_unauthorized(unauthenticated_websocket):
|
def test_websocket_subscription_minimal_unauthorized(unauthenticated_websocket):
|
||||||
|
|
Loading…
Reference in a new issue