diff --git a/selfprivacy_api/actions/users.py b/selfprivacy_api/actions/users.py index 3eb9ca8..1d337b7 100644 --- a/selfprivacy_api/actions/users.py +++ b/selfprivacy_api/actions/users.py @@ -41,7 +41,7 @@ def get_users( exclude_primary=exclude_primary, exclude_root=exclude_root ) - if ACTIVE_USERS_PROVIDER != JsonUserRepository: + if isinstance(ACTIVE_USERS_PROVIDER, JsonUserRepository): for user in users: try: user.ssh_keys = get_ssh_keys(username=user.username) @@ -81,7 +81,7 @@ def create_user( raise UsernameTooLong("Username must be less than 32 characters") # need to maintain the logic of the old repository, since ssh management uses it. - if ACTIVE_USERS_PROVIDER != JsonUserRepository: + if not isinstance(ACTIVE_USERS_PROVIDER, JsonUserRepository): try: JsonUserRepository.create_user( username=username, password=str(uuid.uuid4()) @@ -102,7 +102,7 @@ def create_user( def delete_user(username: str) -> None: # need to maintain the logic of the old repository, since ssh management uses it. - if ACTIVE_USERS_PROVIDER != JsonUserRepository: + if not isinstance(ACTIVE_USERS_PROVIDER, JsonUserRepository): try: JsonUserRepository.delete_user(username=username) except UserNotFound: @@ -131,9 +131,11 @@ def update_user( def get_user_by_username(username: str) -> Optional[UserDataUser]: - user = ACTIVE_USERS_PROVIDER.get_user_by_username(username=username) + user: UserDataUser | None = ACTIVE_USERS_PROVIDER.get_user_by_username( + username=username + ) - if ACTIVE_USERS_PROVIDER != JsonUserRepository: + if not isinstance(ACTIVE_USERS_PROVIDER, JsonUserRepository): if username == "root": return UserDataUser( username="root", @@ -142,7 +144,8 @@ def get_user_by_username(username: str) -> Optional[UserDataUser]: ) try: - user.ssh_keys = get_ssh_keys(username=user) + if user: + user.ssh_keys = get_ssh_keys(username=user.username) except UserNotFound: pass @@ -150,7 +153,7 @@ def get_user_by_username(username: str) -> Optional[UserDataUser]: def generate_password_reset_link(username: str) -> str: - if ACTIVE_USERS_PROVIDER == JsonUserRepository: + if isinstance(ACTIVE_USERS_PROVIDER, JsonUserRepository): raise ApiUsingWrongUserRepository return ACTIVE_USERS_PROVIDER.generate_password_reset_link(username=username) diff --git a/selfprivacy_api/graphql/common_types/user.py b/selfprivacy_api/graphql/common_types/user.py index c3f45a2..ebc4329 100644 --- a/selfprivacy_api/graphql/common_types/user.py +++ b/selfprivacy_api/graphql/common_types/user.py @@ -28,8 +28,8 @@ class User: user_type: Optional[UserType] = None displayname: Optional[str] = None email: Optional[str] = None - directmemberof: Optional[list[str]] = None - memberof: Optional[list[str]] = None + directmemberof: Optional[list[str]] = [] + memberof: Optional[list[str]] = [] @strawberry.type @@ -37,6 +37,12 @@ class UserMutationReturn(MutationReturnInterface): """Return type for user mutation""" user: Optional[User] = None + + +@strawberry.type +class PasswordResetLinkReturn(MutationReturnInterface): + """Return password reset link""" + password_reset_link: Optional[str] = None diff --git a/selfprivacy_api/graphql/mutations/users_mutations.py b/selfprivacy_api/graphql/mutations/users_mutations.py index bc8a2df..e35a2e9 100644 --- a/selfprivacy_api/graphql/mutations/users_mutations.py +++ b/selfprivacy_api/graphql/mutations/users_mutations.py @@ -6,6 +6,7 @@ import strawberry from selfprivacy_api.graphql import IsAuthenticated from selfprivacy_api.graphql.common_types.user import ( + PasswordResetLinkReturn, UserMutationReturn, get_user_by_username, ) @@ -26,6 +27,7 @@ from selfprivacy_api.actions.users import ( generate_password_reset_link as generate_password_reset_link_action, ) from selfprivacy_api.repositories.users.exceptions import ( + NoPasswordResetLinkFoundInResponse, PasswordIsEmpty, UsernameForbidden, InvalidConfiguration, @@ -37,6 +39,7 @@ from selfprivacy_api.repositories.users.exceptions import ( SelfPrivacyAppIsOutdate, ) from selfprivacy_api import PLEASE_UPDATE_APP_TEXT +from selfprivacy_api.repositories.users.kanidm_user_repository import KanidmDidNotReturnAdminPassword FAILED_TO_SETUP_PASSWORD_TEXT = "Failed to set a password for a user. The problem occurred due to an old version of the SelfPrivacy app." @@ -45,7 +48,7 @@ FAILED_TO_SETUP_PASSWORD_TEXT = "Failed to set a password for a user. The proble def return_failed_mutation_return( message: str, code: int = 400, - username: str = None, + username: Optional[str] = None, ) -> UserMutationReturn: return UserMutationReturn( success=False, @@ -63,8 +66,8 @@ class UserMutationInput: password: Optional[str] = None displayname: Optional[str] = None email: Optional[str] = None - directmemberof: Optional[list[str]] = None - memberof: Optional[list[str]] = None + directmemberof: Optional[list[str]] = [] + memberof: Optional[list[str]] = [] @strawberry.input @@ -95,9 +98,10 @@ class UsersMutations: UsernameNotAlphanumeric, UsernameTooLong, InvalidConfiguration, + KanidmDidNotReturnAdminPassword, ) as error: return return_failed_mutation_return( - message=error.get_description(), + message=error.get_error_message(), ) except UsernameForbidden as error: return return_failed_mutation_return( @@ -130,12 +134,23 @@ class UsersMutations: try: delete_user_action(username) except UserNotFound as error: - return return_failed_mutation_return( + return GenericMutationReturn( + success=False, message=error.get_error_message(), code=404, ) except UserIsProtected as error: - return return_failed_mutation_return(error=error) + return GenericMutationReturn( + success=False, + code=400, + message=error.get_error_message(), + ) + except KanidmDidNotReturnAdminPassword as error: + return GenericMutationReturn( + success=False, + code=500, + message=error.get_error_message(), + ) return GenericMutationReturn( success=True, @@ -155,7 +170,7 @@ class UsersMutations: directmemberof=user.directmemberof, memberof=user.memberof, ) - except (PasswordIsEmpty, SelfPrivacyAppIsOutdate) as error: + except (PasswordIsEmpty, SelfPrivacyAppIsOutdate, KanidmDidNotReturnAdminPassword) as error: return return_failed_mutation_return( message=error.get_error_message(), ) @@ -211,7 +226,7 @@ class UsersMutations: try: remove_ssh_key_action(ssh_input.username, ssh_input.ssh_key) - except (KeyNotFound, UserMutationReturn) as error: + except (KeyNotFound, UserNotFound) as error: return return_failed_mutation_return( message=error.get_error_message(), code=404, @@ -231,16 +246,27 @@ class UsersMutations: ) @strawberry.mutation(permission_classes=[IsAuthenticated]) - def generate_password_reset_link(username: str) -> UserMutationReturn: + def generate_password_reset_link( + self, user: UserMutationInput + ) -> PasswordResetLinkReturn: try: - password_reset_link = generate_password_reset_link_action(username=username) + password_reset_link = generate_password_reset_link_action( + username=user.username + ) except UserNotFound as error: - return return_failed_mutation_return( + return PasswordResetLinkReturn( + success=False, message=error.get_error_message(), code=404, ) + except (NoPasswordResetLinkFoundInResponse, KanidmDidNotReturnAdminPassword) as error: + return PasswordResetLinkReturn( + success=False, + code=500, + message=error.get_error_message(), + ) - return UserMutationReturn( + return PasswordResetLinkReturn( success=True, message="Link successfully created", code=200, diff --git a/selfprivacy_api/models/user.py b/selfprivacy_api/models/user.py index 2ae6769..78d1857 100644 --- a/selfprivacy_api/models/user.py +++ b/selfprivacy_api/models/user.py @@ -15,13 +15,11 @@ class UserDataUser(BaseModel): """The user model from the userdata file""" username: str - - ssh_keys: Optional[list[str]] = [] - user_type: Optional[UserDataUserOrigin] = None + user_type: UserDataUserOrigin + ssh_keys: list[str] = [] + directmemberof: Optional[list[str]] = [] + memberof: Optional[list[str]] = [] displayname: Optional[str] = ( None # in logic graphql will return "username" if "displayname" None ) - email: Optional[str] = None - directmemberof: Optional[list[str]] = None - memberof: Optional[list[str]] = None diff --git a/selfprivacy_api/repositories/users/exceptions.py b/selfprivacy_api/repositories/users/exceptions.py index 81cb877..7f39654 100644 --- a/selfprivacy_api/repositories/users/exceptions.py +++ b/selfprivacy_api/repositories/users/exceptions.py @@ -72,3 +72,11 @@ class SelfPrivacyAppIsOutdate(Exception): @staticmethod def get_error_message() -> str: return "SelfPrivacy app is out of date, please update" + + +class NoPasswordResetLinkFoundInResponse(Exception): + """No password reset link was found in the Kanidm response.""" + + @staticmethod + def get_error_message() -> str: + return "The Kanidm response does not contain a password reset link." diff --git a/selfprivacy_api/repositories/users/json_user_repository.py b/selfprivacy_api/repositories/users/json_user_repository.py index e20e2a1..9db9143 100644 --- a/selfprivacy_api/repositories/users/json_user_repository.py +++ b/selfprivacy_api/repositories/users/json_user_repository.py @@ -30,8 +30,8 @@ class JsonUserRepository(AbstractUserRepository): @staticmethod def get_users( - exclude_primary: bool = False, # TODO - exclude_root: bool = False, # TODO + exclude_primary: bool = False, + exclude_root: bool = False, ) -> list[UserDataUser]: """Retrieves a list of users with options to exclude specific user groups""" users = [] @@ -41,7 +41,7 @@ class JsonUserRepository(AbstractUserRepository): UserDataUser( username=user["username"], ssh_keys=user.get("sshKeys", []), - origin=UserDataUserOrigin.NORMAL, + user_type=UserDataUserOrigin.NORMAL, ) for user in user_data["users"] ] @@ -50,7 +50,7 @@ class JsonUserRepository(AbstractUserRepository): UserDataUser( username=user_data["username"], ssh_keys=user_data["sshKeys"], - origin=UserDataUserOrigin.PRIMARY, + user_type=UserDataUserOrigin.PRIMARY, ) ) if not exclude_root: @@ -58,7 +58,7 @@ class JsonUserRepository(AbstractUserRepository): UserDataUser( username="root", ssh_keys=user_data["ssh"]["rootKeys"], - origin=UserDataUserOrigin.ROOT, + user_type=UserDataUserOrigin.ROOT, ) ) return users @@ -133,14 +133,14 @@ class JsonUserRepository(AbstractUserRepository): if username == "root": return UserDataUser( - origin=UserDataUserOrigin.ROOT, + user_type=UserDataUserOrigin.ROOT, username="root", ssh_keys=data["ssh"]["rootKeys"], ) if username == data["username"]: return UserDataUser( - origin=UserDataUserOrigin.PRIMARY, + user_type=UserDataUserOrigin.PRIMARY, username=username, ssh_keys=data["sshKeys"], ) @@ -151,7 +151,7 @@ class JsonUserRepository(AbstractUserRepository): user["sshKeys"] = [] return UserDataUser( - origin=UserDataUserOrigin.NORMAL, + user_type=UserDataUserOrigin.NORMAL, username=username, ssh_keys=user["sshKeys"], ) diff --git a/selfprivacy_api/repositories/users/kanidm_user_repository.py b/selfprivacy_api/repositories/users/kanidm_user_repository.py index 14ea401..c4e93bc 100644 --- a/selfprivacy_api/repositories/users/kanidm_user_repository.py +++ b/selfprivacy_api/repositories/users/kanidm_user_repository.py @@ -1,12 +1,13 @@ from typing import Optional - import subprocess import requests import re import logging -import json -from selfprivacy_api.repositories.users.exceptions import SelfPrivacyAppIsOutdate +from selfprivacy_api.repositories.users.exceptions import ( + NoPasswordResetLinkFoundInResponse, + SelfPrivacyAppIsOutdate, +) from selfprivacy_api.utils import get_domain, temporary_env_var from selfprivacy_api.utils.redis_pool import RedisPool from selfprivacy_api.models.user import UserDataUser, UserDataUserOrigin @@ -25,10 +26,34 @@ logger = logging.getLogger(__name__) ADMIN_KANIDM_GROUPS = ["sp.admin"] +class KanidmQueryError(Exception): + """Error occurred during kanidm query""" + + @staticmethod + def get_error_message() -> str: + return "An error occurred during the Kanidm query." + + +class KanidmReturnEmptyResponse(Exception): + """Kanidm returned a blank response""" + + @staticmethod + def get_error_message() -> str: + return "Kanidm returned an empty response." + + +class KanidmDidNotReturnAdminPassword(Exception): + """Kanidm didn't return the admin password""" + + @staticmethod + def get_error_message() -> str: + return "Kanidm didn't return the admin password." + + class KanidmAdminToken: # TODO CHECK IS TOKEN CORRECT? @staticmethod def get() -> str: - kanidm_admin_token = redis.get("kanidm:token") + kanidm_admin_token = str(redis.get("kanidm:token")) if kanidm_admin_token is None: kanidm_admin_password = ( @@ -80,9 +105,12 @@ class KanidmAdminToken: # TODO CHECK IS TOKEN CORRECT? ) match = re.search(r'"password":"([^"]+)"', output) - new_kanidm_admin_password = match.group( - 1 - ) # we have many not json strings in output + if match: + new_kanidm_admin_password = match.group( + 1 + ) # we have many not json strings in output + else: + raise KanidmDidNotReturnAdminPassword return new_kanidm_admin_password @@ -91,14 +119,10 @@ class KanidmAdminToken: # TODO CHECK IS TOKEN CORRECT? redis.delete("kanidm:token") -class KanidmQueryError(Exception): - """Error occurred during kanidm query""" - - class KanidmUserRepository(AbstractUserRepository): @staticmethod def _check_user_origin_by_memberof( - memberof: Optional[list[str]] = None, + memberof: list[str] = [], ) -> UserDataUserOrigin: if sorted(memberof) == sorted(ADMIN_KANIDM_GROUPS): return UserDataUserOrigin.PRIMARY @@ -166,7 +190,7 @@ class KanidmUserRepository(AbstractUserRepository): if memberof: data["attrs"]["memberof"] = memberof - return KanidmUserRepository._send_query( + KanidmUserRepository._send_query( endpoint="person", method="POST", data=data, @@ -183,6 +207,9 @@ class KanidmUserRepository(AbstractUserRepository): """ users_data = KanidmUserRepository._send_query(endpoint="person", method="GET") + if not users_data or "attrs" not in users_data: + raise KanidmReturnEmptyResponse + users = [] for user in users_data: user_attrs = user.get("attrs", {}) @@ -194,9 +221,9 @@ class KanidmUserRepository(AbstractUserRepository): continue filled_user = UserDataUser( - username=user_attrs.get("name", [None])[0], - displayname=user_attrs.get("displayname", [None])[0], - email=user_attrs.get("mail", [None])[0], + username=user_attrs.get("name", [None]), + displayname=user_attrs.get("displayname", [None]), + email=user_attrs.get("mail", [None]), ssh_keys=[], # actions layer will full in this field user_type=user_type, directmemberof=user_attrs.get("directmemberof", []), @@ -209,9 +236,7 @@ class KanidmUserRepository(AbstractUserRepository): @staticmethod def delete_user(username: str) -> None: """Deletes an existing user""" - return KanidmUserRepository._send_query( - endpoint=f"person/{username}", method="DELETE" - ) + KanidmUserRepository._send_query(endpoint=f"person/{username}", method="DELETE") @staticmethod def update_user( @@ -243,7 +268,7 @@ class KanidmUserRepository(AbstractUserRepository): if memberof: data["attrs"]["memberof"] = memberof - return KanidmUserRepository._send_query( + KanidmUserRepository._send_query( endpoint=f"person/{username}", method="PATCH", data=data, @@ -258,14 +283,14 @@ class KanidmUserRepository(AbstractUserRepository): ) if not user_data or "attrs" not in user_data: - return None + raise KanidmReturnEmptyResponse attrs = user_data["attrs"] return UserDataUser( - username=attrs.get("name", [None])[0], - displayname=attrs.get("displayname", [None])[0], - email=attrs.get("mail", [None])[0], + username=attrs.get("name", [None]), + displayname=attrs.get("displayname", [None]), + email=attrs.get("mail", [None]), ssh_keys=[], # actions layer will full in this field user_type=KanidmUserRepository._check_user_origin_by_memberof( memberof=attrs.get("memberof", []) @@ -280,10 +305,17 @@ class KanidmUserRepository(AbstractUserRepository): Do not reset the password, just generate a link to reset the password. Not implemented in JsonUserRepository. """ - token_information = KanidmUserRepository._send_query( + data = KanidmUserRepository._send_query( endpoint=f"person/{username}/_credential/_update_intent", method="GET", ) - token_information = json.loads(token_information) - return f"https://id{get_domain()}/ui/reset?token={token_information['token']}" + if not data or "attrs" not in data: + raise KanidmReturnEmptyResponse + + token = data["attrs"].get["token", [None][0]] + + if token: + return f"https://id{get_domain()}/ui/reset?token={token}" + + raise NoPasswordResetLinkFoundInResponse