fix: typing

This commit is contained in:
dettlaff 2024-12-09 12:37:57 +04:00
parent 63f0011da5
commit 3c53251caa
7 changed files with 135 additions and 62 deletions

View file

@ -41,7 +41,7 @@ def get_users(
exclude_primary=exclude_primary, exclude_root=exclude_root
)
if ACTIVE_USERS_PROVIDER != JsonUserRepository:
if isinstance(ACTIVE_USERS_PROVIDER, JsonUserRepository):
for user in users:
try:
user.ssh_keys = get_ssh_keys(username=user.username)
@ -81,7 +81,7 @@ def create_user(
raise UsernameTooLong("Username must be less than 32 characters")
# need to maintain the logic of the old repository, since ssh management uses it.
if ACTIVE_USERS_PROVIDER != JsonUserRepository:
if not isinstance(ACTIVE_USERS_PROVIDER, JsonUserRepository):
try:
JsonUserRepository.create_user(
username=username, password=str(uuid.uuid4())
@ -102,7 +102,7 @@ def create_user(
def delete_user(username: str) -> None:
# need to maintain the logic of the old repository, since ssh management uses it.
if ACTIVE_USERS_PROVIDER != JsonUserRepository:
if not isinstance(ACTIVE_USERS_PROVIDER, JsonUserRepository):
try:
JsonUserRepository.delete_user(username=username)
except UserNotFound:
@ -131,9 +131,11 @@ def update_user(
def get_user_by_username(username: str) -> Optional[UserDataUser]:
user = ACTIVE_USERS_PROVIDER.get_user_by_username(username=username)
user: UserDataUser | None = ACTIVE_USERS_PROVIDER.get_user_by_username(
username=username
)
if ACTIVE_USERS_PROVIDER != JsonUserRepository:
if not isinstance(ACTIVE_USERS_PROVIDER, JsonUserRepository):
if username == "root":
return UserDataUser(
username="root",
@ -142,7 +144,8 @@ def get_user_by_username(username: str) -> Optional[UserDataUser]:
)
try:
user.ssh_keys = get_ssh_keys(username=user)
if user:
user.ssh_keys = get_ssh_keys(username=user.username)
except UserNotFound:
pass
@ -150,7 +153,7 @@ def get_user_by_username(username: str) -> Optional[UserDataUser]:
def generate_password_reset_link(username: str) -> str:
if ACTIVE_USERS_PROVIDER == JsonUserRepository:
if isinstance(ACTIVE_USERS_PROVIDER, JsonUserRepository):
raise ApiUsingWrongUserRepository
return ACTIVE_USERS_PROVIDER.generate_password_reset_link(username=username)

View file

@ -28,8 +28,8 @@ class User:
user_type: Optional[UserType] = None
displayname: Optional[str] = None
email: Optional[str] = None
directmemberof: Optional[list[str]] = None
memberof: Optional[list[str]] = None
directmemberof: Optional[list[str]] = []
memberof: Optional[list[str]] = []
@strawberry.type
@ -37,6 +37,12 @@ class UserMutationReturn(MutationReturnInterface):
"""Return type for user mutation"""
user: Optional[User] = None
@strawberry.type
class PasswordResetLinkReturn(MutationReturnInterface):
"""Return password reset link"""
password_reset_link: Optional[str] = None

View file

@ -6,6 +6,7 @@ import strawberry
from selfprivacy_api.graphql import IsAuthenticated
from selfprivacy_api.graphql.common_types.user import (
PasswordResetLinkReturn,
UserMutationReturn,
get_user_by_username,
)
@ -26,6 +27,7 @@ from selfprivacy_api.actions.users import (
generate_password_reset_link as generate_password_reset_link_action,
)
from selfprivacy_api.repositories.users.exceptions import (
NoPasswordResetLinkFoundInResponse,
PasswordIsEmpty,
UsernameForbidden,
InvalidConfiguration,
@ -37,6 +39,7 @@ from selfprivacy_api.repositories.users.exceptions import (
SelfPrivacyAppIsOutdate,
)
from selfprivacy_api import PLEASE_UPDATE_APP_TEXT
from selfprivacy_api.repositories.users.kanidm_user_repository import KanidmDidNotReturnAdminPassword
FAILED_TO_SETUP_PASSWORD_TEXT = "Failed to set a password for a user. The problem occurred due to an old version of the SelfPrivacy app."
@ -45,7 +48,7 @@ FAILED_TO_SETUP_PASSWORD_TEXT = "Failed to set a password for a user. The proble
def return_failed_mutation_return(
message: str,
code: int = 400,
username: str = None,
username: Optional[str] = None,
) -> UserMutationReturn:
return UserMutationReturn(
success=False,
@ -63,8 +66,8 @@ class UserMutationInput:
password: Optional[str] = None
displayname: Optional[str] = None
email: Optional[str] = None
directmemberof: Optional[list[str]] = None
memberof: Optional[list[str]] = None
directmemberof: Optional[list[str]] = []
memberof: Optional[list[str]] = []
@strawberry.input
@ -95,9 +98,10 @@ class UsersMutations:
UsernameNotAlphanumeric,
UsernameTooLong,
InvalidConfiguration,
KanidmDidNotReturnAdminPassword,
) as error:
return return_failed_mutation_return(
message=error.get_description(),
message=error.get_error_message(),
)
except UsernameForbidden as error:
return return_failed_mutation_return(
@ -130,12 +134,23 @@ class UsersMutations:
try:
delete_user_action(username)
except UserNotFound as error:
return return_failed_mutation_return(
return GenericMutationReturn(
success=False,
message=error.get_error_message(),
code=404,
)
except UserIsProtected as error:
return return_failed_mutation_return(error=error)
return GenericMutationReturn(
success=False,
code=400,
message=error.get_error_message(),
)
except KanidmDidNotReturnAdminPassword as error:
return GenericMutationReturn(
success=False,
code=500,
message=error.get_error_message(),
)
return GenericMutationReturn(
success=True,
@ -155,7 +170,7 @@ class UsersMutations:
directmemberof=user.directmemberof,
memberof=user.memberof,
)
except (PasswordIsEmpty, SelfPrivacyAppIsOutdate) as error:
except (PasswordIsEmpty, SelfPrivacyAppIsOutdate, KanidmDidNotReturnAdminPassword) as error:
return return_failed_mutation_return(
message=error.get_error_message(),
)
@ -211,7 +226,7 @@ class UsersMutations:
try:
remove_ssh_key_action(ssh_input.username, ssh_input.ssh_key)
except (KeyNotFound, UserMutationReturn) as error:
except (KeyNotFound, UserNotFound) as error:
return return_failed_mutation_return(
message=error.get_error_message(),
code=404,
@ -231,16 +246,27 @@ class UsersMutations:
)
@strawberry.mutation(permission_classes=[IsAuthenticated])
def generate_password_reset_link(username: str) -> UserMutationReturn:
def generate_password_reset_link(
self, user: UserMutationInput
) -> PasswordResetLinkReturn:
try:
password_reset_link = generate_password_reset_link_action(username=username)
password_reset_link = generate_password_reset_link_action(
username=user.username
)
except UserNotFound as error:
return return_failed_mutation_return(
return PasswordResetLinkReturn(
success=False,
message=error.get_error_message(),
code=404,
)
except (NoPasswordResetLinkFoundInResponse, KanidmDidNotReturnAdminPassword) as error:
return PasswordResetLinkReturn(
success=False,
code=500,
message=error.get_error_message(),
)
return UserMutationReturn(
return PasswordResetLinkReturn(
success=True,
message="Link successfully created",
code=200,

View file

@ -15,13 +15,11 @@ class UserDataUser(BaseModel):
"""The user model from the userdata file"""
username: str
ssh_keys: Optional[list[str]] = []
user_type: Optional[UserDataUserOrigin] = None
user_type: UserDataUserOrigin
ssh_keys: list[str] = []
directmemberof: Optional[list[str]] = []
memberof: Optional[list[str]] = []
displayname: Optional[str] = (
None # in logic graphql will return "username" if "displayname" None
)
email: Optional[str] = None
directmemberof: Optional[list[str]] = None
memberof: Optional[list[str]] = None

View file

@ -72,3 +72,11 @@ class SelfPrivacyAppIsOutdate(Exception):
@staticmethod
def get_error_message() -> str:
return "SelfPrivacy app is out of date, please update"
class NoPasswordResetLinkFoundInResponse(Exception):
"""No password reset link was found in the Kanidm response."""
@staticmethod
def get_error_message() -> str:
return "The Kanidm response does not contain a password reset link."

View file

@ -30,8 +30,8 @@ class JsonUserRepository(AbstractUserRepository):
@staticmethod
def get_users(
exclude_primary: bool = False, # TODO
exclude_root: bool = False, # TODO
exclude_primary: bool = False,
exclude_root: bool = False,
) -> list[UserDataUser]:
"""Retrieves a list of users with options to exclude specific user groups"""
users = []
@ -41,7 +41,7 @@ class JsonUserRepository(AbstractUserRepository):
UserDataUser(
username=user["username"],
ssh_keys=user.get("sshKeys", []),
origin=UserDataUserOrigin.NORMAL,
user_type=UserDataUserOrigin.NORMAL,
)
for user in user_data["users"]
]
@ -50,7 +50,7 @@ class JsonUserRepository(AbstractUserRepository):
UserDataUser(
username=user_data["username"],
ssh_keys=user_data["sshKeys"],
origin=UserDataUserOrigin.PRIMARY,
user_type=UserDataUserOrigin.PRIMARY,
)
)
if not exclude_root:
@ -58,7 +58,7 @@ class JsonUserRepository(AbstractUserRepository):
UserDataUser(
username="root",
ssh_keys=user_data["ssh"]["rootKeys"],
origin=UserDataUserOrigin.ROOT,
user_type=UserDataUserOrigin.ROOT,
)
)
return users
@ -133,14 +133,14 @@ class JsonUserRepository(AbstractUserRepository):
if username == "root":
return UserDataUser(
origin=UserDataUserOrigin.ROOT,
user_type=UserDataUserOrigin.ROOT,
username="root",
ssh_keys=data["ssh"]["rootKeys"],
)
if username == data["username"]:
return UserDataUser(
origin=UserDataUserOrigin.PRIMARY,
user_type=UserDataUserOrigin.PRIMARY,
username=username,
ssh_keys=data["sshKeys"],
)
@ -151,7 +151,7 @@ class JsonUserRepository(AbstractUserRepository):
user["sshKeys"] = []
return UserDataUser(
origin=UserDataUserOrigin.NORMAL,
user_type=UserDataUserOrigin.NORMAL,
username=username,
ssh_keys=user["sshKeys"],
)

View file

@ -1,12 +1,13 @@
from typing import Optional
import subprocess
import requests
import re
import logging
import json
from selfprivacy_api.repositories.users.exceptions import SelfPrivacyAppIsOutdate
from selfprivacy_api.repositories.users.exceptions import (
NoPasswordResetLinkFoundInResponse,
SelfPrivacyAppIsOutdate,
)
from selfprivacy_api.utils import get_domain, temporary_env_var
from selfprivacy_api.utils.redis_pool import RedisPool
from selfprivacy_api.models.user import UserDataUser, UserDataUserOrigin
@ -25,10 +26,34 @@ logger = logging.getLogger(__name__)
ADMIN_KANIDM_GROUPS = ["sp.admin"]
class KanidmQueryError(Exception):
"""Error occurred during kanidm query"""
@staticmethod
def get_error_message() -> str:
return "An error occurred during the Kanidm query."
class KanidmReturnEmptyResponse(Exception):
"""Kanidm returned a blank response"""
@staticmethod
def get_error_message() -> str:
return "Kanidm returned an empty response."
class KanidmDidNotReturnAdminPassword(Exception):
"""Kanidm didn't return the admin password"""
@staticmethod
def get_error_message() -> str:
return "Kanidm didn't return the admin password."
class KanidmAdminToken: # TODO CHECK IS TOKEN CORRECT?
@staticmethod
def get() -> str:
kanidm_admin_token = redis.get("kanidm:token")
kanidm_admin_token = str(redis.get("kanidm:token"))
if kanidm_admin_token is None:
kanidm_admin_password = (
@ -80,9 +105,12 @@ class KanidmAdminToken: # TODO CHECK IS TOKEN CORRECT?
)
match = re.search(r'"password":"([^"]+)"', output)
new_kanidm_admin_password = match.group(
1
) # we have many not json strings in output
if match:
new_kanidm_admin_password = match.group(
1
) # we have many not json strings in output
else:
raise KanidmDidNotReturnAdminPassword
return new_kanidm_admin_password
@ -91,14 +119,10 @@ class KanidmAdminToken: # TODO CHECK IS TOKEN CORRECT?
redis.delete("kanidm:token")
class KanidmQueryError(Exception):
"""Error occurred during kanidm query"""
class KanidmUserRepository(AbstractUserRepository):
@staticmethod
def _check_user_origin_by_memberof(
memberof: Optional[list[str]] = None,
memberof: list[str] = [],
) -> UserDataUserOrigin:
if sorted(memberof) == sorted(ADMIN_KANIDM_GROUPS):
return UserDataUserOrigin.PRIMARY
@ -166,7 +190,7 @@ class KanidmUserRepository(AbstractUserRepository):
if memberof:
data["attrs"]["memberof"] = memberof
return KanidmUserRepository._send_query(
KanidmUserRepository._send_query(
endpoint="person",
method="POST",
data=data,
@ -183,6 +207,9 @@ class KanidmUserRepository(AbstractUserRepository):
"""
users_data = KanidmUserRepository._send_query(endpoint="person", method="GET")
if not users_data or "attrs" not in users_data:
raise KanidmReturnEmptyResponse
users = []
for user in users_data:
user_attrs = user.get("attrs", {})
@ -194,9 +221,9 @@ class KanidmUserRepository(AbstractUserRepository):
continue
filled_user = UserDataUser(
username=user_attrs.get("name", [None])[0],
displayname=user_attrs.get("displayname", [None])[0],
email=user_attrs.get("mail", [None])[0],
username=user_attrs.get("name", [None]),
displayname=user_attrs.get("displayname", [None]),
email=user_attrs.get("mail", [None]),
ssh_keys=[], # actions layer will full in this field
user_type=user_type,
directmemberof=user_attrs.get("directmemberof", []),
@ -209,9 +236,7 @@ class KanidmUserRepository(AbstractUserRepository):
@staticmethod
def delete_user(username: str) -> None:
"""Deletes an existing user"""
return KanidmUserRepository._send_query(
endpoint=f"person/{username}", method="DELETE"
)
KanidmUserRepository._send_query(endpoint=f"person/{username}", method="DELETE")
@staticmethod
def update_user(
@ -243,7 +268,7 @@ class KanidmUserRepository(AbstractUserRepository):
if memberof:
data["attrs"]["memberof"] = memberof
return KanidmUserRepository._send_query(
KanidmUserRepository._send_query(
endpoint=f"person/{username}",
method="PATCH",
data=data,
@ -258,14 +283,14 @@ class KanidmUserRepository(AbstractUserRepository):
)
if not user_data or "attrs" not in user_data:
return None
raise KanidmReturnEmptyResponse
attrs = user_data["attrs"]
return UserDataUser(
username=attrs.get("name", [None])[0],
displayname=attrs.get("displayname", [None])[0],
email=attrs.get("mail", [None])[0],
username=attrs.get("name", [None]),
displayname=attrs.get("displayname", [None]),
email=attrs.get("mail", [None]),
ssh_keys=[], # actions layer will full in this field
user_type=KanidmUserRepository._check_user_origin_by_memberof(
memberof=attrs.get("memberof", [])
@ -280,10 +305,17 @@ class KanidmUserRepository(AbstractUserRepository):
Do not reset the password, just generate a link to reset the password.
Not implemented in JsonUserRepository.
"""
token_information = KanidmUserRepository._send_query(
data = KanidmUserRepository._send_query(
endpoint=f"person/{username}/_credential/_update_intent",
method="GET",
)
token_information = json.loads(token_information)
return f"https://id{get_domain()}/ui/reset?token={token_information['token']}"
if not data or "attrs" not in data:
raise KanidmReturnEmptyResponse
token = data["attrs"].get["token", [None][0]]
if token:
return f"https://id{get_domain()}/ui/reset?token={token}"
raise NoPasswordResetLinkFoundInResponse