refactor(backups): make a StoppedService context manager

This commit is contained in:
Houkime 2023-07-12 15:02:45 +00:00 committed by Inex Code
parent ea4e53f826
commit 86c2ae2c1f
2 changed files with 44 additions and 1 deletions

View file

@ -10,6 +10,7 @@ from selfprivacy_api.utils.block_devices import BlockDevice
from selfprivacy_api.services.generic_size_counter import get_storage_usage from selfprivacy_api.services.generic_size_counter import get_storage_usage
from selfprivacy_api.services.owned_path import OwnedPath from selfprivacy_api.services.owned_path import OwnedPath
from selfprivacy_api.utils.waitloop import wait_until_true
class ServiceStatus(Enum): class ServiceStatus(Enum):
@ -245,3 +246,32 @@ class Service(ABC):
def post_restore(self): def post_restore(self):
pass pass
class StoppedService:
"""
A context manager that stops the service if needed and reactivates it
after you are done if it was active
Example:
```
assert service.get_status() == ServiceStatus.ACTIVE
with StoppedService(service) [as stopped_service]:
assert service.get_status() == ServiceStatus.INACTIVE
```
"""
def __init__(self, service: Service):
self.service = service
self.original_status = service.get_status()
def __enter__(self) -> Service:
self.original_status = self.service.get_status()
if self.original_status != ServiceStatus.INACTIVE:
self.service.stop()
wait_until_true(lambda: self.service.get_status() == ServiceStatus.INACTIVE)
return self.service
def __exit__(self, type, value, traceback):
if self.original_status in [ServiceStatus.ACTIVATING, ServiceStatus.ACTIVE]:
self.service.start()
wait_until_true(lambda: self.service.get_status() == ServiceStatus.ACTIVE)

View file

@ -9,7 +9,7 @@ from selfprivacy_api.services.owned_path import OwnedPath
from selfprivacy_api.services.generic_service_mover import FolderMoveNames from selfprivacy_api.services.generic_service_mover import FolderMoveNames
from selfprivacy_api.services.test_service import DummyService from selfprivacy_api.services.test_service import DummyService
from selfprivacy_api.services.service import Service, ServiceStatus from selfprivacy_api.services.service import Service, ServiceStatus, StoppedService
from selfprivacy_api.utils.waitloop import wait_until_true from selfprivacy_api.utils.waitloop import wait_until_true
from tests.test_graphql.test_backup import raw_dummy_service from tests.test_graphql.test_backup import raw_dummy_service
@ -28,6 +28,19 @@ def test_unimplemented_folders_raises():
assert owned_folders is not None assert owned_folders is not None
def test_service_stopper(raw_dummy_service):
dummy: Service = raw_dummy_service
dummy.set_delay(0.3)
assert dummy.get_status() == ServiceStatus.ACTIVE
with StoppedService(dummy) as stopped_dummy:
assert stopped_dummy.get_status() == ServiceStatus.INACTIVE
assert dummy.get_status() == ServiceStatus.INACTIVE
assert dummy.get_status() == ServiceStatus.ACTIVE
def test_delayed_start_stop(raw_dummy_service): def test_delayed_start_stop(raw_dummy_service):
dummy = raw_dummy_service dummy = raw_dummy_service
dummy.set_delay(0.3) dummy.set_delay(0.3)