selfprivacy-rest-api/selfprivacy_api/actions/users.py

224 lines
6.3 KiB
Python
Raw Permalink Normal View History

"""Actions to manage the users."""
2024-07-26 19:59:44 +00:00
import re
import uuid
import logging
from typing import Optional
from selfprivacy_api.utils.strings import PLEASE_UPDATE_APP_TEXT
2024-12-12 19:53:41 +00:00
from selfprivacy_api.models.group import Group
2024-12-01 13:29:29 +00:00
from selfprivacy_api.models.user import UserDataUser, UserDataUserOrigin
2024-12-23 12:47:35 +00:00
from selfprivacy_api.utils import get_domain, is_username_forbidden
2024-12-01 13:29:29 +00:00
from selfprivacy_api.actions.ssh import get_ssh_keys
2024-11-29 15:32:02 +00:00
from selfprivacy_api.repositories.users.json_user_repository import JsonUserRepository
from selfprivacy_api.repositories.users import ACTIVE_USERS_PROVIDER
from selfprivacy_api.repositories.users.exceptions import (
DisplaynameTooLong,
SelfPrivacyAppIsOutdate,
UserIsProtected,
UsernameForbidden,
UsernameNotAlphanumeric,
UsernameTooLong,
2024-12-01 13:29:29 +00:00
UserNotFound,
2024-12-05 23:41:06 +00:00
UserAlreadyExists,
InvalidConfiguration,
)
2024-12-23 12:47:35 +00:00
# Resolved once at import time; used below to build fully-qualified group names.
domain = get_domain()

# Groups that update_user() skips when adding or removing memberships.
# Both entries must be f-strings so that "{domain}" is actually substituted.
DEFAULT_GROUPS = [f"idm_all_persons@{domain}", f"idm_all_accounts@{domain}"]

logger = logging.getLogger(__name__)
class ApiUsingWrongUserRepository(Exception):
    """
    Raised when the API is running against a user repository that is too old
    or unfinished for the requested operation. Are you debugging?
    """

    @staticmethod
    def get_error_message() -> str:
        """Return the human-readable description of this error."""
        message = "API is using a too old or unfinished user repository"
        return message
def get_users(
    exclude_primary: bool = False,
    exclude_root: bool = False,
) -> list[UserDataUser]:
    """
    Return the users known to the active user repository.

    Both flags are forwarded unchanged to ACTIVE_USERS_PROVIDER.get_users().
    When the repository check below passes, every returned user additionally
    gets its ssh_keys filled in via get_ssh_keys(), and (unless exclude_root
    is set) a synthetic "root" entry is appended.
    """
    users = ACTIVE_USERS_PROVIDER.get_users(
        exclude_primary=exclude_primary, exclude_root=exclude_root
    )

    # NOTE(review): get_user_by_username() attaches SSH keys when the provider
    # is NOT a JsonUserRepository; this guard has the opposite polarity.
    # Confirm which one is intended before relying on it.
    if isinstance(ACTIVE_USERS_PROVIDER, JsonUserRepository):
        for user in users:
            try:
                user.ssh_keys = get_ssh_keys(username=user.username)
            except UserNotFound:
                # Best effort: a user without an SSH record keeps its keys as-is.
                pass

        if not exclude_root:
            users.append(
                UserDataUser(
                    username="root",
                    user_type=UserDataUserOrigin.ROOT,
                    # Look up root's own keys. The previous code reused the loop
                    # variable `user`, which returned another user's keys and
                    # raised NameError when the user list was empty.
                    ssh_keys=get_ssh_keys(username="root"),
                )
            )

    return users
2024-11-15 00:19:24 +00:00
def create_user(
    username: str,
    password: Optional[str] = None,
    directmemberof: Optional[list[str]] = None,
    displayname: Optional[str] = None,
) -> None:
    """
    Validate the request and create a user in the active user repository.

    Args:
        username: login name; must pass is_username_forbidden(), consist of
            a lowercase letter or underscore followed by at least one
            lowercase letter, digit or underscore, and be shorter than 32
            characters.
        password: not used for the new account; if supplied, only
            PLEASE_UPDATE_APP_TEXT is logged as an error.
        directmemberof: forwarded to the active repository.
        displayname: forwarded to the active repository; must be shorter
            than 255 characters.

    Raises:
        UsernameForbidden: is_username_forbidden() rejected the name.
        UsernameNotAlphanumeric: the name does not match the allowed pattern.
        UsernameTooLong: the name is 32 characters or longer.
        DisplaynameTooLong: the display name is 255 characters or longer.
    """
    if is_username_forbidden(username):
        raise UsernameForbidden

    # fullmatch instead of match + "$": "$" also matches right before a
    # trailing newline, which let names such as "bob\n" through.
    if not re.fullmatch(r"[a-z_][a-z0-9_]+", username):
        raise UsernameNotAlphanumeric

    if len(username) >= 32:
        raise UsernameTooLong

    if password:
        logger.error(PLEASE_UPDATE_APP_TEXT)

    if displayname and len(displayname) >= 255:
        raise DisplaynameTooLong

    # need to maintain the logic of the old repository, since ssh management uses it.
    if not isinstance(ACTIVE_USERS_PROVIDER, JsonUserRepository):
        try:
            JsonUserRepository.create_user(
                username=username, password=str(uuid.uuid4())
            )  # random password for legacy
        except (UserAlreadyExists, InvalidConfiguration):
            # The legacy record is best effort; the active repository decides.
            pass

    ACTIVE_USERS_PROVIDER.create_user(
        username=username,
        directmemberof=directmemberof,
        displayname=displayname,
    )
