mirror of
https://git.selfprivacy.org/SelfPrivacy/selfprivacy-rest-api.git
synced 2024-11-24 04:51:27 +00:00
chore: Merge 2.4.3
This commit is contained in:
parent
aa29d8dfbb
commit
8ffd08e95f
|
@ -370,7 +370,6 @@ class ResticBackupper(AbstractBackupper):
|
||||||
stderr=subprocess.STDOUT,
|
stderr=subprocess.STDOUT,
|
||||||
shell=False,
|
shell=False,
|
||||||
) as handle:
|
) as handle:
|
||||||
|
|
||||||
# for some reason restore does not support
|
# for some reason restore does not support
|
||||||
# nice reporting of progress via json
|
# nice reporting of progress via json
|
||||||
output = handle.communicate()[0].decode("utf-8")
|
output = handle.communicate()[0].decode("utf-8")
|
||||||
|
|
|
@ -17,7 +17,6 @@ class UserType(Enum):
|
||||||
|
|
||||||
@strawberry.type
|
@strawberry.type
|
||||||
class User:
|
class User:
|
||||||
|
|
||||||
user_type: UserType
|
user_type: UserType
|
||||||
username: str
|
username: str
|
||||||
# userHomeFolderspace: UserHomeFolderUsage
|
# userHomeFolderspace: UserHomeFolderUsage
|
||||||
|
@ -32,7 +31,6 @@ class UserMutationReturn(MutationReturnInterface):
|
||||||
|
|
||||||
|
|
||||||
def get_user_by_username(username: str) -> typing.Optional[User]:
|
def get_user_by_username(username: str) -> typing.Optional[User]:
|
||||||
|
|
||||||
user = users_actions.get_user_by_username(username)
|
user = users_actions.get_user_by_username(username)
|
||||||
if user is None:
|
if user is None:
|
||||||
return None
|
return None
|
||||||
|
|
|
@ -15,7 +15,6 @@ from selfprivacy_api.jobs import Jobs
|
||||||
class Job:
|
class Job:
|
||||||
@strawberry.field
|
@strawberry.field
|
||||||
def get_jobs(self) -> typing.List[ApiJob]:
|
def get_jobs(self) -> typing.List[ApiJob]:
|
||||||
|
|
||||||
Jobs.get_jobs()
|
Jobs.get_jobs()
|
||||||
|
|
||||||
return [job_to_api_job(job) for job in Jobs.get_jobs()]
|
return [job_to_api_job(job) for job in Jobs.get_jobs()]
|
||||||
|
|
|
@ -58,7 +58,9 @@ class JitsiMeet(Service):
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def is_enabled() -> bool:
|
def is_enabled() -> bool:
|
||||||
with ReadUserData() as user_data:
|
with ReadUserData() as user_data:
|
||||||
return user_data.get("modules", {}).get("jitsi-meet", {}).get("enable", False)
|
return (
|
||||||
|
user_data.get("modules", {}).get("jitsi-meet", {}).get("enable", False)
|
||||||
|
)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def get_status() -> ServiceStatus:
|
def get_status() -> ServiceStatus:
|
||||||
|
|
|
@ -210,9 +210,13 @@ class Service(ABC):
|
||||||
return root_device
|
return root_device
|
||||||
with utils.ReadUserData() as userdata:
|
with utils.ReadUserData() as userdata:
|
||||||
if userdata.get("useBinds", False):
|
if userdata.get("useBinds", False):
|
||||||
return userdata.get("modules", {}).get(cls.get_id(), {}).get(
|
return (
|
||||||
"location",
|
userdata.get("modules", {})
|
||||||
root_device,
|
.get(cls.get_id(), {})
|
||||||
|
.get(
|
||||||
|
"location",
|
||||||
|
root_device,
|
||||||
|
)
|
||||||
)
|
)
|
||||||
else:
|
else:
|
||||||
return root_device
|
return root_device
|
||||||
|
|
|
@ -7,7 +7,7 @@
|
||||||
"enable": true
|
"enable": true
|
||||||
},
|
},
|
||||||
"databasePassword": "PASSWORD",
|
"databasePassword": "PASSWORD",
|
||||||
"domain": "test.tld",
|
"domain": "test-domain.tld",
|
||||||
"hashedMasterPassword": "HASHED_PASSWORD",
|
"hashedMasterPassword": "HASHED_PASSWORD",
|
||||||
"hostname": "test-instance",
|
"hostname": "test-instance",
|
||||||
"nextcloud": {
|
"nextcloud": {
|
||||||
|
|
|
@ -7,7 +7,7 @@
|
||||||
"enable": true
|
"enable": true
|
||||||
},
|
},
|
||||||
"databasePassword": "PASSWORD",
|
"databasePassword": "PASSWORD",
|
||||||
"domain": "test.tld",
|
"domain": "test-domain.tld",
|
||||||
"hashedMasterPassword": "HASHED_PASSWORD",
|
"hashedMasterPassword": "HASHED_PASSWORD",
|
||||||
"hostname": "test-instance",
|
"hostname": "test-instance",
|
||||||
"nextcloud": {
|
"nextcloud": {
|
||||||
|
|
|
@ -7,7 +7,7 @@
|
||||||
"enable": true
|
"enable": true
|
||||||
},
|
},
|
||||||
"databasePassword": "PASSWORD",
|
"databasePassword": "PASSWORD",
|
||||||
"domain": "test.tld",
|
"domain": "test-domain.tld",
|
||||||
"hashedMasterPassword": "HASHED_PASSWORD",
|
"hashedMasterPassword": "HASHED_PASSWORD",
|
||||||
"hostname": "test-instance",
|
"hostname": "test-instance",
|
||||||
"nextcloud": {
|
"nextcloud": {
|
||||||
|
|
|
@ -7,7 +7,7 @@
|
||||||
"enable": true
|
"enable": true
|
||||||
},
|
},
|
||||||
"databasePassword": "PASSWORD",
|
"databasePassword": "PASSWORD",
|
||||||
"domain": "test.tld",
|
"domain": "test-domain.tld",
|
||||||
"hashedMasterPassword": "HASHED_PASSWORD",
|
"hashedMasterPassword": "HASHED_PASSWORD",
|
||||||
"hostname": "test-instance",
|
"hostname": "test-instance",
|
||||||
"nextcloud": {
|
"nextcloud": {
|
||||||
|
|
|
@ -251,7 +251,6 @@ def test_graphql_generate_recovery_key_with_invalid_time_format(
|
||||||
def test_graphql_generate_recovery_key_with_limited_uses(
|
def test_graphql_generate_recovery_key_with_limited_uses(
|
||||||
authorized_client, client, tokens_file
|
authorized_client, client, tokens_file
|
||||||
):
|
):
|
||||||
|
|
||||||
mnemonic_key = graphql_make_new_recovery_key(authorized_client, uses=2)
|
mnemonic_key = graphql_make_new_recovery_key(authorized_client, uses=2)
|
||||||
|
|
||||||
status = graphql_recovery_status(authorized_client)
|
status = graphql_recovery_status(authorized_client)
|
||||||
|
|
|
@ -7,7 +7,7 @@
|
||||||
"enable": false
|
"enable": false
|
||||||
},
|
},
|
||||||
"databasePassword": "PASSWORD",
|
"databasePassword": "PASSWORD",
|
||||||
"domain": "test.tld",
|
"domain": "test-domain.tld",
|
||||||
"hashedMasterPassword": "HASHED_PASSWORD",
|
"hashedMasterPassword": "HASHED_PASSWORD",
|
||||||
"hostname": "test-instance",
|
"hostname": "test-instance",
|
||||||
"nextcloud": {
|
"nextcloud": {
|
||||||
|
|
|
@ -8,12 +8,6 @@ from tests.common import generate_system_query, read_json
|
||||||
from tests.test_graphql.common import assert_empty
|
from tests.test_graphql.common import assert_empty
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def domain_file(mocker, datadir):
|
|
||||||
mocker.patch("selfprivacy_api.utils.DOMAIN_FILE", datadir / "domain")
|
|
||||||
return datadir
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def turned_on(mocker, datadir):
|
def turned_on(mocker, datadir):
|
||||||
mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "turned_on.json")
|
mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "turned_on.json")
|
||||||
|
@ -250,7 +244,7 @@ def is_dns_record_in_array(records, dns_record) -> bool:
|
||||||
|
|
||||||
|
|
||||||
def test_graphql_get_domain(
|
def test_graphql_get_domain(
|
||||||
authorized_client, domain_file, mock_get_ip4, mock_get_ip6, turned_on, mock_dkim_key
|
authorized_client, mock_get_ip4, mock_get_ip6, turned_on, mock_dkim_key
|
||||||
):
|
):
|
||||||
"""Test get domain"""
|
"""Test get domain"""
|
||||||
response = authorized_client.post(
|
response = authorized_client.post(
|
||||||
|
@ -261,7 +255,9 @@ def test_graphql_get_domain(
|
||||||
)
|
)
|
||||||
assert response.status_code == 200
|
assert response.status_code == 200
|
||||||
assert response.json().get("data") is not None
|
assert response.json().get("data") is not None
|
||||||
assert response.json()["data"]["system"]["domainInfo"]["domain"] == "test.tld"
|
assert (
|
||||||
|
response.json()["data"]["system"]["domainInfo"]["domain"] == "test-domain.tld"
|
||||||
|
)
|
||||||
assert (
|
assert (
|
||||||
response.json()["data"]["system"]["domainInfo"]["hostname"] == "test-instance"
|
response.json()["data"]["system"]["domainInfo"]["hostname"] == "test-instance"
|
||||||
)
|
)
|
||||||
|
|
|
@ -1 +0,0 @@
|
||||||
test-domain.tld
|
|
|
@ -7,7 +7,7 @@
|
||||||
"enable": true
|
"enable": true
|
||||||
},
|
},
|
||||||
"databasePassword": "PASSWORD",
|
"databasePassword": "PASSWORD",
|
||||||
"domain": "test.tld",
|
"domain": "test-domain.tld",
|
||||||
"hashedMasterPassword": "HASHED_PASSWORD",
|
"hashedMasterPassword": "HASHED_PASSWORD",
|
||||||
"hostname": "test-instance",
|
"hostname": "test-instance",
|
||||||
"nextcloud": {
|
"nextcloud": {
|
||||||
|
|
|
@ -7,7 +7,7 @@
|
||||||
"enable": true
|
"enable": true
|
||||||
},
|
},
|
||||||
"databasePassword": "PASSWORD",
|
"databasePassword": "PASSWORD",
|
||||||
"domain": "test.tld",
|
"domain": "test-domain.tld",
|
||||||
"hashedMasterPassword": "HASHED_PASSWORD",
|
"hashedMasterPassword": "HASHED_PASSWORD",
|
||||||
"hostname": "test-instance",
|
"hostname": "test-instance",
|
||||||
"nextcloud": {
|
"nextcloud": {
|
||||||
|
|
|
@ -4,7 +4,7 @@
|
||||||
"enableSwagger": false
|
"enableSwagger": false
|
||||||
},
|
},
|
||||||
"databasePassword": "PASSWORD",
|
"databasePassword": "PASSWORD",
|
||||||
"domain": "test.tld",
|
"domain": "test-domain.tld",
|
||||||
"hashedMasterPassword": "HASHED_PASSWORD",
|
"hashedMasterPassword": "HASHED_PASSWORD",
|
||||||
"hostname": "test-instance",
|
"hostname": "test-instance",
|
||||||
"resticPassword": "PASS",
|
"resticPassword": "PASS",
|
||||||
|
|
|
@ -7,7 +7,7 @@
|
||||||
"enable": true
|
"enable": true
|
||||||
},
|
},
|
||||||
"databasePassword": "PASSWORD",
|
"databasePassword": "PASSWORD",
|
||||||
"domain": "test.tld",
|
"domain": "test-domain.tld",
|
||||||
"hashedMasterPassword": "HASHED_PASSWORD",
|
"hashedMasterPassword": "HASHED_PASSWORD",
|
||||||
"hostname": "test-instance",
|
"hostname": "test-instance",
|
||||||
"nextcloud": {
|
"nextcloud": {
|
||||||
|
|
|
@ -196,7 +196,6 @@ def test_graphql_get_one_user_unauthorized(client, one_user, mock_subprocess_pop
|
||||||
|
|
||||||
|
|
||||||
def test_graphql_get_one_user(authorized_client, one_user, mock_subprocess_popen):
|
def test_graphql_get_one_user(authorized_client, one_user, mock_subprocess_popen):
|
||||||
|
|
||||||
response = authorized_client.post(
|
response = authorized_client.post(
|
||||||
"/graphql",
|
"/graphql",
|
||||||
json={
|
json={
|
||||||
|
|
|
@ -7,7 +7,7 @@
|
||||||
"enable": false
|
"enable": false
|
||||||
},
|
},
|
||||||
"databasePassword": "PASSWORD",
|
"databasePassword": "PASSWORD",
|
||||||
"domain": "test.tld",
|
"domain": "test-domain.tld",
|
||||||
"hashedMasterPassword": "HASHED_PASSWORD",
|
"hashedMasterPassword": "HASHED_PASSWORD",
|
||||||
"hostname": "test-instance",
|
"hostname": "test-instance",
|
||||||
"nextcloud": {
|
"nextcloud": {
|
||||||
|
|
|
@ -7,7 +7,7 @@
|
||||||
"enable": false
|
"enable": false
|
||||||
},
|
},
|
||||||
"databasePassword": "PASSWORD",
|
"databasePassword": "PASSWORD",
|
||||||
"domain": "test.tld",
|
"domain": "test-domain.tld",
|
||||||
"hashedMasterPassword": "HASHED_PASSWORD",
|
"hashedMasterPassword": "HASHED_PASSWORD",
|
||||||
"hostname": "test-instance",
|
"hostname": "test-instance",
|
||||||
"nextcloud": {
|
"nextcloud": {
|
||||||
|
|
|
@ -7,7 +7,7 @@
|
||||||
"enable": false
|
"enable": false
|
||||||
},
|
},
|
||||||
"databasePassword": "PASSWORD",
|
"databasePassword": "PASSWORD",
|
||||||
"domain": "test.tld",
|
"domain": "test-domain.tld",
|
||||||
"hashedMasterPassword": "HASHED_PASSWORD",
|
"hashedMasterPassword": "HASHED_PASSWORD",
|
||||||
"hostname": "test-instance",
|
"hostname": "test-instance",
|
||||||
"nextcloud": {
|
"nextcloud": {
|
||||||
|
|
|
@ -7,7 +7,7 @@
|
||||||
"enable": false
|
"enable": false
|
||||||
},
|
},
|
||||||
"databasePassword": "PASSWORD",
|
"databasePassword": "PASSWORD",
|
||||||
"domain": "test.tld",
|
"domain": "test-domain.tld",
|
||||||
"hashedMasterPassword": "HASHED_PASSWORD",
|
"hashedMasterPassword": "HASHED_PASSWORD",
|
||||||
"hostname": "test-instance",
|
"hostname": "test-instance",
|
||||||
"nextcloud": {
|
"nextcloud": {
|
||||||
|
|
|
@ -1,457 +0,0 @@
|
||||||
# pylint: disable=redefined-outer-name
|
|
||||||
# pylint: disable=unused-argument
|
|
||||||
# pylint: disable=missing-function-docstring
|
|
||||||
import datetime
|
|
||||||
from datetime import timezone
|
|
||||||
import pytest
|
|
||||||
|
|
||||||
from tests.conftest import TOKENS_FILE_CONTENTS
|
|
||||||
from tests.common import (
|
|
||||||
RECOVERY_KEY_VALIDATION_DATETIME,
|
|
||||||
DEVICE_KEY_VALIDATION_DATETIME,
|
|
||||||
NearFuture,
|
|
||||||
assert_recovery_recent,
|
|
||||||
)
|
|
||||||
from tests.common import five_minutes_into_future_naive_utc as five_minutes_into_future
|
|
||||||
from tests.common import five_minutes_into_past_naive_utc as five_minutes_into_past
|
|
||||||
|
|
||||||
DATE_FORMATS = [
|
|
||||||
"%Y-%m-%dT%H:%M:%S.%fZ",
|
|
||||||
"%Y-%m-%dT%H:%M:%S.%f",
|
|
||||||
"%Y-%m-%d %H:%M:%S.%fZ",
|
|
||||||
"%Y-%m-%d %H:%M:%S.%f",
|
|
||||||
]
|
|
||||||
|
|
||||||
|
|
||||||
def assert_original(client):
|
|
||||||
new_tokens = rest_get_tokens_info(client)
|
|
||||||
|
|
||||||
for token in TOKENS_FILE_CONTENTS["tokens"]:
|
|
||||||
assert_token_valid(client, token["token"])
|
|
||||||
for new_token in new_tokens:
|
|
||||||
if new_token["name"] == token["name"]:
|
|
||||||
assert (
|
|
||||||
datetime.datetime.fromisoformat(new_token["date"]) == token["date"]
|
|
||||||
)
|
|
||||||
assert_no_recovery(client)
|
|
||||||
|
|
||||||
|
|
||||||
def assert_token_valid(client, token):
|
|
||||||
client.headers.update({"Authorization": "Bearer " + token})
|
|
||||||
assert rest_get_tokens_info(client) is not None
|
|
||||||
|
|
||||||
|
|
||||||
def rest_get_tokens_info(client):
|
|
||||||
response = client.get("/auth/tokens")
|
|
||||||
assert response.status_code == 200
|
|
||||||
return response.json()
|
|
||||||
|
|
||||||
|
|
||||||
def rest_try_authorize_new_device(client, token, device_name):
|
|
||||||
response = client.post(
|
|
||||||
"/auth/new_device/authorize",
|
|
||||||
json={
|
|
||||||
"token": token,
|
|
||||||
"device": device_name,
|
|
||||||
},
|
|
||||||
)
|
|
||||||
return response
|
|
||||||
|
|
||||||
|
|
||||||
def rest_make_recovery_token(client, expires_at=None, timeformat=None, uses=None):
|
|
||||||
json = {}
|
|
||||||
|
|
||||||
if expires_at is not None:
|
|
||||||
assert timeformat is not None
|
|
||||||
expires_at_str = expires_at.strftime(timeformat)
|
|
||||||
json["expiration"] = expires_at_str
|
|
||||||
|
|
||||||
if uses is not None:
|
|
||||||
json["uses"] = uses
|
|
||||||
|
|
||||||
if json == {}:
|
|
||||||
response = client.post("/auth/recovery_token")
|
|
||||||
else:
|
|
||||||
response = client.post(
|
|
||||||
"/auth/recovery_token",
|
|
||||||
json=json,
|
|
||||||
)
|
|
||||||
|
|
||||||
if not response.status_code == 200:
|
|
||||||
raise ValueError(response.reason, response.text, response.json()["detail"])
|
|
||||||
assert response.status_code == 200
|
|
||||||
assert "token" in response.json()
|
|
||||||
return response.json()["token"]
|
|
||||||
|
|
||||||
|
|
||||||
def rest_get_recovery_status(client):
|
|
||||||
response = client.get("/auth/recovery_token")
|
|
||||||
assert response.status_code == 200
|
|
||||||
return response.json()
|
|
||||||
|
|
||||||
|
|
||||||
def rest_get_recovery_date(client):
|
|
||||||
status = rest_get_recovery_status(client)
|
|
||||||
assert "date" in status
|
|
||||||
return status["date"]
|
|
||||||
|
|
||||||
|
|
||||||
def assert_no_recovery(client):
|
|
||||||
assert not rest_get_recovery_status(client)["exists"]
|
|
||||||
|
|
||||||
|
|
||||||
def rest_recover_with_mnemonic(client, mnemonic_token, device_name):
|
|
||||||
recovery_response = client.post(
|
|
||||||
"/auth/recovery_token/use",
|
|
||||||
json={"token": mnemonic_token, "device": device_name},
|
|
||||||
)
|
|
||||||
assert recovery_response.status_code == 200
|
|
||||||
new_token = recovery_response.json()["token"]
|
|
||||||
assert_token_valid(client, new_token)
|
|
||||||
return new_token
|
|
||||||
|
|
||||||
|
|
||||||
# Tokens
|
|
||||||
|
|
||||||
|
|
||||||
def test_get_tokens_info(authorized_client, tokens_file):
|
|
||||||
assert sorted(rest_get_tokens_info(authorized_client), key=lambda x: x["name"]) == [
|
|
||||||
{"name": "test_token", "date": "2022-01-14T08:31:10.789314", "is_caller": True},
|
|
||||||
{
|
|
||||||
"name": "test_token2",
|
|
||||||
"date": "2022-01-14T08:31:10.789314",
|
|
||||||
"is_caller": False,
|
|
||||||
},
|
|
||||||
]
|
|
||||||
|
|
||||||
|
|
||||||
def test_get_tokens_unauthorized(client, tokens_file):
|
|
||||||
response = client.get("/auth/tokens")
|
|
||||||
assert response.status_code == 401
|
|
||||||
|
|
||||||
|
|
||||||
def test_delete_token_unauthorized(client, authorized_client, tokens_file):
|
|
||||||
response = client.delete("/auth/tokens")
|
|
||||||
assert response.status_code == 401
|
|
||||||
assert_original(authorized_client)
|
|
||||||
|
|
||||||
|
|
||||||
def test_delete_token(authorized_client, tokens_file):
|
|
||||||
response = authorized_client.delete(
|
|
||||||
"/auth/tokens", json={"token_name": "test_token2"}
|
|
||||||
)
|
|
||||||
assert response.status_code == 200
|
|
||||||
assert rest_get_tokens_info(authorized_client) == [
|
|
||||||
{"name": "test_token", "date": "2022-01-14T08:31:10.789314", "is_caller": True}
|
|
||||||
]
|
|
||||||
|
|
||||||
|
|
||||||
def test_delete_self_token(authorized_client, tokens_file):
|
|
||||||
response = authorized_client.delete(
|
|
||||||
"/auth/tokens", json={"token_name": "test_token"}
|
|
||||||
)
|
|
||||||
assert response.status_code == 400
|
|
||||||
assert_original(authorized_client)
|
|
||||||
|
|
||||||
|
|
||||||
def test_delete_nonexistent_token(authorized_client, tokens_file):
|
|
||||||
response = authorized_client.delete(
|
|
||||||
"/auth/tokens", json={"token_name": "test_token3"}
|
|
||||||
)
|
|
||||||
assert response.status_code == 404
|
|
||||||
assert_original(authorized_client)
|
|
||||||
|
|
||||||
|
|
||||||
def test_refresh_token_unauthorized(client, authorized_client, tokens_file):
|
|
||||||
response = client.post("/auth/tokens")
|
|
||||||
assert response.status_code == 401
|
|
||||||
assert_original(authorized_client)
|
|
||||||
|
|
||||||
|
|
||||||
def test_refresh_token(authorized_client, tokens_file):
|
|
||||||
response = authorized_client.post("/auth/tokens")
|
|
||||||
assert response.status_code == 200
|
|
||||||
new_token = response.json()["token"]
|
|
||||||
assert_token_valid(authorized_client, new_token)
|
|
||||||
|
|
||||||
|
|
||||||
# New device
|
|
||||||
|
|
||||||
|
|
||||||
def test_get_new_device_auth_token_unauthorized(client, authorized_client, tokens_file):
|
|
||||||
response = client.post("/auth/new_device")
|
|
||||||
assert response.status_code == 401
|
|
||||||
assert "token" not in response.json()
|
|
||||||
assert "detail" in response.json()
|
|
||||||
# We only can check existence of a token we know.
|
|
||||||
|
|
||||||
|
|
||||||
def test_get_and_delete_new_device_token(client, authorized_client, tokens_file):
|
|
||||||
token = rest_get_new_device_token(authorized_client)
|
|
||||||
response = authorized_client.delete("/auth/new_device", json={"token": token})
|
|
||||||
assert response.status_code == 200
|
|
||||||
assert rest_try_authorize_new_device(client, token, "new_device").status_code == 404
|
|
||||||
|
|
||||||
|
|
||||||
def test_delete_token_unauthenticated(client, authorized_client, tokens_file):
|
|
||||||
token = rest_get_new_device_token(authorized_client)
|
|
||||||
response = client.delete("/auth/new_device", json={"token": token})
|
|
||||||
assert response.status_code == 401
|
|
||||||
assert rest_try_authorize_new_device(client, token, "new_device").status_code == 200
|
|
||||||
|
|
||||||
|
|
||||||
def rest_get_new_device_token(client):
|
|
||||||
response = client.post("/auth/new_device")
|
|
||||||
assert response.status_code == 200
|
|
||||||
assert "token" in response.json()
|
|
||||||
return response.json()["token"]
|
|
||||||
|
|
||||||
|
|
||||||
def test_get_and_authorize_new_device(client, authorized_client, tokens_file):
|
|
||||||
token = rest_get_new_device_token(authorized_client)
|
|
||||||
response = rest_try_authorize_new_device(client, token, "new_device")
|
|
||||||
assert response.status_code == 200
|
|
||||||
assert_token_valid(authorized_client, response.json()["token"])
|
|
||||||
|
|
||||||
|
|
||||||
def test_authorize_new_device_with_invalid_token(
|
|
||||||
client, authorized_client, tokens_file
|
|
||||||
):
|
|
||||||
response = rest_try_authorize_new_device(client, "invalid_token", "new_device")
|
|
||||||
assert response.status_code == 404
|
|
||||||
assert_original(authorized_client)
|
|
||||||
|
|
||||||
|
|
||||||
def test_get_and_authorize_used_token(client, authorized_client, tokens_file):
|
|
||||||
token_to_be_used_2_times = rest_get_new_device_token(authorized_client)
|
|
||||||
response = rest_try_authorize_new_device(
|
|
||||||
client, token_to_be_used_2_times, "new_device"
|
|
||||||
)
|
|
||||||
assert response.status_code == 200
|
|
||||||
assert_token_valid(authorized_client, response.json()["token"])
|
|
||||||
|
|
||||||
response = rest_try_authorize_new_device(
|
|
||||||
client, token_to_be_used_2_times, "new_device"
|
|
||||||
)
|
|
||||||
assert response.status_code == 404
|
|
||||||
|
|
||||||
|
|
||||||
def test_get_and_authorize_token_after_12_minutes(
|
|
||||||
client, authorized_client, tokens_file, mocker
|
|
||||||
):
|
|
||||||
token = rest_get_new_device_token(authorized_client)
|
|
||||||
|
|
||||||
# TARDIS sounds
|
|
||||||
mock = mocker.patch(DEVICE_KEY_VALIDATION_DATETIME, NearFuture)
|
|
||||||
|
|
||||||
response = rest_try_authorize_new_device(client, token, "new_device")
|
|
||||||
assert response.status_code == 404
|
|
||||||
assert_original(authorized_client)
|
|
||||||
|
|
||||||
|
|
||||||
def test_authorize_without_token(client, authorized_client, tokens_file):
|
|
||||||
response = client.post(
|
|
||||||
"/auth/new_device/authorize",
|
|
||||||
json={"device": "new_device"},
|
|
||||||
)
|
|
||||||
assert response.status_code == 422
|
|
||||||
assert_original(authorized_client)
|
|
||||||
|
|
||||||
|
|
||||||
# Recovery tokens
|
|
||||||
# GET /auth/recovery_token returns token status
|
|
||||||
# - if token is valid, returns 200 and token status
|
|
||||||
# - token status:
|
|
||||||
# - exists (boolean)
|
|
||||||
# - valid (boolean)
|
|
||||||
# - date (string)
|
|
||||||
# - expiration (string)
|
|
||||||
# - uses_left (int)
|
|
||||||
# - if token is invalid, returns 400 and empty body
|
|
||||||
# POST /auth/recovery_token generates a new token
|
|
||||||
# has two optional parameters:
|
|
||||||
# - expiration (string in datetime format)
|
|
||||||
# - uses_left (int)
|
|
||||||
# POST /auth/recovery_token/use uses the token
|
|
||||||
# required arguments:
|
|
||||||
# - token (string)
|
|
||||||
# - device (string)
|
|
||||||
# - if token is valid, returns 200 and token
|
|
||||||
# - if token is invalid, returns 404
|
|
||||||
# - if request is invalid, returns 400
|
|
||||||
|
|
||||||
|
|
||||||
def test_get_recovery_token_status_unauthorized(client, authorized_client, tokens_file):
|
|
||||||
response = client.get("/auth/recovery_token")
|
|
||||||
assert response.status_code == 401
|
|
||||||
assert_original(authorized_client)
|
|
||||||
|
|
||||||
|
|
||||||
def test_get_recovery_token_when_none_exists(authorized_client, tokens_file):
|
|
||||||
response = authorized_client.get("/auth/recovery_token")
|
|
||||||
assert response.status_code == 200
|
|
||||||
assert response.json() == {
|
|
||||||
"exists": False,
|
|
||||||
"valid": False,
|
|
||||||
"date": None,
|
|
||||||
"expiration": None,
|
|
||||||
"uses_left": None,
|
|
||||||
}
|
|
||||||
assert_original(authorized_client)
|
|
||||||
|
|
||||||
|
|
||||||
def test_generate_recovery_token(authorized_client, client, tokens_file):
|
|
||||||
# Generate token without expiration and uses_left
|
|
||||||
mnemonic_token = rest_make_recovery_token(authorized_client)
|
|
||||||
|
|
||||||
time_generated = rest_get_recovery_date(authorized_client)
|
|
||||||
assert_recovery_recent(time_generated)
|
|
||||||
|
|
||||||
assert rest_get_recovery_status(authorized_client) == {
|
|
||||||
"exists": True,
|
|
||||||
"valid": True,
|
|
||||||
"date": time_generated,
|
|
||||||
"expiration": None,
|
|
||||||
"uses_left": None,
|
|
||||||
}
|
|
||||||
|
|
||||||
rest_recover_with_mnemonic(client, mnemonic_token, "recover_device")
|
|
||||||
# And again
|
|
||||||
rest_recover_with_mnemonic(client, mnemonic_token, "recover_device2")
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize("timeformat", DATE_FORMATS)
|
|
||||||
def test_generate_recovery_token_with_expiration_date(
|
|
||||||
authorized_client, client, tokens_file, timeformat, mocker
|
|
||||||
):
|
|
||||||
# Generate token with expiration date
|
|
||||||
# Generate expiration date in the future
|
|
||||||
expiration_date = five_minutes_into_future()
|
|
||||||
mnemonic_token = rest_make_recovery_token(
|
|
||||||
authorized_client, expires_at=expiration_date, timeformat=timeformat
|
|
||||||
)
|
|
||||||
|
|
||||||
time_generated = rest_get_recovery_date(authorized_client)
|
|
||||||
assert_recovery_recent(time_generated)
|
|
||||||
|
|
||||||
assert rest_get_recovery_status(authorized_client) == {
|
|
||||||
"exists": True,
|
|
||||||
"valid": True,
|
|
||||||
"date": time_generated,
|
|
||||||
"expiration": expiration_date.replace(tzinfo=timezone.utc).isoformat(),
|
|
||||||
"uses_left": None,
|
|
||||||
}
|
|
||||||
|
|
||||||
rest_recover_with_mnemonic(client, mnemonic_token, "recover_device")
|
|
||||||
# And again
|
|
||||||
rest_recover_with_mnemonic(client, mnemonic_token, "recover_device2")
|
|
||||||
|
|
||||||
# Try to use token after expiration date
|
|
||||||
mock = mocker.patch(RECOVERY_KEY_VALIDATION_DATETIME, NearFuture)
|
|
||||||
device_name = "recovery_device3"
|
|
||||||
recovery_response = client.post(
|
|
||||||
"/auth/recovery_token/use",
|
|
||||||
json={"token": mnemonic_token, "device": device_name},
|
|
||||||
)
|
|
||||||
assert recovery_response.status_code == 404
|
|
||||||
# Assert that the token was not created
|
|
||||||
assert device_name not in [
|
|
||||||
token["name"] for token in rest_get_tokens_info(authorized_client)
|
|
||||||
]
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.mark.parametrize("timeformat", DATE_FORMATS)
|
|
||||||
def test_generate_recovery_token_with_expiration_in_the_past(
|
|
||||||
authorized_client, tokens_file, timeformat
|
|
||||||
):
|
|
||||||
# Server must return 400 if expiration date is in the past
|
|
||||||
expiration_date = five_minutes_into_past()
|
|
||||||
expiration_date_str = expiration_date.strftime(timeformat)
|
|
||||||
response = authorized_client.post(
|
|
||||||
"/auth/recovery_token",
|
|
||||||
json={"expiration": expiration_date_str},
|
|
||||||
)
|
|
||||||
assert response.status_code == 400
|
|
||||||
assert_no_recovery(authorized_client)
|
|
||||||
|
|
||||||
|
|
||||||
def test_generate_recovery_token_with_invalid_time_format(
|
|
||||||
authorized_client, tokens_file
|
|
||||||
):
|
|
||||||
# Server must return 400 if expiration date is in the past
|
|
||||||
expiration_date = "invalid_time_format"
|
|
||||||
response = authorized_client.post(
|
|
||||||
"/auth/recovery_token",
|
|
||||||
json={"expiration": expiration_date},
|
|
||||||
)
|
|
||||||
assert response.status_code == 422
|
|
||||||
assert_no_recovery(authorized_client)
|
|
||||||
|
|
||||||
|
|
||||||
def test_generate_recovery_token_with_limited_uses(
|
|
||||||
authorized_client, client, tokens_file
|
|
||||||
):
|
|
||||||
# Generate token with limited uses
|
|
||||||
mnemonic_token = rest_make_recovery_token(authorized_client, uses=2)
|
|
||||||
|
|
||||||
time_generated = rest_get_recovery_date(authorized_client)
|
|
||||||
assert_recovery_recent(time_generated)
|
|
||||||
|
|
||||||
assert rest_get_recovery_status(authorized_client) == {
|
|
||||||
"exists": True,
|
|
||||||
"valid": True,
|
|
||||||
"date": time_generated,
|
|
||||||
"expiration": None,
|
|
||||||
"uses_left": 2,
|
|
||||||
}
|
|
||||||
|
|
||||||
# Try to use the token
|
|
||||||
rest_recover_with_mnemonic(client, mnemonic_token, "recover_device")
|
|
||||||
|
|
||||||
assert rest_get_recovery_status(authorized_client) == {
|
|
||||||
"exists": True,
|
|
||||||
"valid": True,
|
|
||||||
"date": time_generated,
|
|
||||||
"expiration": None,
|
|
||||||
"uses_left": 1,
|
|
||||||
}
|
|
||||||
|
|
||||||
# Try to use token again
|
|
||||||
rest_recover_with_mnemonic(client, mnemonic_token, "recover_device2")
|
|
||||||
|
|
||||||
assert rest_get_recovery_status(authorized_client) == {
|
|
||||||
"exists": True,
|
|
||||||
"valid": False,
|
|
||||||
"date": time_generated,
|
|
||||||
"expiration": None,
|
|
||||||
"uses_left": 0,
|
|
||||||
}
|
|
||||||
|
|
||||||
# Try to use token after limited uses
|
|
||||||
recovery_response = client.post(
|
|
||||||
"/auth/recovery_token/use",
|
|
||||||
json={"token": mnemonic_token, "device": "recovery_device3"},
|
|
||||||
)
|
|
||||||
assert recovery_response.status_code == 404
|
|
||||||
|
|
||||||
|
|
||||||
def test_generate_recovery_token_with_negative_uses(
|
|
||||||
authorized_client, client, tokens_file
|
|
||||||
):
|
|
||||||
# Generate token with limited uses
|
|
||||||
response = authorized_client.post(
|
|
||||||
"/auth/recovery_token",
|
|
||||||
json={"uses": -2},
|
|
||||||
)
|
|
||||||
assert response.status_code == 400
|
|
||||||
assert_no_recovery(authorized_client)
|
|
||||||
|
|
||||||
|
|
||||||
def test_generate_recovery_token_with_zero_uses(authorized_client, client, tokens_file):
|
|
||||||
# Generate token with limited uses
|
|
||||||
response = authorized_client.post(
|
|
||||||
"/auth/recovery_token",
|
|
||||||
json={"uses": 0},
|
|
||||||
)
|
|
||||||
assert response.status_code == 400
|
|
||||||
assert_no_recovery(authorized_client)
|
|
|
@ -13,12 +13,6 @@ def read_json(file_path):
|
||||||
return json.load(file)
|
return json.load(file)
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
|
||||||
def domain_file(mocker, datadir):
|
|
||||||
mocker.patch("selfprivacy_api.utils.DOMAIN_FILE", datadir / "domain")
|
|
||||||
return datadir
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def turned_on(mocker, datadir):
|
def turned_on(mocker, datadir):
|
||||||
mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "turned_on.json")
|
mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "turned_on.json")
|
||||||
|
@ -108,7 +102,7 @@ def test_wrong_auth(wrong_auth_client):
|
||||||
assert response.status_code == 401
|
assert response.status_code == 401
|
||||||
|
|
||||||
|
|
||||||
def test_get_domain(authorized_client, domain_file):
|
def test_get_domain(authorized_client, turned_on):
|
||||||
assert get_domain() == "test-domain.tld"
|
assert get_domain() == "test-domain.tld"
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -1 +0,0 @@
|
||||||
test-domain.tld
|
|
|
@ -7,7 +7,7 @@
|
||||||
"enable": true
|
"enable": true
|
||||||
},
|
},
|
||||||
"databasePassword": "PASSWORD",
|
"databasePassword": "PASSWORD",
|
||||||
"domain": "test.tld",
|
"domain": "test-domain.tld",
|
||||||
"hashedMasterPassword": "HASHED_PASSWORD",
|
"hashedMasterPassword": "HASHED_PASSWORD",
|
||||||
"hostname": "test-instance",
|
"hostname": "test-instance",
|
||||||
"nextcloud": {
|
"nextcloud": {
|
||||||
|
|
|
@ -7,7 +7,7 @@
|
||||||
"enable": true
|
"enable": true
|
||||||
},
|
},
|
||||||
"databasePassword": "PASSWORD",
|
"databasePassword": "PASSWORD",
|
||||||
"domain": "test.tld",
|
"domain": "test-domain.tld",
|
||||||
"hashedMasterPassword": "HASHED_PASSWORD",
|
"hashedMasterPassword": "HASHED_PASSWORD",
|
||||||
"hostname": "test-instance",
|
"hostname": "test-instance",
|
||||||
"nextcloud": {
|
"nextcloud": {
|
||||||
|
|
|
@ -7,7 +7,7 @@
|
||||||
"enable": true
|
"enable": true
|
||||||
},
|
},
|
||||||
"databasePassword": "PASSWORD",
|
"databasePassword": "PASSWORD",
|
||||||
"domain": "test.tld",
|
"domain": "test-domain.tld",
|
||||||
"hashedMasterPassword": "HASHED_PASSWORD",
|
"hashedMasterPassword": "HASHED_PASSWORD",
|
||||||
"hostname": "test-instance",
|
"hostname": "test-instance",
|
||||||
"nextcloud": {
|
"nextcloud": {
|
||||||
|
|
|
@ -7,7 +7,7 @@
|
||||||
"enable": true
|
"enable": true
|
||||||
},
|
},
|
||||||
"databasePassword": "PASSWORD",
|
"databasePassword": "PASSWORD",
|
||||||
"domain": "test.tld",
|
"domain": "test-domain.tld",
|
||||||
"hashedMasterPassword": "HASHED_PASSWORD",
|
"hashedMasterPassword": "HASHED_PASSWORD",
|
||||||
"hostname": "test-instance",
|
"hostname": "test-instance",
|
||||||
"nextcloud": {
|
"nextcloud": {
|
||||||
|
|
|
@ -7,7 +7,7 @@
|
||||||
"enable": false
|
"enable": false
|
||||||
},
|
},
|
||||||
"databasePassword": "PASSWORD",
|
"databasePassword": "PASSWORD",
|
||||||
"domain": "test.tld",
|
"domain": "test-domain.tld",
|
||||||
"hashedMasterPassword": "HASHED_PASSWORD",
|
"hashedMasterPassword": "HASHED_PASSWORD",
|
||||||
"hostname": "test-instance",
|
"hostname": "test-instance",
|
||||||
"nextcloud": {
|
"nextcloud": {
|
||||||
|
|
|
@ -7,7 +7,7 @@
|
||||||
"enable": false
|
"enable": false
|
||||||
},
|
},
|
||||||
"databasePassword": "PASSWORD",
|
"databasePassword": "PASSWORD",
|
||||||
"domain": "test.tld",
|
"domain": "test-domain.tld",
|
||||||
"hashedMasterPassword": "HASHED_PASSWORD",
|
"hashedMasterPassword": "HASHED_PASSWORD",
|
||||||
"hostname": "test-instance",
|
"hostname": "test-instance",
|
||||||
"nextcloud": {
|
"nextcloud": {
|
||||||
|
|
|
@ -7,7 +7,7 @@
|
||||||
"enable": false
|
"enable": false
|
||||||
},
|
},
|
||||||
"databasePassword": "PASSWORD",
|
"databasePassword": "PASSWORD",
|
||||||
"domain": "test.tld",
|
"domain": "test-domain.tld",
|
||||||
"hashedMasterPassword": "HASHED_PASSWORD",
|
"hashedMasterPassword": "HASHED_PASSWORD",
|
||||||
"hostname": "test-instance",
|
"hostname": "test-instance",
|
||||||
"nextcloud": {
|
"nextcloud": {
|
||||||
|
|
|
@ -7,7 +7,7 @@
|
||||||
"enable": false
|
"enable": false
|
||||||
},
|
},
|
||||||
"databasePassword": "PASSWORD",
|
"databasePassword": "PASSWORD",
|
||||||
"domain": "test.tld",
|
"domain": "test-domain.tld",
|
||||||
"hashedMasterPassword": "HASHED_PASSWORD",
|
"hashedMasterPassword": "HASHED_PASSWORD",
|
||||||
"hostname": "test-instance",
|
"hostname": "test-instance",
|
||||||
"nextcloud": {
|
"nextcloud": {
|
||||||
|
|
Loading…
Reference in a new issue