Restic controller

This commit is contained in:
Inex Code 2021-12-06 09:48:29 +03:00
parent dc4c9a89e1
commit f68bd88a31
6 changed files with 403 additions and 94 deletions

View file

@ -7,6 +7,8 @@ portalocker
flask-swagger
flask-swagger-ui
pytz
huey
gevent
pytest
coverage

View file

@ -1,6 +1,9 @@
#!/usr/bin/env python3
"""SelfPrivacy server management API"""
import os
from gevent import monkey
from flask import Flask, request, jsonify
from flask_restful import Api
from flask_swagger import swagger
@ -11,6 +14,8 @@ from selfprivacy_api.resources.common import ApiVersion, DecryptDisk
from selfprivacy_api.resources.system import api_system
from selfprivacy_api.resources.services import services as api_services
from selfprivacy_api.restic_controller.tasks import huey, init_restic
swagger_blueprint = get_swaggerui_blueprint(
"/api/docs", "/api/swagger.json", config={"app_name": "SelfPrivacy API"}
)
@ -77,5 +82,8 @@ def create_app(test_config=None):
if __name__ == "__main__":
monkey.patch_all()
created_app = create_app()
huey.start()
init_restic()
created_app.run(port=5050, debug=False)

View file

@ -1,13 +1,11 @@
#!/usr/bin/env python3
"""Backups management module"""
import json
import os
import subprocess
from flask import current_app
from flask_restful import Resource, reqparse
from selfprivacy_api.resources.services import api
from selfprivacy_api.utils import WriteUserData
from selfprivacy_api.restic_controller import tasks as restic_tasks
from selfprivacy_api.restic_controller import ResticController, ResticStates
class ListAllBackups(Resource):
@ -29,40 +27,9 @@ class ListAllBackups(Resource):
401:
description: Unauthorized
"""
bucket = current_app.config["B2_BUCKET"]
backup_listing_command = [
"restic",
"-r",
f"rclone:backblaze:{bucket}/sfbackup",
"snapshots",
"--json",
]
init_command = [
"restic",
"-r",
f"rclone:backblaze:{bucket}/sfbackup",
"init",
]
with subprocess.Popen(
backup_listing_command,
shell=False,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
) as backup_listing_process_descriptor:
snapshots_list = backup_listing_process_descriptor.communicate()[0].decode(
"utf-8"
)
try:
json.loads(snapshots_list)
except ValueError:
if "Is there a repository at the following location?" in snapshots_list:
subprocess.call(init_command)
return {"error": "Initializating"}, 500
return {"error": snapshots_list}, 500
return json.loads(snapshots_list)
restic = ResticController()
return restic.snapshot_list
class AsyncCreateBackup(Resource):
@ -83,24 +50,17 @@ class AsyncCreateBackup(Resource):
description: Bad request
401:
description: Unauthorized
409:
description: Backup already in progress
"""
bucket = current_app.config["B2_BUCKET"]
backup_command = [
"restic",
"-r",
f"rclone:backblaze:{bucket}/sfbackup",
"--verbose",
"--json",
"backup",
"/var",
]
with open("/tmp/backup.log", "w", encoding="utf-8") as log_file:
subprocess.Popen(
backup_command, shell=False, stdout=log_file, stderr=subprocess.STDOUT
)
restic = ResticController()
if restic.state is ResticStates.NO_KEY:
return {"error": "No key provided"}, 400
if restic.state is ResticStates.INITIALIZING:
return {"error": "Backup is initializing"}, 400
if restic.state is ResticStates.BACKING_UP:
return {"error": "Backup is already running"}, 409
restic_tasks.start_backup()
return {
"status": 0,
"message": "Backup creation has started",
@ -126,27 +86,39 @@ class CheckBackupStatus(Resource):
401:
description: Unauthorized
"""
backup_status_check_command = ["tail", "-1", "/tmp/backup.log"]
restic = ResticController()
# If the log file does not exists
if os.path.exists("/tmp/backup.log") is False:
return {"message_type": "not_started", "message": "Backup not started"}
return {
"status": restic.state.name,
"progress": restic.progress,
"error_message": restic.error_message,
}
with subprocess.Popen(
backup_status_check_command,
shell=False,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
) as backup_status_check_process_descriptor:
backup_process_status = (
backup_status_check_process_descriptor.communicate()[0].decode("utf-8")
)
try:
json.loads(backup_process_status)
except ValueError:
return {"message_type": "error", "message": backup_process_status}
return json.loads(backup_process_status)
class ForceReloadSnapshots(Resource):
"""Force reload snapshots"""
def get(self):
"""
Force reload snapshots
---
tags:
- Backups
security:
- bearerAuth: []
responses:
200:
description: Snapshots reloaded
400:
description: Bad request
401:
description: Unauthorized
"""
restic_tasks.load_snapshots()
return {
"status": 0,
"message": "Snapshots reload started",
}
class AsyncRestoreBackup(Resource):
@ -183,29 +155,27 @@ class AsyncRestoreBackup(Resource):
parser = reqparse.RequestParser()
parser.add_argument("backupId", type=str, required=True)
args = parser.parse_args()
bucket = current_app.config["B2_BUCKET"]
backup_id = args["backupId"]
backup_restoration_command = [
"restic",
"-r",
f"rclone:backblaze:{bucket}/sfbackup",
"restore",
backup_id,
"--target",
"/var",
"--json",
]
restic = ResticController()
if restic.state is ResticStates.NO_KEY:
return {"error": "No key provided"}, 400
if restic.state is ResticStates.NOT_INITIALIZED:
return {"error": "Repository is not initialized"}, 400
if restic.state is ResticStates.BACKING_UP:
return {"error": "Backup is already running"}, 409
if restic.state is ResticStates.INITIALIZING:
return {"error": "Repository is initializing"}, 400
if restic.state is ResticStates.RESTORING:
return {"error": "Restore is already running"}, 409
for backup in restic.snapshot_list:
if backup["short_id"] == args["backupId"]:
restic_tasks.restore_from_backup(args["backupId"])
return {
"status": 0,
"message": "Backup restoration procedure started",
}
with open("/tmp/backup.log", "w", encoding="utf-8") as log_file:
subprocess.Popen(
backup_restoration_command,
shell=False,
stdout=log_file,
stderr=subprocess.STDOUT,
)
return {"status": 0, "message": "Backup restoration procedure started"}
return {"error": "Backup not found"}, 404
class BackblazeConfig(Resource):
@ -256,6 +226,8 @@ class BackblazeConfig(Resource):
data["backblaze"]["accountKey"] = args["accountKey"]
data["backblaze"]["bucket"] = args["bucket"]
restic_tasks.update_keys_from_userdata()
return "New Backblaze settings saved"
@ -264,3 +236,4 @@ api.add_resource(AsyncCreateBackup, "/restic/backup/create")
api.add_resource(CheckBackupStatus, "/restic/backup/status")
api.add_resource(AsyncRestoreBackup, "/restic/backup/restore")
api.add_resource(BackblazeConfig, "/restic/backblaze/config")
api.add_resource(ForceReloadSnapshots, "/restic/backup/reload")

View file

@ -0,0 +1,255 @@
"""Restic singleton controller."""
from datetime import datetime
import json
import subprocess
import os
from threading import Lock
from enum import Enum
import portalocker
from selfprivacy_api.utils import ReadUserData
class ResticStates(Enum):
"""Restic states enum."""
NO_KEY = 0
NOT_INITIALIZED = 1
INITIALIZED = 2
BACKING_UP = 3
RESTORING = 4
ERROR = 5
INITIALIZING = 6
class ResticController:
"""
States in wich the restic_controller may be
- no backblaze key
- backblaze key is provided, but repository is not initialized
- backblaze key is provided, repository is initialized
- fetching list of snapshots
- creating snapshot, current progress can be retrieved
- recovering from snapshot
Any ongoing operation acquires the lock
Current state can be fetched with get_state()
"""
_instance = None
_lock = Lock()
_initialized = False
def __new__(cls):
print("new is called!")
if not cls._instance:
cls._instance = super(ResticController, cls).__new__(cls)
return cls._instance
def __init__(self):
if self._initialized:
return
self.state = ResticStates.NO_KEY
self.lock = False
self.progress = 0
self._backblaze_account = None
self._backblaze_key = None
self._repository_name = None
self.snapshot_list = []
self.error_message = None
print("init is called!")
self.load_configuration()
self.write_rclone_config()
self.load_snapshots()
self._initialized = True
def load_configuration(self):
"""Load current configuration from user data to singleton."""
with ReadUserData() as user_data:
self._backblaze_account = user_data["backblaze"]["accountId"]
self._backblaze_key = user_data["backblaze"]["accountKey"]
self._repository_name = user_data["backblaze"]["bucket"]
if self._backblaze_account and self._backblaze_key and self._repository_name:
self.state = ResticStates.INITIALIZING
else:
self.state = ResticStates.NO_KEY
def write_rclone_config(self):
"""
Open /root/.config/rclone/rclone.conf with portalocker
and write configuration in the following format:
[backblaze]
type = b2
account = {self.backblaze_account}
key = {self.backblaze_key}
"""
with portalocker.Lock(
"/root/.config/rclone/rclone.conf", "w", timeout=None
) as rclone_config:
rclone_config.write(
f"[backblaze]\n"
f"type = b2\n"
f"account = {self._backblaze_account}\n"
f"key = {self._backblaze_key}\n"
)
def load_snapshots(self):
"""
Load list of snapshots from repository
"""
backup_listing_command = [
"restic",
"-r",
f"rclone:backblaze:{self._repository_name}/sfbackup",
"snapshots",
"--json",
]
if (
self.state == ResticStates.BACKING_UP
or self.state == ResticStates.RESTORING
):
return
with self._lock:
with subprocess.Popen(
backup_listing_command,
shell=False,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
) as backup_listing_process_descriptor:
snapshots_list = backup_listing_process_descriptor.communicate()[
0
].decode("utf-8")
try:
starting_index = snapshots_list.find("[")
json.loads(snapshots_list[starting_index:])
self.snapshot_list = json.loads(snapshots_list[starting_index:])
self.state = ResticStates.INITIALIZED
except ValueError:
if "Is there a repository at the following location?" in snapshots_list:
self.state = ResticStates.NOT_INITIALIZED
return
else:
self.state = ResticStates.ERROR
self.error_message = snapshots_list
return
def initialize_repository(self):
"""
Initialize repository with restic
"""
initialize_repository_command = [
"restic",
"-r",
f"rclone:backblaze:{self._repository_name}/sfbackup",
"init",
]
with self._lock:
with subprocess.Popen(
initialize_repository_command,
shell=False,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
) as initialize_repository_process_descriptor:
msg = initialize_repository_process_descriptor.communicate()[0].decode(
"utf-8"
)
if initialize_repository_process_descriptor.returncode == 0:
self.state = ResticStates.INITIALIZED
else:
self.state = ResticStates.ERROR
self.error_message = msg
self.state = ResticStates.INITIALIZED
def start_backup(self):
"""
Start backup with restic
"""
backup_command = [
"restic",
"-r",
f"rclone:backblaze:{self._repository_name}/sfbackup",
"--verbose",
"--json",
"backup",
"/var",
]
with self._lock:
with open("/tmp/backup.log", "w", encoding="utf-8") as log_file:
subprocess.Popen(
backup_command,
shell=False,
stdout=log_file,
stderr=subprocess.STDOUT,
)
self.state = ResticStates.BACKING_UP
self.progress = 0
def check_progress(self):
"""
Check progress of ongoing backup operation
"""
backup_status_check_command = ["tail", "-1", "/tmp/backup.log"]
if (
self.state == ResticStates.NO_KEY
or self.state == ResticStates.NOT_INITIALIZED
):
return
# If the log file does not exists
if os.path.exists("/tmp/backup.log") is False:
self.state = ResticStates.INITIALIZED
with subprocess.Popen(
backup_status_check_command,
shell=False,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
) as backup_status_check_process_descriptor:
backup_process_status = (
backup_status_check_process_descriptor.communicate()[0].decode("utf-8")
)
try:
status = json.loads(backup_process_status)
except ValueError:
print(backup_process_status)
self.error_message = backup_process_status
return
if status["message_type"] == "status":
self.progress = status["percent_done"]
self.state = ResticStates.BACKING_UP
elif status["message_type"] == "summary":
self.state = ResticStates.INITIALIZED
self.progress = 0
self.snapshot_list.append(
{
"short_id": status["snapshot_id"],
# Current time in format 2021-12-02T00:02:51.086452543+03:00
"time": datetime.now().strftime("%Y-%m-%dT%H:%M:%S.%f%z"),
}
)
def restore_from_backup(self, snapshot_id):
"""
Restore from backup with restic
"""
backup_restoration_command = [
"restic",
"-r",
f"rclone:backblaze:{self._repository_name}/sfbackup",
"restore",
snapshot_id,
"--target",
"/",
]
self.state = ResticStates.RESTORING
with self._lock:
subprocess.run(backup_restoration_command, shell=False)
self.state = ResticStates.INITIALIZED

View file

@ -0,0 +1,72 @@
"""Tasks for the restic controller."""
from huey import crontab
from huey.contrib.mini import MiniHuey
from . import ResticController, ResticStates
huey = MiniHuey()
@huey.task()
def init_restic():
controller = ResticController()
if controller.state == ResticStates.NOT_INITIALIZED:
initialize_repository()
@huey.task()
def update_keys_from_userdata():
controller = ResticController()
controller.load_configuration()
controller.write_rclone_config()
initialize_repository()
# Check every morning at 5:00 AM
@huey.task(crontab(hour=5, minute=0))
def cron_load_snapshots():
controller = ResticController()
controller.load_snapshots()
# Check every morning at 5:00 AM
@huey.task()
def load_snapshots():
controller = ResticController()
controller.load_snapshots()
if controller.state == ResticStates.NOT_INITIALIZED:
load_snapshots.schedule(delay=120)
@huey.task()
def initialize_repository():
controller = ResticController()
if controller.state is not ResticStates.NO_KEY:
controller.initialize_repository()
load_snapshots()
@huey.task()
def fetch_backup_status():
controller = ResticController()
if controller.state is ResticStates.BACKING_UP:
controller.check_progress()
if controller.state is ResticStates.BACKING_UP:
fetch_backup_status.schedule(delay=2)
else:
load_snapshots.schedule(delay=240)
@huey.task()
def start_backup():
controller = ResticController()
if controller.state is ResticStates.NOT_INITIALIZED:
resp = initialize_repository()
resp.get()
controller.start_backup()
fetch_backup_status.schedule(delay=3)
@huey.task()
def restore_from_backup(snapshot):
controller = ResticController()
controller.restore_from_backup(snapshot)

View file

@ -2,7 +2,6 @@
"""Various utility functions"""
import json
import portalocker
from flask import current_app
USERDATA_FILE = "/etc/nixos/userdata/userdata.json"