mirror of
https://git.selfprivacy.org/SelfPrivacy/selfprivacy-rest-api.git
synced 2025-01-11 18:39:30 +00:00
test(backups): make the test repo overridable by envs
This commit is contained in:
parent
aa7cc71557
commit
ffec344ba8
|
@ -431,10 +431,18 @@ class Backups:
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def forget_snapshot(snapshot: Snapshot) -> None:
|
def forget_snapshot(snapshot: Snapshot) -> None:
|
||||||
"""Deletes a snapshot from the storage"""
|
"""Deletes a snapshot from the repo and from cache"""
|
||||||
Backups.provider().backupper.forget_snapshot(snapshot.id)
|
Backups.provider().backupper.forget_snapshot(snapshot.id)
|
||||||
Storage.delete_cached_snapshot(snapshot)
|
Storage.delete_cached_snapshot(snapshot)
|
||||||
|
|
||||||
|
@staticmethod
|
||||||
|
def forget_all_snapshots():
|
||||||
|
"""deliberately erase all snapshots we made"""
|
||||||
|
# there is no dedicated optimized command for this,
|
||||||
|
# but maybe we can have a multi-erase
|
||||||
|
for snapshot in Backups.get_all_snapshots():
|
||||||
|
Backups.forget_snapshot(snapshot)
|
||||||
|
|
||||||
@staticmethod
|
@staticmethod
|
||||||
def force_snapshot_cache_reload() -> None:
|
def force_snapshot_cache_reload() -> None:
|
||||||
"""
|
"""
|
||||||
|
|
|
@ -39,14 +39,34 @@ TESTFILE_2_BODY = "testissimo!"
|
||||||
REPO_NAME = "test_backup"
|
REPO_NAME = "test_backup"
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture(scope="function")
|
def prepare_localfile_backups(temp_dir):
|
||||||
def backups(tmpdir):
|
test_repo_path = path.join(temp_dir, "totallyunrelated")
|
||||||
Backups.reset()
|
assert not path.exists(test_repo_path)
|
||||||
|
|
||||||
test_repo_path = path.join(tmpdir, "totallyunrelated")
|
|
||||||
Backups.set_localfile_repo(test_repo_path)
|
Backups.set_localfile_repo(test_repo_path)
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(scope="function")
|
||||||
|
def backups_local(tmpdir):
|
||||||
|
Backups.reset()
|
||||||
|
prepare_localfile_backups(tmpdir)
|
||||||
Jobs.reset()
|
Jobs.reset()
|
||||||
|
Backups.init_repo()
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture(scope="function")
|
||||||
|
def backups(tmpdir):
|
||||||
|
# for those tests that are supposed to pass with any repo
|
||||||
|
Backups.reset()
|
||||||
|
if BACKUP_PROVIDER_ENVS["kind"] in os.environ.keys():
|
||||||
|
Backups.set_provider_from_envs()
|
||||||
|
else:
|
||||||
|
prepare_localfile_backups(tmpdir)
|
||||||
|
Jobs.reset()
|
||||||
|
# assert not repo_path
|
||||||
|
|
||||||
|
Backups.init_repo()
|
||||||
|
yield
|
||||||
|
Backups.forget_all_snapshots()
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture()
|
@pytest.fixture()
|
||||||
|
@ -82,11 +102,6 @@ def raw_dummy_service(tmpdir):
|
||||||
@pytest.fixture()
|
@pytest.fixture()
|
||||||
def dummy_service(tmpdir, backups, raw_dummy_service) -> Service:
|
def dummy_service(tmpdir, backups, raw_dummy_service) -> Service:
|
||||||
service = raw_dummy_service
|
service = raw_dummy_service
|
||||||
repo_path = path.join(tmpdir, "test_repo")
|
|
||||||
assert not path.exists(repo_path)
|
|
||||||
# assert not repo_path
|
|
||||||
|
|
||||||
Backups.init_repo()
|
|
||||||
|
|
||||||
# register our service
|
# register our service
|
||||||
services.services.append(service)
|
services.services.append(service)
|
||||||
|
@ -148,6 +163,12 @@ def test_reset_sets_to_none2(backups):
|
||||||
|
|
||||||
def test_setting_from_envs(tmpdir):
|
def test_setting_from_envs(tmpdir):
|
||||||
Backups.reset()
|
Backups.reset()
|
||||||
|
environment_stash = {}
|
||||||
|
if BACKUP_PROVIDER_ENVS["kind"] in os.environ.keys():
|
||||||
|
# we are running under special envs, stash them before rewriting them
|
||||||
|
for key in BACKUP_PROVIDER_ENVS.values():
|
||||||
|
environment_stash[key] = os.environ[key]
|
||||||
|
|
||||||
os.environ[BACKUP_PROVIDER_ENVS["kind"]] = "BACKBLAZE"
|
os.environ[BACKUP_PROVIDER_ENVS["kind"]] = "BACKBLAZE"
|
||||||
os.environ[BACKUP_PROVIDER_ENVS["login"]] = "ID"
|
os.environ[BACKUP_PROVIDER_ENVS["login"]] = "ID"
|
||||||
os.environ[BACKUP_PROVIDER_ENVS["key"]] = "KEY"
|
os.environ[BACKUP_PROVIDER_ENVS["key"]] = "KEY"
|
||||||
|
@ -164,6 +185,13 @@ def test_setting_from_envs(tmpdir):
|
||||||
assert provider.backupper.account == "ID"
|
assert provider.backupper.account == "ID"
|
||||||
assert provider.backupper.key == "KEY"
|
assert provider.backupper.key == "KEY"
|
||||||
|
|
||||||
|
if environment_stash != {}:
|
||||||
|
for key in BACKUP_PROVIDER_ENVS.values():
|
||||||
|
os.environ[key] = environment_stash[key]
|
||||||
|
else:
|
||||||
|
for key in BACKUP_PROVIDER_ENVS.values():
|
||||||
|
del os.environ[key]
|
||||||
|
|
||||||
|
|
||||||
def test_json_reset(generic_userdata):
|
def test_json_reset(generic_userdata):
|
||||||
Backups.reset(reset_json=False)
|
Backups.reset(reset_json=False)
|
||||||
|
@ -294,9 +322,12 @@ def test_sizing(backups, dummy_service):
|
||||||
assert size > 0
|
assert size > 0
|
||||||
|
|
||||||
|
|
||||||
def test_init_tracking(backups, raw_dummy_service):
|
def test_init_tracking(backups, tmpdir):
|
||||||
|
assert Backups.is_initted() is True
|
||||||
|
Backups.reset()
|
||||||
assert Backups.is_initted() is False
|
assert Backups.is_initted() is False
|
||||||
|
separate_dir = tmpdir / "out_of_the_way"
|
||||||
|
prepare_localfile_backups(separate_dir)
|
||||||
Backups.init_repo()
|
Backups.init_repo()
|
||||||
|
|
||||||
assert Backups.is_initted() is True
|
assert Backups.is_initted() is True
|
||||||
|
@ -618,6 +649,8 @@ def test_snapshots_cache_invalidation(backups, dummy_service):
|
||||||
|
|
||||||
# Storage
|
# Storage
|
||||||
def test_init_tracking_caching(backups, raw_dummy_service):
|
def test_init_tracking_caching(backups, raw_dummy_service):
|
||||||
|
assert Storage.has_init_mark() is True
|
||||||
|
Backups.reset()
|
||||||
assert Storage.has_init_mark() is False
|
assert Storage.has_init_mark() is False
|
||||||
|
|
||||||
Storage.mark_as_init()
|
Storage.mark_as_init()
|
||||||
|
@ -627,7 +660,12 @@ def test_init_tracking_caching(backups, raw_dummy_service):
|
||||||
|
|
||||||
|
|
||||||
# Storage
|
# Storage
|
||||||
def test_init_tracking_caching2(backups, raw_dummy_service):
|
def test_init_tracking_caching2(backups, tmpdir):
|
||||||
|
assert Storage.has_init_mark() is True
|
||||||
|
Backups.reset()
|
||||||
|
assert Storage.has_init_mark() is False
|
||||||
|
separate_dir = tmpdir / "out_of_the_way"
|
||||||
|
prepare_localfile_backups(separate_dir)
|
||||||
assert Storage.has_init_mark() is False
|
assert Storage.has_init_mark() is False
|
||||||
|
|
||||||
Backups.init_repo()
|
Backups.init_repo()
|
||||||
|
|
Loading…
Reference in a new issue