feat: delete RootIsNotAvailableForModification and PrimaryUserDeletionNotAllowed

This commit is contained in:
dettlaff 2024-12-11 13:16:38 +04:00
parent 65a2a59ff8
commit eca12a3079
3 changed files with 29 additions and 47 deletions

View file

@ -8,7 +8,6 @@ from typing import Optional
from selfprivacy_api import PLEASE_UPDATE_APP_TEXT
from selfprivacy_api.models.user import UserDataUser, UserDataUserOrigin
from selfprivacy_api.repositories.users.exceptions_kanidm import KanidmReturnEmptyResponse
from selfprivacy_api.utils import is_username_forbidden
from selfprivacy_api.actions.ssh import get_ssh_keys
@ -17,6 +16,7 @@ from selfprivacy_api.repositories.users.json_user_repository import JsonUserRepo
from selfprivacy_api.repositories.users import ACTIVE_USERS_PROVIDER
from selfprivacy_api.repositories.users.exceptions import (
SelfPrivacyAppIsOutdate,
UserIsProtected,
UsernameForbidden,
UsernameNotAlphanumeric,
UsernameTooLong,
@ -38,24 +38,6 @@ class ApiUsingWrongUserRepository(Exception):
return "API is using a too old or unfinished user repository"
class RootIsNotAvailableForModification(Exception):
"""
Root is not available for modification. Operation is restricted.
"""
@staticmethod
def get_error_message() -> str:
return "Root is not available for modification. Operation is restricted."
class PrimaryUserDeletionNotAllowed(Exception):
"""The primary user cannot be deleted."""
@staticmethod
def get_error_message() -> str:
return "The primary user cannot be deleted."
def get_users(
exclude_primary: bool = False,
exclude_root: bool = False,
@ -122,20 +104,23 @@ def create_user(
def delete_user(username: str) -> None:
if username == "root":
raise RootIsNotAvailableForModification
raise UserIsProtected
try:
user = ACTIVE_USERS_PROVIDER.get_user_by_username(username=username)
if user.user_type == UserDataUserOrigin.PRIMARY:
raise PrimaryUserDeletionNotAllowed
except UserNotFound:
raise UserNotFound
finally:
# need to maintain the logic of the old repository, since ssh management uses it.
if not isinstance(ACTIVE_USERS_PROVIDER, JsonUserRepository):
try:
JsonUserRepository.delete_user(username=username)
except UserNotFound:
except (UserNotFound, UserIsProtected):
pass
if user.user_type == UserDataUserOrigin.PRIMARY:
raise UserIsProtected
ACTIVE_USERS_PROVIDER.delete_user(username=username)
@ -150,7 +135,7 @@ def update_user(
raise SelfPrivacyAppIsOutdate
if username == "root":
raise RootIsNotAvailableForModification
raise UserIsProtected
ACTIVE_USERS_PROVIDER.update_user(
username=username,
@ -170,10 +155,7 @@ def get_user_by_username(username: str) -> UserDataUser:
ssh_keys=get_ssh_keys(username="root"),
)
try:
user = ACTIVE_USERS_PROVIDER.get_user_by_username(username=username)
except KanidmReturnEmptyResponse:
raise UserNotFound
try:
user.ssh_keys = get_ssh_keys(username=user.username)
@ -188,6 +170,6 @@ def generate_password_reset_link(username: str) -> str:
raise ApiUsingWrongUserRepository
if username == "root":
raise RootIsNotAvailableForModification
raise UserIsProtected
return ACTIVE_USERS_PROVIDER.generate_password_reset_link(username=username)

View file

@ -21,8 +21,6 @@ from selfprivacy_api.graphql.mutations.mutation_interface import (
GenericMutationReturn,
)
from selfprivacy_api.actions.users import (
PrimaryUserDeletionNotAllowed,
RootIsNotAvailableForModification,
create_user as create_user_action,
delete_user as delete_user_action,
update_user as update_user_action,
@ -152,8 +150,6 @@ class UsersMutations:
except (
KanidmDidNotReturnAdminPassword,
KanidmQueryError,
PrimaryUserDeletionNotAllowed,
RootIsNotAvailableForModification,
) as error:
return GenericMutationReturn(
success=False,
@ -267,10 +263,15 @@ class UsersMutations:
message=error.get_error_message(),
code=404,
)
except UserIsProtected as error:
return PasswordResetLinkReturn(
success=False,
message=error.get_error_message(),
code=400,
)
except (
NoPasswordResetLinkFoundInResponse,
KanidmDidNotReturnAdminPassword,
RootIsNotAvailableForModification,
KanidmReturnUnknownResponseType,
KanidmReturnEmptyResponse,
KanidmQueryError,

View file

@ -168,10 +168,6 @@ class KanidmUserRepository(AbstractUserRepository):
raise KanidmQueryError(error_text=response.text)
if isinstance(response_data, str): # TODO
if response_data == "nomatchingentries":
raise UserNotFound
return response_data
@staticmethod
@ -293,9 +289,12 @@ class KanidmUserRepository(AbstractUserRepository):
method="GET",
)
try:
KanidmUserRepository._check_response_type_and_not_empty(
data_type="dict", response_data=user_data
)
except KanidmReturnEmptyResponse:
raise UserNotFound
attrs = user_data["attrs"]