mirror of
https://git.selfprivacy.org/SelfPrivacy/selfprivacy-rest-api.git
synced 2024-11-26 14:01:30 +00:00
test(backups): test async service start n stop simulation
This commit is contained in:
parent
d33e9d6335
commit
e2b906b219
|
@ -2,14 +2,19 @@ from time import sleep
|
||||||
from typing import Callable
|
from typing import Callable
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
|
|
||||||
def wait_until_true(readiness_checker: Callable[[],bool],*,interval: float =0.1, timeout_sec: Optional[float] = None):
|
|
||||||
|
def wait_until_true(
|
||||||
|
readiness_checker: Callable[[], bool],
|
||||||
|
*,
|
||||||
|
interval: float = 0.1,
|
||||||
|
timeout_sec: Optional[float] = None
|
||||||
|
):
|
||||||
elapsed = 0.0
|
elapsed = 0.0
|
||||||
if timeout_sec is None:
|
if timeout_sec is None:
|
||||||
timeout_sec = 10e16
|
timeout_sec = 10e16
|
||||||
while not readiness_checker or elapsed > timeout_sec:
|
|
||||||
|
while (not readiness_checker()) and elapsed < timeout_sec:
|
||||||
sleep(interval)
|
sleep(interval)
|
||||||
elapsed += interval
|
elapsed += interval
|
||||||
if elapsed > timeout_sec:
|
if elapsed > timeout_sec:
|
||||||
raise TimeoutError()
|
raise TimeoutError()
|
||||||
|
|
||||||
|
|
||||||
|
|
|
@ -52,7 +52,7 @@ def backups_backblaze(generic_userdata):
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture()
|
@pytest.fixture()
|
||||||
def raw_dummy_service(tmpdir, backups):
|
def raw_dummy_service(tmpdir):
|
||||||
dirnames = ["test_service", "also_test_service"]
|
dirnames = ["test_service", "also_test_service"]
|
||||||
service_dirs = []
|
service_dirs = []
|
||||||
for d in dirnames:
|
for d in dirnames:
|
||||||
|
@ -578,13 +578,13 @@ def test_services_to_back_up(backups, dummy_service):
|
||||||
def test_sync(dummy_service):
|
def test_sync(dummy_service):
|
||||||
src = dummy_service.get_folders()[0]
|
src = dummy_service.get_folders()[0]
|
||||||
dst = dummy_service.get_folders()[1]
|
dst = dummy_service.get_folders()[1]
|
||||||
old_files_src = listdir(src)
|
old_files_src = set(listdir(src))
|
||||||
old_files_dst = listdir(dst)
|
old_files_dst = set(listdir(dst))
|
||||||
assert old_files_src != old_files_dst
|
assert old_files_src != old_files_dst
|
||||||
|
|
||||||
sync(src, dst)
|
sync(src, dst)
|
||||||
new_files_src = listdir(src)
|
new_files_src = set(listdir(src))
|
||||||
new_files_dst = listdir(dst)
|
new_files_dst = set(listdir(dst))
|
||||||
assert new_files_src == old_files_src
|
assert new_files_src == old_files_src
|
||||||
assert new_files_dst == new_files_src
|
assert new_files_dst == new_files_src
|
||||||
|
|
||||||
|
|
|
@ -9,7 +9,10 @@ from selfprivacy_api.services.owned_path import OwnedPath
|
||||||
from selfprivacy_api.services.generic_service_mover import FolderMoveNames
|
from selfprivacy_api.services.generic_service_mover import FolderMoveNames
|
||||||
|
|
||||||
from selfprivacy_api.services.test_service import DummyService
|
from selfprivacy_api.services.test_service import DummyService
|
||||||
from selfprivacy_api.services.service import Service
|
from selfprivacy_api.services.service import Service, ServiceStatus
|
||||||
|
from selfprivacy_api.utils.waitloop import wait_until_true
|
||||||
|
|
||||||
|
from tests.test_graphql.test_backup import raw_dummy_service
|
||||||
|
|
||||||
|
|
||||||
def test_unimplemented_folders_raises():
|
def test_unimplemented_folders_raises():
|
||||||
|
@ -25,6 +28,20 @@ def test_unimplemented_folders_raises():
|
||||||
assert owned_folders is not None
|
assert owned_folders is not None
|
||||||
|
|
||||||
|
|
||||||
|
def test_delayed_start_stop(raw_dummy_service):
|
||||||
|
dummy = raw_dummy_service
|
||||||
|
|
||||||
|
dummy.stop(delay=0.3)
|
||||||
|
assert dummy.get_status() == ServiceStatus.DEACTIVATING
|
||||||
|
wait_until_true(lambda: dummy.get_status() == ServiceStatus.INACTIVE)
|
||||||
|
assert dummy.get_status() == ServiceStatus.INACTIVE
|
||||||
|
|
||||||
|
dummy.start(delay=0.3)
|
||||||
|
assert dummy.get_status() == ServiceStatus.ACTIVATING
|
||||||
|
wait_until_true(lambda: dummy.get_status() == ServiceStatus.ACTIVE)
|
||||||
|
assert dummy.get_status() == ServiceStatus.ACTIVE
|
||||||
|
|
||||||
|
|
||||||
def test_owned_folders_from_not_owned():
|
def test_owned_folders_from_not_owned():
|
||||||
assert Bitwarden.get_owned_folders() == [
|
assert Bitwarden.get_owned_folders() == [
|
||||||
OwnedPath(
|
OwnedPath(
|
||||||
|
|
Loading…
Reference in a new issue