mirror of
https://git.selfprivacy.org/SelfPrivacy/selfprivacy-rest-api.git
synced 2024-11-24 13:01:28 +00:00
feature(backups): return snapshot info from backup function
This commit is contained in:
parent
daa40d1142
commit
a82a986997
|
@ -1,5 +1,6 @@
|
|||
import subprocess
|
||||
import json
|
||||
import datetime
|
||||
|
||||
from typing import List
|
||||
|
||||
|
@ -63,6 +64,7 @@ class ResticBackuper(AbstractBackuper):
|
|||
backup_command = self.restic_command(
|
||||
repo_name,
|
||||
"backup",
|
||||
"--json",
|
||||
folder,
|
||||
)
|
||||
with subprocess.Popen(
|
||||
|
@ -72,8 +74,27 @@ class ResticBackuper(AbstractBackuper):
|
|||
stderr=subprocess.STDOUT,
|
||||
) as handle:
|
||||
output = handle.communicate()[0].decode("utf-8")
|
||||
if "saved" not in output:
|
||||
raise ValueError("could not create a new snapshot: " + output)
|
||||
try:
|
||||
messages = self.parse_json_output(output)
|
||||
return ResticBackuper._snapshot_from_backup_messages(
|
||||
messages, repo_name
|
||||
)
|
||||
except ValueError as e:
|
||||
raise ValueError("could not create a snapshot: ") from e
|
||||
|
||||
@staticmethod
|
||||
def _snapshot_from_backup_messages(messages, repo_name) -> Snapshot:
|
||||
for message in messages:
|
||||
if message["message_type"] == "summary":
|
||||
return ResticBackuper._snapshot_from_fresh_summary(message, repo_name)
|
||||
|
||||
@staticmethod
|
||||
def _snapshot_from_fresh_summary(message: object, repo_name) -> Snapshot:
|
||||
return Snapshot(
|
||||
id=message["snapshot_id"],
|
||||
created_at=datetime.datetime.now(datetime.timezone.utc),
|
||||
service_name=repo_name,
|
||||
)
|
||||
|
||||
def init(self, repo_name):
|
||||
init_command = self.restic_command(
|
||||
|
@ -189,11 +210,17 @@ class ResticBackuper(AbstractBackuper):
|
|||
starting_index = self.json_start(output)
|
||||
|
||||
if starting_index == -1:
|
||||
raise ValueError(
|
||||
"There is no json in the restic snapshot output : " + output
|
||||
)
|
||||
raise ValueError("There is no json in the restic output : " + output)
|
||||
|
||||
return json.loads(output[starting_index:])
|
||||
truncated_output = output[starting_index:]
|
||||
json_messages = truncated_output.splitlines()
|
||||
if len(json_messages) == 1:
|
||||
return json.loads(truncated_output)
|
||||
|
||||
result_array = []
|
||||
for message in json_messages:
|
||||
result_array.append(json.loads(message))
|
||||
return result_array
|
||||
|
||||
def json_start(self, output: str) -> int:
|
||||
indices = [
|
||||
|
|
Loading…
Reference in a new issue