feat: detect more errors

This commit is contained in:
dettlaff 2024-12-10 06:15:15 +04:00
parent df211a9405
commit 51a663bc0a
3 changed files with 45 additions and 32 deletions

View file

@ -29,10 +29,19 @@ class ApiUsingWrongUserRepository(Exception):
@staticmethod
def get_error_message() -> str:
"""Return text message error."""
return "API is using a too old or unfinished user repository"
class RootIsNotAvailableForModification(Exception):
"""
Root is not available for modification. Operation is restricted.
"""
@staticmethod
def get_error_message() -> str:
return "Root is not available for modification. Operation is restricted."
def get_users(
exclude_primary: bool = False,
exclude_root: bool = False,
@ -64,9 +73,7 @@ def create_user(
username: str,
password: Optional[str] = None,
directmemberof: Optional[list[str]] = None,
memberof: Optional[list[str]] = None,
displayname: Optional[str] = None,
email: Optional[str] = None,
) -> None:
if is_username_forbidden(username):
@ -93,9 +100,7 @@ def create_user(
username=username,
password=password,
directmemberof=directmemberof,
memberof=memberof,
displayname=displayname,
email=email,
)
@ -115,18 +120,14 @@ def update_user(
username: str,
password: Optional[str] = None,
directmemberof: Optional[list[str]] = None,
memberof: Optional[list[str]] = None,
displayname: Optional[str] = None,
email: Optional[str] = None,
) -> None:
ACTIVE_USERS_PROVIDER.update_user(
username=username,
password=password,
directmemberof=directmemberof,
memberof=memberof,
displayname=displayname,
email=email,
)
@ -156,4 +157,7 @@ def generate_password_reset_link(username: str) -> str:
if isinstance(ACTIVE_USERS_PROVIDER, JsonUserRepository):
raise ApiUsingWrongUserRepository
if username == "root":
raise RootIsNotAvailableForModification
return ACTIVE_USERS_PROVIDER.generate_password_reset_link(username=username)

View file

@ -21,6 +21,7 @@ from selfprivacy_api.graphql.mutations.mutation_interface import (
GenericMutationReturn,
)
from selfprivacy_api.actions.users import (
RootIsNotAvailableForModification,
create_user as create_user_action,
delete_user as delete_user_action,
update_user as update_user_action,
@ -39,7 +40,9 @@ from selfprivacy_api.repositories.users.exceptions import (
SelfPrivacyAppIsOutdate,
)
from selfprivacy_api import PLEASE_UPDATE_APP_TEXT
from selfprivacy_api.repositories.users.kanidm_user_repository import KanidmDidNotReturnAdminPassword
from selfprivacy_api.repositories.users.kanidm_user_repository import (
KanidmDidNotReturnAdminPassword,
)
FAILED_TO_SETUP_PASSWORD_TEXT = "Failed to set a password for a user. The problem occurred due to an old version of the SelfPrivacy app."
@ -64,10 +67,8 @@ class UserMutationInput:
username: str
directmemberof: Optional[list[str]] = strawberry.field(default_factory=list)
memberof: Optional[list[str]] = strawberry.field(default_factory=list)
password: Optional[str] = None
displayname: Optional[str] = None
email: Optional[str] = None
@strawberry.input
@ -89,9 +90,7 @@ class UsersMutations:
username=user.username,
password=user.password,
directmemberof=user.directmemberof,
memberof=user.memberof,
displayname=user.displayname,
email=user.email,
)
except (
PasswordIsEmpty,
@ -166,11 +165,13 @@ class UsersMutations:
username=user.username,
password=user.password,
directmemberof=user.directmemberof,
memberof=user.memberof,
displayname=user.displayname,
email=user.email,
)
except (PasswordIsEmpty, SelfPrivacyAppIsOutdate, KanidmDidNotReturnAdminPassword) as error:
except (
PasswordIsEmpty,
SelfPrivacyAppIsOutdate,
KanidmDidNotReturnAdminPassword,
) as error:
return return_failed_mutation_return(
message=error.get_error_message(),
)
@ -246,20 +247,20 @@ class UsersMutations:
)
@strawberry.mutation(permission_classes=[IsAuthenticated])
def generate_password_reset_link(
self, user: UserMutationInput
) -> PasswordResetLinkReturn:
def generate_password_reset_link(self, username: str) -> PasswordResetLinkReturn:
try:
password_reset_link = generate_password_reset_link_action(
username=user.username
)
password_reset_link = generate_password_reset_link_action(username=username)
except UserNotFound as error:
return PasswordResetLinkReturn(
success=False,
message=error.get_error_message(),
code=404,
)
except (NoPasswordResetLinkFoundInResponse, KanidmDidNotReturnAdminPassword) as error:
except (
NoPasswordResetLinkFoundInResponse,
KanidmDidNotReturnAdminPassword,
RootIsNotAvailableForModification,
) as error:
return PasswordResetLinkReturn(
success=False,
code=500,

View file

@ -7,6 +7,7 @@ import logging
from selfprivacy_api.repositories.users.exceptions import (
NoPasswordResetLinkFoundInResponse,
SelfPrivacyAppIsOutdate,
UserAlreadyExists,
)
from selfprivacy_api.utils import get_domain, temporary_env_var
from selfprivacy_api.utils.redis_pool import RedisPool
@ -151,19 +152,27 @@ class KanidmUserRepository(AbstractUserRepository):
raise KanidmQueryError(
f"Kanidm returned {response.status_code} unexpected HTTP status code. Endpoint: {full_endpoint}. Error: {response.text}."
)
return response.json()
response = response.json()
if response.get("plugin"):
if response["plugin"].get("attrunique") == "duplicate value detected":
raise UserAlreadyExists # TODO only user?
return response
except Exception as error:
raise KanidmQueryError(f"Kanidm request failed! Error: {str(error)}")
# {"plugin": {"attrunique": "duplicate value detected"}}
@staticmethod
def create_user(
username: str,
password: Optional[str] = None,
displayname: Optional[str] = None,
email: Optional[str] = None,
directmemberof: Optional[list[str]] = None,
memberof: Optional[list[str]] = None,
displayname: Optional[str] = None,
) -> None:
"""
Creates a new user."password" is a legacy field,
@ -180,7 +189,7 @@ class KanidmUserRepository(AbstractUserRepository):
"attrs": {
"name": [username],
"displayname": [displayname if displayname else username],
"mail": [email if email else f"{username}@{get_domain()}"],
"mail": [f"{username}@{get_domain()}"],
"class": ["user"], # TODO read more about it
}
}
@ -245,7 +254,6 @@ class KanidmUserRepository(AbstractUserRepository):
directmemberof: Optional[list[str]] = None,
memberof: Optional[list[str]] = None,
displayname: Optional[str] = None,
email: Optional[str] = None,
) -> None:
"""
Update user information.
@ -258,7 +266,7 @@ class KanidmUserRepository(AbstractUserRepository):
data = {
"attrs": {
"displayname": [displayname if displayname else username],
"mail": [email if email else f"{username}@{get_domain()}"],
"mail": [f"{username}@{get_domain()}"],
"class": ["user"], # TODO read more about it
}
}
@ -292,7 +300,7 @@ class KanidmUserRepository(AbstractUserRepository):
user_type=KanidmUserRepository._check_user_origin_by_memberof(
memberof=attrs.get("memberof", [])
),
ssh_keys=[], # Actions слой заполнит это поле
ssh_keys=[], # Actions layer will fill this field
directmemberof=attrs.get("directmemberof", []),
memberof=attrs.get("memberof", []),
displayname=attrs.get("displayname", [None])[0],
@ -319,6 +327,6 @@ class KanidmUserRepository(AbstractUserRepository):
raise KanidmReturnEmptyResponse
if token:
return f"https://id{get_domain()}/ui/reset?token={token}"
return f"https://id.{get_domain()}/ui/reset?token={token}"
raise NoPasswordResetLinkFoundInResponse