selfprivacy-rest-api/selfprivacy_api/backup/backuppers/restic_backupper.py

534 lines
17 KiB
Python
Raw Normal View History

from __future__ import annotations
import subprocess
2023-02-08 16:28:05 +00:00
import json
import datetime
import tempfile
from typing import List, TypeVar, Callable
from collections.abc import Iterable
from json.decoder import JSONDecodeError
from os.path import exists, join
2023-07-03 15:28:12 +00:00
from os import listdir
from time import sleep
2023-02-13 11:16:35 +00:00
from selfprivacy_api.graphql.common_types.backup import BackupReason
from selfprivacy_api.backup.util import output_yielder, sync
from selfprivacy_api.backup.backuppers import AbstractBackupper
2023-02-13 11:16:35 +00:00
from selfprivacy_api.models.backup.snapshot import Snapshot
from selfprivacy_api.backup.jobs import get_backup_job
from selfprivacy_api.services import get_service_by_id
from selfprivacy_api.jobs import Jobs, JobStatus
from selfprivacy_api.backup.local_secret import LocalBackupSecret
# Number of leading characters of a restic snapshot id kept as the short id.
SHORT_ID_LEN = 8
# Type variable bound to callables; used to annotate the unlocked_repo decorator.
T = TypeVar("T", bound=Callable)
def unlocked_repo(func: T) -> T:
    """Decorate a backupper method so it retries once after removing a lock.

    The wrapped method is called normally. If it raises an exception whose
    text contains "unable to create lock", ``self.unlock()`` is called and
    the method is invoked one more time with the same arguments. Any other
    exception, and any exception raised by the retry, propagates to the
    caller.
    """
    # Local import keeps this decorator self-contained.
    from functools import wraps

    @wraps(func)  # preserve the wrapped method's name and docstring
    def inner(self: ResticBackupper, *args, **kwargs):
        try:
            return func(self, *args, **kwargs)
        except Exception as error:
            if "unable to create lock" not in str(error):
                raise
            self.unlock()
            return func(self, *args, **kwargs)

    # Above, we manually guarantee that the type returned is compatible.
    return inner  # type: ignore
