From 38b5dc19cbeb3670199394090a9938a3fb2bb582 Mon Sep 17 00:00:00 2001 From: dettlaff Date: Wed, 11 Dec 2024 05:33:07 +0400 Subject: [PATCH] fix: move Exceptions to new file --- .../repositories/users/exceptions_kanidm.py | 39 +++++++++++ .../users/kanidm_user_repository.py | 67 ++++++++----------- 2 files changed, 66 insertions(+), 40 deletions(-) create mode 100644 selfprivacy_api/repositories/users/exceptions_kanidm.py diff --git a/selfprivacy_api/repositories/users/exceptions_kanidm.py b/selfprivacy_api/repositories/users/exceptions_kanidm.py new file mode 100644 index 0000000..4f43c80 --- /dev/null +++ b/selfprivacy_api/repositories/users/exceptions_kanidm.py @@ -0,0 +1,39 @@ +from typing import Optional + + +class KanidmQueryError(Exception): + """Error occurred during kanidm query""" + + def __init__(self, error_text: Optional[str] = None) -> None: + self.error_text = error_text + + def get_error_message(self) -> str: + return ( + f"An error occurred during the Kanidm query. Error {self.error_text}" + if self.error_text + else "An error occurred during the Kanidm query." + ) + + +class KanidmReturnEmptyResponse(Exception): + """Kanidm returned a blank response""" + + @staticmethod + def get_error_message() -> str: + return "Kanidm returned an empty response." + + +class KanidmReturnUnknownResponseType(Exception): + """Kanidm returned a blank response""" + + @staticmethod + def get_error_message() -> str: + return "Kanidm returned an empty response." + + +class KanidmDidNotReturnAdminPassword(Exception): + """Kanidm didn't return the admin password""" + + @staticmethod + def get_error_message() -> str: + return "Kanidm didn't return the admin password." diff --git a/selfprivacy_api/repositories/users/kanidm_user_repository.py b/selfprivacy_api/repositories/users/kanidm_user_repository.py index 967fd81..7e03709 100644 --- a/selfprivacy_api/repositories/users/kanidm_user_repository.py +++ b/selfprivacy_api/repositories/users/kanidm_user_repository.py @@ -1,4 +1,4 @@ -from typing import Optional +from typing import Optional, Union import subprocess import requests import re @@ -9,6 +9,12 @@ from selfprivacy_api.repositories.users.exceptions import ( SelfPrivacyAppIsOutdate, UserAlreadyExists, ) +from selfprivacy_api.repositories.users.exceptions_kanidm import ( + KanidmDidNotReturnAdminPassword, + KanidmQueryError, + KanidmReturnEmptyResponse, + KanidmReturnUnknownResponseType, +) from selfprivacy_api.utils import get_domain, temporary_env_var from selfprivacy_api.utils.redis_pool import RedisPool from selfprivacy_api.models.user import UserDataUser, UserDataUserOrigin @@ -27,30 +33,6 @@ logger = logging.getLogger(__name__) ADMIN_KANIDM_GROUPS = ["sp.admin"] -class KanidmQueryError(Exception): - """Error occurred during kanidm query""" - - @staticmethod - def get_error_message() -> str: - return "An error occurred during the Kanidm query." - - -class KanidmReturnEmptyResponse(Exception): - """Kanidm returned a blank response""" - - @staticmethod - def get_error_message() -> str: - return "Kanidm returned an empty response." - - -class KanidmDidNotReturnAdminPassword(Exception): - """Kanidm didn't return the admin password""" - - @staticmethod - def get_error_message() -> str: - return "Kanidm didn't return the admin password." - - class KanidmAdminToken: # TODO CHECK IS TOKEN CORRECT? @staticmethod def get() -> str: @@ -131,7 +113,7 @@ class KanidmUserRepository(AbstractUserRepository): return UserDataUserOrigin.NORMAL @staticmethod - def _send_query(endpoint: str, method: str = "GET", data=None) -> dict: + def _send_query(endpoint: str, method: str = "GET", data=None) -> Union[dict, list]: request_method = getattr(requests, method.lower(), None) full_endpoint = f"{KANIDM_URL}/v1/{endpoint}" @@ -147,27 +129,32 @@ class KanidmUserRepository(AbstractUserRepository): verify=False, # TODO: REMOVE THIS NOTHALAL!!!!! ) except Exception as error: - raise KanidmQueryError(f"Kanidm request failed! Error: {str(error)}") + raise KanidmQueryError(error_text=str(error)) logger.info(str(response)) response_data = response.json() - plugin_error = response_data.get("plugin", {}) - if plugin_error.get("attrunique") == "duplicate value detected": - raise UserAlreadyExists # TODO only user ? - if response.status_code != 200: - raise KanidmQueryError( - f"Kanidm returned {response.status_code} unexpected HTTP status code. Endpoint: {full_endpoint}. Error: {response.text}." - ) + if isinstance(response_data, dict): + plugin_error = response_data.get("plugin", {}) + if plugin_error.get("attrunique") == "duplicate value detected": + raise UserAlreadyExists # TODO only user ? - if ( - isinstance(response_data, dict) - and response_data.get("data") is not None - ): - return response_data + raise KanidmQueryError(error_text=response.text) + + if isinstance(response_data, dict): + if response_data.get("data") is None: + return response_data + else: + raise KanidmReturnEmptyResponse + + elif isinstance(response_data, list): + if len(response_data) > 0: + return response_data + else: + raise KanidmReturnEmptyResponse else: - raise KanidmReturnEmptyResponse + raise KanidmReturnUnknownResponseType # nomatchingentries