feat: add DEFAULT_GROUPS ignoring

This commit is contained in:
dettlaff 2025-01-18 17:44:31 +04:00
parent df23a31a01
commit 482d48d923

View file

@ -28,13 +28,14 @@ from selfprivacy_api.repositories.users.abstract_user_repository import (
AbstractUserRepository, AbstractUserRepository,
) )
DOMAIN = get_domain()
REDIS_TOKEN_KEY = "kanidm:token" REDIS_TOKEN_KEY = "kanidm:token"
redis = RedisPool().get_connection()
KANIDM_URL = "https://127.0.0.1:3013" KANIDM_URL = "https://127.0.0.1:3013"
ADMIN_GROUPS = ["sp.admins"] ADMIN_GROUPS = ["sp.admins"]
DEFAULT_GROUPS = [f"idm_all_persons@{DOMAIN}", f"idm_all_accounts@{DOMAIN}"]
redis = RedisPool().get_connection()
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@ -62,15 +63,15 @@ class KanidmAdminToken:
@staticmethod @staticmethod
def get() -> str: def get() -> str:
kanidm_admin_token = str(redis.get(REDIS_TOKEN_KEY)) kanidm_admin_token = redis.get(REDIS_TOKEN_KEY)
if kanidm_admin_token is None or not KanidmAdminToken._is_token_valid( if kanidm_admin_token:
kanidm_admin_token if KanidmAdminToken._is_token_valid(kanidm_admin_token): # type: ignore
): return kanidm_admin_token # type: ignore
kanidm_admin_password = (
KanidmAdminToken._reset_and_save_idm_admin_password()
)
logging.warning("Kanidm admin token is missing or invalid. Regenerating.")
kanidm_admin_password = KanidmAdminToken._reset_and_save_idm_admin_password()
kanidm_admin_token = KanidmAdminToken._create_and_save_token( kanidm_admin_token = KanidmAdminToken._create_and_save_token(
kanidm_admin_password=kanidm_admin_password kanidm_admin_password=kanidm_admin_password
) )
@ -511,14 +512,17 @@ class KanidmUserRepository(AbstractUserRepository):
attrs = user_data["attrs"] # type: ignore attrs = user_data["attrs"] # type: ignore
directmemberof = [item for item in attrs.get("directmemberof", []) if item not in DEFAULT_GROUPS]
memberof = [item for item in attrs.get("memberof", []) if item not in DEFAULT_GROUPS]
return UserDataUser( return UserDataUser(
username=attrs["name"][0], username=attrs["name"][0],
user_type=KanidmUserRepository._check_user_origin_by_memberof( user_type=KanidmUserRepository._check_user_origin_by_memberof(
memberof=attrs.get("memberof", []) memberof=attrs.get("memberof", [])
), ),
ssh_keys=[], # Actions layer will fill this field ssh_keys=[], # Actions layer will fill this field
directmemberof=attrs.get("directmemberof", []), directmemberof=directmemberof,
memberof=attrs.get("memberof", []), memberof=memberof,
displayname=attrs.get("displayname", [None])[0], displayname=attrs.get("displayname", [None])[0],
email=attrs.get("mail", [None])[0], email=attrs.get("mail", [None])[0],
) )