mirror of
https://git.selfprivacy.org/SelfPrivacy/selfprivacy-rest-api.git
synced 2025-01-25 18:26:34 +00:00
feature(backups): forgetting snapshots
This commit is contained in:
parent
f361f44ded
commit
02e3c9bd5e
|
@ -305,6 +305,11 @@ class Backups:
|
||||||
|
|
||||||
return snap
|
return snap
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def forget_snapshot(snapshot: Snapshot):
|
||||||
|
Backups.provider().backupper.forget_snapshot(snapshot.id)
|
||||||
|
Storage.delete_cached_snapshot(snapshot)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def force_snapshot_cache_reload():
|
def force_snapshot_cache_reload():
|
||||||
upstream_snapshots = Backups.provider().backupper.get_snapshots()
|
upstream_snapshots = Backups.provider().backupper.get_snapshots()
|
||||||
|
|
|
@ -37,3 +37,7 @@ class AbstractBackupper(ABC):
|
||||||
@abstractmethod
|
@abstractmethod
|
||||||
def restored_size(self, snapshot_id: str) -> int:
|
def restored_size(self, snapshot_id: str) -> int:
|
||||||
raise NotImplementedError
|
raise NotImplementedError
|
||||||
|
|
||||||
|
@abstractmethod
|
||||||
|
def forget_snapshot(self, snapshot_id):
|
||||||
|
raise NotImplementedError
|
||||||
|
|
|
@ -27,3 +27,6 @@ class NoneBackupper(AbstractBackupper):
|
||||||
|
|
||||||
def restored_size(self, snapshot_id: str) -> int:
|
def restored_size(self, snapshot_id: str) -> int:
|
||||||
raise NotImplementedError
|
raise NotImplementedError
|
||||||
|
|
||||||
|
def forget_snapshot(self, snapshot_id):
|
||||||
|
raise NotImplementedError
|
||||||
|
|
|
@ -257,6 +257,32 @@ class ResticBackupper(AbstractBackupper):
|
||||||
"restore exited with errorcode", returncode, ":", output
|
"restore exited with errorcode", returncode, ":", output
|
||||||
)
|
)
|
||||||
|
|
||||||
|
def forget_snapshot(self, snapshot_id):
|
||||||
|
"""either removes snapshot or marks it for deletion later depending on server settings"""
|
||||||
|
forget_command = self.restic_command(
|
||||||
|
"forget",
|
||||||
|
snapshot_id,
|
||||||
|
)
|
||||||
|
|
||||||
|
with subprocess.Popen(
|
||||||
|
forget_command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=False
|
||||||
|
) as handle:
|
||||||
|
# for some reason restore does not support nice reporting of progress via json
|
||||||
|
output, err = [string.decode("utf-8") for string in handle.communicate()]
|
||||||
|
|
||||||
|
if "no matching ID found" in err:
|
||||||
|
raise ValueError(
|
||||||
|
"trying to delete, but no such snapshot: ", snapshot_id
|
||||||
|
)
|
||||||
|
|
||||||
|
assert (
|
||||||
|
handle.returncode is not None
|
||||||
|
) # none should be impossible after communicate
|
||||||
|
if handle.returncode != 0:
|
||||||
|
raise ValueError(
|
||||||
|
"forget exited with errorcode", returncode, ":", output
|
||||||
|
)
|
||||||
|
|
||||||
def _load_snapshots(self) -> object:
|
def _load_snapshots(self) -> object:
|
||||||
"""
|
"""
|
||||||
Load list of snapshots from repository
|
Load list of snapshots from repository
|
||||||
|
|
|
@ -15,6 +15,8 @@ from selfprivacy_api.services.test_service import DummyService
|
||||||
from selfprivacy_api.graphql.queries.providers import BackupProvider
|
from selfprivacy_api.graphql.queries.providers import BackupProvider
|
||||||
from selfprivacy_api.jobs import Jobs, JobStatus
|
from selfprivacy_api.jobs import Jobs, JobStatus
|
||||||
|
|
||||||
|
from selfprivacy_api.models.backup.snapshot import Snapshot
|
||||||
|
|
||||||
from selfprivacy_api.backup import Backups
|
from selfprivacy_api.backup import Backups
|
||||||
import selfprivacy_api.backup.providers as providers
|
import selfprivacy_api.backup.providers as providers
|
||||||
from selfprivacy_api.backup.providers import AbstractBackupProvider
|
from selfprivacy_api.backup.providers import AbstractBackupProvider
|
||||||
|
@ -314,6 +316,30 @@ def test_backup_service_task(backups, dummy_service):
|
||||||
assert_job_had_progress(job_type_id)
|
assert_job_had_progress(job_type_id)
|
||||||
|
|
||||||
|
|
||||||
|
def test_forget_snapshot(backups, dummy_service):
|
||||||
|
snap1 = Backups.back_up(dummy_service)
|
||||||
|
snap2 = Backups.back_up(dummy_service)
|
||||||
|
assert len(Backups.get_snapshots(dummy_service)) == 2
|
||||||
|
|
||||||
|
Backups.forget_snapshot(snap2)
|
||||||
|
assert len(Backups.get_snapshots(dummy_service)) == 1
|
||||||
|
Backups.force_snapshot_cache_reload()
|
||||||
|
assert len(Backups.get_snapshots(dummy_service)) == 1
|
||||||
|
|
||||||
|
assert Backups.get_snapshots(dummy_service)[0].id == snap1.id
|
||||||
|
|
||||||
|
Backups.forget_snapshot(snap1)
|
||||||
|
assert len(Backups.get_snapshots(dummy_service)) == 0
|
||||||
|
|
||||||
|
|
||||||
|
def test_forget_nonexistent_snapshot(backups, dummy_service):
|
||||||
|
bogus = Snapshot(
|
||||||
|
id="gibberjibber", service_name="nohoho", created_at=datetime.now(timezone.utc)
|
||||||
|
)
|
||||||
|
with pytest.raises(ValueError):
|
||||||
|
Backups.forget_snapshot(bogus)
|
||||||
|
|
||||||
|
|
||||||
def test_backup_larger_file(backups, dummy_service):
|
def test_backup_larger_file(backups, dummy_service):
|
||||||
dir = path.join(dummy_service.get_folders()[0], "LARGEFILE")
|
dir = path.join(dummy_service.get_folders()[0], "LARGEFILE")
|
||||||
mega = 2**20
|
mega = 2**20
|
||||||
|
|
Loading…
Reference in a new issue