Merge remote-tracking branch 'origin/system-rebuild-tracking' into system-rebuild-tracking

This commit is contained in:
Inex Code 2024-03-05 14:41:43 +03:00
commit c733cfeb9e
21 changed files with 257 additions and 224 deletions

View file

@ -25,6 +25,7 @@
pylsp-mypy
python-lsp-black
python-lsp-server
pyflakes
typer # for strawberry
] ++ strawberry-graphql.optional-dependencies.cli));

View file

@ -129,7 +129,7 @@ in
# TODO get URL from systemd template parameter?
ExecStartPre = ''
${nix} flake update \
--override-input selfprivacy-nixos-config git+https://git.selfprivacy.org/SelfPrivacy/selfprivacy-nixos-config.git?ref=inex/test-systemd-rebuild
--override-input selfprivacy-nixos-config git+https://git.selfprivacy.org/SelfPrivacy/selfprivacy-nixos-config.git?ref=flakes
'';
ExecStart = ''
${nixos-rebuild} switch --flake .#${config-id}

View file

@ -14,6 +14,10 @@ def backup_job_type(service: Service) -> str:
return f"{job_type_prefix(service)}.backup"
def autobackup_job_type() -> str:
return "backups.autobackup"
def restore_job_type(service: Service) -> str:
return f"{job_type_prefix(service)}.restore"
@ -36,6 +40,17 @@ def is_something_running_for(service: Service) -> bool:
return len(running_jobs) != 0
def add_autobackup_job(services: List[Service]) -> Job:
service_names = [s.get_display_name() for s in services]
pretty_service_list: str = ", ".join(service_names)
job = Jobs.add(
type_id=autobackup_job_type(),
name="Automatic backup",
description=f"Scheduled backup for services: {pretty_service_list}",
)
return job
def add_backup_job(service: Service) -> Job:
if is_something_running_for(service):
message = (
@ -78,12 +93,14 @@ def get_job_by_type(type_id: str) -> Optional[Job]:
JobStatus.RUNNING,
]:
return job
return None
def get_failed_job_by_type(type_id: str) -> Optional[Job]:
for job in Jobs.get_jobs():
if job.type_id == type_id and job.status == JobStatus.ERROR:
return job
return None
def get_backup_job(service: Service) -> Optional[Job]:

View file

@ -12,9 +12,9 @@ from selfprivacy_api.models.backup.snapshot import Snapshot
from selfprivacy_api.utils.huey import huey
from huey import crontab
from selfprivacy_api.services.service import Service
from selfprivacy_api.services import get_service_by_id
from selfprivacy_api.backup import Backups
from selfprivacy_api.backup.jobs import add_autobackup_job
from selfprivacy_api.jobs import Jobs, JobStatus, Job
@ -72,14 +72,42 @@ def restore_snapshot(
return True
def do_autobackup() -> None:
"""
Body of autobackup task, broken out to test it
For some reason, we cannot launch periodic huey tasks
inside tests
"""
time = datetime.utcnow().replace(tzinfo=timezone.utc)
services_to_back_up = Backups.services_to_back_up(time)
job = add_autobackup_job(services_to_back_up)
progress_per_service = 100 // len(services_to_back_up)
progress = 0
Jobs.update(job, JobStatus.RUNNING, progress=progress)
for service in services_to_back_up:
try:
Backups.back_up(service, BackupReason.AUTO)
except Exception as error:
Jobs.update(
job,
status=JobStatus.ERROR,
error=type(error).__name__ + ": " + str(error),
)
return
progress = progress + progress_per_service
Jobs.update(job, JobStatus.RUNNING, progress=progress)
Jobs.update(job, JobStatus.FINISHED)
@huey.periodic_task(validate_datetime=validate_datetime)
def automatic_backup():
def automatic_backup() -> None:
"""
The worker periodic task that starts the automatic backup process.
"""
time = datetime.utcnow().replace(tzinfo=timezone.utc)
for service in Backups.services_to_back_up(time):
start_backup(service, BackupReason.AUTO)
do_autobackup()
@huey.periodic_task(crontab(hour="*/" + str(SNAPSHOT_CACHE_TTL_HOURS)))

View file

@ -27,4 +27,4 @@ async def get_token_header(
def get_api_version() -> str:
"""Get API version"""
return "3.0.0"
return "3.0.1"

View file

@ -8,6 +8,7 @@ from selfprivacy_api.graphql.common_types.dns import DnsRecord
from selfprivacy_api.services import get_service_by_id, get_services_by_location
from selfprivacy_api.services import Service as ServiceInterface
from selfprivacy_api.utils.block_devices import BlockDevices
import selfprivacy_api.utils.network as network_utils
def get_usages(root: "StorageVolume") -> list["StorageUsageInterface"]:
@ -141,7 +142,9 @@ def service_to_graphql_service(service: ServiceInterface) -> Service:
priority=record.priority,
display_name=record.display_name,
)
for record in service.get_dns_records()
for record in service.get_dns_records(
network_utils.get_ip4(), network_utils.get_ip6()
)
],
)

