mirror of
https://git.selfprivacy.org/SelfPrivacy/selfprivacy-rest-api.git
synced 2025-01-11 18:39:30 +00:00
test(backups): more checks regarding tmpdirs and mounting
This commit is contained in:
parent
36e915907f
commit
69f6e62877
|
@ -126,7 +126,9 @@ class ResticBackupper(AbstractBackupper):
|
|||
output,
|
||||
)
|
||||
|
||||
def mount_repo(self, mount_directory):
|
||||
def mount_repo(self, mount_directory: str) -> subprocess.Popen:
|
||||
if not exists(mount_directory):
|
||||
raise FileNotFoundError("no such directory to mount at: ", mount_directory)
|
||||
mount_command = self.restic_command("mount", mount_directory)
|
||||
mount_command.insert(0, "nohup")
|
||||
handle = subprocess.Popen(
|
||||
|
@ -139,7 +141,7 @@ class ResticBackupper(AbstractBackupper):
|
|||
raise IOError("failed to mount dir ", mount_directory)
|
||||
return handle
|
||||
|
||||
def unmount_repo(self, mount_directory):
|
||||
def unmount_repo(self, mount_directory: str) -> None:
|
||||
mount_command = ["umount", "-l", mount_directory]
|
||||
with subprocess.Popen(
|
||||
mount_command, stdout=subprocess.PIPE, shell=False
|
||||
|
@ -147,10 +149,10 @@ class ResticBackupper(AbstractBackupper):
|
|||
output = handle.communicate()[0].decode("utf-8")
|
||||
# TODO: check for exit code?
|
||||
if "error" in output.lower():
|
||||
return IOError("failed to unmount dir ", mount_directory, ": ", output)
|
||||
raise IOError("failed to unmount dir ", mount_directory, ": ", output)
|
||||
|
||||
if not listdir(mount_directory) == []:
|
||||
return IOError("failed to unmount dir ", mount_directory)
|
||||
raise IOError("failed to unmount dir ", mount_directory)
|
||||
|
||||
@staticmethod
|
||||
def __flatten_list(list_to_flatten):
|
||||
|
@ -363,6 +365,9 @@ class ResticBackupper(AbstractBackupper):
|
|||
self._raw_verified_restore(snapshot_id, target=temp_dir)
|
||||
snapshot_root = temp_dir
|
||||
else: # attempting inplace restore via mount + sync
|
||||
assert exists(
|
||||
temp_dir
|
||||
) # paranoid check, TemporaryDirectory is supposedly created
|
||||
self.mount_repo(temp_dir)
|
||||
snapshot_root = join(temp_dir, "ids", snapshot_id)
|
||||
|
||||
|
|
|
@ -8,6 +8,8 @@ from os import urandom
|
|||
from datetime import datetime, timedelta, timezone
|
||||
from subprocess import Popen
|
||||
|
||||
import tempfile
|
||||
|
||||
import selfprivacy_api.services as services
|
||||
from selfprivacy_api.services import Service, get_all_services
|
||||
|
||||
|
@ -806,3 +808,10 @@ def test_operations_while_locked(backups, dummy_service):
|
|||
# check that no locks were left
|
||||
Backups.provider().backupper.lock()
|
||||
Backups.provider().backupper.unlock()
|
||||
|
||||
|
||||
# a paranoid check to weed out problems with tempdirs that are not dependent on us
|
||||
def test_tempfile():
|
||||
with tempfile.TemporaryDirectory() as temp:
|
||||
assert path.exists(temp)
|
||||
assert not path.exists(temp)
|
||||
|
|
Loading…
Reference in a new issue