mirror of
https://git.selfprivacy.org/SelfPrivacy/selfprivacy-rest-api.git
synced 2025-01-23 01:06:43 +00:00
feature(backup): loading snapshots
This commit is contained in:
parent
a42294b706
commit
4ca2e62b5c
|
@ -1,4 +1,5 @@
|
|||
import subprocess
|
||||
import json
|
||||
|
||||
from selfprivacy_api.backup.backuper import AbstractBackuper
|
||||
|
||||
|
@ -79,3 +80,42 @@ class ResticBackuper(AbstractBackuper):
|
|||
)
|
||||
|
||||
subprocess.run(restore_command, shell=False)
|
||||
|
||||
def _load_snapshots(self, repo_name) -> object:
|
||||
"""
|
||||
Load list of snapshots from repository
|
||||
"""
|
||||
listing_command = self.restic_command(
|
||||
repo_name,
|
||||
"snapshots",
|
||||
"--json",
|
||||
)
|
||||
|
||||
with subprocess.Popen(
|
||||
listing_command,
|
||||
shell=False,
|
||||
stdout=subprocess.PIPE,
|
||||
stderr=subprocess.STDOUT,
|
||||
) as backup_listing_process_descriptor:
|
||||
output = backup_listing_process_descriptor.communicate()[0].decode("utf-8")
|
||||
|
||||
try:
|
||||
return self.parse_snapshot_output(output)
|
||||
except ValueError:
|
||||
if "Is there a repository at the following location?" in output:
|
||||
return []
|
||||
self.error_message = output
|
||||
return []
|
||||
|
||||
def get_snapshots(self):
|
||||
# No transformation for now
|
||||
snapshots = []
|
||||
for snapshot in self._load_snapshots():
|
||||
snapshots.append(snapshot)
|
||||
return snapshots
|
||||
|
||||
def parse_snapshot_output(self, output: str) -> object:
|
||||
starting_index = output.find("[")
|
||||
json.loads(output[starting_index:])
|
||||
self.snapshot_list = json.loads(output[starting_index:])
|
||||
print(output)
|
||||
|
|
Loading…
Reference in a new issue