selfprivacy-rest-api/selfprivacy_api/repositories/users/kanidm_user_repository.py

93 lines
3.4 KiB
Python
Raw Normal View History

2024-10-28 21:57:23 +00:00
from typing import Optional
import requests
2024-11-11 00:33:17 +00:00
from selfprivacy_api.utils import get_domain
2024-11-11 17:37:17 +00:00
from selfprivacy_api.models.user import UserDataUser, UserDataUserOrigin
2024-10-28 21:57:23 +00:00
from selfprivacy_api.repositories.users.abstract_user_repository import (
AbstractUserRepository,
)
2024-11-11 16:50:52 +00:00
# Base URL of the Kanidm server; `_send_query` appends "/v1/<endpoint>" to it.
KANIDM_URL = "https://127.0.0.1:3013"
2024-11-11 00:33:17 +00:00
# Bearer token that `_send_query` sends in the Authorization header of every request.
# NOTE(review): this is a credential hard-coded in source; the name suggests a
# test-only token -- confirm, and load it from configuration before release.
TEST_TOKEN = """eyJhbGciOiJFUzI1NiIsImtpZCI6IjVkNDUyNzdmZWUxY2UzZmNkMTViZDhkZjE3NTdlMjRkIn0.eyJhY2NvdW50X2lkIjoiYmZlN2MxNmEtNTY1NC00YzAxLWFkMjMtOWU2YjY4OTAxNDEwIiwidG9rZW5faWQiOiJmZTU5NzAxZS1iYzIyLTQwMzctYTEzNy1jZTIxYzBlNDhlZjciLCJsYWJlbCI6InRva2VuMiIsImV4cGlyeSI6bnVsbCwiaXNzdWVkX2F0IjoxNzMxMjgxMzM1LCJwdXJwb3NlIjoicmVhZHdyaXRlIn0.0fj0NAsUtBJWi1KVNKA4qi8EOHUUvaWNzeHbR82zbUVvWynnqm5ndLhFPG0v462qJXFTayonI9YJnkCaAE7a5w"""
class KanidmQueryError(Exception):
    """Raised when a query to the Kanidm server fails."""
