feature(backups): return snapshot info from backup function

This commit is contained in:
Houkime 2023-04-03 16:29:06 +00:00
parent 772b499b46
commit 2eb64db199

View file

@ -1,5 +1,6 @@
import subprocess import subprocess
import json import json
import datetime
from typing import List from typing import List
@ -63,6 +64,7 @@ class ResticBackuper(AbstractBackuper):
backup_command = self.restic_command( backup_command = self.restic_command(
repo_name, repo_name,
"backup", "backup",
"--json",
folder, folder,
) )
with subprocess.Popen( with subprocess.Popen(
@ -72,8 +74,27 @@ class ResticBackuper(AbstractBackuper):
stderr=subprocess.STDOUT, stderr=subprocess.STDOUT,
) as handle: ) as handle:
output = handle.communicate()[0].decode("utf-8") output = handle.communicate()[0].decode("utf-8")
if "saved" not in output: try:
raise ValueError("could not create a new snapshot: " + output) messages = self.parse_json_output(output)
return ResticBackuper._snapshot_from_backup_messages(
messages, repo_name
)
except ValueError as e:
raise ValueError("could not create a snapshot: ") from e
@staticmethod
def _snapshot_from_backup_messages(messages, repo_name) -> Snapshot:
for message in messages:
if message["message_type"] == "summary":
return ResticBackuper._snapshot_from_fresh_summary(message, repo_name)
@staticmethod
def _snapshot_from_fresh_summary(message: object, repo_name) -> Snapshot:
return Snapshot(
id=message["snapshot_id"],
created_at=datetime.datetime.now(datetime.timezone.utc),
service_name=repo_name,
)
def init(self, repo_name): def init(self, repo_name):
init_command = self.restic_command( init_command = self.restic_command(
@ -189,11 +210,17 @@ class ResticBackuper(AbstractBackuper):
starting_index = self.json_start(output) starting_index = self.json_start(output)
if starting_index == -1: if starting_index == -1:
raise ValueError( raise ValueError("There is no json in the restic output : " + output)
"There is no json in the restic snapshot output : " + output
)
return json.loads(output[starting_index:]) truncated_output = output[starting_index:]
json_messages = truncated_output.splitlines()
if len(json_messages) == 1:
return json.loads(truncated_output)
result_array = []
for message in json_messages:
result_array.append(json.loads(message))
return result_array
def json_start(self, output: str) -> int: def json_start(self, output: str) -> int:
indices = [ indices = [