mirror of https://git.selfprivacy.org/SelfPrivacy/selfprivacy-rest-api.git
synced 2025-01-31 05:06:41 +00:00

commit ec404464d8
Merge branch 'master' into def/nix-collect-garbage-endpoint
@@ -2,6 +2,7 @@
This module contains the controller class for backups.
"""
from datetime import datetime, timedelta
import os
from os import statvfs
from typing import List, Optional

@@ -43,6 +44,13 @@ DEFAULT_JSON_PROVIDER = {
    "bucket": "",
}

BACKUP_PROVIDER_ENVS = {
    "kind": "BACKUP_KIND",
    "login": "BACKUP_LOGIN",
    "key": "BACKUP_KEY",
    "location": "BACKUP_LOCATION",
}


class NotDeadError(AssertionError):
    """

@@ -132,6 +140,24 @@ class Backups:
        Storage.store_provider(none_provider)
        return none_provider

    @staticmethod
    def set_provider_from_envs():
        for env in BACKUP_PROVIDER_ENVS.values():
            if env not in os.environ.keys():
                raise ValueError(
                    f"Cannot set backup provider from envs, there is no {env} set"
                )

        kind_str = os.environ[BACKUP_PROVIDER_ENVS["kind"]]
        kind_enum = BackupProviderEnum[kind_str]
        provider = Backups._construct_provider(
            kind=kind_enum,
            login=os.environ[BACKUP_PROVIDER_ENVS["login"]],
            key=os.environ[BACKUP_PROVIDER_ENVS["key"]],
            location=os.environ[BACKUP_PROVIDER_ENVS["location"]],
        )
        Storage.store_provider(provider)

    @staticmethod
    def _construct_provider(
        kind: BackupProviderEnum,

@@ -210,6 +236,14 @@ class Backups:
        Backups.provider().backupper.init()
        Storage.mark_as_init()

    @staticmethod
    def erase_repo() -> None:
        """
        Completely empties the remote
        """
        Backups.provider().backupper.erase_repo()
        Storage.mark_as_uninitted()

    @staticmethod
    def is_initted() -> bool:
        """

@@ -249,7 +283,7 @@ class Backups:
            Backups._store_last_snapshot(tag, snapshot)
            service.post_restore()
        except Exception as error:
            Jobs.update(job, status=JobStatus.ERROR)
            Jobs.update(job, status=JobStatus.ERROR, status_text=str(error))
            raise error

        Jobs.update(job, status=JobStatus.FINISHED)

@@ -272,9 +306,14 @@ class Backups:
        snapshot: Snapshot,
        job: Job,
    ) -> None:
        Jobs.update(
            job, status=JobStatus.CREATED, status_text=f"Waiting for pre-restore backup"
        )
        failsafe_snapshot = Backups.back_up(service)

        Jobs.update(job, status=JobStatus.RUNNING)
        Jobs.update(
            job, status=JobStatus.RUNNING, status_text=f"Restoring from {snapshot.id}"
        )
        try:
            Backups._restore_service_from_snapshot(
                service,

@@ -282,9 +321,19 @@ class Backups:
                verify=False,
            )
        except Exception as error:
            Jobs.update(
                job,
                status=JobStatus.ERROR,
                status_text=f"Restore failed with {str(error)}, reverting to {failsafe_snapshot.id}",
            )
            Backups._restore_service_from_snapshot(
                service, failsafe_snapshot.id, verify=False
            )
            Jobs.update(
                job,
                status=JobStatus.ERROR,
                status_text=f"Restore failed with {str(error)}, reverted to {failsafe_snapshot.id}",
            )
            raise error

    @staticmethod

@@ -301,20 +350,33 @@ class Backups:
        try:
            Backups._assert_restorable(snapshot)
            Jobs.update(
                job, status=JobStatus.RUNNING, status_text="Stopping the service"
            )
            with StoppedService(service):
                Backups.assert_dead(service)
                if strategy == RestoreStrategy.INPLACE:
                    Backups._inplace_restore(service, snapshot, job)
                else:  # verify_before_download is our default
                    Jobs.update(job, status=JobStatus.RUNNING)
                    Jobs.update(
                        job,
                        status=JobStatus.RUNNING,
                        status_text=f"Restoring from {snapshot.id}",
                    )
                    Backups._restore_service_from_snapshot(
                        service, snapshot.id, verify=True
                    )

                service.post_restore()
                Jobs.update(
                    job,
                    status=JobStatus.RUNNING,
                    progress=90,
                    status_text="Restarting the service",
                )

        except Exception as error:
            Jobs.update(job, status=JobStatus.ERROR)
            Jobs.update(job, status=JobStatus.ERROR, status_text=str(error))
            raise error

        Jobs.update(job, status=JobStatus.FINISHED)

@@ -405,10 +467,18 @@ class Backups:
    @staticmethod
    def forget_snapshot(snapshot: Snapshot) -> None:
        """Deletes a snapshot from the storage"""
        """Deletes a snapshot from the repo and from cache"""
        Backups.provider().backupper.forget_snapshot(snapshot.id)
        Storage.delete_cached_snapshot(snapshot)

    @staticmethod
    def forget_all_snapshots():
        """deliberately erase all snapshots we made"""
        # there is no dedicated optimized command for this,
        # but maybe we can have a multi-erase
        for snapshot in Backups.get_all_snapshots():
            Backups.forget_snapshot(snapshot)

    @staticmethod
    def force_snapshot_cache_reload() -> None:
        """
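A minimal usage sketch (not part of the diff) of the new env-driven configuration; the variable names come from BACKUP_PROVIDER_ENVS above, the values are placeholders:

import os

os.environ["BACKUP_KIND"] = "BACKBLAZE"       # must match a BackupProviderEnum name
os.environ["BACKUP_LOGIN"] = "application-key-id"
os.environ["BACKUP_KEY"] = "application-key"
os.environ["BACKUP_LOCATION"] = "bucket-name"

Backups.set_provider_from_envs()  # raises ValueError if any variable is missing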
@@ -36,6 +36,11 @@ class AbstractBackupper(ABC):
        """Initialize the repository"""
        raise NotImplementedError

    @abstractmethod
    def erase_repo(self) -> None:
        """Completely empties the remote"""
        raise NotImplementedError

    @abstractmethod
    def restore_from_backup(
        self,
@@ -23,6 +23,11 @@ class NoneBackupper(AbstractBackupper):
    def init(self):
        raise NotImplementedError

    def erase_repo(self) -> None:
        """Completely empties the remote"""
        # this one is already empty
        pass

    def restore_from_backup(self, snapshot_id: str, folders: List[str], verify=True):
        """Restore a target folder using a snapshot"""
        raise NotImplementedError
@@ -1,14 +1,16 @@
from __future__ import annotations

import subprocess
import json
import datetime
import tempfile

from typing import List
from typing import List, TypeVar, Callable
from collections.abc import Iterable
from json.decoder import JSONDecodeError
from os.path import exists, join
from os import listdir
from time import sleep
from os import mkdir
from shutil import rmtree

from selfprivacy_api.backup.util import output_yielder, sync
from selfprivacy_api.backup.backuppers import AbstractBackupper

@@ -21,6 +23,25 @@ from selfprivacy_api.backup.local_secret import LocalBackupSecret

SHORT_ID_LEN = 8

T = TypeVar("T", bound=Callable)


def unlocked_repo(func: T) -> T:
    """unlock repo and retry if it appears to be locked"""

    def inner(self: ResticBackupper, *args, **kwargs):
        try:
            return func(self, *args, **kwargs)
        except Exception as error:
            if "unable to create lock" in str(error):
                self.unlock()
                return func(self, *args, **kwargs)
            else:
                raise error

    # Above, we manually guarantee that the type returned is compatible.
    return inner  # type: ignore


class ResticBackupper(AbstractBackupper):
    def __init__(self, login_flag: str, key_flag: str, storage_type: str) -> None:
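A hypothetical illustration (not part of the diff) of the decorator's contract: the first "unable to create lock" failure triggers self.unlock() and exactly one retry; any other exception propagates unchanged.

calls = []

class Demo:
    def unlock(self):
        calls.append("unlock")

    @unlocked_repo
    def op(self):
        calls.append("op")
        if calls.count("op") == 1:
            raise Exception("unable to create lock")  # simulated stale lock
        return "ok"

assert Demo().op() == "ok"
assert calls == ["op", "unlock", "op"]  # one unlock, one retry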
@@ -40,20 +61,25 @@ class ResticBackupper(AbstractBackupper):
    def restic_repo(self) -> str:
        # https://restic.readthedocs.io/en/latest/030_preparing_a_new_repo.html#other-services-via-rclone
        # https://forum.rclone.org/t/can-rclone-be-run-solely-with-command-line-options-no-config-no-env-vars/6314/5
        return f"rclone:{self.storage_type}{self.repo}"
        return f"rclone:{self.rclone_repo()}"

    def rclone_repo(self) -> str:
        return f"{self.storage_type}{self.repo}"

    def rclone_args(self):
        return "rclone.args=serve restic --stdio " + self.backend_rclone_args()
        return "rclone.args=serve restic --stdio " + " ".join(
            self.backend_rclone_args()
        )

    def backend_rclone_args(self) -> str:
        acc_arg = ""
        key_arg = ""
    def backend_rclone_args(self) -> list[str]:
        args = []
        if self.account != "":
            acc_arg = f"{self.login_flag} {self.account}"
            acc_args = [self.login_flag, self.account]
            args.extend(acc_args)
        if self.key != "":
            key_arg = f"{self.key_flag} {self.key}"

        return f"{acc_arg} {key_arg}"
            key_args = [self.key_flag, self.key]
            args.extend(key_args)
        return args

    def _password_command(self):
        return f"echo {LocalBackupSecret.get()}"
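For illustration (not part of the diff): assuming a Backblaze-style backupper built with these flags, and assuming its account/key/repo fields are plain attributes set by the existing credentials flow, the reworked helpers would compose like this:

backupper = ResticBackupper("--b2-account", "--b2-key", ":b2:")
backupper.account = "ID"         # normally set via the existing creds flow
backupper.key = "KEY"
backupper.repo = "selfprivacy"

assert backupper.backend_rclone_args() == ["--b2-account", "ID", "--b2-key", "KEY"]
assert backupper.rclone_repo() == ":b2:selfprivacy"
assert (
    backupper.rclone_args()
    == "rclone.args=serve restic --stdio --b2-account ID --b2-key KEY"
)

Returning a list instead of a pre-joined string lets erase_repo() below pass the same backend arguments straight to an rclone argv without re-splitting.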
@@ -79,31 +105,26 @@ class ResticBackupper(AbstractBackupper):
        command.extend(ResticBackupper.__flatten_list(args))
        return command

    def mount_repo(self, mount_directory):
        mount_command = self.restic_command("mount", mount_directory)
        mount_command.insert(0, "nohup")
        handle = subprocess.Popen(
            mount_command,
            stdout=subprocess.DEVNULL,
            shell=False,
        )
        sleep(2)
        if "ids" not in listdir(mount_directory):
            raise IOError("failed to mount dir ", mount_directory)
        return handle
    def erase_repo(self) -> None:
        """Fully erases repo on remote, can be reinitted again"""
        command = [
            "rclone",
            "purge",
            self.rclone_repo(),
        ]
        backend_args = self.backend_rclone_args()
        if backend_args:
            command.extend(backend_args)

    def unmount_repo(self, mount_directory):
        mount_command = ["umount", "-l", mount_directory]
        with subprocess.Popen(
            mount_command, stdout=subprocess.PIPE, shell=False
        ) as handle:
        with subprocess.Popen(command, stdout=subprocess.PIPE, shell=False) as handle:
            output = handle.communicate()[0].decode("utf-8")
            # TODO: check for exit code?
            if "error" in output.lower():
                return IOError("failed to unmount dir ", mount_directory, ": ", output)

            if not listdir(mount_directory) == []:
                return IOError("failed to unmount dir ", mount_directory)
            if handle.returncode != 0:
                raise ValueError(
                    "purge exited with errorcode",
                    handle.returncode,
                    ":",
                    output,
                )

    @staticmethod
    def __flatten_list(list_to_flatten):
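The high-level flow this enables, mirrored by test_reinit_after_purge further down in this diff:

Backups.erase_repo()                   # rclone purge of the whole remote
assert Backups.is_initted() is False

Backups.init_repo()                    # the same remote can be initialized again
assert Backups.is_initted() is True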
@@ -116,6 +137,7 @@ class ResticBackupper(AbstractBackupper):
            result.append(item)
        return result

    @unlocked_repo
    def start_backup(self, folders: List[str], tag: str) -> Snapshot:
        """
        Start backup with restic

@@ -139,8 +161,10 @@ class ResticBackupper(AbstractBackupper):
            raise ValueError("No service with id ", tag)

        job = get_backup_job(service)
        output = []
        try:
            for raw_message in output_yielder(backup_command):
                output.append(raw_message)
                message = self.parse_message(
                    raw_message,
                    job,

@@ -151,7 +175,13 @@ class ResticBackupper(AbstractBackupper):
                    tag,
                )
        except ValueError as error:
            raise ValueError("Could not create a snapshot: ", messages) from error
            raise ValueError(
                "Could not create a snapshot: ",
                str(error),
                output,
                "parsed messages:",
                messages,
            ) from error

    @staticmethod
    def _snapshot_from_backup_messages(messages, repo_name) -> Snapshot:
@@ -200,23 +230,72 @@ class ResticBackupper(AbstractBackupper):
        if "created restic repository" not in output:
            raise ValueError("cannot init a repo: " + output)

    @unlocked_repo
    def is_initted(self) -> bool:
        command = self.restic_command(
            "check",
            "--json",
        )

        with subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            shell=False,
            stderr=subprocess.STDOUT,
        ) as handle:
            output = handle.communicate()[0].decode("utf-8")
            if not ResticBackupper.has_json(output):
            if handle.returncode != 0:
                if "unable to create lock" in output:
                    raise ValueError("Stale lock detected: ", output)
                return False
                # raise NotImplementedError("error(big): " + output)
            return True

    def unlock(self) -> None:
        """Remove stale locks."""
        command = self.restic_command(
            "unlock",
        )

        with subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            shell=False,
            stderr=subprocess.STDOUT,
        ) as handle:
            # communication forces to complete and for returncode to get defined
            output = handle.communicate()[0].decode("utf-8")
            if handle.returncode != 0:
                raise ValueError("cannot unlock the backup repository: ", output)

    def lock(self) -> None:
        """
        Introduce a stale lock.
        Mainly for testing purposes.
        Double lock is supposed to fail
        """
        command = self.restic_command(
            "check",
        )

        # using temporary cache in /run/user/1000/restic-check-cache-817079729
        # repository 9639c714 opened (repository version 2) successfully, password is correct
        # created new cache in /run/user/1000/restic-check-cache-817079729
        # create exclusive lock for repository
        # load indexes
        # check all packs
        # check snapshots, trees and blobs
        # [0:00] 100.00%  1 / 1 snapshots
        # no errors were found

        try:
            for line in output_yielder(command):
                if "indexes" in line:
                    break
                if "unable" in line:
                    raise ValueError(line)
        except Exception as error:
            raise ValueError("could not lock repository") from error

    @unlocked_repo
    def restored_size(self, snapshot_id: str) -> int:
        """
        Size of a snapshot
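A short sketch of the intended lock semantics, exercised by test_double_lock_unlock later in this diff: lock() on an already-locked repo fails, while unlock() is idempotent.

backupper = Backups.provider().backupper

backupper.lock()          # introduce a stale lock
try:
    backupper.lock()      # double lock is supposed to fail
except ValueError:
    pass

backupper.unlock()        # remove stale locks
backupper.unlock()        # unlocking an unlocked repo is also fine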
@@ -230,6 +309,7 @@ class ResticBackupper(AbstractBackupper):
        with subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            shell=False,
        ) as handle:
            output = handle.communicate()[0].decode("utf-8")

@@ -239,6 +319,7 @@ class ResticBackupper(AbstractBackupper):
        except ValueError as error:
            raise ValueError("cannot restore a snapshot: " + output) from error

    @unlocked_repo
    def restore_from_backup(
        self,
        snapshot_id,

@@ -255,20 +336,21 @@ class ResticBackupper(AbstractBackupper):
            if verify:
                self._raw_verified_restore(snapshot_id, target=temp_dir)
                snapshot_root = temp_dir
            else:  # attempting inplace restore via mount + sync
                self.mount_repo(temp_dir)
                snapshot_root = join(temp_dir, "ids", snapshot_id)

            assert snapshot_root is not None
            for folder in folders:
                src = join(snapshot_root, folder.strip("/"))
                if not exists(src):
                    raise ValueError(f"No such path: {src}. We tried to find {folder}")
                    raise ValueError(
                        f"No such path: {src}. We tried to find {folder}"
                    )
                dst = folder
                sync(src, dst)

            if not verify:
                self.unmount_repo(temp_dir)
            else:  # attempting inplace restore
                for folder in folders:
                    rmtree(folder)
                    mkdir(folder)
                self._raw_verified_restore(snapshot_id, target="/")
                return

    def _raw_verified_restore(self, snapshot_id, target="/"):
        """barebones restic restore"""

@@ -277,7 +359,10 @@ class ResticBackupper(AbstractBackupper):
        )

        with subprocess.Popen(
            restore_command, stdout=subprocess.PIPE, shell=False
            restore_command,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            shell=False,
        ) as handle:

            # for some reason restore does not support

@@ -297,6 +382,7 @@ class ResticBackupper(AbstractBackupper):
                output,
            )

    @unlocked_repo
    def forget_snapshot(self, snapshot_id) -> None:
        """
        Either removes snapshot or marks it for deletion later,

@@ -332,10 +418,7 @@ class ResticBackupper(AbstractBackupper):
            )  # none should be impossible after communicate
            if handle.returncode != 0:
                raise ValueError(
                    "forget exited with errorcode",
                    handle.returncode,
                    ":",
                    output,
                    "forget exited with errorcode", handle.returncode, ":", output, err
                )

    def _load_snapshots(self) -> object:

@@ -361,8 +444,9 @@ class ResticBackupper(AbstractBackupper):
        try:
            return ResticBackupper.parse_json_output(output)
        except ValueError as error:
            raise ValueError("Cannot load snapshots: ") from error
            raise ValueError("Cannot load snapshots: ", output) from error

    @unlocked_repo
    def get_snapshots(self) -> List[Snapshot]:
        """Get all snapshots from the repo"""
        snapshots = []
@@ -16,17 +16,13 @@ from selfprivacy_api.utils.redis_model_storage import (
from selfprivacy_api.backup.providers.provider import AbstractBackupProvider
from selfprivacy_api.backup.providers import get_kind

# a hack to store file path.
REDIS_SNAPSHOT_CACHE_EXPIRE_SECONDS = 24 * 60 * 60  # one day

REDIS_SNAPSHOTS_PREFIX = "backups:snapshots:"
REDIS_LAST_BACKUP_PREFIX = "backups:last-backed-up:"
REDIS_INITTED_CACHE_PREFIX = "backups:initted_services:"
REDIS_INITTED_CACHE = "backups:repo_initted"

REDIS_PROVIDER_KEY = "backups:provider"
REDIS_AUTOBACKUP_PERIOD_KEY = "backups:autobackup_period"


redis = RedisPool().get_connection()

@@ -38,9 +34,9 @@ class Storage:
        """Deletes all backup related data from redis"""
        redis.delete(REDIS_PROVIDER_KEY)
        redis.delete(REDIS_AUTOBACKUP_PERIOD_KEY)
        redis.delete(REDIS_INITTED_CACHE)

        prefixes_to_clean = [
            REDIS_INITTED_CACHE_PREFIX,
            REDIS_SNAPSHOTS_PREFIX,
            REDIS_LAST_BACKUP_PREFIX,
        ]

@@ -89,7 +85,6 @@ class Storage:
        """Stores snapshot metadata in redis for caching purposes"""
        snapshot_key = Storage.__snapshot_key(snapshot)
        store_model_as_hash(redis, snapshot_key, snapshot)
        redis.expire(snapshot_key, REDIS_SNAPSHOT_CACHE_EXPIRE_SECONDS)

    @staticmethod
    def delete_cached_snapshot(snapshot: Snapshot) -> None:

@@ -162,11 +157,16 @@ class Storage:
    @staticmethod
    def has_init_mark() -> bool:
        """Returns True if the repository was initialized"""
        if redis.exists(REDIS_INITTED_CACHE_PREFIX):
        if redis.exists(REDIS_INITTED_CACHE):
            return True
        return False

    @staticmethod
    def mark_as_init():
        """Marks the repository as initialized"""
        redis.set(REDIS_INITTED_CACHE_PREFIX, 1)
        redis.set(REDIS_INITTED_CACHE, 1)

    @staticmethod
    def mark_as_uninitted():
        """Marks the repository as uninitialized"""
        redis.delete(REDIS_INITTED_CACHE)
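The resulting init-mark lifecycle, now keyed by the single backups:repo_initted value:

Storage.mark_as_init()
assert Storage.has_init_mark() is True

Storage.mark_as_uninitted()
assert Storage.has_init_mark() is False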
@@ -7,13 +7,17 @@ from selfprivacy_api.graphql.common_types.backup import RestoreStrategy

from selfprivacy_api.models.backup.snapshot import Snapshot
from selfprivacy_api.utils.huey import huey
from huey import crontab
from selfprivacy_api.services.service import Service
from selfprivacy_api.backup import Backups

SNAPSHOT_CACHE_TTL_HOURS = 6


def validate_datetime(dt: datetime) -> bool:
    """
    Validates that the datetime passed in is timezone-aware.
    Validates that it is time to back up.
    Also ensures that the timezone-aware time is used.
    """
    if dt.tzinfo is None:
        return Backups.is_time_to_backup(dt.replace(tzinfo=timezone.utc))

@@ -50,3 +54,8 @@ def automatic_backup():
    time = datetime.utcnow().replace(tzinfo=timezone.utc)
    for service in Backups.services_to_back_up(time):
        start_backup(service)


@huey.periodic_task(crontab(hour=SNAPSHOT_CACHE_TTL_HOURS))
def reload_snapshot_cache():
    Backups.force_snapshot_cache_reload()
@@ -1,8 +1,10 @@
import subprocess
from os.path import exists
from typing import Generator


def output_yielder(command):
def output_yielder(command) -> Generator[str, None, None]:
    """Note: If you break during iteration, it kills the process"""
    with subprocess.Popen(
        command,
        shell=False,

@@ -10,9 +12,15 @@ def output_yielder(command):
        stderr=subprocess.STDOUT,
        universal_newlines=True,
    ) as handle:
        if handle is None or handle.stdout is None:
            raise ValueError("could not run command: ", command)

        try:
            for line in iter(handle.stdout.readline, ""):
                if "NOTICE:" not in line:
                    yield line
        except GeneratorExit:
            handle.kill()


def sync(src_path: str, dest_path: str):
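A usage sketch (assuming a restic command list): breaking out of the loop kills the child process via the GeneratorExit handler above.

for line in output_yielder(["restic", "snapshots", "--json"]):
    if "unable to create lock" in line:
        break  # terminates the subprocess
    print(line, end="")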
@@ -27,4 +27,4 @@ async def get_token_header(

def get_api_version() -> str:
    """Get API version"""
    return "2.2.1"
    return "2.3.1"
@@ -157,6 +157,35 @@ class BackupMutations:
            job=job_to_api_job(job),
        )

    @strawberry.mutation(permission_classes=[IsAuthenticated])
    def forget_snapshot(self, snapshot_id: str) -> GenericMutationReturn:
        """Forget a snapshot.
        Makes it inaccessible from the server.
        After some time, the data (encrypted) will not be recoverable
        from the backup server too, but not immediately"""

        snap = Backups.get_snapshot_by_id(snapshot_id)
        if snap is None:
            return GenericMutationReturn(
                success=False,
                code=404,
                message=f"snapshot {snapshot_id} not found",
            )

        try:
            Backups.forget_snapshot(snap)
            return GenericMutationReturn(
                success=True,
                code=200,
                message="",
            )
        except Exception as error:
            return GenericMutationReturn(
                success=False,
                code=400,
                message=str(error),
            )

    @strawberry.mutation(permission_classes=[IsAuthenticated])
    def force_snapshots_reload(self) -> GenericMutationReturn:
        """Force snapshots reload"""
@@ -13,7 +13,7 @@ from selfprivacy_api.services.owned_path import OwnedPath
from selfprivacy_api import utils
from selfprivacy_api.utils.waitloop import wait_until_true

DEFAULT_START_STOP_TIMEOUT = 10 * 60
DEFAULT_START_STOP_TIMEOUT = 5 * 60


class ServiceStatus(Enum):

@@ -283,18 +283,28 @@ class StoppedService:

    def __enter__(self) -> Service:
        self.original_status = self.service.get_status()
        if self.original_status != ServiceStatus.INACTIVE:
        if self.original_status not in [ServiceStatus.INACTIVE, ServiceStatus.FAILED]:
            try:
                self.service.stop()
                wait_until_true(
                    lambda: self.service.get_status() == ServiceStatus.INACTIVE,
                    timeout_sec=DEFAULT_START_STOP_TIMEOUT,
                )
            except TimeoutError as error:
                raise TimeoutError(
                    f"timed out waiting for {self.service.get_display_name()} to stop"
                ) from error
        return self.service

    def __exit__(self, type, value, traceback):
        if self.original_status in [ServiceStatus.ACTIVATING, ServiceStatus.ACTIVE]:
            try:
                self.service.start()
                wait_until_true(
                    lambda: self.service.get_status() == ServiceStatus.ACTIVE,
                    timeout_sec=DEFAULT_START_STOP_TIMEOUT,
                )
            except TimeoutError as error:
                raise TimeoutError(
                    f"timed out waiting for {self.service.get_display_name()} to start"
                ) from error
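A usage sketch of the widened context manager (do_maintenance is hypothetical): a FAILED service now counts as already stopped, and only a previously ACTIVE or ACTIVATING service is started again on exit.

with StoppedService(service) as svc:
    # service is not running here (FAILED services are not stopped again)
    do_maintenance(svc)
# if the service was ACTIVE or ACTIVATING before, it is restarted here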
@@ -135,8 +135,12 @@ class DummyService(Service):

    @classmethod
    def stop(cls):
        # simulate a failing service unable to stop
        if not cls.get_status() == ServiceStatus.FAILED:
            cls.set_status(ServiceStatus.DEACTIVATING)
        cls.change_status_with_async_delay(ServiceStatus.INACTIVE, cls.startstop_delay)
            cls.change_status_with_async_delay(
                ServiceStatus.INACTIVE, cls.startstop_delay
            )

    @classmethod
    def start(cls):
setup.py

@@ -2,7 +2,7 @@ from setuptools import setup, find_packages

setup(
    name="selfprivacy_api",
    version="2.2.1",
    version="2.3.1",
    packages=find_packages(),
    scripts=[
        "selfprivacy_api/app.py",
@@ -94,6 +94,18 @@ mutation TestRestoreService($snapshot_id: String!) {
}
"""

API_FORGET_MUTATION = """
mutation TestForgetSnapshot($snapshot_id: String!) {
    backup {
        forgetSnapshot(snapshotId: $snapshot_id) {
            success
            message
            code
        }
    }
}
"""

API_SNAPSHOTS_QUERY = """
allSnapshots {
    id

@@ -143,6 +155,17 @@ def api_backup(authorized_client, service):
    return response


def api_forget(authorized_client, snapshot_id):
    response = authorized_client.post(
        "/graphql",
        json={
            "query": API_FORGET_MUTATION,
            "variables": {"snapshot_id": snapshot_id},
        },
    )
    return response


def api_set_period(authorized_client, period):
    response = authorized_client.post(
        "/graphql",

@@ -370,3 +393,30 @@ def test_reload_snapshots(authorized_client, dummy_service):

    snaps = api_snapshots(authorized_client)
    assert len(snaps) == 1


def test_forget_snapshot(authorized_client, dummy_service):
    response = api_backup(authorized_client, dummy_service)
    data = get_data(response)["backup"]["startBackup"]

    snaps = api_snapshots(authorized_client)
    assert len(snaps) == 1

    response = api_forget(authorized_client, snaps[0]["id"])
    data = get_data(response)["backup"]["forgetSnapshot"]
    assert_ok(data)

    snaps = api_snapshots(authorized_client)
    assert len(snaps) == 0


def test_forget_nonexistent_snapshot(authorized_client, dummy_service):
    snaps = api_snapshots(authorized_client)
    assert len(snaps) == 0
    response = api_forget(authorized_client, "898798uekiodpjoiweoiwuoeirueor")
    data = get_data(response)["backup"]["forgetSnapshot"]
    assert data["code"] == 404
    assert data["success"] is False

    snaps = api_snapshots(authorized_client)
    assert len(snaps) == 0
@@ -1,4 +1,5 @@
import pytest
import os
import os.path as path
from os import makedirs
from os import remove

@@ -7,8 +8,11 @@ from os import urandom
from datetime import datetime, timedelta, timezone
from subprocess import Popen

import tempfile

import selfprivacy_api.services as services
from selfprivacy_api.services import Service, get_all_services
from selfprivacy_api.services.service import ServiceStatus

from selfprivacy_api.services import get_service_by_id
from selfprivacy_api.services.test_service import DummyService

@@ -18,16 +22,21 @@ from selfprivacy_api.jobs import Jobs, JobStatus

from selfprivacy_api.models.backup.snapshot import Snapshot

from selfprivacy_api.backup import Backups
from selfprivacy_api.backup import Backups, BACKUP_PROVIDER_ENVS
import selfprivacy_api.backup.providers as providers
from selfprivacy_api.backup.providers import AbstractBackupProvider
from selfprivacy_api.backup.providers.backblaze import Backblaze
from selfprivacy_api.backup.providers.none import NoBackups
from selfprivacy_api.backup.util import sync
from selfprivacy_api.backup.backuppers.restic_backupper import ResticBackupper
from selfprivacy_api.backup.jobs import add_backup_job, add_restore_job


from selfprivacy_api.backup.tasks import start_backup, restore_snapshot
from selfprivacy_api.backup.tasks import (
    start_backup,
    restore_snapshot,
    reload_snapshot_cache,
)
from selfprivacy_api.backup.storage import Storage
from selfprivacy_api.backup.jobs import get_backup_job

@@ -37,14 +46,34 @@ TESTFILE_2_BODY = "testissimo!"
REPO_NAME = "test_backup"


@pytest.fixture(scope="function")
def backups(tmpdir):
    Backups.reset()

    test_repo_path = path.join(tmpdir, "totallyunrelated")
def prepare_localfile_backups(temp_dir):
    test_repo_path = path.join(temp_dir, "totallyunrelated")
    assert not path.exists(test_repo_path)
    Backups.set_localfile_repo(test_repo_path)


@pytest.fixture(scope="function")
def backups_local(tmpdir):
    Backups.reset()
    prepare_localfile_backups(tmpdir)
    Jobs.reset()
    Backups.init_repo()


@pytest.fixture(scope="function")
def backups(tmpdir):
    # for those tests that are supposed to pass with any repo
    Backups.reset()
    if BACKUP_PROVIDER_ENVS["kind"] in os.environ.keys():
        Backups.set_provider_from_envs()
    else:
        prepare_localfile_backups(tmpdir)
    Jobs.reset()
    # assert not repo_path

    Backups.init_repo()
    yield
    Backups.erase_repo()


@pytest.fixture()

@@ -80,11 +109,6 @@ def raw_dummy_service(tmpdir):
@pytest.fixture()
def dummy_service(tmpdir, backups, raw_dummy_service) -> Service:
    service = raw_dummy_service
    repo_path = path.join(tmpdir, "test_repo")
    assert not path.exists(repo_path)
    # assert not repo_path

    Backups.init_repo()

    # register our service
    services.services.append(service)

@@ -129,6 +153,53 @@ def test_config_load(generic_userdata):
    assert provider.backupper.key == "KEY"


def test_reset_sets_to_none1():
    Backups.reset()
    provider = Backups.provider()
    assert provider is not None
    assert isinstance(provider, NoBackups)


def test_reset_sets_to_none2(backups):
    # now with something set up first^^^
    Backups.reset()
    provider = Backups.provider()
    assert provider is not None
    assert isinstance(provider, NoBackups)


def test_setting_from_envs(tmpdir):
    Backups.reset()
    environment_stash = {}
    if BACKUP_PROVIDER_ENVS["kind"] in os.environ.keys():
        # we are running under special envs, stash them before rewriting them
        for key in BACKUP_PROVIDER_ENVS.values():
            environment_stash[key] = os.environ[key]

    os.environ[BACKUP_PROVIDER_ENVS["kind"]] = "BACKBLAZE"
    os.environ[BACKUP_PROVIDER_ENVS["login"]] = "ID"
    os.environ[BACKUP_PROVIDER_ENVS["key"]] = "KEY"
    os.environ[BACKUP_PROVIDER_ENVS["location"]] = "selfprivacy"
    Backups.set_provider_from_envs()
    provider = Backups.provider()

    assert provider is not None
    assert isinstance(provider, Backblaze)
    assert provider.login == "ID"
    assert provider.key == "KEY"
    assert provider.location == "selfprivacy"

    assert provider.backupper.account == "ID"
    assert provider.backupper.key == "KEY"

    if environment_stash != {}:
        for key in BACKUP_PROVIDER_ENVS.values():
            os.environ[key] = environment_stash[key]
    else:
        for key in BACKUP_PROVIDER_ENVS.values():
            del os.environ[key]


def test_json_reset(generic_userdata):
    Backups.reset(reset_json=False)
    provider = Backups.provider()

@@ -158,6 +229,19 @@ def test_file_backend_init(file_backup):
    file_backup.backupper.init()


def test_reinit_after_purge(backups):
    assert Backups.is_initted() is True

    Backups.erase_repo()
    assert Backups.is_initted() is False
    with pytest.raises(ValueError):
        Backups.get_all_snapshots()

    Backups.init_repo()
    assert Backups.is_initted() is True
    assert len(Backups.get_all_snapshots()) == 0


def test_backup_simple_file(raw_dummy_service, file_backup):
    # temporarily incomplete
    service = raw_dummy_service

@@ -258,9 +342,12 @@ def test_sizing(backups, dummy_service):
    assert size > 0


def test_init_tracking(backups, raw_dummy_service):
def test_init_tracking(backups, tmpdir):
    assert Backups.is_initted() is True
    Backups.reset()
    assert Backups.is_initted() is False

    separate_dir = tmpdir / "out_of_the_way"
    prepare_localfile_backups(separate_dir)
    Backups.init_repo()

    assert Backups.is_initted() is True

@@ -382,10 +469,19 @@ def restore_strategy(request) -> RestoreStrategy:
    return RestoreStrategy.INPLACE


@pytest.fixture(params=["failed", "healthy"])
def failed(request) -> bool:
    if request.param == "failed":
        return True
    return False


def test_restore_snapshot_task(
    backups, dummy_service, restore_strategy, simulated_service_stopping_delay
    backups, dummy_service, restore_strategy, simulated_service_stopping_delay, failed
):
    dummy_service.set_delay(simulated_service_stopping_delay)
    if failed:
        dummy_service.set_status(ServiceStatus.FAILED)

    Backups.back_up(dummy_service)
    snaps = Backups.get_snapshots(dummy_service)

@@ -552,8 +648,38 @@ def test_snapshots_caching(backups, dummy_service):
    assert len(cached_snapshots) == 1


def lowlevel_forget(snapshot_id):
    Backups.provider().backupper.forget_snapshot(snapshot_id)


# Storage
def test_snapshots_cache_invalidation(backups, dummy_service):
    Backups.back_up(dummy_service)
    cached_snapshots = Storage.get_cached_snapshots()
    assert len(cached_snapshots) == 1

    Storage.invalidate_snapshot_storage()
    cached_snapshots = Storage.get_cached_snapshots()
    assert len(cached_snapshots) == 0

    Backups.force_snapshot_cache_reload()
    cached_snapshots = Storage.get_cached_snapshots()
    assert len(cached_snapshots) == 1
    snap = cached_snapshots[0]

    lowlevel_forget(snap.id)
    cached_snapshots = Storage.get_cached_snapshots()
    assert len(cached_snapshots) == 1

    Backups.force_snapshot_cache_reload()
    cached_snapshots = Storage.get_cached_snapshots()
    assert len(cached_snapshots) == 0


# Storage
def test_init_tracking_caching(backups, raw_dummy_service):
    assert Storage.has_init_mark() is True
    Backups.reset()
    assert Storage.has_init_mark() is False

    Storage.mark_as_init()

@@ -563,7 +689,12 @@ def test_init_tracking_caching(backups, raw_dummy_service):


# Storage
def test_init_tracking_caching2(backups, raw_dummy_service):
def test_init_tracking_caching2(backups, tmpdir):
    assert Storage.has_init_mark() is True
    Backups.reset()
    assert Storage.has_init_mark() is False
    separate_dir = tmpdir / "out_of_the_way"
    prepare_localfile_backups(separate_dir)
    assert Storage.has_init_mark() is False

    Backups.init_repo()

@@ -610,25 +741,6 @@ def test_sync_nonexistent_src(dummy_service):
    sync(src, dst)


# Restic lowlevel
def test_mount_umount(backups, dummy_service, tmpdir):
    Backups.back_up(dummy_service)
    backupper = Backups.provider().backupper
    assert isinstance(backupper, ResticBackupper)

    mountpoint = tmpdir / "mount"
    makedirs(mountpoint)
    assert path.exists(mountpoint)
    assert len(listdir(mountpoint)) == 0

    handle = backupper.mount_repo(mountpoint)
    assert len(listdir(mountpoint)) != 0

    backupper.unmount_repo(mountpoint)
    # handle.terminate()
    assert len(listdir(mountpoint)) == 0


def test_move_blocks_backups(backups, dummy_service, restore_strategy):
    snap = Backups.back_up(dummy_service)
    job = Jobs.add(

@@ -643,3 +755,71 @@ def test_move_blocks_backups(backups, dummy_service, restore_strategy):

    with pytest.raises(ValueError):
        Backups.restore_snapshot(snap, restore_strategy)


def test_double_lock_unlock(backups, dummy_service):
    # notice that introducing stale locks is only safe for other tests if we erase repo in between
    # which we do at the time of writing this test

    Backups.provider().backupper.lock()
    with pytest.raises(ValueError):
        Backups.provider().backupper.lock()

    Backups.provider().backupper.unlock()
    Backups.provider().backupper.lock()

    Backups.provider().backupper.unlock()
    Backups.provider().backupper.unlock()


def test_operations_while_locked(backups, dummy_service):
    # Stale lock prevention test

    # consider making it fully at the level of backupper?
    # because this is where prevention lives?
    # Backups singleton is here only so that we can run this against B2, S3 and whatever
    # But maybe it is not necessary (if restic treats them uniformly enough)

    Backups.provider().backupper.lock()
    snap = Backups.back_up(dummy_service)
    assert snap is not None

    Backups.provider().backupper.lock()
    # using lowlevel to make sure no caching interferes
    assert Backups.provider().backupper.is_initted() is True

    Backups.provider().backupper.lock()
    assert Backups.snapshot_restored_size(snap.id) > 0

    Backups.provider().backupper.lock()
    Backups.restore_snapshot(snap)

    Backups.provider().backupper.lock()
    Backups.forget_snapshot(snap)

    Backups.provider().backupper.lock()
    assert Backups.provider().backupper.get_snapshots() == []

    # check that no locks were left
    Backups.provider().backupper.lock()
    Backups.provider().backupper.unlock()


# a paranoid check to weed out problems with tempdirs that are not dependent on us
def test_tempfile():
    with tempfile.TemporaryDirectory() as temp:
        assert path.exists(temp)
    assert not path.exists(temp)


# Storage
def test_cache_invalidaton_task(backups, dummy_service):
    Backups.back_up(dummy_service)
    assert len(Storage.get_cached_snapshots()) == 1

    # Does not trigger resync
    Storage.invalidate_snapshot_storage()
    assert Storage.get_cached_snapshots() == []

    reload_snapshot_cache()
    assert len(Storage.get_cached_snapshots()) == 1