test(backups): simulating async service start n stop

This commit is contained in:
Houkime 2023-07-10 17:03:10 +00:00
parent 6523105d89
commit 169e9ad57d

View file

@ -1,7 +1,12 @@
"""Class representing Bitwarden service"""
import base64
import typing
import subprocess
from typing import List
from os import path
# from enum import Enum
from selfprivacy_api.jobs import Job
from selfprivacy_api.services.service import Service, ServiceDnsRecord, ServiceStatus
@ -11,13 +16,24 @@ import selfprivacy_api.utils.network as network_utils
from selfprivacy_api.services.test_service.icon import BITWARDEN_ICON
DEFAULT_DELAY = 0
class DummyService(Service):
"""A test service"""
folders: List[str] = []
def __init_subclass__(cls, folders: List[str]):
cls.folders = folders
def __init__(self):
super().__init__()
dir = self.folders[0]
status_file = path.join(dir, "service_status")
with open(status_file, "w") as file:
file.write(ServiceStatus.ACTIVE.value)
@staticmethod
def get_id() -> str:
"""Return service id."""
@ -61,38 +77,61 @@ class DummyService(Service):
def is_enabled() -> bool:
return True
@staticmethod
def get_status() -> ServiceStatus:
"""
Return Bitwarden status from systemd.
Use command return code to determine status.
@classmethod
def status_file(cls) -> str:
dir = cls.folders[0]
return path.join(dir, "service_status")
Return code 0 means service is running.
Return code 1 or 2 means service is in error stat.
Return code 3 means service is stopped.
Return code 4 means service is off.
"""
return ServiceStatus.ACTIVE
@classmethod
def set_status(cls, status: ServiceStatus):
with open(cls.status_file(), "w") as file:
status_string = file.write(status.value)
@staticmethod
def enable():
@classmethod
def get_status(cls) -> ServiceStatus:
with open(cls.status_file(), "r") as file:
status_string = file.read().strip()
return ServiceStatus[status_string]
@classmethod
def change_status_with_async_delay(
cls, new_status: ServiceStatus, delay_sec: float
):
"""simulating a delay on systemd side"""
dir = cls.folders[0]
status_file = path.join(dir, "service_status")
command = [
"bash",
"-c",
f" sleep {delay_sec} && echo {new_status.value} > {status_file}",
]
handle = subprocess.Popen(command)
if delay_sec == 0:
handle.communicate()
@classmethod
def enable(cls):
pass
@staticmethod
def disable():
@classmethod
def disable(cls, delay):
pass
@staticmethod
def stop():
pass
@classmethod
def stop(cls, delay=DEFAULT_DELAY):
cls.set_status(ServiceStatus.DEACTIVATING)
cls.change_status_with_async_delay(ServiceStatus.INACTIVE, delay)
@staticmethod
def start():
pass
@classmethod
def start(cls, delay=DEFAULT_DELAY):
cls.set_status(ServiceStatus.ACTIVATING)
cls.change_status_with_async_delay(ServiceStatus.ACTIVE, delay)
@staticmethod
def restart():
pass
@classmethod
def restart(cls, delay=DEFAULT_DELAY):
cls.set_status(ServiceStatus.RELOADING) # is a correct one?
cls.change_status_with_async_delay(ServiceStatus.ACTIVE, delay)
@staticmethod
def get_configuration():
@ -112,7 +151,7 @@ class DummyService(Service):
return storage_usage
@staticmethod
def get_drive(cls) -> str:
def get_drive() -> str:
return "sda1"
@classmethod