mirror of
https://git.selfprivacy.org/SelfPrivacy/selfprivacy-rest-api.git
synced 2024-11-22 20:11:30 +00:00
test(tokens-repo): rework expiring recovery key tests
This commit is contained in:
parent
ce4fbdae0a
commit
18f5ff815c
|
@ -1,4 +1,7 @@
|
||||||
from tests.common import generate_api_query
|
from tests.common import generate_api_query
|
||||||
|
from tests.conftest import TOKENS_FILE_CONTENTS, DEVICE_WE_AUTH_TESTS_WITH
|
||||||
|
|
||||||
|
ORIGINAL_DEVICES = TOKENS_FILE_CONTENTS["tokens"]
|
||||||
|
|
||||||
|
|
||||||
def assert_ok(response, request):
|
def assert_ok(response, request):
|
||||||
|
@ -58,3 +61,24 @@ def set_client_token(client, token):
|
||||||
def assert_token_valid(client, token):
|
def assert_token_valid(client, token):
|
||||||
set_client_token(client, token)
|
set_client_token(client, token)
|
||||||
assert graphql_get_devices(client) is not None
|
assert graphql_get_devices(client) is not None
|
||||||
|
|
||||||
|
|
||||||
|
def assert_same(graphql_devices, abstract_devices):
|
||||||
|
"""Orderless comparison"""
|
||||||
|
assert len(graphql_devices) == len(abstract_devices)
|
||||||
|
for original_device in abstract_devices:
|
||||||
|
assert original_device["name"] in [device["name"] for device in graphql_devices]
|
||||||
|
for device in graphql_devices:
|
||||||
|
if device["name"] == original_device["name"]:
|
||||||
|
assert device["creationDate"] == original_device["date"].isoformat()
|
||||||
|
|
||||||
|
|
||||||
|
def assert_original(client):
|
||||||
|
devices = graphql_get_devices(client)
|
||||||
|
assert_same(devices, ORIGINAL_DEVICES)
|
||||||
|
|
||||||
|
for device in devices:
|
||||||
|
if device["name"] == DEVICE_WE_AUTH_TESTS_WITH["name"]:
|
||||||
|
assert device["isCaller"] is True
|
||||||
|
else:
|
||||||
|
assert device["isCaller"] is False
|
||||||
|
|
|
@ -14,14 +14,15 @@ from tests.test_graphql.common import (
|
||||||
assert_ok,
|
assert_ok,
|
||||||
assert_errorcode,
|
assert_errorcode,
|
||||||
assert_token_valid,
|
assert_token_valid,
|
||||||
|
assert_original,
|
||||||
|
assert_same,
|
||||||
graphql_get_devices,
|
graphql_get_devices,
|
||||||
request_devices,
|
request_devices,
|
||||||
set_client_token,
|
set_client_token,
|
||||||
API_DEVICES_QUERY,
|
API_DEVICES_QUERY,
|
||||||
|
ORIGINAL_DEVICES,
|
||||||
)
|
)
|
||||||
|
|
||||||
ORIGINAL_DEVICES = TOKENS_FILE_CONTENTS["tokens"]
|
|
||||||
|
|
||||||
|
|
||||||
def graphql_get_caller_token_info(client):
|
def graphql_get_caller_token_info(client):
|
||||||
devices = graphql_get_devices(client)
|
devices = graphql_get_devices(client)
|
||||||
|
@ -30,27 +31,6 @@ def graphql_get_caller_token_info(client):
|
||||||
return device
|
return device
|
||||||
|
|
||||||
|
|
||||||
def assert_same(graphql_devices, abstract_devices):
|
|
||||||
"""Orderless comparison"""
|
|
||||||
assert len(graphql_devices) == len(abstract_devices)
|
|
||||||
for original_device in abstract_devices:
|
|
||||||
assert original_device["name"] in [device["name"] for device in graphql_devices]
|
|
||||||
for device in graphql_devices:
|
|
||||||
if device["name"] == original_device["name"]:
|
|
||||||
assert device["creationDate"] == original_device["date"].isoformat()
|
|
||||||
|
|
||||||
|
|
||||||
def assert_original(client):
|
|
||||||
devices = graphql_get_devices(client)
|
|
||||||
assert_same(devices, ORIGINAL_DEVICES)
|
|
||||||
|
|
||||||
for device in devices:
|
|
||||||
if device["name"] == DEVICE_WE_AUTH_TESTS_WITH["name"]:
|
|
||||||
assert device["isCaller"] is True
|
|
||||||
else:
|
|
||||||
assert device["isCaller"] is False
|
|
||||||
|
|
||||||
|
|
||||||
def graphql_get_new_device_key(authorized_client) -> str:
|
def graphql_get_new_device_key(authorized_client) -> str:
|
||||||
response = authorized_client.post(
|
response = authorized_client.post(
|
||||||
"/graphql",
|
"/graphql",
|
||||||
|
|
|
@ -9,12 +9,16 @@ from tests.common import (
|
||||||
read_json,
|
read_json,
|
||||||
write_json,
|
write_json,
|
||||||
assert_recovery_recent,
|
assert_recovery_recent,
|
||||||
|
NearFuture,
|
||||||
|
RECOVERY_KEY_VALIDATION_DATETIME,
|
||||||
)
|
)
|
||||||
from tests.test_graphql.common import (
|
from tests.test_graphql.common import (
|
||||||
assert_empty,
|
assert_empty,
|
||||||
assert_data,
|
assert_data,
|
||||||
assert_ok,
|
assert_ok,
|
||||||
|
assert_errorcode,
|
||||||
assert_token_valid,
|
assert_token_valid,
|
||||||
|
assert_original,
|
||||||
graphql_get_devices,
|
graphql_get_devices,
|
||||||
set_client_token,
|
set_client_token,
|
||||||
)
|
)
|
||||||
|
@ -46,13 +50,24 @@ def graphql_recovery_status(client):
|
||||||
return status
|
return status
|
||||||
|
|
||||||
|
|
||||||
def graphql_get_new_recovery_key(client):
|
def request_make_new_recovery_key(client, expires_at=None, uses=None):
|
||||||
response = client.post(
|
json = {"query": API_RECOVERY_KEY_GENERATE_MUTATION}
|
||||||
"/graphql",
|
limits = {}
|
||||||
json={
|
|
||||||
"query": API_RECOVERY_KEY_GENERATE_MUTATION,
|
if expires_at is not None:
|
||||||
},
|
limits["expirationDate"] = expires_at.isoformat()
|
||||||
)
|
if uses is not None:
|
||||||
|
limits["uses"] = uses
|
||||||
|
|
||||||
|
if limits != {}:
|
||||||
|
json["variables"] = {"limits": limits}
|
||||||
|
|
||||||
|
response = client.post("/graphql", json=json)
|
||||||
|
return response
|
||||||
|
|
||||||
|
|
||||||
|
def graphql_make_new_recovery_key(client, expires_at=None, uses=None):
|
||||||
|
response = request_make_new_recovery_key(client, expires_at, uses)
|
||||||
assert_ok(response, "getNewRecoveryApiKey")
|
assert_ok(response, "getNewRecoveryApiKey")
|
||||||
key = response.json()["data"]["getNewRecoveryApiKey"]["key"]
|
key = response.json()["data"]["getNewRecoveryApiKey"]["key"]
|
||||||
assert key is not None
|
assert key is not None
|
||||||
|
@ -60,8 +75,8 @@ def graphql_get_new_recovery_key(client):
|
||||||
return key
|
return key
|
||||||
|
|
||||||
|
|
||||||
def graphql_use_recovery_key(client, key, device_name):
|
def request_recovery_auth(client, key, device_name):
|
||||||
response = client.post(
|
return client.post(
|
||||||
"/graphql",
|
"/graphql",
|
||||||
json={
|
json={
|
||||||
"query": API_RECOVERY_KEY_USE_MUTATION,
|
"query": API_RECOVERY_KEY_USE_MUTATION,
|
||||||
|
@ -73,6 +88,10 @@ def graphql_use_recovery_key(client, key, device_name):
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
)
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def graphql_use_recovery_key(client, key, device_name):
|
||||||
|
response = request_recovery_auth(client, key, device_name)
|
||||||
assert_ok(response, "useRecoveryApiKey")
|
assert_ok(response, "useRecoveryApiKey")
|
||||||
token = response.json()["data"]["useRecoveryApiKey"]["token"]
|
token = response.json()["data"]["useRecoveryApiKey"]["token"]
|
||||||
assert token is not None
|
assert token is not None
|
||||||
|
@ -122,7 +141,7 @@ mutation TestUseRecoveryKey($input: UseRecoveryKeyInput!) {
|
||||||
|
|
||||||
|
|
||||||
def test_graphql_generate_recovery_key(client, authorized_client, tokens_file):
|
def test_graphql_generate_recovery_key(client, authorized_client, tokens_file):
|
||||||
key = graphql_get_new_recovery_key(authorized_client)
|
key = graphql_make_new_recovery_key(authorized_client)
|
||||||
|
|
||||||
status = graphql_recovery_status(authorized_client)
|
status = graphql_recovery_status(authorized_client)
|
||||||
assert status["exists"] is True
|
assert status["exists"] is True
|
||||||
|
@ -140,154 +159,40 @@ def test_graphql_generate_recovery_key_with_expiration_date(
|
||||||
client, authorized_client, tokens_file
|
client, authorized_client, tokens_file
|
||||||
):
|
):
|
||||||
expiration_date = datetime.datetime.now() + datetime.timedelta(minutes=5)
|
expiration_date = datetime.datetime.now() + datetime.timedelta(minutes=5)
|
||||||
expiration_date_str = expiration_date.strftime("%Y-%m-%dT%H:%M:%S.%f")
|
key = graphql_make_new_recovery_key(authorized_client, expires_at=expiration_date)
|
||||||
response = authorized_client.post(
|
|
||||||
"/graphql",
|
|
||||||
json={
|
|
||||||
"query": API_RECOVERY_KEY_GENERATE_MUTATION,
|
|
||||||
"variables": {
|
|
||||||
"limits": {
|
|
||||||
"expirationDate": expiration_date_str,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
)
|
|
||||||
assert response.status_code == 200
|
|
||||||
assert response.json().get("data") is not None
|
|
||||||
assert response.json()["data"]["getNewRecoveryApiKey"]["success"] is True
|
|
||||||
assert response.json()["data"]["getNewRecoveryApiKey"]["message"] is not None
|
|
||||||
assert response.json()["data"]["getNewRecoveryApiKey"]["code"] == 200
|
|
||||||
assert response.json()["data"]["getNewRecoveryApiKey"]["key"] is not None
|
|
||||||
assert (
|
|
||||||
response.json()["data"]["getNewRecoveryApiKey"]["key"].split(" ").__len__()
|
|
||||||
== 18
|
|
||||||
)
|
|
||||||
assert read_json(tokens_file)["recovery_token"] is not None
|
|
||||||
|
|
||||||
key = response.json()["data"]["getNewRecoveryApiKey"]["key"]
|
status = graphql_recovery_status(authorized_client)
|
||||||
assert read_json(tokens_file)["recovery_token"]["expiration"] == expiration_date_str
|
assert status["exists"] is True
|
||||||
assert read_json(tokens_file)["recovery_token"]["token"] == mnemonic_to_hex(key)
|
assert status["valid"] is True
|
||||||
|
assert_recovery_recent(status["creationDate"])
|
||||||
|
assert status["expirationDate"] == expiration_date.isoformat()
|
||||||
|
assert status["usesLeft"] is None
|
||||||
|
|
||||||
time_generated = read_json(tokens_file)["recovery_token"]["date"]
|
graphql_use_recovery_key(client, key, "new_test_token")
|
||||||
assert time_generated is not None
|
# And again
|
||||||
assert (
|
graphql_use_recovery_key(client, key, "new_test_token2")
|
||||||
datetime.datetime.strptime(time_generated, "%Y-%m-%dT%H:%M:%S.%f")
|
|
||||||
- datetime.timedelta(seconds=5)
|
|
||||||
< datetime.datetime.now()
|
|
||||||
)
|
|
||||||
|
|
||||||
# Try to get token status
|
|
||||||
response = authorized_client.post(
|
|
||||||
"/graphql",
|
|
||||||
json={"query": generate_api_query([API_RECOVERY_QUERY])},
|
|
||||||
)
|
|
||||||
assert response.status_code == 200
|
|
||||||
assert response.json().get("data") is not None
|
|
||||||
assert response.json()["data"]["api"]["recoveryKey"] is not None
|
|
||||||
assert response.json()["data"]["api"]["recoveryKey"]["exists"] is True
|
|
||||||
assert response.json()["data"]["api"]["recoveryKey"]["valid"] is True
|
|
||||||
assert response.json()["data"]["api"]["recoveryKey"][
|
|
||||||
"creationDate"
|
|
||||||
] == time_generated.replace("Z", "")
|
|
||||||
assert (
|
|
||||||
response.json()["data"]["api"]["recoveryKey"]["expirationDate"]
|
|
||||||
== expiration_date_str
|
|
||||||
)
|
|
||||||
assert response.json()["data"]["api"]["recoveryKey"]["usesLeft"] is None
|
|
||||||
|
|
||||||
# Try to use token
|
def test_graphql_use_recovery_key_after_expiration(
|
||||||
response = authorized_client.post(
|
client, authorized_client, tokens_file, mocker
|
||||||
"/graphql",
|
):
|
||||||
json={
|
expiration_date = datetime.datetime.now() + datetime.timedelta(minutes=5)
|
||||||
"query": API_RECOVERY_KEY_USE_MUTATION,
|
key = graphql_make_new_recovery_key(authorized_client, expires_at=expiration_date)
|
||||||
"variables": {
|
|
||||||
"input": {
|
|
||||||
"key": key,
|
|
||||||
"deviceName": "new_test_token",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
)
|
|
||||||
assert response.status_code == 200
|
|
||||||
assert response.json().get("data") is not None
|
|
||||||
assert response.json()["data"]["useRecoveryApiKey"]["success"] is True
|
|
||||||
assert response.json()["data"]["useRecoveryApiKey"]["message"] is not None
|
|
||||||
assert response.json()["data"]["useRecoveryApiKey"]["code"] == 200
|
|
||||||
assert response.json()["data"]["useRecoveryApiKey"]["token"] is not None
|
|
||||||
assert (
|
|
||||||
response.json()["data"]["useRecoveryApiKey"]["token"]
|
|
||||||
== read_json(tokens_file)["tokens"][2]["token"]
|
|
||||||
)
|
|
||||||
|
|
||||||
# Try to use token again
|
# Timewarp to after it expires
|
||||||
response = authorized_client.post(
|
mock = mocker.patch(RECOVERY_KEY_VALIDATION_DATETIME, NearFuture)
|
||||||
"/graphql",
|
|
||||||
json={
|
|
||||||
"query": API_RECOVERY_KEY_USE_MUTATION,
|
|
||||||
"variables": {
|
|
||||||
"input": {
|
|
||||||
"key": key,
|
|
||||||
"deviceName": "new_test_token2",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
)
|
|
||||||
assert response.status_code == 200
|
|
||||||
assert response.json().get("data") is not None
|
|
||||||
assert response.json()["data"]["useRecoveryApiKey"]["success"] is True
|
|
||||||
assert response.json()["data"]["useRecoveryApiKey"]["message"] is not None
|
|
||||||
assert response.json()["data"]["useRecoveryApiKey"]["code"] == 200
|
|
||||||
assert response.json()["data"]["useRecoveryApiKey"]["token"] is not None
|
|
||||||
assert (
|
|
||||||
response.json()["data"]["useRecoveryApiKey"]["token"]
|
|
||||||
== read_json(tokens_file)["tokens"][3]["token"]
|
|
||||||
)
|
|
||||||
|
|
||||||
# Try to use token after expiration date
|
response = request_recovery_auth(client, key, "new_test_token3")
|
||||||
new_data = read_json(tokens_file)
|
assert_errorcode(response, "useRecoveryApiKey", 404)
|
||||||
new_data["recovery_token"]["expiration"] = (
|
|
||||||
datetime.datetime.now() - datetime.timedelta(minutes=5)
|
|
||||||
).strftime("%Y-%m-%dT%H:%M:%S.%f")
|
|
||||||
write_json(tokens_file, new_data)
|
|
||||||
response = authorized_client.post(
|
|
||||||
"/graphql",
|
|
||||||
json={
|
|
||||||
"query": API_RECOVERY_KEY_USE_MUTATION,
|
|
||||||
"variables": {
|
|
||||||
"input": {
|
|
||||||
"key": key,
|
|
||||||
"deviceName": "new_test_token3",
|
|
||||||
},
|
|
||||||
},
|
|
||||||
},
|
|
||||||
)
|
|
||||||
assert response.status_code == 200
|
|
||||||
assert response.json().get("data") is not None
|
|
||||||
assert response.json()["data"]["useRecoveryApiKey"]["success"] is False
|
|
||||||
assert response.json()["data"]["useRecoveryApiKey"]["message"] is not None
|
|
||||||
assert response.json()["data"]["useRecoveryApiKey"]["code"] == 404
|
|
||||||
assert response.json()["data"]["useRecoveryApiKey"]["token"] is None
|
assert response.json()["data"]["useRecoveryApiKey"]["token"] is None
|
||||||
|
assert_original(authorized_client)
|
||||||
|
|
||||||
assert read_json(tokens_file)["tokens"] == new_data["tokens"]
|
status = graphql_recovery_status(authorized_client)
|
||||||
|
assert status["exists"] is True
|
||||||
# Try to get token status
|
assert status["valid"] is False
|
||||||
response = authorized_client.post(
|
assert_recovery_recent(status["creationDate"])
|
||||||
"/graphql",
|
assert status["expirationDate"] == expiration_date.isoformat()
|
||||||
json={"query": generate_api_query([API_RECOVERY_QUERY])},
|
assert status["usesLeft"] is None
|
||||||
)
|
|
||||||
assert response.status_code == 200
|
|
||||||
assert response.json().get("data") is not None
|
|
||||||
assert response.json()["data"]["api"]["recoveryKey"] is not None
|
|
||||||
assert response.json()["data"]["api"]["recoveryKey"]["exists"] is True
|
|
||||||
assert response.json()["data"]["api"]["recoveryKey"]["valid"] is False
|
|
||||||
assert (
|
|
||||||
response.json()["data"]["api"]["recoveryKey"]["creationDate"] == time_generated
|
|
||||||
)
|
|
||||||
assert (
|
|
||||||
response.json()["data"]["api"]["recoveryKey"]["expirationDate"]
|
|
||||||
== new_data["recovery_token"]["expiration"]
|
|
||||||
)
|
|
||||||
assert response.json()["data"]["api"]["recoveryKey"]["usesLeft"] is None
|
|
||||||
|
|
||||||
|
|
||||||
def test_graphql_generate_recovery_key_with_expiration_in_the_past(
|
def test_graphql_generate_recovery_key_with_expiration_in_the_past(
|
||||||
|
|
Loading…
Reference in a new issue