fix: move Exceptions to new file

This commit is contained in:
dettlaff 2024-12-11 05:33:07 +04:00
parent 63ea78b1ef
commit 38b5dc19cb
2 changed files with 66 additions and 40 deletions

View file

@ -0,0 +1,39 @@
from typing import Optional
class KanidmQueryError(Exception):
"""Error occurred during kanidm query"""
def __init__(self, error_text: Optional[str] = None) -> None:
self.error_text = error_text
def get_error_message(self) -> str:
return (
f"An error occurred during the Kanidm query. Error {self.error_text}"
if self.error_text
else "An error occurred during the Kanidm query."
)
class KanidmReturnEmptyResponse(Exception):
"""Kanidm returned a blank response"""
@staticmethod
def get_error_message() -> str:
return "Kanidm returned an empty response."
class KanidmReturnUnknownResponseType(Exception):
"""Kanidm returned a blank response"""
@staticmethod
def get_error_message() -> str:
return "Kanidm returned an empty response."
class KanidmDidNotReturnAdminPassword(Exception):
"""Kanidm didn't return the admin password"""
@staticmethod
def get_error_message() -> str:
return "Kanidm didn't return the admin password."

View file

@ -1,4 +1,4 @@
from typing import Optional
from typing import Optional, Union
import subprocess
import requests
import re
@ -9,6 +9,12 @@ from selfprivacy_api.repositories.users.exceptions import (
SelfPrivacyAppIsOutdate,
UserAlreadyExists,
)
from selfprivacy_api.repositories.users.exceptions_kanidm import (
KanidmDidNotReturnAdminPassword,
KanidmQueryError,
KanidmReturnEmptyResponse,
KanidmReturnUnknownResponseType,
)
from selfprivacy_api.utils import get_domain, temporary_env_var
from selfprivacy_api.utils.redis_pool import RedisPool
from selfprivacy_api.models.user import UserDataUser, UserDataUserOrigin
@ -27,30 +33,6 @@ logger = logging.getLogger(__name__)
ADMIN_KANIDM_GROUPS = ["sp.admin"]
class KanidmQueryError(Exception):
"""Error occurred during kanidm query"""
@staticmethod
def get_error_message() -> str:
return "An error occurred during the Kanidm query."
class KanidmReturnEmptyResponse(Exception):
"""Kanidm returned a blank response"""
@staticmethod
def get_error_message() -> str:
return "Kanidm returned an empty response."
class KanidmDidNotReturnAdminPassword(Exception):
"""Kanidm didn't return the admin password"""
@staticmethod
def get_error_message() -> str:
return "Kanidm didn't return the admin password."
class KanidmAdminToken: # TODO CHECK IS TOKEN CORRECT?
@staticmethod
def get() -> str:
@ -131,7 +113,7 @@ class KanidmUserRepository(AbstractUserRepository):
return UserDataUserOrigin.NORMAL
@staticmethod
def _send_query(endpoint: str, method: str = "GET", data=None) -> dict:
def _send_query(endpoint: str, method: str = "GET", data=None) -> Union[dict, list]:
request_method = getattr(requests, method.lower(), None)
full_endpoint = f"{KANIDM_URL}/v1/{endpoint}"
@ -147,28 +129,33 @@ class KanidmUserRepository(AbstractUserRepository):
verify=False, # TODO: REMOVE THIS NOTHALAL!!!!!
)
except Exception as error:
raise KanidmQueryError(f"Kanidm request failed! Error: {str(error)}")
raise KanidmQueryError(error_text=str(error))
logger.info(str(response))
response_data = response.json()
if response.status_code != 200:
if isinstance(response_data, dict):
plugin_error = response_data.get("plugin", {})
if plugin_error.get("attrunique") == "duplicate value detected":
raise UserAlreadyExists # TODO only user ?
if response.status_code != 200:
raise KanidmQueryError(
f"Kanidm returned {response.status_code} unexpected HTTP status code. Endpoint: {full_endpoint}. Error: {response.text}."
)
raise KanidmQueryError(error_text=response.text)
if (
isinstance(response_data, dict)
and response_data.get("data") is not None
):
if isinstance(response_data, dict):
if response_data.get("data") is None:
return response_data
else:
raise KanidmReturnEmptyResponse
elif isinstance(response_data, list):
if len(response_data) > 0:
return response_data
else:
raise KanidmReturnEmptyResponse
else:
raise KanidmReturnUnknownResponseType
# nomatchingentries
@staticmethod