def delete_user(username: str) -> None:
    """
    Delete a user from the active user repository.

    The legacy JSON record is removed as well (best effort) whenever the
    active repository is not a JsonUserRepository, even if the lookup in the
    active repository fails.

    Raises:
        UserIsProtected: username is "root" or the user is the primary user.
        UserNotFound: propagated unchanged from the active repository lookup.
    """
    if username == "root":
        raise UserIsProtected

    # try/finally rather than `except UserNotFound: raise UserNotFound`:
    # the original exception (with its args and traceback) now propagates
    # instead of being replaced by a fresh, empty one.
    try:
        user = ACTIVE_USERS_PROVIDER.get_user_by_username(username=username)
    finally:
        # need to maintain the logic of the old repository, since ssh management uses it.
        if not isinstance(ACTIVE_USERS_PROVIDER, JsonUserRepository):
            try:
                JsonUserRepository.delete_user(username=username)
            except (UserNotFound, UserIsProtected):
                # Legacy cleanup is best effort.
                pass

    if user.user_type == UserDataUserOrigin.PRIMARY:
        raise UserIsProtected

    ACTIVE_USERS_PROVIDER.delete_user(username=username)
2024-11-15 00:19:24 +00:00
def update_user(
    username: str,
    password: Optional[str] = None,
    directmemberof: Optional[list[str]] = None,
    displayname: Optional[str] = None,
) -> None:
    """
    Update the display name and/or group memberships of a user.

    Args:
        username: user to modify; "root" is rejected.
        password: no longer supported; any truthy value is rejected.
        directmemberof: desired full list of groups. None leaves memberships
            untouched. Groups listed in DEFAULT_GROUPS are never added or
            removed.
        displayname: new display name; a falsy value leaves it untouched.

    Raises:
        SelfPrivacyAppIsOutdate: a password was supplied.
        UserIsProtected: username is "root".
        ApiUsingWrongUserRepository: the active repository is a
            JsonUserRepository and a display name or group change was
            requested.
        DisplaynameTooLong: the display name is 255 characters or longer.
    """
    if password:
        raise SelfPrivacyAppIsOutdate

    if username == "root":
        raise UserIsProtected

    if displayname:
        if isinstance(ACTIVE_USERS_PROVIDER, JsonUserRepository):
            raise ApiUsingWrongUserRepository

        if len(displayname) >= 255:
            raise DisplaynameTooLong

        ACTIVE_USERS_PROVIDER.update_user(
            username=username,
            displayname=displayname,
        )

    if directmemberof is not None:
        if isinstance(ACTIVE_USERS_PROVIDER, JsonUserRepository):
            raise ApiUsingWrongUserRepository

        user = ACTIVE_USERS_PROVIDER.get_user_by_username(username=username)

        # A user without any recorded groups may carry None here; treat it as
        # an empty list instead of failing with TypeError while diffing.
        current_groups = user.directmemberof or []

        groups_to_add = [
            group
            for group in directmemberof
            if group not in current_groups and group not in DEFAULT_GROUPS
        ]
        groups_to_delete = [
            group
            for group in current_groups
            if group not in directmemberof and group not in DEFAULT_GROUPS
        ]

        for group in groups_to_add:
            ACTIVE_USERS_PROVIDER.add_users_to_group(
                group_name=group, users=[username]
            )

        for group in groups_to_delete:
            ACTIVE_USERS_PROVIDER.remove_users_from_group(
                group_name=group, users=[username]
            )
def get_user_by_username(username: str) -> UserDataUser:
    """
    Look up a single user by name.

    With a JsonUserRepository as the active provider, its result is returned
    unchanged. Otherwise "root" is answered with a synthesized entry and any
    other user gets its ssh_keys filled in from get_ssh_keys() when available.
    """
    uses_json_repository = isinstance(ACTIVE_USERS_PROVIDER, JsonUserRepository)

    # Root is synthesized here instead of being queried from the provider.
    if not uses_json_repository and username == "root":
        root_keys = get_ssh_keys(username="root")
        return UserDataUser(
            username="root",
            user_type=UserDataUserOrigin.ROOT,
            ssh_keys=root_keys,
        )

    found = ACTIVE_USERS_PROVIDER.get_user_by_username(username=username)
    if uses_json_repository:
        return found

    try:
        found.ssh_keys = get_ssh_keys(username=found.username)
    except UserNotFound:
        # No SSH record for this user; return it without touching ssh_keys.
        pass

    return found
2024-12-03 22:54:24 +00:00
def generate_password_reset_link(username: str) -> str:
    """
    Ask the active user repository for a password reset link.

    Raises:
        ApiUsingWrongUserRepository: the active repository is a
            JsonUserRepository.
        UserIsProtected: username is "root".
    """
    if isinstance(ACTIVE_USERS_PROVIDER, JsonUserRepository):
        raise ApiUsingWrongUserRepository
    elif username == "root":
        raise UserIsProtected

    reset_link = ACTIVE_USERS_PROVIDER.generate_password_reset_link(
        username=username
    )
    return reset_link
2024-12-12 12:00:07 +00:00
2024-12-12 19:53:41 +00:00
def get_groups() -> list[Group]:
    """
    Return the groups reported by the active user repository.

    Raises:
        ApiUsingWrongUserRepository: the active repository is a
            JsonUserRepository.
    """
    uses_json_repository = isinstance(ACTIVE_USERS_PROVIDER, JsonUserRepository)
    if uses_json_repository:
        raise ApiUsingWrongUserRepository

    groups = ACTIVE_USERS_PROVIDER.get_groups()
    return groups