test(root_daemon): test an allowed command

This commit is contained in:
Houkime 2024-12-12 12:03:54 +00:00
parent 743fb8a542
commit c40e68000e
3 changed files with 27 additions and 15 deletions

View file

@ -16,7 +16,6 @@ services = [
# Check that it is the same as the names in systemd services. # Check that it is the same as the names in systemd services.
"bitwarden", "bitwarden",
"forgejo", "forgejo",
"gitea",
"jitsimeet", "jitsimeet",
"mailserver", "mailserver",
"nextcloud", "nextcloud",
@ -24,6 +23,7 @@ services = [
"pleroma", "pleroma",
"prometheus", "prometheus",
"roundcube", "roundcube",
"postgresql",
] ]
service_folders: Dict[str, List[str]] = { service_folders: Dict[str, List[str]] = {
@ -64,7 +64,7 @@ def get_available_commands() -> List[str]:
for command in service_commands: for command in service_commands:
for service in services: for service in services:
# Concatenation of hardcoded strings # Concatenation of hardcoded strings
commands.append(command + " " + service) commands.append(command + " " + service + ".service")
chowns = [CHOWN_COMMAND + " " + folder for folder in service_folders] chowns = [CHOWN_COMMAND + " " + folder for folder in service_folders]
commands.extend(chowns) commands.extend(chowns)
@ -87,13 +87,20 @@ def init(socket_path=SOCKET_PATH) -> socket_module.socket:
return sock return sock
def _spawn_shell(command_string: str): def _spawn_shell(command_string: str) -> str:
# We use sh to refrain from parsing and simplify logic # We use sh to refrain from parsing and simplify logic
# Our commands are hardcoded so sh does not present # Our commands are hardcoded so sh does not present
# an extra attack surface here # an extra attack surface here
# TODO: continuous forwarding of command output # TODO: continuous forwarding of command output
subprocess.check_output(["sh", "-c", command_string]) return subprocess.check_output(["sh", "-c", command_string]).decode("utf-8")
# A copy of the one from utils module.
# It is possible to remove this duplication but unfortunately it is not
# trivial and extra complexity does not worth it at the moment.
def get_test_mode() -> Optional[str]:
return os.environ.get("TEST_MODE")
def _process_request(request: str, allowed_commands: str) -> str: def _process_request(request: str, allowed_commands: str) -> str:
@ -102,11 +109,10 @@ def _process_request(request: str, allowed_commands: str) -> str:
# explicitly only calling a _hardcoded_ command # explicitly only calling a _hardcoded_ command
# ever # ever
# test mode made like this does not mae it more dangerous too # test mode made like this does not mae it more dangerous too
raise ValueError("Oh no")
if get_test_mode(): if get_test_mode():
_spawn_shell(f'echo "{command}"') return _spawn_shell(f'echo "{command}"')
else: else:
_spawn_shell(command) return _spawn_shell(command)
else: else:
return "not permitted" return "not permitted"
@ -139,10 +145,3 @@ def main(socket_path=SOCKET_PATH):
if __name__ == "__main__": if __name__ == "__main__":
main() main()
# A copy of the one from utils module.
# It is possible to remove this duplication but unfortunately it is not
# trivial and extra complexity does not worth it at the moment.
def get_test_mode() -> Optional[str]:
return os.environ.get("TEST_MODE")

View file

@ -24,4 +24,4 @@ def _write_to_daemon_socket(cmd: List[str]) -> str:
answer = pipe.readline() answer = pipe.readline()
pipe.close() pipe.close()
sock.close() sock.close()
return answer return answer.strip()

View file

@ -71,3 +71,16 @@ def test_send_command():
assert answer == "not permitted" assert answer == "not permitted"
proc.kill() proc.kill()
def test_send_valid_command():
proc = start_root_demon()
command = ["systemctl", "start", "forgejo.service"]
answer = call_root_function(command)
assert answer == " ".join(command)
# confirm the loop still works
answer = call_root_function(["blabla"])
assert answer == "not permitted"
proc.kill()