feat: add KanidmCliSubprocessError and more descriptions

This commit is contained in:
dettlaff 2024-12-11 15:16:14 +04:00
parent 33f491d4f6
commit 6440a4f5c9
6 changed files with 226 additions and 53 deletions

View file

@ -15,6 +15,8 @@ from selfprivacy_api.actions.ssh import get_ssh_keys
from selfprivacy_api.repositories.users.json_user_repository import JsonUserRepository
from selfprivacy_api.repositories.users import ACTIVE_USERS_PROVIDER
from selfprivacy_api.repositories.users.exceptions import (
DisplaynameNotAlphanumeric,
DisplaynameTooLong,
SelfPrivacyAppIsOutdate,
UserIsProtected,
UsernameForbidden,
@ -38,6 +40,14 @@ class ApiUsingWrongUserRepository(Exception):
return "API is using a too old or unfinished user repository"
def _check_displayname_is_correct(displayname: str) -> None:
if not re.match(r"^[a-z_][a-z0-9_]+$", displayname):
raise DisplaynameNotAlphanumeric
if len(displayname) >= 16: # we don't know the limitations of each service
raise DisplaynameTooLong
def get_users(
exclude_primary: bool = False,
exclude_root: bool = False,
@ -73,19 +83,20 @@ def create_user(
) -> None:
if is_username_forbidden(username):
raise UsernameForbidden("Username is forbidden")
raise UsernameForbidden
if not re.match(r"^[a-z_][a-z0-9_]+$", username):
raise UsernameNotAlphanumeric(
"Username must be alphanumeric and start with a letter"
)
raise UsernameNotAlphanumeric
if len(username) >= 32:
raise UsernameTooLong("Username must be less than 32 characters")
raise UsernameTooLong
if password:
logger.error(PLEASE_UPDATE_APP_TEXT)
if displayname:
_check_displayname_is_correct(displayname=displayname)
# need to maintain the logic of the old repository, since ssh management uses it.
if not isinstance(ACTIVE_USERS_PROVIDER, JsonUserRepository):
try:
@ -137,6 +148,9 @@ def update_user(
if username == "root":
raise UserIsProtected
if displayname:
_check_displayname_is_correct(displayname=displayname)
ACTIVE_USERS_PROVIDER.update_user(
username=username,
directmemberof=directmemberof,

View file

@ -27,6 +27,8 @@ from selfprivacy_api.actions.users import (
generate_password_reset_link as generate_password_reset_link_action,
)
from selfprivacy_api.repositories.users.exceptions import (
DisplaynameNotAlphanumeric,
DisplaynameTooLong,
NoPasswordResetLinkFoundInResponse,
PasswordIsEmpty,
UsernameForbidden,
@ -43,6 +45,7 @@ from selfprivacy_api.repositories.users.exceptions_kanidm import (
KanidmQueryError,
KanidmReturnEmptyResponse,
KanidmReturnUnknownResponseType,
KanidmCliSubprocessError,
)
from selfprivacy_api import PLEASE_UPDATE_APP_TEXT
@ -101,6 +104,9 @@ class UsersMutations:
InvalidConfiguration,
KanidmDidNotReturnAdminPassword,
KanidmQueryError,
DisplaynameNotAlphanumeric,
DisplaynameTooLong,
KanidmCliSubprocessError,
) as error:
return return_failed_mutation_return(
message=error.get_error_message(),
@ -150,6 +156,7 @@ class UsersMutations:
except (
KanidmDidNotReturnAdminPassword,
KanidmQueryError,
KanidmCliSubprocessError,
) as error:
return GenericMutationReturn(
success=False,
@ -178,6 +185,9 @@ class UsersMutations:
SelfPrivacyAppIsOutdate,
KanidmDidNotReturnAdminPassword,
KanidmQueryError,
DisplaynameNotAlphanumeric,
DisplaynameTooLong,
KanidmCliSubprocessError,
) as error:
return return_failed_mutation_return(
message=error.get_error_message(),
@ -275,6 +285,7 @@ class UsersMutations:
KanidmReturnUnknownResponseType,
KanidmReturnEmptyResponse,
KanidmQueryError,
KanidmCliSubprocessError,
) as error:
return PasswordResetLinkReturn(
success=False,

View file

@ -11,13 +11,13 @@ class AbstractUserRepository(ABC):
username: str,
password: Optional[str] = None,
directmemberof: Optional[list[str]] = None,
memberof: Optional[list[str]] = None,
displayname: Optional[str] = None,
email: Optional[str] = None,
) -> None:
"""
Creates a new user. In KanidmUserRepository "password" is a legacy field,
please use generate_password_reset_link() instead.
Creates a new user.
! In KanidmUserRepository "password" is a legacy field,
please use generate_password_reset_link() instead !
"""
@staticmethod
@ -28,7 +28,7 @@ class AbstractUserRepository(ABC):
) -> list[UserDataUser]:
"""
Gets a list of users with options to exclude specific user groups.
In KanidmUserRepository, the root user will never return.
! In KanidmUserRepository, the root user will never return !
"""
@staticmethod
@ -41,15 +41,14 @@ class AbstractUserRepository(ABC):
def update_user(
username: str,
directmemberof: Optional[list[str]] = None,
memberof: Optional[list[str]] = None,
displayname: Optional[str] = None,
email: Optional[str] = None,
) -> None:
"""
Update user information.
In the JsonUserRepository, only update the password of an existing user.
Do not update the password in KanidmUserRepository,
use generate_password_reset_link() instead.
! Do not update the password in KanidmUserRepository,
use generate_password_reset_link() instead !
"""
@staticmethod
@ -62,5 +61,5 @@ class AbstractUserRepository(ABC):
def generate_password_reset_link(username: str) -> str:
"""
Do not reset the password, just generate a link to reset the password.
Not implemented in JsonUserRepository.
! Not implemented in JsonUserRepository !
"""

View file

@ -35,7 +35,7 @@ class UsernameNotAlphanumeric(Exception):
@staticmethod
def get_error_message() -> str:
return "Username not alphanumeric"
return "Username must be alphanumeric and start with a letter"
class UsernameTooLong(Exception):
@ -80,3 +80,21 @@ class NoPasswordResetLinkFoundInResponse(Exception):
@staticmethod
def get_error_message() -> str:
return "The Kanidm response does not contain a password reset link."
class DisplaynameNotAlphanumeric(Exception):
"""Attempted to create a display name with non-alphanumeric characters"""
@staticmethod
def get_error_message() -> str:
return "Display name must be alphanumeric"
class DisplaynameTooLong(Exception):
"""
Attempted to create a display name that is too long. Display name must be less than 16 characters
"""
@staticmethod
def get_error_message() -> str:
return "Display name is too long. Must be less than 16 characters"

View file

@ -44,3 +44,17 @@ class KanidmDidNotReturnAdminPassword(Exception):
@staticmethod
def get_error_message() -> str:
return "Kanidm didn't return the admin password."
class KanidmCliSubprocessError(Exception):
"""An error occurred when using Kanidm cli"""
def __init__(self, error: Optional[str] = None) -> None:
self.error = error
def get_error_message(self) -> str:
return (
f"An error occurred when using Kanidm cli. Error: {self.error}"
if self.error
else "An error occurred when using Kanidm cli."
)

View file

@ -10,6 +10,7 @@ from selfprivacy_api.repositories.users.exceptions import (
UserNotFound,
)
from selfprivacy_api.repositories.users.exceptions_kanidm import (
KanidmCliSubprocessError,
KanidmDidNotReturnAdminPassword,
KanidmQueryError,
KanidmReturnEmptyResponse,
@ -24,18 +25,35 @@ from selfprivacy_api.repositories.users.abstract_user_repository import (
KANIDM_URL = "https://127.0.0.1:3013"
REDIS_TOKEN_KEY = "kanidm:token"
ADMIN_KANIDM_GROUPS = ["sp.admin"]
redis = RedisPool().get_connection()
logger = logging.getLogger(__name__)
ADMIN_KANIDM_GROUPS = ["sp.admin"]
class KanidmAdminToken: # TODO CHECK IS TOKEN CORRECT?
"""
Manages the administrative token for Kanidm.
Methods:
get() -> str:
Retrieves the current administrative token. If absent, resets the admin password and creates a new token.
_create_and_save_token(kanidm_admin_password: str) -> str:
Creates a new token using the admin password and saves it to Redis.
_reset_and_save_idm_admin_password() -> str:
Resets the Kanidm admin password and returns the new password.
_delete_kanidm_token_from_db() -> None:
Deletes the admin token from Redis.
"""
@staticmethod
def get() -> str:
kanidm_admin_token = str(redis.get("kanidm:token"))
kanidm_admin_token = str(redis.get(REDIS_TOKEN_KEY))
if kanidm_admin_token is None:
kanidm_admin_password = (
@ -51,20 +69,24 @@ class KanidmAdminToken: # TODO CHECK IS TOKEN CORRECT?
@staticmethod
def _create_and_save_token(kanidm_admin_password: str) -> str:
with temporary_env_var(key="KANIDM_PASSWORD", value=kanidm_admin_password):
subprocess.run(["kanidm", "login", "-D", "idm_admin"])
try:
subprocess.run(["kanidm", "login", "-D", "idm_admin"], check=True)
output = subprocess.check_output(
[
"kanidm",
"service-account",
"api-token",
"generate",
"--rw",
"selfprivacy",
"token2",
],
text=True,
)
output = subprocess.check_output(
[
"kanidm",
"service-account",
"api-token",
"generate",
"--rw",
"selfprivacy",
"token2",
],
text=True,
)
except subprocess.CalledProcessError as error:
logger.error(f"Error creating Kanidm token: {str(error.output)}")
raise KanidmCliSubprocessError(error=str(error.output))
kanidm_admin_token = output.splitlines()[-1]
@ -90,7 +112,7 @@ class KanidmAdminToken: # TODO CHECK IS TOKEN CORRECT?
if match:
new_kanidm_admin_password = match.group(
1
) # we have many not json strings in output
) # we have many non-JSON strings in output
else:
raise KanidmDidNotReturnAdminPassword
@ -102,6 +124,10 @@ class KanidmAdminToken: # TODO CHECK IS TOKEN CORRECT?
class KanidmUserRepository(AbstractUserRepository):
"""
Repository for managing users through Kanidm.
"""
@staticmethod
def _check_response_type_and_not_empty(data_type: str, response_data: Any) -> None:
"""
@ -109,7 +135,7 @@ class KanidmUserRepository(AbstractUserRepository):
Args:
data_type (str): Expected type of response data ('list' or 'dict').
response_data (any): Response data to validate.
response_data (Any): Response data to validate.
Raises:
KanidmReturnEmptyResponse: If the response data is empty.
@ -133,6 +159,16 @@ class KanidmUserRepository(AbstractUserRepository):
def _check_user_origin_by_memberof(
memberof: list[str] = [],
) -> UserDataUserOrigin:
"""
Determines the origin of the user based on their group memberships.
Args:
memberof (List[str]): List of groups the user belongs to.
Returns:
UserDataUserOrigin: The origin type of the user (PRIMARY or NORMAL).
"""
if sorted(memberof) == sorted(ADMIN_KANIDM_GROUPS):
return UserDataUserOrigin.PRIMARY
else:
@ -140,7 +176,28 @@ class KanidmUserRepository(AbstractUserRepository):
@staticmethod
def _send_query(endpoint: str, method: str = "GET", data=None) -> Union[dict, list]:
"""
Sends a request to the Kanidm API.
Args:
endpoint (str): The API endpoint.
method (str, optional): The HTTP method (GET, POST, PATCH, DELETE). Defaults to "GET".
data (Optional[dict], optional): The data to send in the request body. Defaults to None.
Returns:
Union[dict, list]: The response data.
Raises:
KanidmQueryError: If an error occurs during the request.
UserAlreadyExists: If the user already exists.
UserNotFound: If the user is not found.
"""
request_method = getattr(requests, method.lower(), None)
if not request_method:
logger.error(f"HTTP method '{method}' is not supported.")
raise ValueError(f"Unsupported HTTP method: {method}")
full_endpoint = f"{KANIDM_URL}/v1/{endpoint}"
try:
@ -151,10 +208,11 @@ class KanidmUserRepository(AbstractUserRepository):
"Authorization": f"Bearer {KanidmAdminToken.get()}",
"Content-Type": "application/json",
},
timeout=0.5,
timeout=1,
verify=False, # TODO: REMOVE THIS NOT HALAL!!!!!
) # type: ignore
)
except Exception as error:
logger.error(f"Kanidm query error: {str(error)}")
raise KanidmQueryError(error_text=str(error))
response_data = response.json()
@ -167,8 +225,9 @@ class KanidmUserRepository(AbstractUserRepository):
raise UserAlreadyExists # TODO only user ?
if isinstance(response_data, str) and response_data == "nomatchingentries":
raise UserNotFound # is it works only for user?
raise UserNotFound # does it work only for user?
logger.error(f"Kanidm query error: {response.text}")
raise KanidmQueryError(error_text=response.text)
return response_data
@ -177,15 +236,21 @@ class KanidmUserRepository(AbstractUserRepository):
def create_user(
username: str,
directmemberof: Optional[list[str]] = None,
memberof: Optional[list[str]] = None,
displayname: Optional[str] = None,
) -> None:
"""
Creates a new user."password" is a legacy field,
please use generate_password_reset_link() instead.
Creates a new user.
! "password" is a legacy field, please use generate_password_reset_link() instead !
If displayname is None, it will default to the username.
If email is None, it will default to username@get_domain().
Args:
username (str): The username.
directmemberof (Optional[List[str]], optional): List of direct group memberships. Defaults to None.
memberof (Optional[List[str]], optional): List of indirect group memberships. Defaults to None.
displayname (Optional[str], optional): If displayname is None, it will default to the username.
Raises:
KanidmQueryError: If an error occurs while creating the user.
UserAlreadyExists: If the user already exists.
"""
data = {
@ -199,8 +264,6 @@ class KanidmUserRepository(AbstractUserRepository):
if directmemberof:
data["attrs"]["directmemberof"] = directmemberof
if memberof:
data["attrs"]["memberof"] = memberof
KanidmUserRepository._send_query(
endpoint="person",
@ -215,8 +278,21 @@ class KanidmUserRepository(AbstractUserRepository):
) -> list[UserDataUser]:
"""
Gets a list of users with options to exclude specific user groups.
The root user will never return.
! The root user will never return !
Args:
exclude_primary (bool, optional): Exclude users with PRIMARY type. Defaults to False.
exclude_root (bool, optional): Not working for Kanidm. The root user will never return.
Returns:
List[UserDataUser]: The list of users.
Raises:
KanidmQueryError: If an error occurs while retrieving users.
KanidmReturnUnknownResponseType: If response type is unknown.
KanidmReturnEmptyResponse: If response is empty.
"""
users_data = KanidmUserRepository._send_query(endpoint="person", method="GET")
KanidmUserRepository._check_response_type_and_not_empty(
@ -248,7 +324,16 @@ class KanidmUserRepository(AbstractUserRepository):
@staticmethod
def delete_user(username: str) -> None:
"""Deletes an existing user"""
"""
Deletes an existing user from Kanidm.
Args:
username (str): The username to delete.
Raises:
KanidmQueryError: If an error occurs while deleting the user.
UserNotFound: If the user does not exist.
"""
KanidmUserRepository._send_query(endpoint=f"person/{username}", method="DELETE")
@ -256,13 +341,20 @@ class KanidmUserRepository(AbstractUserRepository):
def update_user(
username: str,
directmemberof: Optional[list[str]] = None,
memberof: Optional[list[str]] = None,
displayname: Optional[str] = None,
) -> None:
"""
Update user information.
Do not update the password, please
use generate_password_reset_link() instead.
! Do not update the password, please use generate_password_reset_link() instead !
Args:
username (str): The username to update.
directmemberof (Optional[List[str]], optional): New list of direct group memberships. Defaults to None.
displayname (Optional[str], optional): New display name. Defaults to username if not provided.
Raises:
KanidmQueryError: If an error occurs while updating the user.
UserNotFound: If the user does not exist.
"""
data = {
@ -275,8 +367,6 @@ class KanidmUserRepository(AbstractUserRepository):
if directmemberof:
data["attrs"]["directmemberof"] = directmemberof
if memberof:
data["attrs"]["memberof"] = memberof
KanidmUserRepository._send_query(
endpoint=f"person/{username}",
@ -286,7 +376,21 @@ class KanidmUserRepository(AbstractUserRepository):
@staticmethod
def get_user_by_username(username: str) -> UserDataUser:
"""Retrieves user data (UserDataUser) by username"""
"""
Retrieves user data by username.
Args:
username (str): The username to search for.
Returns:
UserDataUser: The user data.
Raises:
UserNotFound: If the user does not exist.
KanidmQueryError: If an error occurs while retrieving the user data.
KanidmReturnUnknownResponseType: If response type is unknown.
"""
user_data = KanidmUserRepository._send_query(
endpoint=f"person/{username}",
method="GET",
@ -299,7 +403,7 @@ class KanidmUserRepository(AbstractUserRepository):
except KanidmReturnEmptyResponse:
raise UserNotFound
attrs = user_data["attrs"]
attrs = user_data["attrs"] # type: ignore
return UserDataUser(
username=attrs["name"][0],
@ -317,8 +421,21 @@ class KanidmUserRepository(AbstractUserRepository):
def generate_password_reset_link(username: str) -> str:
"""
Do not reset the password, just generate a link to reset the password.
Not implemented in JsonUserRepository.
! Not implemented in JsonUserRepository !
Args:
username (str): The username for which to generate the reset link.
Returns:
str: The password reset link.
Raises:
NoPasswordResetLinkFoundInResponse: If no token is found in the response.
KanidmReturnEmptyResponse: If the response from Kanidm is empty.
KanidmQueryError: If an error occurs while generating the link.
KanidmReturnUnknownResponseType: If response type is unknown.
"""
data = KanidmUserRepository._send_query(
endpoint=f"person/{username}/_credential/_update_intent",
method="GET",
@ -328,7 +445,7 @@ class KanidmUserRepository(AbstractUserRepository):
data_type="dict", response_data=data
)
token = data.get("token", None)
token = data.get("token", None) # type: ignore
if not token:
raise KanidmReturnEmptyResponse