mirror of
https://git.selfprivacy.org/SelfPrivacy/selfprivacy-rest-api.git
synced 2025-01-25 18:26:34 +00:00
More auth tests
This commit is contained in:
parent
08c7f62e93
commit
40501401b4
7
.vscode/settings.json
vendored
7
.vscode/settings.json
vendored
|
@ -1,5 +1,10 @@
|
||||||
{
|
{
|
||||||
"python.formatting.provider": "black",
|
"python.formatting.provider": "black",
|
||||||
"python.linting.pylintEnabled": true,
|
"python.linting.pylintEnabled": true,
|
||||||
"python.linting.enabled": true
|
"python.linting.enabled": true,
|
||||||
|
"python.testing.pytestArgs": [
|
||||||
|
"tests"
|
||||||
|
],
|
||||||
|
"python.testing.unittestEnabled": false,
|
||||||
|
"python.testing.pytestEnabled": true
|
||||||
}
|
}
|
|
@ -98,8 +98,6 @@ class Tokens(Resource):
|
||||||
"""
|
"""
|
||||||
# Get token from header
|
# Get token from header
|
||||||
token = request.headers.get("Authorization").split(" ")[1]
|
token = request.headers.get("Authorization").split(" ")[1]
|
||||||
if not is_token_valid(token):
|
|
||||||
return {"message": "Token not found"}, 404
|
|
||||||
new_token = refresh_token(token)
|
new_token = refresh_token(token)
|
||||||
if new_token is None:
|
if new_token is None:
|
||||||
return {"message": "Token not found"}, 404
|
return {"message": "Token not found"}, 404
|
||||||
|
|
|
@ -29,7 +29,8 @@ class NewDevice(Resource):
|
||||||
400:
|
400:
|
||||||
description: Bad request
|
description: Bad request
|
||||||
"""
|
"""
|
||||||
return get_new_device_auth_token()
|
token = get_new_device_auth_token()
|
||||||
|
return {"token": token}
|
||||||
|
|
||||||
|
|
||||||
class AuthorizeDevice(Resource):
|
class AuthorizeDevice(Resource):
|
||||||
|
|
|
@ -146,10 +146,10 @@ def is_recovery_token_valid():
|
||||||
if "recovery_token" not in tokens:
|
if "recovery_token" not in tokens:
|
||||||
return False
|
return False
|
||||||
recovery_token = tokens["recovery_token"]
|
recovery_token = tokens["recovery_token"]
|
||||||
if "uses_left" in recovery_token:
|
if "uses_left" in recovery_token and recovery_token["uses_left"] is not None:
|
||||||
if recovery_token["uses_left"] <= 0:
|
if recovery_token["uses_left"] <= 0:
|
||||||
return False
|
return False
|
||||||
if "expiration" not in recovery_token:
|
if "expiration" not in recovery_token or recovery_token["expiration"] is None:
|
||||||
return True
|
return True
|
||||||
return datetime.now() < datetime.strptime(
|
return datetime.now() < datetime.strptime(
|
||||||
recovery_token["expiration"], "%Y-%m-%d %H:%M:%S.%f"
|
recovery_token["expiration"], "%Y-%m-%d %H:%M:%S.%f"
|
||||||
|
@ -238,7 +238,10 @@ def use_mnemonic_recoverery_token(mnemonic_phrase, name):
|
||||||
}
|
}
|
||||||
)
|
)
|
||||||
if "recovery_token" in tokens:
|
if "recovery_token" in tokens:
|
||||||
if "uses_left" in tokens["recovery_token"]:
|
if (
|
||||||
|
"uses_left" in tokens["recovery_token"]
|
||||||
|
and tokens["recovery_token"]["uses_left"] is not None
|
||||||
|
):
|
||||||
tokens["recovery_token"]["uses_left"] -= 1
|
tokens["recovery_token"]["uses_left"] -= 1
|
||||||
return token
|
return token
|
||||||
|
|
||||||
|
|
|
@ -2,11 +2,15 @@ import pytest
|
||||||
from flask import testing
|
from flask import testing
|
||||||
from selfprivacy_api.app import create_app
|
from selfprivacy_api.app import create_app
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def tokens_file(mocker, shared_datadir):
|
def tokens_file(mocker, shared_datadir):
|
||||||
mock = mocker.patch("selfprivacy_api.utils.TOKENS_FILE", shared_datadir / "tokens.json")
|
mock = mocker.patch(
|
||||||
|
"selfprivacy_api.utils.TOKENS_FILE", shared_datadir / "tokens.json"
|
||||||
|
)
|
||||||
return mock
|
return mock
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def app():
|
def app():
|
||||||
app = create_app(
|
app = create_app(
|
||||||
|
|
|
@ -1,7 +1,10 @@
|
||||||
# pylint: disable=redefined-outer-name
|
# pylint: disable=redefined-outer-name
|
||||||
# pylint: disable=unused-argument
|
# pylint: disable=unused-argument
|
||||||
|
import datetime
|
||||||
import json
|
import json
|
||||||
import pytest
|
import pytest
|
||||||
|
from mnemonic import Mnemonic
|
||||||
|
|
||||||
|
|
||||||
TOKENS_FILE_CONTETS = {
|
TOKENS_FILE_CONTETS = {
|
||||||
"tokens": [
|
"tokens": [
|
||||||
|
@ -64,6 +67,7 @@ def test_delete_token(authorized_client, tokens_file):
|
||||||
]
|
]
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
def test_delete_self_token(authorized_client, tokens_file):
|
def test_delete_self_token(authorized_client, tokens_file):
|
||||||
response = authorized_client.delete(
|
response = authorized_client.delete(
|
||||||
"/auth/tokens", json={"token_name": "test_token"}
|
"/auth/tokens", json={"token_name": "test_token"}
|
||||||
|
@ -71,6 +75,7 @@ def test_delete_self_token(authorized_client, tokens_file):
|
||||||
assert response.status_code == 400
|
assert response.status_code == 400
|
||||||
assert read_json(tokens_file) == TOKENS_FILE_CONTETS
|
assert read_json(tokens_file) == TOKENS_FILE_CONTETS
|
||||||
|
|
||||||
|
|
||||||
def test_delete_nonexistent_token(authorized_client, tokens_file):
|
def test_delete_nonexistent_token(authorized_client, tokens_file):
|
||||||
response = authorized_client.delete(
|
response = authorized_client.delete(
|
||||||
"/auth/tokens", json={"token_name": "test_token3"}
|
"/auth/tokens", json={"token_name": "test_token3"}
|
||||||
|
@ -78,13 +83,165 @@ def test_delete_nonexistent_token(authorized_client, tokens_file):
|
||||||
assert response.status_code == 404
|
assert response.status_code == 404
|
||||||
assert read_json(tokens_file) == TOKENS_FILE_CONTETS
|
assert read_json(tokens_file) == TOKENS_FILE_CONTETS
|
||||||
|
|
||||||
|
|
||||||
def test_refresh_token_unauthorized(client, tokens_file):
|
def test_refresh_token_unauthorized(client, tokens_file):
|
||||||
response = client.post("/auth/tokens")
|
response = client.post("/auth/tokens")
|
||||||
assert response.status_code == 401
|
assert response.status_code == 401
|
||||||
assert read_json(tokens_file) == TOKENS_FILE_CONTETS
|
assert read_json(tokens_file) == TOKENS_FILE_CONTETS
|
||||||
|
|
||||||
|
|
||||||
def test_refresh_token(authorized_client, tokens_file):
|
def test_refresh_token(authorized_client, tokens_file):
|
||||||
response = authorized_client.post("/auth/tokens")
|
response = authorized_client.post("/auth/tokens")
|
||||||
assert response.status_code == 200
|
assert response.status_code == 200
|
||||||
new_token = response.json["token"]
|
new_token = response.json["token"]
|
||||||
assert read_json(tokens_file)["tokens"][0]["token"] == new_token
|
assert read_json(tokens_file)["tokens"][0]["token"] == new_token
|
||||||
|
|
||||||
|
|
||||||
|
# new device
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_new_device_auth_token_unauthorized(client, tokens_file):
|
||||||
|
response = client.get("/auth/new_device")
|
||||||
|
assert response.status_code == 401
|
||||||
|
assert read_json(tokens_file) == TOKENS_FILE_CONTETS
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_new_device_auth_token(authorized_client, tokens_file):
|
||||||
|
response = authorized_client.post("/auth/new_device")
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert "token" in response.json
|
||||||
|
token = Mnemonic(language="english").to_entropy(response.json["token"]).hex()
|
||||||
|
assert read_json(tokens_file)["new_device"]["token"] == token
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_and_authorize_new_device(client, authorized_client, tokens_file):
|
||||||
|
response = authorized_client.post("/auth/new_device")
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert "token" in response.json
|
||||||
|
token = Mnemonic(language="english").to_entropy(response.json["token"]).hex()
|
||||||
|
assert read_json(tokens_file)["new_device"]["token"] == token
|
||||||
|
response = client.post(
|
||||||
|
"/auth/new_device/authorize",
|
||||||
|
json={"token": response.json["token"], "device": "new_device"},
|
||||||
|
)
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert read_json(tokens_file)["tokens"][2]["token"] == response.json["token"]
|
||||||
|
assert read_json(tokens_file)["tokens"][2]["name"] == "new_device"
|
||||||
|
|
||||||
|
|
||||||
|
def test_authorize_new_device_with_invalid_token(client, tokens_file):
|
||||||
|
response = client.post(
|
||||||
|
"/auth/new_device/authorize",
|
||||||
|
json={"token": "invalid_token", "device": "new_device"},
|
||||||
|
)
|
||||||
|
assert response.status_code == 404
|
||||||
|
assert read_json(tokens_file) == TOKENS_FILE_CONTETS
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_and_authorize_used_token(client, authorized_client, tokens_file):
|
||||||
|
response = authorized_client.post("/auth/new_device")
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert "token" in response.json
|
||||||
|
token = Mnemonic(language="english").to_entropy(response.json["token"]).hex()
|
||||||
|
assert read_json(tokens_file)["new_device"]["token"] == token
|
||||||
|
response = client.post(
|
||||||
|
"/auth/new_device/authorize",
|
||||||
|
json={"token": response.json["token"], "device": "new_device"},
|
||||||
|
)
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert read_json(tokens_file)["tokens"][2]["token"] == response.json["token"]
|
||||||
|
assert read_json(tokens_file)["tokens"][2]["name"] == "new_device"
|
||||||
|
response = client.post(
|
||||||
|
"/auth/new_device/authorize",
|
||||||
|
json={"token": response.json["token"], "device": "new_device"},
|
||||||
|
)
|
||||||
|
assert response.status_code == 404
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_and_authorize_token_after_12_minutes(
|
||||||
|
client, authorized_client, tokens_file
|
||||||
|
):
|
||||||
|
response = authorized_client.post("/auth/new_device")
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert "token" in response.json
|
||||||
|
token = Mnemonic(language="english").to_entropy(response.json["token"]).hex()
|
||||||
|
assert read_json(tokens_file)["new_device"]["token"] == token
|
||||||
|
|
||||||
|
file_data = read_json(tokens_file)
|
||||||
|
file_data["new_device"]["expiration"] = str(
|
||||||
|
datetime.datetime.now() - datetime.timedelta(minutes=13)
|
||||||
|
)
|
||||||
|
write_json(tokens_file, file_data)
|
||||||
|
|
||||||
|
response = client.post(
|
||||||
|
"/auth/new_device/authorize",
|
||||||
|
json={"token": response.json["token"], "device": "new_device"},
|
||||||
|
)
|
||||||
|
assert response.status_code == 404
|
||||||
|
|
||||||
|
|
||||||
|
def test_authorize_without_token(client, tokens_file):
|
||||||
|
response = client.post(
|
||||||
|
"/auth/new_device/authorize",
|
||||||
|
json={"device": "new_device"},
|
||||||
|
)
|
||||||
|
assert response.status_code == 400
|
||||||
|
assert read_json(tokens_file) == TOKENS_FILE_CONTETS
|
||||||
|
|
||||||
|
|
||||||
|
# Recovery tokens
|
||||||
|
# GET /auth/recovery_token returns token status
|
||||||
|
# - if token is valid, returns 200 and token status
|
||||||
|
# - token status:
|
||||||
|
# - exists (boolean)
|
||||||
|
# - valid (boolean)
|
||||||
|
# - date (string)
|
||||||
|
# - expiration (string)
|
||||||
|
# - uses_left (int)
|
||||||
|
# - if token is invalid, returns 400 and empty body
|
||||||
|
# POST /auth/recovery_token generates a new token
|
||||||
|
# has two optional parameters:
|
||||||
|
# - expiration (string in datetime format)
|
||||||
|
# - uses_left (int)
|
||||||
|
# POST /auth/recovery_token/use uses the token
|
||||||
|
# required arguments:
|
||||||
|
# - token (string)
|
||||||
|
# - device (string)
|
||||||
|
# - if token is valid, returns 200 and token
|
||||||
|
# - if token is invalid, returns 404
|
||||||
|
# - if request is invalid, returns 400
|
||||||
|
|
||||||
|
|
||||||
|
def test_get_recovery_token_status_unauthorized(client, tokens_file):
|
||||||
|
response = client.get("/auth/recovery_token")
|
||||||
|
assert response.status_code == 401
|
||||||
|
assert read_json(tokens_file) == TOKENS_FILE_CONTETS
|
||||||
|
|
||||||
|
|
||||||
|
def test_generate_recovery_token(authorized_client, client, tokens_file):
|
||||||
|
# Generate token without expiration and uses_left
|
||||||
|
response = authorized_client.post("/auth/recovery_token")
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert "token" in response.json
|
||||||
|
token = Mnemonic(language="english").to_entropy(response.json["token"]).hex()
|
||||||
|
assert read_json(tokens_file)["recovery_token"]["token"] == token
|
||||||
|
|
||||||
|
# Try to use the token
|
||||||
|
recovery_response = client.post(
|
||||||
|
"/auth/recovery_token/use",
|
||||||
|
json={"token": response.json["token"], "device": "recovery_device"},
|
||||||
|
)
|
||||||
|
assert recovery_response.status_code == 200
|
||||||
|
new_token = recovery_response.json["token"]
|
||||||
|
assert read_json(tokens_file)["tokens"][2]["token"] == new_token
|
||||||
|
assert read_json(tokens_file)["tokens"][2]["name"] == "recovery_device"
|
||||||
|
|
||||||
|
# Try to use token again
|
||||||
|
recovery_response = client.post(
|
||||||
|
"/auth/recovery_token/use",
|
||||||
|
json={"token": response.json["token"], "device": "recovery_device2"},
|
||||||
|
)
|
||||||
|
assert recovery_response.status_code == 200
|
||||||
|
new_token = recovery_response.json["token"]
|
||||||
|
assert read_json(tokens_file)["tokens"][3]["token"] == new_token
|
||||||
|
assert read_json(tokens_file)["tokens"][3]["name"] == "recovery_device2"
|
||||||
|
|
|
@ -3,6 +3,8 @@
|
||||||
import json
|
import json
|
||||||
import pytest
|
import pytest
|
||||||
|
|
||||||
|
from selfprivacy_api.utils import WriteUserData, ReadUserData
|
||||||
|
|
||||||
|
|
||||||
def test_get_api_version(authorized_client):
|
def test_get_api_version(authorized_client):
|
||||||
response = authorized_client.get("/api/version")
|
response = authorized_client.get("/api/version")
|
||||||
|
@ -20,3 +22,15 @@ def test_get_swagger_json(authorized_client):
|
||||||
response = authorized_client.get("/api/swagger.json")
|
response = authorized_client.get("/api/swagger.json")
|
||||||
assert response.status_code == 200
|
assert response.status_code == 200
|
||||||
assert "swagger" in response.get_json()
|
assert "swagger" in response.get_json()
|
||||||
|
|
||||||
|
|
||||||
|
def test_read_invalid_user_data():
|
||||||
|
with pytest.raises(ValueError):
|
||||||
|
with ReadUserData("invalid") as user_data:
|
||||||
|
pass
|
||||||
|
|
||||||
|
|
||||||
|
def test_write_invalid_user_data():
|
||||||
|
with pytest.raises(ValueError):
|
||||||
|
with WriteUserData("invalid") as user_data:
|
||||||
|
pass
|
||||||
|
|
Loading…
Reference in a new issue