From 52a58d94e7706c2a356e80c9f004ff71a4d57279 Mon Sep 17 00:00:00 2001 From: Inex Code Date: Tue, 2 Aug 2022 22:50:16 +0300 Subject: [PATCH] Test subscription --- selfprivacy_api/app.py | 3 +- selfprivacy_api/graphql/schema.py | 12 + .../graphql/subscriptions/__init__.py | 0 selfprivacy_api/graphql/subscriptions/jobs.py | 26 +++ selfprivacy_api/jobs/__init__.py | 83 ++++--- selfprivacy_api/jobs/test.py | 56 +++++ selfprivacy_api/restic_controller/tasks.py | 4 +- .../services/nextcloud/__init__.py | 215 +++++++++++++++++- selfprivacy_api/services/service.py | 12 +- selfprivacy_api/utils/block_devices.py | 12 +- selfprivacy_api/utils/huey.py | 4 + selfprivacy_api/utils/migrate_to_binds.py | 103 +++++++++ 12 files changed, 485 insertions(+), 45 deletions(-) create mode 100644 selfprivacy_api/graphql/subscriptions/__init__.py create mode 100644 selfprivacy_api/graphql/subscriptions/jobs.py create mode 100644 selfprivacy_api/jobs/test.py create mode 100644 selfprivacy_api/utils/huey.py create mode 100644 selfprivacy_api/utils/migrate_to_binds.py diff --git a/selfprivacy_api/app.py b/selfprivacy_api/app.py index 15142f0..b22d034 100644 --- a/selfprivacy_api/app.py +++ b/selfprivacy_api/app.py @@ -18,7 +18,8 @@ from selfprivacy_api.resources.system import api_system from selfprivacy_api.resources.services import services as api_services from selfprivacy_api.resources.api_auth import auth as api_auth -from selfprivacy_api.restic_controller.tasks import huey, init_restic +from selfprivacy_api.utils.huey import huey +from selfprivacy_api.restic_controller.tasks import init_restic from selfprivacy_api.migrations import run_migrations diff --git a/selfprivacy_api/graphql/schema.py b/selfprivacy_api/graphql/schema.py index c2d6a10..4a7aad5 100644 --- a/selfprivacy_api/graphql/schema.py +++ b/selfprivacy_api/graphql/schema.py @@ -4,6 +4,7 @@ import strawberry from selfprivacy_api.graphql import IsAuthenticated from selfprivacy_api.graphql.mutations.api_mutations import ApiMutations +from selfprivacy_api.graphql.mutations.mutation_interface import GenericMutationReturn from selfprivacy_api.graphql.mutations.ssh_mutations import SshMutations from selfprivacy_api.graphql.mutations.storage_mutation import StorageMutations from selfprivacy_api.graphql.mutations.system_mutations import SystemMutations @@ -14,6 +15,7 @@ from selfprivacy_api.graphql.queries.system import System from selfprivacy_api.graphql.mutations.users_mutations import UserMutations from selfprivacy_api.graphql.queries.users import Users +from selfprivacy_api.jobs.test import test_job @strawberry.type @@ -51,6 +53,16 @@ class Mutation( ): """Root schema for mutations""" + @strawberry.mutation + def test_mutation(self) -> GenericMutationReturn: + """Test mutation""" + test_job() + return GenericMutationReturn( + success=True, + message="Test mutation", + code=200, + ) + pass diff --git a/selfprivacy_api/graphql/subscriptions/__init__.py b/selfprivacy_api/graphql/subscriptions/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/selfprivacy_api/graphql/subscriptions/jobs.py b/selfprivacy_api/graphql/subscriptions/jobs.py new file mode 100644 index 0000000..55d4c10 --- /dev/null +++ b/selfprivacy_api/graphql/subscriptions/jobs.py @@ -0,0 +1,26 @@ +import asyncio +from typing import AsyncGenerator +import typing + +import strawberry +from selfprivacy_api.graphql import IsAuthenticated + +from selfprivacy_api.jobs import Job, Jobs + +@strawberry.type +class JobSubscription: + @strawberry.subscription(permission_classes=[IsAuthenticated]) + async def job_subscription(self) -> AsyncGenerator[typing.List[Job], None]: + is_updated = True + def callback(jobs: typing.List[Job]): + nonlocal is_updated + is_updated = True + Jobs().add_observer(callback) + try: + while True: + if is_updated: + is_updated = False + yield Jobs().jobs + except GeneratorExit: + Jobs().remove_observer(callback) + return diff --git a/selfprivacy_api/jobs/__init__.py b/selfprivacy_api/jobs/__init__.py index a467583..d1ab948 100644 --- a/selfprivacy_api/jobs/__init__.py +++ b/selfprivacy_api/jobs/__init__.py @@ -16,6 +16,7 @@ A job is a dictionary with the following keys: """ import typing import datetime +import asyncio import json import os import time @@ -44,6 +45,8 @@ class Job: name: str, description: str, status: JobStatus, + status_text: typing.Optional[str], + progress: typing.Optional[int], created_at: datetime.datetime, updated_at: datetime.datetime, finished_at: typing.Optional[datetime.datetime], @@ -54,45 +57,25 @@ class Job: self.name = name self.description = description self.status = status + self.status_text = status_text or "" + self.progress = progress or 0 self.created_at = created_at self.updated_at = updated_at self.finished_at = finished_at self.error = error self.result = result - def to_dict(self) -> dict: - """ - Convert the job to a dictionary. - """ - return { - "id": self.id, - "name": self.name, - "description": self.description, - "status": self.status, - "created_at": self.created_at, - "updated_at": self.updated_at, - "finished_at": self.finished_at, - "error": self.error, - "result": self.result, - } - - def to_json(self) -> str: - """ - Convert the job to a JSON string. - """ - return json.dumps(self.to_dict()) - def __str__(self) -> str: """ Convert the job to a string. """ - return self.to_json() + return f"{self.name} - {self.status}" def __repr__(self) -> str: """ Convert the job to a string. """ - return self.to_json() + return f"{self.name} - {self.status}" class Jobs: @@ -120,9 +103,30 @@ class Jobs: else: Jobs.__instance = self self.jobs = [] + # Observers of the jobs list. + self.observers = [] + + def add_observer(self, observer: typing.Callable[[typing.List[Job]], None]) -> None: + """ + Add an observer to the jobs list. + """ + self.observers.append(observer) + + def remove_observer(self, observer: typing.Callable[[typing.List[Job]], None]) -> None: + """ + Remove an observer from the jobs list. + """ + self.observers.remove(observer) + + def _notify_observers(self) -> None: + """ + Notify the observers of the jobs list. + """ + for observer in self.observers: + observer(self.jobs) def add( - self, name: str, description: str, status: JobStatus = JobStatus.CREATED + self, name: str, description: str, status: JobStatus = JobStatus.CREATED, status_text: str = "", progress: int = 0 ) -> Job: """ Add a job to the jobs list. @@ -131,6 +135,8 @@ class Jobs: name=name, description=description, status=status, + status_text=status_text, + progress=progress, created_at=datetime.datetime.now(), updated_at=datetime.datetime.now(), finished_at=None, @@ -138,6 +144,9 @@ class Jobs: result=None, ) self.jobs.append(job) + # Notify the observers. + self._notify_observers() + return job def remove(self, job: Job) -> None: @@ -145,15 +154,19 @@ class Jobs: Remove a job from the jobs list. """ self.jobs.remove(job) + # Notify the observers. + self._notify_observers() def update( self, job: Job, - name: typing.Optional[str], - description: typing.Optional[str], status: JobStatus, - error: typing.Optional[str], - result: typing.Optional[str], + status_text: typing.Optional[str] = None, + progress: typing.Optional[int] = None, + name: typing.Optional[str] = None, + description: typing.Optional[str] = None, + error: typing.Optional[str] = None, + result: typing.Optional[str] = None, ) -> Job: """ Update a job in the jobs list. @@ -162,10 +175,20 @@ class Jobs: job.name = name if description is not None: job.description = description + if status_text is not None: + job.status_text = status_text + if progress is not None: + job.progress = progress job.status = status job.updated_at = datetime.datetime.now() job.error = error job.result = result + if status == JobStatus.FINISHED or status == JobStatus.ERROR: + job.finished_at = datetime.datetime.now() + + # Notify the observers. + self._notify_observers() + return job def get_job(self, id: str) -> typing.Optional[Job]: @@ -177,7 +200,7 @@ class Jobs: return job return None - def get_jobs(self) -> list: + def get_jobs(self) -> typing.List[Job]: """ Get the jobs list. """ diff --git a/selfprivacy_api/jobs/test.py b/selfprivacy_api/jobs/test.py new file mode 100644 index 0000000..13856a1 --- /dev/null +++ b/selfprivacy_api/jobs/test.py @@ -0,0 +1,56 @@ +import time +from selfprivacy_api.utils.huey import huey +from selfprivacy_api.jobs import JobStatus, Jobs + + +@huey.task() +def test_job(): + job = Jobs().add( + name="Test job", + description="This is a test job.", + status=JobStatus.CREATED, + status_text="", + progress=0, + ) + time.sleep(5) + Jobs().update( + job=job, + status=JobStatus.RUNNING, + status_text="Performing pre-move checks...", + progress=5, + ) + time.sleep(5) + Jobs().update( + job=job, + status=JobStatus.RUNNING, + status_text="Performing pre-move checks...", + progress=10, + ) + time.sleep(5) + Jobs().update( + job=job, + status=JobStatus.RUNNING, + status_text="Performing pre-move checks...", + progress=15, + ) + time.sleep(5) + Jobs().update( + job=job, + status=JobStatus.RUNNING, + status_text="Performing pre-move checks...", + progress=20, + ) + time.sleep(5) + Jobs().update( + job=job, + status=JobStatus.RUNNING, + status_text="Performing pre-move checks...", + progress=25, + ) + time.sleep(5) + Jobs().update( + job=job, + status=JobStatus.FINISHED, + status_text="Job finished.", + progress=100, + ) diff --git a/selfprivacy_api/restic_controller/tasks.py b/selfprivacy_api/restic_controller/tasks.py index 4c610c4..f583d8b 100644 --- a/selfprivacy_api/restic_controller/tasks.py +++ b/selfprivacy_api/restic_controller/tasks.py @@ -1,10 +1,8 @@ """Tasks for the restic controller.""" from huey import crontab -from huey.contrib.mini import MiniHuey +from selfprivacy_api.utils.huey import huey from . import ResticController, ResticStates -huey = MiniHuey() - @huey.task() def init_restic(): diff --git a/selfprivacy_api/services/nextcloud/__init__.py b/selfprivacy_api/services/nextcloud/__init__.py index 525f657..a0604b2 100644 --- a/selfprivacy_api/services/nextcloud/__init__.py +++ b/selfprivacy_api/services/nextcloud/__init__.py @@ -1,10 +1,16 @@ """Class representing Nextcloud service.""" import base64 import subprocess +import time +import typing import psutil -from selfprivacy_api.services.service import Service, ServiceStatus +import pathlib +import shutil +from selfprivacy_api.jobs import Job, JobStatus, Jobs +from selfprivacy_api.services.service import Service, ServiceDnsRecord, ServiceStatus from selfprivacy_api.utils import ReadUserData, WriteUserData - +from selfprivacy_api.utils.block_devices import BlockDevice +from selfprivacy_api.utils.huey import huey class Nextcloud(Service): """Class representing Nextcloud service.""" @@ -92,5 +98,206 @@ class Nextcloud(Service): """Return Nextcloud logs.""" return "" - def get_storage_usage(self): - return psutil.disk_usage("/var/lib/nextcloud").used + def get_storage_usage(self) -> int: + """ + Calculate the real storage usage of /var/lib/nextcloud and all subdirectories. + Calculate using pathlib. + Do not follow symlinks. + """ + storage_usage = 0 + for path in pathlib.Path("/var/lib/nextcloud").rglob("**/*"): + if path.is_dir(): + continue + storage_usage += path.stat().st_size + return storage_usage + + def get_location(self) -> str: + """Get the name of disk where Nextcloud is installed.""" + with ReadUserData() as user_data: + if user_data.get("useBinds", False): + return user_data.get("nextcloud", {}).get("location", "sda1") + else: + return "sda1" + + def get_dns_records(self) -> typing.List[ServiceDnsRecord]: + return super().get_dns_records() + + def move_to_volume(self, volume: BlockDevice): + job = Jobs().add( + name="services.nextcloud.move", + description=f"Moving Nextcloud to volume {volume.name}", + ) + move_nextcloud(self, volume, job) + return job + + +@huey.task() +def move_nextcloud(nextcloud: Nextcloud, volume: BlockDevice, job: Job): + """Move Nextcloud to another volume.""" + job = Jobs().update( + job=job, + status_text="Performing pre-move checks...", + status=JobStatus.RUNNING, + ) + with ReadUserData() as user_data: + if not user_data.get("useBinds", False): + Jobs().update( + job=job, + status=JobStatus.ERROR, + error="Server is not using binds.", + ) + return + # Check if we are on the same volume + old_location = nextcloud.get_location() + if old_location == volume.name: + Jobs().update( + job=job, + status=JobStatus.ERROR, + error="Nextcloud is already on this volume.", + ) + return + # Check if there is enough space on the new volume + if volume.fsavail < nextcloud.get_storage_usage(): + Jobs().update( + job=job, + status=JobStatus.ERROR, + error="Not enough space on the new volume.", + ) + return + # Make sure the volume is mounted + if f"/volumes/{volume.name}" not in volume.mountpoints: + Jobs().update( + job=job, + status=JobStatus.ERROR, + error="Volume is not mounted.", + ) + return + # Make sure current actual directory exists + if not pathlib.Path(f"/volumes/{old_location}/nextcloud").exists(): + Jobs().update( + job=job, + status=JobStatus.ERROR, + error="Nextcloud is not found.", + ) + return + + # Stop Nextcloud + Jobs().update( + job=job, + status=JobStatus.RUNNING, + status_text="Stopping Nextcloud...", + progress=5, + ) + nextcloud.stop() + # Wait for Nextcloud to stop, check every second + # If it does not stop in 30 seconds, abort + for _ in range(30): + if nextcloud.get_status() != ServiceStatus.RUNNING: + break + time.sleep(1) + else: + Jobs().update( + job=job, + status=JobStatus.ERROR, + error="Nextcloud did not stop in 30 seconds.", + ) + return + + # Unmount old volume + Jobs().update( + job=job, + status_text="Unmounting old folder...", + status=JobStatus.RUNNING, + progress=10, + ) + try: + subprocess.run(["umount", "/var/lib/nextcloud"], check=True) + except subprocess.CalledProcessError: + Jobs().update( + job=job, + status=JobStatus.ERROR, + error="Unable to unmount old volume.", + ) + return + # Move data to new volume and set correct permissions + Jobs().update( + job=job, + status_text="Moving data to new volume...", + status=JobStatus.RUNNING, + progress=20, + ) + shutil.move( + f"/volumes/{old_location}/nextcloud", f"/volumes/{volume.name}/nextcloud" + ) + + Jobs().update( + job=job, + status_text="Making sure Nextcloud owns its files...", + status=JobStatus.RUNNING, + progress=70, + ) + try: + subprocess.run( + [ + "chown", + "-R", + "nextcloud:nextcloud", + f"/volumes/{volume.name}/nextcloud", + ], + check=True, + ) + except subprocess.CalledProcessError as error: + print(error.output) + Jobs().update( + job=job, + status=JobStatus.RUNNING, + error="Unable to set ownership of new volume. Nextcloud may not be able to access its files. Continuing anyway.", + ) + return + + # Mount new volume + Jobs().update( + job=job, + status_text="Mounting Nextcloud data...", + status=JobStatus.RUNNING, + progress=90, + ) + try: + subprocess.run( + [ + "mount", + "--bind", + f"/volumes/{volume.name}/nextcloud", + "/var/lib/nextcloud", + ], + check=True, + ) + except subprocess.CalledProcessError as error: + print(error.output) + Jobs().update( + job=job, + status=JobStatus.ERROR, + error="Unable to mount new volume.", + ) + return + + # Update userdata + Jobs().update( + job=job, + status_text="Finishing move...", + status=JobStatus.RUNNING, + progress=95, + ) + with WriteUserData() as user_data: + if "nextcloud" not in user_data: + user_data["nextcloud"] = {} + user_data["nextcloud"]["location"] = volume.name + # Start Nextcloud + nextcloud.start() + Jobs().update( + job=job, + status=JobStatus.FINISHED, + result="Nextcloud moved successfully.", + status_text="Starting Nextcloud...", + progress=100, + ) diff --git a/selfprivacy_api/services/service.py b/selfprivacy_api/services/service.py index a0e6ae6..7c0b09e 100644 --- a/selfprivacy_api/services/service.py +++ b/selfprivacy_api/services/service.py @@ -3,6 +3,8 @@ from abc import ABC, abstractmethod from enum import Enum import typing +from selfprivacy_api.utils.block_devices import BlockDevice + class ServiceStatus(Enum): """Enum for service status""" @@ -85,9 +87,17 @@ class Service(ABC): pass @abstractmethod - def get_storage_usage(self): + def get_storage_usage(self) -> int: pass @abstractmethod def get_dns_records(self) -> typing.List[ServiceDnsRecord]: pass + + @abstractmethod + def get_location(self) -> str: + pass + + @abstractmethod + def move_to_volume(self, volume: BlockDevice): + pass diff --git a/selfprivacy_api/utils/block_devices.py b/selfprivacy_api/utils/block_devices.py index e6adddc..b33c7aa 100644 --- a/selfprivacy_api/utils/block_devices.py +++ b/selfprivacy_api/utils/block_devices.py @@ -16,7 +16,7 @@ def get_block_device(device_name): "-J", "-b", "-o", - "NAME,PATH,FSAVAIL,FSSIZE,FSTYPE,FSUSED,MOUNTPOINT,LABEL,UUID,SIZE, MODEL,SERIAL,TYPE", + "NAME,PATH,FSAVAIL,FSSIZE,FSTYPE,FSUSED,MOUNTPOINTS,LABEL,UUID,SIZE, MODEL,SERIAL,TYPE", device_name, ] ) @@ -47,7 +47,7 @@ class BlockDevice: self.fssize = block_device["fssize"] self.fstype = block_device["fstype"] self.fsused = block_device["fsused"] - self.mountpoint = block_device["mountpoint"] + self.mountpoints = block_device["mountpoints"] self.label = block_device["label"] self.uuid = block_device["uuid"] self.size = block_device["size"] @@ -60,7 +60,7 @@ class BlockDevice: return self.name def __repr__(self): - return f"" + return f"" def __eq__(self, other): return self.name == other.name @@ -77,7 +77,7 @@ class BlockDevice: self.fssize = device["fssize"] self.fstype = device["fstype"] self.fsused = device["fsused"] - self.mountpoint = device["mountpoint"] + self.mountpoints = device["mountpoints"] self.label = device["label"] self.uuid = device["uuid"] self.size = device["size"] @@ -92,7 +92,7 @@ class BlockDevice: "fssize": self.fssize, "fstype": self.fstype, "fsused": self.fsused, - "mountpoint": self.mountpoint, + "mountpoints": self.mountpoints, "label": self.label, "uuid": self.uuid, "size": self.size, @@ -219,6 +219,6 @@ class BlockDevices: """ block_devices = [] for block_device in self.block_devices: - if block_device.mountpoint == mountpoint: + if mountpoint in block_device.mountpoints: block_devices.append(block_device) return block_devices diff --git a/selfprivacy_api/utils/huey.py b/selfprivacy_api/utils/huey.py new file mode 100644 index 0000000..9803e7b --- /dev/null +++ b/selfprivacy_api/utils/huey.py @@ -0,0 +1,4 @@ +"""MiniHuey singleton.""" +from huey.contrib.mini import MiniHuey + +huey = MiniHuey() diff --git a/selfprivacy_api/utils/migrate_to_binds.py b/selfprivacy_api/utils/migrate_to_binds.py new file mode 100644 index 0000000..faac03b --- /dev/null +++ b/selfprivacy_api/utils/migrate_to_binds.py @@ -0,0 +1,103 @@ +"""Function to perform migration of app data to binds.""" +import subprocess +import psutil +import pathlib +import shutil +from selfprivacy_api.services.nextcloud import Nextcloud +from selfprivacy_api.utils import WriteUserData +from selfprivacy_api.utils.block_devices import BlockDevices + +class BindMigrationConfig: + """Config for bind migration. + For each service provide block device name. + """ + email_block_device: str + bitwarden_block_device: str + gitea_block_device: str + nextcloud_block_device: str + pleroma_block_device: str + + +def migrate_to_binds(config: BindMigrationConfig): + """Migrate app data to binds.""" + + # Get block devices. + block_devices = BlockDevices().get_block_devices() + block_device_names = [ device.name for device in block_devices ] + + # Get all unique required block devices + required_block_devices = [] + for block_device_name in config.__dict__.values(): + if block_device_name not in required_block_devices: + required_block_devices.append(block_device_name) + + # Check if all block devices from config are present. + for block_device_name in required_block_devices: + if block_device_name not in block_device_names: + raise Exception(f"Block device {block_device_name} is not present.") + + # Make sure all required block devices are mounted. + # sda1 is the root partition and is always mounted. + for block_device_name in required_block_devices: + if block_device_name == "sda1": + continue + block_device = BlockDevices().get_block_device(block_device_name) + if block_device is None: + raise Exception(f"Block device {block_device_name} is not present.") + if f"/volumes/{block_device_name}" not in block_device.mountpoints: + raise Exception(f"Block device {block_device_name} is not mounted.") + + # Activate binds in userdata + with WriteUserData() as user_data: + if "email" not in user_data: + user_data["email"] = {} + user_data["email"]["block_device"] = config.email_block_device + if "bitwarden" not in user_data: + user_data["bitwarden"] = {} + user_data["bitwarden"]["block_device"] = config.bitwarden_block_device + if "gitea" not in user_data: + user_data["gitea"] = {} + user_data["gitea"]["block_device"] = config.gitea_block_device + if "nextcloud" not in user_data: + user_data["nextcloud"] = {} + user_data["nextcloud"]["block_device"] = config.nextcloud_block_device + if "pleroma" not in user_data: + user_data["pleroma"] = {} + user_data["pleroma"]["block_device"] = config.pleroma_block_device + + user_data["useBinds"] = True + + # Make sure /volumes/sda1 exists. + pathlib.Path("/volumes/sda1").mkdir(parents=True, exist_ok=True) + + # Perform migration of Nextcloud. + # Data is moved from /var/lib/nextcloud to /volumes//nextcloud. + # /var/lib/nextcloud is removed and /volumes//nextcloud is mounted as bind mount. + + # Turn off Nextcloud + Nextcloud().stop() + + # Move data from /var/lib/nextcloud to /volumes//nextcloud. + # /var/lib/nextcloud is removed and /volumes//nextcloud is mounted as bind mount. + nextcloud_data_path = pathlib.Path("/var/lib/nextcloud") + nextcloud_bind_path = pathlib.Path(f"/volumes/{config.nextcloud_block_device}/nextcloud") + if nextcloud_data_path.exists(): + shutil.move(str(nextcloud_data_path), str(nextcloud_bind_path)) + else: + raise Exception("Nextcloud data path does not exist.") + + # Make sure folder /var/lib/nextcloud exists. + nextcloud_data_path.mkdir(mode=0o750, parents=True, exist_ok=True) + + # Make sure this folder is owned by user nextcloud and group nextcloud. + shutil.chown(nextcloud_bind_path, user="nextcloud", group="nextcloud") + shutil.chown(nextcloud_data_path, user="nextcloud", group="nextcloud") + + # Mount nextcloud bind mount. + subprocess.run(["mount","--bind", str(nextcloud_bind_path), str(nextcloud_data_path)], check=True) + + # Recursively chown all files in nextcloud bind mount. + subprocess.run(["chown", "-R", "nextcloud:nextcloud", str(nextcloud_data_path)], check=True) + + # Start Nextcloud + Nextcloud().start()