mirror of
https://git.selfprivacy.org/SelfPrivacy/selfprivacy-rest-api.git
synced 2024-11-18 08:29:14 +00:00
Merge remote-tracking branch 'origin/master' into quotas
This commit is contained in:
commit
39baa3725b
|
@ -312,7 +312,7 @@ class Backups:
|
|||
Backups._prune_auto_snaps(service)
|
||||
service.post_restore()
|
||||
except Exception as error:
|
||||
Jobs.update(job, status=JobStatus.ERROR)
|
||||
Jobs.update(job, status=JobStatus.ERROR, status_text=str(error))
|
||||
raise error
|
||||
|
||||
Jobs.update(job, status=JobStatus.FINISHED)
|
||||
|
@ -437,9 +437,14 @@ class Backups:
|
|||
snapshot: Snapshot,
|
||||
job: Job,
|
||||
) -> None:
|
||||
Jobs.update(
|
||||
job, status=JobStatus.CREATED, status_text=f"Waiting for pre-restore backup"
|
||||
)
|
||||
failsafe_snapshot = Backups.back_up(service, BackupReason.PRE_RESTORE)
|
||||
|
||||
Jobs.update(job, status=JobStatus.RUNNING)
|
||||
Jobs.update(
|
||||
job, status=JobStatus.RUNNING, status_text=f"Restoring from {snapshot.id}"
|
||||
)
|
||||
try:
|
||||
Backups._restore_service_from_snapshot(
|
||||
service,
|
||||
|
@ -447,9 +452,19 @@ class Backups:
|
|||
verify=False,
|
||||
)
|
||||
except Exception as error:
|
||||
Jobs.update(
|
||||
job,
|
||||
status=JobStatus.ERROR,
|
||||
status_text=f"Restore failed with {str(error)}, reverting to {failsafe_snapshot.id}",
|
||||
)
|
||||
Backups._restore_service_from_snapshot(
|
||||
service, failsafe_snapshot.id, verify=False
|
||||
)
|
||||
Jobs.update(
|
||||
job,
|
||||
status=JobStatus.ERROR,
|
||||
status_text=f"Restore failed with {str(error)}, reverted to {failsafe_snapshot.id}",
|
||||
)
|
||||
raise error
|
||||
|
||||
@staticmethod
|
||||
|
@ -466,20 +481,33 @@ class Backups:
|
|||
|
||||
try:
|
||||
Backups._assert_restorable(snapshot)
|
||||
Jobs.update(
|
||||
job, status=JobStatus.RUNNING, status_text="Stopping the service"
|
||||
)
|
||||
with StoppedService(service):
|
||||
Backups.assert_dead(service)
|
||||
if strategy == RestoreStrategy.INPLACE:
|
||||
Backups._inplace_restore(service, snapshot, job)
|
||||
else: # verify_before_download is our default
|
||||
Jobs.update(job, status=JobStatus.RUNNING)
|
||||
Jobs.update(
|
||||
job,
|
||||
status=JobStatus.RUNNING,
|
||||
status_text=f"Restoring from {snapshot.id}",
|
||||
)
|
||||
Backups._restore_service_from_snapshot(
|
||||
service, snapshot.id, verify=True
|
||||
)
|
||||
|
||||
service.post_restore()
|
||||
Jobs.update(
|
||||
job,
|
||||
status=JobStatus.RUNNING,
|
||||
progress=90,
|
||||
status_text="Restarting the service",
|
||||
)
|
||||
|
||||
except Exception as error:
|
||||
Jobs.update(job, status=JobStatus.ERROR)
|
||||
Jobs.update(job, status=JobStatus.ERROR, status_text=str(error))
|
||||
raise error
|
||||
|
||||
Jobs.update(job, status=JobStatus.FINISHED)
|
||||
|
|
|
@ -9,8 +9,8 @@ from typing import List, Optional, TypeVar, Callable
|
|||
from collections.abc import Iterable
|
||||
from json.decoder import JSONDecodeError
|
||||
from os.path import exists, join
|
||||
from os import listdir
|
||||
from time import sleep
|
||||
from os import mkdir
|
||||
from shutil import rmtree
|
||||
|
||||
from selfprivacy_api.graphql.common_types.backup import BackupReason
|
||||
from selfprivacy_api.backup.util import output_yielder, sync
|
||||
|
@ -131,32 +131,6 @@ class ResticBackupper(AbstractBackupper):
|
|||
output,
|
||||
)
|
||||
|
||||
def mount_repo(self, mount_directory):
|
||||
mount_command = self.restic_command("mount", mount_directory)
|
||||
mount_command.insert(0, "nohup")
|
||||
handle = subprocess.Popen(
|
||||
mount_command,
|
||||
stdout=subprocess.DEVNULL,
|
||||
shell=False,
|
||||
)
|
||||
sleep(2)
|
||||
if "ids" not in listdir(mount_directory):
|
||||
raise IOError("failed to mount dir ", mount_directory)
|
||||
return handle
|
||||
|
||||
def unmount_repo(self, mount_directory):
|
||||
mount_command = ["umount", "-l", mount_directory]
|
||||
with subprocess.Popen(
|
||||
mount_command, stdout=subprocess.PIPE, shell=False
|
||||
) as handle:
|
||||
output = handle.communicate()[0].decode("utf-8")
|
||||
# TODO: check for exit code?
|
||||
if "error" in output.lower():
|
||||
return IOError("failed to unmount dir ", mount_directory, ": ", output)
|
||||
|
||||
if not listdir(mount_directory) == []:
|
||||
return IOError("failed to unmount dir ", mount_directory)
|
||||
|
||||
@staticmethod
|
||||
def __flatten_list(list_to_flatten):
|
||||
"""string-aware list flattener"""
|
||||
|
@ -364,20 +338,21 @@ class ResticBackupper(AbstractBackupper):
|
|||
if verify:
|
||||
self._raw_verified_restore(snapshot_id, target=temp_dir)
|
||||
snapshot_root = temp_dir
|
||||
else: # attempting inplace restore via mount + sync
|
||||
self.mount_repo(temp_dir)
|
||||
snapshot_root = join(temp_dir, "ids", snapshot_id)
|
||||
for folder in folders:
|
||||
src = join(snapshot_root, folder.strip("/"))
|
||||
if not exists(src):
|
||||
raise ValueError(
|
||||
f"No such path: {src}. We tried to find {folder}"
|
||||
)
|
||||
dst = folder
|
||||
sync(src, dst)
|
||||
|
||||
assert snapshot_root is not None
|
||||
for folder in folders:
|
||||
src = join(snapshot_root, folder.strip("/"))
|
||||
if not exists(src):
|
||||
raise ValueError(f"No such path: {src}. We tried to find {folder}")
|
||||
dst = folder
|
||||
sync(src, dst)
|
||||
|
||||
if not verify:
|
||||
self.unmount_repo(temp_dir)
|
||||
else: # attempting inplace restore
|
||||
for folder in folders:
|
||||
rmtree(folder)
|
||||
mkdir(folder)
|
||||
self._raw_verified_restore(snapshot_id, target="/")
|
||||
return
|
||||
|
||||
def _raw_verified_restore(self, snapshot_id, target="/"):
|
||||
"""barebones restic restore"""
|
||||
|
|
|
@ -20,9 +20,6 @@ from selfprivacy_api.utils.redis_model_storage import (
|
|||
from selfprivacy_api.backup.providers.provider import AbstractBackupProvider
|
||||
from selfprivacy_api.backup.providers import get_kind
|
||||
|
||||
# a hack to store file path.
|
||||
REDIS_SNAPSHOT_CACHE_EXPIRE_SECONDS = 24 * 60 * 60 # one day
|
||||
|
||||
REDIS_SNAPSHOTS_PREFIX = "backups:snapshots:"
|
||||
REDIS_LAST_BACKUP_PREFIX = "backups:last-backed-up:"
|
||||
REDIS_INITTED_CACHE = "backups:repo_initted"
|
||||
|
@ -95,7 +92,6 @@ class Storage:
|
|||
"""Stores snapshot metadata in redis for caching purposes"""
|
||||
snapshot_key = Storage.__snapshot_key(snapshot)
|
||||
store_model_as_hash(redis, snapshot_key, snapshot)
|
||||
redis.expire(snapshot_key, REDIS_SNAPSHOT_CACHE_EXPIRE_SECONDS)
|
||||
|
||||
@staticmethod
|
||||
def delete_cached_snapshot(snapshot: Snapshot) -> None:
|
||||
|
|
|
@ -7,13 +7,17 @@ from selfprivacy_api.graphql.common_types.backup import RestoreStrategy, BackupR
|
|||
|
||||
from selfprivacy_api.models.backup.snapshot import Snapshot
|
||||
from selfprivacy_api.utils.huey import huey
|
||||
from huey import crontab
|
||||
from selfprivacy_api.services.service import Service
|
||||
from selfprivacy_api.backup import Backups
|
||||
|
||||
SNAPSHOT_CACHE_TTL_HOURS = 6
|
||||
|
||||
|
||||
def validate_datetime(dt: datetime) -> bool:
|
||||
"""
|
||||
Validates that the datetime passed in is timezone-aware.
|
||||
Validates that it is time to back up.
|
||||
Also ensures that the timezone-aware time is used.
|
||||
"""
|
||||
if dt.tzinfo is None:
|
||||
return Backups.is_time_to_backup(dt.replace(tzinfo=timezone.utc))
|
||||
|
@ -52,3 +56,8 @@ def automatic_backup():
|
|||
time = datetime.utcnow().replace(tzinfo=timezone.utc)
|
||||
for service in Backups.services_to_back_up(time):
|
||||
start_backup(service, BackupReason.AUTO)
|
||||
|
||||
|
||||
@huey.periodic_task(crontab(hour=SNAPSHOT_CACHE_TTL_HOURS))
|
||||
def reload_snapshot_cache():
|
||||
Backups.force_snapshot_cache_reload()
|
||||
|
|
|
@ -13,7 +13,7 @@ from selfprivacy_api.services.owned_path import OwnedPath
|
|||
from selfprivacy_api import utils
|
||||
from selfprivacy_api.utils.waitloop import wait_until_true
|
||||
|
||||
DEFAULT_START_STOP_TIMEOUT = 10 * 60
|
||||
DEFAULT_START_STOP_TIMEOUT = 5 * 60
|
||||
|
||||
|
||||
class ServiceStatus(Enum):
|
||||
|
@ -283,18 +283,28 @@ class StoppedService:
|
|||
|
||||
def __enter__(self) -> Service:
|
||||
self.original_status = self.service.get_status()
|
||||
if self.original_status != ServiceStatus.INACTIVE:
|
||||
self.service.stop()
|
||||
wait_until_true(
|
||||
lambda: self.service.get_status() == ServiceStatus.INACTIVE,
|
||||
timeout_sec=DEFAULT_START_STOP_TIMEOUT,
|
||||
)
|
||||
if self.original_status not in [ServiceStatus.INACTIVE, ServiceStatus.FAILED]:
|
||||
try:
|
||||
self.service.stop()
|
||||
wait_until_true(
|
||||
lambda: self.service.get_status() == ServiceStatus.INACTIVE,
|
||||
timeout_sec=DEFAULT_START_STOP_TIMEOUT,
|
||||
)
|
||||
except TimeoutError as error:
|
||||
raise TimeoutError(
|
||||
f"timed out waiting for {self.service.get_display_name()} to stop"
|
||||
) from error
|
||||
return self.service
|
||||
|
||||
def __exit__(self, type, value, traceback):
|
||||
if self.original_status in [ServiceStatus.ACTIVATING, ServiceStatus.ACTIVE]:
|
||||
self.service.start()
|
||||
wait_until_true(
|
||||
lambda: self.service.get_status() == ServiceStatus.ACTIVE,
|
||||
timeout_sec=DEFAULT_START_STOP_TIMEOUT,
|
||||
)
|
||||
try:
|
||||
self.service.start()
|
||||
wait_until_true(
|
||||
lambda: self.service.get_status() == ServiceStatus.ACTIVE,
|
||||
timeout_sec=DEFAULT_START_STOP_TIMEOUT,
|
||||
)
|
||||
except TimeoutError as error:
|
||||
raise TimeoutError(
|
||||
f"timed out waiting for {self.service.get_display_name()} to start"
|
||||
) from error
|
||||
|
|
|
@ -135,8 +135,12 @@ class DummyService(Service):
|
|||
|
||||
@classmethod
|
||||
def stop(cls):
|
||||
cls.set_status(ServiceStatus.DEACTIVATING)
|
||||
cls.change_status_with_async_delay(ServiceStatus.INACTIVE, cls.startstop_delay)
|
||||
# simulate a failing service unable to stop
|
||||
if not cls.get_status() == ServiceStatus.FAILED:
|
||||
cls.set_status(ServiceStatus.DEACTIVATING)
|
||||
cls.change_status_with_async_delay(
|
||||
ServiceStatus.INACTIVE, cls.startstop_delay
|
||||
)
|
||||
|
||||
@classmethod
|
||||
def start(cls):
|
||||
|
|
|
@ -12,8 +12,11 @@ from copy import copy
|
|||
import secrets
|
||||
|
||||
|
||||
import tempfile
|
||||
|
||||
import selfprivacy_api.services as services
|
||||
from selfprivacy_api.services import Service, get_all_services
|
||||
from selfprivacy_api.services.service import ServiceStatus
|
||||
|
||||
from selfprivacy_api.services import get_service_by_id
|
||||
from selfprivacy_api.services.test_service import DummyService
|
||||
|
@ -35,7 +38,11 @@ from selfprivacy_api.backup.backuppers.restic_backupper import ResticBackupper
|
|||
from selfprivacy_api.backup.jobs import add_backup_job, add_restore_job
|
||||
|
||||
|
||||
from selfprivacy_api.backup.tasks import start_backup, restore_snapshot
|
||||
from selfprivacy_api.backup.tasks import (
|
||||
start_backup,
|
||||
restore_snapshot,
|
||||
reload_snapshot_cache,
|
||||
)
|
||||
from selfprivacy_api.backup.storage import Storage
|
||||
from selfprivacy_api.backup.jobs import get_backup_job
|
||||
|
||||
|
@ -832,10 +839,19 @@ def restore_strategy(request) -> RestoreStrategy:
|
|||
return RestoreStrategy.INPLACE
|
||||
|
||||
|
||||
@pytest.fixture(params=["failed", "healthy"])
|
||||
def failed(request) -> bool:
|
||||
if request.param == "failed":
|
||||
return True
|
||||
return False
|
||||
|
||||
|
||||
def test_restore_snapshot_task(
|
||||
backups, dummy_service, restore_strategy, simulated_service_stopping_delay
|
||||
backups, dummy_service, restore_strategy, simulated_service_stopping_delay, failed
|
||||
):
|
||||
dummy_service.set_delay(simulated_service_stopping_delay)
|
||||
if failed:
|
||||
dummy_service.set_status(ServiceStatus.FAILED)
|
||||
|
||||
Backups.back_up(dummy_service)
|
||||
snaps = Backups.get_snapshots(dummy_service)
|
||||
|
@ -1097,25 +1113,6 @@ def test_sync_nonexistent_src(dummy_service):
|
|||
sync(src, dst)
|
||||
|
||||
|
||||
# Restic lowlevel
|
||||
def test_mount_umount(backups, dummy_service, tmpdir):
|
||||
Backups.back_up(dummy_service)
|
||||
backupper = Backups.provider().backupper
|
||||
assert isinstance(backupper, ResticBackupper)
|
||||
|
||||
mountpoint = tmpdir / "mount"
|
||||
makedirs(mountpoint)
|
||||
assert path.exists(mountpoint)
|
||||
assert len(listdir(mountpoint)) == 0
|
||||
|
||||
handle = backupper.mount_repo(mountpoint)
|
||||
assert len(listdir(mountpoint)) != 0
|
||||
|
||||
backupper.unmount_repo(mountpoint)
|
||||
# handle.terminate()
|
||||
assert len(listdir(mountpoint)) == 0
|
||||
|
||||
|
||||
def test_move_blocks_backups(backups, dummy_service, restore_strategy):
|
||||
snap = Backups.back_up(dummy_service)
|
||||
job = Jobs.add(
|
||||
|
@ -1178,3 +1175,23 @@ def test_operations_while_locked(backups, dummy_service):
|
|||
# check that no locks were left
|
||||
Backups.provider().backupper.lock()
|
||||
Backups.provider().backupper.unlock()
|
||||
|
||||
|
||||
# a paranoid check to weed out problems with tempdirs that are not dependent on us
|
||||
def test_tempfile():
|
||||
with tempfile.TemporaryDirectory() as temp:
|
||||
assert path.exists(temp)
|
||||
assert not path.exists(temp)
|
||||
|
||||
|
||||
# Storage
|
||||
def test_cache_invalidaton_task(backups, dummy_service):
|
||||
Backups.back_up(dummy_service)
|
||||
assert len(Storage.get_cached_snapshots()) == 1
|
||||
|
||||
# Does not trigger resync
|
||||
Storage.invalidate_snapshot_storage()
|
||||
assert Storage.get_cached_snapshots() == []
|
||||
|
||||
reload_snapshot_cache()
|
||||
assert len(Storage.get_cached_snapshots()) == 1
|
||||
|
|
Loading…
Reference in a new issue