selfprivacy-rest-api/selfprivacy_api/backup/backuppers/restic_backupper.py

363 lines
12 KiB
Python
Raw Normal View History

import subprocess
2023-02-08 16:28:05 +00:00
import json
import datetime
import tempfile
2023-02-13 11:16:35 +00:00
from typing import List
from collections.abc import Iterable
from json.decoder import JSONDecodeError
from os.path import exists, join
2023-07-03 15:28:12 +00:00
from os import listdir
from time import sleep
2023-02-13 11:16:35 +00:00
from selfprivacy_api.backup.util import output_yielder, sync
from selfprivacy_api.backup.backuppers import AbstractBackupper
2023-02-13 11:16:35 +00:00
from selfprivacy_api.models.backup.snapshot import Snapshot
from selfprivacy_api.backup.jobs import get_backup_job
from selfprivacy_api.services import get_service_by_id
from selfprivacy_api.jobs import Jobs, JobStatus
from selfprivacy_api.backup.local_secret import LocalBackupSecret
class ResticBackupper(AbstractBackupper):
def __init__(self, login_flag: str, key_flag: str, type: str):
self.login_flag = login_flag
self.key_flag = key_flag
self.type = type
self.account = ""
self.key = ""
self.repo = ""
def set_creds(self, account: str, key: str, repo: str):
self.account = account
self.key = key
self.repo = repo
def restic_repo(self) -> str:
# https://restic.readthedocs.io/en/latest/030_preparing_a_new_repo.html#other-services-via-rclone
# https://forum.rclone.org/t/can-rclone-be-run-solely-with-command-line-options-no-config-no-env-vars/6314/5
return f"rclone:{self.type}{self.repo}"
def rclone_args(self):
return "rclone.args=serve restic --stdio " + self.backend_rclone_args()
2023-02-08 14:05:25 +00:00
def backend_rclone_args(self) -> str:
2023-02-03 18:03:13 +00:00
acc_arg = ""
key_arg = ""
2023-02-08 14:05:25 +00:00
if self.account != "":
acc_arg = f"{self.login_flag} {self.account}"
if self.key != "":
key_arg = f"{self.key_flag} {self.key}"
2023-02-03 18:03:13 +00:00
return f"{acc_arg} {key_arg}"
def _password_command(self):
return f"echo {LocalBackupSecret.get()}"
2023-07-03 15:28:12 +00:00
def restic_command(self, *args, tag: str = "") -> List[str]:
2023-02-08 14:05:25 +00:00
command = [
"restic",
"-o",
self.rclone_args(),
"-r",
self.restic_repo(),
"--password-command",
self._password_command(),
2023-02-08 14:05:25 +00:00
]
2023-06-23 09:40:10 +00:00
if tag != "":
command.extend(
[
"--tag",
2023-06-23 09:40:10 +00:00
tag,
]
)
2023-02-08 14:05:25 +00:00
if args != []:
command.extend(ResticBackupper.__flatten_list(args))
2023-02-08 14:05:25 +00:00
return command
2023-07-03 15:28:12 +00:00
def mount_repo(self, dir):
mount_command = self.restic_command("mount", dir)
mount_command.insert(0, "nohup")
handle = subprocess.Popen(mount_command, stdout=subprocess.DEVNULL, shell=False)
sleep(2)
if not "ids" in listdir(dir):
raise IOError("failed to mount dir ", dir)
return handle
def unmount_repo(self, dir):
mount_command = ["umount", "-l", dir]
with subprocess.Popen(
mount_command, stdout=subprocess.PIPE, shell=False
) as handle:
output = handle.communicate()[0].decode("utf-8")
# TODO: check for exit code?
if "error" in output.lower():
return IOError("failed to unmount dir ", dir, ": ", output)
if not listdir(dir) == []:
return IOError("failed to unmount dir ", dir)
@staticmethod
def __flatten_list(list):
"""string-aware list flattener"""
result = []
for item in list:
if isinstance(item, Iterable) and not isinstance(item, str):
result.extend(ResticBackupper.__flatten_list(item))
continue
result.append(item)
return result
2023-06-23 09:40:10 +00:00
def start_backup(self, folders: List[str], tag: str):
"""
Start backup with restic
"""
# but maybe it is ok to accept a union of a string and an array of strings
assert not isinstance(folders, str)
backup_command = self.restic_command(
"backup",
"--json",
folders,
2023-06-23 09:40:10 +00:00
tag=tag,
)
messages = []
2023-06-23 09:40:10 +00:00
job = get_backup_job(get_service_by_id(tag))
try:
for raw_message in output_yielder(backup_command):
message = self.parse_message(raw_message, job)
messages.append(message)
return ResticBackupper._snapshot_from_backup_messages(messages, tag)
except ValueError as e:
raise ValueError("could not create a snapshot: ", messages) from e
@staticmethod
def _snapshot_from_backup_messages(messages, repo_name) -> Snapshot:
for message in messages:
if message["message_type"] == "summary":
return ResticBackupper._snapshot_from_fresh_summary(message, repo_name)
raise ValueError("no summary message in restic json output")
def parse_message(self, raw_message_line: str, job=None) -> dict:
message = ResticBackupper.parse_json_output(raw_message_line)
if not isinstance(message, dict):
raise ValueError("we have too many messages on one line?")
if message["message_type"] == "status":
if job is not None: # only update status if we run under some job
Jobs.update(
job,
JobStatus.RUNNING,
progress=int(message["percent_done"] * 100),
)
return message
@staticmethod
def _snapshot_from_fresh_summary(message: dict, repo_name) -> Snapshot:
return Snapshot(
id=message["snapshot_id"],
created_at=datetime.datetime.now(datetime.timezone.utc),
service_name=repo_name,
)
def init(self):
2023-02-17 15:59:27 +00:00
init_command = self.restic_command(
"init",
)
with subprocess.Popen(
2023-02-17 15:59:27 +00:00
init_command,
shell=False,
stdout=subprocess.PIPE,
2023-02-17 15:59:27 +00:00
stderr=subprocess.STDOUT,
) as process_handle:
output = process_handle.communicate()[0].decode("utf-8")
if not "created restic repository" in output:
raise ValueError("cannot init a repo: " + output)
2023-02-17 15:59:27 +00:00
def is_initted(self) -> bool:
2023-03-14 00:39:15 +00:00
command = self.restic_command(
"check",
"--json",
)
with subprocess.Popen(command, stdout=subprocess.PIPE, shell=False) as handle:
output = handle.communicate()[0].decode("utf-8")
if not ResticBackupper.has_json(output):
2023-03-14 00:39:15 +00:00
return False
# raise NotImplementedError("error(big): " + output)
return True
2023-06-23 09:40:10 +00:00
def restored_size(self, snapshot_id: str) -> int:
2023-02-22 18:48:08 +00:00
"""
Size of a snapshot
"""
command = self.restic_command(
"stats",
snapshot_id,
"--json",
)
2023-06-23 09:40:10 +00:00
with subprocess.Popen(
command,
stdout=subprocess.PIPE,
shell=False,
) as handle:
2023-02-22 18:48:08 +00:00
output = handle.communicate()[0].decode("utf-8")
try:
parsed_output = ResticBackupper.parse_json_output(output)
2023-02-22 18:48:08 +00:00
return parsed_output["total_size"]
except ValueError as e:
raise ValueError("cannot restore a snapshot: " + output) from e
def restore_from_backup(self, snapshot_id, folders: List[str], verify=True):
"""
Restore from backup with restic
"""
if folders is None or folders == []:
raise ValueError("cannot restore without knowing where to!")
with tempfile.TemporaryDirectory() as dir:
self.do_restore(snapshot_id, target=dir, verify=verify)
for folder in folders:
src = join(dir, folder.strip("/"))
if not exists(src):
raise ValueError(
f"there is no such path: {src}. We tried to find {folder}"
)
dst = folder
sync(src, dst)
def do_restore(self, snapshot_id, target="/", verify=False):
"""barebones restic restore"""
restore_command = self.restic_command(
2023-02-22 15:58:36 +00:00
"restore",
snapshot_id,
"--target",
target,
)
if verify:
restore_command.append("--verify")
with subprocess.Popen(
restore_command, stdout=subprocess.PIPE, shell=False
) as handle:
# for some reason restore does not support nice reporting of progress via json
output = handle.communicate()[0].decode("utf-8")
2023-02-22 15:58:36 +00:00
if "restoring" not in output:
raise ValueError("cannot restore a snapshot: " + output)
2023-02-08 16:28:05 +00:00
assert (
handle.returncode is not None
) # none should be impossible after communicate
if handle.returncode != 0:
raise ValueError(
"restore exited with errorcode", returncode, ":", output
)
2023-07-05 13:13:30 +00:00
def forget_snapshot(self, snapshot_id):
"""either removes snapshot or marks it for deletion later depending on server settings"""
forget_command = self.restic_command(
"forget",
snapshot_id,
)
with subprocess.Popen(
forget_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=False
) as handle:
# for some reason restore does not support nice reporting of progress via json
output, err = [string.decode("utf-8") for string in handle.communicate()]
if "no matching ID found" in err:
raise ValueError(
"trying to delete, but no such snapshot: ", snapshot_id
)
assert (
handle.returncode is not None
) # none should be impossible after communicate
if handle.returncode != 0:
raise ValueError(
"forget exited with errorcode", returncode, ":", output
)
2023-05-31 13:16:08 +00:00
def _load_snapshots(self) -> object:
2023-02-08 16:28:05 +00:00
"""
Load list of snapshots from repository
raises Value Error if repo does not exist
2023-02-08 16:28:05 +00:00
"""
listing_command = self.restic_command(
"snapshots",
"--json",
)
with subprocess.Popen(
listing_command,
shell=False,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
) as backup_listing_process_descriptor:
output = backup_listing_process_descriptor.communicate()[0].decode("utf-8")
if "Is there a repository at the following location?" in output:
raise ValueError("No repository! : " + output)
2023-02-08 16:28:05 +00:00
try:
return ResticBackupper.parse_json_output(output)
except ValueError as e:
raise ValueError("Cannot load snapshots: ") from e
2023-02-08 16:28:05 +00:00
2023-05-31 13:16:08 +00:00
def get_snapshots(self) -> List[Snapshot]:
2023-02-13 11:16:35 +00:00
"""Get all snapshots from the repo"""
2023-02-08 16:28:05 +00:00
snapshots = []
2023-05-31 13:16:08 +00:00
for restic_snapshot in self._load_snapshots():
snapshot = Snapshot(
id=restic_snapshot["short_id"],
created_at=restic_snapshot["time"],
service_name=restic_snapshot["tags"][0],
)
2023-02-08 16:28:05 +00:00
snapshots.append(snapshot)
return snapshots
2023-06-23 09:40:10 +00:00
@staticmethod
def parse_json_output(output: str) -> object:
starting_index = ResticBackupper.json_start(output)
2023-03-14 00:39:15 +00:00
if starting_index == -1:
raise ValueError("There is no json in the restic output : " + output)
truncated_output = output[starting_index:]
json_messages = truncated_output.splitlines()
if len(json_messages) == 1:
try:
return json.loads(truncated_output)
except JSONDecodeError as e:
raise ValueError(
"There is no json in the restic output : " + output
) from e
2023-03-14 00:39:15 +00:00
result_array = []
for message in json_messages:
result_array.append(json.loads(message))
return result_array
2023-03-14 00:39:15 +00:00
2023-06-23 09:40:10 +00:00
@staticmethod
def json_start(output: str) -> int:
2023-02-22 18:48:08 +00:00
indices = [
output.find("["),
output.find("{"),
]
indices = [x for x in indices if x != -1]
if indices == []:
2023-03-14 00:39:15 +00:00
return -1
return min(indices)
2023-02-22 18:48:08 +00:00
2023-06-23 09:40:10 +00:00
@staticmethod
def has_json(output: str) -> bool:
if ResticBackupper.json_start(output) == -1:
2023-03-14 00:39:15 +00:00
return False
return True