feat: add groups query

This commit is contained in:
dettlaff 2024-12-12 23:53:41 +04:00
parent 4cddf0e5eb
commit da81bd858c
6 changed files with 90 additions and 10 deletions

View file

@ -6,6 +6,7 @@ import logging
from typing import Optional from typing import Optional
from selfprivacy_api import PLEASE_UPDATE_APP_TEXT from selfprivacy_api import PLEASE_UPDATE_APP_TEXT
from selfprivacy_api.models.group import Group
from selfprivacy_api.models.user import UserDataUser, UserDataUserOrigin from selfprivacy_api.models.user import UserDataUser, UserDataUserOrigin
from selfprivacy_api.utils import is_username_forbidden from selfprivacy_api.utils import is_username_forbidden
@ -189,8 +190,8 @@ def generate_password_reset_link(username: str) -> str:
return ACTIVE_USERS_PROVIDER.generate_password_reset_link(username=username) return ACTIVE_USERS_PROVIDER.generate_password_reset_link(username=username)
def groups_list() -> list: def get_groups() -> list[Group]:
if isinstance(ACTIVE_USERS_PROVIDER, JsonUserRepository): if isinstance(ACTIVE_USERS_PROVIDER, JsonUserRepository):
raise ApiUsingWrongUserRepository raise ApiUsingWrongUserRepository
return ACTIVE_USERS_PROVIDER.groups_list() return ACTIVE_USERS_PROVIDER.get_groups()

View file

@ -0,0 +1,33 @@
from typing import Optional
import strawberry
from selfprivacy_api.actions.users import get_groups as actions_get_groups
@strawberry.type
class Group:
name: str
group_class: list
member: Optional[list[str]] = strawberry.field(default_factory=list)
memberof: Optional[list[str]] = strawberry.field(default_factory=list)
directmemberof: Optional[list[str]] = strawberry.field(default_factory=list)
spn: Optional[str] = None
description: Optional[str] = None
def get_groups() -> list[Group]:
"""Get groups"""
groups = actions_get_groups()
return [
Group(
name=group.name,
group_class=getattr(group, "group_class", []),
member=getattr(group, "member", []),
memberof=getattr(group, "memberof", []),
directmemberof=getattr(group, "directmemberof", []),
spn=getattr(group, "spn", None),
description=getattr(group, "description", None),
)
for group in groups
]

View file

@ -0,0 +1,23 @@
"""Groups"""
# pylint: disable=too-few-public-methods
import typing
import strawberry
from selfprivacy_api.graphql.common_types.groups import (
Group,
get_groups,
)
from selfprivacy_api.graphql import IsAuthenticated
from selfprivacy_api.actions.users import get_groups as action_get_groups
@strawberry.type
class Groups:
all_groups: typing.List[Group] = strawberry.field(
permission_classes=[IsAuthenticated], resolver=get_groups
)
@strawberry.field(permission_classes=[IsAuthenticated])
def get_groups() -> list:
return action_get_groups()

View file

@ -11,7 +11,6 @@ from selfprivacy_api.graphql.common_types.user import (
) )
from selfprivacy_api.graphql import IsAuthenticated from selfprivacy_api.graphql import IsAuthenticated
from selfprivacy_api.repositories.users.exceptions import UserNotFound from selfprivacy_api.repositories.users.exceptions import UserNotFound
from selfprivacy_api.actions.users import groups_list as action_groups_list
@strawberry.type @strawberry.type
@ -28,7 +27,3 @@ class Users:
all_users: typing.List[User] = strawberry.field( all_users: typing.List[User] = strawberry.field(
permission_classes=[IsAuthenticated], resolver=get_users permission_classes=[IsAuthenticated], resolver=get_users
) )
@strawberry.field(permission_classes=[IsAuthenticated])
def groups_list() -> list:
return action_groups_list()

View file

@ -0,0 +1,13 @@
from typing import Optional
from pydantic import BaseModel
class Group(BaseModel):
name: str
group_class: Optional[list[str]] = []
member: Optional[list[str]] = []
memberof: Optional[list[str]] = []
directmemberof: Optional[list[str]] = []
spn: Optional[str] = None
description: Optional[str] = None

View file

@ -4,6 +4,7 @@ import requests
import re import re
import logging import logging
from selfprivacy_api.models.group import Group
from selfprivacy_api.repositories.users.exceptions import ( from selfprivacy_api.repositories.users.exceptions import (
NoPasswordResetLinkFoundInResponse, NoPasswordResetLinkFoundInResponse,
UserAlreadyExists, UserAlreadyExists,
@ -484,14 +485,28 @@ class KanidmUserRepository(AbstractUserRepository):
raise NoPasswordResetLinkFoundInResponse raise NoPasswordResetLinkFoundInResponse
@staticmethod @staticmethod
def groups_list() -> list: def get_groups() -> list[Group]:
groups_list_data = KanidmUserRepository._send_query( groups_list_data = KanidmUserRepository._send_query(
endpoint="/v1/group", endpoint="/v1/group",
method="GET", method="GET",
) )
KanidmUserRepository._check_response_type_and_not_empty( KanidmUserRepository._check_response_type_and_not_empty(
data_type="list", response_data=groups_list_data data_type="dict", response_data=groups_list_data
) )
return groups_list_data # type: ignore groups = []
for group_data in groups_list_data:
attrs = group_data.get("attrs", {})
group = Group(
name=attrs["name"],
group_class=attrs.get("class", []),
member=attrs.get("member", []),
memberof=attrs.get("memberof", []),
directmemberof=attrs.get("directmemberof", []),
spn=attrs.get("spn", [None])[0],
description=attrs.get("description", [None])[0],
)
groups.append(group)
return groups