2024-12-11 02:35:17 +00:00
|
|
|
from typing import Any, Optional, Union
|
2024-11-19 17:50:43 +00:00
|
|
|
import subprocess
|
2024-11-02 23:15:51 +00:00
|
|
|
import requests
|
2024-11-21 17:01:29 +00:00
|
|
|
import re
|
2024-11-25 11:24:05 +00:00
|
|
|
import logging
|
2024-11-02 23:15:51 +00:00
|
|
|
|
2024-12-12 19:53:41 +00:00
|
|
|
from selfprivacy_api.models.group import Group
|
2024-12-09 08:37:57 +00:00
|
|
|
from selfprivacy_api.repositories.users.exceptions import (
|
|
|
|
NoPasswordResetLinkFoundInResponse,
|
2024-12-10 02:15:15 +00:00
|
|
|
UserAlreadyExists,
|
2024-12-11 03:13:02 +00:00
|
|
|
UserNotFound,
|
2024-12-09 08:37:57 +00:00
|
|
|
)
|
2024-12-11 01:33:07 +00:00
|
|
|
from selfprivacy_api.repositories.users.exceptions_kanidm import (
|
2024-12-12 12:00:07 +00:00
|
|
|
FailedToGetValidKanidmToken,
|
2024-12-11 11:16:14 +00:00
|
|
|
KanidmCliSubprocessError,
|
2024-12-11 01:33:07 +00:00
|
|
|
KanidmDidNotReturnAdminPassword,
|
|
|
|
KanidmQueryError,
|
|
|
|
KanidmReturnEmptyResponse,
|
|
|
|
KanidmReturnUnknownResponseType,
|
|
|
|
)
|
2024-11-27 12:52:05 +00:00
|
|
|
from selfprivacy_api.utils import get_domain, temporary_env_var
|
2024-11-19 17:50:43 +00:00
|
|
|
from selfprivacy_api.utils.redis_pool import RedisPool
|
2024-11-11 17:37:17 +00:00
|
|
|
from selfprivacy_api.models.user import UserDataUser, UserDataUserOrigin
|
2024-10-28 21:57:23 +00:00
|
|
|
from selfprivacy_api.repositories.users.abstract_user_repository import (
|
|
|
|
AbstractUserRepository,
|
|
|
|
)
|
2024-12-03 22:54:24 +00:00
|
|
|
|
2024-10-28 21:57:23 +00:00
|
|
|
|
2024-11-11 16:50:52 +00:00
|
|
|
KANIDM_URL = "https://127.0.0.1:3013"
|
2024-12-11 11:16:14 +00:00
|
|
|
REDIS_TOKEN_KEY = "kanidm:token"
|
|
|
|
ADMIN_KANIDM_GROUPS = ["sp.admin"]
|
2024-11-19 17:50:43 +00:00
|
|
|
|
|
|
|
redis = RedisPool().get_connection()
|
|
|
|
|
2024-11-25 11:24:05 +00:00
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
2024-11-29 15:32:02 +00:00
|
|
|
|
2024-12-11 21:45:54 +00:00
|
|
|
class KanidmAdminToken:
|
2024-12-11 11:16:14 +00:00
|
|
|
"""
|
|
|
|
Manages the administrative token for Kanidm.
|
|
|
|
|
|
|
|
Methods:
|
|
|
|
get() -> str:
|
|
|
|
Retrieves the current administrative token. If absent, resets the admin password and creates a new token.
|
|
|
|
|
|
|
|
_create_and_save_token(kanidm_admin_password: str) -> str:
|
|
|
|
Creates a new token using the admin password and saves it to Redis.
|
|
|
|
|
|
|
|
_reset_and_save_idm_admin_password() -> str:
|
|
|
|
Resets the Kanidm admin password and returns the new password.
|
|
|
|
|
|
|
|
_delete_kanidm_token_from_db() -> None:
|
|
|
|
Deletes the admin token from Redis.
|
2024-12-11 21:45:54 +00:00
|
|
|
|
|
|
|
_is_token_valid() -> bool:
|
|
|
|
Sends a request to kanidm to check the validity of the token.
|
2024-12-11 11:16:14 +00:00
|
|
|
"""
|
|
|
|
|
2024-11-19 17:50:43 +00:00
|
|
|
@staticmethod
|
|
|
|
def get() -> str:
|
2024-12-11 11:16:14 +00:00
|
|
|
kanidm_admin_token = str(redis.get(REDIS_TOKEN_KEY))
|
2024-11-19 17:50:43 +00:00
|
|
|
|
2024-12-11 21:45:54 +00:00
|
|
|
if kanidm_admin_token is None or not KanidmAdminToken._is_token_valid(
|
|
|
|
kanidm_admin_token
|
|
|
|
):
|
2024-11-29 15:32:02 +00:00
|
|
|
kanidm_admin_password = (
|
|
|
|
KanidmAdminToken._reset_and_save_idm_admin_password()
|
|
|
|
)
|
2024-11-19 17:50:43 +00:00
|
|
|
|
2024-11-29 15:32:02 +00:00
|
|
|
kanidm_admin_token = KanidmAdminToken._create_and_save_token(
|
2024-11-19 17:50:43 +00:00
|
|
|
kanidm_admin_password=kanidm_admin_password
|
|
|
|
)
|
|
|
|
|
|
|
|
return kanidm_admin_token
|
|
|
|
|
|
|
|
@staticmethod
|
2024-11-29 15:32:02 +00:00
|
|
|
def _create_and_save_token(kanidm_admin_password: str) -> str:
|
2024-11-19 17:50:43 +00:00
|
|
|
with temporary_env_var(key="KANIDM_PASSWORD", value=kanidm_admin_password):
|
2024-12-11 11:16:14 +00:00
|
|
|
try:
|
|
|
|
subprocess.run(["kanidm", "login", "-D", "idm_admin"], check=True)
|
|
|
|
|
|
|
|
output = subprocess.check_output(
|
|
|
|
[
|
|
|
|
"kanidm",
|
|
|
|
"service-account",
|
|
|
|
"api-token",
|
|
|
|
"generate",
|
|
|
|
"--rw",
|
|
|
|
"selfprivacy",
|
|
|
|
"token2",
|
|
|
|
],
|
|
|
|
text=True,
|
|
|
|
)
|
|
|
|
except subprocess.CalledProcessError as error:
|
|
|
|
logger.error(f"Error creating Kanidm token: {str(error.output)}")
|
|
|
|
raise KanidmCliSubprocessError(error=str(error.output))
|
2024-11-19 17:50:43 +00:00
|
|
|
|
2024-11-29 15:32:02 +00:00
|
|
|
kanidm_admin_token = output.splitlines()[-1]
|
2024-11-22 19:34:07 +00:00
|
|
|
|
2024-11-19 17:50:43 +00:00
|
|
|
redis.set("kanidm:token", kanidm_admin_token)
|
|
|
|
return kanidm_admin_token
|
|
|
|
|
|
|
|
@staticmethod
|
2024-11-29 15:32:02 +00:00
|
|
|
def _reset_and_save_idm_admin_password() -> str:
|
2024-11-21 16:45:43 +00:00
|
|
|
output = subprocess.check_output(
|
|
|
|
[
|
|
|
|
"kanidmd",
|
|
|
|
"recover-account",
|
|
|
|
"-c",
|
|
|
|
"/etc/kanidm/server.toml",
|
|
|
|
"idm_admin",
|
|
|
|
"-o",
|
|
|
|
"json",
|
|
|
|
],
|
2024-11-21 17:20:29 +00:00
|
|
|
text=True,
|
|
|
|
)
|
2024-11-21 16:45:43 +00:00
|
|
|
|
2024-11-22 18:41:50 +00:00
|
|
|
match = re.search(r'"password":"([^"]+)"', output)
|
2024-12-09 08:37:57 +00:00
|
|
|
if match:
|
|
|
|
new_kanidm_admin_password = match.group(
|
|
|
|
1
|
2024-12-11 11:16:14 +00:00
|
|
|
) # we have many non-JSON strings in output
|
2024-12-09 08:37:57 +00:00
|
|
|
else:
|
|
|
|
raise KanidmDidNotReturnAdminPassword
|
2024-11-21 17:01:29 +00:00
|
|
|
|
2024-11-19 17:50:43 +00:00
|
|
|
return new_kanidm_admin_password
|
2024-11-02 23:15:51 +00:00
|
|
|
|
2024-12-11 21:45:54 +00:00
|
|
|
@staticmethod
|
|
|
|
def _is_token_valid(token: str) -> bool:
|
|
|
|
response = requests.get(
|
2024-12-11 22:10:54 +00:00
|
|
|
f"{KANIDM_URL}/v1/person/root",
|
2024-12-11 21:45:54 +00:00
|
|
|
headers={
|
2024-12-11 22:00:43 +00:00
|
|
|
"Authorization": f"Bearer {token}",
|
2024-12-11 21:45:54 +00:00
|
|
|
"Content-Type": "application/json",
|
|
|
|
},
|
|
|
|
timeout=1,
|
|
|
|
verify=False, # TODO: REMOVE THIS NOT HALAL!!!!!
|
|
|
|
)
|
|
|
|
response_data = response.json()
|
|
|
|
|
|
|
|
# we do not handle the other errors, this is handled by the main function in KanidmUserRepository._send_query
|
|
|
|
if response.status_code != 200:
|
|
|
|
if isinstance(response_data, str) and response_data == "notauthenticated":
|
|
|
|
logger.error("Kanidm token is not valid")
|
|
|
|
return False
|
|
|
|
return True
|
|
|
|
|
2024-12-05 23:05:28 +00:00
|
|
|
@staticmethod
|
|
|
|
def _delete_kanidm_token_from_db() -> None:
|
|
|
|
redis.delete("kanidm:token")
|
|
|
|
|
2024-11-02 23:15:51 +00:00
|
|
|
|
2024-10-28 21:57:23 +00:00
|
|
|
class KanidmUserRepository(AbstractUserRepository):
|
2024-12-11 11:16:14 +00:00
|
|
|
"""
|
|
|
|
Repository for managing users through Kanidm.
|
|
|
|
"""
|
|
|
|
|
2024-12-11 03:27:45 +00:00
|
|
|
@staticmethod
|
|
|
|
def _check_response_type_and_not_empty(data_type: str, response_data: Any) -> None:
|
|
|
|
"""
|
|
|
|
Validates the type and that content of the response data is not empty.
|
2024-12-11 02:35:17 +00:00
|
|
|
|
2024-12-11 03:27:45 +00:00
|
|
|
Args:
|
|
|
|
data_type (str): Expected type of response data ('list' or 'dict').
|
2024-12-11 11:16:14 +00:00
|
|
|
response_data (Any): Response data to validate.
|
2024-12-11 02:35:17 +00:00
|
|
|
|
2024-12-11 03:27:45 +00:00
|
|
|
Raises:
|
|
|
|
KanidmReturnEmptyResponse: If the response data is empty.
|
|
|
|
KanidmReturnUnknownResponseType: If the response data is not of the expected type.
|
|
|
|
"""
|
|
|
|
|
2024-12-11 08:09:56 +00:00
|
|
|
if not response_data and response_data is None:
|
2024-12-11 07:51:22 +00:00
|
|
|
raise KanidmReturnEmptyResponse
|
|
|
|
|
2024-12-11 02:35:17 +00:00
|
|
|
if data_type == "list":
|
|
|
|
if not isinstance(response_data, list):
|
|
|
|
raise KanidmReturnUnknownResponseType(response_data=response_data)
|
|
|
|
|
|
|
|
elif data_type == "dict":
|
|
|
|
if not isinstance(response_data, dict):
|
|
|
|
raise KanidmReturnUnknownResponseType(response_data=response_data)
|
|
|
|
|
2024-11-02 23:15:51 +00:00
|
|
|
@staticmethod
|
2024-11-29 15:32:02 +00:00
|
|
|
def _check_user_origin_by_memberof(
|
2024-12-09 08:37:57 +00:00
|
|
|
memberof: list[str] = [],
|
2024-11-29 15:32:02 +00:00
|
|
|
) -> UserDataUserOrigin:
|
2024-12-11 11:16:14 +00:00
|
|
|
"""
|
|
|
|
Determines the origin of the user based on their group memberships.
|
|
|
|
|
|
|
|
Args:
|
|
|
|
memberof (List[str]): List of groups the user belongs to.
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
UserDataUserOrigin: The origin type of the user (PRIMARY or NORMAL).
|
|
|
|
"""
|
|
|
|
|
2024-11-29 15:32:02 +00:00
|
|
|
if sorted(memberof) == sorted(ADMIN_KANIDM_GROUPS):
|
|
|
|
return UserDataUserOrigin.PRIMARY
|
|
|
|
else:
|
|
|
|
return UserDataUserOrigin.NORMAL
|
|
|
|
|
|
|
|
@staticmethod
|
2024-12-11 01:33:07 +00:00
|
|
|
def _send_query(endpoint: str, method: str = "GET", data=None) -> Union[dict, list]:
|
2024-12-11 11:16:14 +00:00
|
|
|
"""
|
|
|
|
Sends a request to the Kanidm API.
|
|
|
|
|
|
|
|
Args:
|
|
|
|
endpoint (str): The API endpoint.
|
|
|
|
method (str, optional): The HTTP method (GET, POST, PATCH, DELETE). Defaults to "GET".
|
|
|
|
data (Optional[dict], optional): The data to send in the request body. Defaults to None.
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
Union[dict, list]: The response data.
|
|
|
|
|
|
|
|
Raises:
|
|
|
|
KanidmQueryError: If an error occurs during the request.
|
|
|
|
UserAlreadyExists: If the user already exists.
|
|
|
|
UserNotFound: If the user is not found.
|
|
|
|
"""
|
|
|
|
|
2024-11-02 23:15:51 +00:00
|
|
|
request_method = getattr(requests, method.lower(), None)
|
2024-12-11 11:16:14 +00:00
|
|
|
if not request_method:
|
|
|
|
logger.error(f"HTTP method '{method}' is not supported.")
|
|
|
|
raise ValueError(f"Unsupported HTTP method: {method}")
|
|
|
|
|
2024-11-11 16:36:16 +00:00
|
|
|
full_endpoint = f"{KANIDM_URL}/v1/{endpoint}"
|
2024-11-02 23:15:51 +00:00
|
|
|
|
2024-11-29 15:32:02 +00:00
|
|
|
try:
|
|
|
|
response = request_method(
|
|
|
|
full_endpoint,
|
|
|
|
json=data,
|
|
|
|
headers={
|
2024-12-11 22:00:43 +00:00
|
|
|
"Authorization": f"Bearer {KanidmAdminToken.get()}",
|
2024-11-29 15:32:02 +00:00
|
|
|
"Content-Type": "application/json",
|
|
|
|
},
|
2024-12-11 11:16:14 +00:00
|
|
|
timeout=1,
|
2024-12-11 01:41:44 +00:00
|
|
|
verify=False, # TODO: REMOVE THIS NOT HALAL!!!!!
|
2024-12-11 11:16:14 +00:00
|
|
|
)
|
2024-12-13 03:06:34 +00:00
|
|
|
response_data = response.json()
|
|
|
|
|
2024-12-11 00:03:58 +00:00
|
|
|
except Exception as error:
|
2024-12-11 11:16:14 +00:00
|
|
|
logger.error(f"Kanidm query error: {str(error)}")
|
2024-12-11 01:33:07 +00:00
|
|
|
raise KanidmQueryError(error_text=str(error))
|
2024-11-11 16:07:34 +00:00
|
|
|
|
2024-12-11 00:03:58 +00:00
|
|
|
if response.status_code != 200:
|
2024-12-11 01:33:07 +00:00
|
|
|
if isinstance(response_data, dict):
|
|
|
|
plugin_error = response_data.get("plugin", {})
|
|
|
|
if plugin_error.get("attrunique") == "duplicate value detected":
|
2024-12-11 11:55:58 +00:00
|
|
|
raise UserAlreadyExists # does it work only for user? NO ONE KNOWS
|
2024-12-11 01:33:07 +00:00
|
|
|
|
2024-12-11 11:55:58 +00:00
|
|
|
if isinstance(response_data, str):
|
|
|
|
if response_data == "nomatchingentries":
|
|
|
|
raise UserNotFound # does it work only for user? hate kanidm's response
|
|
|
|
elif response_data == "accessdenied":
|
|
|
|
raise KanidmQueryError(error_text="Kanidm access issue")
|
2024-12-11 21:45:54 +00:00
|
|
|
elif response_data == "notauthenticated":
|
2024-12-12 12:00:07 +00:00
|
|
|
raise FailedToGetValidKanidmToken
|
2024-12-11 09:27:58 +00:00
|
|
|
|
2024-12-11 11:16:14 +00:00
|
|
|
logger.error(f"Kanidm query error: {response.text}")
|
2024-12-11 01:33:07 +00:00
|
|
|
raise KanidmQueryError(error_text=response.text)
|
|
|
|
|
2024-12-11 03:35:20 +00:00
|
|
|
return response_data
|
|
|
|
|
2024-11-02 23:15:51 +00:00
|
|
|
@staticmethod
|
2024-11-14 23:30:24 +00:00
|
|
|
def create_user(
|
|
|
|
username: str,
|
|
|
|
directmemberof: Optional[list[str]] = None,
|
2024-12-10 02:15:15 +00:00
|
|
|
displayname: Optional[str] = None,
|
2024-11-14 23:30:24 +00:00
|
|
|
) -> None:
|
2024-11-29 15:32:02 +00:00
|
|
|
"""
|
2024-12-11 11:16:14 +00:00
|
|
|
Creates a new user.
|
|
|
|
! "password" is a legacy field, please use generate_password_reset_link() instead !
|
2024-11-29 15:32:02 +00:00
|
|
|
|
2024-12-11 11:16:14 +00:00
|
|
|
Args:
|
|
|
|
username (str): The username.
|
|
|
|
directmemberof (Optional[List[str]], optional): List of direct group memberships. Defaults to None.
|
|
|
|
memberof (Optional[List[str]], optional): List of indirect group memberships. Defaults to None.
|
|
|
|
displayname (Optional[str], optional): If displayname is None, it will default to the username.
|
|
|
|
|
|
|
|
Raises:
|
|
|
|
KanidmQueryError: If an error occurs while creating the user.
|
|
|
|
UserAlreadyExists: If the user already exists.
|
2024-11-29 15:32:02 +00:00
|
|
|
"""
|
|
|
|
|
2024-11-11 00:33:17 +00:00
|
|
|
data = {
|
|
|
|
"attrs": {
|
|
|
|
"name": [username],
|
2024-11-14 23:30:24 +00:00
|
|
|
"displayname": [displayname if displayname else username],
|
2024-12-10 02:15:15 +00:00
|
|
|
"mail": [f"{username}@{get_domain()}"],
|
2024-11-14 23:30:24 +00:00
|
|
|
"class": ["user"], # TODO read more about it
|
2024-11-11 00:33:17 +00:00
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2024-11-14 23:30:24 +00:00
|
|
|
if directmemberof:
|
|
|
|
data["attrs"]["directmemberof"] = directmemberof
|
|
|
|
|
2024-12-09 08:37:57 +00:00
|
|
|
KanidmUserRepository._send_query(
|
2024-11-11 00:33:17 +00:00
|
|
|
endpoint="person",
|
|
|
|
method="POST",
|
|
|
|
data=data,
|
2024-11-02 23:15:51 +00:00
|
|
|
)
|
|
|
|
|
2024-11-29 15:32:02 +00:00
|
|
|
@staticmethod
|
2024-10-28 21:57:23 +00:00
|
|
|
def get_users(
|
|
|
|
exclude_primary: bool = False,
|
2024-11-29 15:32:02 +00:00
|
|
|
exclude_root: bool = False, # never return root
|
2024-10-28 21:57:23 +00:00
|
|
|
) -> list[UserDataUser]:
|
2024-11-29 15:32:02 +00:00
|
|
|
"""
|
|
|
|
Gets a list of users with options to exclude specific user groups.
|
2024-12-11 11:16:14 +00:00
|
|
|
! The root user will never return !
|
|
|
|
|
|
|
|
Args:
|
|
|
|
exclude_primary (bool, optional): Exclude users with PRIMARY type. Defaults to False.
|
|
|
|
exclude_root (bool, optional): Not working for Kanidm. The root user will never return.
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
List[UserDataUser]: The list of users.
|
|
|
|
|
|
|
|
Raises:
|
|
|
|
KanidmQueryError: If an error occurs while retrieving users.
|
|
|
|
KanidmReturnUnknownResponseType: If response type is unknown.
|
|
|
|
KanidmReturnEmptyResponse: If response is empty.
|
2024-11-29 15:32:02 +00:00
|
|
|
"""
|
2024-12-11 11:16:14 +00:00
|
|
|
|
2024-11-11 17:41:44 +00:00
|
|
|
users_data = KanidmUserRepository._send_query(endpoint="person", method="GET")
|
2024-12-03 22:54:24 +00:00
|
|
|
|
2024-12-11 03:13:02 +00:00
|
|
|
KanidmUserRepository._check_response_type_and_not_empty(
|
|
|
|
data_type="list", response_data=users_data
|
|
|
|
)
|
2024-12-11 02:35:17 +00:00
|
|
|
|
2024-11-11 17:37:17 +00:00
|
|
|
users = []
|
|
|
|
for user in users_data:
|
2024-12-03 22:54:24 +00:00
|
|
|
user_attrs = user.get("attrs", {})
|
2024-11-29 15:32:02 +00:00
|
|
|
|
2024-12-03 22:54:24 +00:00
|
|
|
user_type = KanidmUserRepository._check_user_origin_by_memberof(
|
|
|
|
memberof=user_attrs.get("memberof", [])
|
2024-11-29 15:32:02 +00:00
|
|
|
)
|
2024-12-03 22:54:24 +00:00
|
|
|
if exclude_primary and user_type == UserDataUserOrigin.PRIMARY:
|
2024-11-29 15:32:02 +00:00
|
|
|
continue
|
|
|
|
|
2024-12-03 22:54:24 +00:00
|
|
|
filled_user = UserDataUser(
|
2024-12-10 01:07:22 +00:00
|
|
|
username=user_attrs["name"][0],
|
2024-12-03 22:54:24 +00:00
|
|
|
user_type=user_type,
|
2024-12-10 00:52:13 +00:00
|
|
|
ssh_keys=[], # actions layer will full in this field
|
2024-12-03 22:54:24 +00:00
|
|
|
directmemberof=user_attrs.get("directmemberof", []),
|
|
|
|
memberof=user_attrs.get("memberof", []),
|
2024-12-10 01:07:22 +00:00
|
|
|
displayname=user_attrs.get("displayname", None)[0],
|
|
|
|
email=user_attrs.get("mail", None)[0],
|
2024-11-11 17:37:17 +00:00
|
|
|
)
|
2024-11-29 15:32:02 +00:00
|
|
|
|
2024-12-03 22:54:24 +00:00
|
|
|
users.append(filled_user)
|
2024-11-11 17:37:17 +00:00
|
|
|
return users
|
2024-10-28 21:57:23 +00:00
|
|
|
|
2024-11-29 15:32:02 +00:00
|
|
|
@staticmethod
|
2024-10-28 21:57:23 +00:00
|
|
|
def delete_user(username: str) -> None:
|
2024-12-11 11:16:14 +00:00
|
|
|
"""
|
|
|
|
Deletes an existing user from Kanidm.
|
|
|
|
|
|
|
|
Args:
|
|
|
|
username (str): The username to delete.
|
|
|
|
|
|
|
|
Raises:
|
|
|
|
KanidmQueryError: If an error occurs while deleting the user.
|
|
|
|
UserNotFound: If the user does not exist.
|
|
|
|
"""
|
2024-12-11 03:13:02 +00:00
|
|
|
|
2024-12-09 08:37:57 +00:00
|
|
|
KanidmUserRepository._send_query(endpoint=f"person/{username}", method="DELETE")
|
2024-10-28 21:57:23 +00:00
|
|
|
|
2024-11-29 15:32:02 +00:00
|
|
|
@staticmethod
|
2024-11-14 23:30:24 +00:00
|
|
|
def update_user(
|
|
|
|
username: str,
|
|
|
|
directmemberof: Optional[list[str]] = None,
|
2024-12-10 00:52:13 +00:00
|
|
|
displayname: Optional[str] = None,
|
2024-11-14 23:30:24 +00:00
|
|
|
) -> None:
|
2024-11-29 15:32:02 +00:00
|
|
|
"""
|
|
|
|
Update user information.
|
2024-12-11 11:16:14 +00:00
|
|
|
! Do not update the password, please use generate_password_reset_link() instead !
|
|
|
|
|
|
|
|
Args:
|
|
|
|
username (str): The username to update.
|
|
|
|
directmemberof (Optional[List[str]], optional): New list of direct group memberships. Defaults to None.
|
|
|
|
displayname (Optional[str], optional): New display name. Defaults to username if not provided.
|
|
|
|
|
|
|
|
Raises:
|
|
|
|
KanidmQueryError: If an error occurs while updating the user.
|
|
|
|
UserNotFound: If the user does not exist.
|
2024-11-29 15:32:02 +00:00
|
|
|
"""
|
2024-11-14 23:30:24 +00:00
|
|
|
|
|
|
|
data = {
|
|
|
|
"attrs": {
|
|
|
|
"displayname": [displayname if displayname else username],
|
2024-12-10 02:15:15 +00:00
|
|
|
"mail": [f"{username}@{get_domain()}"],
|
2024-11-14 23:30:24 +00:00
|
|
|
"class": ["user"], # TODO read more about it
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
if directmemberof:
|
|
|
|
data["attrs"]["directmemberof"] = directmemberof
|
|
|
|
|
2024-12-09 08:37:57 +00:00
|
|
|
KanidmUserRepository._send_query(
|
2024-11-14 23:30:24 +00:00
|
|
|
endpoint=f"person/{username}",
|
|
|
|
method="PATCH",
|
|
|
|
data=data,
|
|
|
|
)
|
2024-10-28 21:57:23 +00:00
|
|
|
|
2024-11-29 15:32:02 +00:00
|
|
|
@staticmethod
|
2024-12-11 03:13:02 +00:00
|
|
|
def get_user_by_username(username: str) -> UserDataUser:
|
2024-12-11 11:16:14 +00:00
|
|
|
"""
|
|
|
|
Retrieves user data by username.
|
|
|
|
|
|
|
|
Args:
|
|
|
|
username (str): The username to search for.
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
UserDataUser: The user data.
|
|
|
|
|
|
|
|
Raises:
|
|
|
|
UserNotFound: If the user does not exist.
|
|
|
|
KanidmQueryError: If an error occurs while retrieving the user data.
|
|
|
|
KanidmReturnUnknownResponseType: If response type is unknown.
|
|
|
|
"""
|
|
|
|
|
2024-11-14 23:30:24 +00:00
|
|
|
user_data = KanidmUserRepository._send_query(
|
|
|
|
endpoint=f"person/{username}",
|
|
|
|
method="GET",
|
|
|
|
)
|
|
|
|
|
2024-12-11 09:16:38 +00:00
|
|
|
try:
|
|
|
|
KanidmUserRepository._check_response_type_and_not_empty(
|
|
|
|
data_type="dict", response_data=user_data
|
|
|
|
)
|
|
|
|
except KanidmReturnEmptyResponse:
|
|
|
|
raise UserNotFound
|
2024-12-11 02:35:17 +00:00
|
|
|
|
2024-12-11 11:16:14 +00:00
|
|
|
attrs = user_data["attrs"] # type: ignore
|
2024-11-14 23:30:24 +00:00
|
|
|
|
|
|
|
return UserDataUser(
|
2024-12-10 01:07:22 +00:00
|
|
|
username=attrs["name"][0],
|
2024-12-03 22:54:24 +00:00
|
|
|
user_type=KanidmUserRepository._check_user_origin_by_memberof(
|
2024-11-29 15:32:02 +00:00
|
|
|
memberof=attrs.get("memberof", [])
|
|
|
|
),
|
2024-12-10 02:15:15 +00:00
|
|
|
ssh_keys=[], # Actions layer will fill this field
|
2024-11-14 23:30:24 +00:00
|
|
|
directmemberof=attrs.get("directmemberof", []),
|
|
|
|
memberof=attrs.get("memberof", []),
|
2024-12-10 01:07:22 +00:00
|
|
|
displayname=attrs.get("displayname", [None])[0],
|
|
|
|
email=attrs.get("mail", [None])[0],
|
2024-11-14 23:30:24 +00:00
|
|
|
)
|
2024-11-29 15:32:02 +00:00
|
|
|
|
|
|
|
@staticmethod
|
|
|
|
def generate_password_reset_link(username: str) -> str:
|
|
|
|
"""
|
|
|
|
Do not reset the password, just generate a link to reset the password.
|
2024-12-11 11:16:14 +00:00
|
|
|
! Not implemented in JsonUserRepository !
|
|
|
|
|
|
|
|
Args:
|
|
|
|
username (str): The username for which to generate the reset link.
|
|
|
|
|
|
|
|
Returns:
|
|
|
|
str: The password reset link.
|
|
|
|
|
|
|
|
Raises:
|
|
|
|
NoPasswordResetLinkFoundInResponse: If no token is found in the response.
|
|
|
|
KanidmReturnEmptyResponse: If the response from Kanidm is empty.
|
|
|
|
KanidmQueryError: If an error occurs while generating the link.
|
|
|
|
KanidmReturnUnknownResponseType: If response type is unknown.
|
2024-11-29 15:32:02 +00:00
|
|
|
"""
|
2024-12-11 11:16:14 +00:00
|
|
|
|
2024-12-09 08:37:57 +00:00
|
|
|
data = KanidmUserRepository._send_query(
|
2024-11-29 15:32:02 +00:00
|
|
|
endpoint=f"person/{username}/_credential/_update_intent",
|
|
|
|
method="GET",
|
|
|
|
)
|
|
|
|
|
2024-12-11 03:13:02 +00:00
|
|
|
KanidmUserRepository._check_response_type_and_not_empty(
|
|
|
|
data_type="dict", response_data=data
|
|
|
|
)
|
2024-12-11 02:35:17 +00:00
|
|
|
|
2024-12-11 11:16:14 +00:00
|
|
|
token = data.get("token", None) # type: ignore
|
2024-12-10 00:23:20 +00:00
|
|
|
|
|
|
|
if not token:
|
|
|
|
raise KanidmReturnEmptyResponse
|
2024-12-09 08:37:57 +00:00
|
|
|
|
|
|
|
if token:
|
2024-12-11 12:53:55 +00:00
|
|
|
return f"https://auth.{get_domain()}/ui/reset?token={token}"
|
2024-12-09 08:37:57 +00:00
|
|
|
|
|
|
|
raise NoPasswordResetLinkFoundInResponse
|
2024-12-12 12:00:07 +00:00
|
|
|
|
|
|
|
@staticmethod
|
2024-12-12 19:53:41 +00:00
|
|
|
def get_groups() -> list[Group]:
|
2024-12-12 12:00:07 +00:00
|
|
|
groups_list_data = KanidmUserRepository._send_query(
|
|
|
|
endpoint="/v1/group",
|
|
|
|
method="GET",
|
|
|
|
)
|
|
|
|
|
|
|
|
KanidmUserRepository._check_response_type_and_not_empty(
|
2024-12-12 19:53:41 +00:00
|
|
|
data_type="dict", response_data=groups_list_data
|
2024-12-12 12:00:07 +00:00
|
|
|
)
|
|
|
|
|
2024-12-12 19:53:41 +00:00
|
|
|
groups = []
|
|
|
|
for group_data in groups_list_data:
|
|
|
|
attrs = group_data.get("attrs", {})
|
|
|
|
group = Group(
|
|
|
|
name=attrs["name"],
|
|
|
|
group_class=attrs.get("class", []),
|
|
|
|
member=attrs.get("member", []),
|
|
|
|
memberof=attrs.get("memberof", []),
|
|
|
|
directmemberof=attrs.get("directmemberof", []),
|
|
|
|
spn=attrs.get("spn", [None])[0],
|
|
|
|
description=attrs.get("description", [None])[0],
|
|
|
|
)
|
|
|
|
groups.append(group)
|
|
|
|
|
|
|
|
return groups
|