2022-08-25 17:03:56 +00:00
|
|
|
"""Actions to manage the users."""
|
2024-07-26 19:59:44 +00:00
|
|
|
|
2022-08-25 17:03:56 +00:00
|
|
|
import re
|
2024-12-05 23:05:28 +00:00
|
|
|
import uuid
|
2024-12-11 03:13:02 +00:00
|
|
|
import logging
|
2022-08-25 17:03:56 +00:00
|
|
|
from typing import Optional
|
|
|
|
|
2024-12-11 03:13:02 +00:00
|
|
|
from selfprivacy_api import PLEASE_UPDATE_APP_TEXT
|
2024-12-01 13:29:29 +00:00
|
|
|
from selfprivacy_api.models.user import UserDataUser, UserDataUserOrigin
|
2022-08-25 17:03:56 +00:00
|
|
|
|
2024-11-15 14:30:51 +00:00
|
|
|
from selfprivacy_api.utils import is_username_forbidden
|
2024-12-01 13:29:29 +00:00
|
|
|
from selfprivacy_api.actions.ssh import get_ssh_keys
|
2024-11-02 23:15:51 +00:00
|
|
|
|
2024-12-05 23:05:28 +00:00
|
|
|
|
2024-11-29 15:32:02 +00:00
|
|
|
from selfprivacy_api.repositories.users.json_user_repository import JsonUserRepository
|
2024-11-02 23:15:51 +00:00
|
|
|
from selfprivacy_api.repositories.users import ACTIVE_USERS_PROVIDER
|
2024-10-26 18:22:31 +00:00
|
|
|
from selfprivacy_api.repositories.users.exceptions import (
|
2024-12-11 03:13:02 +00:00
|
|
|
SelfPrivacyAppIsOutdate,
|
2024-12-11 09:16:38 +00:00
|
|
|
UserIsProtected,
|
2024-10-26 18:22:31 +00:00
|
|
|
UsernameForbidden,
|
|
|
|
UsernameNotAlphanumeric,
|
|
|
|
UsernameTooLong,
|
2024-12-01 13:29:29 +00:00
|
|
|
UserNotFound,
|
2024-12-05 23:41:06 +00:00
|
|
|
UserAlreadyExists,
|
|
|
|
InvalidConfiguration,
|
2024-10-26 18:22:31 +00:00
|
|
|
)
|
2022-08-25 17:03:56 +00:00
|
|
|
|
2024-12-11 03:13:02 +00:00
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
|
|
|
|
|
2022-08-25 17:03:56 +00:00
|
|
|
|
2024-12-05 23:05:28 +00:00
|
|
|
class ApiUsingWrongUserRepository(Exception):
    """
    The API is running with a user repository that is too old or unfinished
    for the requested operation. Are you debugging?
    """

    @staticmethod
    def get_error_message() -> str:
        """Return the human-readable description of this error."""
        message = "API is using a too old or unfinished user repository"
        return message
|
|
|
|
|
|
|
|
|
2022-08-25 17:03:56 +00:00
|
|
|
def get_users(
    exclude_primary: bool = False,
    exclude_root: bool = False,
) -> list[UserDataUser]:
    """
    Return the users reported by the active user repository.

    Args:
        exclude_primary: forwarded unchanged to the repository's ``get_users``.
        exclude_root: forwarded unchanged to the repository's ``get_users``;
            it also suppresses the explicit root entry added below.

    Returns:
        The repository's user list. When the active repository is a
        ``JsonUserRepository``, each user additionally gets its SSH keys
        attached and, unless ``exclude_root`` is set, a root entry is appended.
    """
    users = ACTIVE_USERS_PROVIDER.get_users(
        exclude_primary=exclude_primary, exclude_root=exclude_root
    )

    # NOTE(review): get_user_by_username() attaches SSH keys for every
    # provider *except* JsonUserRepository, while this function does so only
    # *for* JsonUserRepository -- confirm which condition is intended.
    if isinstance(ACTIVE_USERS_PROVIDER, JsonUserRepository):
        for user in users:
            try:
                user.ssh_keys = get_ssh_keys(username=user.username)
            except UserNotFound:
                # Best effort: a user without an SSH record keeps the keys
                # the repository returned.
                pass

        if not exclude_root:
            users.append(
                UserDataUser(
                    username="root",
                    user_type=UserDataUserOrigin.ROOT,
                    # Look up root's own keys. The loop variable must not be
                    # reused here: it refers to the last listed user and is
                    # unbound when the list is empty.
                    ssh_keys=get_ssh_keys(username="root"),
                )
            )

    return users
