From fbc0ae61fb2707a018fffccf541dd6af86874087 Mon Sep 17 00:00:00 2001 From: dettlaff Date: Sun, 3 Nov 2024 03:15:51 +0400 Subject: [PATCH] feat: removed unnecessary functionality from the repository --- selfprivacy_api/actions/api_tokens.py | 40 +++-- selfprivacy_api/actions/ssh.py | 6 +- selfprivacy_api/actions/users.py | 144 ++---------------- selfprivacy_api/graphql/common_types/user.py | 11 +- .../graphql/mutations/users_mutations.py | 8 +- selfprivacy_api/models/user.py | 18 +++ .../repositories/tokens/__init__.py | 5 + .../tokens/redis_tokens_repository.py | 11 +- .../users/abstract_user_repository.py | 28 +--- .../users/json_user_repository.py | 138 ++++++++++++++--- .../users/kanidm_user_repository.py | 55 +++++-- selfprivacy_api/utils/__init__.py | 15 ++ selfprivacy_api/utils/kanidm_manager.py | 71 --------- tests/test_graphql/test_websocket.py | 4 +- tests/test_users.py | 2 +- 15 files changed, 261 insertions(+), 295 deletions(-) create mode 100644 selfprivacy_api/models/user.py delete mode 100644 selfprivacy_api/utils/kanidm_manager.py diff --git a/selfprivacy_api/actions/api_tokens.py b/selfprivacy_api/actions/api_tokens.py index af4bde5..dcfca77 100644 --- a/selfprivacy_api/actions/api_tokens.py +++ b/selfprivacy_api/actions/api_tokens.py @@ -9,9 +9,7 @@ from pydantic import BaseModel from mnemonic import Mnemonic from selfprivacy_api.utils.timeutils import ensure_tz_aware, ensure_tz_aware_strict -from selfprivacy_api.repositories.tokens.redis_tokens_repository import ( - RedisTokensRepository, -) +from selfprivacy_api.repositories.tokens import ACTIVE_TOKEN_PROVIDER from selfprivacy_api.repositories.tokens.exceptions import ( TokenNotFound, RecoveryKeyNotFound, @@ -19,8 +17,6 @@ from selfprivacy_api.repositories.tokens.exceptions import ( NewDeviceKeyNotFound, ) -TOKEN_REPO = RedisTokensRepository() - class TokenInfoWithIsCaller(BaseModel): """Token info""" @@ -40,8 +36,10 @@ def _naive(date_time: datetime) -> datetime: def get_api_tokens_with_caller_flag(caller_token: str) -> list[TokenInfoWithIsCaller]: """Get the tokens info""" - caller_name = TOKEN_REPO.get_token_by_token_string(caller_token).device_name - tokens = TOKEN_REPO.get_tokens() + caller_name = ACTIVE_TOKEN_PROVIDER.get_token_by_token_string( + caller_token + ).device_name + tokens = ACTIVE_TOKEN_PROVIDER.get_tokens() return [ TokenInfoWithIsCaller( name=token.device_name, @@ -54,7 +52,7 @@ def get_api_tokens_with_caller_flag(caller_token: str) -> list[TokenInfoWithIsCa def is_token_valid(token) -> bool: """Check if token is valid""" - return TOKEN_REPO.is_token_valid(token) + return ACTIVE_TOKEN_PROVIDER.is_token_valid(token) class NotFoundException(Exception): @@ -67,19 +65,19 @@ class CannotDeleteCallerException(Exception): def delete_api_token(caller_token: str, token_name: str) -> None: """Delete the token""" - if TOKEN_REPO.is_token_name_pair_valid(token_name, caller_token): + if ACTIVE_TOKEN_PROVIDER.is_token_name_pair_valid(token_name, caller_token): raise CannotDeleteCallerException("Cannot delete caller's token") - if not TOKEN_REPO.is_token_name_exists(token_name): + if not ACTIVE_TOKEN_PROVIDER.is_token_name_exists(token_name): raise NotFoundException("Token not found") - token = TOKEN_REPO.get_token_by_name(token_name) - TOKEN_REPO.delete_token(token) + token = ACTIVE_TOKEN_PROVIDER.get_token_by_name(token_name) + ACTIVE_TOKEN_PROVIDER.delete_token(token) def refresh_api_token(caller_token: str) -> str: """Refresh the token""" try: - old_token = TOKEN_REPO.get_token_by_token_string(caller_token) - new_token = TOKEN_REPO.refresh_token(old_token) + old_token = ACTIVE_TOKEN_PROVIDER.get_token_by_token_string(caller_token) + new_token = ACTIVE_TOKEN_PROVIDER.refresh_token(old_token) except TokenNotFound: raise NotFoundException("Token not found") return new_token.token @@ -97,10 +95,10 @@ class RecoveryTokenStatus(BaseModel): def get_api_recovery_token_status() -> RecoveryTokenStatus: """Get the recovery token status, timezone-aware""" - token = TOKEN_REPO.get_recovery_key() + token = ACTIVE_TOKEN_PROVIDER.get_recovery_key() if token is None: return RecoveryTokenStatus(exists=False, valid=False) - is_valid = TOKEN_REPO.is_recovery_key_valid() + is_valid = ACTIVE_TOKEN_PROVIDER.is_recovery_key_valid() # New tokens are tz-aware, but older ones might not be expiry_date = token.expires_at @@ -137,7 +135,7 @@ def get_new_api_recovery_key( if uses_left <= 0: raise InvalidUsesLeft("Uses must be greater than 0") - key = TOKEN_REPO.create_recovery_key(expiration_date, uses_left) + key = ACTIVE_TOKEN_PROVIDER.create_recovery_key(expiration_date, uses_left) mnemonic_phrase = Mnemonic(language="english").to_mnemonic(bytes.fromhex(key.key)) return mnemonic_phrase @@ -152,21 +150,21 @@ def use_mnemonic_recovery_token(mnemonic_phrase, name): mnemonic_phrase is a string representation of the mnemonic word list. """ try: - token = TOKEN_REPO.use_mnemonic_recovery_key(mnemonic_phrase, name) + token = ACTIVE_TOKEN_PROVIDER.use_mnemonic_recovery_key(mnemonic_phrase, name) return token.token except (RecoveryKeyNotFound, InvalidMnemonic): return None def delete_new_device_auth_token() -> None: - TOKEN_REPO.delete_new_device_key() + ACTIVE_TOKEN_PROVIDER.delete_new_device_key() def get_new_device_auth_token() -> str: """Generate and store a new device auth token which is valid for 10 minutes and return a mnemonic phrase representation """ - key = TOKEN_REPO.get_new_device_key() + key = ACTIVE_TOKEN_PROVIDER.get_new_device_key() return Mnemonic(language="english").to_mnemonic(bytes.fromhex(key.key)) @@ -176,7 +174,7 @@ def use_new_device_auth_token(mnemonic_phrase, name) -> Optional[str]: New device auth token must be deleted. """ try: - token = TOKEN_REPO.use_mnemonic_new_device_key(mnemonic_phrase, name) + token = ACTIVE_TOKEN_PROVIDER.use_mnemonic_new_device_key(mnemonic_phrase, name) return token.token except (NewDeviceKeyNotFound, InvalidMnemonic): return None diff --git a/selfprivacy_api/actions/ssh.py b/selfprivacy_api/actions/ssh.py index 291ca7b..47ba2e2 100644 --- a/selfprivacy_api/actions/ssh.py +++ b/selfprivacy_api/actions/ssh.py @@ -2,12 +2,10 @@ from typing import Optional from pydantic import BaseModel -from selfprivacy_api.actions.users import ( - UserNotFound, - ensure_ssh_and_users_fields_exist, -) from selfprivacy_api.utils import WriteUserData, ReadUserData, validate_ssh_public_key +from selfprivacy_api.repositories.users.exceptions import UserNotFound +from selfprivacy_api.utils import ensure_ssh_and_users_fields_exist def enable_ssh(): diff --git a/selfprivacy_api/actions/users.py b/selfprivacy_api/actions/users.py index f0e48a7..32f478a 100644 --- a/selfprivacy_api/actions/users.py +++ b/selfprivacy_api/actions/users.py @@ -2,78 +2,27 @@ import re from typing import Optional -from selfprivacy_api.utils import ( - ReadUserData, - WriteUserData, - hash_password, - is_username_forbidden, -) -from selfprivacy_api.repositories.users.abstract_user_repository import ( - UserDataUser, - UserDataUserOrigin, -) +from selfprivacy_api.models.user import UserDataUser +from selfprivacy_api.utils import hash_password, is_username_forbidden + +from selfprivacy_api.repositories.users import ACTIVE_USERS_PROVIDER from selfprivacy_api.repositories.users.exceptions import ( - InvalidConfiguration, PasswordIsEmpty, - UserAlreadyExists, - UserIsProtected, UsernameForbidden, UsernameNotAlphanumeric, UsernameTooLong, - UserNotFound, ) -def ensure_ssh_and_users_fields_exist(data): - if "ssh" not in data: - data["ssh"] = {} - data["ssh"]["rootKeys"] = [] - - elif data["ssh"].get("rootKeys") is None: - data["ssh"]["rootKeys"] = [] - - if "sshKeys" not in data: - data["sshKeys"] = [] - - if "users" not in data: - data["users"] = [] - - def get_users( exclude_primary: bool = False, exclude_root: bool = False, ) -> list[UserDataUser]: - """Get the list of users""" - users = [] - with ReadUserData() as user_data: - ensure_ssh_and_users_fields_exist(user_data) - users = [ - UserDataUser( - username=user["username"], - ssh_keys=user.get("sshKeys", []), - origin=UserDataUserOrigin.NORMAL, - ) - for user in user_data["users"] - ] - if not exclude_primary and "username" in user_data.keys(): - users.append( - UserDataUser( - username=user_data["username"], - ssh_keys=user_data["sshKeys"], - origin=UserDataUserOrigin.PRIMARY, - ) - ) - if not exclude_root: - users.append( - UserDataUser( - username="root", - ssh_keys=user_data["ssh"]["rootKeys"], - origin=UserDataUserOrigin.ROOT, - ) - ) - return users + return ACTIVE_USERS_PROVIDER.get_users( + exclude_primary=exclude_primary, exclude_root=exclude_root + ) def create_user(username: str, password: str) -> None: @@ -91,39 +40,15 @@ def create_user(username: str, password: str) -> None: if len(username) >= 32: raise UsernameTooLong("Username must be less than 32 characters") - with ReadUserData() as user_data: - ensure_ssh_and_users_fields_exist(user_data) - if "username" not in user_data.keys(): - raise InvalidConfiguration( - "Broken config: Admin name is not defined. Consider recovery or add it manually" - ) - if username == user_data["username"]: - raise UserAlreadyExists("User already exists") - if username in [user["username"] for user in user_data["users"]]: - raise UserAlreadyExists("User already exists") - hashed_password = hash_password(password) - with WriteUserData() as user_data: - ensure_ssh_and_users_fields_exist(user_data) - - user_data["users"].append( - {"username": username, "sshKeys": [], "hashedPassword": hashed_password} - ) + return ACTIVE_USERS_PROVIDER.create_user( + username=username, hashed_password=hashed_password + ) def delete_user(username: str) -> None: - with WriteUserData() as user_data: - ensure_ssh_and_users_fields_exist(user_data) - if username == user_data["username"] or username == "root": - raise UserIsProtected("Cannot delete main or root user") - - for data_user in user_data["users"]: - if data_user["username"] == username: - user_data["users"].remove(data_user) - break - else: - raise UserNotFound("User did not exist") + return ACTIVE_USERS_PROVIDER.delete_user(username=username) def update_user(username: str, password: str) -> None: @@ -132,49 +57,10 @@ def update_user(username: str, password: str) -> None: hashed_password = hash_password(password) - with WriteUserData() as data: - ensure_ssh_and_users_fields_exist(data) - - if username == data["username"]: - data["hashedMasterPassword"] = hashed_password - - # Return 404 if user does not exist - else: - for data_user in data["users"]: - if data_user["username"] == username: - data_user["hashedPassword"] = hashed_password - break - else: - raise UserNotFound("User does not exist") + return ACTIVE_USERS_PROVIDER.update_user( + username=username, hashed_password=hashed_password + ) def get_user_by_username(username: str) -> Optional[UserDataUser]: - with ReadUserData() as data: - ensure_ssh_and_users_fields_exist(data) - - if username == "root": - return UserDataUser( - origin=UserDataUserOrigin.ROOT, - username="root", - ssh_keys=data["ssh"]["rootKeys"], - ) - - if username == data["username"]: - return UserDataUser( - origin=UserDataUserOrigin.PRIMARY, - username=username, - ssh_keys=data["sshKeys"], - ) - - for user in data["users"]: - if user["username"] == username: - if "sshKeys" not in user: - user["sshKeys"] = [] - - return UserDataUser( - origin=UserDataUserOrigin.NORMAL, - username=username, - ssh_keys=user["sshKeys"], - ) - - return None + return ACTIVE_USERS_PROVIDER.get_user_by_username(username=username) diff --git a/selfprivacy_api/graphql/common_types/user.py b/selfprivacy_api/graphql/common_types/user.py index 2dafd64..4e6bb63 100644 --- a/selfprivacy_api/graphql/common_types/user.py +++ b/selfprivacy_api/graphql/common_types/user.py @@ -1,7 +1,12 @@ import typing from enum import Enum + import strawberry -from selfprivacy_api.repositories.users import ACTIVE_USERS_PROVIDER as users_actions + +from selfprivacy_api.actions.users import ( + get_user_by_username as actions_get_user_by_username, +) +from selfprivacy_api.actions.users import get_users as actions_get_users from selfprivacy_api.graphql.mutations.mutation_interface import ( MutationReturnInterface, @@ -31,7 +36,7 @@ class UserMutationReturn(MutationReturnInterface): def get_user_by_username(username: str) -> typing.Optional[User]: - user = users_actions.get_user_by_username(username=username) + user = actions_get_user_by_username(username=username) if user is None: return None @@ -44,7 +49,7 @@ def get_user_by_username(username: str) -> typing.Optional[User]: def get_users() -> typing.List[User]: """Get users""" - users = users_actions.get_users(exclude_root=True) + users = actions_get_users(exclude_root=True) return [ User( user_type=UserType(user.origin.value), diff --git a/selfprivacy_api/graphql/mutations/users_mutations.py b/selfprivacy_api/graphql/mutations/users_mutations.py index 37f4a84..9e838d9 100644 --- a/selfprivacy_api/graphql/mutations/users_mutations.py +++ b/selfprivacy_api/graphql/mutations/users_mutations.py @@ -18,7 +18,7 @@ from selfprivacy_api.actions.ssh import ( from selfprivacy_api.graphql.mutations.mutation_interface import ( GenericMutationReturn, ) -from selfprivacy_api.repositories.users import ACTIVE_USERS_PROVIDER as users_actions +from selfprivacy_api.actions.users import create_user, delete_user, update_user from selfprivacy_api.repositories.users.exceptions import ( PasswordIsEmpty, UsernameForbidden, @@ -54,7 +54,7 @@ class UsersMutations: @strawberry.mutation(permission_classes=[IsAuthenticated]) def create_user(self, user: UserMutationInput) -> UserMutationReturn: try: - users_actions.create_user(user.username, user.password) + create_user(user.username, user.password) except PasswordIsEmpty as e: return UserMutationReturn( success=False, @@ -103,7 +103,7 @@ class UsersMutations: @strawberry.mutation(permission_classes=[IsAuthenticated]) def delete_user(self, username: str) -> GenericMutationReturn: try: - users_actions.delete_user(username) + delete_user(username) except UserNotFound as e: return GenericMutationReturn( success=False, @@ -127,7 +127,7 @@ class UsersMutations: def update_user(self, user: UserMutationInput) -> UserMutationReturn: """Update user mutation""" try: - users_actions.update_user(user.username, user.password) + update_user(user.username, user.password) except PasswordIsEmpty as e: return UserMutationReturn( success=False, diff --git a/selfprivacy_api/models/user.py b/selfprivacy_api/models/user.py new file mode 100644 index 0000000..d29b545 --- /dev/null +++ b/selfprivacy_api/models/user.py @@ -0,0 +1,18 @@ +from enum import Enum +from pydantic import BaseModel + + +class UserDataUserOrigin(Enum): + """Origin of the user in the user data""" + + NORMAL = "NORMAL" + PRIMARY = "PRIMARY" + ROOT = "ROOT" + + +class UserDataUser(BaseModel): + """The user model from the userdata file""" + + username: str + ssh_keys: list[str] + origin: UserDataUserOrigin diff --git a/selfprivacy_api/repositories/tokens/__init__.py b/selfprivacy_api/repositories/tokens/__init__.py index e69de29..2a3562e 100644 --- a/selfprivacy_api/repositories/tokens/__init__.py +++ b/selfprivacy_api/repositories/tokens/__init__.py @@ -0,0 +1,5 @@ +from selfprivacy_api.repositories.tokens.redis_tokens_repository import ( + RedisTokensRepository, +) + +ACTIVE_TOKEN_PROVIDER = RedisTokensRepository() diff --git a/selfprivacy_api/repositories/tokens/redis_tokens_repository.py b/selfprivacy_api/repositories/tokens/redis_tokens_repository.py index b7b4dd6..0565ee4 100644 --- a/selfprivacy_api/repositories/tokens/redis_tokens_repository.py +++ b/selfprivacy_api/repositories/tokens/redis_tokens_repository.py @@ -3,18 +3,19 @@ Token repository using Redis as backend. """ from typing import Any, Optional -from datetime import datetime +from datetime import datetime, timezone from hashlib import md5 -from datetime import timezone -from selfprivacy_api.repositories.tokens.abstract_tokens_repository import ( - AbstractTokensRepository, -) from selfprivacy_api.utils.redis_pool import RedisPool + from selfprivacy_api.models.tokens.token import Token from selfprivacy_api.models.tokens.recovery_key import RecoveryKey from selfprivacy_api.models.tokens.new_device_key import NewDeviceKey + from selfprivacy_api.repositories.tokens.exceptions import TokenNotFound +from selfprivacy_api.repositories.tokens.abstract_tokens_repository import ( + AbstractTokensRepository, +) TOKENS_PREFIX = "token_repo:tokens:" NEW_DEVICE_KEY_REDIS_KEY = "token_repo:new_device_key" diff --git a/selfprivacy_api/repositories/users/abstract_user_repository.py b/selfprivacy_api/repositories/users/abstract_user_repository.py index dd53c77..2941286 100644 --- a/selfprivacy_api/repositories/users/abstract_user_repository.py +++ b/selfprivacy_api/repositories/users/abstract_user_repository.py @@ -1,28 +1,12 @@ from abc import ABC, abstractmethod from typing import Optional -from pydantic import BaseModel -from enum import Enum - - -class UserDataUserOrigin(Enum): - """Origin of the user in the user data""" - - NORMAL = "NORMAL" - PRIMARY = "PRIMARY" - ROOT = "ROOT" - - -class UserDataUser(BaseModel): - """The user model from the userdata file""" - - username: str - ssh_keys: list[str] - origin: UserDataUserOrigin +from selfprivacy_api.models.user import UserDataUser class AbstractUserRepository(ABC): + @staticmethod @abstractmethod def get_users( exclude_primary: bool = False, @@ -30,18 +14,22 @@ class AbstractUserRepository(ABC): ) -> list[UserDataUser]: """Retrieves a list of users with options to exclude specific user groups""" + @staticmethod @abstractmethod - def create_user(username: str, password: str): + def create_user(username: str, hashed_password: str) -> None: """Creates a new user""" + @staticmethod @abstractmethod def delete_user(username: str) -> None: """Deletes an existing user""" + @staticmethod @abstractmethod - def update_user(username: str, password: str) -> None: + def update_user(username: str, hashed_password: str) -> None: """Updates the password of an existing user""" + @staticmethod @abstractmethod def get_user_by_username(username: str) -> Optional[UserDataUser]: """Retrieves user data (UserDataUser) by username""" diff --git a/selfprivacy_api/repositories/users/json_user_repository.py b/selfprivacy_api/repositories/users/json_user_repository.py index 25f2414..4208844 100644 --- a/selfprivacy_api/repositories/users/json_user_repository.py +++ b/selfprivacy_api/repositories/users/json_user_repository.py @@ -1,38 +1,138 @@ from typing import Optional + +from selfprivacy_api.models.user import UserDataUser, UserDataUserOrigin +from selfprivacy_api.utils import ( + ReadUserData, + WriteUserData, + ensure_ssh_and_users_fields_exist, +) from selfprivacy_api.repositories.users.abstract_user_repository import ( AbstractUserRepository, - UserDataUser, ) - -from selfprivacy_api.actions.users import ( - create_user, - delete_user, - get_user_by_username, - get_users, - update_user, +from selfprivacy_api.repositories.users.exceptions import ( + InvalidConfiguration, + UserAlreadyExists, + UserIsProtected, + UserNotFound, ) class JsonUserRepository(AbstractUserRepository): + @staticmethod def get_users( exclude_primary: bool = False, exclude_root: bool = False, ) -> list[UserDataUser]: - return get_users(exclude_primary=exclude_primary, exclude_root=exclude_root) + """Get the list of users""" + users = [] + with ReadUserData() as user_data: + ensure_ssh_and_users_fields_exist(user_data) + users = [ + UserDataUser( + username=user["username"], + ssh_keys=user.get("sshKeys", []), + origin=UserDataUserOrigin.NORMAL, + ) + for user in user_data["users"] + ] + if not exclude_primary and "username" in user_data.keys(): + users.append( + UserDataUser( + username=user_data["username"], + ssh_keys=user_data["sshKeys"], + origin=UserDataUserOrigin.PRIMARY, + ) + ) + if not exclude_root: + users.append( + UserDataUser( + username="root", + ssh_keys=user_data["ssh"]["rootKeys"], + origin=UserDataUserOrigin.ROOT, + ) + ) + return users - def create_user(username: str, password: str): - """Creates a new user""" - return create_user(username=username, password=password) + @staticmethod + def create_user(username: str, hashed_password: str) -> None: + with ReadUserData() as user_data: + ensure_ssh_and_users_fields_exist(user_data) + if "username" not in user_data.keys(): + raise InvalidConfiguration( + "Broken config: Admin name is not defined. Consider recovery or add it manually" + ) + if username == user_data["username"]: + raise UserAlreadyExists("User already exists") + if username in [user["username"] for user in user_data["users"]]: + raise UserAlreadyExists("User already exists") + with WriteUserData() as user_data: + ensure_ssh_and_users_fields_exist(user_data) + + user_data["users"].append( + {"username": username, "sshKeys": [], "hashedPassword": hashed_password} + ) + + @staticmethod def delete_user(username: str) -> None: - """Deletes an existing user""" - return delete_user(username=username) + with WriteUserData() as user_data: + ensure_ssh_and_users_fields_exist(user_data) + if username == user_data["username"] or username == "root": + raise UserIsProtected("Cannot delete main or root user") - def update_user(username: str, password: str) -> None: - """Updates the password of an existing user""" - return update_user(username=username, password=password) + for data_user in user_data["users"]: + if data_user["username"] == username: + user_data["users"].remove(data_user) + break + else: + raise UserNotFound("User did not exist") + @staticmethod + def update_user(username: str, hashed_password: str) -> None: + with WriteUserData() as data: + ensure_ssh_and_users_fields_exist(data) + + if username == data["username"]: + data["hashedMasterPassword"] = hashed_password + + # Return 404 if user does not exist + else: + for data_user in data["users"]: + if data_user["username"] == username: + data_user["hashedPassword"] = hashed_password + break + else: + raise UserNotFound("User does not exist") + + @staticmethod def get_user_by_username(username: str) -> Optional[UserDataUser]: - """Retrieves user data (UserDataUser) by username""" - return get_user_by_username(username=username) + with ReadUserData() as data: + ensure_ssh_and_users_fields_exist(data) + + if username == "root": + return UserDataUser( + origin=UserDataUserOrigin.ROOT, + username="root", + ssh_keys=data["ssh"]["rootKeys"], + ) + + if username == data["username"]: + return UserDataUser( + origin=UserDataUserOrigin.PRIMARY, + username=username, + ssh_keys=data["sshKeys"], + ) + + for user in data["users"]: + if user["username"] == username: + if "sshKeys" not in user: + user["sshKeys"] = [] + + return UserDataUser( + origin=UserDataUserOrigin.NORMAL, + username=username, + ssh_keys=user["sshKeys"], + ) + + return None diff --git a/selfprivacy_api/repositories/users/kanidm_user_repository.py b/selfprivacy_api/repositories/users/kanidm_user_repository.py index e5b2182..a63610e 100644 --- a/selfprivacy_api/repositories/users/kanidm_user_repository.py +++ b/selfprivacy_api/repositories/users/kanidm_user_repository.py @@ -1,38 +1,61 @@ from typing import Optional +import requests + +from selfprivacy_api.models.user import UserDataUser from selfprivacy_api.repositories.users.abstract_user_repository import ( AbstractUserRepository, - UserDataUser, ) -from selfprivacy_api.utils.kanidm_manager import ( - create_user, - delete_user, - get_user_by_username, - get_users, - update_user, -) +KANIDM_URL = "http://localhost:9001" + + +class KanidmQueryError(Exception): + """Error occurred during Kanidm query""" class KanidmUserRepository(AbstractUserRepository): + @staticmethod + def _send_query(endpoint: str, method: str = "GET", **kwargs): + request_method = getattr(requests, method.lower(), None) + + try: + response = request_method( + f"{KANIDM_URL}/api/v1/{endpoint}", + params=kwargs, + timeout=0.8, # TODO: change timeout + ) + + if response.status_code != 200: + raise KanidmQueryError( + error=f"Kanidm returned unexpected HTTP status code. Error: {response.text}." + ) + json = response.json() + + return json["data"] + except Exception as error: + raise KanidmQueryError(error=f"Kanidm request failed! Error: {str(error)}") + + @staticmethod + def create_user(username: str, password: str): + return KanidmUserRepository._send_query( + endpoint="person", method="POST", name=username, displayname=username + ) + def get_users( exclude_primary: bool = False, exclude_root: bool = False, ) -> list[UserDataUser]: - return get_users(exclude_primary=exclude_primary, exclude_root=exclude_root) - - def create_user(username: str, password: str): - """Creates a new user""" - return create_user(username=username, password=password) + return KanidmUserRepository._send_query() def delete_user(username: str) -> None: """Deletes an existing user""" - return delete_user(username=username) + return KanidmUserRepository._send_query() def update_user(username: str, password: str) -> None: """Updates the password of an existing user""" - return update_user(username=username, password=password) + return KanidmUserRepository._send_query() def get_user_by_username(username: str) -> Optional[UserDataUser]: """Retrieves user data (UserDataUser) by username""" - return get_user_by_username(username=username) + return KanidmUserRepository._send_query() diff --git a/selfprivacy_api/utils/__init__.py b/selfprivacy_api/utils/__init__.py index a7cdb5f..37dc822 100644 --- a/selfprivacy_api/utils/__init__.py +++ b/selfprivacy_api/utils/__init__.py @@ -92,6 +92,21 @@ class ReadUserData(object): self.userdata_file.close() +def ensure_ssh_and_users_fields_exist(data): + if "ssh" not in data: + data["ssh"] = {} + data["ssh"]["rootKeys"] = [] + + elif data["ssh"].get("rootKeys") is None: + data["ssh"]["rootKeys"] = [] + + if "sshKeys" not in data: + data["sshKeys"] = [] + + if "users" not in data: + data["users"] = [] + + def validate_ssh_public_key(key): """Validate SSH public key. It may be ssh-ed25519, ssh-rsa or ecdsa-sha2-nistp256.""" diff --git a/selfprivacy_api/utils/kanidm_manager.py b/selfprivacy_api/utils/kanidm_manager.py deleted file mode 100644 index a393344..0000000 --- a/selfprivacy_api/utils/kanidm_manager.py +++ /dev/null @@ -1,71 +0,0 @@ -"""Kanidm queries.""" - -# pylint: disable=too-few-public-methods -import requests - -import strawberry - -from typing import Annotated, Union - -KANIDM_URL = "http://localhost:9001" - - -@strawberry.type -class KanidmQueryError: - error: str - - -KanidmValuesResult = Annotated[ - Union[str, KanidmQueryError], # WIP. TODO: change str - strawberry.union("KanidmValuesResult"), -] - - -# WIP WIP WIP WIP WIP WIP - - -class KanidmQueries: - @staticmethod - def _send_query(query: str) -> Union[dict, KanidmQueryError]: - try: - response = requests.get( - f"{KANIDM_URL}/api/v1/query", - params={ - "query": query, - }, - timeout=0.8, # TODO: change timeout - ) - if response.status_code != 200: - return KanidmQueryError( - error=f"Kanidm returned unexpected HTTP status code. Error: {response.text}. The query was {query}" - ) - json = response.json() - - return json["data"] - except Exception as error: - return KanidmQueryError(error=f"Kanidm request failed! Error: {str(error)}") - - @staticmethod - def create_user(username: str, password: str) -> KanidmValuesResult: - query = """""" - - data = KanidmQueries._send_query(query=query) - - if isinstance(data, KanidmQueryError): - return data - - return KanidmValuesResult(data) - - -# def get_users( -# exclude_primary: bool = False, -# exclude_root: bool = False, -# ) -> list[UserDataUser]: - -# def create_user(username: str, password: str): - -# def delete_user(username: str) -> None: - -# def update_user(username: str, password: str) -> None: - -# def get_user_by_username(username: str) -> Optional[UserDataUser]: diff --git a/tests/test_graphql/test_websocket.py b/tests/test_graphql/test_websocket.py index 754fbbf..8a66706 100644 --- a/tests/test_graphql/test_websocket.py +++ b/tests/test_graphql/test_websocket.py @@ -7,7 +7,7 @@ from time import sleep from starlette.testclient import WebSocketTestSession from selfprivacy_api.jobs import Jobs -from selfprivacy_api.actions.api_tokens import TOKEN_REPO +from selfprivacy_api.actions.api_tokens import ACTIVE_TOKEN_PROVIDER from selfprivacy_api.graphql import IsAuthenticated from tests.conftest import DEVICE_WE_AUTH_TESTS_WITH @@ -75,7 +75,7 @@ def authenticated_websocket( ) -> Generator[WebSocketTestSession, None, None]: # We use authorized_client only to have token in the repo, this client by itself is not enough to authorize websocket - ValueError(TOKEN_REPO.get_tokens()) + ValueError(ACTIVE_TOKEN_PROVIDER.get_tokens()) with connect_ws_authenticated(authorized_client) as websocket: yield websocket diff --git a/tests/test_users.py b/tests/test_users.py index 3d7f38f..6203df9 100644 --- a/tests/test_users.py +++ b/tests/test_users.py @@ -1,4 +1,4 @@ -from selfprivacy_api.utils import ReadUserData, WriteUserData +from selfprivacy_api.utils import ReadUserData from selfprivacy_api.actions.users import delete_user """