API keys graphql tests

This commit is contained in:
Inex Code 2022-06-29 20:39:46 +03:00
parent 45c3e3003d
commit 503a39f390
15 changed files with 1114 additions and 94 deletions

2
.pylintrc Normal file
View file

@ -0,0 +1,2 @@
[MASTER]
init-hook='import sys; sys.path.append("/path/to/root")'

View file

@ -0,0 +1,52 @@
"""API access mutations"""
# pylint: disable=too-few-public-methods
import datetime
import typing
from flask import request
import strawberry
from selfprivacy_api.graphql import IsAuthenticated
from selfprivacy_api.graphql.mutations.mutation_interface import MutationReturnInterface
from selfprivacy_api.utils import parse_date
from selfprivacy_api.utils.auth import (
generate_recovery_token
)
@strawberry.type
class ApiKeyMutationReturn(MutationReturnInterface):
key: typing.Optional[str]
@strawberry.input
class RecoveryKeyLimitsInput:
"""Recovery key limits input"""
expiration_date: typing.Optional[datetime.datetime]
uses: typing.Optional[int]
@strawberry.type
class ApiMutations:
@strawberry.mutation(permission_classes=[IsAuthenticated])
def getNewRecoveryApiKey(self, limits: RecoveryKeyLimitsInput) -> ApiKeyMutationReturn:
"""Generate recovery key"""
if limits.expiration_date is not None:
if limits.expiration_date < datetime.datetime.now():
return ApiKeyMutationReturn(
success=False,
message="Expiration date must be in the future",
code=400,
key=None,
)
if limits.uses is not None:
if limits.uses < 1:
return ApiKeyMutationReturn(
success=False,
message="Uses must be greater than 0",
code=400,
key=None,
)
key = generate_recovery_token(limits.expiration_date, limits.uses)
return ApiKeyMutationReturn(
success=True,
message="Recovery key generated",
code=200,
key=key,
)

View file

@ -0,0 +1,7 @@
import strawberry
@strawberry.interface
class MutationReturnInterface:
success: bool
message: str
code: int

View file

@ -2,8 +2,10 @@
# pylint: disable=too-few-public-methods
import typing
import strawberry
from selfprivacy_api.graphql import IsAuthenticated
from selfprivacy_api.graphql.mutations.api_mutations import ApiMutations
from selfprivacy_api.graphql.queries.api import Api
from selfprivacy_api.graphql.queries.api_queries import Api
from selfprivacy_api.graphql.queries.system import System
@ -11,7 +13,7 @@ from selfprivacy_api.graphql.queries.system import System
class Query:
"""Root schema for queries"""
@strawberry.field
@strawberry.field(permission_classes=[IsAuthenticated])
def system(self) -> System:
"""System queries"""
return System()
@ -21,5 +23,9 @@ class Query:
"""API access status"""
return Api()
@strawberry.type
class Mutation(ApiMutations):
"""Root schema for mutations"""
pass
schema = strawberry.Schema(query=Query)
schema = strawberry.Schema(query=Query, mutation=Mutation)

View file

@ -1,7 +1,7 @@
#!/usr/bin/env python3
"""Unassigned views"""
from flask_restful import Resource
from selfprivacy_api.graphql.queries.api import get_api_version
from selfprivacy_api.graphql.queries.api_queries import get_api_version
class ApiVersion(Resource):

View file

@ -3,6 +3,7 @@
import secrets
from datetime import datetime, timedelta
import re
import typing
from mnemonic import Mnemonic
@ -190,7 +191,7 @@ def _get_recovery_token():
return tokens["recovery_token"]["token"]
def generate_recovery_token(expiration=None, uses_left=None):
def generate_recovery_token(expiration: typing.Optional[datetime], uses_left: typing.Optional[int]) -> str:
"""Generate a 24 bytes recovery token and return a mneomnic word list.
Write a string representation of the recovery token to the tokens.json file.
"""

View file

@ -1,5 +1,5 @@
import json
from mnemonic import Mnemonic
def read_json(file_path):
with open(file_path, "r", encoding="utf-8") as file:
@ -9,3 +9,9 @@ def read_json(file_path):
def write_json(file_path, data):
with open(file_path, "w", encoding="utf-8") as file:
json.dump(data, file, indent=4)
def generate_api_query(query_array):
return "query TestApi {\n api {" + "\n".join(query_array) + "}\n}"
def mnemonic_to_hex(mnemonic):
return Mnemonic(language="english").to_entropy(mnemonic).hex()

View file

