feat: add legacy code support

This commit is contained in:
dettlaff 2024-11-29 19:32:02 +04:00
parent 6b95d590aa
commit b8b2363f48
5 changed files with 204 additions and 88 deletions

View file

@ -133,3 +133,32 @@ def remove_ssh_key(username: str, ssh_key: str):
raise KeyNotFound()
raise UserNotFound()
# def get_ssh_keys(username: str) -> list:
# with ReadUserData() as data:
# ensure_ssh_and_users_fields_exist(data)
# if username == "root":
# if ssh_key in data["ssh"]["rootKeys"]:
# data["ssh"]["rootKeys"].remove(ssh_key)
# return
# raise KeyNotFound()
# if username == data["username"]:
# if ssh_key in data["sshKeys"]:
# data["sshKeys"].remove(ssh_key)
# return
# raise KeyNotFound()
# for user in data["users"]:
# if user["username"] == username:
# if "sshKeys" not in user:
# user["sshKeys"] = []
# if ssh_key in user["sshKeys"]:
# user["sshKeys"].remove(ssh_key)
# return
# raise UserNotFound()

View file

@ -7,6 +7,7 @@ from selfprivacy_api.models.user import UserDataUser
from selfprivacy_api.utils import is_username_forbidden
from selfprivacy_api.repositories.users.json_user_repository import JsonUserRepository
from selfprivacy_api.repositories.users import ACTIVE_USERS_PROVIDER
from selfprivacy_api.repositories.users.exceptions import (
UsernameForbidden,
@ -19,10 +20,14 @@ def get_users(
exclude_primary: bool = False,
exclude_root: bool = False,
) -> list[UserDataUser]:
return ACTIVE_USERS_PROVIDER.get_users(
users = ACTIVE_USERS_PROVIDER.get_users(
exclude_primary=exclude_primary, exclude_root=exclude_root
)
# for user in users:
# TODO: take ssh keys if ACTIVE_USERS_PROVIDER is KanidmUserRepository
return users
def create_user(
username: str,
@ -44,6 +49,9 @@ def create_user(
if len(username) >= 32:
raise UsernameTooLong("Username must be less than 32 characters")
if ACTIVE_USERS_PROVIDER != JsonUserRepository: # for ssh management
JsonUserRepository.create_user(username=username, password="legacy")
return ACTIVE_USERS_PROVIDER.create_user(
username=username,
password=password,
@ -55,6 +63,9 @@ def create_user(
def delete_user(username: str) -> None:
if ACTIVE_USERS_PROVIDER != JsonUserRepository: # for ssh management
JsonUserRepository.delete_user(username=username)
return ACTIVE_USERS_PROVIDER.delete_user(username=username)
@ -78,4 +89,7 @@ def update_user(
def get_user_by_username(username: str) -> Optional[UserDataUser]:
return ACTIVE_USERS_PROVIDER.get_user_by_username(username=username)
user = ACTIVE_USERS_PROVIDER.get_user_by_username(username=username)
# TODO: take ssh keys if ACTIVE_USERS_PROVIDER is KanidmUserRepository
return user

View file

@ -5,18 +5,31 @@ from selfprivacy_api.models.user import UserDataUser
class AbstractUserRepository(ABC):
@staticmethod
@abstractmethod
def create_user(
username: str,
password: Optional[str] = None,
displayname: Optional[str] = None,
email: Optional[str] = None,
directmemberof: Optional[list[str]] = None,
memberof: Optional[list[str]] = None,
) -> None:
"""
Creates a new user. In KanidmUserRepository "password" is a legacy field,
please use generate_password_reset_link() instead.
"""
@staticmethod
@abstractmethod
def get_users(
exclude_primary: bool = False,
exclude_root: bool = False,
) -> list[UserDataUser]:
"""Retrieves a list of users with options to exclude specific user groups"""
@staticmethod
@abstractmethod
def create_user(username: str, password: str) -> None:
"""Creates a new user"""
"""
Gets a list of users with options to exclude specific user groups.
In KanidmUserRepository, the root user will never return.
"""
@staticmethod
@abstractmethod
@ -25,10 +38,30 @@ class AbstractUserRepository(ABC):
@staticmethod
@abstractmethod
def update_user(username: str, password: str) -> None:
"""Updates the password of an existing user"""
def update_user(
username: str,
password: Optional[str] = None,
displayname: Optional[str] = None,
email: Optional[str] = None,
directmemberof: Optional[list[str]] = None,
memberof: Optional[list[str]] = None,
) -> None:
"""
Update user information.
In the JsonUserRepository, only update the password of an existing user.
Do not update the password in KanidmUserRepository,
use generate_password_reset_link() instead.
"""
@staticmethod
@abstractmethod
def get_user_by_username(username: str) -> Optional[UserDataUser]:
"""Retrieves user data (UserDataUser) by username"""
@staticmethod
@abstractmethod
def generate_password_reset_link(username: str) -> str:
"""
Do not reset the password, just generate a link to reset the password.
Not implemented in JsonUserRepository.
"""

View file

@ -30,10 +30,10 @@ class JsonUserRepository(AbstractUserRepository):
@staticmethod
def get_users(
exclude_primary: bool = False,
exclude_root: bool = False,
exclude_primary: bool = False, # TODO
exclude_root: bool = False, # TODO
) -> list[UserDataUser]:
"""Get the list of users"""
"""Retrieves a list of users with options to exclude specific user groups"""
users = []
with ReadUserData() as user_data:
ensure_ssh_and_users_fields_exist(user_data)
@ -65,6 +65,8 @@ class JsonUserRepository(AbstractUserRepository):
@staticmethod
def create_user(username: str, password: str) -> None:
"""Creates a new user"""
hashed_password = JsonUserRepository._check_and_hash_password(password)
with ReadUserData() as user_data:
@ -87,6 +89,8 @@ class JsonUserRepository(AbstractUserRepository):
@staticmethod
def delete_user(username: str) -> None:
"""Deletes an existing user"""
with WriteUserData() as user_data:
ensure_ssh_and_users_fields_exist(user_data)
if username == user_data["username"] or username == "root":
@ -101,6 +105,8 @@ class JsonUserRepository(AbstractUserRepository):
@staticmethod
def update_user(username: str, password: str) -> None:
"""Updates the password of an existing user"""
hashed_password = JsonUserRepository._check_and_hash_password(password)
with WriteUserData() as data:
@ -120,6 +126,8 @@ class JsonUserRepository(AbstractUserRepository):
@staticmethod
def get_user_by_username(username: str) -> Optional[UserDataUser]:
"""Retrieves user data (UserDataUser) by username"""
with ReadUserData() as data:
ensure_ssh_and_users_fields_exist(data)

View file

@ -18,47 +18,31 @@ redis = RedisPool().get_connection()
logger = logging.getLogger(__name__)
ADMIN_KANIDM_GROUPS = ["sp.admin"]
class KanidmAdminToken:
class KanidmAdminToken: # TODO CHECK IS TOKEN CORRECT?
@staticmethod
def get() -> str:
kanidm_admin_token = redis.get("kanidm:token")
if kanidm_admin_token is None:
kanidm_admin_password = redis.get("kanidm:password") # type: ignore
if kanidm_admin_password is None:
kanidm_admin_password = (
KanidmAdminToken.reset_and_save_idm_admin_password()
KanidmAdminToken._reset_and_save_idm_admin_password()
)
kanidm_admin_token = KanidmAdminToken.create_and_save_token(
kanidm_admin_token = KanidmAdminToken._create_and_save_token(
kanidm_admin_password=kanidm_admin_password
)
return kanidm_admin_token
@staticmethod
def create_and_save_token(kanidm_admin_password: str) -> str:
logging.error("create_and_save_token START")
def _create_and_save_token(kanidm_admin_password: str) -> str:
with temporary_env_var(key="KANIDM_PASSWORD", value=kanidm_admin_password):
subprocess.run(["kanidm", "login", "-D", "idm_admin"])
# kanidm_admin_token = subprocess.check_output(
# [
# "kanidm",
# "service-account",
# "api-token",
# "generate",
# "--rw",
# "selfprivacy",
# "token2",
# ],
# text=True,
# )
try:
kanidm_admin_token = subprocess.check_output(
output = subprocess.check_output(
[
"kanidm",
"service-account",
@ -69,25 +53,15 @@ class KanidmAdminToken:
"token2",
],
text=True,
stderr=subprocess.PIPE,
)
except subprocess.CalledProcessError as e:
logging.error(f"Error executing command: {e.cmd}")
logging.error(f"Error message: {e.stderr}")
raise
# except subprocess.CalledProcessError as e:
# logger.error(e)
kanidm_admin_token = kanidm_admin_token.splitlines()[-1]
kanidm_admin_token = output.splitlines()[-1]
redis.set("kanidm:token", kanidm_admin_token)
return kanidm_admin_token
@staticmethod
def reset_and_save_idm_admin_password() -> str:
logging.error("reset_and_save_idm_admin_password START")
def _reset_and_save_idm_admin_password() -> str:
output = subprocess.check_output(
[
"kanidmd",
@ -106,7 +80,6 @@ class KanidmAdminToken:
1
) # we have many not json strings in output
redis.set("kanidm:password", new_kanidm_admin_password)
return new_kanidm_admin_password
@ -116,11 +89,20 @@ class KanidmQueryError(Exception):
class KanidmUserRepository(AbstractUserRepository):
@staticmethod
def _send_query(endpoint: str, method: str = "GET", data=None):
def _check_user_origin_by_memberof(
memberof: Optional[list[str]] = None,
) -> UserDataUserOrigin:
if sorted(memberof) == sorted(ADMIN_KANIDM_GROUPS):
return UserDataUserOrigin.PRIMARY
else:
return UserDataUserOrigin.NORMAL
@staticmethod
def _send_query(endpoint: str, method: str = "GET", data=None) -> dict:
request_method = getattr(requests, method.lower(), None)
full_endpoint = f"{KANIDM_URL}/v1/{endpoint}"
# try:
try:
response = request_method(
full_endpoint,
json=data,
@ -132,24 +114,36 @@ class KanidmUserRepository(AbstractUserRepository):
verify=False, # TODO: REMOVE THIS NOTHALAL!!!!!
)
# TODO make more cases, what if user do not exits?
if response.status_code != 200:
raise KanidmQueryError(
f"Kanidm returned {response.status_code} unexpected HTTP status code. Endpoint: {full_endpoint}. Error: {response.text}."
)
return response.json()
# except Exception as error:
# raise KanidmQueryError(f"Kanidm request failed! Error: {str(error)}")
except Exception as error:
raise KanidmQueryError(f"Kanidm request failed! Error: {str(error)}")
@staticmethod
def create_user(
username: str,
password: Optional[str] = None, # TODO legacy?
password: Optional[str] = None,
displayname: Optional[str] = None,
email: Optional[str] = None,
directmemberof: Optional[list[str]] = None,
memberof: Optional[list[str]] = None,
) -> None:
"""
Creates a new user."password" is a legacy field,
please use generate_password_reset_link() instead.
If displayname is None, it will default to the username.
If email is None, it will default to username@get_domain().
"""
if password:
pass # TODO make notif
data = {
"attrs": {
"name": [username],
@ -170,42 +164,62 @@ class KanidmUserRepository(AbstractUserRepository):
data=data,
)
@staticmethod
def get_users(
exclude_primary: bool = False,
exclude_root: bool = False,
exclude_root: bool = False, # never return root
) -> list[UserDataUser]:
"""
Gets a list of users with options to exclude specific user groups.
The root user will never return.
"""
users_data = KanidmUserRepository._send_query(endpoint="person", method="GET")
users = []
for user in users_data:
attrs = user.get("attrs", {})
origin = KanidmUserRepository._check_user_origin(
memberof=attrs.get("memberof", [])
)
if exclude_primary and origin == UserDataUserOrigin.PRIMARY:
continue
user_type = UserDataUser(
uuid=attrs.get("uuid", [None])[0],
username=attrs.get("name", [None])[0],
ssh_keys=["test"], # TODO: подключить реальные SSH-ключи
displayname=attrs.get("displayname", [None])[0],
email=attrs.get("mail", [None])[0],
origin=UserDataUserOrigin.NORMAL, # TODO
origin=origin,
directmemberof=attrs.get("directmemberof", []),
memberof=attrs.get("memberof", []),
)
users.append(user_type)
return users
@staticmethod
def delete_user(username: str) -> None:
"""Deletes an existing user"""
return KanidmUserRepository._send_query(
endpoint=f"person/{username}", method="DELETE"
)
@staticmethod
def update_user(
username: str,
password: Optional[str] = None, # TODO legacy?
password: Optional[str] = None,
displayname: Optional[str] = None,
email: Optional[str] = None,
directmemberof: Optional[list[str]] = None,
memberof: Optional[list[str]] = None,
) -> None:
"""Updates the password of an existing user"""
"""
Update user information.
Do not update the password, please
use generate_password_reset_link() instead.
"""
if password:
pass # TODO make notif
data = {
"attrs": {
@ -226,6 +240,7 @@ class KanidmUserRepository(AbstractUserRepository):
data=data,
)
@staticmethod
def get_user_by_username(username: str) -> Optional[UserDataUser]:
"""Retrieves user data (UserDataUser) by username"""
user_data = KanidmUserRepository._send_query(
@ -244,7 +259,24 @@ class KanidmUserRepository(AbstractUserRepository):
displayname=attrs.get("displayname", [None])[0],
email=attrs.get("mail", [None])[0],
ssh_keys=attrs.get("ssh_keys", []),
origin=UserDataUserOrigin.NORMAL, # TODO
origin=KanidmUserRepository._check_user_origin_by_memberof(
memberof=attrs.get("memberof", [])
),
directmemberof=attrs.get("directmemberof", []),
memberof=attrs.get("memberof", []),
)
@staticmethod
def generate_password_reset_link(username: str) -> str:
"""
Do not reset the password, just generate a link to reset the password.
Not implemented in JsonUserRepository.
"""
token_information = KanidmUserRepository._send_query(
endpoint=f"person/{username}/_credential/_update_intent",
method="GET",
)
# {"token":"3btDa-sR5yX-q2XqZ-68gRq","expiry_time":1732713745}
# TODO: create link
return token_information