Add more tests

This commit is contained in:
Inex Code 2022-07-05 08:14:37 +03:00
parent 503a39f390
commit 376bf1ef77
11 changed files with 509 additions and 10 deletions

View file

@ -7,7 +7,7 @@ from flask_restful import Resource
from selfprivacy_api.resources.services import api
from selfprivacy_api.utils import get_domain
from selfprivacy_api.utils import get_dkim_key, get_domain
class DKIMKey(Resource):
@ -31,15 +31,11 @@ class DKIMKey(Resource):
"""
domain = get_domain()
if os.path.exists("/var/dkim/" + domain + ".selector.txt"):
cat_process = subprocess.Popen(
["cat", "/var/dkim/" + domain + ".selector.txt"], stdout=subprocess.PIPE
)
dkim = cat_process.communicate()[0]
dkim = base64.b64encode(dkim)
dkim = str(dkim, "utf-8")
return dkim
return "DKIM file not found", 404
dkim = get_dkim_key(domain)
if dkim is None:
return "DKIM file not found", 404
dkim = base64.b64encode(dkim.encode("utf-8")).decode("utf-8")
return dkim
api.add_resource(DKIMKey, "/mailserver/dkim")

View file

@ -3,6 +3,8 @@
import datetime
from enum import Enum
import json
import os
import subprocess
import portalocker
@ -130,3 +132,13 @@ def parse_date(date_str: str) -> datetime.datetime:
if date_str.endswith("Z")
else datetime.datetime.strptime(date_str, "%Y-%m-%d %H:%M:%S.%f")
)
def get_dkim_key(domain):
"""Get DKIM key from /var/dkim/<domain>.selector.txt"""
if os.path.exists("/var/dkim/" + domain + ".selector.txt"):
cat_process = subprocess.Popen(
["cat", "/var/dkim/" + domain + ".selector.txt"], stdout=subprocess.PIPE
)
dkim = cat_process.communicate()[0]
return str(dkim, "utf-8")
return None

View file

@ -0,0 +1,22 @@
#!/usr/bin/env python3
"""Network utils"""
import subprocess
import re
def get_ip4():
"""Get IPv4 address"""
try:
ip4 = subprocess.check_output(["ip", "addr", "show", "dev", "eth0"]).decode("utf-8")
ip4 = re.search(r"inet (\d+\.\d+\.\d+\.\d+)\/\d+", ip4)
except subprocess.CalledProcessError:
ip4 = None
return ip4.group(1) if ip4 else None
def get_ip6():
"""Get IPv6 address"""
try:
ip6 = subprocess.check_output(["ip", "addr", "show", "dev", "eth0"]).decode("utf-8")
ip6 = re.search(r"inet6 (\S+)\/\d+", ip6)
except subprocess.CalledProcessError:
ip6 = None
return ip6.group(1) if ip6 else None

View file

@ -13,5 +13,8 @@ def write_json(file_path, data):
def generate_api_query(query_array):
return "query TestApi {\n api {" + "\n".join(query_array) + "}\n}"
def generate_system_query(query_array):
return "query TestSystem {\n system {" + "\n".join(query_array) + "}\n}"
def mnemonic_to_hex(mnemonic):
return Mnemonic(language="english").to_entropy(mnemonic).hex()

View file

@ -0,0 +1,227 @@
# pylint: disable=redefined-outer-name
# pylint: disable=unused-argument
# pylint: disable=missing-function-docstring
import json
import pytest
import datetime
from tests.common import generate_system_query, read_json, write_json
@pytest.fixture
def domain_file(mocker, datadir):
mocker.patch("selfprivacy_api.utils.DOMAIN_FILE", datadir / "domain")
return datadir
@pytest.fixture
def turned_on(mocker, datadir):
mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "turned_on.json")
assert read_json(datadir / "turned_on.json")["autoUpgrade"]["enable"] == True
assert read_json(datadir / "turned_on.json")["autoUpgrade"]["allowReboot"] == True
assert read_json(datadir / "turned_on.json")["timezone"] == "Europe/Moscow"
return datadir
@pytest.fixture
def turned_off(mocker, datadir):
mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "turned_off.json")
assert read_json(datadir / "turned_off.json")["autoUpgrade"]["enable"] == False
assert read_json(datadir / "turned_off.json")["autoUpgrade"]["allowReboot"] == False
assert read_json(datadir / "turned_off.json")["timezone"] == "Europe/Moscow"
return datadir
@pytest.fixture
def undefined_config(mocker, datadir):
mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "undefined.json")
assert "autoUpgrade" not in read_json(datadir / "undefined.json")
assert "timezone" not in read_json(datadir / "undefined.json")
return datadir
@pytest.fixture
def no_values(mocker, datadir):
mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "no_values.json")
assert "enable" not in read_json(datadir / "no_values.json")["autoUpgrade"]
assert "allowReboot" not in read_json(datadir / "no_values.json")["autoUpgrade"]
return datadir
class ProcessMock:
"""Mock subprocess.Popen"""
def __init__(self, args, **kwargs):
self.args = args
self.kwargs = kwargs
def communicate():
return (b"", None)
returncode = 0
class BrokenServiceMock(ProcessMock):
def communicate():
return (b"Testing error", None)
returncode = 3
@pytest.fixture
def mock_subprocess_popen(mocker):
mock = mocker.patch("subprocess.Popen", autospec=True, return_value=ProcessMock)
return mock
@pytest.fixture
def mock_os_chdir(mocker):
mock = mocker.patch("os.chdir", autospec=True)
return mock
@pytest.fixture
def mock_broken_service(mocker):
mock = mocker.patch(
"subprocess.Popen", autospec=True, return_value=BrokenServiceMock
)
return mock
@pytest.fixture
def mock_subprocess_check_output(mocker):
mock = mocker.patch(
"subprocess.check_output", autospec=True, return_value=b"Testing Linux"
)
return mock
@pytest.fixture
def mock_get_ip4(mocker):
mock = mocker.patch("selfprivacy_api.utils.get_ip4", autospec=True, return_value="157.90.247.192")
return mock
@pytest.fixture
def mock_get_ip6(mocker):
mock = mocker.patch("selfprivacy_api.utils.get_ip6", autospec=True, return_value="fe80::9400:ff:fef1:34ae")
return mock
@pytest.fixture
def mock_dkim_key(mocker):
mock = mocker.patch("selfprivacy_api.utils.get_dkim_key", autospec=True, return_value="I am a DKIM key")
API_PYTHON_VERSION_INFO = """
info {
pythonVersion
}
"""
def test_graphql_wrong_auth(wrong_auth_client):
"""Test wrong auth"""
response = wrong_auth_client.get(
"/graphql",
json={
"query": generate_system_query([API_PYTHON_VERSION_INFO]),
},
)
assert response.status_code == 200
assert response.json.get("data") is None
API_GET_DOMAIN_INFO = """
domainInfo {
domain
hostname
provider
requiredDnsRecords {
type
name
content
ttl
priority
}
}
"""
def dns_record(type="A", name="test.tld", content=None, ttl=3600, priority=None):
if content is None:
if type == "A":
content = "157.90.247.192"
elif type == "AAAA":
content = "fe80::9400:ff:fef1:34ae"
return {
"type": type,
"name": name,
"content": content,
"ttl": ttl,
"priority": priority,
}
def test_graphql_get_domain(authorized_client, domain_file, mock_get_ip4, mock_get_ip6, turned_on):
"""Test get domain"""
response = authorized_client.get(
"/graphql",
json={
"query": generate_system_query([API_GET_DOMAIN_INFO]),
},
)
assert response.status_code == 200
assert response.json.get("data") is not None
assert response.json["data"]["system"]["domainInfo"]["domain"] == "test.tld"
assert response.json["data"]["system"]["domainInfo"]["hostname"] == "test-instance"
assert response.json["data"]["system"]["domainInfo"]["provider"] == "HETZNER"
assert response.json["data"]["system"]["domainInfo"]["requiredDnsRecords"] == [
dns_record(),
dns_record(type="AAAA"),
dns_record(name="api.test.tld"),
dns_record(name="api.test.tld", type="AAAA"),
dns_record(name="cloud.test.tld"),
dns_record(name="cloud.test.tld", type="AAAA"),
dns_record(name="git.test.tld"),
dns_record(name="git.test.tld", type="AAAA"),
dns_record(name="meet.test.tld"),
dns_record(name="meet.test.tld", type="AAAA"),
dns_record(name="password.test.tld"),
dns_record(name="password.test.tld", type="AAAA"),
dns_record(name="social.test.tld"),
dns_record(name="social.test.tld", type="AAAA"),
dns_record(name="vpn.test.tld"),
dns_record(name="vpn.test.tld", type="AAAA"),
dns_record(name="test.tld", type="MX", content="test.tld", priority=10),
dns_record(name="_dmarc.test.tld", type="TXT", content="v=DMARC1; p=none", ttl=18000),
dns_record(name="test.tld", type="TXT", content="v=spf1 a mx ip4:157.90.247.192 -all", ttl=18000),
dns_record(name="selector._domainkey.test.tld", type="TXT", content="I am a DKIM key", ttl=18000),
]
API_GET_TIMEZONE = """
settings {
timezone
}
"""
def test_graphql_get_timezone_unauthorized(unauthorized_client, turned_on):
"""Test get timezone"""
response = unauthorized_client.get(
"/graphql",
json={
"query": generate_system_query([API_GET_TIMEZONE]),
},
)
assert response.status_code == 200
assert response.json.get("data") is None
def test_graphql_get_timezone(authorized_client, turned_on):
"""Test get timezone"""
response = authorized_client.get(
"/graphql",
json={
"query": generate_system_query([API_GET_TIMEZONE]),
},
)
assert response.status_code == 200
assert response.json.get("data") is not None
assert response.json["data"]["system"]["settings"]["timezone"] == "Europe/Moscow"
API_GET_PYTHON_VERSION = """
info {
pythonVersion
}
"""

View file

@ -0,0 +1 @@
test-domain.tld

View file

@ -0,0 +1,50 @@
{
"backblaze": {
"accountId": "ID",
"accountKey": "KEY",
"bucket": "selfprivacy"
},
"api": {
"token": "TEST_TOKEN",
"enableSwagger": false
},
"bitwarden": {
"enable": true
},
"cloudflare": {
"apiKey": "TOKEN"
},
"databasePassword": "PASSWORD",
"domain": "test.tld",
"hashedMasterPassword": "HASHED_PASSWORD",
"hostname": "test-instance",
"nextcloud": {
"adminPassword": "ADMIN",
"databasePassword": "ADMIN",
"enable": true
},
"resticPassword": "PASS",
"ssh": {
"enable": true,
"passwordAuthentication": true,
"rootKeys": [
"ssh-ed25519 KEY test@pc"
]
},
"username": "tester",
"gitea": {
"enable": false
},
"ocserv": {
"enable": true
},
"pleroma": {
"enable": true
},
"autoUpgrade": {
},
"timezone": "Europe/Moscow",
"sshKeys": [
"ssh-rsa KEY test@pc"
]
}

View file

@ -0,0 +1,52 @@
{
"backblaze": {
"accountId": "ID",
"accountKey": "KEY",
"bucket": "selfprivacy"
},
"api": {
"token": "TEST_TOKEN",
"enableSwagger": false
},
"bitwarden": {
"enable": true
},
"cloudflare": {
"apiKey": "TOKEN"
},
"databasePassword": "PASSWORD",
"domain": "test.tld",
"hashedMasterPassword": "HASHED_PASSWORD",
"hostname": "test-instance",
"nextcloud": {
"adminPassword": "ADMIN",
"databasePassword": "ADMIN",
"enable": true
},
"resticPassword": "PASS",
"ssh": {
"enable": true,
"passwordAuthentication": true,
"rootKeys": [
"ssh-ed25519 KEY test@pc"
]
},
"username": "tester",
"gitea": {
"enable": false
},
"ocserv": {
"enable": true
},
"pleroma": {
"enable": true
},
"autoUpgrade": {
"enable": false,
"allowReboot": false
},
"timezone": "Europe/Moscow",
"sshKeys": [
"ssh-rsa KEY test@pc"
]
}

View file

@ -0,0 +1,52 @@
{
"backblaze": {
"accountId": "ID",
"accountKey": "KEY",
"bucket": "selfprivacy"
},
"api": {
"token": "TEST_TOKEN",
"enableSwagger": false
},
"bitwarden": {
"enable": true
},
"cloudflare": {
"apiKey": "TOKEN"
},
"databasePassword": "PASSWORD",
"domain": "test.tld",
"hashedMasterPassword": "HASHED_PASSWORD",
"hostname": "test-instance",
"nextcloud": {
"adminPassword": "ADMIN",
"databasePassword": "ADMIN",
"enable": true
},
"resticPassword": "PASS",
"ssh": {
"enable": true,
"passwordAuthentication": true,
"rootKeys": [
"ssh-ed25519 KEY test@pc"
]
},
"username": "tester",
"gitea": {
"enable": false
},
"ocserv": {
"enable": true
},
"pleroma": {
"enable": true
},
"autoUpgrade": {
"enable": true,
"allowReboot": true
},
"timezone": "Europe/Moscow",
"sshKeys": [
"ssh-rsa KEY test@pc"
]
}

View file

@ -0,0 +1,47 @@
{
"backblaze": {
"accountId": "ID",
"accountKey": "KEY",
"bucket": "selfprivacy"
},
"api": {
"token": "TEST_TOKEN",
"enableSwagger": false
},
"bitwarden": {
"enable": true
},
"cloudflare": {
"apiKey": "TOKEN"
},
"databasePassword": "PASSWORD",
"domain": "test.tld",
"hashedMasterPassword": "HASHED_PASSWORD",
"hostname": "test-instance",
"nextcloud": {
"adminPassword": "ADMIN",
"databasePassword": "ADMIN",
"enable": true
},
"resticPassword": "PASS",
"ssh": {
"enable": true,
"passwordAuthentication": true,
"rootKeys": [
"ssh-ed25519 KEY test@pc"
]
},
"username": "tester",
"gitea": {
"enable": false
},
"ocserv": {
"enable": true
},
"pleroma": {
"enable": true
},
"sshKeys": [
"ssh-rsa KEY test@pc"
]
}

View file

@ -0,0 +1,37 @@
#!/usr/bin/env python3
# pylint: disable=redefined-outer-name
# pylint: disable=unused-argument
# pylint: disable=missing-function-docstring
import pytest
from selfprivacy_api.utils.network import get_ip4, get_ip6
OUTPUT_STRING = b"""
2: eth0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc fq_codel state UP group default qlen 1000
link/ether 96:00:00:f1:34:ae brd ff:ff:ff:ff:ff:ff
altname enp0s3
altname ens3
inet 157.90.247.192/32 brd 157.90.247.192 scope global dynamic eth0
valid_lft 46061sec preferred_lft 35261sec
inet6 fe80::9400:ff:fef1:34ae/64 scope link
valid_lft forever preferred_lft forever
"""
FAILED_OUTPUT_STRING = b"""
Device "eth0" does not exist.
"""
@pytest.fixture
def ip_process_mock(mocker):
mock = mocker.patch("subprocess.check_output", autospec=True, return_value=OUTPUT_STRING)
return mock
def test_get_ip4(ip_process_mock):
"""Test get IPv4 address"""
ip4 = get_ip4()
assert ip4 == "157.90.247.192"
def test_get_ip6(ip_process_mock):
"""Test get IPv6 address"""
ip6 = get_ip6()
assert ip6 == "fe80::9400:ff:fef1:34ae"