feat: add caa record (#149)

Reviewed-on: https://git.selfprivacy.org/SelfPrivacy/selfprivacy-rest-api/pulls/149
Reviewed-by: Inex Code <inex.code@selfprivacy.org>
Co-authored-by: dettlaff <dettlaff@riseup.net>
Co-committed-by: dettlaff <dettlaff@riseup.net>
This commit is contained in:
dettlaff 2024-10-14 14:29:00 +03:00 committed by Inex Code
parent 11e020c0e1
commit 03d751e591
7 changed files with 412 additions and 3 deletions

View file

@ -19,6 +19,7 @@ class ServiceDnsRecord(BaseModel):
type: str
name: str
content: str
ttl: int
display_name: str
priority: Optional[int] = None

View file

@ -8,6 +8,7 @@ from os import makedirs
from os import listdir
from os.path import join
from shutil import copyfile, copytree, rmtree
from selfprivacy_api.services.bitwarden import Bitwarden
from selfprivacy_api.services.forgejo import Forgejo
from selfprivacy_api.services.jitsimeet import JitsiMeet
@ -25,7 +26,7 @@ import selfprivacy_api.utils.network as network_utils
from selfprivacy_api.services.api_icon import API_ICON
from selfprivacy_api.utils import USERDATA_FILE, DKIM_DIR, SECRETS_FILE, get_domain
from selfprivacy_api.utils.block_devices import BlockDevices
from shutil import copyfile, copytree, rmtree
from selfprivacy_api.utils import read_account_uri
CONFIG_STASH_DIR = "/etc/selfprivacy/dump"
@ -61,8 +62,22 @@ class ServiceManager(Service):
def get_all_required_dns_records() -> list[ServiceDnsRecord]:
ip4 = network_utils.get_ip4()
ip6 = network_utils.get_ip6()
dns_records: list[ServiceDnsRecord] = []
try:
dns_records.append(
ServiceDnsRecord(
type="CAA",
name=get_domain(),
content=f'128 issue "letsencrypt.org;accounturi={read_account_uri()}"',
ttl=3600,
display_name="CAA record",
)
)
except Exception as e:
print(f"Error creating CAA: {e}")
for service in ServiceManager.get_enabled_services():
dns_records += service.get_dns_records(ip4, ip6)
return dns_records

View file

@ -7,6 +7,7 @@ import os
import subprocess
import portalocker
import typing
import glob
from traceback import format_tb as format_traceback
@ -20,6 +21,10 @@ USERDATA_FILE = "/etc/nixos/userdata.json"
SECRETS_FILE = "/etc/selfprivacy/secrets.json"
DKIM_DIR = "/var/dkim/"
ACCOUNT_PATH_PATTERN = (
"/var/lib/acme/.lego/accounts/*/acme-v02.api.letsencrypt.org/*/account.json"
)
class UserDataFiles(Enum):
"""Enum for userdata files"""
@ -234,3 +239,16 @@ def write_to_log(message):
def pretty_error(e: Exception) -> str:
traceback = "/r".join(format_traceback(e.__traceback__))
return type(e).__name__ + ": " + str(e) + ": " + traceback
def read_account_uri() -> str:
account_file = glob.glob(ACCOUNT_PATH_PATTERN)
if not account_file:
raise FileNotFoundError(
f"No account files found matching: {ACCOUNT_PATH_PATTERN}"
)
with open(account_file[0], "r") as file:
account_info = json.load(file)
return account_info["registration"]["uri"]

View file

@ -18,6 +18,41 @@ from tests.test_graphql.common import (
from tests.test_dkim import no_dkim_file, dkim_file
from tests.test_system import assert_provider
from unittest.mock import mock_open
@pytest.fixture
def account_file_mock(mocker, datadir):
mocker.patch(
"selfprivacy_api.utils.__init__.glob.glob",
return_value=[str(datadir / "account.json")],
)
return datadir
@pytest.fixture
def account_file_404(mocker, datadir):
mocker.patch("selfprivacy_api.utils.__init__.glob.glob", return_value=[])
return datadir
@pytest.fixture
def no_uri_account_file_mock(mocker, datadir):
mocker.patch(
"selfprivacy_api.utils.__init__.glob.glob",
return_value=[str(datadir / "no_uri_account.json")],
)
return datadir
@pytest.fixture
def blank_account_file_mock(mocker, datadir):
mocker.patch(
"selfprivacy_api.utils.__init__.glob.glob",
return_value=[str(datadir / "blank_file_account.json")],
)
return datadir
@pytest.fixture
def turned_on(mocker, datadir):
@ -272,7 +307,12 @@ def is_dns_record_in_array(records, dns_record) -> bool:
def test_graphql_get_domain(
authorized_client, mock_get_ip4, mock_get_ip6, turned_on, mock_dkim_key
authorized_client,
mock_get_ip4,
mock_get_ip6,
turned_on,
mock_dkim_key,
account_file_mock,
):
"""Test get domain"""
response = authorized_client.post(
@ -354,10 +394,24 @@ def test_graphql_get_domain(
ttl=18000,
),
)
assert is_dns_record_in_array(
dns_records,
dns_record(
name="test-domain.tld",
record_type="CAA",
content='128 issue "letsencrypt.org;accounturi=https://acme-v02.api.letsencrypt.org/acme/acct/234340396"',
ttl=3600,
),
)
def test_dns_records_no_duplicates(
authorized_client, mock_get_ip4, mock_get_ip6, turned_on, mock_dkim_key
authorized_client,
mock_get_ip4,
mock_get_ip6,
turned_on,
mock_dkim_key,
account_file_mock,
):
"""Check for duplicate DNS records"""
response = authorized_client.post(
@ -387,6 +441,7 @@ def test_graphql_get_domain_no_dkim(
mock_get_ip6,
no_dkim_file,
turned_on,
account_file_mock,
):
"""Test no DKIM file situation gets properly handled"""
response = authorized_client.post(
@ -403,6 +458,303 @@ def test_graphql_get_domain_no_dkim(
raise ValueError("unexpected record found:", record)
def test_graphql_get_domain_no_uri_account_file(
authorized_client,
mock_get_ip4,
mock_get_ip6,
turned_on,
mock_dkim_key,
no_uri_account_file_mock,
):
"""Test get domain"""
response = authorized_client.post(
"/graphql",
json={
"query": generate_system_query([API_GET_DOMAIN_INFO]),
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert (
response.json()["data"]["system"]["domainInfo"]["domain"] == "test-domain.tld"
)
assert (
response.json()["data"]["system"]["domainInfo"]["hostname"] == "test-instance"
)
assert response.json()["data"]["system"]["domainInfo"]["provider"] == "CLOUDFLARE"
dns_records = response.json()["data"]["system"]["domainInfo"]["requiredDnsRecords"]
assert is_dns_record_in_array(dns_records, dns_record())
assert is_dns_record_in_array(dns_records, dns_record(record_type="AAAA"))
assert is_dns_record_in_array(dns_records, dns_record(name="api"))
assert is_dns_record_in_array(
dns_records, dns_record(name="api", record_type="AAAA")
)
assert is_dns_record_in_array(dns_records, dns_record(name="cloud"))
assert is_dns_record_in_array(
dns_records, dns_record(name="cloud", record_type="AAAA")
)
assert is_dns_record_in_array(dns_records, dns_record(name="git"))
assert is_dns_record_in_array(
dns_records, dns_record(name="git", record_type="AAAA")
)
assert is_dns_record_in_array(dns_records, dns_record(name="meet"))
assert is_dns_record_in_array(
dns_records, dns_record(name="meet", record_type="AAAA")
)
assert is_dns_record_in_array(dns_records, dns_record(name="password"))
assert is_dns_record_in_array(
dns_records, dns_record(name="password", record_type="AAAA")
)
assert is_dns_record_in_array(dns_records, dns_record(name="social"))
assert is_dns_record_in_array(
dns_records, dns_record(name="social", record_type="AAAA")
)
assert is_dns_record_in_array(dns_records, dns_record(name="vpn"))
assert is_dns_record_in_array(
dns_records, dns_record(name="vpn", record_type="AAAA")
)
assert is_dns_record_in_array(
dns_records,
dns_record(
name="test-domain.tld",
record_type="MX",
content="test-domain.tld",
priority=10,
),
)
assert is_dns_record_in_array(
dns_records,
dns_record(
name="_dmarc", record_type="TXT", content="v=DMARC1; p=none", ttl=18000
),
)
assert is_dns_record_in_array(
dns_records,
dns_record(
name="test-domain.tld",
record_type="TXT",
content="v=spf1 a mx ip4:157.90.247.192 -all",
ttl=18000,
),
)
assert is_dns_record_in_array(
dns_records,
dns_record(
name="selector._domainkey",
record_type="TXT",
content="I am a DKIM key",
ttl=18000,
),
)
assert not is_dns_record_in_array(
dns_records,
dns_record(
name="test-domain.tld",
record_type="CAA",
content='128 issue "letsencrypt.org;accounturi=https://acme-v02.api.letsencrypt.org/acme/acct/234340396"',
ttl=3600,
),
)
def test_graphql_get_domain_not_found_account_file(
authorized_client,
mock_get_ip4,
mock_get_ip6,
turned_on,
mock_dkim_key,
account_file_404,
):
"""Test get domain"""
response = authorized_client.post(
"/graphql",
json={
"query": generate_system_query([API_GET_DOMAIN_INFO]),
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert (
response.json()["data"]["system"]["domainInfo"]["domain"] == "test-domain.tld"
)
assert (
response.json()["data"]["system"]["domainInfo"]["hostname"] == "test-instance"
)
assert response.json()["data"]["system"]["domainInfo"]["provider"] == "CLOUDFLARE"
dns_records = response.json()["data"]["system"]["domainInfo"]["requiredDnsRecords"]
assert is_dns_record_in_array(dns_records, dns_record())
assert is_dns_record_in_array(dns_records, dns_record(record_type="AAAA"))
assert is_dns_record_in_array(dns_records, dns_record(name="api"))
assert is_dns_record_in_array(
dns_records, dns_record(name="api", record_type="AAAA")
)
assert is_dns_record_in_array(dns_records, dns_record(name="cloud"))
assert is_dns_record_in_array(
dns_records, dns_record(name="cloud", record_type="AAAA")
)
assert is_dns_record_in_array(dns_records, dns_record(name="git"))
assert is_dns_record_in_array(
dns_records, dns_record(name="git", record_type="AAAA")
)
assert is_dns_record_in_array(dns_records, dns_record(name="meet"))
assert is_dns_record_in_array(
dns_records, dns_record(name="meet", record_type="AAAA")
)
assert is_dns_record_in_array(dns_records, dns_record(name="password"))
assert is_dns_record_in_array(
dns_records, dns_record(name="password", record_type="AAAA")
)
assert is_dns_record_in_array(dns_records, dns_record(name="social"))
assert is_dns_record_in_array(
dns_records, dns_record(name="social", record_type="AAAA")
)
assert is_dns_record_in_array(dns_records, dns_record(name="vpn"))
assert is_dns_record_in_array(
dns_records, dns_record(name="vpn", record_type="AAAA")
)
assert is_dns_record_in_array(
dns_records,
dns_record(
name="test-domain.tld",
record_type="MX",
content="test-domain.tld",
priority=10,
),
)
assert is_dns_record_in_array(
dns_records,
dns_record(
name="_dmarc", record_type="TXT", content="v=DMARC1; p=none", ttl=18000
),
)
assert is_dns_record_in_array(
dns_records,
dns_record(
name="test-domain.tld",
record_type="TXT",
content="v=spf1 a mx ip4:157.90.247.192 -all",
ttl=18000,
),
)
assert is_dns_record_in_array(
dns_records,
dns_record(
name="selector._domainkey",
record_type="TXT",
content="I am a DKIM key",
ttl=18000,
),
)
assert not is_dns_record_in_array(
dns_records,
dns_record(
name="test-domain.tld",
record_type="CAA",
content='128 issue "letsencrypt.org;accounturi=https://acme-v02.api.letsencrypt.org/acme/acct/234340396"',
ttl=3600,
),
)
def test_graphql_get_domain_black_account_file(
authorized_client,
mock_get_ip4,
mock_get_ip6,
turned_on,
mock_dkim_key,
blank_account_file_mock,
):
"""Test get domain"""
response = authorized_client.post(
"/graphql",
json={
"query": generate_system_query([API_GET_DOMAIN_INFO]),
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert (
response.json()["data"]["system"]["domainInfo"]["domain"] == "test-domain.tld"
)
assert (
response.json()["data"]["system"]["domainInfo"]["hostname"] == "test-instance"
)
assert response.json()["data"]["system"]["domainInfo"]["provider"] == "CLOUDFLARE"
dns_records = response.json()["data"]["system"]["domainInfo"]["requiredDnsRecords"]
assert is_dns_record_in_array(dns_records, dns_record())
assert is_dns_record_in_array(dns_records, dns_record(record_type="AAAA"))
assert is_dns_record_in_array(dns_records, dns_record(name="api"))
assert is_dns_record_in_array(
dns_records, dns_record(name="api", record_type="AAAA")
)
assert is_dns_record_in_array(dns_records, dns_record(name="cloud"))
assert is_dns_record_in_array(
dns_records, dns_record(name="cloud", record_type="AAAA")
)
assert is_dns_record_in_array(dns_records, dns_record(name="git"))
assert is_dns_record_in_array(
dns_records, dns_record(name="git", record_type="AAAA")
)
assert is_dns_record_in_array(dns_records, dns_record(name="meet"))
assert is_dns_record_in_array(
dns_records, dns_record(name="meet", record_type="AAAA")
)
assert is_dns_record_in_array(dns_records, dns_record(name="password"))
assert is_dns_record_in_array(
dns_records, dns_record(name="password", record_type="AAAA")
)
assert is_dns_record_in_array(dns_records, dns_record(name="social"))
assert is_dns_record_in_array(
dns_records, dns_record(name="social", record_type="AAAA")
)
assert is_dns_record_in_array(dns_records, dns_record(name="vpn"))
assert is_dns_record_in_array(
dns_records, dns_record(name="vpn", record_type="AAAA")
)
assert is_dns_record_in_array(
dns_records,
dns_record(
name="test-domain.tld",
record_type="MX",
content="test-domain.tld",
priority=10,
),
)
assert is_dns_record_in_array(
dns_records,
dns_record(
name="_dmarc", record_type="TXT", content="v=DMARC1; p=none", ttl=18000
),
)
assert is_dns_record_in_array(
dns_records,
dns_record(
name="test-domain.tld",
record_type="TXT",
content="v=spf1 a mx ip4:157.90.247.192 -all",
ttl=18000,
),
)
assert is_dns_record_in_array(
dns_records,
dns_record(
name="selector._domainkey",
record_type="TXT",
content="I am a DKIM key",
ttl=18000,
),
)
assert not is_dns_record_in_array(
dns_records,
dns_record(
name="test-domain.tld",
record_type="CAA",
content='128 issue "letsencrypt.org;accounturi=https://acme-v02.api.letsencrypt.org/acme/acct/234340396"',
ttl=3600,
),
)
API_GET_TIMEZONE = """
settings {
timezone

View file

@ -0,0 +1,12 @@
{
"email": "meowmeow@bloodwine.cyou",
"registration": {
"body": {
"status": "valid",
"contact": [
"mailto:meow@bloodwine.cyou"
]
},
"uri": "https://acme-v02.api.letsencrypt.org/acme/acct/234340396"
}
}

View file

@ -0,0 +1,11 @@
{
"email": "meowmeow@bloodwine.cyou",
"registration": {
"body": {
"status": "valid",
"contact": [
"mailto:meow@bloodwine.cyou"
]
}
}
}