fix(dns-records): Fix DKIM parser

Previously API relied on client to parse DKIM DNS string, as it was a
separate endpoint. But now client blindly trusts the API, but parser
was not migrated over to the API.
This commit is contained in:
Inex Code 2023-01-16 18:41:54 +03:00
parent e7a49e170d
commit 4e7261c9c4
5 changed files with 40 additions and 7 deletions

View file

@ -27,4 +27,4 @@ async def get_token_header(
def get_api_version() -> str: def get_api_version() -> str:
"""Get API version""" """Get API version"""
return "2.1.0" return "2.1.1"

View file

@ -117,7 +117,7 @@ async def get_mailserver_dkim():
"""Get the DKIM record for the mailserver""" """Get the DKIM record for the mailserver"""
domain = get_domain() domain = get_domain()
dkim = get_dkim_key(domain) dkim = get_dkim_key(domain, parse=False)
if dkim is None: if dkim is None:
raise HTTPException(status_code=404, detail="DKIM record not found") raise HTTPException(status_code=404, detail="DKIM record not found")
dkim = base64.b64encode(dkim.encode("utf-8")).decode("utf-8") dkim = base64.b64encode(dkim.encode("utf-8")).decode("utf-8")

View file

@ -164,13 +164,25 @@ def parse_date(date_str: str) -> datetime.datetime:
raise ValueError("Invalid date string") raise ValueError("Invalid date string")
def get_dkim_key(domain): def get_dkim_key(domain, parse=True):
"""Get DKIM key from /var/dkim/<domain>.selector.txt""" """Get DKIM key from /var/dkim/<domain>.selector.txt"""
if os.path.exists("/var/dkim/" + domain + ".selector.txt"): if os.path.exists("/var/dkim/" + domain + ".selector.txt"):
cat_process = subprocess.Popen( cat_process = subprocess.Popen(
["cat", "/var/dkim/" + domain + ".selector.txt"], stdout=subprocess.PIPE ["cat", "/var/dkim/" + domain + ".selector.txt"], stdout=subprocess.PIPE
) )
dkim = cat_process.communicate()[0] dkim = cat_process.communicate()[0]
if parse:
# Extract key from file
dkim = dkim.split(b"(")[1]
dkim = dkim.split(b")")[0]
# Replace all quotes with nothing
dkim = dkim.replace(b'"', b"")
# Trim whitespace, remove newlines and tabs
dkim = dkim.strip()
dkim = dkim.replace(b"\n", b"")
dkim = dkim.replace(b"\t", b"")
# Remove all redundant spaces
dkim = b" ".join(dkim.split())
return str(dkim, "utf-8") return str(dkim, "utf-8")
return None return None

View file

@ -2,7 +2,7 @@ from setuptools import setup, find_packages
setup( setup(
name="selfprivacy_api", name="selfprivacy_api",
version="2.1.0", version="2.1.1",
packages=find_packages(), packages=find_packages(),
scripts=[ scripts=[
"selfprivacy_api/app.py", "selfprivacy_api/app.py",

View file

@ -2,6 +2,8 @@ import base64
import json import json
import pytest import pytest
from selfprivacy_api.utils import get_dkim_key
############################################################################### ###############################################################################
@ -13,7 +15,10 @@ class ProcessMock:
self.kwargs = kwargs self.kwargs = kwargs
def communicate(): def communicate():
return (b"I am a DKIM key", None) return (
b'selector._domainkey\tIN\tTXT\t( "v=DKIM1; k=rsa; "\n\t "p=MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDNn/IhEz1SxgHxxxI8vlPYC2dNueiLe1GC4SYz8uHimC8SDkMvAwm7rqi2SimbFgGB5nccCNOqCkrIqJTCB9vufqBnVKAjshHqpOr5hk4JJ1T/AGQKWinstmDbfTLPYTbU8ijZrwwGeqQLlnXR5nSN0GB9GazheA9zaPsT6PV+aQIDAQAB" ) ; ----- DKIM key selector for example.com\n',
None,
)
class NoFileMock(ProcessMock): class NoFileMock(ProcessMock):
@ -63,11 +68,27 @@ def test_illegal_methods(authorized_client, mock_subproccess_popen):
assert response.status_code == 405 assert response.status_code == 405
def test_dkim_key(authorized_client, mock_subproccess_popen): def test_get_dkim_key(mock_subproccess_popen):
"""Test DKIM key""" """Test DKIM key"""
dkim_key = get_dkim_key("example.com")
assert (
dkim_key
== "v=DKIM1; k=rsa; p=MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDNn/IhEz1SxgHxxxI8vlPYC2dNueiLe1GC4SYz8uHimC8SDkMvAwm7rqi2SimbFgGB5nccCNOqCkrIqJTCB9vufqBnVKAjshHqpOr5hk4JJ1T/AGQKWinstmDbfTLPYTbU8ijZrwwGeqQLlnXR5nSN0GB9GazheA9zaPsT6PV+aQIDAQAB"
)
assert mock_subproccess_popen.call_args[0][0] == [
"cat",
"/var/dkim/example.com.selector.txt",
]
def test_dkim_key(authorized_client, mock_subproccess_popen):
"""Test old REST DKIM key endpoint"""
response = authorized_client.get("/services/mailserver/dkim") response = authorized_client.get("/services/mailserver/dkim")
assert response.status_code == 200 assert response.status_code == 200
assert base64.b64decode(response.text) == b"I am a DKIM key" assert (
base64.b64decode(response.text)
== b'selector._domainkey\tIN\tTXT\t( "v=DKIM1; k=rsa; "\n\t "p=MIGfMA0GCSqGSIb3DQEBAQUAA4GNADCBiQKBgQDNn/IhEz1SxgHxxxI8vlPYC2dNueiLe1GC4SYz8uHimC8SDkMvAwm7rqi2SimbFgGB5nccCNOqCkrIqJTCB9vufqBnVKAjshHqpOr5hk4JJ1T/AGQKWinstmDbfTLPYTbU8ijZrwwGeqQLlnXR5nSN0GB9GazheA9zaPsT6PV+aQIDAQAB" ) ; ----- DKIM key selector for example.com\n'
)
assert mock_subproccess_popen.call_args[0][0] == [ assert mock_subproccess_popen.call_args[0][0] == [
"cat", "cat",
"/var/dkim/example.com.selector.txt", "/var/dkim/example.com.selector.txt",