fix: from review

This commit is contained in:
dettlaff 2024-12-18 06:20:50 +04:00
parent e5268ba6b5
commit 65082e53fb
9 changed files with 103 additions and 46 deletions

View file

@ -82,6 +82,7 @@ in
pkgs.util-linux
pkgs.e2fsprogs
pkgs.iproute2
pkgs.kanidm
];
after = [ "network-online.target" ];
wants = [ "network-online.target" ];

View file

@ -1 +0,0 @@
PLEASE_UPDATE_APP_TEXT = "Your SelfPrivacy app is out of date, please update it. Some important functions are not working at the moment."

View file

@ -41,14 +41,6 @@ class ApiUsingWrongUserRepository(Exception):
return "API is using a too old or unfinished user repository"
def _check_displayname_is_correct(displayname: str) -> None:
if not re.match(r"^[a-z_][a-z0-9_]+$", displayname):
raise DisplaynameNotAlphanumeric
if len(displayname) >= 16: # we don't know the limitations of each service
raise DisplaynameTooLong
def get_users(
exclude_primary: bool = False,
exclude_root: bool = False,
@ -95,8 +87,10 @@ def create_user(
if password:
logger.error(PLEASE_UPDATE_APP_TEXT)
if displayname:
_check_displayname_is_correct(displayname=displayname)
if (
displayname and len(displayname) >= 32
): # we don't know the limitations of each service
raise DisplaynameTooLong
# need to maintain the logic of the old repository, since ssh management uses it.
if not isinstance(ACTIVE_USERS_PROVIDER, JsonUserRepository):
@ -149,8 +143,10 @@ def update_user(
if username == "root":
raise UserIsProtected
if displayname:
_check_displayname_is_correct(displayname=displayname)
if (
displayname and len(displayname) >= 32
): # we don't know the limitations of each service
raise DisplaynameTooLong
ACTIVE_USERS_PROVIDER.update_user(
username=username,

View file

@ -4,6 +4,35 @@ from pydantic import BaseModel
class Group(BaseModel):
"""
Attributes:
name (str): The name of the group.
group_class (Optional[list[str]]):
A list of group classes. This can be used to classify the
group or assign it different roles/categories. Defaults to an empty list.
member (Optional[list[str]]):
A list of members who belong to the group.
Optional, defaults to an empty list.
memberof (Optional[list[str]]):
A list of groups that this group is a member of.
Optional, defaults to an empty list.
directmemberof (Optional[list[str]]):
A list of groups that directly contain this group as a member.
Optional, defaults to an empty list.
spn (Optional[str]):
The Service Principal Name (SPN) associated with the group.
Optional, defaults to None.
description (Optional[str]):
A textual description of the group. Optional, defaults to None.
"""
name: str
group_class: Optional[list[str]] = []
member: Optional[list[str]] = []

View file

@ -13,7 +13,6 @@ class AbstractUserRepository(ABC):
password: Optional[str] = None,
directmemberof: Optional[list[str]] = None,
displayname: Optional[str] = None,
email: Optional[str] = None,
) -> None:
"""
Creates a new user.
@ -36,7 +35,7 @@ class AbstractUserRepository(ABC):
@abstractmethod
def delete_user(username: str) -> None:
"""
Deletes an existing user
Deletes an existing user.
"""
@staticmethod
@ -45,7 +44,6 @@ class AbstractUserRepository(ABC):
username: str,
directmemberof: Optional[list[str]] = None,
displayname: Optional[str] = None,
email: Optional[str] = None,
) -> None:
"""
Update user information.
@ -58,7 +56,7 @@ class AbstractUserRepository(ABC):
@abstractmethod
def get_user_by_username(username: str) -> UserDataUser:
"""
Retrieves user data (UserDataUser) by username
Retrieves user data (UserDataUser) by username.
"""
# ! Not implemented in JsonUserRepository !
@ -79,3 +77,13 @@ class AbstractUserRepository(ABC):
"""
Get groups list.
"""
@staticmethod
@abstractmethod
def add_users_to_group(users: list[str], group_name: str) -> None:
""""""
@staticmethod
@abstractmethod
def remove_users_from_group(users: list[str], group_name: str) -> None:
""""""

View file

@ -1,5 +1,8 @@
from selfprivacy_api.utils.strings import PLEASE_UPDATE_APP_TEXT
class UserNotFound(Exception):
"""Attempted to get a user that does not exist"""
"""User not found"""
@staticmethod
def get_error_message() -> str:
@ -7,7 +10,7 @@ class UserNotFound(Exception):
class UserIsProtected(Exception):
"""Attempted to delete a user that is protected"""
"""User is protected and cannot be deleted or modified"""
@staticmethod
def get_error_message() -> str:
@ -15,7 +18,7 @@ class UserIsProtected(Exception):
class UsernameForbidden(Exception):
"""Attempted to create a user with a forbidden username"""
"""Username is forbidden"""
@staticmethod
def get_error_message() -> str:
@ -23,7 +26,7 @@ class UsernameForbidden(Exception):
class UserAlreadyExists(Exception):
"""Attempted to create a user that already exists"""
"""User already exists"""
@staticmethod
def get_error_message() -> str:
@ -31,7 +34,7 @@ class UserAlreadyExists(Exception):
class UsernameNotAlphanumeric(Exception):
"""Attempted to create a user with a non-alphanumeric username"""
"""Username must be alphanumeric and start with a letter"""
@staticmethod
def get_error_message() -> str:
@ -39,9 +42,7 @@ class UsernameNotAlphanumeric(Exception):
class UsernameTooLong(Exception):
"""
Attempted to create a user with a too long username. Username must be less than 32 characters
"""
"""Username is too long. Must be less than 32 characters"""
@staticmethod
def get_error_message() -> str:
@ -49,7 +50,7 @@ class UsernameTooLong(Exception):
class PasswordIsEmpty(Exception):
"""Attempted to create a user with an empty password"""
"""Password cannot be empty"""
@staticmethod
def get_error_message() -> str:
@ -57,7 +58,7 @@ class PasswordIsEmpty(Exception):
class InvalidConfiguration(Exception):
"""The userdata is broken"""
"""Invalid configuration, userdata is broken"""
@staticmethod
def get_error_message() -> str:
@ -71,7 +72,7 @@ class SelfPrivacyAppIsOutdate(Exception):
@staticmethod
def get_error_message() -> str:
return "SelfPrivacy app is out of date, please update"
return PLEASE_UPDATE_APP_TEXT
class NoPasswordResetLinkFoundInResponse(Exception):
@ -82,18 +83,8 @@ class NoPasswordResetLinkFoundInResponse(Exception):
return "The Kanidm response does not contain a password reset link."
class DisplaynameNotAlphanumeric(Exception):
"""Attempted to create a display name with non-alphanumeric characters"""
@staticmethod
def get_error_message() -> str:
return "Display name must be alphanumeric"
class DisplaynameTooLong(Exception):
"""
Attempted to create a display name that is too long. Display name must be less than 16 characters
"""
"""Display name is too long. Must be less than 16 characters"""
@staticmethod
def get_error_message() -> str:

View file

@ -60,7 +60,7 @@ class KanidmCliSubprocessError(Exception):
return (
f"An error occurred when using Kanidm cli. Error: {self.error}"
if self.error
else "An error occurred when using Kanidm cli."
else "An error occurred when using Kanidm cli"
)

View file

@ -213,7 +213,7 @@ class KanidmUserRepository(AbstractUserRepository):
UserDataUserOrigin: The origin type of the user (PRIMARY or NORMAL).
"""
if sorted(memberof) == sorted(ADMIN_KANIDM_GROUPS):
if all(group in memberof for group in ADMIN_KANIDM_GROUPS):
return UserDataUserOrigin.PRIMARY
else:
return UserDataUserOrigin.NORMAL
@ -285,7 +285,7 @@ class KanidmUserRepository(AbstractUserRepository):
if isinstance(response_data, str):
if response_data == "nomatchingentries":
raise UserNotFound # does it work only for user? hate kanidm's response
raise UserNotFound # does it work only for user?
elif response_data == "accessdenied":
raise KanidmQueryError(
error_text="Kanidm access issue", endpoint=full_endpoint
@ -425,12 +425,14 @@ class KanidmUserRepository(AbstractUserRepository):
data = {
"attrs": {
"displayname": [displayname if displayname else username],
"mail": [f"{username}@{get_domain()}"],
"class": ["user"], # TODO read more about it
}
}
if displayname:
data["attrs"]["displayname"] = [displayname]
if directmemberof:
data["attrs"]["directmemberof"] = directmemberof
@ -483,12 +485,15 @@ class KanidmUserRepository(AbstractUserRepository):
email=attrs.get("mail", [None])[0],
)
# ! Not implemented in JsonUserRepository !
# | |
# \|/ \|/
@staticmethod
def generate_password_reset_link(username: str) -> str:
"""
Do not reset the password, just generate a link to reset the password.
! Not implemented in JsonUserRepository !
Args:
username (str): The username for which to generate the reset link.
@ -525,7 +530,6 @@ class KanidmUserRepository(AbstractUserRepository):
def get_groups() -> list[Group]:
"""
Return Kanidm groups.
! Not implemented in JsonUserRepository !
Returns:
list[Group]
@ -560,3 +564,31 @@ class KanidmUserRepository(AbstractUserRepository):
groups.append(group)
return groups
@staticmethod
def add_users_to_group(users: list[str], group_name: str) -> None:
data = {
"attrs": {
"members": users,
}
}
KanidmUserRepository._send_query(
endpoint=f"group/{group_name}/_attr/member",
method="POST",
data=data,
)
@staticmethod
def remove_users_from_group(users: list[str], group_name: str) -> None:
data = {
"attrs": {
"members": users,
}
}
KanidmUserRepository._send_query(
endpoint=f"group/{group_name}/_attr/member",
method="POST",
data=data,
)

View file

@ -0,0 +1 @@
PLEASE_UPDATE_APP_TEXT = "Your SelfPrivacy app is out of date, please update it. Some important functions are not working at the moment."