Merge pull request 'def_tests_reworked' (#88) from def_tests_reworked into master

Reviewed-on: https://git.selfprivacy.org/SelfPrivacy/selfprivacy-rest-api/pulls/88
Reviewed-by: Inex Code <inex.code@selfprivacy.org>
This commit is contained in:
Inex Code 2024-03-05 16:40:15 +02:00
commit 8a607b9609
21 changed files with 995 additions and 434 deletions

View file

@ -0,0 +1,34 @@
from selfprivacy_api.utils.block_devices import BlockDevices
from selfprivacy_api.jobs import Jobs, Job
from selfprivacy_api.services import get_service_by_id
from selfprivacy_api.services.tasks import move_service as move_service_task
class ServiceNotFoundError(Exception):
pass
class VolumeNotFoundError(Exception):
pass
def move_service(service_id: str, volume_name: str) -> Job:
service = get_service_by_id(service_id)
if service is None:
raise ServiceNotFoundError(f"No such service:{service_id}")
volume = BlockDevices().get_block_device(volume_name)
if volume is None:
raise VolumeNotFoundError(f"No such volume:{volume_name}")
service.assert_can_move(volume)
job = Jobs.add(
type_id=f"services.{service.get_id()}.move",
name=f"Move {service.get_display_name()}",
description=f"Moving {service.get_display_name()} data to {volume.name}",
)
move_service_task(service, volume, job)
return job

View file

@ -6,17 +6,24 @@ from selfprivacy_api.graphql import IsAuthenticated
from selfprivacy_api.graphql.common_types.jobs import job_to_api_job
from selfprivacy_api.jobs import JobStatus
from selfprivacy_api.graphql.common_types.service import (
Service,
service_to_graphql_service,
)
from traceback import format_tb as format_traceback
from selfprivacy_api.graphql.mutations.mutation_interface import (
GenericJobMutationReturn,
GenericMutationReturn,
)
from selfprivacy_api.graphql.common_types.service import (
Service,
service_to_graphql_service,
)
from selfprivacy_api.actions.services import (
move_service,
ServiceNotFoundError,
VolumeNotFoundError,
)
from selfprivacy_api.services import get_service_by_id
from selfprivacy_api.utils.block_devices import BlockDevices
@strawberry.type
@ -60,7 +67,7 @@ class ServicesMutations:
except Exception as e:
return ServiceMutationReturn(
success=False,
message=format_error(e),
message=pretty_error(e),
code=400,
)
@ -86,7 +93,7 @@ class ServicesMutations:
except Exception as e:
return ServiceMutationReturn(
success=False,
message=format_error(e),
message=pretty_error(e),
code=400,
)
return ServiceMutationReturn(
@ -153,31 +160,32 @@ class ServicesMutations:
@strawberry.mutation(permission_classes=[IsAuthenticated])
def move_service(self, input: MoveServiceInput) -> ServiceJobMutationReturn:
"""Move service."""
# We need a service instance for a reply later
service = get_service_by_id(input.service_id)
if service is None:
return ServiceJobMutationReturn(
success=False,
message="Service not found.",
message=f"Service does not exist: {input.service_id}",
code=404,
)
# TODO: make serviceImmovable and BlockdeviceNotFound exceptions
# in the move_to_volume() function and handle them here
if not service.is_movable():
try:
job = move_service(input.service_id, input.location)
except (ServiceNotFoundError, VolumeNotFoundError) as e:
return ServiceJobMutationReturn(
success=False,
message="Service is not movable.",
message=pretty_error(e),
code=404,
)
except Exception as e:
return ServiceJobMutationReturn(
success=False,
message=pretty_error(e),
code=400,
service=service_to_graphql_service(service),
)
volume = BlockDevices().get_block_device(input.location)
if volume is None:
return ServiceJobMutationReturn(
success=False,
message="Volume not found.",
code=404,
service=service_to_graphql_service(service),
)
job = service.move_to_volume(volume)
if job.status in [JobStatus.CREATED, JobStatus.RUNNING]:
return ServiceJobMutationReturn(
success=True,
@ -197,12 +205,13 @@ class ServicesMutations:
else:
return ServiceJobMutationReturn(
success=False,
message=f"Service move failure: {job.status_text}",
message=f"While moving service and performing the step '{job.status_text}', error occured: {job.error}",
code=400,
service=service_to_graphql_service(service),
job=job_to_api_job(job),
)
def format_error(e: Exception) -> str:
return type(e).__name__ + ": " + str(e)
def pretty_error(e: Exception) -> str:
traceback = "/r".join(format_traceback(e.__traceback__))
return type(e).__name__ + ": " + str(e) + ": " + traceback

View file

@ -268,6 +268,20 @@ class Jobs:
return False
def report_progress(progress: int, job: Job, status_text: str) -> None:
"""
A terse way to call a common operation, for readability
job.report_progress() would be even better
but it would go against how this file is written
"""
Jobs.update(
job=job,
status=JobStatus.RUNNING,
status_text=status_text,
progress=progress,
)
def _redis_key_from_uuid(uuid_string) -> str:
return "jobs:" + str(uuid_string)

View file

@ -3,12 +3,10 @@ import base64
import subprocess
from typing import Optional, List
from selfprivacy_api.jobs import Job, Jobs
from selfprivacy_api.services.generic_service_mover import FolderMoveNames, move_service
from selfprivacy_api.utils import get_domain
from selfprivacy_api.services.generic_status_getter import get_service_status
from selfprivacy_api.services.service import Service, ServiceStatus
from selfprivacy_api.utils import get_domain
from selfprivacy_api.utils.block_devices import BlockDevice
from selfprivacy_api.services.bitwarden.icon import BITWARDEN_ICON
@ -101,20 +99,3 @@ class Bitwarden(Service):
@staticmethod
def get_folders() -> List[str]:
return ["/var/lib/bitwarden", "/var/lib/bitwarden_rs"]
def move_to_volume(self, volume: BlockDevice) -> Job:
job = Jobs.add(
type_id="services.bitwarden.move",
name="Move Bitwarden",
description=f"Moving Bitwarden data to {volume.name}",
)
move_service(
self,
volume,
job,
FolderMoveNames.default_foldermoves(self),
"bitwarden",
)
return job

View file

@ -1,260 +0,0 @@
"""Generic handler for moving services"""
from __future__ import annotations
import subprocess
import time
import pathlib
import shutil
from pydantic import BaseModel
from selfprivacy_api.jobs import Job, JobStatus, Jobs
from selfprivacy_api.utils.huey import huey
from selfprivacy_api.utils.block_devices import BlockDevice
from selfprivacy_api.utils import ReadUserData, WriteUserData
from selfprivacy_api.services.service import Service, ServiceStatus
from selfprivacy_api.services.owned_path import OwnedPath
class FolderMoveNames(BaseModel):
name: str
bind_location: str
owner: str
group: str
@staticmethod
def from_owned_path(path: OwnedPath) -> FolderMoveNames:
return FolderMoveNames(
name=FolderMoveNames.get_foldername(path.path),
bind_location=path.path,
owner=path.owner,
group=path.group,
)
@staticmethod
def get_foldername(path: str) -> str:
return path.split("/")[-1]
@staticmethod
def default_foldermoves(service: Service) -> list[FolderMoveNames]:
return [
FolderMoveNames.from_owned_path(folder)
for folder in service.get_owned_folders()
]
@huey.task()
def move_service(
service: Service,
volume: BlockDevice,
job: Job,
folder_names: list[FolderMoveNames],
userdata_location: str,
):
"""Move a service to another volume."""
job = Jobs.update(
job=job,
status_text="Performing pre-move checks...",
status=JobStatus.RUNNING,
)
service_name = service.get_display_name()
with ReadUserData() as user_data:
if not user_data.get("useBinds", False):
Jobs.update(
job=job,
status=JobStatus.ERROR,
error="Server is not using binds.",
)
return
# Check if we are on the same volume
old_volume = service.get_drive()
if old_volume == volume.name:
Jobs.update(
job=job,
status=JobStatus.ERROR,
error=f"{service_name} is already on this volume.",
)
return
# Check if there is enough space on the new volume
if int(volume.fsavail) < service.get_storage_usage():
Jobs.update(
job=job,
status=JobStatus.ERROR,
error="Not enough space on the new volume.",
)
return
# Make sure the volume is mounted
if not volume.is_root() and f"/volumes/{volume.name}" not in volume.mountpoints:
Jobs.update(
job=job,
status=JobStatus.ERROR,
error="Volume is not mounted.",
)
return
# Make sure current actual directory exists and if its user and group are correct
for folder in folder_names:
if not pathlib.Path(f"/volumes/{old_volume}/{folder.name}").exists():
Jobs.update(
job=job,
status=JobStatus.ERROR,
error=f"{service_name} is not found.",
)
return
if not pathlib.Path(f"/volumes/{old_volume}/{folder.name}").is_dir():
Jobs.update(
job=job,
status=JobStatus.ERROR,
error=f"{service_name} is not a directory.",
)
return
if (
not pathlib.Path(f"/volumes/{old_volume}/{folder.name}").owner()
== folder.owner
):
Jobs.update(
job=job,
status=JobStatus.ERROR,
error=f"{service_name} owner is not {folder.owner}.",
)
return
# Stop service
Jobs.update(
job=job,
status=JobStatus.RUNNING,
status_text=f"Stopping {service_name}...",
progress=5,
)
service.stop()
# Wait for the service to stop, check every second
# If it does not stop in 30 seconds, abort
for _ in range(30):
if service.get_status() not in (
ServiceStatus.ACTIVATING,
ServiceStatus.DEACTIVATING,
):
break
time.sleep(1)
else:
Jobs.update(
job=job,
status=JobStatus.ERROR,
error=f"{service_name} did not stop in 30 seconds.",
)
return
# Unmount old volume
Jobs.update(
job=job,
status_text="Unmounting old folder...",
status=JobStatus.RUNNING,
progress=10,
)
for folder in folder_names:
try:
subprocess.run(
["umount", folder.bind_location],
check=True,
)
except subprocess.CalledProcessError:
Jobs.update(
job=job,
status=JobStatus.ERROR,
error="Unable to unmount old volume.",
)
return
# Move data to new volume and set correct permissions
Jobs.update(
job=job,
status_text="Moving data to new volume...",
status=JobStatus.RUNNING,
progress=20,
)
current_progress = 20
folder_percentage = 50 // len(folder_names)
for folder in folder_names:
shutil.move(
f"/volumes/{old_volume}/{folder.name}",
f"/volumes/{volume.name}/{folder.name}",
)
Jobs.update(
job=job,
status_text="Moving data to new volume...",
status=JobStatus.RUNNING,
progress=current_progress + folder_percentage,
)
Jobs.update(
job=job,
status_text=f"Making sure {service_name} owns its files...",
status=JobStatus.RUNNING,
progress=70,
)
for folder in folder_names:
try:
subprocess.run(
[
"chown",
"-R",
f"{folder.owner}:{folder.group}",
f"/volumes/{volume.name}/{folder.name}",
],
check=True,
)
except subprocess.CalledProcessError as error:
print(error.output)
Jobs.update(
job=job,
status=JobStatus.RUNNING,
error=f"Unable to set ownership of new volume. {service_name} may not be able to access its files. Continuing anyway.",
)
# Mount new volume
Jobs.update(
job=job,
status_text=f"Mounting {service_name} data...",
status=JobStatus.RUNNING,
progress=90,
)
for folder in folder_names:
try:
subprocess.run(
[
"mount",
"--bind",
f"/volumes/{volume.name}/{folder.name}",
folder.bind_location,
],
check=True,
)
except subprocess.CalledProcessError as error:
print(error.output)
Jobs.update(
job=job,
status=JobStatus.ERROR,
error="Unable to mount new volume.",
)
return
# Update userdata
Jobs.update(
job=job,
status_text="Finishing move...",
status=JobStatus.RUNNING,
progress=95,
)
with WriteUserData() as user_data:
if "modules" not in user_data:
user_data["modules"] = {}
if userdata_location not in user_data["modules"]:
user_data["modules"][userdata_location] = {}
user_data["modules"][userdata_location]["location"] = volume.name
# Start service
service.start()
Jobs.update(
job=job,
status=JobStatus.FINISHED,
result=f"{service_name} moved successfully.",
status_text=f"Starting {service_name}...",
progress=100,
)

View file

@ -3,12 +3,10 @@ import base64
import subprocess
from typing import Optional, List
from selfprivacy_api.jobs import Job, Jobs
from selfprivacy_api.services.generic_service_mover import FolderMoveNames, move_service
from selfprivacy_api.utils import get_domain
from selfprivacy_api.services.generic_status_getter import get_service_status
from selfprivacy_api.services.service import Service, ServiceStatus
from selfprivacy_api.utils import get_domain
from selfprivacy_api.utils.block_devices import BlockDevice
from selfprivacy_api.services.gitea.icon import GITEA_ICON
@ -96,20 +94,3 @@ class Gitea(Service):
@staticmethod
def get_folders() -> List[str]:
return ["/var/lib/gitea"]
def move_to_volume(self, volume: BlockDevice) -> Job:
job = Jobs.add(
type_id="services.gitea.move",
name="Move Gitea",
description=f"Moving Gitea data to {volume.name}",
)
move_service(
self,
volume,
job,
FolderMoveNames.default_foldermoves(self),
"gitea",
)
return job

View file

@ -4,14 +4,11 @@ import base64
import subprocess
from typing import Optional, List
from selfprivacy_api.jobs import Job, Jobs
from selfprivacy_api.services.generic_service_mover import FolderMoveNames, move_service
from selfprivacy_api.services.generic_status_getter import (
get_service_status_from_several_units,
)
from selfprivacy_api.services.service import Service, ServiceDnsRecord, ServiceStatus
from selfprivacy_api import utils
from selfprivacy_api.utils.block_devices import BlockDevice
from selfprivacy_api.services.mailserver.icon import MAILSERVER_ICON
@ -166,20 +163,3 @@ class MailServer(Service):
),
)
return dns_records
def move_to_volume(self, volume: BlockDevice) -> Job:
job = Jobs.add(
type_id="services.email.move",
name="Move Mail Server",
description=f"Moving mailserver data to {volume.name}",
)
move_service(
self,
volume,
job,
FolderMoveNames.default_foldermoves(self),
"simple-nixos-mailserver",
)
return job

View file

@ -0,0 +1,72 @@
"""Generic handler for moving services"""
from __future__ import annotations
import shutil
from typing import List
from selfprivacy_api.jobs import Job, report_progress
from selfprivacy_api.utils.block_devices import BlockDevice
from selfprivacy_api.services.owned_path import Bind
class MoveError(Exception):
"""Move of the data has failed"""
def check_volume(volume: BlockDevice, space_needed: int) -> None:
# Check if there is enough space on the new volume
if int(volume.fsavail) < space_needed:
raise MoveError("Not enough space on the new volume.")
# Make sure the volume is mounted
if not volume.is_root() and f"/volumes/{volume.name}" not in volume.mountpoints:
raise MoveError("Volume is not mounted.")
def check_binds(volume_name: str, binds: List[Bind]) -> None:
# Make sure current actual directory exists and if its user and group are correct
for bind in binds:
bind.validate()
def unbind_folders(owned_folders: List[Bind]) -> None:
for folder in owned_folders:
folder.unbind()
# May be moved into Bind
def move_data_to_volume(
binds: List[Bind],
new_volume: BlockDevice,
job: Job,
) -> List[Bind]:
current_progress = job.progress
if current_progress is None:
current_progress = 0
progress_per_folder = 50 // len(binds)
for bind in binds:
old_location = bind.location_at_volume()
bind.drive = new_volume
new_location = bind.location_at_volume()
try:
shutil.move(old_location, new_location)
except Exception as error:
raise MoveError(
f"could not move {old_location} to {new_location} : {str(error)}"
) from error
progress = current_progress + progress_per_folder
report_progress(progress, job, "Moving data to new volume...")
return binds
def ensure_folder_ownership(folders: List[Bind]) -> None:
for folder in folders:
folder.ensure_ownership()
def bind_folders(folders: List[Bind]):
for folder in folders:
folder.bind()

View file

@ -2,12 +2,13 @@
import base64
import subprocess
from typing import Optional, List
from selfprivacy_api.utils import get_domain
from selfprivacy_api.jobs import Job, Jobs
from selfprivacy_api.services.generic_service_mover import FolderMoveNames, move_service
from selfprivacy_api.services.generic_status_getter import get_service_status
from selfprivacy_api.services.service import Service, ServiceStatus
from selfprivacy_api.utils import get_domain
from selfprivacy_api.utils.block_devices import BlockDevice
from selfprivacy_api.services.nextcloud.icon import NEXTCLOUD_ICON
@ -101,18 +102,3 @@ class Nextcloud(Service):
@staticmethod
def get_folders() -> List[str]:
return ["/var/lib/nextcloud"]
def move_to_volume(self, volume: BlockDevice) -> Job:
job = Jobs.add(
type_id="services.nextcloud.move",
name="Move Nextcloud",
description=f"Moving Nextcloud to volume {volume.name}",
)
move_service(
self,
volume,
job,
FolderMoveNames.default_foldermoves(self),
"nextcloud",
)
return job

View file

@ -1,7 +1,126 @@
from __future__ import annotations
import subprocess
import pathlib
from pydantic import BaseModel
from os.path import exists
from selfprivacy_api.utils.block_devices import BlockDevice, BlockDevices
# tests override it to a tmpdir
VOLUMES_PATH = "/volumes"
class BindError(Exception):
pass
class OwnedPath(BaseModel):
"""
A convenient interface for explicitly defining ownership of service folders.
One overrides Service.get_owned_paths() for this.
Why this exists?:
One could use Bind to define ownership but then one would need to handle drive which
is unnecessary and produces code duplication.
It is also somewhat semantically wrong to include Owned Path into Bind
instead of user and group. Because owner and group in Bind are applied to
the original folder on the drive, not to the binding path. But maybe it is
ok since they are technically both owned. Idk yet.
"""
path: str
owner: str
group: str
class Bind:
"""
A directory that resides on some volume but we mount it into fs where we need it.
Used for storing service data.
"""
def __init__(self, binding_path: str, owner: str, group: str, drive: BlockDevice):
self.binding_path = binding_path
self.owner = owner
self.group = group
self.drive = drive
# TODO: delete owned path interface from Service
@staticmethod
def from_owned_path(path: OwnedPath, drive_name: str) -> Bind:
drive = BlockDevices().get_block_device(drive_name)
if drive is None:
raise BindError(f"No such drive: {drive_name}")
return Bind(
binding_path=path.path, owner=path.owner, group=path.group, drive=drive
)
def bind_foldername(self) -> str:
return self.binding_path.split("/")[-1]
def location_at_volume(self) -> str:
return f"{VOLUMES_PATH}/{self.drive.name}/{self.bind_foldername()}"
def validate(self) -> None:
path = pathlib.Path(self.location_at_volume())
if not path.exists():
raise BindError(f"directory {path} is not found.")
if not path.is_dir():
raise BindError(f"{path} is not a directory.")
if path.owner() != self.owner:
raise BindError(f"{path} is not owned by {self.owner}.")
def bind(self) -> None:
if not exists(self.binding_path):
raise BindError(f"cannot bind to a non-existing path: {self.binding_path}")
source = self.location_at_volume()
target = self.binding_path
try:
subprocess.run(
["mount", "--bind", source, target],
stderr=subprocess.PIPE,
check=True,
)
except subprocess.CalledProcessError as error:
print(error.stderr)
raise BindError(f"Unable to bind {source} to {target} :{error.stderr}")
def unbind(self) -> None:
if not exists(self.binding_path):
raise BindError(f"cannot unbind a non-existing path: {self.binding_path}")
try:
subprocess.run(
# umount -l ?
["umount", self.binding_path],
check=True,
)
except subprocess.CalledProcessError:
raise BindError(f"Unable to unmount folder {self.binding_path}.")
pass
def ensure_ownership(self) -> None:
true_location = self.location_at_volume()
try:
subprocess.run(
[
"chown",
"-R",
f"{self.owner}:{self.group}",
# Could we just chown the binded location instead?
true_location,
],
check=True,
stderr=subprocess.PIPE,
)
except subprocess.CalledProcessError as error:
print(error.stderr)
error_message = (
f"Unable to set ownership of {true_location} :{error.stderr}"
)
raise BindError(error_message)

View file

@ -2,13 +2,13 @@
import base64
import subprocess
from typing import Optional, List
from selfprivacy_api.jobs import Job, Jobs
from selfprivacy_api.services.generic_service_mover import FolderMoveNames, move_service
from selfprivacy_api.utils import get_domain
from selfprivacy_api.services.owned_path import OwnedPath
from selfprivacy_api.services.generic_status_getter import get_service_status
from selfprivacy_api.services.service import Service, ServiceStatus
from selfprivacy_api.services.owned_path import OwnedPath
from selfprivacy_api.utils import get_domain
from selfprivacy_api.utils.block_devices import BlockDevice
from selfprivacy_api.services.pleroma.icon import PLEROMA_ICON
@ -88,7 +88,7 @@ class Pleroma(Service):
def get_owned_folders() -> List[OwnedPath]:
"""
Get a list of occupied directories with ownership info
pleroma has folders that are owned by different users
Pleroma has folders that are owned by different users
"""
return [
OwnedPath(
@ -102,18 +102,3 @@ class Pleroma(Service):
group="postgres",
),
]
def move_to_volume(self, volume: BlockDevice) -> Job:
job = Jobs.add(
type_id="services.pleroma.move",
name="Move Pleroma",
description=f"Moving Pleroma to volume {volume.name}",
)
move_service(
self,
volume,
job,
FolderMoveNames.default_foldermoves(self),
"pleroma",
)
return job

View file

@ -4,12 +4,22 @@ from enum import Enum
from typing import List, Optional
from pydantic import BaseModel
from selfprivacy_api.jobs import Job
from selfprivacy_api.jobs import Job, Jobs, JobStatus, report_progress
from selfprivacy_api.utils.block_devices import BlockDevice, BlockDevices
from selfprivacy_api.services.generic_size_counter import get_storage_usage
from selfprivacy_api.services.owned_path import OwnedPath
from selfprivacy_api.services.owned_path import OwnedPath, Bind
from selfprivacy_api.services.moving import (
check_binds,
check_volume,
unbind_folders,
bind_folders,
ensure_folder_ownership,
MoveError,
move_data_to_volume,
)
from selfprivacy_api import utils
from selfprivacy_api.utils.waitloop import wait_until_true
from selfprivacy_api.utils import ReadUserData, WriteUserData
@ -294,19 +304,134 @@ class Service(ABC):
def get_foldername(path: str) -> str:
return path.split("/")[-1]
@abstractmethod
def move_to_volume(self, volume: BlockDevice) -> Job:
"""Cannot raise errors.
Returns errors as an errored out Job instead."""
pass
# TODO: with better json utils, it can be one line, and not a separate function
@classmethod
def set_location(cls, volume: BlockDevice):
"""
Only changes userdata
"""
service_id = cls.get_id()
with WriteUserData() as user_data:
if "modules" not in user_data:
user_data["modules"] = {}
if service_id not in user_data["modules"]:
user_data["modules"][service_id] = {}
user_data["modules"][service_id]["location"] = volume.name
def binds(self) -> List[Bind]:
owned_folders = self.get_owned_folders()
return [
Bind.from_owned_path(folder, self.get_drive()) for folder in owned_folders
]
def assert_can_move(self, new_volume):
"""
Checks if the service can be moved to new volume
Raises errors if it cannot
"""
service_name = self.get_display_name()
if not self.is_movable():
raise MoveError(f"{service_name} is not movable")
with ReadUserData() as user_data:
if not user_data.get("useBinds", False):
raise MoveError("Server is not using binds.")
current_volume_name = self.get_drive()
if current_volume_name == new_volume.name:
raise MoveError(f"{service_name} is already on volume {new_volume}")
check_volume(new_volume, space_needed=self.get_storage_usage())
binds = self.binds()
if binds == []:
raise MoveError("nothing to move")
check_binds(current_volume_name, binds)
def do_move_to_volume(
self,
new_volume: BlockDevice,
job: Job,
):
"""
Move a service to another volume.
Note: It may be much simpler to write it per bind, but a bit less safe?
"""
service_name = self.get_display_name()
binds = self.binds()
report_progress(10, job, "Unmounting folders from old volume...")
unbind_folders(binds)
report_progress(20, job, "Moving data to new volume...")
binds = move_data_to_volume(binds, new_volume, job)
report_progress(70, job, f"Making sure {service_name} owns its files...")
try:
ensure_folder_ownership(binds)
except Exception as error:
# We have logged it via print and we additionally log it here in the error field
# We are continuing anyway but Job has no warning field
Jobs.update(
job,
JobStatus.RUNNING,
error=f"Service {service_name} will not be able to write files: "
+ str(error),
)
report_progress(90, job, f"Mounting {service_name} data...")
bind_folders(binds)
report_progress(95, job, f"Finishing moving {service_name}...")
self.set_location(new_volume)
Jobs.update(
job=job,
status=JobStatus.FINISHED,
result=f"{service_name} moved successfully.",
status_text=f"Starting {service_name}...",
progress=100,
)
def move_to_volume(self, volume: BlockDevice, job: Job) -> Job:
service_name = self.get_display_name()
report_progress(0, job, "Performing pre-move checks...")
self.assert_can_move(volume)
report_progress(5, job, f"Stopping {service_name}...")
assert self is not None
with StoppedService(self):
report_progress(9, job, "Stopped service, starting the move...")
self.do_move_to_volume(volume, job)
return job
@classmethod
def owned_path(cls, path: str):
"""A default guess on folder ownership"""
"""Default folder ownership"""
service_name = cls.get_display_name()
try:
owner = cls.get_user()
if owner is None:
# TODO: assume root?
# (if we do not want to do assumptions, maybe not declare user optional?)
raise LookupError(f"no user for service: {service_name}")
group = cls.get_group()
if group is None:
raise LookupError(f"no group for service: {service_name}")
except Exception as error:
raise LookupError(
f"when deciding a bind for folder {path} of service {service_name}, error: {str(error)}"
)
return OwnedPath(
path=path,
owner=cls.get_user(),
group=cls.get_group(),
owner=owner,
group=group,
)
def pre_backup(self):

View file

@ -0,0 +1,22 @@
from selfprivacy_api.services import Service
from selfprivacy_api.utils.block_devices import BlockDevice
from selfprivacy_api.utils.huey import huey
from selfprivacy_api.jobs import Job, Jobs, JobStatus
@huey.task()
def move_service(service: Service, new_volume: BlockDevice, job: Job) -> bool:
"""
Move service's folders to new physical volume
Does not raise exceptions (we cannot handle exceptions from tasks).
Reports all errors via job.
"""
try:
service.move_to_volume(new_volume, job)
except Exception as e:
Jobs.update(
job=job,
status=JobStatus.ERROR,
error=type(e).__name__ + " " + str(e),
)
return True

View file

@ -11,7 +11,6 @@ from os import path
from selfprivacy_api.jobs import Job, Jobs, JobStatus
from selfprivacy_api.services.service import Service, ServiceDnsRecord, ServiceStatus
from selfprivacy_api.utils.block_devices import BlockDevice
from selfprivacy_api.services.generic_service_mover import move_service, FolderMoveNames
import selfprivacy_api.utils.network as network_utils
from selfprivacy_api.services.test_service.icon import BITWARDEN_ICON
@ -189,23 +188,10 @@ class DummyService(Service):
def get_folders(cls) -> List[str]:
return cls.folders
def move_to_volume(self, volume: BlockDevice) -> Job:
job = Jobs.add(
type_id=f"services.{self.get_id()}.move",
name=f"Move {self.get_display_name()}",
description=f"Moving {self.get_display_name()} data to {volume.name}",
)
def do_move_to_volume(self, volume: BlockDevice, job: Job) -> Job:
if self.simulate_moving is False:
# completely generic code, TODO: make it the default impl.
move_service(
self,
volume,
job,
FolderMoveNames.default_foldermoves(self),
self.get_id(),
)
return super(DummyService, self).do_move_to_volume(volume, job)
else:
Jobs.update(job, status=JobStatus.FINISHED)
self.set_drive(volume.name)
return job

View file

@ -4,6 +4,8 @@ import subprocess
import json
import typing
from pydantic import BaseModel
from selfprivacy_api.utils import WriteUserData
from selfprivacy_api.utils.singleton_metaclass import SingletonMetaclass
@ -169,6 +171,9 @@ class BlockDevice:
return False
# TODO: SingletonMetaclass messes with tests and is able to persist state
# between them. If you have very weird test crosstalk that's probably why
# I am not sure it NEEDS to be SingletonMetaclass
class BlockDevices(metaclass=SingletonMetaclass):
"""Singleton holding all Block devices"""

View file

@ -4,6 +4,7 @@
import os
import pytest
import datetime
import subprocess
from os import path
from os import makedirs
@ -136,7 +137,19 @@ def wrong_auth_client(huey_database, redis_repo_with_tokens):
@pytest.fixture()
def raw_dummy_service(tmpdir):
def volume_folders(tmpdir, mocker):
volumes_dir = path.join(tmpdir, "volumes")
makedirs(volumes_dir)
volumenames = ["sda1", "sda2"]
for d in volumenames:
service_dir = path.join(volumes_dir, d)
makedirs(service_dir)
mock = mocker.patch("selfprivacy_api.services.owned_path.VOLUMES_PATH", volumes_dir)
@pytest.fixture()
def raw_dummy_service(tmpdir) -> DummyService:
dirnames = ["test_service", "also_test_service"]
service_dirs = []
for d in dirnames:
@ -161,11 +174,38 @@ def raw_dummy_service(tmpdir):
return service
def ensure_user_exists(user: str):
try:
output = subprocess.check_output(
["useradd", "-U", user], stderr=subprocess.PIPE, shell=False
)
except subprocess.CalledProcessError as error:
if b"already exists" not in error.stderr:
raise error
try:
output = subprocess.check_output(
["useradd", user], stderr=subprocess.PIPE, shell=False
)
except subprocess.CalledProcessError as error:
assert b"already exists" in error.stderr
return
raise ValueError("could not create user", user)
@pytest.fixture()
def dummy_service(
tmpdir, raw_dummy_service, generic_userdata
) -> Generator[Service, None, None]:
service = raw_dummy_service
user = service.get_user()
# TODO: use create_user from users actions. But it will need NIXOS to be there
# and react to our changes to files.
# from selfprivacy_api.actions.users import create_user
# create_user(user, "yay, it is me")
ensure_user_exists(user)
# register our service
services.services.append(service)

92
tests/test_binds.py Normal file
View file

@ -0,0 +1,92 @@
import pytest
from os import mkdir, rmdir
from os.path import join, exists
from tests.conftest import ensure_user_exists
from tests.test_graphql.test_services import mock_lsblk_devices
from selfprivacy_api.services.owned_path import Bind, BindError
from selfprivacy_api.utils.block_devices import BlockDevices
from selfprivacy_api.utils.waitloop import wait_until_true
BINDTESTS_USER = "binduser"
TESTFILE_CONTENTS = "testissimo"
TESTFILE_NAME = "testfile"
@pytest.fixture()
def bind_user():
ensure_user_exists(BINDTESTS_USER)
return BINDTESTS_USER
def prepare_test_bind(tmpdir, bind_user) -> Bind:
test_binding_name = "bindy_dir"
binding_path = join(tmpdir, test_binding_name)
drive = BlockDevices().get_block_device("sda2")
assert drive is not None
bind = Bind(
binding_path=binding_path, owner=bind_user, group=bind_user, drive=drive
)
source_dir = bind.location_at_volume()
mkdir(source_dir)
mkdir(binding_path)
testfile_path = join(source_dir, TESTFILE_NAME)
with open(testfile_path, "w") as file:
file.write(TESTFILE_CONTENTS)
return bind
def test_bind_unbind(volume_folders, tmpdir, bind_user, mock_lsblk_devices):
bind = prepare_test_bind(tmpdir, bind_user)
bind.ensure_ownership()
bind.validate()
testfile_path = join(bind.location_at_volume(), TESTFILE_NAME)
assert exists(testfile_path)
with open(testfile_path, "r") as file:
assert file.read() == TESTFILE_CONTENTS
bind.bind()
testfile_binding_path = join(bind.binding_path, TESTFILE_NAME)
assert exists(testfile_path)
with open(testfile_path, "r") as file:
assert file.read() == TESTFILE_CONTENTS
bind.unbind()
# wait_until_true(lambda : not exists(testfile_binding_path), timeout_sec=2)
assert not exists(testfile_binding_path)
assert exists(bind.binding_path)
def test_bind_nonexistent_target(volume_folders, tmpdir, bind_user, mock_lsblk_devices):
bind = prepare_test_bind(tmpdir, bind_user)
bind.ensure_ownership()
bind.validate()
rmdir(bind.binding_path)
with pytest.raises(BindError):
bind.bind()
def test_unbind_nonexistent_target(
volume_folders, tmpdir, bind_user, mock_lsblk_devices
):
bind = prepare_test_bind(tmpdir, bind_user)
bind.ensure_ownership()
bind.validate()
bind.bind()
bind.binding_path = "/bogus"
with pytest.raises(BindError):
bind.unbind()

View file

@ -410,6 +410,7 @@ def lsblk_full_mock(mocker):
mock = mocker.patch(
"subprocess.check_output", autospec=True, return_value=FULL_LSBLK_OUTPUT
)
BlockDevices().update()
return mock

View file

@ -0,0 +1,268 @@
import pytest
class BlockDeviceMockReturnTrue:
"""Mock BlockDevices"""
def __init__(self, *args, **kwargs):
self.args = args
self.kwargs = kwargs
def mount(self):
return True
def unmount(self):
return True
def resize(self):
return True
returncode = 0
class BlockDevicesMockReturnTrue:
def get_block_device(name: str): # type: ignore
return BlockDeviceMockReturnTrue()
def __new__(cls, *args, **kwargs):
pass
def __init__(self):
pass
class BlockDevicesMockReturnNone:
def get_block_device(name: str): # type: ignore
return None
def __new__(cls, *args, **kwargs):
pass
def __init__(self):
pass
@pytest.fixture
def mock_block_devices_return_true(mocker):
mock = mocker.patch(
"selfprivacy_api.graphql.mutations.storage_mutations.BlockDevices",
# "selfprivacy_api.utils.block_devices.BlockDevices",
autospec=True,
return_value=BlockDevicesMockReturnTrue,
)
return mock
@pytest.fixture
def mock_block_devices_return_none(mocker):
mock = mocker.patch(
"selfprivacy_api.utils.block_devices.BlockDevices",
autospec=True,
return_value=BlockDevicesMockReturnNone,
)
return mock
@pytest.fixture
def mock_block_device_return_true(mocker):
mock = mocker.patch(
"selfprivacy_api.utils.block_devices.BlockDevice",
autospec=True,
return_value=BlockDeviceMockReturnTrue,
)
return mock
API_RESIZE_VOLUME_MUTATION = """
mutation resizeVolume($name: String!) {
resizeVolume(name: $name) {
success
message
code
}
}
"""
def test_graphql_resize_volume_unauthorized_client(
client, mock_block_devices_return_true
):
response = client.post(
"/graphql",
json={
"query": API_RESIZE_VOLUME_MUTATION,
"variables": {"name": "sdx"},
},
)
assert response.status_code == 200
assert response.json().get("data") is None
def test_graphql_resize_volume_nonexistent_block_device(
authorized_client, mock_block_devices_return_none
):
response = authorized_client.post(
"/graphql",
json={
"query": API_RESIZE_VOLUME_MUTATION,
"variables": {"name": "sdx"},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["resizeVolume"]["code"] == 404
assert response.json()["data"]["resizeVolume"]["message"] is not None
assert response.json()["data"]["resizeVolume"]["success"] is False
def test_graphql_resize_volume(authorized_client, mock_block_devices_return_true):
response = authorized_client.post(
"/graphql",
json={
"query": API_RESIZE_VOLUME_MUTATION,
"variables": {"name": "sdx"},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["resizeVolume"]["code"] == 200
assert response.json()["data"]["resizeVolume"]["message"] is not None
assert response.json()["data"]["resizeVolume"]["success"] is True
API_MOUNT_VOLUME_MUTATION = """
mutation mountVolume($name: String!) {
mountVolume(name: $name) {
success
message
code
}
}
"""
def test_graphql_mount_volume_unauthorized_client(
client, mock_block_device_return_true
):
response = client.post(
"/graphql",
json={
"query": API_MOUNT_VOLUME_MUTATION,
"variables": {"name": "sdx"},
},
)
assert response.status_code == 200
assert response.json().get("data") is None
def test_graphql_mount_already_mounted_volume(
authorized_client, mock_block_devices_return_none
):
response = authorized_client.post(
"/graphql",
json={
"query": API_MOUNT_VOLUME_MUTATION,
"variables": {"name": "sdx"},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["mountVolume"]["code"] == 404
assert response.json()["data"]["mountVolume"]["message"] is not None
assert response.json()["data"]["mountVolume"]["success"] is False
def test_graphql_mount_not_found_volume(
authorized_client, mock_block_devices_return_none
):
response = authorized_client.post(
"/graphql",
json={
"query": API_MOUNT_VOLUME_MUTATION,
"variables": {"name": "sdx"},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["mountVolume"]["code"] == 404
assert response.json()["data"]["mountVolume"]["message"] is not None
assert response.json()["data"]["mountVolume"]["success"] is False
def test_graphql_mount_volume(authorized_client, mock_block_devices_return_true):
response = authorized_client.post(
"/graphql",
json={
"query": API_MOUNT_VOLUME_MUTATION,
"variables": {"name": "sdx"},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["mountVolume"]["code"] == 200
assert response.json()["data"]["mountVolume"]["message"] is not None
assert response.json()["data"]["mountVolume"]["success"] is True
API_UNMOUNT_VOLUME_MUTATION = """
mutation unmountVolume($name: String!) {
unmountVolume(name: $name) {
success
message
code
}
}
"""
def test_graphql_unmount_volume_unauthorized_client(
client, mock_block_devices_return_true
):
response = client.post(
"/graphql",
json={
"query": API_UNMOUNT_VOLUME_MUTATION,
"variables": {"name": "sdx"},
},
)
assert response.status_code == 200
assert response.json().get("data") is None
def test_graphql_unmount_not_found_volume(
authorized_client, mock_block_devices_return_none
):
response = authorized_client.post(
"/graphql",
json={
"query": API_UNMOUNT_VOLUME_MUTATION,
"variables": {"name": "sdx"},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["unmountVolume"]["code"] == 404
assert response.json()["data"]["unmountVolume"]["message"] is not None
assert response.json()["data"]["unmountVolume"]["success"] is False
def test_graphql_unmount_volume(authorized_client, mock_block_devices_return_true):
response = authorized_client.post(
"/graphql",
json={
"query": API_UNMOUNT_VOLUME_MUTATION,
"variables": {"name": "sdx"},
},
)
assert response.status_code == 200
assert response.json().get("data") is not None
assert response.json()["data"]["unmountVolume"]["code"] == 200
assert response.json()["data"]["unmountVolume"]["message"] is not None
assert response.json()["data"]["unmountVolume"]["success"] is True

View file

@ -1,5 +1,8 @@
import pytest
import shutil
from typing import Generator
from os import mkdir
from selfprivacy_api.utils.block_devices import BlockDevices
@ -10,6 +13,75 @@ from selfprivacy_api.services.test_service import DummyService
from tests.common import generate_service_query
from tests.test_graphql.common import assert_empty, assert_ok, get_data
from tests.test_block_device_utils import lsblk_singular_mock
LSBLK_BLOCKDEVICES_DICTS = [
{
"name": "sda1",
"path": "/dev/sda1",
"fsavail": "4614107136",
"fssize": "19814920192",
"fstype": "ext4",
"fsused": "14345314304",
"mountpoints": ["/nix/store", "/"],
"label": None,
"uuid": "ec80c004-baec-4a2c-851d-0e1807135511",
"size": 20210236928,
"model": None,
"serial": None,
"type": "part",
},
{
"name": "sda2",
"path": "/dev/sda2",
"fsavail": "4614107136",
"fssize": "19814920192",
"fstype": "ext4",
"fsused": "14345314304",
"mountpoints": ["/home"],
"label": None,
"uuid": "deadbeef-baec-4a2c-851d-0e1807135511",
"size": 20210236928,
"model": None,
"serial": None,
"type": "part",
},
]
@pytest.fixture()
def mock_lsblk_devices(mocker):
mock = mocker.patch(
"selfprivacy_api.utils.block_devices.BlockDevices.lsblk_device_dicts",
autospec=True,
return_value=LSBLK_BLOCKDEVICES_DICTS,
)
BlockDevices().update()
assert BlockDevices().lsblk_device_dicts() == LSBLK_BLOCKDEVICES_DICTS
devices = BlockDevices().get_block_devices()
assert len(devices) == 2
names = [device.name for device in devices]
assert "sda1" in names
assert "sda2" in names
return mock
@pytest.fixture()
def dummy_service_with_binds(dummy_service, mock_lsblk_devices, volume_folders):
binds = dummy_service.binds()
for bind in binds:
path = bind.binding_path
shutil.move(bind.binding_path, bind.location_at_volume())
mkdir(bind.binding_path)
bind.ensure_ownership()
bind.validate()
bind.bind()
return dummy_service
@pytest.fixture()
@ -23,6 +95,16 @@ def only_dummy_service(dummy_service) -> Generator[DummyService, None, None]:
service_module.services.extend(back_copy)
@pytest.fixture()
def mock_check_volume(mocker):
mock = mocker.patch(
"selfprivacy_api.services.service.check_volume",
autospec=True,
return_value=None,
)
return mock
API_START_MUTATION = """
mutation TestStartService($service_id: String!) {
services {
@ -465,23 +547,36 @@ def test_disable_enable(authorized_client, only_dummy_service):
def test_move_immovable(authorized_client, only_dummy_service):
dummy_service = only_dummy_service
dummy_service.set_movable(False)
mutation_response = api_move(authorized_client, dummy_service, "sda1")
root = BlockDevices().get_root_block_device()
mutation_response = api_move(authorized_client, dummy_service, root.name)
data = get_data(mutation_response)["services"]["moveService"]
assert_errorcode(data, 400)
try:
assert "not movable" in data["message"]
except AssertionError:
raise ValueError("wrong type of error?: ", data["message"])
# is there a meaning in returning the service in this?
assert data["service"] is not None
assert data["job"] is None
def test_move_no_such_service(authorized_client, only_dummy_service):
mutation_response = api_move_by_name(authorized_client, "bogus_service", "sda1")
data = get_data(mutation_response)["services"]["moveService"]
assert_errorcode(data, 404)
assert data["service"] is None
assert data["job"] is None
def test_move_no_such_volume(authorized_client, only_dummy_service):
dummy_service = only_dummy_service
mutation_response = api_move(authorized_client, dummy_service, "bogus_volume")
data = get_data(mutation_response)["services"]["moveService"]
assert_notfound(data)
# is there a meaning in returning the service in this?
assert data["service"] is not None
assert data["service"] is None
assert data["job"] is None
@ -499,7 +594,49 @@ def test_move_same_volume(authorized_client, dummy_service):
# is there a meaning in returning the service in this?
assert data["service"] is not None
assert data["job"] is not None
# We do not create a job if task is not created
assert data["job"] is None
def test_graphql_move_service_without_folders_on_old_volume(
authorized_client,
generic_userdata,
mock_lsblk_devices,
dummy_service: DummyService,
):
target = "sda1"
BlockDevices().update()
assert BlockDevices().get_block_device(target) is not None
dummy_service.set_simulated_moves(False)
dummy_service.set_drive("sda2")
mutation_response = api_move(authorized_client, dummy_service, target)
data = get_data(mutation_response)["services"]["moveService"]
assert_errorcode(data, 400)
assert "sda2/test_service is not found" in data["message"]
def test_graphql_move_service(
authorized_client,
generic_userdata,
mock_check_volume,
dummy_service_with_binds,
):
dummy_service = dummy_service_with_binds
origin = "sda1"
target = "sda2"
assert BlockDevices().get_block_device(target) is not None
assert BlockDevices().get_block_device(origin) is not None
dummy_service.set_drive(origin)
dummy_service.set_simulated_moves(False)
mutation_response = api_move(authorized_client, dummy_service, target)
data = get_data(mutation_response)["services"]["moveService"]
assert_ok(data)
def test_mailservice_cannot_enable_disable(authorized_client):

View file

@ -13,7 +13,6 @@ from selfprivacy_api.services.bitwarden import Bitwarden
from selfprivacy_api.services.pleroma import Pleroma
from selfprivacy_api.services.mailserver import MailServer
from selfprivacy_api.services.owned_path import OwnedPath
from selfprivacy_api.services.generic_service_mover import FolderMoveNames
from selfprivacy_api.services.test_service import DummyService
from selfprivacy_api.services.service import Service, ServiceStatus, StoppedService
@ -81,21 +80,6 @@ def test_paths_from_owned_paths():
]
def test_foldermoves_from_ownedpaths():
owned = OwnedPath(
path="var/lib/bitwarden",
group="vaultwarden",
owner="vaultwarden",
)
assert FolderMoveNames.from_owned_path(owned) == FolderMoveNames(
name="bitwarden",
bind_location="var/lib/bitwarden",
group="vaultwarden",
owner="vaultwarden",
)
def test_enabling_disabling_reads_json(dummy_service: DummyService):
with WriteUserData() as data:
data["modules"][dummy_service.get_id()]["enable"] = False