Change datetime formats, more tests

This commit is contained in:
Inex Code 2022-07-08 18:28:08 +03:00
parent 9bd2896db8
commit e3354c73ef
8 changed files with 615 additions and 87 deletions

View file

@ -37,8 +37,8 @@ class DeviceApiTokenMutationReturn(MutationReturnInterface):
class RecoveryKeyLimitsInput:
"""Recovery key limits input"""
expiration_date: typing.Optional[datetime.datetime]
uses: typing.Optional[int]
expiration_date: typing.Optional[datetime.datetime] = None
uses: typing.Optional[int] = None
@strawberry.input
@ -61,26 +61,30 @@ class UseNewDeviceKeyInput:
class ApiMutations:
@strawberry.mutation(permission_classes=[IsAuthenticated])
def get_new_recovery_api_key(
self, limits: RecoveryKeyLimitsInput
self, limits: typing.Optional[RecoveryKeyLimitsInput] = None
) -> ApiKeyMutationReturn:
"""Generate recovery key"""
if limits.expiration_date is not None:
if limits.expiration_date < datetime.datetime.now():
return ApiKeyMutationReturn(
success=False,
message="Expiration date must be in the future",
code=400,
key=None,
)
if limits.uses is not None:
if limits.uses < 1:
return ApiKeyMutationReturn(
success=False,
message="Uses must be greater than 0",
code=400,
key=None,
)
key = generate_recovery_token(limits.expiration_date, limits.uses)
if limits is not None:
if limits.expiration_date is not None:
if limits.expiration_date < datetime.datetime.now():
return ApiKeyMutationReturn(
success=False,
message="Expiration date must be in the future",
code=400,
key=None,
)
if limits.uses is not None:
if limits.uses < 1:
return ApiKeyMutationReturn(
success=False,
message="Uses must be greater than 0",
code=400,
key=None,
)
if limits is not None:
key = generate_recovery_token(limits.expiration_date, limits.uses)
else:
key = generate_recovery_token(None, None)
return ApiKeyMutationReturn(
success=True,
message="Recovery key generated",

View file

@ -158,7 +158,7 @@ class System:
timestamp=None,
)
)
domain: SystemDomainInfo = strawberry.field(resolver=get_system_domain_info)
domain_info: SystemDomainInfo = strawberry.field(resolver=get_system_domain_info)
settings: SystemSettings = SystemSettings()
info: SystemInfo = SystemInfo()
provider: SystemProviderInfo = strawberry.field(resolver=get_system_provider_info)

View file

