selfprivacy-rest-api/selfprivacy_api/repositories/users/kanidm_user_repository.py

337 lines
11 KiB
Python
Raw Normal View History

from typing import Any, Optional, Union
2024-11-19 21:50:43 +04:00
import subprocess
import requests
2024-11-21 21:01:29 +04:00
import re
2024-11-25 15:24:05 +04:00
import logging
2024-12-09 12:37:57 +04:00
from selfprivacy_api.repositories.users.exceptions import (
NoPasswordResetLinkFoundInResponse,
SelfPrivacyAppIsOutdate,
2024-12-10 06:15:15 +04:00
UserAlreadyExists,
2024-12-09 12:37:57 +04:00
)
2024-12-11 05:33:07 +04:00
from selfprivacy_api.repositories.users.exceptions_kanidm import (
KanidmDidNotReturnAdminPassword,
KanidmQueryError,
KanidmReturnEmptyResponse,
KanidmReturnUnknownResponseType,
)
2024-11-27 16:52:05 +04:00
from selfprivacy_api.utils import get_domain, temporary_env_var
2024-11-19 21:50:43 +04:00
from selfprivacy_api.utils.redis_pool import RedisPool
2024-11-11 21:37:17 +04:00
from selfprivacy_api.models.user import UserDataUser, UserDataUserOrigin
2024-10-29 01:57:23 +04:00
from selfprivacy_api.repositories.users.abstract_user_repository import (
AbstractUserRepository,
)
2024-12-04 02:54:24 +04:00
from selfprivacy_api import PLEASE_UPDATE_APP_TEXT
2024-10-29 01:57:23 +04:00
2024-11-11 20:50:52 +04:00
KANIDM_URL = "https://127.0.0.1:3013"
2024-11-19 21:50:43 +04:00
redis = RedisPool().get_connection()
2024-11-25 15:24:05 +04:00
logger = logging.getLogger(__name__)
2024-11-29 19:32:02 +04:00
ADMIN_KANIDM_GROUPS = ["sp.admin"]
2024-11-19 21:50:43 +04:00
2024-11-29 19:32:02 +04:00
class KanidmAdminToken: # TODO CHECK IS TOKEN CORRECT?
2024-11-19 21:50:43 +04:00
@staticmethod
def get() -> str:
2024-12-09 12:37:57 +04:00
kanidm_admin_token = str(redis.get("kanidm:token"))
2024-11-19 21:50:43 +04:00
if kanidm_admin_token is None:
2024-11-29 19:32:02 +04:00
kanidm_admin_password = (
KanidmAdminToken._reset_and_save_idm_admin_password()
)
2024-11-19 21:50:43 +04:00
2024-11-29 19:32:02 +04:00
kanidm_admin_token = KanidmAdminToken._create_and_save_token(
2024-11-19 21:50:43 +04:00
kanidm_admin_password=kanidm_admin_password
)
return kanidm_admin_token
@staticmethod
2024-11-29 19:32:02 +04:00
def _create_and_save_token(kanidm_admin_password: str) -> str:
2024-11-19 21:50:43 +04:00
with temporary_env_var(key="KANIDM_PASSWORD", value=kanidm_admin_password):
2024-11-27 17:13:20 +04:00
subprocess.run(["kanidm", "login", "-D", "idm_admin"])
2024-11-27 16:52:05 +04:00
2024-11-29 19:32:02 +04:00
output = subprocess.check_output(
[
"kanidm",
"service-account",
"api-token",
"generate",
"--rw",
"selfprivacy",
"token2",
],
text=True,
)
2024-11-19 21:50:43 +04:00
2024-11-29 19:32:02 +04:00
kanidm_admin_token = output.splitlines()[-1]
2024-11-22 23:34:07 +04:00
2024-11-19 21:50:43 +04:00
redis.set("kanidm:token", kanidm_admin_token)
return kanidm_admin_token
@staticmethod
2024-11-29 19:32:02 +04:00
def _reset_and_save_idm_admin_password() -> str:
output = subprocess.check_output(
[
"kanidmd",
"recover-account",
"-c",
"/etc/kanidm/server.toml",
"idm_admin",
"-o",
"json",
],
text=True,
)
2024-11-22 22:41:50 +04:00
match = re.search(r'"password":"([^"]+)"', output)
2024-12-09 12:37:57 +04:00
if match:
new_kanidm_admin_password = match.group(
1
) # we have many not json strings in output
else:
raise KanidmDidNotReturnAdminPassword
2024-11-21 21:01:29 +04:00
2024-11-19 21:50:43 +04:00
return new_kanidm_admin_password
@staticmethod
def _delete_kanidm_token_from_db() -> None:
redis.delete("kanidm:token")
2024-10-29 01:57:23 +04:00
class KanidmUserRepository(AbstractUserRepository):
"""
Validates the type and that content of the response data is not empty.
Args:
data_type (str): Expected type of response data ('list' or 'dict').
response_data (any): Response data to validate.
Raises:
KanidmReturnEmptyResponse: If the response data is empty.
KanidmReturnUnknownResponseType: If the response data is not of the expected type.
"""
@staticmethod
def _check_response_type_and_not_empty(data_type: str, response_data: Any) -> None:
if data_type == "list":
if not isinstance(response_data, list):
raise KanidmReturnUnknownResponseType(response_data=response_data)
if not response_data:
raise KanidmReturnEmptyResponse
elif data_type == "dict":
if not isinstance(response_data, dict):
raise KanidmReturnUnknownResponseType(response_data=response_data)
if not response_data.get("data"):
raise KanidmReturnEmptyResponse
else:
raise KanidmReturnUnknownResponseType(response_data=response_data)
@staticmethod
2024-11-29 19:32:02 +04:00
def _check_user_origin_by_memberof(
2024-12-09 12:37:57 +04:00
memberof: list[str] = [],
2024-11-29 19:32:02 +04:00
) -> UserDataUserOrigin:
if sorted(memberof) == sorted(ADMIN_KANIDM_GROUPS):
return UserDataUserOrigin.PRIMARY
else:
return UserDataUserOrigin.NORMAL
@staticmethod
2024-12-11 05:33:07 +04:00
def _send_query(endpoint: str, method: str = "GET", data=None) -> Union[dict, list]:
request_method = getattr(requests, method.lower(), None)
2024-11-11 20:36:16 +04:00
full_endpoint = f"{KANIDM_URL}/v1/{endpoint}"
2024-11-29 19:32:02 +04:00
try:
response = request_method(
full_endpoint,
json=data,
headers={
"Authorization": f"Bearer {KanidmAdminToken.get()}",
"Content-Type": "application/json",
},
timeout=0.8, # TODO: change timeout
verify=False, # TODO: REMOVE THIS NOT HALAL!!!!!
) # type: ignore
except Exception as error:
2024-12-11 05:33:07 +04:00
raise KanidmQueryError(error_text=str(error))
2024-11-11 20:07:34 +04:00
response_data = response.json()
logger.info(str(response))
2024-12-11 03:59:42 +04:00
if response.status_code != 200:
2024-12-11 05:33:07 +04:00
if isinstance(response_data, dict):
plugin_error = response_data.get("plugin", {})
if plugin_error.get("attrunique") == "duplicate value detected":
raise UserAlreadyExists # TODO only user ?
raise KanidmQueryError(error_text=response.text)
2024-12-10 20:02:06 +04:00
# nomatchingentries
2024-12-10 06:15:15 +04:00
@staticmethod
2024-11-15 03:30:24 +04:00
def create_user(
username: str,
2024-11-29 19:32:02 +04:00
password: Optional[str] = None,
2024-11-15 03:30:24 +04:00
directmemberof: Optional[list[str]] = None,
memberof: Optional[list[str]] = None,
2024-12-10 06:15:15 +04:00
displayname: Optional[str] = None,
2024-11-15 03:30:24 +04:00
) -> None:
2024-11-29 19:32:02 +04:00
"""
Creates a new user."password" is a legacy field,
please use generate_password_reset_link() instead.
If displayname is None, it will default to the username.
If email is None, it will default to username@get_domain().
"""
if password:
2024-12-04 02:54:24 +04:00
logger.error(PLEASE_UPDATE_APP_TEXT)
2024-11-29 19:32:02 +04:00
2024-11-11 04:33:17 +04:00
data = {
"attrs": {
"name": [username],
2024-11-15 03:30:24 +04:00
"displayname": [displayname if displayname else username],
2024-12-10 06:15:15 +04:00
"mail": [f"{username}@{get_domain()}"],
2024-11-15 03:30:24 +04:00
"class": ["user"], # TODO read more about it
2024-11-11 04:33:17 +04:00
}
}
2024-11-15 03:30:24 +04:00
if directmemberof:
data["attrs"]["directmemberof"] = directmemberof
if memberof:
data["attrs"]["memberof"] = memberof
2024-12-09 12:37:57 +04:00
KanidmUserRepository._send_query(
2024-11-11 04:33:17 +04:00
endpoint="person",
method="POST",
data=data,
)
2024-11-29 19:32:02 +04:00
@staticmethod
2024-10-29 01:57:23 +04:00
def get_users(
exclude_primary: bool = False,
2024-11-29 19:32:02 +04:00
exclude_root: bool = False, # never return root
2024-10-29 01:57:23 +04:00
) -> list[UserDataUser]:
2024-11-29 19:32:02 +04:00
"""
Gets a list of users with options to exclude specific user groups.
The root user will never return.
"""
2024-11-11 21:41:44 +04:00
users_data = KanidmUserRepository._send_query(endpoint="person", method="GET")
2024-12-04 02:54:24 +04:00
KanidmUserRepository._check_response_type_and_not_empty(data_type="list", response_data=users_data)
2024-11-11 21:37:17 +04:00
users = []
for user in users_data:
2024-12-04 02:54:24 +04:00
user_attrs = user.get("attrs", {})
2024-11-29 19:32:02 +04:00
2024-12-04 02:54:24 +04:00
user_type = KanidmUserRepository._check_user_origin_by_memberof(
memberof=user_attrs.get("memberof", [])
2024-11-29 19:32:02 +04:00
)
2024-12-04 02:54:24 +04:00
if exclude_primary and user_type == UserDataUserOrigin.PRIMARY:
2024-11-29 19:32:02 +04:00
continue
2024-12-04 02:54:24 +04:00
filled_user = UserDataUser(
2024-12-10 05:07:22 +04:00
username=user_attrs["name"][0],
2024-12-04 02:54:24 +04:00
user_type=user_type,
2024-12-10 04:52:13 +04:00
ssh_keys=[], # actions layer will full in this field
2024-12-04 02:54:24 +04:00
directmemberof=user_attrs.get("directmemberof", []),
memberof=user_attrs.get("memberof", []),
2024-12-10 05:07:22 +04:00
displayname=user_attrs.get("displayname", None)[0],
email=user_attrs.get("mail", None)[0],
2024-11-11 21:37:17 +04:00
)
2024-11-29 19:32:02 +04:00
2024-12-04 02:54:24 +04:00
users.append(filled_user)
2024-11-11 21:37:17 +04:00
return users
2024-10-29 01:57:23 +04:00
2024-11-29 19:32:02 +04:00
@staticmethod
2024-10-29 01:57:23 +04:00
def delete_user(username: str) -> None:
"""Deletes an existing user"""
2024-12-09 12:37:57 +04:00
KanidmUserRepository._send_query(endpoint=f"person/{username}", method="DELETE")
2024-10-29 01:57:23 +04:00
2024-11-29 19:32:02 +04:00
@staticmethod
2024-11-15 03:30:24 +04:00
def update_user(
username: str,
2024-11-29 19:32:02 +04:00
password: Optional[str] = None,
2024-11-15 03:30:24 +04:00
directmemberof: Optional[list[str]] = None,
memberof: Optional[list[str]] = None,
2024-12-10 04:52:13 +04:00
displayname: Optional[str] = None,
2024-11-15 03:30:24 +04:00
) -> None:
2024-11-29 19:32:02 +04:00
"""
Update user information.
Do not update the password, please
use generate_password_reset_link() instead.
"""
if password:
2024-12-04 02:54:24 +04:00
raise SelfPrivacyAppIsOutdate
2024-11-15 03:30:24 +04:00
data = {
"attrs": {
"displayname": [displayname if displayname else username],
2024-12-10 06:15:15 +04:00
"mail": [f"{username}@{get_domain()}"],
2024-11-15 03:30:24 +04:00
"class": ["user"], # TODO read more about it
}
}
if directmemberof:
data["attrs"]["directmemberof"] = directmemberof
if memberof:
data["attrs"]["memberof"] = memberof
2024-12-09 12:37:57 +04:00
KanidmUserRepository._send_query(
2024-11-15 03:30:24 +04:00
endpoint=f"person/{username}",
method="PATCH",
data=data,
)
2024-10-29 01:57:23 +04:00
2024-11-29 19:32:02 +04:00
@staticmethod
2024-10-29 01:57:23 +04:00
def get_user_by_username(username: str) -> Optional[UserDataUser]:
"""Retrieves user data (UserDataUser) by username"""
2024-11-15 03:30:24 +04:00
user_data = KanidmUserRepository._send_query(
endpoint=f"person/{username}",
method="GET",
)
KanidmUserRepository._check_response_type_and_not_empty(data_type="dict", response_data=user_data)
2024-11-15 03:30:24 +04:00
attrs = user_data["attrs"]
return UserDataUser(
2024-12-10 05:07:22 +04:00
username=attrs["name"][0],
2024-12-04 02:54:24 +04:00
user_type=KanidmUserRepository._check_user_origin_by_memberof(
2024-11-29 19:32:02 +04:00
memberof=attrs.get("memberof", [])
),
2024-12-10 06:15:15 +04:00
ssh_keys=[], # Actions layer will fill this field
2024-11-15 03:30:24 +04:00
directmemberof=attrs.get("directmemberof", []),
memberof=attrs.get("memberof", []),
2024-12-10 05:07:22 +04:00
displayname=attrs.get("displayname", [None])[0],
email=attrs.get("mail", [None])[0],
2024-11-15 03:30:24 +04:00
)
2024-11-29 19:32:02 +04:00
@staticmethod
def generate_password_reset_link(username: str) -> str:
"""
Do not reset the password, just generate a link to reset the password.
Not implemented in JsonUserRepository.
"""
2024-12-09 12:37:57 +04:00
data = KanidmUserRepository._send_query(
2024-11-29 19:32:02 +04:00
endpoint=f"person/{username}/_credential/_update_intent",
method="GET",
)
KanidmUserRepository._check_response_type_and_not_empty(data_type="dict", response_data=data)
2024-12-10 04:52:13 +04:00
token = data.get("token", None)
2024-12-10 04:23:20 +04:00
if not token:
raise KanidmReturnEmptyResponse
2024-12-09 12:37:57 +04:00
if token:
2024-12-10 06:15:15 +04:00
return f"https://id.{get_domain()}/ui/reset?token={token}"
2024-12-09 12:37:57 +04:00
raise NoPasswordResetLinkFoundInResponse