diff --git a/selfprivacy_api/repositories/users/kanidm_user_repository.py b/selfprivacy_api/repositories/users/kanidm_user_repository.py index eae15d1..f66f5fb 100644 --- a/selfprivacy_api/repositories/users/kanidm_user_repository.py +++ b/selfprivacy_api/repositories/users/kanidm_user_repository.py @@ -1,15 +1,96 @@ from typing import Optional +import os +import subprocess import requests +from contextlib import contextmanager from selfprivacy_api.utils import get_domain +from selfprivacy_api.utils.redis_pool import RedisPool from selfprivacy_api.models.user import UserDataUser, UserDataUserOrigin from selfprivacy_api.repositories.users.abstract_user_repository import ( AbstractUserRepository, ) KANIDM_URL = "https://127.0.0.1:3013" -TEST_TOKEN = """eyJhbGciOiJFUzI1NiIsImtpZCI6IjVkNDUyNzdmZWUxY2UzZmNkMTViZDhkZjE3NTdlMjRkIn0.eyJhY2NvdW50X2lkIjoiYmZlN2MxNmEtNTY1NC00YzAxLWFkMjMtOWU2YjY4OTAxNDEwIiwidG9rZW5faWQiOiJmZTU5NzAxZS1iYzIyLTQwMzctYTEzNy1jZTIxYzBlNDhlZjciLCJsYWJlbCI6InRva2VuMiIsImV4cGlyeSI6bnVsbCwiaXNzdWVkX2F0IjoxNzMxMjgxMzM1LCJwdXJwb3NlIjoicmVhZHdyaXRlIn0.0fj0NAsUtBJWi1KVNKA4qi8EOHUUvaWNzeHbR82zbUVvWynnqm5ndLhFPG0v462qJXFTayonI9YJnkCaAE7a5w""" + +redis = RedisPool().get_connection() + + +@contextmanager +def temporary_env_var(key, value): + """ + A context manager for temporarily setting an environment variable + with automatic cleanup after exiting the block, even in case of an error. + """ + os.environ[key] = value + try: + yield + finally: + del os.environ[key] + + +class KanidmAdminToken: + @staticmethod + def get() -> str: + kanidm_admin_token = redis.get("kanidm:token") + + if kanidm_admin_token is None: + kanidm_admin_password = redis.get("kanidm:password") # type: ignore + + if kanidm_admin_password is None: + kanidm_admin_password = ( + KanidmAdminToken.reset_and_save_idm_admin_password() + ) + + kanidm_admin_token = KanidmAdminToken.create_and_save_token( + kanidm_admin_password=kanidm_admin_password + ) + + return kanidm_admin_token + + @staticmethod + def create_and_save_token(kanidm_admin_password: str) -> str: + with temporary_env_var(key="KANIDM_PASSWORD", value=kanidm_admin_password): + kanidm_admin_token = subprocess.check_output( + [ + "kanidm", + "service-account", + "api-token", + "generate", + "--rw", + "selfprivacy", + "token2", + ] + ) + + redis.set("kanidm:token", kanidm_admin_token) + return kanidm_admin_token + + @staticmethod + def reset_and_save_idm_admin_password() -> str: + new_kanidm_admin_password = subprocess.check_output( + [ + "kanidmd", + "recover-account", + "-c", + "/etc/kanidm/server.toml", + "idm_admin", + "-o", + "json", + "2>/dev/null", + "|", + "grep", + "'{\"password'", + "|", + "jq", + "-r", + ".password", + ] + ).decode("utf-8") + + redis.set("kanidm:password", new_kanidm_admin_password) + return new_kanidm_admin_password class KanidmQueryError(Exception): @@ -27,7 +108,7 @@ class KanidmUserRepository(AbstractUserRepository): full_endpoint, json=data, headers={ - "Authorization": f"Bearer {TEST_TOKEN}", + "Authorization": f"Bearer {KanidmPassword.get()}", "Content-Type": "application/json", }, timeout=0.8, # TODO: change timeout