|
|
|
|
|
2022-08-25 17:03:56 +00:00
|
|
|
|
2024-11-15 00:19:24 +00:00
|
|
|
def create_user(
    username: str,
    password: Optional[str] = None,
    directmemberof: Optional[list[str]] = None,
    displayname: Optional[str] = None,
) -> None:
    """
    Validate ``username`` and create the user in the active user repository.

    Args:
        username: login name; must not be forbidden, must consist of
            lowercase letters, digits and underscores, must not start with a
            digit, and must be 2 to 31 characters long.
        password: not passed to the active repository; when truthy,
            ``PLEASE_UPDATE_APP_TEXT`` is logged at error level.
        directmemberof: forwarded to the repository's ``create_user``.
        displayname: forwarded to the repository's ``create_user``.

    Raises:
        UsernameForbidden: ``is_username_forbidden`` rejected the name.
        UsernameNotAlphanumeric: the name contains disallowed characters.
        UsernameTooLong: the name has 32 or more characters.
    """
    if is_username_forbidden(username):
        raise UsernameForbidden("Username is forbidden")

    # fullmatch instead of match + "$": "$" also matches just before a
    # trailing newline, which would let "name\n" pass validation.
    if not re.fullmatch(r"[a-z_][a-z0-9_]+", username):
        raise UsernameNotAlphanumeric(
            "Username must be alphanumeric and start with a letter"
        )

    if len(username) >= 32:
        raise UsernameTooLong("Username must be less than 32 characters")

    if password:
        logger.error(PLEASE_UPDATE_APP_TEXT)

    # need to maintain the logic of the old repository, since ssh management uses it.
    if not isinstance(ACTIVE_USERS_PROVIDER, JsonUserRepository):
        try:
            JsonUserRepository.create_user(
                username=username, password=str(uuid.uuid4())
            )  # random password for legacy
        except (UserAlreadyExists, InvalidConfiguration):
            # Best effort: the legacy record is only a mirror, so its absence
            # must not block creation in the active repository.
            pass

    ACTIVE_USERS_PROVIDER.create_user(
        username=username,
        directmemberof=directmemberof,
        displayname=displayname,
    )
|
2022-08-25 17:03:56 +00:00
|
|
|
|
|
|
|
|
2024-10-26 18:22:31 +00:00
|
|
|
def delete_user(username: str) -> None:
    """
    Delete ``username`` from the active user repository.

    The legacy JSON repository is cleaned up as well (best effort), even when
    the lookup in the active repository fails.

    Raises:
        UserIsProtected: ``username`` is ``"root"`` or the user's type is
            ``UserDataUserOrigin.PRIMARY``.
        UserNotFound: the active repository does not know the user.
    """
    if username == "root":
        raise UserIsProtected

    try:
        user = ACTIVE_USERS_PROVIDER.get_user_by_username(username=username)
    except UserNotFound:
        # Propagate the repository's own exception instead of a fresh,
        # argument-less one so its message and traceback are preserved.
        raise
    finally:
        # need to maintain the logic of the old repository, since ssh management uses it.
        # Runs before the PRIMARY check below and also when the lookup failed.
        if not isinstance(ACTIVE_USERS_PROVIDER, JsonUserRepository):
            try:
                JsonUserRepository.delete_user(username=username)
            except (UserNotFound, UserIsProtected):
                pass

    if user.user_type == UserDataUserOrigin.PRIMARY:
        raise UserIsProtected

    ACTIVE_USERS_PROVIDER.delete_user(username=username)
|
2022-08-25 17:03:56 +00:00
|
|
|
|
|
|
|
|
2024-11-15 00:19:24 +00:00
|
|
|
def update_user(
    username: str,
    password: Optional[str] = None,
    directmemberof: Optional[list[str]] = None,
    displayname: Optional[str] = None,
) -> None:
    """
    Update a user's group membership and display name in the active
    user repository.

    Raises:
        SelfPrivacyAppIsOutdate: a truthy ``password`` was supplied.
        UserIsProtected: ``username`` is ``"root"``.
    """
    # The password check comes first, so it wins when both guards apply.
    if password:
        raise SelfPrivacyAppIsOutdate

    targets_root = username == "root"
    if targets_root:
        raise UserIsProtected

    changes = {
        "username": username,
        "directmemberof": directmemberof,
        "displayname": displayname,
    }
    ACTIVE_USERS_PROVIDER.update_user(**changes)
|
2022-08-25 17:03:56 +00:00
|
|
|
|
|
|
|
|
2024-12-11 07:51:22 +00:00
|
|
|
def get_user_by_username(username: str) -> UserDataUser:
    """
    Look up a single user by name.

    With a ``JsonUserRepository`` the repository's answer is returned as is.
    Otherwise ``"root"`` is answered with a locally built root entry, and any
    other user gets its SSH keys attached when ``get_ssh_keys`` knows it.
    """
    if isinstance(ACTIVE_USERS_PROVIDER, JsonUserRepository):
        return ACTIVE_USERS_PROVIDER.get_user_by_username(username=username)

    if username == "root":
        root_keys = get_ssh_keys(username="root")
        return UserDataUser(
            username="root",
            user_type=UserDataUserOrigin.ROOT,
            ssh_keys=root_keys,
        )

    found = ACTIVE_USERS_PROVIDER.get_user_by_username(username=username)

    try:
        keys = get_ssh_keys(username=found.username)
    except UserNotFound:
        # No SSH record for this user: return the repository's data unchanged.
        return found

    found.ssh_keys = keys
    return found
|
2024-12-03 22:54:24 +00:00
|
|
|
|
|
|
|
|
|
|
|
def generate_password_reset_link(username: str) -> str:
    """
    Ask the active user repository for a password reset link.

    Raises:
        ApiUsingWrongUserRepository: the active repository is a
            ``JsonUserRepository``.
        UserIsProtected: ``username`` is ``"root"``.
    """
    # The repository check precedes the root check, as callers may rely on
    # which exception surfaces first.
    if isinstance(ACTIVE_USERS_PROVIDER, JsonUserRepository):
        raise ApiUsingWrongUserRepository

    if username == "root":
        raise UserIsProtected

    reset_link = ACTIVE_USERS_PROVIDER.generate_password_reset_link(
        username=username
    )
    return reset_link
|