2024-12-11 13:36:32 +00:00
|
|
|
from typing import List
|
2024-12-11 17:22:50 +00:00
|
|
|
from time import sleep
|
2024-12-11 15:55:52 +00:00
|
|
|
|
2024-12-11 13:36:32 +00:00
|
|
|
# from subprocess import check_output
|
|
|
|
from selfprivacy_api.root_daemon import SOCKET_PATH, socket_module
|
|
|
|
from tests.test_common import get_test_mode
|
|
|
|
|
|
|
|
|
|
|
|
def call_root_function(cmd: List[str]) -> str:
|
2024-12-11 17:22:50 +00:00
|
|
|
assert isinstance(cmd, List)
|
|
|
|
return _call_root_daemon(cmd)
|
2024-12-11 15:55:52 +00:00
|
|
|
|
|
|
|
|
|
|
|
def _call_root_daemon(cmd: List[str]) -> str:
|
2024-12-11 13:36:32 +00:00
|
|
|
return _write_to_daemon_socket(cmd)
|
|
|
|
|
|
|
|
|
|
|
|
def _write_to_daemon_socket(cmd: List[str]) -> str:
|
|
|
|
sock = socket_module.socket(socket_module.AF_UNIX, socket_module.SOCK_STREAM)
|
|
|
|
sock.connect(SOCKET_PATH)
|
2024-12-11 17:22:50 +00:00
|
|
|
payload = " ".join(cmd).encode("utf-8") + b"\n"
|
|
|
|
sock.send(payload)
|
|
|
|
pipe = sock.makefile("r")
|
2024-12-11 18:55:06 +00:00
|
|
|
answer = pipe.readline()
|
|
|
|
pipe.close()
|
|
|
|
sock.close()
|
|
|
|
return answer
|