feat: forbid modification of important user.

This commit is contained in:
dettlaff 2024-12-11 07:13:02 +04:00
parent 2db33a1688
commit 27254dd014
3 changed files with 50 additions and 17 deletions

View file

@ -2,10 +2,17 @@
import re
import uuid
import logging
from typing import Optional
from selfprivacy_api import PLEASE_UPDATE_APP_TEXT
from selfprivacy_api.models.user import UserDataUser, UserDataUserOrigin
from selfprivacy_api.repositories.users.exceptions_kanidm import (
KanidmQueryError,
KanidmReturnEmptyResponse,
KanidmReturnUnknownResponseType,
)
from selfprivacy_api.utils import is_username_forbidden
from selfprivacy_api.actions.ssh import get_ssh_keys
@ -13,6 +20,7 @@ from selfprivacy_api.actions.ssh import get_ssh_keys
from selfprivacy_api.repositories.users.json_user_repository import JsonUserRepository
from selfprivacy_api.repositories.users import ACTIVE_USERS_PROVIDER
from selfprivacy_api.repositories.users.exceptions import (
SelfPrivacyAppIsOutdate,
UsernameForbidden,
UsernameNotAlphanumeric,
UsernameTooLong,
@ -21,6 +29,8 @@ from selfprivacy_api.repositories.users.exceptions import (
InvalidConfiguration,
)
logger = logging.getLogger(__name__)
class ApiUsingWrongUserRepository(Exception):
"""
@ -42,6 +52,14 @@ class RootIsNotAvailableForModification(Exception):
return "Root is not available for modification. Operation is restricted."
class PrimaryUserDeletionNotAllowed(Exception):
"""The primary user cannot be deleted."""
@staticmethod
def get_error_message() -> str:
return "The primary user cannot be deleted."
def get_users(
exclude_primary: bool = False,
exclude_root: bool = False,
@ -87,6 +105,9 @@ def create_user(
if len(username) >= 32:
raise UsernameTooLong("Username must be less than 32 characters")
if password:
logger.error(PLEASE_UPDATE_APP_TEXT)
# need to maintain the logic of the old repository, since ssh management uses it.
if not isinstance(ACTIVE_USERS_PROVIDER, JsonUserRepository):
try:
@ -98,13 +119,19 @@ def create_user(
ACTIVE_USERS_PROVIDER.create_user(
username=username,
password=password,
directmemberof=directmemberof,
displayname=displayname,
)
def delete_user(username: str) -> None:
if username == "root":
raise RootIsNotAvailableForModification
user = ACTIVE_USERS_PROVIDER.get_user_by_username(username=username)
if user.user_type == UserDataUserOrigin.PRIMARY:
raise PrimaryUserDeletionNotAllowed
# need to maintain the logic of the old repository, since ssh management uses it.
if not isinstance(ACTIVE_USERS_PROVIDER, JsonUserRepository):
@ -123,9 +150,14 @@ def update_user(
displayname: Optional[str] = None,
) -> None:
if password:
raise SelfPrivacyAppIsOutdate
if username == "root":
raise RootIsNotAvailableForModification
ACTIVE_USERS_PROVIDER.update_user(
username=username,
password=password,
directmemberof=directmemberof,
displayname=displayname,
)

View file

@ -40,7 +40,6 @@ class AbstractUserRepository(ABC):
@abstractmethod
def update_user(
username: str,
password: Optional[str] = None,
directmemberof: Optional[list[str]] = None,
memberof: Optional[list[str]] = None,
displayname: Optional[str] = None,
@ -55,7 +54,7 @@ class AbstractUserRepository(ABC):
@staticmethod
@abstractmethod
def get_user_by_username(username: str) -> Optional[UserDataUser]:
def get_user_by_username(username: str) -> UserDataUser:
"""Retrieves user data (UserDataUser) by username"""
@staticmethod

View file

@ -6,8 +6,8 @@ import logging
from selfprivacy_api.repositories.users.exceptions import (
NoPasswordResetLinkFoundInResponse,
SelfPrivacyAppIsOutdate,
UserAlreadyExists,
UserNotFound,
)
from selfprivacy_api.repositories.users.exceptions_kanidm import (
KanidmDidNotReturnAdminPassword,
@ -21,7 +21,6 @@ from selfprivacy_api.models.user import UserDataUser, UserDataUserOrigin
from selfprivacy_api.repositories.users.abstract_user_repository import (
AbstractUserRepository,
)
from selfprivacy_api import PLEASE_UPDATE_APP_TEXT
KANIDM_URL = "https://127.0.0.1:3013"
@ -168,6 +167,9 @@ class KanidmUserRepository(AbstractUserRepository):
plugin_error = response_data.get("plugin", {})
if plugin_error.get("attrunique") == "duplicate value detected":
raise UserAlreadyExists # TODO only user ?
if isinstance(response_data, str):
if response_data == "nomatchingentries":
raise UserNotFound
raise KanidmQueryError(error_text=response.text)
@ -176,7 +178,6 @@ class KanidmUserRepository(AbstractUserRepository):
@staticmethod
def create_user(
username: str,
password: Optional[str] = None,
directmemberof: Optional[list[str]] = None,
memberof: Optional[list[str]] = None,
displayname: Optional[str] = None,
@ -189,9 +190,6 @@ class KanidmUserRepository(AbstractUserRepository):
If email is None, it will default to username@get_domain().
"""
if password:
logger.error(PLEASE_UPDATE_APP_TEXT)
data = {
"attrs": {
"name": [username],
@ -223,7 +221,9 @@ class KanidmUserRepository(AbstractUserRepository):
"""
users_data = KanidmUserRepository._send_query(endpoint="person", method="GET")
KanidmUserRepository._check_response_type_and_not_empty(data_type="list", response_data=users_data)
KanidmUserRepository._check_response_type_and_not_empty(
data_type="list", response_data=users_data
)
users = []
for user in users_data:
@ -251,12 +251,12 @@ class KanidmUserRepository(AbstractUserRepository):
@staticmethod
def delete_user(username: str) -> None:
"""Deletes an existing user"""
KanidmUserRepository._send_query(endpoint=f"person/{username}", method="DELETE")
@staticmethod
def update_user(
username: str,
password: Optional[str] = None,
directmemberof: Optional[list[str]] = None,
memberof: Optional[list[str]] = None,
displayname: Optional[str] = None,
@ -266,8 +266,6 @@ class KanidmUserRepository(AbstractUserRepository):
Do not update the password, please
use generate_password_reset_link() instead.
"""
if password:
raise SelfPrivacyAppIsOutdate
data = {
"attrs": {
@ -289,14 +287,16 @@ class KanidmUserRepository(AbstractUserRepository):
)
@staticmethod
def get_user_by_username(username: str) -> Optional[UserDataUser]:
def get_user_by_username(username: str) -> UserDataUser:
"""Retrieves user data (UserDataUser) by username"""
user_data = KanidmUserRepository._send_query(
endpoint=f"person/{username}",
method="GET",
)
KanidmUserRepository._check_response_type_and_not_empty(data_type="dict", response_data=user_data)
KanidmUserRepository._check_response_type_and_not_empty(
data_type="dict", response_data=user_data
)
attrs = user_data["attrs"]
@ -323,7 +323,9 @@ class KanidmUserRepository(AbstractUserRepository):
method="GET",
)
KanidmUserRepository._check_response_type_and_not_empty(data_type="dict", response_data=data)
KanidmUserRepository._check_response_type_and_not_empty(
data_type="dict", response_data=data
)
token = data.get("token", None)