mirror of
https://git.selfprivacy.org/SelfPrivacy/selfprivacy-rest-api.git
synced 2025-01-25 10:16:34 +00:00
302 lines
8.9 KiB
Python
302 lines
8.9 KiB
Python
# pylint: disable=redefined-outer-name
|
|
# pylint: disable=unused-argument
|
|
# pylint: disable=missing-function-docstring
|
|
|
|
import pytest
|
|
|
|
from datetime import datetime, timezone
|
|
|
|
from tests.common import (
|
|
generate_api_query,
|
|
assert_recovery_recent,
|
|
NearFuture,
|
|
RECOVERY_KEY_VALIDATION_DATETIME,
|
|
)
|
|
|
|
# Graphql API's output should be timezone-naive
|
|
from tests.common import ten_minutes_into_future_naive_utc as ten_minutes_into_future
|
|
from tests.common import ten_minutes_into_future as ten_minutes_into_future_tz
|
|
from tests.common import ten_minutes_into_past_naive_utc as ten_minutes_into_past
|
|
|
|
from tests.test_graphql.common import (
|
|
assert_empty,
|
|
get_data,
|
|
assert_ok,
|
|
assert_errorcode,
|
|
assert_token_valid,
|
|
assert_original,
|
|
graphql_get_devices,
|
|
set_client_token,
|
|
)
|
|
|
|
API_RECOVERY_QUERY = """
|
|
recoveryKey {
|
|
exists
|
|
valid
|
|
creationDate
|
|
expirationDate
|
|
usesLeft
|
|
}
|
|
"""
|
|
|
|
|
|
def request_recovery_status(client):
|
|
return client.post(
|
|
"/graphql",
|
|
json={"query": generate_api_query([API_RECOVERY_QUERY])},
|
|
)
|
|
|
|
|
|
def graphql_recovery_status(client):
|
|
response = request_recovery_status(client)
|
|
data = get_data(response)
|
|
|
|
status = data["api"]["recoveryKey"]
|
|
assert status is not None
|
|
return status
|
|
|
|
|
|
def request_make_new_recovery_key(client, expires_at=None, uses=None):
|
|
json = {"query": API_RECOVERY_KEY_GENERATE_MUTATION}
|
|
limits = {}
|
|
|
|
if expires_at is not None:
|
|
limits["expirationDate"] = expires_at.isoformat()
|
|
if uses is not None:
|
|
limits["uses"] = uses
|
|
|
|
if limits != {}:
|
|
json["variables"] = {"limits": limits}
|
|
|
|
response = client.post("/graphql", json=json)
|
|
return response
|
|
|
|
|
|
def graphql_make_new_recovery_key(client, expires_at=None, uses=None):
|
|
response = request_make_new_recovery_key(client, expires_at, uses)
|
|
output = get_data(response)["api"]["getNewRecoveryApiKey"]
|
|
assert_ok(output)
|
|
|
|
key = output["key"]
|
|
assert key is not None
|
|
assert key.split(" ").__len__() == 18
|
|
return key
|
|
|
|
|
|
def request_recovery_auth(client, key, device_name):
|
|
return client.post(
|
|
"/graphql",
|
|
json={
|
|
"query": API_RECOVERY_KEY_USE_MUTATION,
|
|
"variables": {
|
|
"input": {
|
|
"key": key,
|
|
"deviceName": device_name,
|
|
},
|
|
},
|
|
},
|
|
)
|
|
|
|
|
|
def graphql_use_recovery_key(client, key, device_name):
|
|
response = request_recovery_auth(client, key, device_name)
|
|
output = get_data(response)["api"]["useRecoveryApiKey"]
|
|
assert_ok(output)
|
|
|
|
token = output["token"]
|
|
assert token is not None
|
|
assert_token_valid(client, token)
|
|
set_client_token(client, token)
|
|
assert device_name in [device["name"] for device in graphql_get_devices(client)]
|
|
return token
|
|
|
|
|
|
def test_graphql_recovery_key_status_unauthorized(client):
|
|
response = request_recovery_status(client)
|
|
assert_empty(response)
|
|
|
|
|
|
def test_graphql_recovery_key_status_when_none_exists(authorized_client):
|
|
status = graphql_recovery_status(authorized_client)
|
|
assert status["exists"] is False
|
|
assert status["valid"] is False
|
|
assert status["creationDate"] is None
|
|
assert status["expirationDate"] is None
|
|
assert status["usesLeft"] is None
|
|
|
|
|
|
API_RECOVERY_KEY_GENERATE_MUTATION = """
|
|
mutation TestGenerateRecoveryKey($limits: RecoveryKeyLimitsInput) {
|
|
api {
|
|
getNewRecoveryApiKey(limits: $limits) {
|
|
success
|
|
message
|
|
code
|
|
key
|
|
}
|
|
}
|
|
}
|
|
"""
|
|
|
|
API_RECOVERY_KEY_USE_MUTATION = """
|
|
mutation TestUseRecoveryKey($input: UseRecoveryKeyInput!) {
|
|
api {
|
|
useRecoveryApiKey(input: $input) {
|
|
success
|
|
message
|
|
code
|
|
token
|
|
}
|
|
}
|
|
}
|
|
"""
|
|
|
|
|
|
def test_graphql_generate_recovery_key(client, authorized_client):
|
|
key = graphql_make_new_recovery_key(authorized_client)
|
|
|
|
status = graphql_recovery_status(authorized_client)
|
|
assert status["exists"] is True
|
|
assert status["valid"] is True
|
|
assert_recovery_recent(status["creationDate"])
|
|
assert status["expirationDate"] is None
|
|
assert status["usesLeft"] is None
|
|
|
|
graphql_use_recovery_key(client, key, "new_test_token")
|
|
# And again
|
|
graphql_use_recovery_key(client, key, "new_test_token2")
|
|
|
|
|
|
@pytest.mark.parametrize(
|
|
"expiration_date", [ten_minutes_into_future(), ten_minutes_into_future_tz()]
|
|
)
|
|
def test_graphql_generate_recovery_key_with_expiration_date(
|
|
client, authorized_client, expiration_date: datetime
|
|
):
|
|
key = graphql_make_new_recovery_key(authorized_client, expires_at=expiration_date)
|
|
|
|
status = graphql_recovery_status(authorized_client)
|
|
assert status["exists"] is True
|
|
assert status["valid"] is True
|
|
assert_recovery_recent(status["creationDate"])
|
|
|
|
# timezone-aware comparison. Should pass regardless of server's tz
|
|
assert datetime.fromisoformat(status["expirationDate"]) == expiration_date.replace(
|
|
tzinfo=timezone.utc
|
|
)
|
|
|
|
assert status["usesLeft"] is None
|
|
|
|
graphql_use_recovery_key(client, key, "new_test_token")
|
|
# And again
|
|
graphql_use_recovery_key(client, key, "new_test_token2")
|
|
|
|
|
|
def test_graphql_use_recovery_key_after_expiration(client, authorized_client, mocker):
|
|
expiration_date = ten_minutes_into_future()
|
|
key = graphql_make_new_recovery_key(authorized_client, expires_at=expiration_date)
|
|
|
|
# Timewarp to after it expires
|
|
mock = mocker.patch(RECOVERY_KEY_VALIDATION_DATETIME, NearFuture)
|
|
|
|
response = request_recovery_auth(client, key, "new_test_token3")
|
|
output = get_data(response)["api"]["useRecoveryApiKey"]
|
|
assert_errorcode(output, 404)
|
|
|
|
assert output["token"] is None
|
|
assert_original(authorized_client)
|
|
|
|
status = graphql_recovery_status(authorized_client)
|
|
assert status["exists"] is True
|
|
assert status["valid"] is False
|
|
assert_recovery_recent(status["creationDate"])
|
|
|
|
# timezone-aware comparison. Should pass regardless of server's tz
|
|
assert datetime.fromisoformat(status["expirationDate"]) == expiration_date.replace(
|
|
tzinfo=timezone.utc
|
|
)
|
|
assert status["usesLeft"] is None
|
|
|
|
|
|
def test_graphql_generate_recovery_key_with_expiration_in_the_past(authorized_client):
|
|
expiration_date = ten_minutes_into_past()
|
|
response = request_make_new_recovery_key(
|
|
authorized_client, expires_at=expiration_date
|
|
)
|
|
|
|
output = get_data(response)["api"]["getNewRecoveryApiKey"]
|
|
assert_errorcode(output, 400)
|
|
|
|
assert output["key"] is None
|
|
assert graphql_recovery_status(authorized_client)["exists"] is False
|
|
|
|
|
|
def test_graphql_generate_recovery_key_with_invalid_time_format(authorized_client):
|
|
expiration_date = "invalid_time_format"
|
|
expiration_date_str = expiration_date
|
|
|
|
response = authorized_client.post(
|
|
"/graphql",
|
|
json={
|
|
"query": API_RECOVERY_KEY_GENERATE_MUTATION,
|
|
"variables": {
|
|
"limits": {
|
|
"expirationDate": expiration_date_str,
|
|
},
|
|
},
|
|
},
|
|
)
|
|
assert_empty(response)
|
|
assert graphql_recovery_status(authorized_client)["exists"] is False
|
|
|
|
|
|
def test_graphql_generate_recovery_key_with_limited_uses(authorized_client, client):
|
|
mnemonic_key = graphql_make_new_recovery_key(authorized_client, uses=2)
|
|
|
|
status = graphql_recovery_status(authorized_client)
|
|
assert status["exists"] is True
|
|
assert status["valid"] is True
|
|
assert status["creationDate"] is not None
|
|
assert status["expirationDate"] is None
|
|
assert status["usesLeft"] == 2
|
|
|
|
graphql_use_recovery_key(client, mnemonic_key, "new_test_token1")
|
|
|
|
status = graphql_recovery_status(authorized_client)
|
|
assert status["exists"] is True
|
|
assert status["valid"] is True
|
|
assert status["creationDate"] is not None
|
|
assert status["expirationDate"] is None
|
|
assert status["usesLeft"] == 1
|
|
|
|
graphql_use_recovery_key(client, mnemonic_key, "new_test_token2")
|
|
|
|
status = graphql_recovery_status(authorized_client)
|
|
assert status["exists"] is True
|
|
assert status["valid"] is False
|
|
assert status["creationDate"] is not None
|
|
assert status["expirationDate"] is None
|
|
assert status["usesLeft"] == 0
|
|
|
|
response = request_recovery_auth(client, mnemonic_key, "new_test_token3")
|
|
output = get_data(response)["api"]["useRecoveryApiKey"]
|
|
assert_errorcode(output, 404)
|
|
|
|
|
|
def test_graphql_generate_recovery_key_with_negative_uses(authorized_client):
|
|
response = request_make_new_recovery_key(authorized_client, uses=-1)
|
|
|
|
output = get_data(response)["api"]["getNewRecoveryApiKey"]
|
|
assert_errorcode(output, 400)
|
|
assert output["key"] is None
|
|
assert graphql_recovery_status(authorized_client)["exists"] is False
|
|
|
|
|
|
def test_graphql_generate_recovery_key_with_zero_uses(authorized_client):
|
|
response = request_make_new_recovery_key(authorized_client, uses=0)
|
|
|
|
output = get_data(response)["api"]["getNewRecoveryApiKey"]
|
|
assert_errorcode(output, 400)
|
|
assert output["key"] is None
|
|
assert graphql_recovery_status(authorized_client)["exists"] is False
|