View file

@ -56,14 +56,18 @@ def get_all_required_dns_records() -> list[ServiceDnsRecord]:
ttl=3600,
display_name="SelfPrivacy API",
),
ServiceDnsRecord(
type="AAAA",
name="api",
content=ip6,
ttl=3600,
display_name="SelfPrivacy API (IPv6)",
),
]
if ip6 is not None:
dns_records.append(
ServiceDnsRecord(
type="AAAA",
name="api",
content=ip6,
ttl=3600,
display_name="SelfPrivacy API (IPv6)",
)
)
for service in get_enabled_services():
dns_records += service.get_dns_records()
dns_records += service.get_dns_records(ip4, ip6)
return dns_records

View file

@ -1,15 +1,14 @@
"""Class representing Bitwarden service"""
import base64
import subprocess
import typing
from typing import Optional, List
from selfprivacy_api.jobs import Job, Jobs
from selfprivacy_api.services.generic_service_mover import FolderMoveNames, move_service
from selfprivacy_api.utils.systemd import get_service_status
from selfprivacy_api.services.service import Service, ServiceDnsRecord, ServiceStatus
from selfprivacy_api.services.service import Service, ServiceStatus
from selfprivacy_api.utils import get_domain
from selfprivacy_api.utils.block_devices import BlockDevice
import selfprivacy_api.utils.network as network_utils
from selfprivacy_api.services.bitwarden.icon import BITWARDEN_ICON
@ -41,11 +40,15 @@ class Bitwarden(Service):
return "vaultwarden"
@staticmethod
def get_url() -> typing.Optional[str]:
def get_url() -> Optional[str]:
"""Return service url."""
domain = get_domain()
return f"https://password.{domain}"
@staticmethod
def get_subdomain() -> Optional[str]:
return "password"
@staticmethod
def is_movable() -> bool:
return True
@ -96,29 +99,9 @@ class Bitwarden(Service):
return ""
@staticmethod
def get_folders() -> typing.List[str]:
def get_folders() -> List[str]:
return ["/var/lib/bitwarden", "/var/lib/bitwarden_rs"]
@staticmethod
def get_dns_records() -> typing.List[ServiceDnsRecord]:
"""Return list of DNS records for Bitwarden service."""
return [
ServiceDnsRecord(
type="A",
name="password",
content=network_utils.get_ip4(),
ttl=3600,
display_name="Bitwarden",
),
ServiceDnsRecord(
type="AAAA",
name="password",
content=network_utils.get_ip6(),
ttl=3600,
display_name="Bitwarden (IPv6)",
),
]
def move_to_volume(self, volume: BlockDevice) -> Job:
job = Jobs.add(
type_id="services.bitwarden.move",

View file

@ -1,15 +1,14 @@
"""Class representing Bitwarden service"""
import base64
import subprocess
import typing
from typing import Optional, List
from selfprivacy_api.jobs import Job, Jobs
from selfprivacy_api.services.generic_service_mover import FolderMoveNames, move_service
from selfprivacy_api.utils.systemd import get_service_status
from selfprivacy_api.services.service import Service, ServiceDnsRecord, ServiceStatus
from selfprivacy_api.services.service import Service, ServiceStatus
from selfprivacy_api.utils import get_domain
from selfprivacy_api.utils.block_devices import BlockDevice
import selfprivacy_api.utils.network as network_utils
from selfprivacy_api.services.gitea.icon import GITEA_ICON
@ -37,11 +36,15 @@ class Gitea(Service):
return base64.b64encode(GITEA_ICON.encode("utf-8")).decode("utf-8")
@staticmethod
def get_url() -> typing.Optional[str]:
def get_url() -> Optional[str]:
"""Return service url."""
domain = get_domain()
return f"https://git.{domain}"
@staticmethod
def get_subdomain() -> Optional[str]:
return "git"
@staticmethod
def is_movable() -> bool:
return True
@ -91,28 +94,9 @@ class Gitea(Service):
return ""
@staticmethod
def get_folders() -> typing.List[str]:
def get_folders() -> List[str]:
return ["/var/lib/gitea"]
@staticmethod
def get_dns_records() -> typing.List[ServiceDnsRecord]:
return [
ServiceDnsRecord(
type="A",
name="git",
content=network_utils.get_ip4(),
ttl=3600,
display_name="Gitea",
),
ServiceDnsRecord(
type="AAAA",
name="git",
content=network_utils.get_ip6(),
ttl=3600,
display_name="Gitea (IPv6)",
),
]
def move_to_volume(self, volume: BlockDevice) -> Job:
job = Jobs.add(
type_id="services.gitea.move",

View file

@ -1,16 +1,15 @@
"""Class representing Jitsi Meet service"""
import base64
import subprocess
import typing
from typing import Optional, List
from selfprivacy_api.jobs import Job
from selfprivacy_api.utils.systemd import (
get_service_status_from_several_units,
)
from selfprivacy_api.services.service import Service, ServiceDnsRecord, ServiceStatus
from selfprivacy_api.services.service import Service, ServiceStatus
from selfprivacy_api.utils import get_domain
from selfprivacy_api.utils.block_devices import BlockDevice
import selfprivacy_api.utils.network as network_utils
from selfprivacy_api.services.jitsimeet.icon import JITSI_ICON
@ -38,11 +37,15 @@ class JitsiMeet(Service):
return base64.b64encode(JITSI_ICON.encode("utf-8")).decode("utf-8")
@staticmethod
def get_url() -> typing.Optional[str]:
def get_url() -> Optional[str]:
"""Return service url."""
domain = get_domain()
return f"https://meet.{domain}"
@staticmethod
def get_subdomain() -> Optional[str]:
return "meet"
@staticmethod
def is_movable() -> bool:
return False
@ -98,29 +101,8 @@ class JitsiMeet(Service):
return ""
@staticmethod
def get_folders() -> typing.List[str]:
def get_folders() -> List[str]:
return ["/var/lib/jitsi-meet"]
@staticmethod
def get_dns_records() -> typing.List[ServiceDnsRecord]:
ip4 = network_utils.get_ip4()
ip6 = network_utils.get_ip6()
return [
ServiceDnsRecord(
type="A",
name="meet",
content=ip4,
ttl=3600,
display_name="Jitsi",
),
ServiceDnsRecord(
type="AAAA",
name="meet",
content=ip6,
ttl=3600,
display_name="Jitsi (IPv6)",
),
]
def move_to_volume(self, volume: BlockDevice) -> Job:
raise NotImplementedError("jitsi-meet service is not movable")

View file

@ -2,7 +2,7 @@
import base64
import subprocess
import typing
from typing import Optional, List
from selfprivacy_api.jobs import Job, Jobs
from selfprivacy_api.services.generic_service_mover import FolderMoveNames, move_service
@ -12,7 +12,6 @@ from selfprivacy_api.utils.systemd import (
from selfprivacy_api.services.service import Service, ServiceDnsRecord, ServiceStatus
from selfprivacy_api import utils
from selfprivacy_api.utils.block_devices import BlockDevice
import selfprivacy_api.utils.network as network_utils
from selfprivacy_api.services.mailserver.icon import MAILSERVER_ICON
@ -40,10 +39,14 @@ class MailServer(Service):
return "virtualMail"
@staticmethod
def get_url() -> typing.Optional[str]:
def get_url() -> Optional[str]:
"""Return service url."""
return None
@staticmethod
def get_subdomain() -> Optional[str]:
return None
@staticmethod
def is_movable() -> bool:
return True
@ -102,20 +105,18 @@ class MailServer(Service):
return ""
@staticmethod
def get_folders() -> typing.List[str]:
def get_folders() -> List[str]:
return ["/var/vmail", "/var/sieve"]
@staticmethod
def get_dns_records() -> typing.List[ServiceDnsRecord]:
@classmethod
def get_dns_records(cls, ip4: str, ip6: Optional[str]) -> List[ServiceDnsRecord]:
domain = utils.get_domain()
dkim_record = utils.get_dkim_key(domain)
ip4 = network_utils.get_ip4()
ip6 = network_utils.get_ip6()
if dkim_record is None:
return []
return [
dns_records = [
ServiceDnsRecord(
type="A",
name=domain,
@ -123,13 +124,6 @@ class MailServer(Service):
ttl=3600,
display_name="Root Domain",
),
ServiceDnsRecord(
type="AAAA",
name=domain,
content=ip6,
ttl=3600,
display_name="Root Domain (IPv6)",
),
ServiceDnsRecord(
type="MX",
name=domain,
@ -161,6 +155,18 @@ class MailServer(Service):
),
]
if ip6 is not None:
dns_records.append(
ServiceDnsRecord(
type="AAAA",
name=domain,
content=ip6,
ttl=3600,
display_name="Root Domain (IPv6)",
),
)
return dns_records
def move_to_volume(self, volume: BlockDevice) -> Job:
job = Jobs.add(
type_id="services.email.move",

View file

@ -1,14 +1,13 @@
"""Class representing Nextcloud service."""
import base64
import subprocess
import typing
from typing import Optional, List
from selfprivacy_api.jobs import Job, Jobs
from selfprivacy_api.services.generic_service_mover import FolderMoveNames, move_service
from selfprivacy_api.utils.systemd import get_service_status
from selfprivacy_api.services.service import Service, ServiceDnsRecord, ServiceStatus
from selfprivacy_api.services.service import Service, ServiceStatus
from selfprivacy_api.utils import get_domain
from selfprivacy_api.utils.block_devices import BlockDevice
import selfprivacy_api.utils.network as network_utils
from selfprivacy_api.services.nextcloud.icon import NEXTCLOUD_ICON
@ -36,11 +35,15 @@ class Nextcloud(Service):
return base64.b64encode(NEXTCLOUD_ICON.encode("utf-8")).decode("utf-8")
@staticmethod
def get_url() -> typing.Optional[str]:
def get_url() -> Optional[str]:
"""Return service url."""
domain = get_domain()
return f"https://cloud.{domain}"
@staticmethod
def get_subdomain() -> Optional[str]:
return "cloud"
@staticmethod
def is_movable() -> bool:
return True
@ -96,28 +99,9 @@ class Nextcloud(Service):
return ""
@staticmethod
def get_folders() -> typing.List[str]:
def get_folders() -> List[str]:
return ["/var/lib/nextcloud"]
@staticmethod
def get_dns_records() -> typing.List[ServiceDnsRecord]:
return [
ServiceDnsRecord(
type="A",
name="cloud",
content=network_utils.get_ip4(),
ttl=3600,
display_name="Nextcloud",
),
ServiceDnsRecord(
type="AAAA",
name="cloud",
content=network_utils.get_ip6(),
ttl=3600,
display_name="Nextcloud (IPv6)",
),
]
def move_to_volume(self, volume: BlockDevice) -> Job:
job = Jobs.add(
type_id="services.nextcloud.move",

View file

@ -4,10 +4,9 @@ import subprocess
import typing
from selfprivacy_api.jobs import Job
from selfprivacy_api.utils.systemd import get_service_status
from selfprivacy_api.services.service import Service, ServiceDnsRecord, ServiceStatus
from selfprivacy_api.services.service import Service, ServiceStatus
from selfprivacy_api.utils.block_devices import BlockDevice
from selfprivacy_api.services.ocserv.icon import OCSERV_ICON
import selfprivacy_api.utils.network as network_utils
class Ocserv(Service):
@ -34,6 +33,10 @@ class Ocserv(Service):
"""Return service url."""
return None
@staticmethod
def get_subdomain() -> typing.Optional[str]:
return "vpn"
@staticmethod
def is_movable() -> bool:
return False
@ -78,25 +81,6 @@ class Ocserv(Service):
def get_logs():
return ""
@staticmethod
def get_dns_records() -> typing.List[ServiceDnsRecord]:
return [
ServiceDnsRecord(
type="A",
name="vpn",
content=network_utils.get_ip4(),
ttl=3600,
display_name="OpenConnect VPN",
),
ServiceDnsRecord(
type="AAAA",
name="vpn",
content=network_utils.get_ip6(),
ttl=3600,
display_name="OpenConnect VPN (IPv6)",
),
]
@staticmethod
def get_folders() -> typing.List[str]:
return []

View file

@ -1,15 +1,14 @@
"""Class representing Nextcloud service."""
import base64
import subprocess
import typing
from typing import Optional, List
from selfprivacy_api.jobs import Job, Jobs
from selfprivacy_api.services.generic_service_mover import FolderMoveNames, move_service
from selfprivacy_api.utils.systemd import get_service_status
from selfprivacy_api.services.service import Service, ServiceDnsRecord, ServiceStatus
from selfprivacy_api.services.service import Service, ServiceStatus
from selfprivacy_api.services.owned_path import OwnedPath
from selfprivacy_api.utils import get_domain
from selfprivacy_api.utils.block_devices import BlockDevice
import selfprivacy_api.utils.network as network_utils
from selfprivacy_api.services.pleroma.icon import PLEROMA_ICON
@ -33,11 +32,15 @@ class Pleroma(Service):
return base64.b64encode(PLEROMA_ICON.encode("utf-8")).decode("utf-8")
@staticmethod
def get_url() -> typing.Optional[str]:
def get_url() -> Optional[str]:
"""Return service url."""
domain = get_domain()
return f"https://social.{domain}"
@staticmethod
def get_subdomain() -> Optional[str]:
return "social"
@staticmethod
def is_movable() -> bool:
return True
@ -82,7 +85,7 @@ class Pleroma(Service):
return ""
@staticmethod
def get_owned_folders() -> typing.List[OwnedPath]:
def get_owned_folders() -> List[OwnedPath]:
"""
Get a list of occupied directories with ownership info
pleroma has folders that are owned by different users
@ -100,25 +103,6 @@ class Pleroma(Service):
),
]
@staticmethod
def get_dns_records() -> typing.List[ServiceDnsRecord]:
return [
ServiceDnsRecord(
type="A",
name="social",
content=network_utils.get_ip4(),
ttl=3600,
display_name="Pleroma",
),
ServiceDnsRecord(
type="AAAA",
name="social",
content=network_utils.get_ip6(),
ttl=3600,
display_name="Pleroma (IPv6)",
),
]
def move_to_volume(self, volume: BlockDevice) -> Job:
job = Jobs.add(
type_id="services.pleroma.move",

View file

@ -1,7 +1,7 @@
"""Abstract class for a service running on a server"""
from abc import ABC, abstractmethod
from enum import Enum
import typing
from typing import List, Optional
from pydantic import BaseModel
from selfprivacy_api.jobs import Job
@ -12,7 +12,7 @@ from selfprivacy_api.services.generic_size_counter import get_storage_usage
from selfprivacy_api.services.owned_path import OwnedPath
from selfprivacy_api import utils
from selfprivacy_api.utils.waitloop import wait_until_true
from selfprivacy_api.utils import ReadUserData, WriteUserData, get_domain
from selfprivacy_api.utils import ReadUserData, WriteUserData
DEFAULT_START_STOP_TIMEOUT = 5 * 60
@ -35,7 +35,7 @@ class ServiceDnsRecord(BaseModel):
content: str
ttl: int
display_name: str
priority: typing.Optional[int] = None
priority: Optional[int] = None
class Service(ABC):
@ -78,14 +78,22 @@ class Service(ABC):
@staticmethod
@abstractmethod
def get_url() -> typing.Optional[str]:
def get_url() -> Optional[str]:
"""
The url of the service if it is accessible from the internet browser.
"""
pass
@staticmethod
@abstractmethod
def get_subdomain() -> Optional[str]:
"""
The assigned primary subdomain for this service.
"""
pass
@classmethod
def get_user(cls) -> typing.Optional[str]:
def get_user(cls) -> Optional[str]:
"""
The user that owns the service's files.
Defaults to the service's id.
@ -93,7 +101,7 @@ class Service(ABC):
return cls.get_id()
@classmethod
def get_group(cls) -> typing.Optional[str]:
def get_group(cls) -> Optional[str]:
"""
The group that owns the service's files.
Defaults to the service's user.
@ -209,10 +217,32 @@ class Service(ABC):
storage_used += get_storage_usage(folder)
return storage_used
@staticmethod
@abstractmethod
def get_dns_records() -> typing.List[ServiceDnsRecord]:
pass
@classmethod
def get_dns_records(cls, ip4: str, ip6: Optional[str]) -> List[ServiceDnsRecord]:
subdomain = cls.get_subdomain()
display_name = cls.get_display_name()
if subdomain is None:
return []
dns_records = [
ServiceDnsRecord(
type="A",
name=subdomain,
content=ip4,
ttl=3600,
display_name=display_name,
)
]
if ip6 is not None:
dns_records.append(
ServiceDnsRecord(
type="AAAA",
name=subdomain,
content=ip6,
ttl=3600,
display_name=f"{display_name} (IPv6)",
)
)
return dns_records
@classmethod
def get_drive(cls) -> str:
@ -237,7 +267,7 @@ class Service(ABC):
return root_device
@classmethod
def get_folders(cls) -> typing.List[str]:
def get_folders(cls) -> List[str]:
"""
get a plain list of occupied directories
Default extracts info from overriden get_owned_folders()
@ -249,7 +279,7 @@ class Service(ABC):
return [owned_folder.path for owned_folder in cls.get_owned_folders()]
@classmethod
def get_owned_folders(cls) -> typing.List[OwnedPath]:
def get_owned_folders(cls) -> List[OwnedPath]:
"""
Get a list of occupied directories with ownership info
Default extracts info from overriden get_folders()

View file

@ -65,6 +65,10 @@ class DummyService(Service):
domain = "test.com"
return f"https://password.{domain}"
@staticmethod
def get_subdomain() -> typing.Optional[str]:
return "password"
@classmethod
def is_movable(cls) -> bool:
return cls.movable
@ -185,26 +189,6 @@ class DummyService(Service):
def get_folders(cls) -> List[str]:
return cls.folders
@staticmethod
def get_dns_records() -> typing.List[ServiceDnsRecord]:
"""Return list of DNS records for Bitwarden service."""
return [
ServiceDnsRecord(
type="A",
name="password",
content=network_utils.get_ip4(),
ttl=3600,
display_name="Test Service",
),
ServiceDnsRecord(
type="AAAA",
name="password",
content=network_utils.get_ip6(),
ttl=3600,
display_name="Test Service (IPv6)",
),
]
def move_to_volume(self, volume: BlockDevice) -> Job:
job = Jobs.add(
type_id=f"services.{self.get_id()}.move",

View file

@ -2,6 +2,7 @@
"""Network utils"""
import subprocess
import re
import ipaddress
from typing import Optional
@ -17,13 +18,15 @@ def get_ip4() -> str:
return ip4.group(1) if ip4 else ""
def get_ip6() -> str:
def get_ip6() -> Optional[str]:
"""Get IPv6 address"""
try:
ip6 = subprocess.check_output(["ip", "addr", "show", "dev", "eth0"]).decode(
"utf-8"
)
ip6 = re.search(r"inet6 (\S+)\/\d+", ip6)
ip6_addresses = subprocess.check_output(
["ip", "addr", "show", "dev", "eth0"]
).decode("utf-8")
ip6_addresses = re.findall(r"inet6 (\S+)\/\d+", ip6_addresses)
for address in ip6_addresses:
if ipaddress.IPv6Address(address).is_global:
return address
except subprocess.CalledProcessError:
ip6 = None
return ip6.group(1) if ip6 else ""
return None

View file

@ -2,7 +2,7 @@ from setuptools import setup, find_packages
setup(
name="selfprivacy_api",
version="3.0.0",
version="3.0.1",
packages=find_packages(),
scripts=[
"selfprivacy_api/app.py",

View file

@ -14,9 +14,12 @@ from selfprivacy_api.graphql.common_types.backup import (
from selfprivacy_api.backup import Backups, Snapshot
from selfprivacy_api.backup.tasks import (
prune_autobackup_snapshots,
do_autobackup,
)
from selfprivacy_api.backup.jobs import autobackup_job_type
from tests.test_backup import backups
from tests.test_backup import backups, assert_job_finished
from tests.test_graphql.test_services import only_dummy_service
def backuppable_services() -> list[Service]:
@ -63,6 +66,32 @@ def test_set_autobackup_period(backups):
assert Backups.autobackup_period_minutes() is None
def test_autobackup_taskbody(backups, only_dummy_service):
# We cannot test the timed task itself, but we reduced it
# to one line, and we test this line here
dummy_service = only_dummy_service
now = datetime.now(timezone.utc)
backup_period = 13 # minutes
assert Backups.get_all_snapshots() == []
assert_job_finished(autobackup_job_type(), count=0)
Backups.set_autobackup_period_minutes(backup_period)
assert Backups.is_time_to_backup_service(dummy_service, now)
assert Backups.is_time_to_backup(now)
assert dummy_service in Backups.services_to_back_up(now)
assert len(Backups.services_to_back_up(now)) == 1
do_autobackup()
snapshots = Backups.get_all_snapshots()
assert len(snapshots) == 1
assert snapshots[0].service_name == dummy_service.get_id()
assert snapshots[0].reason == BackupReason.AUTO
assert_job_finished(autobackup_job_type(), count=1)
def test_autobackup_timer_periods(backups, dummy_service):
now = datetime.now(timezone.utc)
backup_period = 13 # minutes

View file

@ -8,6 +8,19 @@ import pytest
from selfprivacy_api.utils.network import get_ip4, get_ip6
OUTPUT_STRING = b"""
2: eth0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc fq_codel state UP group default qlen 1000
link/ether 96:00:00:f1:34:ae brd ff:ff:ff:ff:ff:ff
altname enp0s3
altname ens3
inet 157.90.247.192/32 brd 157.90.247.192 scope global dynamic eth0
valid_lft 46061sec preferred_lft 35261sec
inet6 fe80::9400:ff:fef1:34ae/64 scope link
valid_lft forever preferred_lft forever
inet6 2a01:4f8:c17:7e3d::2/64 scope global
valid_lft forever preferred_lft forever
"""
OUTPUT_STRING_WITOUT_IP6 = b"""
2: eth0: <BROADCAST,MULTICAST,UP,LOWER_UP> mtu 1500 qdisc fq_codel state UP group default qlen 1000
link/ether 96:00:00:f1:34:ae brd ff:ff:ff:ff:ff:ff
altname enp0s3
@ -31,6 +44,14 @@ def ip_process_mock(mocker):
return mock
@pytest.fixture
def ip_process_mock_without_ip6(mocker):
mock = mocker.patch(
"subprocess.check_output", autospec=True, return_value=OUTPUT_STRING_WITOUT_IP6
)
return mock
@pytest.fixture
def failed_ip_process_mock(mocker):
mock = mocker.patch(
@ -62,24 +83,29 @@ def test_get_ip4(ip_process_mock):
def test_get_ip6(ip_process_mock):
"""Test get IPv6 address"""
ip6 = get_ip6()
assert ip6 == "fe80::9400:ff:fef1:34ae"
assert ip6 == "2a01:4f8:c17:7e3d::2"
def test_failed_get_ip4(failed_ip_process_mock):
ip4 = get_ip4()
assert ip4 is ""
assert ip4 == ""
def test_failed_get_ip6(failed_ip_process_mock):
ip6 = get_ip6()
assert ip6 is ""
assert ip6 is None
def test_failed_get_ip6_when_none(ip_process_mock_without_ip6):
ip6 = get_ip6()
assert ip6 is None
def test_failed_subprocess_get_ip4(failed_subprocess_call):
ip4 = get_ip4()
assert ip4 is ""
assert ip4 == ""
def test_failed_subprocess_get_ip6(failed_subprocess_call):
ip6 = get_ip6()
assert ip6 is ""
assert ip6 is None

View file

@ -168,13 +168,14 @@ def test_enabling_disabling_writes_json(
# more detailed testing of this is in test_graphql/test_system.py
# Using the same random global IPs as the test_network_utils
def test_mailserver_with_dkim_returns_some_dns(dkim_file):
records = MailServer().get_dns_records()
records = MailServer().get_dns_records("157.90.247.192", "2a01:4f8:c17:7e3d::2")
assert len(records) > 0
def test_mailserver_with_no_dkim_returns_no_dns(no_dkim_file):
assert MailServer().get_dns_records() == []
assert MailServer().get_dns_records("157.90.247.192", "2a01:4f8:c17:7e3d::2") == []
def test_services_enabled_by_default(generic_userdata):