mirror of
https://git.selfprivacy.org/SelfPrivacy/selfprivacy-rest-api.git
synced 2025-01-30 04:36:39 +00:00
feat: add endpoint to KanidmQueryError
This commit is contained in:
parent
1a07b1306b
commit
2b7e64f280
|
@ -1,6 +1,7 @@
|
|||
from abc import ABC, abstractmethod
|
||||
from typing import Optional
|
||||
|
||||
from selfprivacy_api.models.group import Group
|
||||
from selfprivacy_api.models.user import UserDataUser
|
||||
|
||||
|
||||
|
@ -34,7 +35,9 @@ class AbstractUserRepository(ABC):
|
|||
@staticmethod
|
||||
@abstractmethod
|
||||
def delete_user(username: str) -> None:
|
||||
"""Deletes an existing user"""
|
||||
"""
|
||||
Deletes an existing user
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
@abstractmethod
|
||||
|
@ -54,17 +57,25 @@ class AbstractUserRepository(ABC):
|
|||
@staticmethod
|
||||
@abstractmethod
|
||||
def get_user_by_username(username: str) -> UserDataUser:
|
||||
"""Retrieves user data (UserDataUser) by username"""
|
||||
"""
|
||||
Retrieves user data (UserDataUser) by username
|
||||
"""
|
||||
|
||||
# ! Not implemented in JsonUserRepository !
|
||||
|
||||
# | |
|
||||
# \|/ \|/
|
||||
|
||||
@staticmethod
|
||||
@abstractmethod
|
||||
def generate_password_reset_link(username: str) -> str:
|
||||
"""
|
||||
Do not reset the password, just generate a link to reset the password.
|
||||
! Not implemented in JsonUserRepository !
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
@abstractmethod
|
||||
def groups_list() -> list:
|
||||
"""Get groups list"""
|
||||
def groups_list() -> list[Group]:
|
||||
"""
|
||||
Get groups list.
|
||||
"""
|
||||
|
|
|
@ -5,15 +5,19 @@ from typing import Any
|
|||
class KanidmQueryError(Exception):
|
||||
"""Error occurred during kanidm query"""
|
||||
|
||||
def __init__(self, error_text: Optional[str] = None) -> None:
|
||||
self.error_text = error_text
|
||||
def __init__(
|
||||
self, error_text: Optional[Any] = None, endpoint: Optional[str] = None
|
||||
) -> None:
|
||||
self.error_text = str(error_text)
|
||||
self.endpoint = endpoint
|
||||
|
||||
def get_error_message(self) -> str:
|
||||
return (
|
||||
f"An error occurred during the Kanidm query. Error {self.error_text}"
|
||||
if self.error_text
|
||||
else "An error occurred during the Kanidm query."
|
||||
)
|
||||
message = "An error occurred during the Kanidm query."
|
||||
if self.endpoint:
|
||||
message += f" Endpoint: {self.endpoint}"
|
||||
if self.error_text:
|
||||
message += f" Error: {self.error_text}"
|
||||
return message
|
||||
|
||||
|
||||
class KanidmReturnEmptyResponse(Exception):
|
||||
|
|
|
@ -1,9 +1,9 @@
|
|||
from json import JSONDecodeError
|
||||
from typing import Any, Optional, Union
|
||||
import subprocess
|
||||
import requests
|
||||
import re
|
||||
import logging
|
||||
import requests
|
||||
|
||||
from selfprivacy_api.models.group import Group
|
||||
from selfprivacy_api.repositories.users.exceptions import (
|
||||
|
@ -130,15 +130,24 @@ class KanidmAdminToken:
|
|||
|
||||
@staticmethod
|
||||
def _is_token_valid(token: str) -> bool:
|
||||
response = requests.get(
|
||||
f"{KANIDM_URL}/v1/person/root",
|
||||
headers={
|
||||
"Authorization": f"Bearer {token}",
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
timeout=1,
|
||||
verify=False, # TODO: REMOVE THIS NOT HALAL!!!!!
|
||||
)
|
||||
endpoint = f"{KANIDM_URL}/v1/person/root"
|
||||
try:
|
||||
response = requests.get(
|
||||
endpoint,
|
||||
headers={
|
||||
"Authorization": f"Bearer {token}",
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
timeout=1,
|
||||
verify=False, # TODO: REMOVE THIS NOT HALAL!!!!!
|
||||
)
|
||||
|
||||
except (requests.exceptions.Timeout, requests.exceptions.ConnectionError, requests.exceptions.HTTPError) as error:
|
||||
raise KanidmQueryError(error_text=f"Kanidm is not responding to requests. Error: {str(error)}", endpoint=endpoint)
|
||||
|
||||
except Exception as error:
|
||||
raise KanidmQueryError(error_text=error, endpoint=endpoint)
|
||||
|
||||
response_data = response.json()
|
||||
|
||||
# we do not handle the other errors, this is handled by the main function in KanidmUserRepository._send_query
|
||||
|
@ -243,11 +252,16 @@ class KanidmUserRepository(AbstractUserRepository):
|
|||
|
||||
except JSONDecodeError as error:
|
||||
logger.error(f"Kanidm query error: {str(error)}")
|
||||
raise KanidmQueryError(error_text=f"No JSON found in Kanidm response. Error: {str(error)}")
|
||||
raise KanidmQueryError(
|
||||
error_text=f"No JSON found in Kanidm response. Error: {str(error)}",
|
||||
endpoint=full_endpoint,
|
||||
)
|
||||
except (requests.exceptions.Timeout, requests.exceptions.ConnectionError, requests.exceptions.HTTPError) as error:
|
||||
raise KanidmQueryError(error_text=f"Kanidm is not responding to requests. Error: {str(error)}", endpoint=endpoint)
|
||||
|
||||
except Exception as error:
|
||||
logger.error(f"Kanidm query error: {str(error)}")
|
||||
raise KanidmQueryError(error_text=str(error))
|
||||
raise KanidmQueryError(error_text=error, endpoint=full_endpoint)
|
||||
|
||||
if response.status_code != 200:
|
||||
if isinstance(response_data, dict):
|
||||
|
@ -259,12 +273,14 @@ class KanidmUserRepository(AbstractUserRepository):
|
|||
if response_data == "nomatchingentries":
|
||||
raise UserNotFound # does it work only for user? hate kanidm's response
|
||||
elif response_data == "accessdenied":
|
||||
raise KanidmQueryError(error_text="Kanidm access issue")
|
||||
raise KanidmQueryError(
|
||||
error_text="Kanidm access issue", endpoint=full_endpoint
|
||||
)
|
||||
elif response_data == "notauthenticated":
|
||||
raise FailedToGetValidKanidmToken
|
||||
|
||||
logger.error(f"Kanidm query error: {response.text}")
|
||||
raise KanidmQueryError(error_text=response.text)
|
||||
raise KanidmQueryError(error_text=response.text, endpoint=full_endpoint)
|
||||
|
||||
return response_data
|
||||
|
||||
|
|
Loading…
Reference in a new issue