mirror of
https://git.selfprivacy.org/SelfPrivacy/selfprivacy-rest-api.git
synced 2025-01-25 18:26:34 +00:00
refactor(dkim): do not use popen
This commit is contained in:
parent
2f25329c43
commit
bcbe1ff50c
|
@ -13,6 +13,7 @@ USERDATA_FILE = "/etc/nixos/userdata/userdata.json"
|
||||||
TOKENS_FILE = "/etc/nixos/userdata/tokens.json"
|
TOKENS_FILE = "/etc/nixos/userdata/tokens.json"
|
||||||
JOBS_FILE = "/etc/nixos/userdata/jobs.json"
|
JOBS_FILE = "/etc/nixos/userdata/jobs.json"
|
||||||
DOMAIN_FILE = "/var/domain"
|
DOMAIN_FILE = "/var/domain"
|
||||||
|
DKIM_DIR = "/var/dkim/"
|
||||||
|
|
||||||
|
|
||||||
class UserDataFiles(Enum):
|
class UserDataFiles(Enum):
|
||||||
|
@ -167,27 +168,31 @@ def parse_date(date_str: str) -> datetime.datetime:
|
||||||
raise ValueError("Invalid date string")
|
raise ValueError("Invalid date string")
|
||||||
|
|
||||||
|
|
||||||
|
def parse_dkim(dkim: str) -> str:
|
||||||
|
# extract key from file
|
||||||
|
dkim = dkim.split("(")[1]
|
||||||
|
dkim = dkim.split(")")[0]
|
||||||
|
# replace all quotes with nothing
|
||||||
|
dkim = dkim.replace('"', "")
|
||||||
|
# trim whitespace, remove newlines and tabs
|
||||||
|
dkim = dkim.strip()
|
||||||
|
dkim = dkim.replace("\n", "")
|
||||||
|
dkim = dkim.replace("\t", "")
|
||||||
|
# remove all redundant spaces
|
||||||
|
dkim = " ".join(dkim.split())
|
||||||
|
return dkim
|
||||||
|
|
||||||
|
|
||||||
def get_dkim_key(domain: str, parse: bool = True) -> typing.Optional[str]:
|
def get_dkim_key(domain: str, parse: bool = True) -> typing.Optional[str]:
|
||||||
"""Get DKIM key from /var/dkim/<domain>.selector.txt"""
|
"""Get DKIM key from /var/dkim/<domain>.selector.txt"""
|
||||||
if os.path.exists("/var/dkim/" + domain + ".selector.txt"):
|
|
||||||
# Is this really neccessary to use Popen here?
|
dkim_path = os.path.join(DKIM_DIR, domain + ".selector.txt")
|
||||||
cat_process = subprocess.Popen(
|
if os.path.exists(dkim_path):
|
||||||
["cat", "/var/dkim/" + domain + ".selector.txt"], stdout=subprocess.PIPE
|
with open(dkim_path, encoding="utf-8") as dkim_file:
|
||||||
)
|
dkim = dkim_file.read()
|
||||||
dkim = cat_process.communicate()[0]
|
if parse:
|
||||||
if parse:
|
dkim = parse_dkim(dkim)
|
||||||
# Extract key from file
|
return dkim
|
||||||
dkim = dkim.split(b"(")[1]
|
|
||||||
dkim = dkim.split(b")")[0]
|
|
||||||
# Replace all quotes with nothing
|
|
||||||
dkim = dkim.replace(b'"', b"")
|
|
||||||
# Trim whitespace, remove newlines and tabs
|
|
||||||
dkim = dkim.strip()
|
|
||||||
dkim = dkim.replace(b"\n", b"")
|
|
||||||
dkim = dkim.replace(b"\t", b"")
|
|
||||||
# Remove all redundant spaces
|
|
||||||
dkim = b" ".join(dkim.split())
|
|
||||||
return str(dkim, "utf-8")
|
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -1,68 +1,30 @@
|
||||||
import pytest
|
import pytest
|
||||||
import typing
|
|
||||||
|
|
||||||
|
import os
|
||||||
from os import path
|
from os import path
|
||||||
from unittest.mock import DEFAULT
|
|
||||||
from tests.conftest import global_data_dir
|
from tests.conftest import global_data_dir
|
||||||
|
|
||||||
from selfprivacy_api.utils import get_dkim_key, get_domain
|
from selfprivacy_api.utils import get_dkim_key, get_domain
|
||||||
import selfprivacy_api.utils as utils
|
|
||||||
|
|
||||||
###############################################################################
|
###############################################################################
|
||||||
|
|
||||||
|
DKIM_FILE_CONTENT = b'selector._domainkey\tIN\tTXT\t( "v=DKIM1; k=rsa; "\n\t "p=MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDNn/IhEz1SxgHxxxI8vlPYC2dNueiLe1GC4SYz8uHimC8SDkMvAwm7rqi2SimbFgGB5nccCNOqCkrIqJTCB9vufqBnVKAjshHqpOr5hk4JJ1T/AGQKWinstmDbfTLPYTbU8ijZrwwGeqQLlnXR5nSN0GB9GazheA9zaPsT6PV+aQIDAQAB" ) ; ----- DKIM key selector for test-domain.tld\n'
|
||||||
class ProcessMock:
|
|
||||||
"""Mock subprocess.Popen"""
|
|
||||||
|
|
||||||
def __init__(self, args, **kwargs):
|
|
||||||
self.args = args
|
|
||||||
self.kwargs = kwargs
|
|
||||||
|
|
||||||
def communicate():
|
|
||||||
return (
|
|
||||||
b'selector._domainkey\tIN\tTXT\t( "v=DKIM1; k=rsa; "\n\t "p=MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDNn/IhEz1SxgHxxxI8vlPYC2dNueiLe1GC4SYz8uHimC8SDkMvAwm7rqi2SimbFgGB5nccCNOqCkrIqJTCB9vufqBnVKAjshHqpOr5hk4JJ1T/AGQKWinstmDbfTLPYTbU8ijZrwwGeqQLlnXR5nSN0GB9GazheA9zaPsT6PV+aQIDAQAB" ) ; ----- DKIM key selector for test-domain.tld\n',
|
|
||||||
None,
|
|
||||||
)
|
|
||||||
|
|
||||||
|
|
||||||
class NoFileMock(ProcessMock):
|
|
||||||
def communicate():
|
|
||||||
return (b"", None)
|
|
||||||
|
|
||||||
|
|
||||||
def _path_exists_with_masked_paths(filepath, masked_paths: typing.List[str]):
|
|
||||||
if filepath in masked_paths:
|
|
||||||
return False
|
|
||||||
else:
|
|
||||||
# this will cause the mocker to return the standard path.exists output
|
|
||||||
# see https://docs.python.org/3/library/unittest.mock.html#unittest.mock.Mock.side_effect
|
|
||||||
return DEFAULT
|
|
||||||
|
|
||||||
|
|
||||||
def path_exists_func_but_with_masked_paths(masked_paths: typing.List[str]):
|
|
||||||
"""
|
|
||||||
Sometimes we do not want to pretend that no files exist at all, but that only specific files do not exist
|
|
||||||
This provides the needed path.exists function for some arbitrary list of masked paths
|
|
||||||
"""
|
|
||||||
return lambda x: _path_exists_with_masked_paths(x, masked_paths)
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def mock_all_paths_exist(mocker):
|
def dkim_file(mocker, domain_file, tmpdir):
|
||||||
mock = mocker.patch("os.path.exists", autospec=True, return_value=True)
|
domain = get_domain()
|
||||||
return mock
|
assert domain is not None
|
||||||
|
assert domain != ""
|
||||||
|
|
||||||
|
filename = domain + ".selector.txt"
|
||||||
|
dkim_path = path.join(tmpdir, filename)
|
||||||
|
|
||||||
@pytest.fixture
|
with open(dkim_path, "wb") as file:
|
||||||
def mock_subproccess_popen_dkimfile(mocker):
|
file.write(DKIM_FILE_CONTENT)
|
||||||
mock = mocker.patch("subprocess.Popen", autospec=True, return_value=ProcessMock)
|
|
||||||
return mock
|
|
||||||
|
|
||||||
|
mocker.patch("selfprivacy_api.utils.DKIM_DIR", tmpdir)
|
||||||
@pytest.fixture
|
return dkim_path
|
||||||
def mock_subproccess_popen(mocker):
|
|
||||||
mock = mocker.patch("subprocess.Popen", autospec=True, return_value=NoFileMock)
|
|
||||||
return mock
|
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
|
@ -74,46 +36,25 @@ def domain_file(mocker):
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
def mock_no_dkim_file(mocker):
|
def no_dkim_file(dkim_file):
|
||||||
"""
|
os.remove(dkim_file)
|
||||||
Should have domain mocks
|
assert path.exists(dkim_file) is False
|
||||||
"""
|
return dkim_file
|
||||||
domain = utils.get_domain()
|
|
||||||
# try:
|
|
||||||
# domain = get_domain()
|
|
||||||
# except Exception as e:
|
|
||||||
# domain = ""
|
|
||||||
|
|
||||||
masked_files = ["/var/dkim/" + domain + ".selector.txt"]
|
|
||||||
mock = mocker.patch(
|
|
||||||
"os.path.exists",
|
|
||||||
side_effect=path_exists_func_but_with_masked_paths(masked_files),
|
|
||||||
)
|
|
||||||
return mock
|
|
||||||
|
|
||||||
|
|
||||||
###############################################################################
|
###############################################################################
|
||||||
|
|
||||||
|
|
||||||
def test_get_dkim_key(
|
def test_get_dkim_key(domain_file, dkim_file):
|
||||||
mock_subproccess_popen_dkimfile, mock_all_paths_exist, domain_file
|
|
||||||
):
|
|
||||||
"""Test DKIM key"""
|
"""Test DKIM key"""
|
||||||
dkim_key = get_dkim_key("test-domain.tld")
|
dkim_key = get_dkim_key("test-domain.tld")
|
||||||
assert (
|
assert (
|
||||||
dkim_key
|
dkim_key
|
||||||
== "v=DKIM1; k=rsa; p=MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDNn/IhEz1SxgHxxxI8vlPYC2dNueiLe1GC4SYz8uHimC8SDkMvAwm7rqi2SimbFgGB5nccCNOqCkrIqJTCB9vufqBnVKAjshHqpOr5hk4JJ1T/AGQKWinstmDbfTLPYTbU8ijZrwwGeqQLlnXR5nSN0GB9GazheA9zaPsT6PV+aQIDAQAB"
|
== "v=DKIM1; k=rsa; p=MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDNn/IhEz1SxgHxxxI8vlPYC2dNueiLe1GC4SYz8uHimC8SDkMvAwm7rqi2SimbFgGB5nccCNOqCkrIqJTCB9vufqBnVKAjshHqpOr5hk4JJ1T/AGQKWinstmDbfTLPYTbU8ijZrwwGeqQLlnXR5nSN0GB9GazheA9zaPsT6PV+aQIDAQAB"
|
||||||
)
|
)
|
||||||
assert mock_subproccess_popen_dkimfile.call_args[0][0] == [
|
|
||||||
"cat",
|
|
||||||
"/var/dkim/test-domain.tld.selector.txt",
|
|
||||||
]
|
|
||||||
|
|
||||||
|
|
||||||
def test_no_dkim_key(
|
def test_no_dkim_key(domain_file, no_dkim_file):
|
||||||
authorized_client, domain_file, mock_no_dkim_file, mock_subproccess_popen
|
|
||||||
):
|
|
||||||
"""Test no DKIM key"""
|
"""Test no DKIM key"""
|
||||||
dkim_key = get_dkim_key("test-domain.tld")
|
dkim_key = get_dkim_key("test-domain.tld")
|
||||||
assert dkim_key is None
|
assert dkim_key is None
|
||||||
assert mock_subproccess_popen.called == False
|
|
||||||
|
|
|
@ -6,7 +6,7 @@ import pytest
|
||||||
|
|
||||||
from tests.common import generate_system_query, read_json
|
from tests.common import generate_system_query, read_json
|
||||||
from tests.test_graphql.common import assert_empty
|
from tests.test_graphql.common import assert_empty
|
||||||
from tests.test_dkim import mock_no_dkim_file
|
from tests.test_dkim import no_dkim_file, dkim_file
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
|
@ -338,7 +338,7 @@ def test_graphql_get_domain_no_dkim(
|
||||||
domain_file,
|
domain_file,
|
||||||
mock_get_ip4,
|
mock_get_ip4,
|
||||||
mock_get_ip6,
|
mock_get_ip6,
|
||||||
mock_no_dkim_file,
|
no_dkim_file,
|
||||||
turned_on,
|
turned_on,
|
||||||
):
|
):
|
||||||
"""Test no DKIM file situation gets properly handled"""
|
"""Test no DKIM file situation gets properly handled"""
|
||||||
|
|
Loading…
Reference in a new issue