selfprivacy-rest-api/selfprivacy_api/services/nextcloud/__init__.py

304 lines
9.5 KiB
Python
Raw Normal View History

2022-07-25 14:08:31 +00:00
"""Class representing Nextcloud service."""
import base64
import subprocess
2022-08-02 19:50:16 +00:00
import time
import typing
2022-07-25 14:08:31 +00:00
import psutil
2022-08-02 19:50:16 +00:00
import pathlib
import shutil
from selfprivacy_api.jobs import Job, JobStatus, Jobs
from selfprivacy_api.services.service import Service, ServiceDnsRecord, ServiceStatus
2022-07-25 14:08:31 +00:00
from selfprivacy_api.utils import ReadUserData, WriteUserData
2022-08-02 19:50:16 +00:00
from selfprivacy_api.utils.block_devices import BlockDevice
from selfprivacy_api.utils.huey import huey
2022-07-25 14:08:31 +00:00
class Nextcloud(Service):
"""Class representing Nextcloud service."""
def get_id(self) -> str:
"""Return service id."""
return "nextcloud"
def get_display_name(self) -> str:
"""Return service display name."""
return "Nextcloud"
def get_description(self) -> str:
"""Return service description."""
return "Nextcloud is a cloud storage service that offers a web interface and a desktop client."
def get_svg_icon(self) -> str:
"""Read SVG icon from file and return it as base64 encoded string."""
with open("selfprivacy_api/services/nextcloud/nextcloud.svg", "rb") as f:
return base64.b64encode(f.read()).decode("utf-8")
def is_enabled(self) -> bool:
with ReadUserData() as user_data:
return user_data.get("nextcloud", {}).get("enable", False)
def get_status(self) -> ServiceStatus:
"""
Return Nextcloud status from systemd.
Use command return code to determine status.
Return code 0 means service is running.
Return code 1 or 2 means service is in error stat.
Return code 3 means service is stopped.
Return code 4 means service is off.
"""
service_status = subprocess.Popen(
["systemctl", "status", "phpfpm-nextcloud.service"]
)
service_status.communicate()[0]
if service_status.returncode == 0:
return ServiceStatus.RUNNING
elif service_status.returncode == 1 or service_status.returncode == 2:
return ServiceStatus.ERROR
elif service_status.returncode == 3:
return ServiceStatus.STOPPED
elif service_status.returncode == 4:
return ServiceStatus.OFF
else:
return ServiceStatus.DEGRADED
def enable(self):
"""Enable Nextcloud service."""
with WriteUserData() as user_data:
if "nextcloud" not in user_data:
user_data["nextcloud"] = {}
user_data["nextcloud"]["enable"] = True
def disable(self):
"""Disable Nextcloud service."""
with WriteUserData() as user_data:
if "nextcloud" not in user_data:
user_data["nextcloud"] = {}
user_data["nextcloud"]["enable"] = False
def stop(self):
"""Stop Nextcloud service."""
subprocess.Popen(["systemctl", "stop", "phpfpm-nextcloud.service"])
def start(self):
"""Start Nextcloud service."""
subprocess.Popen(["systemctl", "start", "phpfpm-nextcloud.service"])
def restart(self):
"""Restart Nextcloud service."""
subprocess.Popen(["systemctl", "restart", "phpfpm-nextcloud.service"])
def get_configuration(self) -> dict:
"""Return Nextcloud configuration."""
return {}
def set_configuration(self, config_items):
return super().set_configuration(config_items)
def get_logs(self):
"""Return Nextcloud logs."""
return ""
2022-08-02 19:50:16 +00:00
def get_storage_usage(self) -> int:
"""
Calculate the real storage usage of /var/lib/nextcloud and all subdirectories.
Calculate using pathlib.
Do not follow symlinks.
"""
storage_usage = 0
for path in pathlib.Path("/var/lib/nextcloud").rglob("**/*"):
if path.is_dir():
continue
storage_usage += path.stat().st_size
return storage_usage
def get_location(self) -> str:
"""Get the name of disk where Nextcloud is installed."""
with ReadUserData() as user_data:
if user_data.get("useBinds", False):
return user_data.get("nextcloud", {}).get("location", "sda1")
else:
return "sda1"
def get_dns_records(self) -> typing.List[ServiceDnsRecord]:
return super().get_dns_records()
def move_to_volume(self, volume: BlockDevice):
2022-08-02 20:30:03 +00:00
job = Jobs.get_instance().add(
2022-08-02 19:50:16 +00:00
name="services.nextcloud.move",
description=f"Moving Nextcloud to volume {volume.name}",
)
move_nextcloud(self, volume, job)
return job
@huey.task()
def move_nextcloud(nextcloud: Nextcloud, volume: BlockDevice, job: Job):
"""Move Nextcloud to another volume."""
2022-08-02 20:30:03 +00:00
job = Jobs.get_instance().update(
2022-08-02 19:50:16 +00:00
job=job,
status_text="Performing pre-move checks...",
status=JobStatus.RUNNING,
)
with ReadUserData() as user_data:
if not user_data.get("useBinds", False):
2022-08-02 20:30:03 +00:00
Jobs.get_instance().update(
2022-08-02 19:50:16 +00:00
job=job,
status=JobStatus.ERROR,
error="Server is not using binds.",
)
return
# Check if we are on the same volume
old_location = nextcloud.get_location()
if old_location == volume.name:
2022-08-02 20:30:03 +00:00
Jobs.get_instance().update(
2022-08-02 19:50:16 +00:00
job=job,
status=JobStatus.ERROR,
error="Nextcloud is already on this volume.",
)
return
# Check if there is enough space on the new volume
if volume.fsavail < nextcloud.get_storage_usage():
2022-08-02 20:30:03 +00:00
Jobs.get_instance().update(
2022-08-02 19:50:16 +00:00
job=job,
status=JobStatus.ERROR,
error="Not enough space on the new volume.",
)
return
# Make sure the volume is mounted
if f"/volumes/{volume.name}" not in volume.mountpoints:
2022-08-02 20:30:03 +00:00
Jobs.get_instance().update(
2022-08-02 19:50:16 +00:00
job=job,
status=JobStatus.ERROR,
error="Volume is not mounted.",
)
return
# Make sure current actual directory exists
if not pathlib.Path(f"/volumes/{old_location}/nextcloud").exists():
2022-08-02 20:30:03 +00:00
Jobs.get_instance().update(
2022-08-02 19:50:16 +00:00
job=job,
status=JobStatus.ERROR,
error="Nextcloud is not found.",
)
return
# Stop Nextcloud
2022-08-02 20:30:03 +00:00
Jobs.get_instance().update(
2022-08-02 19:50:16 +00:00
job=job,
status=JobStatus.RUNNING,
status_text="Stopping Nextcloud...",
progress=5,
)
nextcloud.stop()
# Wait for Nextcloud to stop, check every second
# If it does not stop in 30 seconds, abort
for _ in range(30):
if nextcloud.get_status() != ServiceStatus.RUNNING:
break
time.sleep(1)
else:
2022-08-02 20:30:03 +00:00
Jobs.get_instance().update(
2022-08-02 19:50:16 +00:00
job=job,
status=JobStatus.ERROR,
error="Nextcloud did not stop in 30 seconds.",
)
return
# Unmount old volume
2022-08-02 20:30:03 +00:00
Jobs.get_instance().update(
2022-08-02 19:50:16 +00:00
job=job,
status_text="Unmounting old folder...",
status=JobStatus.RUNNING,
progress=10,
)
try:
subprocess.run(["umount", "/var/lib/nextcloud"], check=True)
except subprocess.CalledProcessError:
2022-08-02 20:30:03 +00:00
Jobs.get_instance().update(
2022-08-02 19:50:16 +00:00
job=job,
status=JobStatus.ERROR,
error="Unable to unmount old volume.",
)
return
# Move data to new volume and set correct permissions
2022-08-02 20:30:03 +00:00
Jobs.get_instance().update(
2022-08-02 19:50:16 +00:00
job=job,
status_text="Moving data to new volume...",
status=JobStatus.RUNNING,
progress=20,
)
shutil.move(
f"/volumes/{old_location}/nextcloud", f"/volumes/{volume.name}/nextcloud"
)
2022-08-02 20:30:03 +00:00
Jobs.get_instance().update(
2022-08-02 19:50:16 +00:00
job=job,
status_text="Making sure Nextcloud owns its files...",
status=JobStatus.RUNNING,
progress=70,
)
try:
subprocess.run(
[
"chown",
"-R",
"nextcloud:nextcloud",
f"/volumes/{volume.name}/nextcloud",
],
check=True,
)
except subprocess.CalledProcessError as error:
print(error.output)
2022-08-02 20:30:03 +00:00
Jobs.get_instance().update(
2022-08-02 19:50:16 +00:00
job=job,
status=JobStatus.RUNNING,
error="Unable to set ownership of new volume. Nextcloud may not be able to access its files. Continuing anyway.",
)
return
# Mount new volume
2022-08-02 20:30:03 +00:00
Jobs.get_instance().update(
2022-08-02 19:50:16 +00:00
job=job,
status_text="Mounting Nextcloud data...",
status=JobStatus.RUNNING,
progress=90,
)
try:
subprocess.run(
[
"mount",
"--bind",
f"/volumes/{volume.name}/nextcloud",
"/var/lib/nextcloud",
],
check=True,
)
except subprocess.CalledProcessError as error:
print(error.output)
2022-08-02 20:30:03 +00:00
Jobs.get_instance().update(
2022-08-02 19:50:16 +00:00
job=job,
status=JobStatus.ERROR,
error="Unable to mount new volume.",
)
return
# Update userdata
2022-08-02 20:30:03 +00:00
Jobs.get_instance().update(
2022-08-02 19:50:16 +00:00
job=job,
status_text="Finishing move...",
status=JobStatus.RUNNING,
progress=95,
)
with WriteUserData() as user_data:
if "nextcloud" not in user_data:
user_data["nextcloud"] = {}
user_data["nextcloud"]["location"] = volume.name
# Start Nextcloud
nextcloud.start()
2022-08-02 20:30:03 +00:00
Jobs.get_instance().update(
2022-08-02 19:50:16 +00:00
job=job,
status=JobStatus.FINISHED,
result="Nextcloud moved successfully.",
status_text="Starting Nextcloud...",
progress=100,
)