class ResticBackupper(AbstractBackupper):
    """Backupper that drives the ``restic`` CLI through an rclone backend.

    Every repository operation builds a command line with
    :meth:`restic_command` (or an ``rclone`` command for purging), runs it as
    a subprocess and interprets its text or JSON output.
    """

    def __init__(self, login_flag: str, key_flag: str, storage_type: str) -> None:
        """Remember backend flag names and storage prefix.

        Args:
            login_flag: rclone flag that precedes the account value.
            key_flag: rclone flag that precedes the key value.
            storage_type: prefix put in front of the repo name to form the
                rclone remote path.

        Credentials and repo name start out empty; see :meth:`set_creds`.
        """
        self.login_flag = login_flag
        self.key_flag = key_flag
        self.storage_type = storage_type
        self.account = ""
        self.key = ""
        self.repo = ""
        super().__init__()

    def set_creds(self, account: str, key: str, repo: str) -> None:
        """Set the account, key and repository name used for later commands."""
        self.account = account
        self.key = key
        self.repo = repo

    def restic_repo(self) -> str:
        """Return the repository location in restic's ``rclone:`` syntax."""
        # https://restic.readthedocs.io/en/latest/030_preparing_a_new_repo.html#other-services-via-rclone
        # https://forum.rclone.org/t/can-rclone-be-run-solely-with-command-line-options-no-config-no-env-vars/6314/5
        return f"rclone:{self.rclone_repo()}"

    def rclone_repo(self) -> str:
        """Return the rclone remote path: storage type followed by repo name."""
        return f"{self.storage_type}{self.repo}"

    def rclone_args(self) -> str:
        """Return the value for restic's ``-o`` option configuring rclone."""
        return "rclone.args=serve restic --stdio " + " ".join(
            self.backend_rclone_args()
        )

    def backend_rclone_args(self) -> list[str]:
        """Return rclone credential arguments, omitting unset credentials."""
        args = []
        if self.account != "":
            args.extend([self.login_flag, self.account])
        if self.key != "":
            args.extend([self.key_flag, self.key])
        return args

    def _password_command(self) -> str:
        """Return the command restic runs to obtain the repository password."""
        # NOTE(review): the secret is interpolated unquoted; this presumably
        # relies on the secret containing no whitespace or quote characters.
        return f"echo {LocalBackupSecret.get()}"

    def restic_command(self, *args, tags: Iterable[str] = ()) -> List[str]:
        """Build a full restic command line.

        Args:
            *args: subcommand and its arguments; nested iterables are
                flattened, strings are kept whole.
            tags: tags to attach, each emitted as ``--tag <tag>``.

        Returns:
            The argument list, ready to be passed to ``subprocess``.
        """
        command = [
            "restic",
            "-o",
            self.rclone_args(),
            "-r",
            self.restic_repo(),
            "--password-command",
            self._password_command(),
        ]
        # An immutable default replaces the former mutable ``[]`` default.
        for tag in tags or ():
            command.extend(["--tag", tag])
        if args:
            command.extend(ResticBackupper.__flatten_list(args))
        return command

    def erase_repo(self) -> None:
        """Fully erases repo on remote, can be reinitted again

        Raises:
            ValueError: if ``rclone purge`` exits with a non-zero code.
        """
        command = [
            "rclone",
            "purge",
            self.rclone_repo(),
        ]
        backend_args = self.backend_rclone_args()
        if backend_args:
            command.extend(backend_args)

        with subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            # Merge stderr so the error below actually carries the diagnostics.
            stderr=subprocess.STDOUT,
            shell=False,
        ) as handle:
            output = handle.communicate()[0].decode("utf-8")
            if handle.returncode != 0:
                raise ValueError(
                    "purge exited with errorcode",
                    handle.returncode,
                    ":",
                    output,
                )

    def mount_repo(self, mount_directory):
        """Start ``restic mount`` on ``mount_directory`` in the background.

        Returns:
            The ``subprocess.Popen`` handle of the mount process.

        Raises:
            IOError: if no ``ids`` entry is visible in the directory after
                a two second wait.
        """
        mount_command = self.restic_command("mount", mount_directory)
        mount_command.insert(0, "nohup")
        handle = subprocess.Popen(
            mount_command,
            stdout=subprocess.DEVNULL,
            shell=False,
        )
        # Give the mount a moment to appear before checking for it.
        sleep(2)
        if "ids" not in listdir(mount_directory):
            raise IOError("failed to mount dir ", mount_directory)
        return handle

    def unmount_repo(self, mount_directory):
        """Lazily unmount ``mount_directory`` and verify it is empty afterwards.

        Raises:
            IOError: if ``umount`` exits non-zero, reports an error, or the
                directory still has entries after unmounting.
        """
        mount_command = ["umount", "-l", mount_directory]
        with subprocess.Popen(
            mount_command,
            stdout=subprocess.PIPE,
            # umount reports problems on stderr; capture it with stdout.
            stderr=subprocess.STDOUT,
            shell=False,
        ) as handle:
            output = handle.communicate()[0].decode("utf-8")
            # Failures are raised; they used to be returned and thus ignored.
            if handle.returncode != 0 or "error" in output.lower():
                raise IOError("failed to unmount dir ", mount_directory, ": ", output)

        if listdir(mount_directory):
            raise IOError("failed to unmount dir ", mount_directory)

    @staticmethod
    def __flatten_list(list_to_flatten):
        """string-aware list flattener"""
        result = []
        for item in list_to_flatten:
            if isinstance(item, Iterable) and not isinstance(item, str):
                result.extend(ResticBackupper.__flatten_list(item))
                continue
            result.append(item)
        return result

    @unlocked_repo
    def start_backup(
        self,
        folders: List[str],
        service_name: str,
        reason: BackupReason = BackupReason.EXPLICIT,
    ) -> Snapshot:
        """
        Start backup with restic

        The snapshot is tagged with the service name and the reason value.
        Each line of restic's JSON output is parsed as it arrives; status
        lines update the progress of the service's backup job.

        Raises:
            ValueError: if the service is unknown, or if the output cannot be
                parsed or contains no summary message.
        """
        # but maybe it is ok to accept a union
        # of a string and an array of strings
        assert not isinstance(folders, str)

        tags = [service_name, reason.value]
        backup_command = self.restic_command(
            "backup",
            "--json",
            folders,
            tags=tags,
        )

        service = get_service_by_id(service_name)
        if service is None:
            raise ValueError("No service with id ", service_name)
        job = get_backup_job(service)

        messages = []
        output = []
        try:
            for raw_message in output_yielder(backup_command):
                output.append(raw_message)
                message = self.parse_message(raw_message, job)
                messages.append(message)
            snapshot_id = ResticBackupper._snapshot_id_from_backup_messages(messages)
            return Snapshot(
                created_at=datetime.datetime.now(datetime.timezone.utc),
                id=snapshot_id,
                service_name=service_name,
                reason=reason,
            )
        except ValueError as error:
            raise ValueError(
                "Could not create a snapshot: ",
                str(error),
                output,
                "parsed messages:",
                messages,
            ) from error

    @staticmethod
    def _snapshot_id_from_backup_messages(messages) -> str:
        """Return the short snapshot id taken from the summary message.

        Raises:
            ValueError: if none of the messages is a summary.
        """
        for message in messages:
            if message["message_type"] == "summary":
                # There is a discrepancy between versions of restic/rclone
                # Some report short_id in this field and some full
                return message["snapshot_id"][0:SHORT_ID_LEN]
        raise ValueError("no summary message in restic json output")

    def parse_message(self, raw_message_line: str, job=None) -> dict:
        """Parse one line of restic JSON output, updating ``job`` on status lines.

        Raises:
            ValueError: if the line holds no JSON or more than one message.
        """
        message = ResticBackupper.parse_json_output(raw_message_line)
        if not isinstance(message, dict):
            raise ValueError("we have too many messages on one line?")
        if message["message_type"] == "status":
            if job is not None:  # only update status if we run under some job
                Jobs.update(
                    job,
                    JobStatus.RUNNING,
                    progress=int(message["percent_done"] * 100),
                )
        return message

    def init(self) -> None:
        """Initialise a new restic repository.

        Raises:
            ValueError: if restic does not report that the repository was
                created.
        """
        init_command = self.restic_command(
            "init",
        )
        with subprocess.Popen(
            init_command,
            shell=False,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
        ) as process_handle:
            output = process_handle.communicate()[0].decode("utf-8")
            if "created restic repository" not in output:
                raise ValueError("cannot init a repo: " + output)

    @unlocked_repo
    def is_initted(self) -> bool:
        """Return True if ``restic check`` succeeds on the repository.

        Raises:
            ValueError: if the check fails because a lock could not be
                created; the decorator reacts to this by unlocking and
                retrying once.
        """
        command = self.restic_command(
            "check",
        )

        with subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            shell=False,
            stderr=subprocess.STDOUT,
        ) as handle:
            output = handle.communicate()[0].decode("utf-8")
            if handle.returncode != 0:
                if "unable to create lock" in output:
                    raise ValueError("Stale lock detected: ", output)
                return False
        return True

    def unlock(self) -> None:
        """Remove stale locks.

        Raises:
            ValueError: if ``restic unlock`` exits with a non-zero code.
        """
        command = self.restic_command(
            "unlock",
        )

        with subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            shell=False,
            stderr=subprocess.STDOUT,
        ) as handle:
            # communication forces to complete and for returncode to get defined
            output = handle.communicate()[0].decode("utf-8")
            if handle.returncode != 0:
                raise ValueError("cannot unlock the backup repository: ", output)

    def lock(self) -> None:
        """
        Introduce a stale lock.
        Mainly for testing purposes.
        Double lock is supposed to fail
        """
        command = self.restic_command(
            "check",
        )

        # using temporary cache in /run/user/1000/restic-check-cache-817079729
        # repository 9639c714 opened (repository version 2) successfully, password is correct
        # created new cache in /run/user/1000/restic-check-cache-817079729
        # create exclusive lock for repository
        # load indexes
        # check all packs
        # check snapshots, trees and blobs
        # [0:00] 100.00%  1 / 1 snapshots
        # no errors were found

        try:
            for line in output_yielder(command):
                # "load indexes" comes right after the lock is taken (see the
                # sample output above); stop reading there.
                # NOTE(review): presumably abandoning the check here is what
                # leaves the lock behind - confirm against output_yielder.
                if "indexes" in line:
                    break
                if "unable" in line:
                    raise ValueError(line)
        except Exception as e:
            raise ValueError("could not lock repository") from e

    @unlocked_repo
    def restored_size(self, snapshot_id: str) -> int:
        """
        Size of a snapshot

        Taken from the ``total_size`` field of ``restic stats --json``.

        Raises:
            ValueError: if the command output contains no parsable JSON.
        """
        command = self.restic_command(
            "stats",
            snapshot_id,
            "--json",
        )

        with subprocess.Popen(
            command,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            shell=False,
        ) as handle:
            output = handle.communicate()[0].decode("utf-8")
            try:
                parsed_output = ResticBackupper.parse_json_output(output)
                return parsed_output["total_size"]
            except ValueError as error:
                raise ValueError("cannot restore a snapshot: " + output) from error

    @unlocked_repo
    def restore_from_backup(
        self,
        snapshot_id,
        folders: List[str],
        verify=True,
    ) -> None:
        """
        Restore from backup with restic

        With ``verify`` the snapshot is first restored into a temporary
        directory; otherwise the repository is mounted there. In both cases
        each folder is then synced from the temporary tree to its original
        location.

        Raises:
            ValueError: if ``folders`` is empty or a folder is missing from
                the snapshot.
        """
        if not folders:
            raise ValueError("cannot restore without knowing where to!")

        with tempfile.TemporaryDirectory() as temp_dir:
            mounted = False
            try:
                if verify:
                    self._raw_verified_restore(snapshot_id, target=temp_dir)
                    snapshot_root = temp_dir
                else:  # attempting inplace restore via mount + sync
                    self.mount_repo(temp_dir)
                    mounted = True
                    snapshot_root = join(temp_dir, "ids", snapshot_id)

                for folder in folders:
                    src = join(snapshot_root, folder.strip("/"))
                    if not exists(src):
                        raise ValueError(
                            f"No such path: {src}. We tried to find {folder}"
                        )
                    dst = folder
                    sync(src, dst)
            finally:
                # Always release the mount, even when a sync step failed, so
                # the temporary directory can be cleaned up.
                if mounted:
                    self.unmount_repo(temp_dir)

    def _raw_verified_restore(self, snapshot_id, target="/"):
        """barebones restic restore

        Raises:
            ValueError: if the output lacks "restoring" or restic exits with
                a non-zero code.
        """
        restore_command = self.restic_command(
            "restore", snapshot_id, "--target", target, "--verify"
        )

        with subprocess.Popen(
            restore_command,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            shell=False,
        ) as handle:
            # for some reason restore does not support
            # nice reporting of progress via json
            output = handle.communicate()[0].decode("utf-8")
            if "restoring" not in output:
                raise ValueError("cannot restore a snapshot: " + output)

            assert (
                handle.returncode is not None
            )  # none should be impossible after communicate
            if handle.returncode != 0:
                raise ValueError(
                    "restore exited with errorcode",
                    handle.returncode,
                    ":",
                    output,
                )

    @unlocked_repo
    def forget_snapshot(self, snapshot_id) -> None:
        """
        Either removes snapshot or marks it for deletion later,
        depending on server settings

        Raises:
            ValueError: if the snapshot id is unknown or ``restic forget``
                exits with a non-zero code.
        """
        forget_command = self.restic_command(
            "forget",
            snapshot_id,
        )

        with subprocess.Popen(
            forget_command,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            shell=False,
        ) as handle:
            # stdout and stderr are kept apart: the "no matching ID" check
            # below looks at stderr only.
            output, err = (
                stream.decode("utf-8") for stream in handle.communicate()
            )

            if "no matching ID found" in err:
                raise ValueError(
                    "trying to delete, but no such snapshot: ", snapshot_id
                )

            assert (
                handle.returncode is not None
            )  # none should be impossible after communicate
            if handle.returncode != 0:
                raise ValueError(
                    "forget exited with errorcode", handle.returncode, ":", output, err
                )

    def _load_snapshots(self) -> object:
        """
        Load list of snapshots from repository
        raises Value Error if repo does not exist
        """
        listing_command = self.restic_command(
            "snapshots",
            "--json",
        )

        with subprocess.Popen(
            listing_command,
            shell=False,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
        ) as backup_listing_process_descriptor:
            output = backup_listing_process_descriptor.communicate()[0].decode("utf-8")

        if "Is there a repository at the following location?" in output:
            raise ValueError("No repository! : " + output)
        try:
            return ResticBackupper.parse_json_output(output)
        except ValueError as error:
            raise ValueError("Cannot load snapshots: ", output) from error

    @unlocked_repo
    def get_snapshots(self) -> List[Snapshot]:
        """Get all snapshots from the repo"""
        snapshots = []

        for restic_snapshot in self._load_snapshots():
            # Compatibility with previous snaps:
            # a lone tag is the service name, so the reason defaults.
            if len(restic_snapshot["tags"]) == 1:
                reason = BackupReason.EXPLICIT
            else:
                reason = restic_snapshot["tags"][1]

            snapshot = Snapshot(
                id=restic_snapshot["short_id"],
                created_at=restic_snapshot["time"],
                service_name=restic_snapshot["tags"][0],
                reason=reason,
            )

            snapshots.append(snapshot)
        return snapshots

    @staticmethod
    def parse_json_output(output: str) -> object:
        """Parse restic output that may have non-JSON text before the JSON.

        Everything before the first ``[`` or ``{`` is discarded. A single
        remaining line is returned as one parsed object; several lines are
        parsed one by one and returned as a list. Blank lines are ignored.

        Raises:
            ValueError: if there is no JSON, or a line fails to parse
                (``JSONDecodeError`` is a ``ValueError``).
        """
        starting_index = ResticBackupper.json_start(output)

        if starting_index == -1:
            raise ValueError("There is no json in the restic output: " + output)

        truncated_output = output[starting_index:]
        # Drop blank lines so a stray empty line is not taken for a message.
        json_messages = [
            line for line in truncated_output.splitlines() if line.strip()
        ]
        if len(json_messages) == 1:
            try:
                return json.loads(json_messages[0])
            except JSONDecodeError as error:
                raise ValueError(
                    "There is no json in the restic output : " + output
                ) from error

        return [json.loads(message) for message in json_messages]

    @staticmethod
    def json_start(output: str) -> int:
        """Return the index of the first ``[`` or ``{`` in ``output``, or -1."""
        indices = [
            index for index in (output.find("["), output.find("{")) if index != -1
        ]
        return min(indices, default=-1)

    @staticmethod
    def has_json(output: str) -> bool:
        """Return True if ``output`` contains a ``[`` or ``{`` character."""
        return ResticBackupper.json_start(output) != -1