Add GraphQL endpoints related to binds

inexcode 2022-08-18 00:58:56 +04:00
parent 7fe51eb665
commit 87c036de7f
26 changed files with 1333 additions and 184 deletions

View file

@ -0,0 +1,49 @@
"""Jobs status"""
# pylint: disable=too-few-public-methods
import datetime
import typing
import strawberry
from selfprivacy_api.jobs import Job, Jobs
@strawberry.type
class ApiJob:
"""Job type for GraphQL."""
uid: str
name: str
description: str
status: str
status_text: typing.Optional[str]
progress: typing.Optional[int]
created_at: datetime.datetime
updated_at: datetime.datetime
finished_at: typing.Optional[datetime.datetime]
error: typing.Optional[str]
result: typing.Optional[str]
def job_to_api_job(job: Job) -> ApiJob:
"""Convert a Job from jobs controller to a GraphQL ApiJob."""
return ApiJob(
uid=str(job.uid),
name=job.name,
description=job.description,
status=job.status.name,
status_text=job.status_text,
progress=job.progress,
created_at=job.created_at,
updated_at=job.updated_at,
finished_at=job.finished_at,
error=job.error,
result=job.result,
)
def get_api_job_by_id(job_id: str) -> typing.Optional[ApiJob]:
"""Get a job for GraphQL by its ID."""
job = Jobs.get_instance().get_job(job_id)
if job is None:
return None
return job_to_api_job(job)
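For orientation, and not part of the commit: a minimal sketch of how these helpers compose with the Jobs controller changed later in this diff (the type_id and texts are placeholder values).

from selfprivacy_api.jobs import Jobs
from selfprivacy_api.graphql.common_types.jobs import get_api_job_by_id

# Register a job through the controller, then resolve it as a GraphQL ApiJob.
job = Jobs.get_instance().add(
    type_id="test",
    name="Test job",
    description="This is a test job.",
)
api_job = get_api_job_by_id(str(job.uid))  # ApiJob, or None for an unknown uid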

View file

@ -1,4 +1,7 @@
import strawberry
+import typing
+from selfprivacy_api.graphql.common_types.jobs import ApiJob
@strawberry.interface
@ -11,3 +14,8 @@ class MutationReturnInterface:
@strawberry.type
class GenericMutationReturn(MutationReturnInterface):
pass
@strawberry.type
class GenericJobButationReturn(MutationReturnInterface):
job: typing.Optional[ApiJob] = None

View file

@ -0,0 +1,168 @@
"""Services mutations"""
# pylint: disable=too-few-public-methods
import typing
import strawberry
from selfprivacy_api.graphql.common_types.jobs import job_to_api_job
from selfprivacy_api.graphql.common_types.service import (
Service,
service_to_graphql_service,
)
from selfprivacy_api.graphql.mutations.mutation_interface import (
GenericJobButationReturn,
GenericMutationReturn,
)
from selfprivacy_api.services import get_service_by_id
from selfprivacy_api.utils.block_devices import BlockDevices
@strawberry.type
class ServiceMutationReturn(GenericMutationReturn):
"""Service mutation return type."""
service: typing.Optional[Service] = None
@strawberry.input
class MoveServiceInput:
"""Move service input type."""
service_id: str
location: str
@strawberry.type
class ServiceJobMutationReturn(GenericJobButationReturn):
"""Service job mutation return type."""
service: typing.Optional[Service] = None
@strawberry.type
class ServicesMutations:
"""Services mutations."""
@strawberry.mutation
def enable_service(self, service_id: str) -> ServiceMutationReturn:
"""Enable service."""
service = get_service_by_id(service_id)
if service is None:
return ServiceMutationReturn(
success=False,
message="Service not found.",
code=404,
)
service.enable()
return ServiceMutationReturn(
success=True,
message="Service enabled.",
code=200,
service=service_to_graphql_service(service),
)
@strawberry.mutation
def disable_service(self, service_id: str) -> ServiceMutationReturn:
"""Disable service."""
service = get_service_by_id(service_id)
if service is None:
return ServiceMutationReturn(
success=False,
message="Service not found.",
code=404,
)
service.disable()
return ServiceMutationReturn(
success=True,
message="Service disabled.",
code=200,
service=service_to_graphql_service(service),
)
@strawberry.mutation
def stop_service(self, service_id: str) -> ServiceMutationReturn:
"""Stop service."""
service = get_service_by_id(service_id)
if service is None:
return ServiceMutationReturn(
success=False,
message="Service not found.",
code=404,
)
service.stop()
return ServiceMutationReturn(
success=True,
message="Service stopped.",
code=200,
service=service_to_graphql_service(service),
)
@strawberry.mutation
def start_service(self, service_id: str) -> ServiceMutationReturn:
"""Start service."""
service = get_service_by_id(service_id)
if service is None:
return ServiceMutationReturn(
success=False,
message="Service not found.",
code=404,
)
service.start()
return ServiceMutationReturn(
success=True,
message="Service started.",
code=200,
service=service_to_graphql_service(service),
)
@strawberry.mutation
def restart_service(self, service_id: str) -> ServiceMutationReturn:
"""Restart service."""
service = get_service_by_id(service_id)
if service is None:
return ServiceMutationReturn(
success=False,
message="Service not found.",
code=404,
)
service.restart()
return ServiceMutationReturn(
success=True,
message="Service restarted.",
code=200,
service=service_to_graphql_service(service),
)
@strawberry.mutation
def move_service(self, input: MoveServiceInput) -> ServiceJobMutationReturn:
"""Move service."""
service = get_service_by_id(input.service_id)
if service is None:
return ServiceJobMutationReturn(
success=False,
message="Service not found.",
code=404,
)
if not service.is_movable():
return ServiceJobMutationReturn(
success=False,
message="Service is not movable.",
code=400,
service=service_to_graphql_service(service),
)
volume = BlockDevices().get_block_device(input.location)
if volume is None:
return ServiceJobMutationReturn(
success=False,
message="Volume not found.",
code=404,
service=service_to_graphql_service(service),
)
job = service.move_to_volume(volume)
return ServiceJobMutationReturn(
success=True,
message="Service moved.",
code=200,
service=service_to_graphql_service(service),
job=job_to_api_job(job),
)
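Not part of the diff: a sketch of a GraphQL document that would exercise the new moveService mutation, assuming these mutations are wired into the schema's mutation root and that strawberry's default snake_case-to-camelCase field naming applies; the service id and volume name are placeholders.

MOVE_SERVICE_MUTATION = """
mutation MoveService($input: MoveServiceInput!) {
    moveService(input: $input) {
        success
        message
        code
        job {
            uid
            status
            progress
        }
    }
}
"""
MOVE_SERVICE_VARIABLES = {"input": {"serviceId": "bitwarden", "location": "sdb"}}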

View file

@ -1,10 +1,28 @@
"""Storage devices mutations""" """Storage devices mutations"""
import strawberry import strawberry
from selfprivacy_api.graphql import IsAuthenticated from selfprivacy_api.graphql import IsAuthenticated
from selfprivacy_api.graphql.common_types.jobs import job_to_api_job
from selfprivacy_api.utils.block_devices import BlockDevices from selfprivacy_api.utils.block_devices import BlockDevices
from selfprivacy_api.graphql.mutations.mutation_interface import ( from selfprivacy_api.graphql.mutations.mutation_interface import (
GenericJobButationReturn,
GenericMutationReturn, GenericMutationReturn,
) )
from selfprivacy_api.jobs.migrate_to_binds import (
BindMigrationConfig,
is_bind_migrated,
start_bind_migration,
)
@strawberry.input
class MigrateToBindsInput:
"""Migrate to binds input"""
email_block_device: str
bitwarden_block_device: str
gitea_block_device: str
nextcloud_block_device: str
pleroma_block_device: str
@strawberry.type @strawberry.type
@ -59,3 +77,25 @@ class StorageMutations:
return GenericMutationReturn(
success=False, code=409, message="Volume not unmounted (already unmounted?)"
)
def migrate_to_binds(self, input: MigrateToBindsInput) -> GenericJobButationReturn:
"""Migrate to binds"""
if is_bind_migrated():
return GenericJobButationReturn(
success=False, code=409, message="Already migrated to binds"
)
job = start_bind_migration(
BindMigrationConfig(
email_block_device=input.email_block_device,
bitwarden_block_device=input.bitwarden_block_device,
gitea_block_device=input.gitea_block_device,
nextcloud_block_device=input.nextcloud_block_device,
pleroma_block_device=input.pleroma_block_device,
)
)
return GenericJobButationReturn(
success=True,
code=200,
message="Migration to binds started, rebuild the system to apply changes",
job=job_to_api_job(job),
)
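Likewise a sketch only: the corresponding migrateToBinds call, with placeholder device names and the same camelCase naming assumption.

MIGRATE_TO_BINDS_MUTATION = """
mutation MigrateToBinds($input: MigrateToBindsInput!) {
    migrateToBinds(input: $input) {
        success
        message
        code
        job {
            uid
            progress
            statusText
        }
    }
}
"""
MIGRATE_TO_BINDS_VARIABLES = {
    "input": {
        "emailBlockDevice": "sdb",
        "bitwardenBlockDevice": "sdb",
        "giteaBlockDevice": "sdb",
        "nextcloudBlockDevice": "sdb",
        "pleromaBlockDevice": "sdb",
    }
}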

View file

@ -2,25 +2,15 @@
# pylint: disable=too-few-public-methods
import typing
import strawberry
-import datetime
+from selfprivacy_api.graphql.common_types.jobs import (
+ApiJob,
+get_api_job_by_id,
+job_to_api_job,
+)
from selfprivacy_api.jobs import Jobs
-@strawberry.type
-class ApiJob:
-name: str
-description: str
-status: str
-status_text: typing.Optional[str]
-progress: typing.Optional[int]
-created_at: datetime.datetime
-updated_at: datetime.datetime
-finished_at: typing.Optional[datetime.datetime]
-error: typing.Optional[str]
-result: typing.Optional[str]
@strawberry.type
class Job:
@strawberry.field
@ -28,18 +18,8 @@ class Job:
Jobs.get_instance().get_jobs()
-return [
-ApiJob(
-name=job.name,
-description=job.description,
-status=job.status.name,
-status_text=job.status_text,
-progress=job.progress,
-created_at=job.created_at,
-updated_at=job.updated_at,
-finished_at=job.finished_at,
-error=job.error,
-result=job.result,
-)
-for job in Jobs.get_instance().get_jobs()
-]
+return [job_to_api_job(job) for job in Jobs.get_instance().get_jobs()]
+@strawberry.field
+def get_job(self, job_id: str) -> typing.Optional[ApiJob]:
+return get_api_job_by_id(job_id)
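A sketch, not in the commit, of the job queries this refactor enables; it assumes the Job query type is exposed as a jobs field on the query root, which is outside this diff.

GET_JOB_QUERY = """
query GetJob($jobId: String!) {
    jobs {
        getJobs {
            uid
            name
            status
            progress
        }
        getJob(jobId: $jobId) {
            uid
            status
            statusText
            error
            result
        }
    }
}
"""
GET_JOB_VARIABLES = {"jobId": "some-job-uid"}  # placeholder uid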

View file

@ -13,6 +13,6 @@ from selfprivacy_api.services import get_all_services
@strawberry.type
class Services:
@strawberry.field
-def all_services(self, info) -> typing.List[Service]:
+def all_services(self) -> typing.List[Service]:
services = get_all_services()
return [service_to_graphql_service(service) for service in services]

View file

@ -7,6 +7,8 @@ from selfprivacy_api.graphql.common_types.dns import DnsRecord
from selfprivacy_api.graphql.queries.common import Alert, Severity
from selfprivacy_api.graphql.queries.providers import DnsProvider, ServerProvider
+from selfprivacy_api.jobs import Jobs
+from selfprivacy_api.jobs.migrate_to_binds import is_bind_migrated
from selfprivacy_api.utils import ReadUserData
import selfprivacy_api.actions.system as system_actions
import selfprivacy_api.actions.ssh as ssh_actions
@ -103,6 +105,11 @@ class SystemInfo:
system_version: str = strawberry.field(resolver=get_system_version)
python_version: str = strawberry.field(resolver=get_python_version)
+@strawberry.field
+def using_binds(self) -> bool:
+"""Check if the system is using bind mounts"""
+return is_bind_migrated()
@strawberry.type
class SystemProviderInfo:
@ -135,7 +142,7 @@ class System:
settings: SystemSettings = SystemSettings()
info: SystemInfo = SystemInfo()
provider: SystemProviderInfo = strawberry.field(resolver=get_system_provider_info)
-busy: bool = False
+busy: bool = Jobs.is_busy()
@strawberry.field
def working_directory(self) -> str:
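For illustration only: the new usingBinds field and the now-dynamic busy flag could be read together with a query along these lines, assuming System is mounted as a system field on the query root (the mount point is outside this diff).

SYSTEM_BINDS_QUERY = """
query {
    system {
        busy
        info {
            usingBinds
        }
    }
}
"""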

View file

@ -33,6 +33,7 @@ class JobStatus(Enum):
""" """
Status of a job. Status of a job.
""" """
CREATED = "CREATED" CREATED = "CREATED"
RUNNING = "RUNNING" RUNNING = "RUNNING"
FINISHED = "FINISHED" FINISHED = "FINISHED"
@ -43,7 +44,9 @@ class Job(BaseModel):
""" """
Job class. Job class.
""" """
uid: UUID = uuid.uuid4() uid: UUID = uuid.uuid4()
type_id: str
name: str name: str
description: str description: str
status: JobStatus status: JobStatus
@ -84,16 +87,18 @@ class Jobs:
else:
Jobs.__instance = self
-def reset(self) -> None:
+@staticmethod
+def reset() -> None:
"""
Reset the jobs list.
"""
with WriteUserData(UserDataFiles.JOBS) as user_data:
user_data["jobs"] = []
+@staticmethod
def add(
-self,
name: str,
+type_id: str,
description: str,
status: JobStatus = JobStatus.CREATED,
status_text: str = "",
@ -104,6 +109,7 @@ class Jobs:
""" """
job = Job( job = Job(
name=name, name=name,
type_id=type_id,
description=description, description=description,
status=status, status=status,
status_text=status_text, status_text=status_text,
@ -135,8 +141,8 @@ class Jobs:
del user_data["jobs"][i]
break
+@staticmethod
def update(
-self,
job: Job,
status: JobStatus,
status_text: typing.Optional[str] = None,
@ -174,7 +180,8 @@ class Jobs:
return job
-def get_job(self, id: str) -> typing.Optional[Job]:
+@staticmethod
+def get_job(uid: str) -> typing.Optional[Job]:
"""
Get a job from the jobs list.
"""
@ -182,11 +189,12 @@ class Jobs:
if "jobs" not in user_data: if "jobs" not in user_data:
user_data["jobs"] = [] user_data["jobs"] = []
for job in user_data["jobs"]: for job in user_data["jobs"]:
if job["uid"] == id: if job["uid"] == uid:
return Job(**job) return Job(**job)
return None return None
def get_jobs(self) -> typing.List[Job]: @staticmethod
def get_jobs() -> typing.List[Job]:
""" """
Get the jobs list. Get the jobs list.
""" """
@ -197,3 +205,16 @@ class Jobs:
return [Job(**job) for job in user_data["jobs"]]
except json.decoder.JSONDecodeError:
return []
@staticmethod
def is_busy() -> bool:
"""
Check if there is a job running.
"""
with ReadUserData(UserDataFiles.JOBS) as user_data:
if "jobs" not in user_data:
user_data["jobs"] = []
for job in user_data["jobs"]:
if job["status"] == JobStatus.RUNNING.value:
return True
return False
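Taken together, the controller can now be used without threading an instance around; a short sketch (not part of the commit) of the static API after this change, using placeholder values:

from selfprivacy_api.jobs import Jobs, JobStatus

job = Jobs.add(
    type_id="test",
    name="Test job",
    description="This is a test job.",
)
job = Jobs.update(job=job, status=JobStatus.RUNNING, progress=50)
assert Jobs.is_busy()  # a RUNNING job is stored, so the system reports busy
job = Jobs.update(job=job, status=JobStatus.FINISHED, progress=100)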

View file

@ -0,0 +1,285 @@
"""Function to perform migration of app data to binds."""
import subprocess
import psutil
import pathlib
import shutil
from pydantic import BaseModel
from selfprivacy_api.jobs import Job, JobStatus, Jobs
from selfprivacy_api.services.bitwarden import Bitwarden
from selfprivacy_api.services.gitea import Gitea
from selfprivacy_api.services.mailserver import MailServer
from selfprivacy_api.services.nextcloud import Nextcloud
from selfprivacy_api.services.pleroma import Pleroma
from selfprivacy_api.utils import ReadUserData, WriteUserData
from selfprivacy_api.utils.huey import huey
from selfprivacy_api.utils.block_devices import BlockDevices
class BindMigrationConfig(BaseModel):
"""Config for bind migration.
For each service provide block device name.
"""
email_block_device: str
bitwarden_block_device: str
gitea_block_device: str
nextcloud_block_device: str
pleroma_block_device: str
def is_bind_migrated() -> bool:
"""Check if bind migration was performed."""
with ReadUserData() as user_data:
return user_data.get("useBinds", False)
def activate_binds(config: BindMigrationConfig):
"""Activate binds."""
# Activate binds in userdata
with WriteUserData() as user_data:
if "email" not in user_data:
user_data["email"] = {}
user_data["email"]["location"] = config.email_block_device
if "bitwarden" not in user_data:
user_data["bitwarden"] = {}
user_data["bitwarden"]["location"] = config.bitwarden_block_device
if "gitea" not in user_data:
user_data["gitea"] = {}
user_data["gitea"]["location"] = config.gitea_block_device
if "nextcloud" not in user_data:
user_data["nextcloud"] = {}
user_data["nextcloud"]["location"] = config.nextcloud_block_device
if "pleroma" not in user_data:
user_data["pleroma"] = {}
user_data["pleroma"]["location"] = config.pleroma_block_device
user_data["useBinds"] = True
def move_folder(
data_path: pathlib.Path, bind_path: pathlib.Path, user: str, group: str
):
"""Move folder from data to bind."""
if data_path.exists():
shutil.move(str(data_path), str(bind_path))
else:
return
data_path.mkdir(mode=0o750, parents=True, exist_ok=True)
shutil.chown(str(bind_path), user=user, group=group)
shutil.chown(str(data_path), user=user, group=group)
subprocess.run(["mount", "--bind", str(bind_path), str(data_path)], check=True)
subprocess.run(["chown", "-R", f"{user}:{group}", str(data_path)], check=True)
@huey.task()
def migrate_to_binds(config: BindMigrationConfig, job: Job):
"""Migrate app data to binds."""
# Exit if migration is already done
if is_bind_migrated():
Jobs.update(
job=job,
status=JobStatus.ERROR,
error="Migration already done.",
)
return
Jobs.update(
job=job,
status=JobStatus.RUNNING,
progress=0,
status_text="Checking if all volumes are available.",
)
# Get block devices.
block_devices = BlockDevices().get_block_devices()
block_device_names = [device.name for device in block_devices]
# Get all unique required block devices
required_block_devices = []
for block_device_name in config.__dict__.values():
if block_device_name not in required_block_devices:
required_block_devices.append(block_device_name)
# Check if all block devices from config are present.
for block_device_name in required_block_devices:
if block_device_name not in block_device_names:
Jobs.update(
job=job,
status=JobStatus.ERROR,
error=f"Block device {block_device_name} not found.",
)
return
# Make sure all required block devices are mounted.
# sda1 is the root partition and is always mounted.
for block_device_name in required_block_devices:
if block_device_name == "sda1":
continue
block_device = BlockDevices().get_block_device(block_device_name)
if block_device is None:
Jobs.update(
job=job,
status=JobStatus.ERROR,
error=f"Block device {block_device_name} not found.",
)
return
if f"/volumes/{block_device_name}" not in block_device.mountpoints:
Jobs.update(
job=job,
status=JobStatus.ERROR,
error=f"Block device {block_device_name} not mounted.",
)
return
# Make sure /volumes/sda1 exists.
pathlib.Path("/volumes/sda1").mkdir(parents=True, exist_ok=True)
Jobs.update(
job=job,
status=JobStatus.RUNNING,
progress=5,
status_text="Activating binds in NixOS config.",
)
activate_binds(config)
# Perform migration of Nextcloud.
Jobs.update(
job=job,
status=JobStatus.RUNNING,
progress=10,
status_text="Migrating Nextcloud.",
)
Nextcloud().stop()
move_folder(
data_path=pathlib.Path("/var/lib/nextcloud"),
bind_path=pathlib.Path(f"/volumes/{config.nextcloud_block_device}/nextcloud"),
user="nextcloud",
group="nextcloud",
)
# Start Nextcloud
Nextcloud().start()
# Perform migration of Bitwarden
Jobs.update(
job=job,
status=JobStatus.RUNNING,
progress=28,
status_text="Migrating Bitwarden.",
)
Bitwarden().stop()
move_folder(
data_path=pathlib.Path("/var/lib/bitwarden"),
bind_path=pathlib.Path(f"/volumes/{config.bitwarden_block_device}/bitwarden"),
user="vaultwarden",
group="vaultwarden",
)
move_folder(
data_path=pathlib.Path("/var/lib/bitwarden_rs"),
bind_path=pathlib.Path(
f"/volumes/{config.bitwarden_block_device}/bitwarden_rs"
),
user="vaultwarden",
group="vaultwarden",
)
# Start Bitwarden
Bitwarden().start()
# Perform migration of Gitea
Jobs.update(
job=job,
status=JobStatus.RUNNING,
progress=46,
status_text="Migrating Gitea.",
)
Gitea().stop()
move_folder(
data_path=pathlib.Path("/var/lib/gitea"),
bind_path=pathlib.Path(f"/volumes/{config.gitea_block_device}/gitea"),
user="gitea",
group="gitea",
)
Gitea().start()
# Perform migration of Mail server
Jobs.update(
job=job,
status=JobStatus.RUNNING,
progress=64,
status_text="Migrating Mail server.",
)
MailServer().stop()
move_folder(
data_path=pathlib.Path("/var/vmail"),
bind_path=pathlib.Path(f"/volumes/{config.email_block_device}/vmail"),
user="virtualMail",
group="virtualMail",
)
move_folder(
data_path=pathlib.Path("/var/sieve"),
bind_path=pathlib.Path(f"/volumes/{config.email_block_device}/sieve"),
user="virtualMail",
group="virtualMail",
)
MailServer().start()
# Perform migration of Pleroma
Jobs.update(
job=job,
status=JobStatus.RUNNING,
progress=82,
status_text="Migrating Pleroma.",
)
Pleroma().stop()
move_folder(
data_path=pathlib.Path("/var/lib/pleroma"),
bind_path=pathlib.Path(f"/volumes/{config.pleroma_block_device}/pleroma"),
user="pleroma",
group="pleroma",
)
Pleroma().start()
Jobs.update(
job=job,
status=JobStatus.FINISHED,
progress=100,
status_text="Migration finished.",
result="Migration finished.",
)
def start_bind_migration(config: BindMigrationConfig) -> Job:
"""Start migration."""
job = Jobs.add(
type_id="migrations.migrate_to_binds",
name="Migrate to binds",
description="Migration required to use the new disk space management.",
)
migrate_to_binds(config, job)
return job
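Outside the diff: how a caller is expected to start the migration. start_bind_migration returns the tracking Job right away while migrate_to_binds runs as a huey task; the device names here are placeholders.

from selfprivacy_api.jobs.migrate_to_binds import (
    BindMigrationConfig,
    start_bind_migration,
)

config = BindMigrationConfig(
    email_block_device="sdb",
    bitwarden_block_device="sdb",
    gitea_block_device="sdb",
    nextcloud_block_device="sdb",
    pleroma_block_device="sdb",
)
job = start_bind_migration(config)  # Job with type_id "migrations.migrate_to_binds"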

View file

@ -6,6 +6,7 @@ from selfprivacy_api.jobs import JobStatus, Jobs
@huey.task()
def test_job():
job = Jobs.get_instance().add(
+type_id="test",
name="Test job",
description="This is a test job.",
status=JobStatus.CREATED,

View file

@ -137,9 +137,10 @@ class Bitwarden(Service):
),
]
-def move_to_volume(self, volume: BlockDevice):
+def move_to_volume(self, volume: BlockDevice) -> Job:
job = Jobs.get_instance().add(
-name="services.bitwarden.move",
+type_id="services.bitwarden.move",
+name="Move Bitwarden",
description=f"Moving Bitwarden data to {volume.name}",
)
@ -155,7 +156,7 @@ class Bitwarden(Service):
owner="vaultwarden", owner="vaultwarden",
), ),
FolderMoveNames( FolderMoveNames(
name="bitwarden", name="bitwarden_rs",
bind_location="/var/lib/bitwarden_rs", bind_location="/var/lib/bitwarden_rs",
group="vaultwarden", group="vaultwarden",
owner="vaultwarden", owner="vaultwarden",

View file

@ -3,7 +3,7 @@ import base64
import subprocess
import typing
-from selfprivacy_api.jobs import Jobs
+from selfprivacy_api.jobs import Job, Jobs
from selfprivacy_api.services.generic_service_mover import FolderMoveNames, move_service
from selfprivacy_api.services.generic_size_counter import get_storage_usage
from selfprivacy_api.services.generic_status_getter import get_service_status
@ -134,9 +134,10 @@ class Gitea(Service):
),
]
-def move_to_volume(self, volume: BlockDevice):
+def move_to_volume(self, volume: BlockDevice) -> Job:
job = Jobs.get_instance().add(
-name="services.gitea.move",
+type_id="services.gitea.move",
+name="Move Gitea",
description=f"Moving Gitea data to {volume.name}",
)

View file

@ -145,9 +145,10 @@ class MailServer(Service):
),
]
-def move_to_volume(self, volume: BlockDevice):
+def move_to_volume(self, volume: BlockDevice) -> Job:
job = Jobs.get_instance().add(
-name="services.mailserver.move",
+type_id="services.mailserver.move",
+name="Move Mail Server",
description=f"Moving mailserver data to {volume.name}",
)

View file

@ -2,7 +2,7 @@
import base64
import subprocess
import typing
-from selfprivacy_api.jobs import Jobs
+from selfprivacy_api.jobs import Job, Jobs
from selfprivacy_api.services.generic_service_mover import FolderMoveNames, move_service
from selfprivacy_api.services.generic_size_counter import get_storage_usage
from selfprivacy_api.services.generic_status_getter import get_service_status
@ -142,9 +142,10 @@ class Nextcloud(Service):
),
]
-def move_to_volume(self, volume: BlockDevice):
+def move_to_volume(self, volume: BlockDevice) -> Job:
job = Jobs.get_instance().add(
-name="services.nextcloud.move",
+type_id="services.nextcloud.move",
+name="Move Nextcloud",
description=f"Moving Nextcloud to volume {volume.name}",
)
move_service(

View file

@ -2,7 +2,7 @@
import base64
import subprocess
import typing
-from selfprivacy_api.jobs import Jobs
+from selfprivacy_api.jobs import Job, Jobs
from selfprivacy_api.services.generic_service_mover import FolderMoveNames, move_service
from selfprivacy_api.services.generic_size_counter import get_storage_usage
from selfprivacy_api.services.generic_status_getter import get_service_status
@ -104,5 +104,5 @@ class Ocserv(Service):
def get_storage_usage() -> int:
return 0
-def move_to_volume(self, volume: BlockDevice):
+def move_to_volume(self, volume: BlockDevice) -> Job:
raise NotImplementedError("ocserv service is not movable")

View file

@ -2,7 +2,7 @@
import base64
import subprocess
import typing
-from selfprivacy_api.jobs import Jobs
+from selfprivacy_api.jobs import Job, Jobs
from selfprivacy_api.services.generic_service_mover import FolderMoveNames, move_service
from selfprivacy_api.services.generic_size_counter import get_storage_usage
from selfprivacy_api.services.generic_status_getter import get_service_status
@ -122,9 +122,10 @@ class Pleroma(Service):
),
]
-def move_to_volume(self, volume: BlockDevice):
+def move_to_volume(self, volume: BlockDevice) -> Job:
job = Jobs.get_instance().add(
-name="services.pleroma.move",
+type_id="services.pleroma.move",
+name="Move Pleroma",
description=f"Moving Pleroma to volume {volume.name}",
)
move_service(

View file

@ -4,6 +4,7 @@ from enum import Enum
import typing
from pydantic import BaseModel
+from selfprivacy_api.jobs import Job
from selfprivacy_api.utils.block_devices import BlockDevice
@ -133,5 +134,5 @@ class Service(ABC):
pass
@abstractmethod
-def move_to_volume(self, volume: BlockDevice):
+def move_to_volume(self, volume: BlockDevice) -> Job:
pass

View file

@ -17,12 +17,12 @@ def get_block_device(device_name):
"-b", "-b",
"-o", "-o",
"NAME,PATH,FSAVAIL,FSSIZE,FSTYPE,FSUSED,MOUNTPOINTS,LABEL,UUID,SIZE,MODEL,SERIAL,TYPE", "NAME,PATH,FSAVAIL,FSSIZE,FSTYPE,FSUSED,MOUNTPOINTS,LABEL,UUID,SIZE,MODEL,SERIAL,TYPE",
device_name, f"/dev/{device_name}",
] ]
) )
lsblk_output = lsblk_output.decode("utf-8") lsblk_output = lsblk_output.decode("utf-8")
lsblk_output = json.loads(lsblk_output) lsblk_output = json.loads(lsblk_output)
return lsblk_output["blockdevices"] return lsblk_output["blockdevices"][0]
def resize_block_device(block_device) -> bool: def resize_block_device(block_device) -> bool:
@ -30,9 +30,11 @@ def resize_block_device(block_device) -> bool:
Resize a block device. Return True if successful.
"""
resize_command = ["resize2fs", block_device]
-resize_process = subprocess.Popen(resize_command, shell=False)
-resize_process.communicate()
-return resize_process.returncode == 0
+try:
+subprocess.check_output(resize_command, shell=False)
+except subprocess.CalledProcessError:
+return False
+return True
class BlockDevice:
@ -43,14 +45,14 @@ class BlockDevice:
def __init__(self, block_device):
self.name = block_device["name"]
self.path = block_device["path"]
-self.fsavail = block_device["fsavail"]
-self.fssize = block_device["fssize"]
+self.fsavail = str(block_device["fsavail"])
+self.fssize = str(block_device["fssize"])
self.fstype = block_device["fstype"]
-self.fsused = block_device["fsused"]
+self.fsused = str(block_device["fsused"])
self.mountpoints = block_device["mountpoints"]
self.label = block_device["label"]
self.uuid = block_device["uuid"]
-self.size = block_device["size"]
+self.size = str(block_device["size"])
self.model = block_device["model"]
self.serial = block_device["serial"]
self.type = block_device["type"]
@ -73,14 +75,14 @@ class BlockDevice:
Update current data and return a dictionary of stats.
"""
device = get_block_device(self.name)
-self.fsavail = device["fsavail"]
-self.fssize = device["fssize"]
+self.fsavail = str(device["fsavail"])
+self.fssize = str(device["fssize"])
self.fstype = device["fstype"]
-self.fsused = device["fsused"]
+self.fsused = str(device["fsused"])
self.mountpoints = device["mountpoints"]
self.label = device["label"]
self.uuid = device["uuid"]
-self.size = device["size"]
+self.size = str(device["size"])
self.model = device["model"]
self.serial = device["serial"]
self.type = device["type"]
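Not part of the commit: the net effect of the get_block_device and resize_block_device changes, given lsblk output like the samples in the tests below.

from selfprivacy_api.utils.block_devices import get_block_device, resize_block_device

device = get_block_device("sda1")     # now a single device dict, not the whole list
print(device["path"])                 # "/dev/sda1"
ok = resize_block_device("/dev/sdb")  # False if resize2fs exits with an error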

View file

@ -1,112 +0,0 @@
"""Function to perform migration of app data to binds."""
import subprocess
import psutil
import pathlib
import shutil
from selfprivacy_api.services.nextcloud import Nextcloud
from selfprivacy_api.utils import WriteUserData
from selfprivacy_api.utils.block_devices import BlockDevices
class BindMigrationConfig:
"""Config for bind migration.
For each service provide block device name.
"""
email_block_device: str
bitwarden_block_device: str
gitea_block_device: str
nextcloud_block_device: str
pleroma_block_device: str
def migrate_to_binds(config: BindMigrationConfig):
"""Migrate app data to binds."""
# Get block devices.
block_devices = BlockDevices().get_block_devices()
block_device_names = [device.name for device in block_devices]
# Get all unique required block devices
required_block_devices = []
for block_device_name in config.__dict__.values():
if block_device_name not in required_block_devices:
required_block_devices.append(block_device_name)
# Check if all block devices from config are present.
for block_device_name in required_block_devices:
if block_device_name not in block_device_names:
raise Exception(f"Block device {block_device_name} is not present.")
# Make sure all required block devices are mounted.
# sda1 is the root partition and is always mounted.
for block_device_name in required_block_devices:
if block_device_name == "sda1":
continue
block_device = BlockDevices().get_block_device(block_device_name)
if block_device is None:
raise Exception(f"Block device {block_device_name} is not present.")
if f"/volumes/{block_device_name}" not in block_device.mountpoints:
raise Exception(f"Block device {block_device_name} is not mounted.")
# Activate binds in userdata
with WriteUserData() as user_data:
if "email" not in user_data:
user_data["email"] = {}
user_data["email"]["block_device"] = config.email_block_device
if "bitwarden" not in user_data:
user_data["bitwarden"] = {}
user_data["bitwarden"]["block_device"] = config.bitwarden_block_device
if "gitea" not in user_data:
user_data["gitea"] = {}
user_data["gitea"]["block_device"] = config.gitea_block_device
if "nextcloud" not in user_data:
user_data["nextcloud"] = {}
user_data["nextcloud"]["block_device"] = config.nextcloud_block_device
if "pleroma" not in user_data:
user_data["pleroma"] = {}
user_data["pleroma"]["block_device"] = config.pleroma_block_device
user_data["useBinds"] = True
# Make sure /volumes/sda1 exists.
pathlib.Path("/volumes/sda1").mkdir(parents=True, exist_ok=True)
# Perform migration of Nextcloud.
# Data is moved from /var/lib/nextcloud to /volumes/<block_device_name>/nextcloud.
# /var/lib/nextcloud is removed and /volumes/<block_device_name>/nextcloud is mounted as bind mount.
# Turn off Nextcloud
Nextcloud().stop()
# Move data from /var/lib/nextcloud to /volumes/<block_device_name>/nextcloud.
# /var/lib/nextcloud is removed and /volumes/<block_device_name>/nextcloud is mounted as bind mount.
nextcloud_data_path = pathlib.Path("/var/lib/nextcloud")
nextcloud_bind_path = pathlib.Path(
f"/volumes/{config.nextcloud_block_device}/nextcloud"
)
if nextcloud_data_path.exists():
shutil.move(str(nextcloud_data_path), str(nextcloud_bind_path))
else:
raise Exception("Nextcloud data path does not exist.")
# Make sure folder /var/lib/nextcloud exists.
nextcloud_data_path.mkdir(mode=0o750, parents=True, exist_ok=True)
# Make sure this folder is owned by user nextcloud and group nextcloud.
shutil.chown(nextcloud_bind_path, user="nextcloud", group="nextcloud")
shutil.chown(nextcloud_data_path, user="nextcloud", group="nextcloud")
# Mount nextcloud bind mount.
subprocess.run(
["mount", "--bind", str(nextcloud_bind_path), str(nextcloud_data_path)],
check=True,
)
# Recursively chown all files in nextcloud bind mount.
subprocess.run(
["chown", "-R", "nextcloud:nextcloud", str(nextcloud_data_path)], check=True
)
# Start Nextcloud
Nextcloud().start()

View file

@ -13,14 +13,14 @@ def tokens_file(mocker, shared_datadir):
)
return mock
@pytest.fixture
def jobs_file(mocker, shared_datadir):
"""Mock tokens file."""
-mock = mocker.patch(
-"selfprivacy_api.utils.JOBS_FILE", shared_datadir / "jobs.json"
-)
+mock = mocker.patch("selfprivacy_api.utils.JOBS_FILE", shared_datadir / "jobs.json")
return mock
@pytest.fixture
def huey_database(mocker, shared_datadir):
"""Mock huey database."""

View file

@ -0,0 +1,484 @@
#!/usr/bin/env python3
# pylint: disable=redefined-outer-name
# pylint: disable=unused-argument
# pylint: disable=missing-function-docstring
import json
import subprocess
import pytest
from selfprivacy_api.utils.block_devices import (
BlockDevice,
BlockDevices,
get_block_device,
resize_block_device,
)
from tests.common import read_json
SINGLE_LSBLK_OUTPUT = b"""
{
"blockdevices": [
{
"name": "sda1",
"path": "/dev/sda1",
"fsavail": "4614107136",
"fssize": "19814920192",
"fstype": "ext4",
"fsused": "14345314304",
"mountpoints": [
"/nix/store", "/"
],
"label": null,
"uuid": "ec80c004-baec-4a2c-851d-0e1807135511",
"size": 20210236928,
"model": null,
"serial": null,
"type": "part"
}
]
}
"""
@pytest.fixture
def lsblk_singular_mock(mocker):
mock = mocker.patch(
"subprocess.check_output", autospec=True, return_value=SINGLE_LSBLK_OUTPUT
)
return mock
@pytest.fixture
def failed_check_output_mock(mocker):
mock = mocker.patch(
"subprocess.check_output",
autospec=True,
side_effect=subprocess.CalledProcessError(
returncode=1, cmd=["some", "command"]
),
)
return mock
@pytest.fixture
def only_root_in_userdata(mocker, datadir):
mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "only_root.json")
assert read_json(datadir / "only_root.json")["volumes"][0]["device"] == "/dev/sda1"
assert (
read_json(datadir / "only_root.json")["volumes"][0]["mountPoint"]
== "/volumes/sda1"
)
assert read_json(datadir / "only_root.json")["volumes"][0]["filesystem"] == "ext4"
return datadir
@pytest.fixture
def no_devices_in_userdata(mocker, datadir):
mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "no_devices.json")
assert read_json(datadir / "no_devices.json")["volumes"] == []
return datadir
@pytest.fixture
def undefined_devices_in_userdata(mocker, datadir):
mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "undefined.json")
assert "volumes" not in read_json(datadir / "undefined.json")
return datadir
def test_create_block_device_object(lsblk_singular_mock):
output = get_block_device("sda1")
assert lsblk_singular_mock.call_count == 1
assert lsblk_singular_mock.call_args[0][0] == [
"lsblk",
"-J",
"-b",
"-o",
"NAME,PATH,FSAVAIL,FSSIZE,FSTYPE,FSUSED,MOUNTPOINTS,LABEL,UUID,SIZE,MODEL,SERIAL,TYPE",
"/dev/sda1",
]
assert output == json.loads(SINGLE_LSBLK_OUTPUT)["blockdevices"][0]
def test_resize_block_device(lsblk_singular_mock):
result = resize_block_device("sdb")
assert result is True
assert lsblk_singular_mock.call_count == 1
assert lsblk_singular_mock.call_args[0][0] == [
"resize2fs",
"sdb",
]
def test_resize_block_device_failed(failed_check_output_mock):
result = resize_block_device("sdb")
assert result is False
assert failed_check_output_mock.call_count == 1
assert failed_check_output_mock.call_args[0][0] == [
"resize2fs",
"sdb",
]
VOLUME_LSBLK_OUTPUT = b"""
{
"blockdevices": [
{
"name": "sdb",
"path": "/dev/sdb",
"fsavail": "11888545792",
"fssize": "12573614080",
"fstype": "ext4",
"fsused": "24047616",
"mountpoints": [
"/volumes/sdb"
],
"label": null,
"uuid": "fa9d0026-ee23-4047-b8b1-297ae16fa751",
"size": 12884901888,
"model": "Volume",
"serial": "21378102",
"type": "disk"
}
]
}
"""
def test_create_block_device(lsblk_singular_mock):
block_device = BlockDevice(json.loads(VOLUME_LSBLK_OUTPUT)["blockdevices"][0])
assert block_device.name == "sdb"
assert block_device.path == "/dev/sdb"
assert block_device.fsavail == "11888545792"
assert block_device.fssize == "12573614080"
assert block_device.fstype == "ext4"
assert block_device.fsused == "24047616"
assert block_device.mountpoints == ["/volumes/sdb"]
assert block_device.label is None
assert block_device.uuid == "fa9d0026-ee23-4047-b8b1-297ae16fa751"
assert block_device.size == "12884901888"
assert block_device.model == "Volume"
assert block_device.serial == "21378102"
assert block_device.type == "disk"
assert block_device.locked is False
assert str(block_device) == "sdb"
assert (
repr(block_device)
== "<BlockDevice sdb of size 12884901888 mounted at ['/volumes/sdb']>"
)
assert hash(block_device) == hash("sdb")
def test_block_devices_equal(lsblk_singular_mock):
block_device = BlockDevice(json.loads(VOLUME_LSBLK_OUTPUT)["blockdevices"][0])
block_device2 = BlockDevice(json.loads(VOLUME_LSBLK_OUTPUT)["blockdevices"][0])
assert block_device == block_device2
@pytest.fixture
def resize_block_mock(mocker):
mock = mocker.patch(
"selfprivacy_api.utils.block_devices.resize_block_device",
autospec=True,
return_value=True,
)
return mock
def test_call_resize_from_block_device(lsblk_singular_mock, resize_block_mock):
block_device = BlockDevice(json.loads(VOLUME_LSBLK_OUTPUT)["blockdevices"][0])
block_device.resize()
assert resize_block_mock.call_count == 1
assert resize_block_mock.call_args[0][0] == "/dev/sdb"
assert lsblk_singular_mock.call_count == 0
def test_get_stats_from_block_device(lsblk_singular_mock):
block_device = BlockDevice(json.loads(SINGLE_LSBLK_OUTPUT)["blockdevices"][0])
stats = block_device.stats()
assert stats == {
"name": "sda1",
"path": "/dev/sda1",
"fsavail": "4614107136",
"fssize": "19814920192",
"fstype": "ext4",
"fsused": "14345314304",
"mountpoints": ["/nix/store", "/"],
"label": None,
"uuid": "ec80c004-baec-4a2c-851d-0e1807135511",
"size": "20210236928",
"model": None,
"serial": None,
"type": "part",
}
assert lsblk_singular_mock.call_count == 1
assert lsblk_singular_mock.call_args[0][0] == [
"lsblk",
"-J",
"-b",
"-o",
"NAME,PATH,FSAVAIL,FSSIZE,FSTYPE,FSUSED,MOUNTPOINTS,LABEL,UUID,SIZE,MODEL,SERIAL,TYPE",
"/dev/sda1",
]
def test_mount_block_device(lsblk_singular_mock, only_root_in_userdata):
block_device = BlockDevice(json.loads(SINGLE_LSBLK_OUTPUT)["blockdevices"][0])
result = block_device.mount()
assert result is False
volume = BlockDevice(json.loads(VOLUME_LSBLK_OUTPUT)["blockdevices"][0])
result = volume.mount()
assert result is True
assert (
read_json(only_root_in_userdata / "only_root.json")["volumes"][1]["device"]
== "/dev/sdb"
)
assert (
read_json(only_root_in_userdata / "only_root.json")["volumes"][1]["mountPoint"]
== "/volumes/sdb"
)
assert (
read_json(only_root_in_userdata / "only_root.json")["volumes"][1]["fsType"]
== "ext4"
)
def test_mount_block_device_when_undefined(
lsblk_singular_mock, undefined_devices_in_userdata
):
block_device = BlockDevice(json.loads(SINGLE_LSBLK_OUTPUT)["blockdevices"][0])
result = block_device.mount()
assert result is True
assert (
read_json(undefined_devices_in_userdata / "undefined.json")["volumes"][0][
"device"
]
== "/dev/sda1"
)
assert (
read_json(undefined_devices_in_userdata / "undefined.json")["volumes"][0][
"mountPoint"
]
== "/volumes/sda1"
)
assert (
read_json(undefined_devices_in_userdata / "undefined.json")["volumes"][0][
"fsType"
]
== "ext4"
)
def test_unmount_block_device(lsblk_singular_mock, only_root_in_userdata):
block_device = BlockDevice(json.loads(SINGLE_LSBLK_OUTPUT)["blockdevices"][0])
result = block_device.unmount()
assert result is True
volume = BlockDevice(json.loads(VOLUME_LSBLK_OUTPUT)["blockdevices"][0])
result = volume.unmount()
assert result is False
assert len(read_json(only_root_in_userdata / "only_root.json")["volumes"]) == 0
def test_unmount_block_device_when_undefined(
lsblk_singular_mock, undefined_devices_in_userdata
):
block_device = BlockDevice(json.loads(SINGLE_LSBLK_OUTPUT)["blockdevices"][0])
result = block_device.unmount()
assert result is False
assert (
len(read_json(undefined_devices_in_userdata / "undefined.json")["volumes"]) == 0
)
FULL_LSBLK_OUTPUT = b"""
{
"blockdevices": [
{
"name": "sda",
"path": "/dev/sda",
"fsavail": null,
"fssize": null,
"fstype": null,
"fsused": null,
"mountpoints": [
null
],
"label": null,
"uuid": null,
"size": 20480786432,
"model": "QEMU HARDDISK",
"serial": "drive-scsi0-0-0-0",
"type": "disk",
"children": [
{
"name": "sda1",
"path": "/dev/sda1",
"fsavail": "4605702144",
"fssize": "19814920192",
"fstype": "ext4",
"fsused": "14353719296",
"mountpoints": [
"/nix/store", "/"
],
"label": null,
"uuid": "ec80c004-baec-4a2c-851d-0e1807135511",
"size": 20210236928,
"model": null,
"serial": null,
"type": "part"
},{
"name": "sda14",
"path": "/dev/sda14",
"fsavail": null,
"fssize": null,
"fstype": null,
"fsused": null,
"mountpoints": [
null
],
"label": null,
"uuid": null,
"size": 1048576,
"model": null,
"serial": null,
"type": "part"
},{
"name": "sda15",
"path": "/dev/sda15",
"fsavail": null,
"fssize": null,
"fstype": "vfat",
"fsused": null,
"mountpoints": [
null
],
"label": null,
"uuid": "6B29-5BA7",
"size": 268435456,
"model": null,
"serial": null,
"type": "part"
}
]
},{
"name": "sdb",
"path": "/dev/sdb",
"fsavail": "11888545792",
"fssize": "12573614080",
"fstype": "ext4",
"fsused": "24047616",
"mountpoints": [
"/volumes/sdb"
],
"label": null,
"uuid": "fa9d0026-ee23-4047-b8b1-297ae16fa751",
"size": 12884901888,
"model": "Volume",
"serial": "21378102",
"type": "disk"
},{
"name": "sr0",
"path": "/dev/sr0",
"fsavail": null,
"fssize": null,
"fstype": null,
"fsused": null,
"mountpoints": [
null
],
"label": null,
"uuid": null,
"size": 1073741312,
"model": "QEMU DVD-ROM",
"serial": "QM00003",
"type": "rom"
}
]
}
"""
@pytest.fixture
def lsblk_full_mock(mocker):
mock = mocker.patch(
"subprocess.check_output", autospec=True, return_value=FULL_LSBLK_OUTPUT
)
return mock
def test_get_block_devices(lsblk_full_mock):
block_devices = BlockDevices().get_block_devices()
assert len(block_devices) == 2
assert block_devices[0].name == "sda1"
assert block_devices[0].path == "/dev/sda1"
assert block_devices[0].fsavail == "4605702144"
assert block_devices[0].fssize == "19814920192"
assert block_devices[0].fstype == "ext4"
assert block_devices[0].fsused == "14353719296"
assert block_devices[0].mountpoints == ["/nix/store", "/"]
assert block_devices[0].label is None
assert block_devices[0].uuid == "ec80c004-baec-4a2c-851d-0e1807135511"
assert block_devices[0].size == "20210236928"
assert block_devices[0].model is None
assert block_devices[0].serial is None
assert block_devices[0].type == "part"
assert block_devices[1].name == "sdb"
assert block_devices[1].path == "/dev/sdb"
assert block_devices[1].fsavail == "11888545792"
assert block_devices[1].fssize == "12573614080"
assert block_devices[1].fstype == "ext4"
assert block_devices[1].fsused == "24047616"
assert block_devices[1].mountpoints == ["/volumes/sdb"]
assert block_devices[1].label is None
assert block_devices[1].uuid == "fa9d0026-ee23-4047-b8b1-297ae16fa751"
assert block_devices[1].size == "12884901888"
assert block_devices[1].model == "Volume"
assert block_devices[1].serial == "21378102"
assert block_devices[1].type == "disk"
def test_get_block_device(lsblk_full_mock):
block_device = BlockDevices().get_block_device("sda1")
assert block_device is not None
assert block_device.name == "sda1"
assert block_device.path == "/dev/sda1"
assert block_device.fsavail == "4605702144"
assert block_device.fssize == "19814920192"
assert block_device.fstype == "ext4"
assert block_device.fsused == "14353719296"
assert block_device.mountpoints == ["/nix/store", "/"]
assert block_device.label is None
assert block_device.uuid == "ec80c004-baec-4a2c-851d-0e1807135511"
assert block_device.size == "20210236928"
assert block_device.model is None
assert block_device.serial is None
assert block_device.type == "part"
def test_get_nonexistent_block_device(lsblk_full_mock):
block_device = BlockDevices().get_block_device("sda2")
assert block_device is None
def test_get_block_devices_by_mountpoint(lsblk_full_mock):
block_devices = BlockDevices().get_block_devices_by_mountpoint("/nix/store")
assert len(block_devices) == 1
assert block_devices[0].name == "sda1"
assert block_devices[0].path == "/dev/sda1"
assert block_devices[0].fsavail == "4605702144"
assert block_devices[0].fssize == "19814920192"
assert block_devices[0].fstype == "ext4"
assert block_devices[0].fsused == "14353719296"
assert block_devices[0].mountpoints == ["/nix/store", "/"]
assert block_devices[0].label is None
assert block_devices[0].uuid == "ec80c004-baec-4a2c-851d-0e1807135511"
assert block_devices[0].size == "20210236928"
assert block_devices[0].model is None
assert block_devices[0].serial is None
assert block_devices[0].type == "part"
def test_get_block_devices_by_mountpoint_no_match(lsblk_full_mock):
block_devices = BlockDevices().get_block_devices_by_mountpoint("/foo")
assert len(block_devices) == 0

View file

@ -0,0 +1,54 @@
{
"backblaze": {
"accountId": "ID",
"accountKey": "KEY",
"bucket": "selfprivacy"
},
"api": {
"token": "TEST_TOKEN",
"enableSwagger": false
},
"bitwarden": {
"enable": true
},
"cloudflare": {
"apiKey": "TOKEN"
},
"databasePassword": "PASSWORD",
"domain": "test.tld",
"hashedMasterPassword": "HASHED_PASSWORD",
"hostname": "test-instance",
"nextcloud": {
"adminPassword": "ADMIN",
"databasePassword": "ADMIN",
"enable": true
},
"resticPassword": "PASS",
"ssh": {
"enable": true,
"passwordAuthentication": true,
"rootKeys": [
"ssh-ed25519 KEY test@pc"
]
},
"username": "tester",
"gitea": {
"enable": false
},
"ocserv": {
"enable": true
},
"pleroma": {
"enable": true
},
"autoUpgrade": {
"enable": true,
"allowReboot": true
},
"timezone": "Europe/Moscow",
"sshKeys": [
"ssh-rsa KEY test@pc"
],
"volumes": [
]
}

View file

@ -0,0 +1,59 @@
{
"backblaze": {
"accountId": "ID",
"accountKey": "KEY",
"bucket": "selfprivacy"
},
"api": {
"token": "TEST_TOKEN",
"enableSwagger": false
},
"bitwarden": {
"enable": true
},
"cloudflare": {
"apiKey": "TOKEN"
},
"databasePassword": "PASSWORD",
"domain": "test.tld",
"hashedMasterPassword": "HASHED_PASSWORD",
"hostname": "test-instance",
"nextcloud": {
"adminPassword": "ADMIN",
"databasePassword": "ADMIN",
"enable": true
},
"resticPassword": "PASS",
"ssh": {
"enable": true,
"passwordAuthentication": true,
"rootKeys": [
"ssh-ed25519 KEY test@pc"
]
},
"username": "tester",
"gitea": {
"enable": false
},
"ocserv": {
"enable": true
},
"pleroma": {
"enable": true
},
"autoUpgrade": {
"enable": true,
"allowReboot": true
},
"timezone": "Europe/Moscow",
"sshKeys": [
"ssh-rsa KEY test@pc"
],
"volumes": [
{
"device": "/dev/sda1",
"mountPoint": "/volumes/sda1",
"filesystem": "ext4"
}
]
}

View file

@ -0,0 +1,52 @@
{
"backblaze": {
"accountId": "ID",
"accountKey": "KEY",
"bucket": "selfprivacy"
},
"api": {
"token": "TEST_TOKEN",
"enableSwagger": false
},
"bitwarden": {
"enable": true
},
"cloudflare": {
"apiKey": "TOKEN"
},
"databasePassword": "PASSWORD",
"domain": "test.tld",
"hashedMasterPassword": "HASHED_PASSWORD",
"hostname": "test-instance",
"nextcloud": {
"adminPassword": "ADMIN",
"databasePassword": "ADMIN",
"enable": true
},
"resticPassword": "PASS",
"ssh": {
"enable": true,
"passwordAuthentication": true,
"rootKeys": [
"ssh-ed25519 KEY test@pc"
]
},
"username": "tester",
"gitea": {
"enable": false
},
"ocserv": {
"enable": true
},
"pleroma": {
"enable": true
},
"autoUpgrade": {
"enable": true,
"allowReboot": true
},
"timezone": "Europe/Moscow",
"sshKeys": [
"ssh-rsa KEY test@pc"
]
}

View file

@ -6,11 +6,13 @@ import pytest
from selfprivacy_api.utils import WriteUserData, ReadUserData
from selfprivacy_api.jobs import Jobs, JobStatus
def test_jobs(jobs_file, shared_datadir):
jobs = Jobs()
assert jobs.get_jobs() == []
test_job = jobs.add(
+type_id="test",
name="Test job",
description="This is a test job.",
status=JobStatus.CREATED,

View file

@ -2,6 +2,7 @@
# pylint: disable=redefined-outer-name
# pylint: disable=unused-argument
# pylint: disable=missing-function-docstring
+import subprocess
import pytest
from selfprivacy_api.utils.network import get_ip4, get_ip6
@ -30,6 +31,28 @@ def ip_process_mock(mocker):
return mock
@pytest.fixture
def failed_ip_process_mock(mocker):
mock = mocker.patch(
"subprocess.check_output",
autospec=True,
return_value=FAILED_OUTPUT_STRING,
)
return mock
@pytest.fixture
def failed_subprocess_call(mocker):
mock = mocker.patch(
"subprocess.check_output",
autospec=True,
side_effect=subprocess.CalledProcessError(
returncode=1, cmd=["ip", "addr", "show", "dev", "eth0"]
),
)
return mock
def test_get_ip4(ip_process_mock):
"""Test get IPv4 address"""
ip4 = get_ip4()
@ -40,3 +63,23 @@ def test_get_ip6(ip_process_mock):
"""Test get IPv6 address""" """Test get IPv6 address"""
ip6 = get_ip6() ip6 = get_ip6()
assert ip6 == "fe80::9400:ff:fef1:34ae" assert ip6 == "fe80::9400:ff:fef1:34ae"
def test_failed_get_ip4(failed_ip_process_mock):
ip4 = get_ip4()
assert ip4 == ""
def test_failed_get_ip6(failed_ip_process_mock):
ip6 = get_ip6()
assert ip6 == ""
def test_failed_subprocess_get_ip4(failed_subprocess_call):
ip4 = get_ip4()
assert ip4 == ""
def test_failed_subprocess_get_ip6(failed_subprocess_call):
ip6 = get_ip6()
assert ip6 == ""