From c40e68000eb83139bca710114572decbabfcbd84 Mon Sep 17 00:00:00 2001 From: Houkime <> Date: Thu, 12 Dec 2024 12:03:54 +0000 Subject: [PATCH] test(root_daemon): test an allowed command --- selfprivacy_api/root_daemon.py | 27 ++++++++++++------------- selfprivacy_api/utils/root_interface.py | 2 +- tests/test_root_daemon.py | 13 ++++++++++++ 3 files changed, 27 insertions(+), 15 deletions(-) diff --git a/selfprivacy_api/root_daemon.py b/selfprivacy_api/root_daemon.py index 2b99ee8..829de94 100644 --- a/selfprivacy_api/root_daemon.py +++ b/selfprivacy_api/root_daemon.py @@ -16,7 +16,6 @@ services = [ # Check that it is the same as the names in systemd services. "bitwarden", "forgejo", - "gitea", "jitsimeet", "mailserver", "nextcloud", @@ -24,6 +23,7 @@ services = [ "pleroma", "prometheus", "roundcube", + "postgresql", ] service_folders: Dict[str, List[str]] = { @@ -64,7 +64,7 @@ def get_available_commands() -> List[str]: for command in service_commands: for service in services: # Concatenation of hardcoded strings - commands.append(command + " " + service) + commands.append(command + " " + service + ".service") chowns = [CHOWN_COMMAND + " " + folder for folder in service_folders] commands.extend(chowns) @@ -87,13 +87,20 @@ def init(socket_path=SOCKET_PATH) -> socket_module.socket: return sock -def _spawn_shell(command_string: str): +def _spawn_shell(command_string: str) -> str: # We use sh to refrain from parsing and simplify logic # Our commands are hardcoded so sh does not present # an extra attack surface here # TODO: continuous forwarding of command output - subprocess.check_output(["sh", "-c", command_string]) + return subprocess.check_output(["sh", "-c", command_string]).decode("utf-8") + + +# A copy of the one from utils module. +# It is possible to remove this duplication but unfortunately it is not +# trivial and extra complexity does not worth it at the moment. +def get_test_mode() -> Optional[str]: + return os.environ.get("TEST_MODE") def _process_request(request: str, allowed_commands: str) -> str: @@ -102,11 +109,10 @@ def _process_request(request: str, allowed_commands: str) -> str: # explicitly only calling a _hardcoded_ command # ever # test mode made like this does not mae it more dangerous too - raise ValueError("Oh no") if get_test_mode(): - _spawn_shell(f'echo "{command}"') + return _spawn_shell(f'echo "{command}"') else: - _spawn_shell(command) + return _spawn_shell(command) else: return "not permitted" @@ -139,10 +145,3 @@ def main(socket_path=SOCKET_PATH): if __name__ == "__main__": main() - - -# A copy of the one from utils module. -# It is possible to remove this duplication but unfortunately it is not -# trivial and extra complexity does not worth it at the moment. -def get_test_mode() -> Optional[str]: - return os.environ.get("TEST_MODE") diff --git a/selfprivacy_api/utils/root_interface.py b/selfprivacy_api/utils/root_interface.py index ee91502..8c61b43 100644 --- a/selfprivacy_api/utils/root_interface.py +++ b/selfprivacy_api/utils/root_interface.py @@ -24,4 +24,4 @@ def _write_to_daemon_socket(cmd: List[str]) -> str: answer = pipe.readline() pipe.close() sock.close() - return answer + return answer.strip() diff --git a/tests/test_root_daemon.py b/tests/test_root_daemon.py index 232f943..f171e25 100644 --- a/tests/test_root_daemon.py +++ b/tests/test_root_daemon.py @@ -71,3 +71,16 @@ def test_send_command(): assert answer == "not permitted" proc.kill() + + +def test_send_valid_command(): + proc = start_root_demon() + + command = ["systemctl", "start", "forgejo.service"] + answer = call_root_function(command) + assert answer == " ".join(command) + # confirm the loop still works + answer = call_root_function(["blabla"]) + assert answer == "not permitted" + + proc.kill()