selfprivacy-rest-api/selfprivacy_api/graphql/mutations/users_mutations.py

307 lines
9.6 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
"""Users management module"""
# pylint: disable=too-few-public-methods
2024-11-15 00:19:24 +00:00
from typing import Optional
import strawberry
from selfprivacy_api.graphql import IsAuthenticated
from selfprivacy_api.graphql.common_types.user import (
2024-12-09 08:37:57 +00:00
PasswordResetLinkReturn,
UserMutationReturn,
get_user_by_username,
)
from selfprivacy_api.actions.ssh import (
InvalidPublicKey,
KeyAlreadyExists,
KeyNotFound,
2024-11-15 00:19:24 +00:00
create_ssh_key as create_ssh_key_action,
remove_ssh_key as remove_ssh_key_action,
)
from selfprivacy_api.graphql.mutations.mutation_interface import (
GenericMutationReturn,
)
2024-11-15 00:19:24 +00:00
from selfprivacy_api.actions.users import (
ApiUsingWrongUserRepository,
2024-11-15 00:19:24 +00:00
create_user as create_user_action,
delete_user as delete_user_action,
update_user as update_user_action,
2024-12-03 22:54:24 +00:00
generate_password_reset_link as generate_password_reset_link_action,
2024-11-15 00:19:24 +00:00
)
from selfprivacy_api.repositories.users.exceptions import (
DisplaynameTooLong,
2024-12-09 08:37:57 +00:00
NoPasswordResetLinkFoundInResponse,
PasswordIsEmpty,
UsernameForbidden,
InvalidConfiguration,
UserAlreadyExists,
UserIsProtected,
UsernameNotAlphanumeric,
UsernameTooLong,
UserNotFound,
2024-12-03 22:54:24 +00:00
SelfPrivacyAppIsOutdate,
)
2024-12-11 01:48:30 +00:00
from selfprivacy_api.repositories.users.exceptions_kanidm import (
2024-12-12 12:00:07 +00:00
FailedToGetValidKanidmToken,
2024-12-10 02:15:15 +00:00
KanidmDidNotReturnAdminPassword,
2024-12-11 02:00:14 +00:00
KanidmQueryError,
2024-12-11 01:48:30 +00:00
KanidmReturnEmptyResponse,
KanidmReturnUnknownResponseType,
KanidmCliSubprocessError,
2024-12-10 02:15:15 +00:00
)
2024-12-18 20:35:33 +00:00
from selfprivacy_api.utils.strings import PLEASE_UPDATE_APP_TEXT
# Message used by the user-creation mutation when the request still carries a
# password; it is combined there with PLEASE_UPDATE_APP_TEXT.
FAILED_TO_SETUP_PASSWORD_TEXT = (
    "Failed to set a password for a user. "
    "The problem occurred due to an old version of the SelfPrivacy app."
)
def return_failed_mutation_return(
    message: str,
    code: int = 400,
    username: Optional[str] = None,
) -> UserMutationReturn:
    """Build an unsuccessful ``UserMutationReturn``.

    Args:
        message: Error text for the response; it is passed through ``str()``
            before being stored.
        code: Status code placed in the response (400 unless overridden).
        username: When given (and non-empty), the user is looked up with
            ``get_user_by_username`` and attached to the response; otherwise
            the ``user`` field is ``None``.

    Returns:
        A ``UserMutationReturn`` with ``success=False``.
    """
    # An empty string is treated the same as "no username": no lookup is made.
    if username:
        attached_user = get_user_by_username(username)
    else:
        attached_user = None

    return UserMutationReturn(
        success=False,
        message=str(message),
        code=code,
        user=attached_user,
    )
@strawberry.input
class UserMutationInput:
    """Input type for user mutation

    Used as the argument of both the create-user and update-user mutations.
    Only ``username`` is required; every other field has a default.
    """

    # Name of the user the mutation applies to.
    username: str
    # Passed to the user actions as ``directmemberof``; a fresh empty list is
    # used when the caller omits it.
    directmemberof: Optional[list[str]] = strawberry.field(default_factory=list)
    # Optional password; ``None`` when not supplied.
    password: Optional[str] = None
    # Optional display name; ``None`` when not supplied.
    displayname: Optional[str] = None
