diff --git a/selfprivacy_api/actions/ssh.py b/selfprivacy_api/actions/ssh.py index 47ba2e2..9cb0ef0 100644 --- a/selfprivacy_api/actions/ssh.py +++ b/selfprivacy_api/actions/ssh.py @@ -133,3 +133,32 @@ def remove_ssh_key(username: str, ssh_key: str): raise KeyNotFound() raise UserNotFound() + + +# def get_ssh_keys(username: str) -> list: +# with ReadUserData() as data: +# ensure_ssh_and_users_fields_exist(data) + +# if username == "root": +# if ssh_key in data["ssh"]["rootKeys"]: +# data["ssh"]["rootKeys"].remove(ssh_key) +# return + +# raise KeyNotFound() + +# if username == data["username"]: +# if ssh_key in data["sshKeys"]: +# data["sshKeys"].remove(ssh_key) +# return + +# raise KeyNotFound() + +# for user in data["users"]: +# if user["username"] == username: +# if "sshKeys" not in user: +# user["sshKeys"] = [] +# if ssh_key in user["sshKeys"]: +# user["sshKeys"].remove(ssh_key) +# return + +# raise UserNotFound() diff --git a/selfprivacy_api/actions/users.py b/selfprivacy_api/actions/users.py index 404e239..cc57a45 100644 --- a/selfprivacy_api/actions/users.py +++ b/selfprivacy_api/actions/users.py @@ -7,6 +7,7 @@ from selfprivacy_api.models.user import UserDataUser from selfprivacy_api.utils import is_username_forbidden +from selfprivacy_api.repositories.users.json_user_repository import JsonUserRepository from selfprivacy_api.repositories.users import ACTIVE_USERS_PROVIDER from selfprivacy_api.repositories.users.exceptions import ( UsernameForbidden, @@ -19,10 +20,14 @@ def get_users( exclude_primary: bool = False, exclude_root: bool = False, ) -> list[UserDataUser]: - return ACTIVE_USERS_PROVIDER.get_users( + users = ACTIVE_USERS_PROVIDER.get_users( exclude_primary=exclude_primary, exclude_root=exclude_root ) + # for user in users: + # TODO: take ssh keys if ACTIVE_USERS_PROVIDER is KanidmUserRepository + return users + def create_user( username: str, @@ -44,6 +49,9 @@ def create_user( if len(username) >= 32: raise UsernameTooLong("Username must be less than 32 characters") + if ACTIVE_USERS_PROVIDER != JsonUserRepository: # for ssh management + JsonUserRepository.create_user(username=username, password="legacy") + return ACTIVE_USERS_PROVIDER.create_user( username=username, password=password, @@ -55,6 +63,9 @@ def create_user( def delete_user(username: str) -> None: + if ACTIVE_USERS_PROVIDER != JsonUserRepository: # for ssh management + JsonUserRepository.delete_user(username=username) + return ACTIVE_USERS_PROVIDER.delete_user(username=username) @@ -78,4 +89,7 @@ def update_user( def get_user_by_username(username: str) -> Optional[UserDataUser]: - return ACTIVE_USERS_PROVIDER.get_user_by_username(username=username) + user = ACTIVE_USERS_PROVIDER.get_user_by_username(username=username) + + # TODO: take ssh keys if ACTIVE_USERS_PROVIDER is KanidmUserRepository + return user diff --git a/selfprivacy_api/repositories/users/abstract_user_repository.py b/selfprivacy_api/repositories/users/abstract_user_repository.py index 7c458b2..e6885e4 100644 --- a/selfprivacy_api/repositories/users/abstract_user_repository.py +++ b/selfprivacy_api/repositories/users/abstract_user_repository.py @@ -5,18 +5,31 @@ from selfprivacy_api.models.user import UserDataUser class AbstractUserRepository(ABC): + @staticmethod + @abstractmethod + def create_user( + username: str, + password: Optional[str] = None, + displayname: Optional[str] = None, + email: Optional[str] = None, + directmemberof: Optional[list[str]] = None, + memberof: Optional[list[str]] = None, + ) -> None: + """ + Creates a new user. In KanidmUserRepository "password" is a legacy field, + please use generate_password_reset_link() instead. + """ + @staticmethod @abstractmethod def get_users( exclude_primary: bool = False, exclude_root: bool = False, ) -> list[UserDataUser]: - """Retrieves a list of users with options to exclude specific user groups""" - - @staticmethod - @abstractmethod - def create_user(username: str, password: str) -> None: - """Creates a new user""" + """ + Gets a list of users with options to exclude specific user groups. + In KanidmUserRepository, the root user will never return. + """ @staticmethod @abstractmethod @@ -25,10 +38,30 @@ class AbstractUserRepository(ABC): @staticmethod @abstractmethod - def update_user(username: str, password: str) -> None: - """Updates the password of an existing user""" + def update_user( + username: str, + password: Optional[str] = None, + displayname: Optional[str] = None, + email: Optional[str] = None, + directmemberof: Optional[list[str]] = None, + memberof: Optional[list[str]] = None, + ) -> None: + """ + Update user information. + In the JsonUserRepository, only update the password of an existing user. + Do not update the password in KanidmUserRepository, + use generate_password_reset_link() instead. + """ @staticmethod @abstractmethod def get_user_by_username(username: str) -> Optional[UserDataUser]: """Retrieves user data (UserDataUser) by username""" + + @staticmethod + @abstractmethod + def generate_password_reset_link(username: str) -> str: + """ + Do not reset the password, just generate a link to reset the password. + Not implemented in JsonUserRepository. + """ diff --git a/selfprivacy_api/repositories/users/json_user_repository.py b/selfprivacy_api/repositories/users/json_user_repository.py index 1c999de..e20e2a1 100644 --- a/selfprivacy_api/repositories/users/json_user_repository.py +++ b/selfprivacy_api/repositories/users/json_user_repository.py @@ -30,10 +30,10 @@ class JsonUserRepository(AbstractUserRepository): @staticmethod def get_users( - exclude_primary: bool = False, - exclude_root: bool = False, + exclude_primary: bool = False, # TODO + exclude_root: bool = False, # TODO ) -> list[UserDataUser]: - """Get the list of users""" + """Retrieves a list of users with options to exclude specific user groups""" users = [] with ReadUserData() as user_data: ensure_ssh_and_users_fields_exist(user_data) @@ -65,6 +65,8 @@ class JsonUserRepository(AbstractUserRepository): @staticmethod def create_user(username: str, password: str) -> None: + """Creates a new user""" + hashed_password = JsonUserRepository._check_and_hash_password(password) with ReadUserData() as user_data: @@ -87,6 +89,8 @@ class JsonUserRepository(AbstractUserRepository): @staticmethod def delete_user(username: str) -> None: + """Deletes an existing user""" + with WriteUserData() as user_data: ensure_ssh_and_users_fields_exist(user_data) if username == user_data["username"] or username == "root": @@ -101,6 +105,8 @@ class JsonUserRepository(AbstractUserRepository): @staticmethod def update_user(username: str, password: str) -> None: + """Updates the password of an existing user""" + hashed_password = JsonUserRepository._check_and_hash_password(password) with WriteUserData() as data: @@ -120,6 +126,8 @@ class JsonUserRepository(AbstractUserRepository): @staticmethod def get_user_by_username(username: str) -> Optional[UserDataUser]: + """Retrieves user data (UserDataUser) by username""" + with ReadUserData() as data: ensure_ssh_and_users_fields_exist(data) diff --git a/selfprivacy_api/repositories/users/kanidm_user_repository.py b/selfprivacy_api/repositories/users/kanidm_user_repository.py index ada22d2..0918a63 100644 --- a/selfprivacy_api/repositories/users/kanidm_user_repository.py +++ b/selfprivacy_api/repositories/users/kanidm_user_repository.py @@ -18,76 +18,50 @@ redis = RedisPool().get_connection() logger = logging.getLogger(__name__) +ADMIN_KANIDM_GROUPS = ["sp.admin"] -class KanidmAdminToken: + +class KanidmAdminToken: # TODO CHECK IS TOKEN CORRECT? @staticmethod def get() -> str: kanidm_admin_token = redis.get("kanidm:token") if kanidm_admin_token is None: - kanidm_admin_password = redis.get("kanidm:password") # type: ignore + kanidm_admin_password = ( + KanidmAdminToken._reset_and_save_idm_admin_password() + ) - if kanidm_admin_password is None: - kanidm_admin_password = ( - KanidmAdminToken.reset_and_save_idm_admin_password() - ) - - kanidm_admin_token = KanidmAdminToken.create_and_save_token( + kanidm_admin_token = KanidmAdminToken._create_and_save_token( kanidm_admin_password=kanidm_admin_password ) return kanidm_admin_token @staticmethod - def create_and_save_token(kanidm_admin_password: str) -> str: - logging.error("create_and_save_token START") - + def _create_and_save_token(kanidm_admin_password: str) -> str: with temporary_env_var(key="KANIDM_PASSWORD", value=kanidm_admin_password): subprocess.run(["kanidm", "login", "-D", "idm_admin"]) - # kanidm_admin_token = subprocess.check_output( - # [ - # "kanidm", - # "service-account", - # "api-token", - # "generate", - # "--rw", - # "selfprivacy", - # "token2", - # ], - # text=True, - # ) - try: - kanidm_admin_token = subprocess.check_output( - [ - "kanidm", - "service-account", - "api-token", - "generate", - "--rw", - "selfprivacy", - "token2", - ], - text=True, - stderr=subprocess.PIPE, - ) - except subprocess.CalledProcessError as e: - logging.error(f"Error executing command: {e.cmd}") - logging.error(f"Error message: {e.stderr}") - raise + output = subprocess.check_output( + [ + "kanidm", + "service-account", + "api-token", + "generate", + "--rw", + "selfprivacy", + "token2", + ], + text=True, + ) - # except subprocess.CalledProcessError as e: - # logger.error(e) - - kanidm_admin_token = kanidm_admin_token.splitlines()[-1] + kanidm_admin_token = output.splitlines()[-1] redis.set("kanidm:token", kanidm_admin_token) return kanidm_admin_token @staticmethod - def reset_and_save_idm_admin_password() -> str: - logging.error("reset_and_save_idm_admin_password START") - + def _reset_and_save_idm_admin_password() -> str: output = subprocess.check_output( [ "kanidmd", @@ -106,7 +80,6 @@ class KanidmAdminToken: 1 ) # we have many not json strings in output - redis.set("kanidm:password", new_kanidm_admin_password) return new_kanidm_admin_password @@ -116,40 +89,61 @@ class KanidmQueryError(Exception): class KanidmUserRepository(AbstractUserRepository): @staticmethod - def _send_query(endpoint: str, method: str = "GET", data=None): + def _check_user_origin_by_memberof( + memberof: Optional[list[str]] = None, + ) -> UserDataUserOrigin: + if sorted(memberof) == sorted(ADMIN_KANIDM_GROUPS): + return UserDataUserOrigin.PRIMARY + else: + return UserDataUserOrigin.NORMAL + + @staticmethod + def _send_query(endpoint: str, method: str = "GET", data=None) -> dict: request_method = getattr(requests, method.lower(), None) full_endpoint = f"{KANIDM_URL}/v1/{endpoint}" - # try: - response = request_method( - full_endpoint, - json=data, - headers={ - "Authorization": f"Bearer {KanidmAdminToken.get()}", - "Content-Type": "application/json", - }, - timeout=0.8, # TODO: change timeout - verify=False, # TODO: REMOVE THIS NOTHALAL!!!!! - ) - - if response.status_code != 200: - raise KanidmQueryError( - f"Kanidm returned {response.status_code} unexpected HTTP status code. Endpoint: {full_endpoint}. Error: {response.text}." + try: + response = request_method( + full_endpoint, + json=data, + headers={ + "Authorization": f"Bearer {KanidmAdminToken.get()}", + "Content-Type": "application/json", + }, + timeout=0.8, # TODO: change timeout + verify=False, # TODO: REMOVE THIS NOTHALAL!!!!! ) - return response.json() - # except Exception as error: - # raise KanidmQueryError(f"Kanidm request failed! Error: {str(error)}") + # TODO make more cases, what if user do not exits? + if response.status_code != 200: + raise KanidmQueryError( + f"Kanidm returned {response.status_code} unexpected HTTP status code. Endpoint: {full_endpoint}. Error: {response.text}." + ) + return response.json() + + except Exception as error: + raise KanidmQueryError(f"Kanidm request failed! Error: {str(error)}") @staticmethod def create_user( username: str, - password: Optional[str] = None, # TODO legacy? + password: Optional[str] = None, displayname: Optional[str] = None, email: Optional[str] = None, directmemberof: Optional[list[str]] = None, memberof: Optional[list[str]] = None, ) -> None: + """ + Creates a new user."password" is a legacy field, + please use generate_password_reset_link() instead. + + If displayname is None, it will default to the username. + If email is None, it will default to username@get_domain(). + """ + + if password: + pass # TODO make notif + data = { "attrs": { "name": [username], @@ -170,42 +164,62 @@ class KanidmUserRepository(AbstractUserRepository): data=data, ) + @staticmethod def get_users( exclude_primary: bool = False, - exclude_root: bool = False, + exclude_root: bool = False, # never return root ) -> list[UserDataUser]: + """ + Gets a list of users with options to exclude specific user groups. + The root user will never return. + """ users_data = KanidmUserRepository._send_query(endpoint="person", method="GET") users = [] for user in users_data: attrs = user.get("attrs", {}) + + origin = KanidmUserRepository._check_user_origin( + memberof=attrs.get("memberof", []) + ) + if exclude_primary and origin == UserDataUserOrigin.PRIMARY: + continue + user_type = UserDataUser( uuid=attrs.get("uuid", [None])[0], username=attrs.get("name", [None])[0], - ssh_keys=["test"], # TODO: подключить реальные SSH-ключи displayname=attrs.get("displayname", [None])[0], email=attrs.get("mail", [None])[0], - origin=UserDataUserOrigin.NORMAL, # TODO + origin=origin, directmemberof=attrs.get("directmemberof", []), memberof=attrs.get("memberof", []), ) + users.append(user_type) return users + @staticmethod def delete_user(username: str) -> None: """Deletes an existing user""" return KanidmUserRepository._send_query( endpoint=f"person/{username}", method="DELETE" ) + @staticmethod def update_user( username: str, - password: Optional[str] = None, # TODO legacy? + password: Optional[str] = None, displayname: Optional[str] = None, email: Optional[str] = None, directmemberof: Optional[list[str]] = None, memberof: Optional[list[str]] = None, ) -> None: - """Updates the password of an existing user""" + """ + Update user information. + Do not update the password, please + use generate_password_reset_link() instead. + """ + if password: + pass # TODO make notif data = { "attrs": { @@ -226,6 +240,7 @@ class KanidmUserRepository(AbstractUserRepository): data=data, ) + @staticmethod def get_user_by_username(username: str) -> Optional[UserDataUser]: """Retrieves user data (UserDataUser) by username""" user_data = KanidmUserRepository._send_query( @@ -244,7 +259,24 @@ class KanidmUserRepository(AbstractUserRepository): displayname=attrs.get("displayname", [None])[0], email=attrs.get("mail", [None])[0], ssh_keys=attrs.get("ssh_keys", []), - origin=UserDataUserOrigin.NORMAL, # TODO + origin=KanidmUserRepository._check_user_origin_by_memberof( + memberof=attrs.get("memberof", []) + ), directmemberof=attrs.get("directmemberof", []), memberof=attrs.get("memberof", []), ) + + @staticmethod + def generate_password_reset_link(username: str) -> str: + """ + Do not reset the password, just generate a link to reset the password. + Not implemented in JsonUserRepository. + """ + token_information = KanidmUserRepository._send_query( + endpoint=f"person/{username}/_credential/_update_intent", + method="GET", + ) + + # {"token":"3btDa-sR5yX-q2XqZ-68gRq","expiry_time":1732713745} + # TODO: create link + return token_information