@ -4,6 +4,7 @@ from datetime import datetime
from flask_restful import Resource, reqparse
from selfprivacy_api.resources.api_auth import api
from selfprivacy_api.utils import parse_date
from selfprivacy_api.utils.auth import (
is_recovery_token_exists,
is_recovery_token_valid,
@ -129,19 +130,17 @@ class RecoveryToken(Resource):
# Convert expiration date to datetime and return 400 if it is not valid
if args["expiration"]:
try:
expiration = datetime.strptime(
args["expiration"], "%Y-%m-%dT%H:%M:%S.%fZ"
)
expiration = parse_date(args["expiration"])
# Retrun 400 if expiration date is in the past
if expiration < datetime.now():
return {"message": "Expiration date cannot be in the past"}, 400
except ValueError:
return {
"error": "Invalid expiration date. Use YYYY-MM-DDTHH:MM:SS.SSSZ"
"error": "Invalid expiration date. Use YYYY-MM-DDTHH:MM:SS.SSS"
}, 400
else:
expiration = None
if args["uses"] != None and args["uses"] < 1:
if args["uses"] is not None and args["uses"] < 1:
return {"message": "Uses must be greater than 0"}, 400
# Generate recovery token
token = generate_recovery_token(expiration, args["uses"])

View file

@ -125,13 +125,29 @@ def is_username_forbidden(username):
def parse_date(date_str: str) -> datetime.datetime:
"""Parse date string which can be in
%Y-%m-%dT%H:%M:%S.%fZ or %Y-%m-%d %H:%M:%S.%f format"""
return (
datetime.datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%S.%fZ")
if date_str.endswith("Z")
else datetime.datetime.strptime(date_str, "%Y-%m-%d %H:%M:%S.%f")
)
"""Parse date string which can be in one of these formats:
- %Y-%m-%dT%H:%M:%S.%fZ
- %Y-%m-%dT%H:%M:%S.%f
- %Y-%m-%d %H:%M:%S.%fZ
- %Y-%m-%d %H:%M:%S.%f
"""
try:
return datetime.datetime.strptime(date_str, "%Y-%m-%d %H:%M:%S.%fZ")
except ValueError:
pass
try:
return datetime.datetime.strptime(date_str, "%Y-%m-%d %H:%M:%S.%f")
except ValueError:
pass
try:
return datetime.datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%S.%fZ")
except ValueError:
pass
try:
return datetime.datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%S.%f")
except ValueError:
pass
raise ValueError("Invalid date string")
def get_dkim_key(domain):

View file

@ -7,7 +7,7 @@ import typing
from mnemonic import Mnemonic
from . import ReadUserData, UserDataFiles, WriteUserData
from . import ReadUserData, UserDataFiles, WriteUserData, parse_date
"""
Token are stored in the tokens.json file.
@ -121,7 +121,7 @@ def create_token(name):
{
"token": token,
"name": name,
"date": str(datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%fZ")),
"date": str(datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%f")),
}
)
return token
@ -161,9 +161,7 @@ def is_recovery_token_valid():
return False
if "expiration" not in recovery_token or recovery_token["expiration"] is None:
return True
return datetime.now() < datetime.strptime(
recovery_token["expiration"], "%Y-%m-%dT%H:%M:%S.%fZ"
)
return datetime.now() < parse_date(recovery_token["expiration"])
def get_recovery_token_status():
@ -213,8 +211,8 @@ def generate_recovery_token(
with WriteUserData(UserDataFiles.TOKENS) as tokens:
tokens["recovery_token"] = {
"token": recovery_token_str,
"date": str(datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%fZ")),
"expiration": expiration.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
"date": str(datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%f")),
"expiration": expiration.strftime("%Y-%m-%dT%H:%M:%S.%f")
if expiration is not None
else None,
"uses_left": uses_left if uses_left is not None else None,
@ -285,14 +283,7 @@ def _get_new_device_auth_token():
new_device = tokens["new_device"]
if "expiration" not in new_device:
return None
if new_device["expiration"].endswith("Z"):
expiration = datetime.strptime(
new_device["expiration"], "%Y-%m-%dT%H:%M:%S.%fZ"
)
else:
expiration = datetime.strptime(
new_device["expiration"], "%Y-%m-%d %H:%M:%S.%f"
)
expiration = parse_date(new_device["expiration"])
if datetime.now() > expiration:
return None
return new_device["token"]

View file

@ -25,6 +25,13 @@ TOKENS_FILE_CONTETS = {
]
}
DATE_FORMATS = [
"%Y-%m-%dT%H:%M:%S.%fZ",
"%Y-%m-%dT%H:%M:%S.%f",
"%Y-%m-%d %H:%M:%S.%fZ",
"%Y-%m-%d %H:%M:%S.%f",
]
def test_get_tokens_info(authorized_client, tokens_file):
response = authorized_client.get("/auth/tokens")
@ -261,7 +268,7 @@ def test_generate_recovery_token(authorized_client, client, tokens_file):
assert time_generated is not None
# Assert that the token was generated near the current time
assert (
datetime.datetime.strptime(time_generated, "%Y-%m-%dT%H:%M:%S.%fZ")
datetime.datetime.strptime(time_generated, "%Y-%m-%dT%H:%M:%S.%f")
- datetime.timedelta(seconds=5)
< datetime.datetime.now()
)
@ -298,14 +305,14 @@ def test_generate_recovery_token(authorized_client, client, tokens_file):
assert read_json(tokens_file)["tokens"][3]["name"] == "recovery_device2"
@pytest.mark.parametrize("timeformat", DATE_FORMATS)
def test_generate_recovery_token_with_expiration_date(
authorized_client, client, tokens_file
authorized_client, client, tokens_file, timeformat
):
# Generate token with expiration date
# Generate expiration date in the future
# Expiration date format is YYYY-MM-DDTHH:MM:SS.SSSZ
expiration_date = datetime.datetime.now() + datetime.timedelta(minutes=5)
expiration_date_str = expiration_date.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
expiration_date_str = expiration_date.strftime(timeformat)
response = authorized_client.post(
"/auth/recovery_token",
json={"expiration": expiration_date_str},
@ -315,13 +322,15 @@ def test_generate_recovery_token_with_expiration_date(
mnemonic_token = response.json["token"]
token = Mnemonic(language="english").to_entropy(mnemonic_token).hex()
assert read_json(tokens_file)["recovery_token"]["token"] == token
assert read_json(tokens_file)["recovery_token"]["expiration"] == expiration_date_str
assert datetime.datetime.strptime(
read_json(tokens_file)["recovery_token"]["expiration"], "%Y-%m-%dT%H:%M:%S.%f"
) == datetime.datetime.strptime(expiration_date_str, timeformat)
time_generated = read_json(tokens_file)["recovery_token"]["date"]
assert time_generated is not None
# Assert that the token was generated near the current time
assert (
datetime.datetime.strptime(time_generated, "%Y-%m-%dT%H:%M:%S.%fZ")
datetime.datetime.strptime(time_generated, "%Y-%m-%dT%H:%M:%S.%f")
- datetime.timedelta(seconds=5)
< datetime.datetime.now()
)
@ -333,7 +342,7 @@ def test_generate_recovery_token_with_expiration_date(
"exists": True,
"valid": True,
"date": time_generated,
"expiration": expiration_date_str,
"expiration": expiration_date.strftime("%Y-%m-%dT%H:%M:%S.%f"),
"uses_left": None,
}
@ -360,7 +369,7 @@ def test_generate_recovery_token_with_expiration_date(
# Try to use token after expiration date
new_data = read_json(tokens_file)
new_data["recovery_token"]["expiration"] = datetime.datetime.now().strftime(
"%Y-%m-%dT%H:%M:%S.%fZ"
"%Y-%m-%dT%H:%M:%S.%f"
)
write_json(tokens_file, new_data)
recovery_response = client.post(
@ -383,12 +392,13 @@ def test_generate_recovery_token_with_expiration_date(
}
@pytest.mark.parametrize("timeformat", DATE_FORMATS)
def test_generate_recovery_token_with_expiration_in_the_past(
authorized_client, tokens_file
authorized_client, tokens_file, timeformat
):
# Server must return 400 if expiration date is in the past
expiration_date = datetime.datetime.now() - datetime.timedelta(minutes=5)
expiration_date_str = expiration_date.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
expiration_date_str = expiration_date.strftime(timeformat)
response = authorized_client.post(
"/auth/recovery_token",
json={"expiration": expiration_date_str},
@ -429,7 +439,7 @@ def test_generate_recovery_token_with_limited_uses(
time_generated = read_json(tokens_file)["recovery_token"]["date"]
assert time_generated is not None
assert (
datetime.datetime.strptime(time_generated, "%Y-%m-%dT%H:%M:%S.%fZ")
datetime.datetime.strptime(time_generated, "%Y-%m-%dT%H:%M:%S.%f")
- datetime.timedelta(seconds=5)
< datetime.datetime.now()
)

View file

@ -2,6 +2,7 @@
# pylint: disable=unused-argument
# pylint: disable=missing-function-docstring
import json
from time import strftime
import pytest
import datetime
@ -58,7 +59,7 @@ def test_graphql_recovery_key_status_when_none_exists(authorized_client, tokens_
API_RECOVERY_KEY_GENERATE_MUTATION = """
mutation TestGenerateRecoveryKey($limits: RecoveryKeyLimitsInput!) {
mutation TestGenerateRecoveryKey($limits: RecoveryKeyLimitsInput) {
getNewRecoveryApiKey(limits: $limits) {
success
message
@ -85,12 +86,6 @@ def test_graphql_generate_recovery_key(client, authorized_client, tokens_file):
"/graphql",
json={
"query": API_RECOVERY_KEY_GENERATE_MUTATION,
"variables": {
"limits": {
"uses": None,
"expirationDate": None,
},
},
},
)
assert response.status_code == 200
@ -107,7 +102,7 @@ def test_graphql_generate_recovery_key(client, authorized_client, tokens_file):
assert time_generated is not None
key = response.json["data"]["getNewRecoveryApiKey"]["key"]
assert (
datetime.datetime.strptime(time_generated, "%Y-%m-%dT%H:%M:%S.%fZ")
datetime.datetime.strptime(time_generated, "%Y-%m-%dT%H:%M:%S.%f")
- datetime.timedelta(seconds=5)
< datetime.datetime.now()
)
@ -122,7 +117,9 @@ def test_graphql_generate_recovery_key(client, authorized_client, tokens_file):
assert response.json["data"]["api"]["recoveryKey"] is not None
assert response.json["data"]["api"]["recoveryKey"]["exists"] is True
assert response.json["data"]["api"]["recoveryKey"]["valid"] is True
assert response.json["data"]["api"]["recoveryKey"]["creationDate"] == time_generated
assert response.json["data"]["api"]["recoveryKey"][
"creationDate"
] == time_generated.replace("Z", "")
assert response.json["data"]["api"]["recoveryKey"]["expirationDate"] is None
assert response.json["data"]["api"]["recoveryKey"]["usesLeft"] is None
@ -134,7 +131,7 @@ def test_graphql_generate_recovery_key(client, authorized_client, tokens_file):
"variables": {
"input": {
"key": key,
"deviceName": "test_token",
"deviceName": "new_test_token",
},
},
},
@ -149,7 +146,7 @@ def test_graphql_generate_recovery_key(client, authorized_client, tokens_file):
response.json["data"]["useRecoveryApiKey"]["token"]
== read_json(tokens_file)["tokens"][2]["token"]
)
assert read_json(tokens_file)["tokens"][2]["name"] == "test_token"
assert read_json(tokens_file)["tokens"][2]["name"] == "new_test_token"
# Try to use token again
response = client.post(
@ -159,7 +156,7 @@ def test_graphql_generate_recovery_key(client, authorized_client, tokens_file):
"variables": {
"input": {
"key": key,
"deviceName": "test_token2",
"deviceName": "new_test_token2",
},
},
},
@ -174,7 +171,7 @@ def test_graphql_generate_recovery_key(client, authorized_client, tokens_file):
response.json["data"]["useRecoveryApiKey"]["token"]
== read_json(tokens_file)["tokens"][3]["token"]
)
assert read_json(tokens_file)["tokens"][3]["name"] == "test_token2"
assert read_json(tokens_file)["tokens"][3]["name"] == "new_test_token2"
def test_graphql_generate_recovery_key_with_expiration_date(
@ -188,7 +185,6 @@ def test_graphql_generate_recovery_key_with_expiration_date(
"query": API_RECOVERY_KEY_GENERATE_MUTATION,
"variables": {
"limits": {
"uses": None,
"expirationDate": expiration_date_str,
},
},
@ -212,7 +208,7 @@ def test_graphql_generate_recovery_key_with_expiration_date(
time_generated = read_json(tokens_file)["recovery_token"]["date"]
assert time_generated is not None
assert (
datetime.datetime.strptime(time_generated, "%Y-%m-%dT%H:%M:%S.%fZ")
datetime.datetime.strptime(time_generated, "%Y-%m-%dT%H:%M:%S.%f")
- datetime.timedelta(seconds=5)
< datetime.datetime.now()
)
@ -227,7 +223,9 @@ def test_graphql_generate_recovery_key_with_expiration_date(
assert response.json["data"]["api"]["recoveryKey"] is not None
assert response.json["data"]["api"]["recoveryKey"]["exists"] is True
assert response.json["data"]["api"]["recoveryKey"]["valid"] is True
assert response.json["data"]["api"]["recoveryKey"]["creationDate"] == time_generated
assert response.json["data"]["api"]["recoveryKey"][
"creationDate"
] == time_generated.replace("Z", "")
assert (
response.json["data"]["api"]["recoveryKey"]["expirationDate"]
== expiration_date_str
@ -242,7 +240,7 @@ def test_graphql_generate_recovery_key_with_expiration_date(
"variables": {
"input": {
"key": key,
"deviceName": "test_token",
"deviceName": "new_test_token",
},
},
},
@ -266,7 +264,7 @@ def test_graphql_generate_recovery_key_with_expiration_date(
"variables": {
"input": {
"key": key,
"deviceName": "test_token2",
"deviceName": "new_test_token2",
},
},
},
@ -284,9 +282,9 @@ def test_graphql_generate_recovery_key_with_expiration_date(
# Try to use token after expiration date
new_data = read_json(tokens_file)
new_data["recovery_token"][
"expirationDate"
] = datetime.datetime.now() - datetime.timedelta(minutes=5)
new_data["recovery_token"]["expiration"] = (
datetime.datetime.now() - datetime.timedelta(minutes=5)
).strftime("%Y-%m-%dT%H:%M:%S.%f")
write_json(tokens_file, new_data)
response = authorized_client.post(
"/graphql",
@ -295,7 +293,7 @@ def test_graphql_generate_recovery_key_with_expiration_date(
"variables": {
"input": {
"key": key,
"deviceName": "test_token3",
"deviceName": "new_test_token3",
},
},
},
@ -339,7 +337,6 @@ def test_graphql_generate_recovery_key_with_expiration_in_the_past(
"query": API_RECOVERY_KEY_GENERATE_MUTATION,
"variables": {
"limits": {
"uses": None,
"expirationDate": expiration_date_str,
},
},
@ -366,7 +363,6 @@ def test_graphql_generate_recovery_key_with_invalid_time_format(
"query": API_RECOVERY_KEY_GENERATE_MUTATION,
"variables": {
"limits": {
"uses": None,
"expirationDate": expiration_date_str,
},
},
@ -521,7 +517,6 @@ def test_graphql_generate_recovery_key_with_negative_uses(
"variables": {
"limits": {
"uses": -1,
"expirationDate": None,
},
},
},
@ -543,7 +538,6 @@ def test_graphql_generate_recovery_key_with_zero_uses(authorized_client, tokens_
"variables": {
"limits": {
"uses": 0,
"expirationDate": None,
},
},
},

View file

@ -338,8 +338,170 @@ def test_graphql_change_timezone_unauthorized(client, turned_on):
assert response.json.get("data") is None
API_CHANGE_SERVER_SETTINGS = """
mutation changeServerSettings($settings: SystemSettingsInput!) {
def test_graphql_change_timezone(authorized_client, turned_on):
"""Test change timezone"""
response = authorized_client.post(
"/graphql",
json={
"query": API_CHANGE_TIMEZONE_MUTATION,
"variables": {
"timezone": "Europe/Helsinki",
},
},
)
assert response.status_code == 200
assert response.json.get("data") is not None
assert response.json["data"]["changeTimezone"]["success"] is True
assert response.json["data"]["changeTimezone"]["message"] is not None
assert response.json["data"]["changeTimezone"]["code"] == 200
assert response.json["data"]["changeTimezone"]["timezone"] == "Europe/Helsinki"
assert read_json(turned_on / "turned_on.json")["timezone"] == "Europe/Helsinki"
def test_graphql_change_timezone_on_undefined(authorized_client, undefined_config):
"""Test change timezone when none is defined in config"""
response = authorized_client.post(
"/graphql",
json={
"query": API_CHANGE_TIMEZONE_MUTATION,
"variables": {
"timezone": "Europe/Helsinki",
},
},
)
assert response.status_code == 200
assert response.json.get("data") is not None
assert response.json["data"]["changeTimezone"]["success"] is True
assert response.json["data"]["changeTimezone"]["message"] is not None
assert response.json["data"]["changeTimezone"]["code"] == 200
assert response.json["data"]["changeTimezone"]["timezone"] == "Europe/Helsinki"
assert (
read_json(undefined_config / "undefined.json")["timezone"] == "Europe/Helsinki"
)
def test_graphql_change_timezone_without_timezone(authorized_client, turned_on):
"""Test change timezone without timezone"""
response = authorized_client.post(
"/graphql",
json={
"query": API_CHANGE_TIMEZONE_MUTATION,
"variables": {
"timezone": "",
},
},
)
assert response.status_code == 200
assert response.json.get("data") is not None
assert response.json["data"]["changeTimezone"]["success"] is False
assert response.json["data"]["changeTimezone"]["message"] is not None
assert response.json["data"]["changeTimezone"]["code"] == 400
assert response.json["data"]["changeTimezone"]["timezone"] is None
assert read_json(turned_on / "turned_on.json")["timezone"] == "Europe/Moscow"
def test_graphql_change_timezone_with_invalid_timezone(authorized_client, turned_on):
"""Test change timezone with invalid timezone"""
response = authorized_client.post(
"/graphql",
json={
"query": API_CHANGE_TIMEZONE_MUTATION,
"variables": {
"timezone": "Invlaid/Timezone",
},
},
)
assert response.status_code == 200
assert response.json.get("data") is not None
assert response.json["data"]["changeTimezone"]["success"] is False
assert response.json["data"]["changeTimezone"]["message"] is not None
assert response.json["data"]["changeTimezone"]["code"] == 400
assert response.json["data"]["changeTimezone"]["timezone"] is None
assert read_json(turned_on / "turned_on.json")["timezone"] == "Europe/Moscow"
API_GET_AUTO_UPGRADE_SETTINGS_QUERY = """
settings {
autoUpgrade {
enableAutoUpgrade
allowReboot
}
}
"""
def test_graphql_get_auto_upgrade_unauthorized(client, turned_on):
"""Test get auto upgrade settings without auth"""
response = client.get(
"/graphql",
json={
"query": API_GET_AUTO_UPGRADE_SETTINGS_QUERY,
},
)
assert response.status_code == 200
assert response.json.get("data") is None
def test_graphql_get_auto_upgrade(authorized_client, turned_on):
"""Test get auto upgrade settings"""
response = authorized_client.get(
"/graphql",
json={
"query": API_GET_AUTO_UPGRADE_SETTINGS_QUERY,
},
)
assert response.status_code == 200
assert response.json.get("data") is not None
assert response.json["data"]["settings"]["autoUpgrade"]["enableAutoUpgrade"] is True
assert response.json["data"]["settings"]["autoUpgrade"]["allowReboot"] is True
def test_graphql_get_auto_upgrade_on_undefined(authorized_client, undefined_config):
"""Test get auto upgrade settings when none is defined in config"""
response = authorized_client.get(
"/graphql",
json={
"query": API_GET_AUTO_UPGRADE_SETTINGS_QUERY,
},
)
assert response.status_code == 200
assert response.json.get("data") is not None
assert response.json["data"]["settings"]["autoUpgrade"]["enableAutoUpgrade"] is True
assert response.json["data"]["settings"]["autoUpgrade"]["allowReboot"] is False
def test_graphql_get_auto_upgrade_without_vlaues(authorized_client, no_values):
"""Test get auto upgrade settings without values"""
response = authorized_client.get(
"/graphql",
json={
"query": API_GET_AUTO_UPGRADE_SETTINGS_QUERY,
},
)
assert response.status_code == 200
assert response.json.get("data") is not None
assert response.json["data"]["settings"]["autoUpgrade"]["enableAutoUpgrade"] is True
assert response.json["data"]["settings"]["autoUpgrade"]["allowReboot"] is False
def test_graphql_get_auto_upgrade_turned_off(authorized_client, turned_off):
"""Test get auto upgrade settings when turned off"""
response = authorized_client.get(
"/graphql",
json={
"query": API_GET_AUTO_UPGRADE_SETTINGS_QUERY,
},
)
assert response.status_code == 200
assert response.json.get("data") is not None
assert (
response.json["data"]["settings"]["autoUpgrade"]["enableAutoUpgrade"] is False
)
assert response.json["data"]["settings"]["autoUpgrade"]["allowReboot"] is False
API_CHANGE_AUTO_UPGRADE_SETTINGS = """
mutation changeServerSettings($settings: AutoUpgradeSettingsInput!) {
changeAutoUpgradeSettings(settings: $settings) {
success
message
@ -349,3 +511,355 @@ mutation changeServerSettings($settings: SystemSettingsInput!) {
}
}
"""
def test_graphql_change_auto_upgrade_unauthorized(client, turned_on):
"""Test change auto upgrade settings without auth"""
response = client.post(
"/graphql",
json={
"query": API_CHANGE_AUTO_UPGRADE_SETTINGS,
"variables": {
"settings": {
"enableAutoUpgrade": True,
"allowReboot": True,
},
},
},
)
assert response.status_code == 200
assert response.json.get("data") is None
def test_graphql_change_auto_upgrade(authorized_client, turned_on):
"""Test change auto upgrade settings"""
response = authorized_client.post(
"/graphql",
json={
"query": API_CHANGE_AUTO_UPGRADE_SETTINGS,
"variables": {
"settings": {
"enableAutoUpgrade": False,
"allowReboot": True,
},
},
},
)
assert response.status_code == 200
assert response.json.get("data") is not None
assert response.json["data"]["changeAutoUpgradeSettings"]["success"] is True
assert response.json["data"]["changeAutoUpgradeSettings"]["message"] is not None
assert response.json["data"]["changeAutoUpgradeSettings"]["code"] == 200
assert (
response.json["data"]["changeAutoUpgradeSettings"]["enableAutoUpgrade"] is False
)
assert response.json["data"]["changeAutoUpgradeSettings"]["allowReboot"] is True
assert read_json(turned_on / "turned_on.json")["autoUpgrade"]["enable"] is False
assert read_json(turned_on / "turned_on.json")["autoUpgrade"]["allowReboot"] is True
def test_graphql_change_auto_upgrade_on_undefined(authorized_client, undefined_config):
"""Test change auto upgrade settings when none is defined in config"""
response = authorized_client.post(
"/graphql",
json={
"query": API_CHANGE_AUTO_UPGRADE_SETTINGS,
"variables": {
"settings": {
"enableAutoUpgrade": False,
"allowReboot": True,
},
},
},
)
assert response.status_code == 200
assert response.json.get("data") is not None
assert response.json["data"]["changeAutoUpgradeSettings"]["success"] is True
assert response.json["data"]["changeAutoUpgradeSettings"]["message"] is not None
assert response.json["data"]["changeAutoUpgradeSettings"]["code"] == 200
assert (
response.json["data"]["changeAutoUpgradeSettings"]["enableAutoUpgrade"] is False
)
assert response.json["data"]["changeAutoUpgradeSettings"]["allowReboot"] is True
assert (
read_json(undefined_config / "undefined.json")["autoUpgrade"]["enable"] is False
)
assert (
read_json(undefined_config / "undefined.json")["autoUpgrade"]["allowReboot"]
is True
)
def test_graphql_change_auto_upgrade_without_vlaues(authorized_client, no_values):
"""Test change auto upgrade settings without values"""
response = authorized_client.post(
"/graphql",
json={
"query": API_CHANGE_AUTO_UPGRADE_SETTINGS,
"variables": {
"settings": {
"enableAutoUpgrade": True,
"allowReboot": True,
},
},
},
)
assert response.status_code == 200
assert response.json.get("data") is not None
assert response.json["data"]["changeAutoUpgradeSettings"]["success"] is True
assert response.json["data"]["changeAutoUpgradeSettings"]["message"] is not None
assert response.json["data"]["changeAutoUpgradeSettings"]["code"] == 200
assert (
response.json["data"]["changeAutoUpgradeSettings"]["enableAutoUpgrade"] is True
)
assert response.json["data"]["changeAutoUpgradeSettings"]["allowReboot"] is True
assert read_json(no_values / "no_values.json")["autoUpgrade"]["enable"] is True
assert read_json(no_values / "no_values.json")["autoUpgrade"]["allowReboot"] is True
def test_graphql_change_auto_upgrade_turned_off(authorized_client, turned_off):
"""Test change auto upgrade settings when turned off"""
response = authorized_client.post(
"/graphql",
json={
"query": API_CHANGE_AUTO_UPGRADE_SETTINGS,
"variables": {
"settings": {
"enableAutoUpgrade": True,
"allowReboot": True,
},
},
},
)
assert response.status_code == 200
assert response.json.get("data") is not None
assert response.json["data"]["changeAutoUpgradeSettings"]["success"] is True
assert response.json["data"]["changeAutoUpgradeSettings"]["message"] is not None
assert response.json["data"]["changeAutoUpgradeSettings"]["code"] == 200
assert (
response.json["data"]["changeAutoUpgradeSettings"]["enableAutoUpgrade"] is True
)
assert response.json["data"]["changeAutoUpgradeSettings"]["allowReboot"] is True
assert read_json(turned_off / "turned_off.json")["autoUpgrade"]["enable"] is True
assert (
read_json(turned_off / "turned_off.json")["autoUpgrade"]["allowReboot"] is True
)
def test_grphql_change_auto_upgrade_without_enable(authorized_client, turned_off):
"""Test change auto upgrade settings without enable"""
response = authorized_client.post(
"/graphql",
json={
"query": API_CHANGE_AUTO_UPGRADE_SETTINGS,
"variables": {
"settings": {
"allowReboot": True,
},
},
},
)
assert response.status_code == 200
assert response.json.get("data") is not None
assert response.json["data"]["changeAutoUpgradeSettings"]["success"] is True
assert response.json["data"]["changeAutoUpgradeSettings"]["message"] is not None
assert response.json["data"]["changeAutoUpgradeSettings"]["code"] == 200
assert (
response.json["data"]["changeAutoUpgradeSettings"]["enableAutoUpgrade"] is False
)
assert response.json["data"]["changeAutoUpgradeSettings"]["allowReboot"] is True
assert read_json(turned_off / "turned_off.json")["autoUpgrade"]["enable"] is False
assert (
read_json(turned_off / "turned_off.json")["autoUpgrade"]["allowReboot"] is True
)
def test_graphql_change_auto_upgrade_without_allow_reboot(
authorized_client, turned_off
):
"""Test change auto upgrade settings without allow reboot"""
response = authorized_client.post(
"/graphql",
json={
"query": API_CHANGE_AUTO_UPGRADE_SETTINGS,
"variables": {
"settings": {
"enableAutoUpgrade": True,
},
},
},
)
assert response.status_code == 200
assert response.json.get("data") is not None
assert response.json["data"]["changeAutoUpgradeSettings"]["success"] is True
assert response.json["data"]["changeAutoUpgradeSettings"]["message"] is not None
assert response.json["data"]["changeAutoUpgradeSettings"]["code"] == 200
assert (
response.json["data"]["changeAutoUpgradeSettings"]["enableAutoUpgrade"] is True
)
assert response.json["data"]["changeAutoUpgradeSettings"]["allowReboot"] is False
assert read_json(turned_off / "turned_off.json")["autoUpgrade"]["enable"] is True
assert (
read_json(turned_off / "turned_off.json")["autoUpgrade"]["allowReboot"] is False
)
def test_graphql_change_auto_upgrade_with_empty_input(authorized_client, turned_off):
"""Test change auto upgrade settings with empty input"""
response = authorized_client.post(
"/graphql",
json={
"query": API_CHANGE_AUTO_UPGRADE_SETTINGS,
"variables": {
"settings": {},
},
},
)
assert response.status_code == 200
assert response.json.get("data") is None
assert response.json["data"]["changeAutoUpgradeSettings"]["success"] is True
assert response.json["data"]["changeAutoUpgradeSettings"]["message"] is not None
assert response.json["data"]["changeAutoUpgradeSettings"]["code"] == 200
assert (
response.json["data"]["changeAutoUpgradeSettings"]["enableAutoUpgrade"] is False
)
assert response.json["data"]["changeAutoUpgradeSettings"]["allowReboot"] is False
assert read_json(turned_off / "turned_off.json")["autoUpgrade"]["enable"] is False
assert (
read_json(turned_off / "turned_off.json")["autoUpgrade"]["allowReboot"] is False
)
API_REBUILD_SYSTEM_MUTATION = """
mutation rebuildSystem() {
runSystemRebuild {
success
message
code
}
}
"""
def test_graphql_system_rebuild_unauthorized(client, mock_subprocess_popen):
"""Test system rebuild without authorization"""
response = client.post(
"/graphql",
json={
"query": API_REBUILD_SYSTEM_MUTATION,
},
)
assert response.status_code == 200
assert response.json.get("data") is None
assert mock_subprocess_popen.call_count == 0
def test_graphql_system_rebuild(authorized_client, mock_subprocess_popen):
"""Test system rebuild"""
response = authorized_client.post(
"/graphql",
json={
"query": API_REBUILD_SYSTEM_MUTATION,
},
)
assert response.status_code == 200
assert response.json.get("data") is not None
assert response.json["data"]["runSystemRebuild"]["success"] is True
assert response.json["data"]["runSystemRebuild"]["message"] is not None
assert response.json["data"]["runSystemRebuild"]["code"] == 200
assert mock_subprocess_popen.call_count == 1
assert mock_subprocess_popen.call_args[0][0] == [
"systemctl",
"start",
"sp-nixos-rebuild.service",
]
API_UPGRADE_SYSTEM_MUTATION = """
mutation upgradeSystem() {
runSystemUpgrade {
success
message
code
}
}
"""
def test_graphql_system_upgrade_unauthorized(client, mock_subprocess_popen):
"""Test system upgrade without authorization"""
response = client.post(
"/graphql",
json={
"query": API_UPGRADE_SYSTEM_MUTATION,
},
)
assert response.status_code == 200
assert response.json.get("data") is None
assert mock_subprocess_popen.call_count == 0
def test_graphql_system_upgrade(authorized_client, mock_subprocess_popen):
"""Test system upgrade"""
response = authorized_client.post(
"/graphql",
json={
"query": API_UPGRADE_SYSTEM_MUTATION,
},
)
assert response.status_code == 200
assert response.json.get("data") is not None
assert response.json["data"]["runSystemUpgrade"]["success"] is True
assert response.json["data"]["runSystemUpgrade"]["message"] is not None
assert response.json["data"]["runSystemUpgrade"]["code"] == 200
assert mock_subprocess_popen.call_count == 1
assert mock_subprocess_popen.call_args[0][0] == [
"systemctl",
"start",
"sp-nixos-upgrade.service",
]
API_ROLLBACK_SYSTEM_MUTATION = """
mutation rollbackSystem() {
runSystemRollback {
success
message
code
}
}
"""
def test_graphql_system_rollback_unauthorized(client, mock_subprocess_popen):
"""Test system rollback without authorization"""
response = client.post(
"/graphql",
json={
"query": API_ROLLBACK_SYSTEM_MUTATION,
},
)
assert response.status_code == 200
assert response.json.get("data") is None
assert mock_subprocess_popen.call_count == 0
def test_graphql_system_rollback(authorized_client, mock_subprocess_popen):
"""Test system rollback"""
response = authorized_client.post(
"/graphql",
json={
"query": API_ROLLBACK_SYSTEM_MUTATION,
},
)
assert response.status_code == 200
assert response.json.get("data") is not None
assert response.json["data"]["runSystemRollback"]["success"] is True
assert response.json["data"]["runSystemRollback"]["message"] is not None
assert response.json["data"]["runSystemRollback"]["code"] == 200
assert mock_subprocess_popen.call_count == 1
assert mock_subprocess_popen.call_args[0][0] == [
"systemctl",
"start",
"sp-nixos-rollback.service",
]