@ -1,5 +1,6 @@
# pylint: disable=redefined-outer-name
# pylint: disable=unused-argument
# pylint: disable=missing-function-docstring
import datetime
import json
import re
@ -383,7 +384,7 @@ def test_generate_recovery_token_with_expiration_date(
def test_generate_recovery_token_with_expiration_in_the_past(
authorized_client, client, tokens_file
authorized_client, tokens_file
):
# Server must return 400 if expiration date is in the past
expiration_date = datetime.datetime.now() - datetime.timedelta(minutes=5)
@ -397,7 +398,7 @@ def test_generate_recovery_token_with_expiration_in_the_past(
def test_generate_recovery_token_with_invalid_time_format(
authorized_client, client, tokens_file
authorized_client, tokens_file
):
# Server must return 400 if expiration date is in the past
expiration_date = "invalid_time_format"

View file

View file

@ -1,8 +1,13 @@
# pylint: disable=redefined-outer-name
# pylint: disable=unused-argument
import json
# pylint: disable=missing-function-docstring
import pytest
from tests.common import generate_api_query
from tests.test_graphql.test_api_devices import API_DEVICES_QUERY
from tests.test_graphql.test_api_recovery import API_RECOVERY_QUERY
from tests.test_graphql.test_api_version import API_VERSION_QUERY
TOKENS_FILE_CONTETS = {
"tokens": [
{
@ -18,95 +23,27 @@ TOKENS_FILE_CONTETS = {
]
}
def test_graphql_get_api_version(authorized_client):
def test_graphql_get_entire_api_data(authorized_client, tokens_file):
response = authorized_client.get(
"/graphql",
json={
"query": """
query {
api {
version
}
}
"""
"query": generate_api_query([API_VERSION_QUERY, API_DEVICES_QUERY, API_RECOVERY_QUERY])
},
)
assert response.status_code == 200
assert response.json.get("data") is not None
assert "version" in response.get_json()["data"]["api"]
def test_graphql_api_version_unauthorized(client):
response = client.get(
"/graphql",
json={
"query": """
query {
api {
version
}
}
"""
},
)
assert response.status_code == 200
assert "version" in response.get_json()["data"]["api"]
def test_graphql_tokens_info(authorized_client, tokens_file):
response = authorized_client.get(
"/graphql",
json={
"query": """
query {
api {
devices {
creationDate
isCaller
name
}
}
}
"""
},
)
assert response.status_code == 200
assert response.json == {
"data": {
"api": {
"devices": [
{
"creationDate": "2022-01-14T08:31:10.789314",
"isCaller": True,
"name": "test_token",
},
{
"creationDate": "2022-01-14T08:31:10.789314",
"isCaller": False,
"name": "test_token2",
},
]
}
}
}
def test_graphql_tokens_info_unauthorized(client, tokens_file):
response = client.get(
"/graphql",
json={
"query": """
query {
api {
devices {
creationDate
isCaller
name
}
}
}
"""
},
)
assert response.status_code == 200
assert response.json["data"] is None
assert response.json["data"]["api"]["devices"] is not None
assert len(response.json["data"]["api"]["devices"]) == 2
assert response.json["data"]["api"]["devices"][0]["creationDate"] == "2022-01-14T08:31:10.789314"
assert response.json["data"]["api"]["devices"][0]["isCaller"] is True
assert response.json["data"]["api"]["devices"][0]["name"] == "test_token"
assert response.json["data"]["api"]["devices"][1]["creationDate"] == "2022-01-14T08:31:10.789314"
assert response.json["data"]["api"]["devices"][1]["isCaller"] is False
assert response.json["data"]["api"]["devices"][1]["name"] == "test_token2"
assert response.json["data"]["api"]["recoveryKey"] is not None
assert response.json["data"]["api"]["recoveryKey"]["exists"] is False
assert response.json["data"]["api"]["recoveryKey"]["valid"] is False
assert response.json["data"]["api"]["recoveryKey"]["creationDate"] is None
assert response.json["data"]["api"]["recoveryKey"]["expirationDate"] is None
assert response.json["data"]["api"]["recoveryKey"]["usesLeft"] is None

View file

@ -0,0 +1,446 @@
# pylint: disable=redefined-outer-name
# pylint: disable=unused-argument
# pylint: disable=missing-function-docstring
import datetime
import json
import pytest
from mnemonic import Mnemonic
from tests.common import generate_api_query, read_json, write_json
TOKENS_FILE_CONTETS = {
"tokens": [
{
"token": "TEST_TOKEN",
"name": "test_token",
"date": "2022-01-14 08:31:10.789314",
},
{
"token": "TEST_TOKEN2",
"name": "test_token2",
"date": "2022-01-14 08:31:10.789314",
},
]
}
API_DEVICES_QUERY = """
devices {
creationDate
isCaller
name
}
"""
def test_graphql_tokens_info(authorized_client, tokens_file):
response = authorized_client.get(
"/graphql",
json={
"query": generate_api_query([API_DEVICES_QUERY])
},
)
assert response.status_code == 200
assert response.json.get("data") is not None
assert response.json["data"]["api"]["devices"] is not None
assert len(response.json["data"]["api"]["devices"]) == 2
assert response.json["data"]["api"]["devices"][0]["creationDate"] == "2022-01-14T08:31:10.789314"
assert response.json["data"]["api"]["devices"][0]["isCaller"] is True
assert response.json["data"]["api"]["devices"][0]["name"] == "test_token"
assert response.json["data"]["api"]["devices"][1]["creationDate"] == "2022-01-14T08:31:10.789314"
assert response.json["data"]["api"]["devices"][1]["isCaller"] is False
assert response.json["data"]["api"]["devices"][1]["name"] == "test_token2"
def test_graphql_tokens_info_unauthorized(client, tokens_file):
response = client.get(
"/graphql",
json={
"query": generate_api_query([API_DEVICES_QUERY])
},
)
assert response.status_code == 200
assert response.json["data"] is None
DELETE_TOKEN_MUTATION = """
mutation DeleteToken($device: String!) {
deleteDeviceApiToken(device: $device) {
success
message
code
}
}
"""
def test_graphql_delete_token_unauthorized(client, tokens_file):
response = client.post(
"/graphql",
json={
"query": DELETE_TOKEN_MUTATION,
"variables": {
"device": "test_token",
},
},
)
assert response.status_code == 200
assert response.json["data"] is None
def test_graphql_delete_token(authorized_client, tokens_file):
response = authorized_client.post(
"/graphql",
json={
"query": DELETE_TOKEN_MUTATION,
"variables": {
"device": "test_token",
},
},
)
assert response.status_code == 200
assert response.json.get("data") is not None
assert response.json["data"]["deleteDeviceApiToken"]["success"] is True
assert response.json["data"]["deleteDeviceApiToken"]["message"] is not None
assert response.json["data"]["deleteDeviceApiToken"]["code"] == 200
assert read_json(tokens_file) == {
"tokens": [
{
"token": "TEST_TOKEN",
"name": "test_token2",
"date": "2022-01-14 08:31:10.789314",
}
]
}
def test_graphql_delete_self_token(authorized_client, tokens_file):
response = authorized_client.post(
"/graphql",
json={
"query": DELETE_TOKEN_MUTATION,
"variables": {
"device": "test_token",
},
},
)
assert response.status_code == 200
assert response.json.get("data") is not None
assert response.json["data"]["deleteDeviceApiToken"]["success"] is False
assert response.json["data"]["deleteDeviceApiToken"]["message"] is not None
assert response.json["data"]["deleteDeviceApiToken"]["code"] == 400
assert read_json(tokens_file) == TOKENS_FILE_CONTETS
def test_graphql_delete_nonexistent_token(authorized_client, tokens_file):
response = authorized_client.post(
"/graphql",
json={
"query": DELETE_TOKEN_MUTATION,
"variables": {
"device": "test_token3",
},
},
)
assert response.status_code == 200
assert response.json.get("data") is not None
assert response.json["data"]["deleteDeviceApiToken"]["success"] is False
assert response.json["data"]["deleteDeviceApiToken"]["message"] is not None
assert response.json["data"]["deleteDeviceApiToken"]["code"] == 404
assert read_json(tokens_file) == TOKENS_FILE_CONTETS
REFRESH_TOKEN_MUTATION = """
mutation RefreshToken {
refreshDeviceApiToken {
success
message
code
}
}
"""
def test_graphql_refresh_token_unauthorized(client, tokens_file):
response = client.post(
"/graphql",
json={
"query": REFRESH_TOKEN_MUTATION
},
)
assert response.status_code == 200
assert response.json["data"] is None
def test_graphql_refresh_token(authorized_client, tokens_file):
response = authorized_client.post(
"/graphql",
json={
"query": REFRESH_TOKEN_MUTATION
},
)
assert response.status_code == 200
assert response.json.get("data") is not None
assert response.json["data"]["refreshDeviceApiToken"]["success"] is True
assert response.json["data"]["refreshDeviceApiToken"]["message"] is not None
assert response.json["data"]["refreshDeviceApiToken"]["code"] == 200
assert read_json(tokens_file) == {
"tokens": [
{
"token": "TEST_TOKEN",
"name": "test_token2",
"date": "2022-01-14 08:31:10.789314",
}
]
}
NEW_DEVICE_KEY_MUTATION = """
mutation NewDeviceKey {
getNewDeviceApiKey {
success
message
code
key
}
}
"""
def test_graphql_get_new_device_auth_key_unauthorized(client, tokens_file):
response = client.post(
"/graphql",
json={
"query": NEW_DEVICE_KEY_MUTATION
},
)
assert response.status_code == 200
assert response.json["data"] is None
def test_graphql_get_new_device_auth_key(authorized_client, tokens_file):
response = authorized_client.post(
"/graphql",
json={
"query": NEW_DEVICE_KEY_MUTATION
},
)
assert response.status_code == 200
assert response.json.get("data") is not None
assert response.json["data"]["getNewDeviceApiToken"]["success"] is True
assert response.json["data"]["getNewDeviceApiToken"]["message"] is not None
assert response.json["data"]["getNewDeviceApiToken"]["code"] == 200
assert response.json["data"]["getNewDeviceApiToken"]["key"].split(" ").__len__() == 12
token = Mnemonic(language="english").to_entropy(response.json["data"]["getNewDeviceApiToken"]["key"]).hex()
assert read_json(tokens_file)["new_device"]["token"] == token
INVALIDATE_NEW_DEVICE_KEY_MUTATION = """
mutation InvalidateNewDeviceKey {
invalidateNewDeviceApiKey {
success
message
code
}
}
"""
def test_graphql_invalidate_new_device_token_unauthorized(client, tokens_file):
response = client.post(
"/graphql",
json={
"query": DELETE_TOKEN_MUTATION,
"variables": {
"device": "test_token",
},
},
)
assert response.status_code == 200
assert response.json["data"] is None
def test_graphql_get_and_delete_new_device_key(authorized_client, tokens_file):
response = authorized_client.post(
"/graphql",
json={
"query": NEW_DEVICE_KEY_MUTATION
},
)
assert response.status_code == 200
assert response.json.get("data") is not None
assert response.json["data"]["getNewDeviceApiToken"]["success"] is True
assert response.json["data"]["getNewDeviceApiToken"]["message"] is not None
assert response.json["data"]["getNewDeviceApiToken"]["code"] == 200
assert response.json["data"]["getNewDeviceApiToken"]["key"].split(" ").__len__() == 12
token = Mnemonic(language="english").to_entropy(response.json["data"]["getNewDeviceApiToken"]["key"]).hex()
assert read_json(tokens_file)["new_device"]["token"] == token
response = authorized_client.post(
"/graphql",
json={
"query": INVALIDATE_NEW_DEVICE_KEY_MUTATION
},
)
assert response.status_code == 200
assert response.json.get("data") is not None
assert response.json["data"]["invalidateNewDeviceApiKey"]["success"] is True
assert response.json["data"]["invalidateNewDeviceApiKey"]["message"] is not None
assert response.json["data"]["invalidateNewDeviceApiKey"]["code"] == 200
assert read_json(tokens_file) == TOKENS_FILE_CONTETS
AUTHORIZE_WITH_NEW_DEVICE_KEY_MUTATION = """
mutation AuthorizeWithNewDeviceKey($input: UseNewDeviceKeyInput!) {
authorizeWithNewDeviceApiKey(inupt: $input) {
success
message
code
token
}
}
"""
def test_graphql_get_and_authorize_new_device(client, authorized_client, tokens_file):
response = authorized_client.post(
"/graphql",
json={
"query": NEW_DEVICE_KEY_MUTATION
},
)
assert response.status_code == 200
assert response.json.get("data") is not None
assert response.json["data"]["getNewDeviceApiToken"]["success"] is True
assert response.json["data"]["getNewDeviceApiToken"]["message"] is not None
assert response.json["data"]["getNewDeviceApiToken"]["code"] == 200
assert response.json["data"]["getNewDeviceApiToken"]["key"].split(" ").__len__() == 12
key = Mnemonic(language="english").to_entropy(response.json["data"]["getNewDeviceApiToken"]["key"]).hex()
assert read_json(tokens_file)["new_device"]["token"] == key
response = client.post(
"/graphql",
json={
"query": AUTHORIZE_WITH_NEW_DEVICE_KEY_MUTATION,
"variables": {
"inupt": {
"key": key,
"deviceName": "test_token",
}
},
},
)
assert response.status_code == 200
assert response.json.get("data") is not None
assert response.json["data"]["authorizeWithNewDeviceApiKey"]["success"] is True
assert response.json["data"]["authorizeWithNewDeviceApiKey"]["message"] is not None
assert response.json["data"]["authorizeWithNewDeviceApiKey"]["code"] == 200
token = response.json["data"]["authorizeWithNewDeviceApiKey"]["token"]
assert read_json(tokens_file)["tokens"][2]["token"] == token
assert read_json(tokens_file)["tokens"][2]["name"] == "new_device"
def test_graphql_authorize_new_device_with_invalid_key(client, tokens_file):
response = client.post(
"/graphql",
json={
"query": AUTHORIZE_WITH_NEW_DEVICE_KEY_MUTATION,
"variables": {
"inupt": {
"key": "invalid_token",
"deviceName": "test_token",
}
},
},
)
assert response.status_code == 200
assert response.json.get("data") is not None
assert response.json["data"]["authorizeWithNewDeviceApiKey"]["success"] is False
assert response.json["data"]["authorizeWithNewDeviceApiKey"]["message"] is not None
assert response.json["data"]["authorizeWithNewDeviceApiKey"]["code"] == 404
assert read_json(tokens_file) == TOKENS_FILE_CONTETS
def test_graphql_get_and_authorize_used_key(client, authorized_client, tokens_file):
response = authorized_client.post(
"/graphql",
json={
"query": NEW_DEVICE_KEY_MUTATION
},
)
assert response.status_code == 200
assert response.json.get("data") is not None
assert response.json["data"]["getNewDeviceApiToken"]["success"] is True
assert response.json["data"]["getNewDeviceApiToken"]["message"] is not None
assert response.json["data"]["getNewDeviceApiToken"]["code"] == 200
assert response.json["data"]["getNewDeviceApiToken"]["key"].split(" ").__len__() == 12
key = Mnemonic(language="english").to_entropy(response.json["data"]["getNewDeviceApiToken"]["key"]).hex()
assert read_json(tokens_file)["new_device"]["token"] == key
response = client.post(
"/graphql",
json={
"query": AUTHORIZE_WITH_NEW_DEVICE_KEY_MUTATION,
"variables": {
"inupt": {
"key": key,
"deviceName": "test_token",
}
},
},
)
assert response.status_code == 200
assert response.json.get("data") is not None
assert response.json["data"]["authorizeWithNewDeviceApiKey"]["success"] is True
assert response.json["data"]["authorizeWithNewDeviceApiKey"]["message"] is not None
assert response.json["data"]["authorizeWithNewDeviceApiKey"]["code"] == 200
assert read_json(tokens_file) == TOKENS_FILE_CONTETS
response = client.post(
"/graphql",
json={
"query": AUTHORIZE_WITH_NEW_DEVICE_KEY_MUTATION,
"variables": {
"inupt": {
"key": key,
"deviceName": "test_token2",
}
},
},
)
assert response.status_code == 200
assert response.json.get("data") is not None
assert response.json["data"]["authorizeWithNewDeviceApiKey"]["success"] is False
assert response.json["data"]["authorizeWithNewDeviceApiKey"]["message"] is not None
assert response.json["data"]["authorizeWithNewDeviceApiKey"]["code"] == 404
assert read_json(tokens_file) == TOKENS_FILE_CONTETS
def test_graphql_get_and_authorize_key_after_12_minutes(client, authorized_client, tokens_file):
response = authorized_client.post(
"/graphql",
json={
"query": NEW_DEVICE_KEY_MUTATION
},
)
assert response.status_code == 200
assert response.json.get("data") is not None
assert response.json["data"]["getNewDeviceApiToken"]["success"] is True
assert response.json["data"]["getNewDeviceApiToken"]["message"] is not None
assert response.json["data"]["getNewDeviceApiToken"]["code"] == 200
assert response.json["data"]["getNewDeviceApiToken"]["key"].split(" ").__len__() == 12
key = Mnemonic(language="english").to_entropy(response.json["data"]["getNewDeviceApiToken"]["key"]).hex()
assert read_json(tokens_file)["new_device"]["token"] == key
file_data = read_json(tokens_file)
file_data["new_device"]["expiration"] = str(
datetime.datetime.now() - datetime.timedelta(minutes=13)
)
write_json(tokens_file, file_data)
response = client.post(
"/graphql",
json={
"query": AUTHORIZE_WITH_NEW_DEVICE_KEY_MUTATION,
"variables": {
"inupt": {
"key": key,
"deviceName": "test_token",
}
},
},
)
assert response.status_code == 200
assert response.json.get("data") is not None
assert response.json["data"]["authorizeWithNewDeviceApiKey"]["success"] is False
assert response.json["data"]["authorizeWithNewDeviceApiKey"]["message"] is not None
assert response.json["data"]["authorizeWithNewDeviceApiKey"]["code"] == 404
def test_graphql_authorize_without_token(client, tokens_file):
response = client.post(
"/graphql",
json={
"query": AUTHORIZE_WITH_NEW_DEVICE_KEY_MUTATION,
"variables": {
"inupt": {
"deviceName": "test_token",
}
},
},
)
assert response.status_code == 200
assert response.json.get("data") is None

View file

@ -0,0 +1,534 @@
# pylint: disable=redefined-outer-name
# pylint: disable=unused-argument
# pylint: disable=missing-function-docstring
import json
import pytest
import datetime
from tests.common import generate_api_query, mnemonic_to_hex, read_json, write_json
TOKENS_FILE_CONTETS = {
"tokens": [
{
"token": "TEST_TOKEN",
"name": "test_token",
"date": "2022-01-14 08:31:10.789314",
},
{
"token": "TEST_TOKEN2",
"name": "test_token2",
"date": "2022-01-14 08:31:10.789314",
},
]
}
API_RECOVERY_QUERY = """
recoveryKey {
exists
valid
creationDate
expirationDate
usesLeft
}
"""
def test_graphql_recovery_key_status_unauthorized(client, tokens_file):
response = client.post(
"/graphql",
json={
"query": generate_api_query([API_RECOVERY_QUERY])
},
)
assert response.status_code == 200
assert response.json.get("data") is None
def test_graphql_recovery_key_status_when_none_exists(authorized_client, tokens_file):
response = authorized_client.get(
"/graphql",
json={
"query": generate_api_query([API_RECOVERY_QUERY])
},
)
assert response.status_code == 200
assert response.json.get("data") is not None
assert response.json["data"]["api"]["recoveryKey"] is not None
assert response.json["data"]["api"]["recoveryKey"]["exists"] is False
assert response.json["data"]["api"]["recoveryKey"]["valid"] is False
assert response.json["data"]["api"]["recoveryKey"]["creationDate"] is None
assert response.json["data"]["api"]["recoveryKey"]["expirationDate"] is None
assert response.json["data"]["api"]["recoveryKey"]["usesLeft"] is None
API_RECOVERY_KEY_GENERATE_MUTATION = """
mutation TestGenerateRecoveryKey($limits: RecoveryKeyLimitsInput!) {
getNewRecoveryApiKey(limits: $limits) {
success
message
code
key
}
}
"""
API_RECOVERY_KEY_USE_MUTATION = """
mutation TestUseRecoveryKey($input: UseRecoveryKeyInput!) {
useRecoveryApiKey(input: $input) {
success
message
code
token
}
}
"""
def test_graphql_generate_recovery_key(client, authorized_client, tokens_file):
response = authorized_client.post(
"/graphql",
json={
"query": API_RECOVERY_KEY_GENERATE_MUTATION,
"variables": {
"limits": {
"uses": None,
"expirationDate": None,
},
},
},
)
assert response.status_code == 200
assert response.json.get("data") is not None
assert response.json["data"]["getNewRecoveryApiKey"]["success"] is True
assert response.json["data"]["getNewRecoveryApiKey"]["message"] is not None
assert response.json["data"]["getNewRecoveryApiKey"]["code"] == 200
assert response.json["data"]["getNewRecoveryApiKey"]["key"] is not None
assert response.json["data"]["getNewRecoveryApiKey"]["key"].split(" ").__len__() == 18
assert read_json(tokens_file)["recovery_token"] is not None
time_generated = read_json(tokens_file)["recovery_token"]["date"]
assert time_generated is not None
key = response.json["data"]["getNewRecoveryApiKey"]["key"]
assert (
datetime.datetime.strptime(
time_generated, "%Y-%m-%dT%H:%M:%S.%fZ"
) - datetime.timedelta(seconds=5) < datetime.datetime.now()
)
# Try to get token status
response = authorized_client.get(
"/graphql",
json={
"query": generate_api_query([API_RECOVERY_QUERY])
},
)
assert response.status_code == 200
assert response.json.get("data") is not None
assert response.json["data"]["api"]["recoveryKey"] is not None
assert response.json["data"]["api"]["recoveryKey"]["exists"] is True
assert response.json["data"]["api"]["recoveryKey"]["valid"] is True
assert response.json["data"]["api"]["recoveryKey"]["creationDate"] == time_generated
assert response.json["data"]["api"]["recoveryKey"]["expirationDate"] is None
assert response.json["data"]["api"]["recoveryKey"]["usesLeft"] is None
# Try to use token
response = client.post(
"/graphql",
json={
"query": API_RECOVERY_KEY_USE_MUTATION,
"variables": {
"input": {
"token": key,
"tokenName": "test_token",
},
},
},
)
assert response.status_code == 200
assert response.json.get("data") is not None
assert response.json["data"]["useRecoveryKey"]["success"] is True
assert response.json["data"]["useRecoveryKey"]["message"] is not None
assert response.json["data"]["useRecoveryKey"]["code"] == 200
assert response.json["data"]["useRecoveryKey"]["token"] is not None
assert response.json["data"]["useRecoveryKey"]["token"] == read_json(tokens_file)["tokens"][2]["token"]
assert read_json(tokens_file)["tokens"][2]["name"] == "test_token"
# Try to use token again
response = client.post(
"/graphql",
json={
"query": API_RECOVERY_KEY_USE_MUTATION,
"variables": {
"input": {
"token": key,
"tokenName": "test_token2",
},
},
},
)
assert response.status_code == 200
assert response.json.get("data") is not None
assert response.json["data"]["useRecoveryKey"]["success"] is True
assert response.json["data"]["useRecoveryKey"]["message"] is not None
assert response.json["data"]["useRecoveryKey"]["code"] == 200
assert response.json["data"]["useRecoveryKey"]["token"] is not None
assert response.json["data"]["useRecoveryKey"]["token"] == read_json(tokens_file)["tokens"][3]["token"]
assert read_json(tokens_file)["tokens"][3]["name"] == "test_token2"
def test_graphql_generate_recovery_key_with_expiration_date(client, authorized_client, tokens_file):
expiration_date = datetime.datetime.now() + datetime.timedelta(minutes=5)
expiration_date_str = expiration_date.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
response = authorized_client.post(
"/graphql",
json={
"query": API_RECOVERY_KEY_GENERATE_MUTATION,
"variables": {
"limits": {
"uses": None,
"expirationDate": expiration_date_str,
},
},
},
)
assert response.status_code == 200
assert response.json.get("data") is not None
assert response.json["data"]["getNewRecoveryApiKey"]["success"] is True
assert response.json["data"]["getNewRecoveryApiKey"]["message"] is not None
assert response.json["data"]["getNewRecoveryApiKey"]["code"] == 200
assert response.json["data"]["getNewRecoveryApiKey"]["key"] is not None
assert response.json["data"]["getNewRecoveryApiKey"]["key"].split(" ").__len__() == 18
assert read_json(tokens_file)["recovery_token"] is not None
key = response.json["data"]["getNewRecoveryApiKey"]["key"]
assert read_json(tokens_file)["recovery_token"]["expirationDate"] == expiration_date_str
assert read_json(tokens_file)["recovery_token"]["token"] == mnemonic_to_hex(key)
time_generated = read_json(tokens_file)["recovery_token"]["date"]
assert time_generated is not None
assert (
datetime.datetime.strptime(
time_generated, "%Y-%m-%dT%H:%M:%S.%fZ"
) - datetime.timedelta(seconds=5) < datetime.datetime.now()
)
# Try to get token status
response = authorized_client.get(
"/graphql",
json={
"query": generate_api_query([API_RECOVERY_QUERY])
},
)
assert response.status_code == 200
assert response.json.get("data") is not None
assert response.json["data"]["api"]["recoveryKey"] is not None
assert response.json["data"]["api"]["recoveryKey"]["exists"] is True
assert response.json["data"]["api"]["recoveryKey"]["valid"] is True
assert response.json["data"]["api"]["recoveryKey"]["creationDate"] == time_generated
assert response.json["data"]["api"]["recoveryKey"]["expirationDate"] == expiration_date_str
assert response.json["data"]["api"]["recoveryKey"]["usesLeft"] is None
# Try to use token
response = authorized_client.post(
"/graphql",
json={
"query": API_RECOVERY_KEY_USE_MUTATION,
"variables": {
"input": {
"token": key,
"tokenName": "test_token",
},
},
},
)
assert response.status_code == 200
assert response.json.get("data") is not None
assert response.json["data"]["useRecoveryKey"]["success"] is True
assert response.json["data"]["useRecoveryKey"]["message"] is not None
assert response.json["data"]["useRecoveryKey"]["code"] == 200
assert response.json["data"]["useRecoveryKey"]["token"] is not None
assert response.json["data"]["useRecoveryKey"]["token"] == read_json(tokens_file)["tokens"][2]["token"]
# Try to use token again
response = authorized_client.post(
"/graphql",
json={
"query": API_RECOVERY_KEY_USE_MUTATION,
"variables": {
"input": {
"token": key,
"tokenName": "test_token2",
},
},
},
)
assert response.status_code == 200
assert response.json.get("data") is not None
assert response.json["data"]["useRecoveryKey"]["success"] is True
assert response.json["data"]["useRecoveryKey"]["message"] is not None
assert response.json["data"]["useRecoveryKey"]["code"] == 200
assert response.json["data"]["useRecoveryKey"]["token"] is not None
assert response.json["data"]["useRecoveryKey"]["token"] == read_json(tokens_file)["tokens"][3]["token"]
# Try to use token after expiration date
new_data = read_json(tokens_file)
new_data["recovery_token"]["expirationDate"] = datetime.datetime.now() - datetime.timedelta(minutes=5)
write_json(tokens_file, new_data)
response = authorized_client.post(
"/graphql",
json={
"query": API_RECOVERY_KEY_USE_MUTATION,
"variables": {
"input": {
"token": key,
"tokenName": "test_token3",
},
},
},
)
assert response.status_code == 200
assert response.json.get("data") is not None
assert response.json["data"]["useRecoveryKey"]["success"] is False
assert response.json["data"]["useRecoveryKey"]["message"] is not None
assert response.json["data"]["useRecoveryKey"]["code"] == 404
assert response.json["data"]["useRecoveryKey"]["token"] is None
assert read_json(tokens_file)["tokens"] == new_data["tokens"]
# Try to get token status
response = authorized_client.get(
"/graphql",
json={
"query": generate_api_query([API_RECOVERY_QUERY])
},
)
assert response.status_code == 200
assert response.json.get("data") is not None
assert response.json["data"]["api"]["recoveryKey"] is not None
assert response.json["data"]["api"]["recoveryKey"]["exists"] is True
assert response.json["data"]["api"]["recoveryKey"]["valid"] is False
assert response.json["data"]["api"]["recoveryKey"]["creationDate"] == time_generated
assert response.json["data"]["api"]["recoveryKey"]["expirationDate"] == new_data["recovery_token"]["expiration"]
assert response.json["data"]["api"]["recoveryKey"]["usesLeft"] is None
def test_graphql_generate_recoevry_key_with_expiration_in_the_past(authorized_client, tokens_file):
expiration_date = datetime.datetime.now() - datetime.timedelta(minutes=5)
expiration_date_str = expiration_date.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
response = authorized_client.post(
"/graphql",
json={
"query": API_RECOVERY_KEY_GENERATE_MUTATION,
"variables": {
"limits": {
"uses": None,
"expirationDate": expiration_date_str,
},
},
},
)
assert response.status_code == 200
assert response.json.get("data") is not None
assert response.json["data"]["getNewRecoveryApiKey"]["success"] is False
assert response.json["data"]["getNewRecoveryApiKey"]["message"] is not None
assert response.json["data"]["getNewRecoveryApiKey"]["code"] == 400
assert response.json["data"]["getNewRecoveryApiKey"]["key"] is None
assert read_json(tokens_file)["tokens"] == []
assert "recovery_token" not in read_json(tokens_file)
def test_graphql_generate_recovery_key_with_invalid_time_format(authorized_client, tokens_file):
expiration_date = "invalid_time_format"
expiration_date_str = expiration_date
response = authorized_client.post(
"/graphql",
json={
"query": API_RECOVERY_KEY_GENERATE_MUTATION,
"variables": {
"limits": {
"uses": None,
"expirationDate": expiration_date_str,
},
},
},
)
assert response.status_code == 200
assert response.json.get("data") is not None
assert response.json["data"]["getNewRecoveryApiKey"]["success"] is False
assert response.json["data"]["getNewRecoveryApiKey"]["message"] is not None
assert response.json["data"]["getNewRecoveryApiKey"]["code"] == 400
assert response.json["data"]["getNewRecoveryApiKey"]["key"] is None
assert read_json(tokens_file)["tokens"] == []
assert "recovery_token" not in read_json(tokens_file)
def test_graphql_generate_recovery_key_with_limited_uses(authorized_client, tokens_file):
response = authorized_client.post(
"/graphql",
json={
"query": API_RECOVERY_KEY_GENERATE_MUTATION,
"variables": {
"limits": {
"expirationDate": None,
"uses": 2,
},
},
},
)
assert response.status_code == 200
assert response.json.get("data") is not None
assert response.json["data"]["getNewRecoveryApiKey"]["success"] is True
assert response.json["data"]["getNewRecoveryApiKey"]["message"] is not None
assert response.json["data"]["getNewRecoveryApiKey"]["code"] == 200
assert response.json["data"]["getNewRecoveryApiKey"]["key"] is not None
mnemonic_key = response.json["data"]["getNewRecoveryApiKey"]["key"]
key = mnemonic_to_hex(mnemonic_key)
assert read_json(tokens_file)["recovery_token"]["token"] == key
assert read_json(tokens_file)["recovery_token"]["uses_left"] == 2
# Try to get token status
response = authorized_client.get(
"/graphql",
json={
"query": generate_api_query([API_RECOVERY_QUERY])
},
)
assert response.status_code == 200
assert response.json.get("data") is not None
assert response.json["data"]["api"]["recoveryKey"] is not None
assert response.json["data"]["api"]["recoveryKey"]["exists"] is True
assert response.json["data"]["api"]["recoveryKey"]["valid"] is True
assert response.json["data"]["api"]["recoveryKey"]["creationDate"] is not None
assert response.json["data"]["api"]["recoveryKey"]["expirationDate"] is None
assert response.json["data"]["api"]["recoveryKey"]["usesLeft"] == 2
# Try to use token
response = authorized_client.post(
"/graphql",
json={
"query": API_RECOVERY_KEY_USE_MUTATION,
"variables": {
"input": {
"token": mnemonic_key,
"tokenName": "test_token1",
},
},
},
)
assert response.status_code == 200
assert response.json.get("data") is not None
assert response.json["data"]["useRecoveryKey"]["success"] is True
assert response.json["data"]["useRecoveryKey"]["message"] is not None
assert response.json["data"]["useRecoveryKey"]["code"] == 200
assert response.json["data"]["useRecoveryKey"]["token"] is not None
# Try to get token status
response = authorized_client.get(
"/graphql",
json={
"query": generate_api_query([API_RECOVERY_QUERY])
},
)
assert response.status_code == 200
assert response.json.get("data") is not None
assert response.json["data"]["api"]["recoveryKey"] is not None
assert response.json["data"]["api"]["recoveryKey"]["exists"] is True
assert response.json["data"]["api"]["recoveryKey"]["valid"] is True
assert response.json["data"]["api"]["recoveryKey"]["creationDate"] is not None
assert response.json["data"]["api"]["recoveryKey"]["expirationDate"] is None
assert response.json["data"]["api"]["recoveryKey"]["usesLeft"] == 1
# Try to use token
response = authorized_client.post(
"/graphql",
json={
"query": API_RECOVERY_KEY_USE_MUTATION,
"variables": {
"input": {
"token": mnemonic_key,
"tokenName": "test_token2",
},
},
},
)
assert response.status_code == 200
assert response.json.get("data") is not None
assert response.json["data"]["useRecoveryKey"]["success"] is True
assert response.json["data"]["useRecoveryKey"]["message"] is not None
assert response.json["data"]["useRecoveryKey"]["code"] == 200
assert response.json["data"]["useRecoveryKey"]["token"] is not None
# Try to get token status
response = authorized_client.get(
"/graphql",
json={
"query": generate_api_query([API_RECOVERY_QUERY])
},
)
assert response.status_code == 200
assert response.json.get("data") is not None
assert response.json["data"]["api"]["recoveryKey"] is not None
assert response.json["data"]["api"]["recoveryKey"]["exists"] is True
assert response.json["data"]["api"]["recoveryKey"]["valid"] is False
assert response.json["data"]["api"]["recoveryKey"]["creationDate"] is not None
assert response.json["data"]["api"]["recoveryKey"]["expirationDate"] is not None
assert response.json["data"]["api"]["recoveryKey"]["usesLeft"] == 0
# Try to use token
response = authorized_client.post(
"/graphql",
json={
"query": API_RECOVERY_KEY_USE_MUTATION,
"variables": {
"input": {
"token": mnemonic_key,
"tokenName": "test_token3",
},
},
},
)
assert response.status_code == 200
assert response.json.get("data") is not None
assert response.json["data"]["useRecoveryKey"]["success"] is False
assert response.json["data"]["useRecoveryKey"]["message"] is not None
assert response.json["data"]["useRecoveryKey"]["code"] == 404
assert response.json["data"]["useRecoveryKey"]["token"] is None
def test_graphql_generate_recovery_key_with_negative_uses(authorized_client, tokens_file):
# Try to get token status
response = authorized_client.post(
"/graphql",
json={
"query": API_RECOVERY_KEY_GENERATE_MUTATION,
"variables": {
"limits": {
"uses": -1,
"expirationDate": None,
},
},
},
)
assert response.status_code == 200
assert response.json.get("data") is not None
assert response.json["data"]["getNewRecoveryApiKey"]["success"] is False
assert response.json["data"]["getNewRecoveryApiKey"]["message"] is not None
assert response.json["data"]["getNewRecoveryApiKey"]["code"] == 400
assert response.json["data"]["getNewRecoveryApiKey"]["key"] is None
def test_graphql_generate_recovery_key_with_zero_uses(authorized_client, tokens_file):
# Try to get token status
response = authorized_client.post(
"/graphql",
json={
"query": API_RECOVERY_KEY_GENERATE_MUTATION,
"variables": {
"limits": {
"uses": 0,
"expirationDate": None,
},
},
},
)
assert response.status_code == 200
assert response.json.get("data") is not None
assert response.json["data"]["getNewRecoveryApiKey"]["success"] is False
assert response.json["data"]["getNewRecoveryApiKey"]["message"] is not None
assert response.json["data"]["getNewRecoveryApiKey"]["code"] == 400
assert response.json["data"]["getNewRecoveryApiKey"]["key"] is None

View file

@ -0,0 +1,28 @@
# pylint: disable=redefined-outer-name
# pylint: disable=unused-argument
# pylint: disable=missing-function-docstring
from tests.common import generate_api_query
API_VERSION_QUERY = "version"
def test_graphql_get_api_version(authorized_client):
response = authorized_client.get(
"/graphql",
json={
"query": generate_api_query([API_VERSION_QUERY])
},
)
assert response.status_code == 200
assert "version" in response.get_json()["data"]["api"]
def test_graphql_api_version_unauthorized(client):
response = client.get(
"/graphql",
json={
"query": generate_api_query([API_VERSION_QUERY])
},
)
assert response.status_code == 200
assert "version" in response.get_json()["data"]["api"]