selfprivacy-rest-api/selfprivacy_api/backup/restic_backuper.py

82 lines
2.5 KiB
Python
Raw Normal View History

import subprocess
from selfprivacy_api.backup.backuper import AbstractBackuper
class ResticBackuper(AbstractBackuper):
    """Backuper that drives the ``restic`` CLI through its rclone backend.

    Credentials are not read from an rclone config file; they are passed to
    rclone as command-line flags embedded in restic's ``rclone.args`` option.
    """

    def __init__(self, login_flag: str, key_flag: str, type: str):
        """Remember how to address the storage backend.

        Args:
            login_flag: rclone flag that introduces the account value.
            key_flag: rclone flag that introduces the key value.
            type: backend name placed in front of the repository name in
                the repository string (see ``restic_repo``).
        """
        self.login_flag = login_flag
        self.key_flag = key_flag
        self.type = type
        # Credentials stay empty until set_creds() is called; empty values
        # are left out of the generated rclone arguments.
        self.account = ""
        self.key = ""

    def set_creds(self, account: str, key: str):
        """Store the account and key used for subsequent restic commands."""
        self.account = account
        self.key = key

    def restic_repo(self, repository_name: str) -> str:
        """Return the restic repository string for *repository_name*."""
        # https://restic.readthedocs.io/en/latest/030_preparing_a_new_repo.html#other-services-via-rclone
        # https://forum.rclone.org/t/can-rclone-be-run-solely-with-command-line-options-no-config-no-env-vars/6314/5
        return f"rclone::{self.type}:{repository_name}/sfbackup"

    def rclone_args(self) -> str:
        """Return the value for restic's ``-o`` option configuring rclone.

        The backend credential flags, when there are any, are appended after
        ``--stdio`` and separated from it by a single space.
        """
        base = "rclone.args=serve restic --stdio"
        # strip() keeps the result tidy even if an overriding implementation
        # of backend_rclone_args() returns padded whitespace.
        backend_args = self.backend_rclone_args().strip()
        if not backend_args:
            return base
        # The separator is required: without it the first credential flag
        # would be fused onto "--stdio" and form a single invalid token.
        return f"{base} {backend_args}"

    def backend_rclone_args(self) -> str:
        """Return the credential flags for rclone as one space-joined string.

        A credential that is an empty string contributes nothing, so the
        result is ``""`` when neither account nor key has been set.
        """
        parts = []
        if self.account != "":
            parts.append(f"{self.login_flag} {self.account}")
        if self.key != "":
            parts.append(f"{self.key_flag} {self.key}")
        return " ".join(parts)

    def restic_command(self, repo_name: str, *args) -> list:
        """Build the argv list for a restic invocation.

        Args:
            repo_name: repository name, expanded through ``restic_repo``.
            *args: restic subcommand and its arguments, appended verbatim.

        Returns:
            A list suitable for ``subprocess`` with ``shell=False``.
        """
        command = [
            "restic",
            "-o",
            self.rclone_args(),
            "-r",
            self.restic_repo(repo_name),
        ]
        # args is a tuple; extending with an empty tuple is a no-op, so no
        # emptiness check is needed.
        command.extend(args)
        return command

    def start_backup(self, folder: str, repo_name: str):
        """
        Start backup with restic

        The restic process is launched with ``Popen`` and is not waited for;
        its stderr is redirected into its stdout.
        """
        backup_command = self.restic_command(
            repo_name,
            "backup",
            folder,
        )
        subprocess.Popen(
            backup_command,
            shell=False,
            stderr=subprocess.STDOUT,
        )
        # TODO: we might want to provide logging facilities
        # that are reroutable for testing
        # with open("/var/backup.log", "w", encoding="utf-8") as log_file:
        #     subprocess.Popen(
        #         backup_command,
        #         shell=False,
        #         stdout=log_file,
        #         stderr=subprocess.STDOUT,
        #     )

    def restore_from_backup(self, repo_name, snapshot_id, folder):
        """
        Restore from backup with restic

        Runs ``restic restore <snapshot_id> --target <folder>`` and waits for
        it to finish. The exit status is not checked.
        """
        restore_command = self.restic_command(
            repo_name, "restore", snapshot_id, "--target", folder
        )
        subprocess.run(restore_command, shell=False)