feature(backups): forgetting snapshots

This commit is contained in:
Houkime 2023-07-05 13:13:30 +00:00
parent 03313b739a
commit 53bb5cc4e2
5 changed files with 64 additions and 0 deletions

View file

@ -305,6 +305,11 @@ class Backups:
return snap
@staticmethod
def forget_snapshot(snapshot: Snapshot):
    """Delete a snapshot from the backup backend and drop it from the local cache."""
    backupper = Backups.provider().backupper
    backupper.forget_snapshot(snapshot.id)
    Storage.delete_cached_snapshot(snapshot)
@staticmethod
def force_snapshot_cache_reload():
    upstream_snapshots = Backups.provider().backupper.get_snapshots()

View file

@ -37,3 +37,7 @@ class AbstractBackupper(ABC):
@abstractmethod
def restored_size(self, snapshot_id: str) -> int:
    raise NotImplementedError
@abstractmethod
def forget_snapshot(self, snapshot_id: str) -> None:
    """Remove the snapshot with the given id from the repository.

    Concrete backuppers must implement this; the base class only
    declares the contract.

    Raises:
        NotImplementedError: always, in this abstract base.
    """
    raise NotImplementedError

View file

@ -27,3 +27,6 @@ class NoneBackupper(AbstractBackupper):
def restored_size(self, snapshot_id: str) -> int:
    raise NotImplementedError
def forget_snapshot(self, snapshot_id: str) -> None:
    """Null-object implementation: no backend is configured, so there is
    nothing to forget.

    Raises:
        NotImplementedError: always, matching the other NoneBackupper methods.
    """
    raise NotImplementedError

View file

@ -257,6 +257,32 @@ class ResticBackupper(AbstractBackupper):
"restore exited with errorcode", returncode, ":", output
)
def forget_snapshot(self, snapshot_id):
    """Remove a snapshot from the restic repository.

    Depending on server settings, restic either deletes the snapshot
    immediately or only marks it for later deletion.

    Args:
        snapshot_id: id of the snapshot to forget.

    Raises:
        ValueError: if no snapshot with the given id exists, or if
            `restic forget` exits with a nonzero returncode.
    """
    forget_command = self.restic_command(
        "forget",
        snapshot_id,
    )

    with subprocess.Popen(
        forget_command,
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
        shell=False,
    ) as handle:
        # `restic forget` does not support nice progress reporting via
        # json, so we just capture and decode the raw output.
        output, err = [string.decode("utf-8") for string in handle.communicate()]

        if "no matching ID found" in err:
            raise ValueError(
                "trying to delete, but no such snapshot: ", snapshot_id
            )
        assert (
            handle.returncode is not None
        )  # none should be impossible after communicate
        if handle.returncode != 0:
            # FIX: was a bare `returncode` (undefined name -> NameError on
            # any failing exit); report the actual handle.returncode.
            raise ValueError(
                "forget exited with errorcode", handle.returncode, ":", output
            )
def _load_snapshots(self) -> object:
    """
    Load list of snapshots from repository

View file

@ -15,6 +15,8 @@ from selfprivacy_api.services.test_service import DummyService
from selfprivacy_api.graphql.queries.providers import BackupProvider
from selfprivacy_api.jobs import Jobs, JobStatus
from selfprivacy_api.models.backup.snapshot import Snapshot
from selfprivacy_api.backup import Backups
import selfprivacy_api.backup.providers as providers
from selfprivacy_api.backup.providers import AbstractBackupProvider
@ -314,6 +316,30 @@ def test_backup_service_task(backups, dummy_service):
assert_job_had_progress(job_type_id)
def test_forget_snapshot(backups, dummy_service):
    """Forgetting a snapshot removes it from the cache AND from the backend."""
    first = Backups.back_up(dummy_service)
    second = Backups.back_up(dummy_service)
    assert len(Backups.get_snapshots(dummy_service)) == 2

    Backups.forget_snapshot(second)
    assert len(Backups.get_snapshots(dummy_service)) == 1

    # Reload from the backend to prove the deletion was not cache-only.
    Backups.force_snapshot_cache_reload()
    remaining = Backups.get_snapshots(dummy_service)
    assert len(remaining) == 1
    assert remaining[0].id == first.id

    Backups.forget_snapshot(first)
    assert len(Backups.get_snapshots(dummy_service)) == 0
def test_forget_nonexistent_snapshot(backups, dummy_service):
    """Forgetting a snapshot the backend has never seen raises ValueError."""
    phantom = Snapshot(
        id="gibberjibber",
        service_name="nohoho",
        created_at=datetime.now(timezone.utc),
    )
    with pytest.raises(ValueError):
        Backups.forget_snapshot(phantom)
def test_backup_larger_file(backups, dummy_service):
    dir = path.join(dummy_service.get_folders()[0], "LARGEFILE")
    mega = 2**20