#!/usr/bin/env python3 """Users management module""" # pylint: disable=too-few-public-methods from typing import Optional import strawberry from selfprivacy_api.graphql import IsAuthenticated from selfprivacy_api.graphql.common_types.user import ( PasswordResetLinkReturn, UserMutationReturn, get_user_by_username, ) from selfprivacy_api.actions.ssh import ( InvalidPublicKey, KeyAlreadyExists, KeyNotFound, create_ssh_key as create_ssh_key_action, remove_ssh_key as remove_ssh_key_action, ) from selfprivacy_api.graphql.mutations.mutation_interface import ( GenericMutationReturn, ) from selfprivacy_api.actions.users import ( ApiUsingWrongUserRepository, create_user as create_user_action, delete_user as delete_user_action, update_user as update_user_action, generate_password_reset_link as generate_password_reset_link_action, ) from selfprivacy_api.repositories.users.exceptions import ( DisplaynameTooLong, NoPasswordResetLinkFoundInResponse, PasswordIsEmpty, UserOrGroupNotFound, UsernameForbidden, InvalidConfiguration, UserAlreadyExists, UserIsProtected, UsernameNotAlphanumeric, UsernameTooLong, UserNotFound, SelfPrivacyAppIsOutdate, ) from selfprivacy_api.repositories.users.exceptions_kanidm import ( FailedToGetValidKanidmToken, KanidmDidNotReturnAdminPassword, KanidmQueryError, KanidmReturnEmptyResponse, KanidmReturnUnknownResponseType, KanidmCliSubprocessError, ) from selfprivacy_api.utils.strings import PLEASE_UPDATE_APP_TEXT FAILED_TO_SETUP_PASSWORD_TEXT = "Failed to set a password for a user. The problem occurred due to an old version of the SelfPrivacy app." def return_failed_mutation_return( message: str, code: int = 400, username: Optional[str] = None, ) -> UserMutationReturn: return UserMutationReturn( success=False, message=str(message), code=code, user=get_user_by_username(username) if username else None, ) @strawberry.input class UserMutationInput: """Input type for user mutation""" username: str directmemberof: Optional[list[str]] = strawberry.field(default_factory=list) password: Optional[str] = None displayname: Optional[str] = None @strawberry.input class SshMutationInput: """Input type for ssh mutation""" username: str ssh_key: str @strawberry.type class UsersMutations: """Mutations change user settings""" @strawberry.mutation(permission_classes=[IsAuthenticated]) def create_user(self, user: UserMutationInput) -> UserMutationReturn: try: create_user_action( username=user.username, password=user.password, directmemberof=user.directmemberof, displayname=user.displayname, ) except ( PasswordIsEmpty, UsernameNotAlphanumeric, UsernameTooLong, InvalidConfiguration, KanidmDidNotReturnAdminPassword, KanidmQueryError, DisplaynameTooLong, KanidmCliSubprocessError, FailedToGetValidKanidmToken, ) as error: return return_failed_mutation_return( message=error.get_error_message(), ) except UsernameForbidden as error: return return_failed_mutation_return( message=error.get_error_message(), code=409, username=user.username, ) except UserAlreadyExists as error: return return_failed_mutation_return( message=error.get_error_message(), code=409, ) if user.password: return return_failed_mutation_return( message=f"{FAILED_TO_SETUP_PASSWORD_TEXT} {PLEASE_UPDATE_APP_TEXT}", code=201, username=user.username, ) return UserMutationReturn( success=True, message="User created", code=201, user=get_user_by_username(user.username), ) @strawberry.mutation(permission_classes=[IsAuthenticated]) def delete_user(self, username: str) -> GenericMutationReturn: try: delete_user_action(username) except (UserNotFound, UserOrGroupNotFound) as error: return GenericMutationReturn( success=False, message=error.get_error_message(), code=404, ) except UserIsProtected as error: return GenericMutationReturn( success=False, code=400, message=error.get_error_message(), ) except ( KanidmDidNotReturnAdminPassword, KanidmQueryError, KanidmCliSubprocessError, FailedToGetValidKanidmToken, ) as error: return GenericMutationReturn( success=False, code=500, message=error.get_error_message(), ) return GenericMutationReturn( success=True, message="User deleted", code=200, ) @strawberry.mutation(permission_classes=[IsAuthenticated]) def update_user(self, user: UserMutationInput) -> UserMutationReturn: """Update user mutation""" try: update_user_action( username=user.username, password=user.password, directmemberof=user.directmemberof, displayname=user.displayname, ) except ( PasswordIsEmpty, SelfPrivacyAppIsOutdate, KanidmDidNotReturnAdminPassword, KanidmQueryError, DisplaynameTooLong, KanidmCliSubprocessError, FailedToGetValidKanidmToken, ApiUsingWrongUserRepository, ) as error: return return_failed_mutation_return( message=error.get_error_message(), ) except (UserNotFound, UserOrGroupNotFound) as error: return return_failed_mutation_return( message=error.get_error_message(), code=404, ) return UserMutationReturn( success=True, message="User updated", code=200, user=get_user_by_username(user.username), ) @strawberry.mutation(permission_classes=[IsAuthenticated]) def add_ssh_key(self, ssh_input: SshMutationInput) -> UserMutationReturn: """Add a new ssh key""" try: create_ssh_key_action(ssh_input.username, ssh_input.ssh_key) except KeyAlreadyExists as error: return return_failed_mutation_return( message=error.get_error_message(), code=409, ) except InvalidPublicKey as error: return return_failed_mutation_return( message=error.get_error_message(), ) except UserNotFound as error: return return_failed_mutation_return( message=error.get_error_message(), code=404, ) except Exception as error: # TODO why? return return_failed_mutation_return( message=str(error), code=500, ) return UserMutationReturn( success=True, message="New SSH key successfully written", code=201, user=get_user_by_username(ssh_input.username), ) @strawberry.mutation(permission_classes=[IsAuthenticated]) def remove_ssh_key(self, ssh_input: SshMutationInput) -> UserMutationReturn: """Remove ssh key from user""" try: remove_ssh_key_action(ssh_input.username, ssh_input.ssh_key) except (KeyNotFound, UserNotFound) as error: return return_failed_mutation_return( message=error.get_error_message(), code=404, ) except Exception as error: # TODO why? return UserMutationReturn( success=False, message=str(error), code=500, ) return UserMutationReturn( success=True, message="SSH key successfully removed", code=200, user=get_user_by_username(ssh_input.username), ) @strawberry.mutation(permission_classes=[IsAuthenticated]) def generate_password_reset_link(self, username: str) -> PasswordResetLinkReturn: try: password_reset_link = generate_password_reset_link_action(username=username) except (UserNotFound, UserOrGroupNotFound) as error: return PasswordResetLinkReturn( success=False, message=error.get_error_message(), code=404, ) except UserIsProtected as error: return PasswordResetLinkReturn( success=False, message=error.get_error_message(), code=400, ) except ( NoPasswordResetLinkFoundInResponse, KanidmDidNotReturnAdminPassword, KanidmReturnUnknownResponseType, KanidmReturnEmptyResponse, KanidmQueryError, KanidmCliSubprocessError, FailedToGetValidKanidmToken, ApiUsingWrongUserRepository, ) as error: return PasswordResetLinkReturn( success=False, code=500, message=error.get_error_message(), ) return PasswordResetLinkReturn( success=True, message="Link successfully created", code=200, password_reset_link=password_reset_link, )