2024-10-28 21:57:23 +00:00
class KanidmUserRepository(AbstractUserRepository):
    """User repository that stores users in Kanidm via its HTTP API.

    All operations are static methods; each one goes through
    ``_send_query``, which talks to the server at ``KANIDM_URL``.
    """

    @staticmethod
    def _send_query(endpoint: str, method: str = "GET", data=None):
        """Send a JSON request to Kanidm and return the decoded JSON reply.

        Args:
            endpoint: Path below ``{KANIDM_URL}/v1/``, e.g. ``"person"``.
            method: HTTP method name (case-insensitive).
            data: JSON-serialisable request body, or ``None`` for no body.

        Returns:
            The decoded JSON body of the response.

        Raises:
            KanidmQueryError: If the request cannot be sent, Kanidm replies
                with a status code other than 200, or the body is not JSON.
        """
        full_endpoint = f"{KANIDM_URL}/v1/{endpoint}"
        http_method = method.upper()

        # Only the network call sits in this ``try`` so that the status-code
        # error raised below is not caught and wrapped a second time.
        try:
            response = requests.request(
                http_method,
                full_endpoint,
                json=data,
                headers={
                    "Authorization": f"Bearer {TEST_TOKEN}",
                    "Content-Type": "application/json",
                },
                timeout=0.8,  # TODO: change timeout
                verify=False,  # TODO: REMOVE THIS NOTHALAL!!!!!
            )
        except Exception as error:
            # Deliberately broad: callers rely on every transport failure
            # surfacing as KanidmQueryError.
            raise KanidmQueryError(
                f"Kanidm request failed! Error: {str(error)}"
            ) from error

        if response.status_code != 200:
            raise KanidmQueryError(
                f"Kanidm returned {response.status_code} unexpected HTTP status code. Endpoint: {full_endpoint}. Error: {response.text}."
            )

        try:
            return response.json()
        except ValueError as error:
            # The JSON decode errors raised by requests derive from ValueError.
            raise KanidmQueryError(
                f"Kanidm request failed! Error: {str(error)}"
            ) from error

    @staticmethod
    def _person_endpoint(username: str) -> str:
        """Return the endpoint path of a single person entry."""
        from urllib.parse import quote

        # Quote the name so it cannot add extra path segments to the URL.
        return f"person/{quote(username, safe='')}"

    @staticmethod
    def _first_value(attrs: dict, key: str):
        """Return the first value of a multi-valued attribute, or ``None``.

        A missing key and an empty value list both yield ``None``.
        """
        values = attrs.get(key)
        return values[0] if values else None

    @staticmethod
    def _user_from_entry(entry: dict) -> UserDataUser:
        """Build a ``UserDataUser`` from one Kanidm entry dictionary."""
        attrs = entry.get("attrs", {})
        first = KanidmUserRepository._first_value
        return UserDataUser(
            uuid=first(attrs, "uuid"),
            name=first(attrs, "name"),
            ssh_keys="test",  # TODO
            displayname=first(attrs, "displayname"),
            email=first(attrs, "mail"),
            origin=UserDataUserOrigin.NORMAL,  # TODO
        )

    @staticmethod
    def create_user(username: str, password: str):
        """Create a person entry named ``username`` in Kanidm.

        The mail address is derived as ``<username>@<get_domain()>``.
        NOTE: ``password`` is accepted but not sent to Kanidm yet.

        Returns:
            The decoded JSON reply of the create request.

        Raises:
            KanidmQueryError: If the request fails.
        """
        data = {
            "attrs": {
                "name": [username],
                "displayname": [username],
                "mail": [f"{username}@{get_domain()}"],
                "class": ["user"],
            }
        }

        return KanidmUserRepository._send_query(
            endpoint="person",
            method="POST",
            data=data,
        )

    @staticmethod
    def get_users(
        exclude_primary: bool = False,
        exclude_root: bool = False,
    ) -> list[UserDataUser]:
        """Return every person entry known to Kanidm as ``UserDataUser``.

        TODO: ``exclude_primary`` and ``exclude_root`` are accepted but not
        applied yet.

        Raises:
            KanidmQueryError: If the request fails.
        """
        users_data = KanidmUserRepository._send_query(endpoint="person", method="GET")
        return [KanidmUserRepository._user_from_entry(user) for user in users_data]

    @staticmethod
    def delete_user(username: str) -> None:
        """Delete an existing user.

        Raises:
            KanidmQueryError: If the request fails.
        """
        # NOTE(review): assumes Kanidm exposes DELETE /v1/person/{id} and
        # answers 200 with a JSON body -- confirm against the Kanidm API.
        KanidmUserRepository._send_query(
            endpoint=KanidmUserRepository._person_endpoint(username),
            method="DELETE",
        )

    @staticmethod
    def update_user(username: str, password: str) -> None:
        """Update the password of an existing user.

        Raises:
            NotImplementedError: Always; no Kanidm request is wired up for
                password changes yet.
        """
        # Previously this called ``_send_query()`` without an endpoint and
        # failed with an unrelated TypeError; fail explicitly instead.
        raise NotImplementedError(
            "Updating a user's password through Kanidm is not implemented yet."
        )

    @staticmethod
    def get_user_by_username(username: str) -> Optional[UserDataUser]:
        """Retrieve user data (``UserDataUser``) by username.

        Returns:
            The user, or ``None`` when Kanidm replies with an empty body.

        Raises:
            KanidmQueryError: If the request fails.
        """
        # NOTE(review): assumes GET /v1/person/{id} returns the entry, or
        # JSON ``null`` for an unknown name -- confirm against the Kanidm API.
        user_data = KanidmUserRepository._send_query(
            endpoint=KanidmUserRepository._person_endpoint(username),
            method="GET",
        )
        if not user_data:
            return None
        return KanidmUserRepository._user_from_entry(user_data)