add basic system getters

This commit is contained in:
Inex Code 2022-06-24 21:14:20 +03:00
parent c6a3588e33
commit 80e5550f7d
14 changed files with 294 additions and 33 deletions

View file

@ -93,10 +93,7 @@ def create_app(test_config=None):
return jsonify({}), 404
app.add_url_rule(
"/graphql",
view_func=AsyncGraphQLView.as_view(
"graphql", schema=schema
)
"/graphql", view_func=AsyncGraphQLView.as_view("graphql", schema=schema)
)
if app.config["ENABLE_SWAGGER"] == "1":

View file

@ -7,8 +7,10 @@ from flask import request
from selfprivacy_api.utils.auth import is_token_valid
class IsAuthenticated(BasePermission):
"""Is authenticated permission"""
message = "You must be authenticated to access this resource."
def has_permission(self, source: typing.Any, info: Info, **kwargs) -> bool:

View file

@ -18,20 +18,28 @@ from selfprivacy_api.utils.auth import (
get_token_name,
)
def get_api_version() -> str:
"""Get API version"""
return "1.2.7"
@strawberry.type
class ApiDevice:
"""A single device with SelfPrivacy app installed"""
name: str
creation_date: datetime.datetime
is_caller: bool
def get_devices() -> typing.List[ApiDevice]:
"""Get list of devices"""
caller_name = get_token_name(request.headers.get("Authorization").split(" ")[1] if request.headers.get("Authorization") is not None else None)
caller_name = get_token_name(
request.headers.get("Authorization").split(" ")[1]
if request.headers.get("Authorization") is not None
else None
)
tokens = get_tokens_info()
return [
ApiDevice(
@ -46,34 +54,52 @@ def get_devices() -> typing.List[ApiDevice]:
@strawberry.type
class ApiRecoveryKeyStatus:
"""Recovery key status"""
exists: bool
valid: bool
creation_date: typing.Optional[datetime.datetime]
expiration_date: typing.Optional[datetime.datetime]
uses_left: typing.Optional[int]
def get_recovery_key_status() -> ApiRecoveryKeyStatus:
"""Get recovery key status"""
if not is_recovery_token_exists():
return ApiRecoveryKeyStatus(
exists=False, valid=False, creation_date=None, expiration_date=None, uses_left=None
exists=False,
valid=False,
creation_date=None,
expiration_date=None,
uses_left=None,
)
status = get_recovery_token_status()
if status is None:
return ApiRecoveryKeyStatus(
exists=False, valid=False, creation_date=None, expiration_date=None, uses_left=None
exists=False,
valid=False,
creation_date=None,
expiration_date=None,
uses_left=None,
)
return ApiRecoveryKeyStatus(
exists=True,
valid=is_recovery_token_valid(),
creation_date=parse_date(status["date"]),
expiration_date=parse_date(status["expiration"]) if status["expiration"] is not None else None,
expiration_date=parse_date(status["expiration"])
if status["expiration"] is not None
else None,
uses_left=status["uses_left"] if status["uses_left"] is not None else None,
)
@strawberry.type
class Api:
"""API access status"""
version: str = strawberry.field(resolver=get_api_version)
devices: typing.List[ApiDevice] = strawberry.field(resolver=get_devices, permission_classes=[IsAuthenticated])
recovery_key: ApiRecoveryKeyStatus = strawberry.field(resolver=get_recovery_key_status, permission_classes=[IsAuthenticated])
devices: typing.List[ApiDevice] = strawberry.field(
resolver=get_devices, permission_classes=[IsAuthenticated]
)
recovery_key: ApiRecoveryKeyStatus = strawberry.field(
resolver=get_recovery_key_status, permission_classes=[IsAuthenticated]
)

View file

@ -4,22 +4,26 @@ import datetime
import typing
import strawberry
@strawberry.enum
class Severity(Enum):
"""
Severity of an alert.
"""
INFO = "INFO"
WARNING = "WARNING"
ERROR = "ERROR"
CRITICAL = "CRITICAL"
SUCCESS = "SUCCESS"
@strawberry.type
class Alert:
"""
Alert type.
"""
severity: Severity
title: str
message: str

View file

@ -4,10 +4,12 @@ import datetime
import typing
import strawberry
@strawberry.enum
class DnsProvider(Enum):
CLOUDFLARE = "CLOUDFLARE"
@strawberry.enum
class ServerProvider(Enum):
HETZNER = "HETZNER"

View file

@ -1,68 +1,158 @@
"""Common system information and settings"""
# pylint: disable=too-few-public-methods
import subprocess
import typing
import strawberry
from selfprivacy_api.graphql.queries.common import Alert
from selfprivacy_api.graphql.queries.providers import DnsProvider, ServerProvider
from selfprivacy_api.utils import ReadUserData
@strawberry.type
class DnsRecord:
"""DNS record"""
recordType: str
name: str
content: str
ttl: int
priority: typing.Optional[int]
@strawberry.type
class SystemDomainInfo:
"""Information about the system domain"""
domain: str
hostname: str
provider: DnsProvider
required_dns_records: typing.List[DnsRecord]
def get_system_domain_info() -> SystemDomainInfo:
"""Get basic system domain info"""
with ReadUserData() as user_data:
return SystemDomainInfo(
domain=user_data["domain"],
hostname=user_data["hostname"],
provider=DnsProvider.CLOUDFLARE,
# TODO: get ip somehow
required_dns_records=[],
)
@strawberry.type
class AutoUpgradeOptions:
"""Automatic upgrade options"""
enable: bool
allow_reboot: bool
def get_auto_upgrade_options() -> AutoUpgradeOptions:
"""Get automatic upgrade options"""
with ReadUserData() as user_data:
if "autoUpgrade" not in user_data:
return AutoUpgradeOptions(enable=True, allow_reboot=False)
if "enable" not in user_data["autoUpgrade"]:
user_data["autoUpgrade"]["enable"] = True
if "allowReboot" not in user_data["autoUpgrade"]:
user_data["autoUpgrade"]["allowReboot"] = False
return AutoUpgradeOptions(
enable=user_data["autoUpgrade"]["enable"],
allow_reboot=user_data["autoUpgrade"]["allowReboot"],
)
@strawberry.type
class SshSettings:
"""SSH settings and root SSH keys"""
enable: bool
password_authentication: bool
root_ssh_keys: typing.List[str]
def get_ssh_settings() -> SshSettings:
"""Get SSH settings"""
with ReadUserData() as user_data:
if "ssh" not in user_data:
return SshSettings(
enable=False, password_authentication=False, root_ssh_keys=[]
)
if "enable" not in user_data["ssh"]:
user_data["ssh"]["enable"] = False
if "passwordAuthentication" not in user_data["ssh"]:
user_data["ssh"]["passwordAuthentication"] = False
if "rootKeys" not in user_data["ssh"]:
user_data["ssh"]["rootKeys"] = []
return SshSettings(
enable=user_data["ssh"]["enable"],
password_authentication=user_data["ssh"]["passwordAuthentication"],
root_ssh_keys=user_data["ssh"]["rootKeys"],
)
def get_system_timezone() -> str:
"""Get system timezone"""
with ReadUserData() as user_data:
if "timezone" not in user_data:
return "Europe/Uzhgorod"
return user_data["timezone"]
@strawberry.type
class SystemSettings:
"""Common system settings"""
auto_upgrade: AutoUpgradeOptions
ssh: SshSettings
timezone: str
auto_upgrade: AutoUpgradeOptions = strawberry.field(
resolver=get_auto_upgrade_options
)
ssh: SshSettings = strawberry.field(resolver=get_ssh_settings)
timezone: str = strawberry.field(resolver=get_system_timezone)
def get_system_version() -> str:
"""Get system version"""
return subprocess.check_output(["uname", "-a"]).decode("utf-8").strip()
def get_python_version() -> str:
"""Get Python version"""
return subprocess.check_output(["python", "-V"]).decode("utf-8").strip()
@strawberry.type
class SystemInfo:
"""System components versions"""
system_version: str
python_version: str
system_version: str = strawberry.field(resolver=get_system_version)
python_version: str = strawberry.field(resolver=get_python_version)
@strawberry.type
class SystemProviderInfo:
"""Information about the VPS/Dedicated server provider"""
provider: ServerProvider
id: str
def get_system_provider_info() -> SystemProviderInfo:
"""Get system provider info"""
return SystemProviderInfo(provider=ServerProvider.HETZNER, id="UNKNOWN")
@strawberry.type
class System:
"""
Base system type which represents common system status
"""
status: Alert
domain: SystemDomainInfo
domain: SystemDomainInfo = strawberry.field(resolver=get_system_domain_info)
settings: SystemSettings
info: SystemInfo
provider: SystemProviderInfo
busy: bool
provider: SystemProviderInfo = strawberry.field(resolver=get_system_provider_info)
busy: bool = False

View file

@ -10,10 +10,13 @@ from selfprivacy_api.graphql.queries.system import System
@strawberry.type
class Query:
"""Root schema for queries"""
system: System
@strawberry.field
def api(self) -> Api:
"""API access status"""
return Api()
schema = strawberry.Schema(query=Query)

View file

@ -3,6 +3,7 @@
from flask_restful import Resource
from selfprivacy_api.graphql.queries.api import get_api_version
class ApiVersion(Resource):
"""SelfPrivacy API version"""

View file

@ -5,6 +5,10 @@ import subprocess
import pytz
from flask import Blueprint
from flask_restful import Resource, Api, reqparse
from selfprivacy_api.graphql.queries.system import (
get_python_version,
get_system_version,
)
from selfprivacy_api.utils import WriteUserData, ReadUserData
@ -256,9 +260,7 @@ class SystemVersion(Resource):
description: Unauthorized
"""
return {
"system_version": subprocess.check_output(["uname", "-a"])
.decode("utf-8")
.strip()
"system_version": get_system_version(),
}
@ -279,7 +281,7 @@ class PythonVersion(Resource):
401:
description: Unauthorized
"""
return subprocess.check_output(["python", "-V"]).decode("utf-8").strip()
return get_python_version()
class PullRepositoryChanges(Resource):

View file

@ -121,7 +121,12 @@ def is_username_forbidden(username):
return False
def parse_date(date_str: str) -> datetime.datetime:
"""Parse date string which can be in
%Y-%m-%dT%H:%M:%S.%fZ or %Y-%m-%d %H:%M:%S.%f format"""
return datetime.datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%S.%fZ") if date_str.endswith("Z") else datetime.datetime.strptime(date_str, "%Y-%m-%d %H:%M:%S.%f")
return (
datetime.datetime.strptime(date_str, "%Y-%m-%dT%H:%M:%S.%fZ")
if date_str.endswith("Z")
else datetime.datetime.strptime(date_str, "%Y-%m-%d %H:%M:%S.%f")
)

11
tests/common.py Normal file
View file

@ -0,0 +1,11 @@
import json
def read_json(file_path):
with open(file_path, "r", encoding="utf-8") as file:
return json.load(file)
def write_json(file_path, data):
with open(file_path, "w", encoding="utf-8") as file:
json.dump(data, file, indent=4)

View file

@ -6,6 +6,8 @@ import re
import pytest
from mnemonic import Mnemonic
from .common import read_json, write_json
TOKENS_FILE_CONTETS = {
"tokens": [
@ -23,16 +25,6 @@ TOKENS_FILE_CONTETS = {
}
def read_json(file_path):
with open(file_path, "r", encoding="utf-8") as file:
return json.load(file)
def write_json(file_path, data):
with open(file_path, "w", encoding="utf-8") as file:
json.dump(data, file, indent=4)
def test_get_tokens_info(authorized_client, tokens_file):
response = authorized_client.get("/auth/tokens")
assert response.status_code == 200

View file

@ -0,0 +1,14 @@
{
"tokens": [
{
"token": "TEST_TOKEN",
"name": "test_token",
"date": "2022-01-14 08:31:10.789314"
},
{
"token": "TEST_TOKEN2",
"name": "test_token2",
"date": "2022-01-14 08:31:10.789314"
}
]
}

View file

@ -0,0 +1,112 @@
# pylint: disable=redefined-outer-name
# pylint: disable=unused-argument
import json
import pytest
TOKENS_FILE_CONTETS = {
"tokens": [
{
"token": "TEST_TOKEN",
"name": "test_token",
"date": "2022-01-14 08:31:10.789314",
},
{
"token": "TEST_TOKEN2",
"name": "test_token2",
"date": "2022-01-14 08:31:10.789314",
},
]
}
def test_graphql_get_api_version(authorized_client):
response = authorized_client.get(
"/graphql",
json={
"query": """
query {
api {
version
}
}
"""
},
)
assert response.status_code == 200
assert "version" in response.get_json()["data"]["api"]
def test_graphql_api_version_unauthorized(client):
response = client.get(
"/graphql",
json={
"query": """
query {
api {
version
}
}
"""
},
)
assert response.status_code == 200
assert "version" in response.get_json()["data"]["api"]
def test_graphql_tokens_info(authorized_client, tokens_file):
response = authorized_client.get(
"/graphql",
json={
"query": """
query {
api {
devices {
creationDate
isCaller
name
}
}
}
"""
},
)
assert response.status_code == 200
assert response.json == {
"data": {
"api": {
"devices": [
{
"creationDate": "2022-01-14T08:31:10.789314",
"isCaller": True,
"name": "test_token",
},
{
"creationDate": "2022-01-14T08:31:10.789314",
"isCaller": False,
"name": "test_token2",
},
]
}
}
}
def test_graphql_tokens_info_unauthorized(client, tokens_file):
response = client.get(
"/graphql",
json={
"query": """
query {
api {
devices {
creationDate
isCaller
name
}
}
}
"""
},
)
assert response.status_code == 200
assert response.json["data"] is None