@strawberry.input
class SshMutationInput:
    """Input type for ssh mutation

    Carries the pair of values that the add/remove SSH key mutations hand to
    the SSH key actions.
    """

    # User whose SSH keys are being changed.
    username: str
    # The SSH key to add or remove, forwarded to the action as given.
    ssh_key: str
@strawberry.type
class UsersMutations:
    """Mutations change user settings

    Every mutation requires ``IsAuthenticated``. Each one calls the matching
    action and translates the exceptions it catches into a return object with
    ``success=False`` and a status code, instead of letting them propagate.
    """

    @strawberry.mutation(permission_classes=[IsAuthenticated])
    def create_user(self, user: UserMutationInput) -> UserMutationReturn:
        """Create a new user.

        Response codes:
            400: validation or backend error reported by the create action.
            409: the username is forbidden or the user already exists.
            201: the create action completed. If the request carried a
                password the response still has ``success=False`` and asks
                the client to update the app; otherwise ``success=True``.
        """
        try:
            create_user_action(
                username=user.username,
                password=user.password,
                directmemberof=user.directmemberof,
                displayname=user.displayname,
            )
        except (
            PasswordIsEmpty,
            UsernameNotAlphanumeric,
            UsernameTooLong,
            InvalidConfiguration,
            KanidmDidNotReturnAdminPassword,
            KanidmQueryError,
            DisplaynameTooLong,
            KanidmCliSubprocessError,
            FailedToGetValidKanidmToken,
        ) as error:
            return return_failed_mutation_return(
                message=error.get_error_message(),
            )
        except UsernameForbidden as error:
            # NOTE(review): this branch looks the user up while the
            # UserAlreadyExists branch below does not -- confirm the two are
            # not swapped.
            return return_failed_mutation_return(
                message=error.get_error_message(),
                code=409,
                username=user.username,
            )
        except UserAlreadyExists as error:
            return return_failed_mutation_return(
                message=error.get_error_message(),
                code=409,
            )

        if user.password:
            # The create action did not raise, so this is reached after a
            # completed create call; the response is nevertheless marked as
            # failed (with code 201) and carries the "update your app" text.
            # Presumably the supplied password was not applied -- TODO confirm
            # against create_user_action.
            return return_failed_mutation_return(
                message=f"{FAILED_TO_SETUP_PASSWORD_TEXT} {PLEASE_UPDATE_APP_TEXT}",
                code=201,
                username=user.username,
            )

        return UserMutationReturn(
            success=True,
            message="User created",
            code=201,
            user=get_user_by_username(user.username),
        )

    @strawberry.mutation(permission_classes=[IsAuthenticated])
    def delete_user(self, username: str) -> GenericMutationReturn:
        """Delete the user with the given username.

        Response codes:
            404: the user was not found.
            400: the user is protected.
            500: one of the caught Kanidm errors occurred.
            200: the delete action completed.
        """
        try:
            delete_user_action(username)
        except UserNotFound as error:
            return GenericMutationReturn(
                success=False,
                message=error.get_error_message(),
                code=404,
            )
        except UserIsProtected as error:
            return GenericMutationReturn(
                success=False,
                code=400,
                message=error.get_error_message(),
            )
        except (
            KanidmDidNotReturnAdminPassword,
            KanidmQueryError,
            KanidmCliSubprocessError,
            FailedToGetValidKanidmToken,
        ) as error:
            return GenericMutationReturn(
                success=False,
                code=500,
                message=error.get_error_message(),
            )

        return GenericMutationReturn(
            success=True,
            message="User deleted",
            code=200,
        )

    @strawberry.mutation(permission_classes=[IsAuthenticated])
    def update_user(self, user: UserMutationInput) -> UserMutationReturn:
        """Update user mutation

        Response codes:
            400: validation or backend error reported by the update action.
            404: the user was not found.
            200: the update action completed; the refreshed user is returned.
        """
        try:
            update_user_action(
                username=user.username,
                password=user.password,
                directmemberof=user.directmemberof,
                displayname=user.displayname,
            )
        except (
            PasswordIsEmpty,
            SelfPrivacyAppIsOutdate,
            KanidmDidNotReturnAdminPassword,
            KanidmQueryError,
            DisplaynameTooLong,
            KanidmCliSubprocessError,
            FailedToGetValidKanidmToken,
            ApiUsingWrongUserRepository,
        ) as error:
            return return_failed_mutation_return(
                message=error.get_error_message(),
            )
        except UserNotFound as error:
            return return_failed_mutation_return(
                message=error.get_error_message(),
                code=404,
            )

        return UserMutationReturn(
            success=True,
            message="User updated",
            code=200,
            user=get_user_by_username(user.username),
        )

    @strawberry.mutation(permission_classes=[IsAuthenticated])
    def add_ssh_key(self, ssh_input: SshMutationInput) -> UserMutationReturn:
        """Add a new ssh key

        Response codes:
            409: the key already exists.
            400: the public key is invalid.
            404: the user was not found.
            500: any other exception raised by the action.
            201: the key was written; the refreshed user is returned.
        """
        try:
            create_ssh_key_action(ssh_input.username, ssh_input.ssh_key)
        except KeyAlreadyExists as error:
            return return_failed_mutation_return(
                message=error.get_error_message(),
                code=409,
            )
        except InvalidPublicKey as error:
            return return_failed_mutation_return(
                message=error.get_error_message(),
            )
        except UserNotFound as error:
            return return_failed_mutation_return(
                message=error.get_error_message(),
                code=404,
            )
        except Exception as error:  # pylint: disable=broad-except
            # Boundary catch-all: anything the action raises beyond the cases
            # above is reported as a 500 response instead of propagating.
            # TODO: narrow this once the action's exception set is known.
            return return_failed_mutation_return(
                message=str(error),
                code=500,
            )

        return UserMutationReturn(
            success=True,
            message="New SSH key successfully written",
            code=201,
            user=get_user_by_username(ssh_input.username),
        )

    @strawberry.mutation(permission_classes=[IsAuthenticated])
    def remove_ssh_key(self, ssh_input: SshMutationInput) -> UserMutationReturn:
        """Remove ssh key from user

        Response codes:
            404: the key or the user was not found.
            500: any other exception raised by the action.
            200: the key was removed; the refreshed user is returned.
        """
        try:
            remove_ssh_key_action(ssh_input.username, ssh_input.ssh_key)
        except (KeyNotFound, UserNotFound) as error:
            return return_failed_mutation_return(
                message=error.get_error_message(),
                code=404,
            )
        except Exception as error:  # pylint: disable=broad-except
            # Boundary catch-all, built with the same helper as every other
            # failure path in this class (no user is attached).
            # TODO: narrow this once the action's exception set is known.
            return return_failed_mutation_return(
                message=str(error),
                code=500,
            )

        return UserMutationReturn(
            success=True,
            message="SSH key successfully removed",
            code=200,
            user=get_user_by_username(ssh_input.username),
        )

    @strawberry.mutation(permission_classes=[IsAuthenticated])
    def generate_password_reset_link(self, username: str) -> PasswordResetLinkReturn:
        """Generate a password reset link for the given user.

        Response codes:
            404: the user was not found.
            400: the user is protected.
            500: one of the caught backend/Kanidm errors occurred.
            200: the link was created and is returned in
                ``password_reset_link``.
        """
        try:
            password_reset_link = generate_password_reset_link_action(username=username)
        except UserNotFound as error:
            return PasswordResetLinkReturn(
                success=False,
                message=error.get_error_message(),
                code=404,
            )
        except UserIsProtected as error:
            return PasswordResetLinkReturn(
                success=False,
                message=error.get_error_message(),
                code=400,
            )
        except (
            NoPasswordResetLinkFoundInResponse,
            KanidmDidNotReturnAdminPassword,
            KanidmReturnUnknownResponseType,
            KanidmReturnEmptyResponse,
            KanidmQueryError,
            KanidmCliSubprocessError,
            FailedToGetValidKanidmToken,
            ApiUsingWrongUserRepository,
        ) as error:
            return PasswordResetLinkReturn(
                success=False,
                code=500,
                message=error.get_error_message(),
            )

        return PasswordResetLinkReturn(
            success=True,
            message="Link successfully created",
            code=200,
            password_reset_link=password_reset_link,
        )