feature(services): check before moving task and before move itself

This commit is contained in:
Houkime 2024-02-18 23:58:00 +00:00 committed by Inex Code
parent d7ef2ed09a
commit 17a1e34c0d
13 changed files with 192 additions and 229 deletions

View file

@ -0,0 +1,36 @@
from selfprivacy_api.utils.block_devices import BlockDevices
from selfprivacy_api.jobs import Jobs, Job
from selfprivacy_api.services import get_service_by_id
from selfprivacy_api.services.tasks import move_service as move_service_task
class ServiceNotFoundError(Exception):
pass
class VolumeNotFoundError(Exception):
pass
def move_service(service_id: str, volume_name: str) -> Job:
service = get_service_by_id(service_id)
if service is None:
raise ServiceNotFoundError(f"No such service:{service_id}")
volume = BlockDevices().get_block_device(volume_name)
if volume is None:
raise VolumeNotFoundError(f"No such volume:{volume_name}")
service.assert_can_move(volume)
job = Jobs.add(
type_id=f"services.{service.get_id()}.move",
name=f"Move {service.get_display_name()}",
description=f"Moving {service.get_display_name()} data to {volume.name}",
)
handle = move_service_task(service, volume, job)
# Nonblocking
handle()
return job

View file

@ -5,18 +5,25 @@ import strawberry
from selfprivacy_api.graphql import IsAuthenticated from selfprivacy_api.graphql import IsAuthenticated
from selfprivacy_api.graphql.common_types.jobs import job_to_api_job from selfprivacy_api.graphql.common_types.jobs import job_to_api_job
from selfprivacy_api.jobs import JobStatus from selfprivacy_api.jobs import JobStatus
from selfprivacy_api.utils.block_devices import BlockDevices
from selfprivacy_api.graphql.common_types.service import (
Service,
service_to_graphql_service,
)
from selfprivacy_api.graphql.mutations.mutation_interface import ( from selfprivacy_api.graphql.mutations.mutation_interface import (
GenericJobMutationReturn, GenericJobMutationReturn,
GenericMutationReturn, GenericMutationReturn,
) )
from selfprivacy_api.graphql.common_types.service import (
Service,
service_to_graphql_service,
)
from selfprivacy_api.actions.services import (
move_service,
ServiceNotFoundError,
VolumeNotFoundError,
)
from selfprivacy_api.services.moving import MoveError
from selfprivacy_api.services import get_service_by_id from selfprivacy_api.services import get_service_by_id
from selfprivacy_api.utils.block_devices import BlockDevices
@strawberry.type @strawberry.type
@ -60,7 +67,7 @@ class ServicesMutations:
except Exception as e: except Exception as e:
return ServiceMutationReturn( return ServiceMutationReturn(
success=False, success=False,
message=format_error(e), message=pretty_error(e),
code=400, code=400,
) )
@ -86,7 +93,7 @@ class ServicesMutations:
except Exception as e: except Exception as e:
return ServiceMutationReturn( return ServiceMutationReturn(
success=False, success=False,
message=format_error(e), message=pretty_error(e),
code=400, code=400,
) )
return ServiceMutationReturn( return ServiceMutationReturn(
@ -153,31 +160,31 @@ class ServicesMutations:
@strawberry.mutation(permission_classes=[IsAuthenticated]) @strawberry.mutation(permission_classes=[IsAuthenticated])
def move_service(self, input: MoveServiceInput) -> ServiceJobMutationReturn: def move_service(self, input: MoveServiceInput) -> ServiceJobMutationReturn:
"""Move service.""" """Move service."""
# We need a service instance for a reply later
service = get_service_by_id(input.service_id) service = get_service_by_id(input.service_id)
if service is None: if service is None:
return ServiceJobMutationReturn( return ServiceJobMutationReturn(
success=False, success=False,
message=f"Service not found: {input.service_id}", message=f"Service does not exist: {input.service_id}",
code=404, code=404,
) )
# TODO: make serviceImmovable and BlockdeviceNotFound exceptions
# in the move_to_volume() function and handle them here try:
if not service.is_movable(): job = move_service(input.service_id, input.location)
except (ServiceNotFoundError, VolumeNotFoundError) as e:
return ServiceJobMutationReturn( return ServiceJobMutationReturn(
success=False, success=False,
message=f"Service is not movable: {service.get_display_name()}", message=pretty_error(e),
code=404,
)
except Exception as e:
return ServiceJobMutationReturn(
success=False,
message=pretty_error(e),
code=400, code=400,
service=service_to_graphql_service(service), service=service_to_graphql_service(service),
) )
volume = BlockDevices().get_block_device(input.location)
if volume is None:
return ServiceJobMutationReturn(
success=False,
message=f"Volume not found: {input.location}",
code=404,
service=service_to_graphql_service(service),
)
job = service.move_to_volume(volume)
if job.status in [JobStatus.CREATED, JobStatus.RUNNING]: if job.status in [JobStatus.CREATED, JobStatus.RUNNING]:
return ServiceJobMutationReturn( return ServiceJobMutationReturn(
success=True, success=True,
@ -204,5 +211,5 @@ class ServicesMutations:
) )
def format_error(e: Exception) -> str: def pretty_error(e: Exception) -> str:
return type(e).__name__ + ": " + str(e) return type(e).__name__ + ": " + str(e)

View file

@ -3,12 +3,10 @@ import base64
import subprocess import subprocess
from typing import Optional, List from typing import Optional, List
from selfprivacy_api.jobs import Job, Jobs from selfprivacy_api.utils import get_domain
from selfprivacy_api.services.generic_service_mover import FolderMoveNames, move_service
from selfprivacy_api.utils.systemd import get_service_status from selfprivacy_api.utils.systemd import get_service_status
from selfprivacy_api.services.service import Service, ServiceStatus from selfprivacy_api.services.service import Service, ServiceStatus
from selfprivacy_api.utils import get_domain
from selfprivacy_api.utils.block_devices import BlockDevice
from selfprivacy_api.services.bitwarden.icon import BITWARDEN_ICON from selfprivacy_api.services.bitwarden.icon import BITWARDEN_ICON
@ -101,20 +99,3 @@ class Bitwarden(Service):
@staticmethod @staticmethod
def get_folders() -> List[str]: def get_folders() -> List[str]:
return ["/var/lib/bitwarden", "/var/lib/bitwarden_rs"] return ["/var/lib/bitwarden", "/var/lib/bitwarden_rs"]
def move_to_volume(self, volume: BlockDevice) -> Job:
job = Jobs.add(
type_id="services.bitwarden.move",
name="Move Bitwarden",
description=f"Moving Bitwarden data to {volume.name}",
)
move_service(
self,
volume,
job,
FolderMoveNames.default_foldermoves(self),
"bitwarden",
)
return job

View file

@ -3,12 +3,10 @@ import base64
import subprocess import subprocess
from typing import Optional, List from typing import Optional, List
from selfprivacy_api.jobs import Job, Jobs from selfprivacy_api.utils import get_domain
from selfprivacy_api.services.generic_service_mover import FolderMoveNames, move_service
from selfprivacy_api.utils.systemd import get_service_status from selfprivacy_api.utils.systemd import get_service_status
from selfprivacy_api.services.service import Service, ServiceStatus from selfprivacy_api.services.service import Service, ServiceStatus
from selfprivacy_api.utils import get_domain
from selfprivacy_api.utils.block_devices import BlockDevice
from selfprivacy_api.services.gitea.icon import GITEA_ICON from selfprivacy_api.services.gitea.icon import GITEA_ICON
@ -96,20 +94,3 @@ class Gitea(Service):
@staticmethod @staticmethod
def get_folders() -> List[str]: def get_folders() -> List[str]:
return ["/var/lib/gitea"] return ["/var/lib/gitea"]
def move_to_volume(self, volume: BlockDevice) -> Job:
job = Jobs.add(
type_id="services.gitea.move",
name="Move Gitea",
description=f"Moving Gitea data to {volume.name}",
)
move_service(
self,
volume,
job,
FolderMoveNames.default_foldermoves(self),
"gitea",
)
return job

View file

@ -4,14 +4,11 @@ import base64
import subprocess import subprocess
from typing import Optional, List from typing import Optional, List
from selfprivacy_api.jobs import Job, Jobs
from selfprivacy_api.services.generic_service_mover import FolderMoveNames, move_service
from selfprivacy_api.utils.systemd import ( from selfprivacy_api.utils.systemd import (
get_service_status_from_several_units, get_service_status_from_several_units,
) )
from selfprivacy_api.services.service import Service, ServiceDnsRecord, ServiceStatus from selfprivacy_api.services.service import Service, ServiceDnsRecord, ServiceStatus
from selfprivacy_api import utils from selfprivacy_api import utils
from selfprivacy_api.utils.block_devices import BlockDevice
from selfprivacy_api.services.mailserver.icon import MAILSERVER_ICON from selfprivacy_api.services.mailserver.icon import MAILSERVER_ICON
@ -166,20 +163,3 @@ class MailServer(Service):
), ),
) )
return dns_records return dns_records
def move_to_volume(self, volume: BlockDevice) -> Job:
job = Jobs.add(
type_id="services.email.move",
name="Move Mail Server",
description=f"Moving mailserver data to {volume.name}",
)
move_service(
self,
volume,
job,
FolderMoveNames.default_foldermoves(self),
"simple-nixos-mailserver",
)
return job

View file

@ -14,10 +14,9 @@ from selfprivacy_api.services.owned_path import OwnedPath
class MoveError(Exception): class MoveError(Exception):
"""Move failed""" """Move failed"""
def get_foldername(path: str) -> str:
return path.split("/")[-1]
def get_foldername(p: OwnedPath) -> str:
return p.path.split("/")[-1]
def check_volume(volume: BlockDevice, space_needed: int) -> bool: def check_volume(volume: BlockDevice, space_needed: int) -> bool:
@ -26,10 +25,7 @@ def check_volume(volume: BlockDevice, space_needed: int) -> bool:
raise MoveError("Not enough space on the new volume.") raise MoveError("Not enough space on the new volume.")
# Make sure the volume is mounted # Make sure the volume is mounted
if ( if not volume.is_root() and f"/volumes/{volume.name}" not in volume.mountpoints:
not volume.is_root()
and f"/volumes/{volume.name}" not in volume.mountpoints
):
raise MoveError("Volume is not mounted.") raise MoveError("Volume is not mounted.")
@ -39,11 +35,11 @@ def check_folders(current_volume: BlockDevice, folders: List[OwnedPath]) -> None
path = pathlib.Path(f"/volumes/{current_volume}/{get_foldername(folder)}") path = pathlib.Path(f"/volumes/{current_volume}/{get_foldername(folder)}")
if not path.exists(): if not path.exists():
raise MoveError(f"{path} is not found.") raise MoveError(f"directory {path} is not found.")
if not path.is_dir(): if not path.is_dir():
raise MoveError(f"{path} is not a directory.") raise MoveError(f"{path} is not a directory.")
if path.owner() != folder.owner: if path.owner() != folder.owner:
raise MoveError(f"{path} owner is not {folder.owner}.") raise MoveError(f"{path} is not owned by {folder.owner}.")
def unbind_folders(owned_folders: List[OwnedPath]) -> None: def unbind_folders(owned_folders: List[OwnedPath]) -> None:
@ -66,7 +62,7 @@ def move_folders_to_volume(
current_progress = job.progress current_progress = job.progress
folder_percentage = 50 // len(folders) folder_percentage = 50 // len(folders)
for folder in folders: for folder in folders:
folder_name = get_foldername(folder.path) folder_name = get_foldername(folder)
shutil.move( shutil.move(
f"/volumes/{old_volume}/{folder_name}", f"/volumes/{old_volume}/{folder_name}",
f"/volumes/{new_volume.name}/{folder_name}", f"/volumes/{new_volume.name}/{folder_name}",
@ -75,11 +71,9 @@ def move_folders_to_volume(
report_progress(progress, job, "Moving data to new volume...") report_progress(progress, job, "Moving data to new volume...")
def ensure_folder_ownership( def ensure_folder_ownership(folders: List[OwnedPath], volume: BlockDevice) -> None:
folders: List[OwnedPath], volume: BlockDevice
) -> None:
for folder in folders: for folder in folders:
true_location = f"/volumes/{volume.name}/{get_foldername(folder.path)}" true_location = f"/volumes/{volume.name}/{get_foldername(folder)}"
try: try:
subprocess.run( subprocess.run(
[ [
@ -87,12 +81,14 @@ def ensure_folder_ownership(
"-R", "-R",
f"{folder.owner}:{folder.group}", f"{folder.owner}:{folder.group}",
# Could we just chown the binded location instead? # Could we just chown the binded location instead?
true_location true_location,
], ],
check=True, check=True,
) )
except subprocess.CalledProcessError as error: except subprocess.CalledProcessError as error:
error_message = f"Unable to set ownership of {true_location} :{error.output}" error_message = (
f"Unable to set ownership of {true_location} :{error.output}"
)
print(error.output) print(error.output)
raise MoveError(error_message) raise MoveError(error_message)
@ -104,7 +100,7 @@ def bind_folders(folders: List[OwnedPath], volume: BlockDevice) -> None:
[ [
"mount", "mount",
"--bind", "--bind",
f"/volumes/{volume.name}/{get_foldername(folder.path)}", f"/volumes/{volume.name}/{get_foldername(folder)}",
folder.path, folder.path,
], ],
check=True, check=True,

View file

@ -2,12 +2,13 @@
import base64 import base64
import subprocess import subprocess
from typing import Optional, List from typing import Optional, List
from selfprivacy_api.utils import get_domain
from selfprivacy_api.jobs import Job, Jobs from selfprivacy_api.jobs import Job, Jobs
from selfprivacy_api.services.generic_service_mover import FolderMoveNames, move_service
from selfprivacy_api.utils.systemd import get_service_status from selfprivacy_api.utils.systemd import get_service_status
from selfprivacy_api.services.service import Service, ServiceStatus from selfprivacy_api.services.service import Service, ServiceStatus
from selfprivacy_api.utils import get_domain
from selfprivacy_api.utils.block_devices import BlockDevice
from selfprivacy_api.services.nextcloud.icon import NEXTCLOUD_ICON from selfprivacy_api.services.nextcloud.icon import NEXTCLOUD_ICON
@ -101,18 +102,3 @@ class Nextcloud(Service):
@staticmethod @staticmethod
def get_folders() -> List[str]: def get_folders() -> List[str]:
return ["/var/lib/nextcloud"] return ["/var/lib/nextcloud"]
def move_to_volume(self, volume: BlockDevice) -> Job:
job = Jobs.add(
type_id="services.nextcloud.move",
name="Move Nextcloud",
description=f"Moving Nextcloud to volume {volume.name}",
)
move_service(
self,
volume,
job,
FolderMoveNames.default_foldermoves(self),
"nextcloud",
)
return job

View file

@ -2,13 +2,13 @@
import base64 import base64
import subprocess import subprocess
from typing import Optional, List from typing import Optional, List
from selfprivacy_api.jobs import Job, Jobs
from selfprivacy_api.services.generic_service_mover import FolderMoveNames, move_service from selfprivacy_api.utils import get_domain
from selfprivacy_api.services.owned_path import OwnedPath
from selfprivacy_api.utils.systemd import get_service_status from selfprivacy_api.utils.systemd import get_service_status
from selfprivacy_api.services.service import Service, ServiceStatus from selfprivacy_api.services.service import Service, ServiceStatus
from selfprivacy_api.services.owned_path import OwnedPath
from selfprivacy_api.utils import get_domain
from selfprivacy_api.utils.block_devices import BlockDevice
from selfprivacy_api.services.pleroma.icon import PLEROMA_ICON from selfprivacy_api.services.pleroma.icon import PLEROMA_ICON
@ -88,7 +88,7 @@ class Pleroma(Service):
def get_owned_folders() -> List[OwnedPath]: def get_owned_folders() -> List[OwnedPath]:
""" """
Get a list of occupied directories with ownership info Get a list of occupied directories with ownership info
pleroma has folders that are owned by different users Pleroma has folders that are owned by different users
""" """
return [ return [
OwnedPath( OwnedPath(
@ -102,18 +102,3 @@ class Pleroma(Service):
group="postgres", group="postgres",
), ),
] ]
def move_to_volume(self, volume: BlockDevice) -> Job:
job = Jobs.add(
type_id="services.pleroma.move",
name="Move Pleroma",
description=f"Moving Pleroma to volume {volume.name}",
)
move_service(
self,
volume,
job,
FolderMoveNames.default_foldermoves(self),
"pleroma",
)
return job

View file

@ -10,7 +10,15 @@ from selfprivacy_api.utils.block_devices import BlockDevice, BlockDevices
from selfprivacy_api.services.generic_size_counter import get_storage_usage from selfprivacy_api.services.generic_size_counter import get_storage_usage
from selfprivacy_api.services.owned_path import OwnedPath from selfprivacy_api.services.owned_path import OwnedPath
from selfprivacy_api.services.moving import check_folders, check_volume, unbind_folders, bind_folders, ensure_folder_ownership, MoveError, move_folders_to_volume from selfprivacy_api.services.moving import (
check_folders,
check_volume,
unbind_folders,
bind_folders,
ensure_folder_ownership,
MoveError,
move_folders_to_volume,
)
from selfprivacy_api import utils from selfprivacy_api import utils
from selfprivacy_api.utils.waitloop import wait_until_true from selfprivacy_api.utils.waitloop import wait_until_true
@ -316,12 +324,15 @@ class Service(ABC):
Checks if the service can be moved to new volume Checks if the service can be moved to new volume
Raises errors if it cannot Raises errors if it cannot
""" """
service_name = self.get_display_name()
if not self.is_movable():
raise MoveError(f"{service_name} is not movable")
with ReadUserData() as user_data: with ReadUserData() as user_data:
if not user_data.get("useBinds", False): if not user_data.get("useBinds", False):
raise MoveError("Server is not using binds.") raise MoveError("Server is not using binds.")
current_volume_name = self.get_drive() current_volume_name = self.get_drive()
service_name = self.get_display_name()
if current_volume_name == new_volume.name: if current_volume_name == new_volume.name:
raise MoveError(f"{service_name} is already on volume {new_volume}") raise MoveError(f"{service_name} is already on volume {new_volume}")
@ -333,7 +344,6 @@ class Service(ABC):
check_folders(current_volume_name, owned_folders) check_folders(current_volume_name, owned_folders)
def do_move_to_volume( def do_move_to_volume(
self, self,
new_volume: BlockDevice, new_volume: BlockDevice,
@ -341,19 +351,11 @@ class Service(ABC):
): ):
""" """
Move a service to another volume. Move a service to another volume.
Is not allowed to raise errors because it is a task.
""" """
service_name = self.get_display_name() service_name = self.get_display_name()
old_volume_name = self.get_drive() old_volume_name = self.get_drive()
owned_folders = self.get_owned_folders() owned_folders = self.get_owned_folders()
report_progress(0, job, "Performing pre-move checks...")
# TODO: move trying to the task
try:
report_progress(5, job, f"Stopping {service_name}...")
with StoppedService(self):
report_progress(10, job, "Unmounting folders from old volume...") report_progress(10, job, "Unmounting folders from old volume...")
unbind_folders(owned_folders) unbind_folders(owned_folders)
@ -366,13 +368,18 @@ class Service(ABC):
except Exception as error: except Exception as error:
# We have logged it via print and we additionally log it here in the error field # We have logged it via print and we additionally log it here in the error field
# We are continuing anyway but Job has no warning field # We are continuing anyway but Job has no warning field
Jobs.update(job, JobStatus.RUNNING, error=f"Service {service_name} will not be able to write files: " + str(error)) Jobs.update(
job,
JobStatus.RUNNING,
error=f"Service {service_name} will not be able to write files: "
+ str(error),
)
report_progress(90, job, f"Mounting {service_name} data...") report_progress(90, job, f"Mounting {service_name} data...")
bind_folders(owned_folders, new_volume) bind_folders(owned_folders, new_volume)
report_progress(95, job, f"Finishing moving {service_name}...") report_progress(95, job, f"Finishing moving {service_name}...")
self.set_location(self, new_volume) self.set_location(new_volume)
Jobs.update( Jobs.update(
job=job, job=job,
@ -381,19 +388,20 @@ class Service(ABC):
status_text=f"Starting {service_name}...", status_text=f"Starting {service_name}...",
progress=100, progress=100,
) )
except Exception as e:
Jobs.update(
job=job,
status=JobStatus.ERROR,
error=type(e).__name__ + " " + str(e),
)
def move_to_volume(self, volume: BlockDevice, job: Job) -> Job:
service_name = self.get_display_name()
@abstractmethod report_progress(0, job, "Performing pre-move checks...")
def move_to_volume(self, volume: BlockDevice) -> Job: self.assert_can_move(volume)
"""Cannot raise errors.
Returns errors as an errored out Job instead.""" report_progress(5, job, f"Stopping {service_name}...")
pass assert self is not None
with StoppedService(self):
report_progress(9, job, f"Stopped server, starting the move...")
self.do_move_to_volume(volume, job)
return job
@classmethod @classmethod
def owned_path(cls, path: str): def owned_path(cls, path: str):

View file

@ -1,11 +1,22 @@
from selfprivacy_api.services import Service from selfprivacy_api.services import Service
from selfprivacy_api.utils.block_devices import BlockDevice from selfprivacy_api.utils.block_devices import BlockDevice
from selfprivacy_api.utils.huey import huey from selfprivacy_api.utils.huey import huey
from selfprivacy_api.jobs import Job, Jobs, JobStatus
@huey.task() @huey.task()
def move_service( def move_service(service: Service, new_volume: BlockDevice, job: Job) -> bool:
service: Service, """
new_volume: BlockDevice, Move service's folders to new physical volume
): Does not raise exceptions (we cannot handle exceptions from tasks).
service.move_to_volume(new_volume) Reports all errors via job.
"""
try:
service.move_to_volume(new_volume, job)
except Exception as e:
Jobs.update(
job=job,
status=JobStatus.ERROR,
error=type(e).__name__ + " " + str(e),
)
return True

View file

@ -11,7 +11,6 @@ from os import path
from selfprivacy_api.jobs import Job, Jobs, JobStatus from selfprivacy_api.jobs import Job, Jobs, JobStatus
from selfprivacy_api.services.service import Service, ServiceDnsRecord, ServiceStatus from selfprivacy_api.services.service import Service, ServiceDnsRecord, ServiceStatus
from selfprivacy_api.utils.block_devices import BlockDevice from selfprivacy_api.utils.block_devices import BlockDevice
from selfprivacy_api.services.generic_service_mover import move_service, FolderMoveNames
import selfprivacy_api.utils.network as network_utils import selfprivacy_api.utils.network as network_utils
from selfprivacy_api.services.test_service.icon import BITWARDEN_ICON from selfprivacy_api.services.test_service.icon import BITWARDEN_ICON
@ -189,23 +188,10 @@ class DummyService(Service):
def get_folders(cls) -> List[str]: def get_folders(cls) -> List[str]:
return cls.folders return cls.folders
def move_to_volume(self, volume: BlockDevice) -> Job: def do_move_to_volume(self, volume: BlockDevice, job: Job) -> Job:
job = Jobs.add(
type_id=f"services.{self.get_id()}.move",
name=f"Move {self.get_display_name()}",
description=f"Moving {self.get_display_name()} data to {volume.name}",
)
if self.simulate_moving is False: if self.simulate_moving is False:
# completely generic code, TODO: make it the default impl. return super(DummyService, self).do_move_to_volume(volume, job)
move_service(
self,
volume,
job,
FolderMoveNames.default_foldermoves(self),
self.get_id(),
)
else: else:
Jobs.update(job, status=JobStatus.FINISHED) Jobs.update(job, status=JobStatus.FINISHED)
self.set_drive(volume.name) self.set_drive(volume.name)
return job return job

View file

@ -8,6 +8,8 @@ from selfprivacy_api.services import get_service_by_id
from selfprivacy_api.services.service import Service, ServiceStatus from selfprivacy_api.services.service import Service, ServiceStatus
from selfprivacy_api.services.test_service import DummyService from selfprivacy_api.services.test_service import DummyService
# from selfprivacy_api.services.moving import check_folders
from tests.common import generate_service_query from tests.common import generate_service_query
from tests.test_graphql.common import assert_empty, assert_ok, get_data from tests.test_graphql.common import assert_empty, assert_ok, get_data
from tests.test_block_device_utils import lsblk_singular_mock from tests.test_block_device_utils import lsblk_singular_mock
@ -32,7 +34,7 @@ MOVER_MOCK_PROCESS = CompletedProcess(["ls"], returncode=0)
@pytest.fixture() @pytest.fixture()
def mock_check_service_mover_folders(mocker): def mock_check_service_mover_folders(mocker):
mock = mocker.patch( mock = mocker.patch(
"selfprivacy_api.services.generic_service_mover.check_folders", "selfprivacy_api.services.service.check_folders",
autospec=True, autospec=True,
return_value=None, return_value=None,
) )
@ -495,9 +497,14 @@ def test_disable_enable(authorized_client, only_dummy_service):
def test_move_immovable(authorized_client, only_dummy_service): def test_move_immovable(authorized_client, only_dummy_service):
dummy_service = only_dummy_service dummy_service = only_dummy_service
dummy_service.set_movable(False) dummy_service.set_movable(False)
mutation_response = api_move(authorized_client, dummy_service, "sda1") root = BlockDevices().get_root_block_device()
mutation_response = api_move(authorized_client, dummy_service, root.name)
data = get_data(mutation_response)["services"]["moveService"] data = get_data(mutation_response)["services"]["moveService"]
assert_errorcode(data, 400) assert_errorcode(data, 400)
try:
assert "not movable" in data["message"]
except AssertionError:
raise ValueError("wrong type of error?: ", data["message"])
# is there a meaning in returning the service in this? # is there a meaning in returning the service in this?
assert data["service"] is not None assert data["service"] is not None
@ -519,8 +526,7 @@ def test_move_no_such_volume(authorized_client, only_dummy_service):
data = get_data(mutation_response)["services"]["moveService"] data = get_data(mutation_response)["services"]["moveService"]
assert_notfound(data) assert_notfound(data)
# is there a meaning in returning the service in this? assert data["service"] is None
assert data["service"] is not None
assert data["job"] is None assert data["job"] is None
@ -538,7 +544,8 @@ def test_move_same_volume(authorized_client, dummy_service):
# is there a meaning in returning the service in this? # is there a meaning in returning the service in this?
assert data["service"] is not None assert data["service"] is not None
assert data["job"] is not None # We do not create a job if task is not created
assert data["job"] is None
def test_graphql_move_service_without_folders_on_old_volume( def test_graphql_move_service_without_folders_on_old_volume(

View file

@ -13,7 +13,6 @@ from selfprivacy_api.services.bitwarden import Bitwarden
from selfprivacy_api.services.pleroma import Pleroma from selfprivacy_api.services.pleroma import Pleroma
from selfprivacy_api.services.mailserver import MailServer from selfprivacy_api.services.mailserver import MailServer
from selfprivacy_api.services.owned_path import OwnedPath from selfprivacy_api.services.owned_path import OwnedPath
from selfprivacy_api.services.generic_service_mover import FolderMoveNames
from selfprivacy_api.services.test_service import DummyService from selfprivacy_api.services.test_service import DummyService
from selfprivacy_api.services.service import Service, ServiceStatus, StoppedService from selfprivacy_api.services.service import Service, ServiceStatus, StoppedService
@ -81,19 +80,19 @@ def test_paths_from_owned_paths():
] ]
def test_foldermoves_from_ownedpaths(): # def test_foldermoves_from_ownedpaths():
owned = OwnedPath( # owned = OwnedPath(
path="var/lib/bitwarden", # path="var/lib/bitwarden",
group="vaultwarden", # group="vaultwarden",
owner="vaultwarden", # owner="vaultwarden",
) # )
assert FolderMoveNames.from_owned_path(owned) == FolderMoveNames( # assert FolderMoveNames.from_owned_path(owned) == FolderMoveNames(
name="bitwarden", # name="bitwarden",
bind_location="var/lib/bitwarden", # bind_location="var/lib/bitwarden",
group="vaultwarden", # group="vaultwarden",
owner="vaultwarden", # owner="vaultwarden",
) # )
def test_enabling_disabling_reads_json(dummy_service: DummyService): def test_enabling_disabling_reads_json(dummy_service: DummyService):