feat: add legacy support

This commit is contained in:
dettlaff 2024-12-01 17:29:29 +04:00
parent 8d4639fd78
commit e546ef00e9
6 changed files with 49 additions and 36 deletions

View file

@ -135,30 +135,22 @@ def remove_ssh_key(username: str, ssh_key: str):
raise UserNotFound()
# def get_ssh_keys(username: str) -> list:
# with ReadUserData() as data:
# ensure_ssh_and_users_fields_exist(data)
def get_ssh_keys(username: str) -> list:
"""Get all SSH keys for a user"""
# if username == "root":
# if ssh_key in data["ssh"]["rootKeys"]:
# data["ssh"]["rootKeys"].remove(ssh_key)
# return
with ReadUserData() as data:
ensure_ssh_and_users_fields_exist(data)
# raise KeyNotFound()
if username == "root":
return data["ssh"]["rootKeys"]
# if username == data["username"]:
# if ssh_key in data["sshKeys"]:
# data["sshKeys"].remove(ssh_key)
# return
if username == data["username"]:
return data["sshKeys"]
# raise KeyNotFound()
for user in data["users"]:
if user["username"] == username:
if "sshKeys" in user:
return user["sshKeys"]
return []
# for user in data["users"]:
# if user["username"] == username:
# if "sshKeys" not in user:
# user["sshKeys"] = []
# if ssh_key in user["sshKeys"]:
# user["sshKeys"].remove(ssh_key)
# return
# raise UserNotFound()
raise UserNotFound()

View file

@ -3,9 +3,10 @@
import re
from typing import Optional
from selfprivacy_api.models.user import UserDataUser
from selfprivacy_api.models.user import UserDataUser, UserDataUserOrigin
from selfprivacy_api.utils import is_username_forbidden
from selfprivacy_api.actions.ssh import get_ssh_keys
from selfprivacy_api.repositories.users.json_user_repository import JsonUserRepository
from selfprivacy_api.repositories.users import ACTIVE_USERS_PROVIDER
@ -13,6 +14,7 @@ from selfprivacy_api.repositories.users.exceptions import (
UsernameForbidden,
UsernameNotAlphanumeric,
UsernameTooLong,
UserNotFound,
)
@ -24,8 +26,22 @@ def get_users(
exclude_primary=exclude_primary, exclude_root=exclude_root
)
# for user in users:
# TODO: take ssh keys if ACTIVE_USERS_PROVIDER is KanidmUserRepository
if ACTIVE_USERS_PROVIDER != JsonUserRepository:
for user in users:
try:
user.ssh_keys = get_ssh_keys(user.username)
except UserNotFound:
pass
if not exclude_root:
users.append(
UserDataUser(
username="root",
origin=UserDataUserOrigin.ROOT,
ssh_keys=get_ssh_keys(user.username),
)
)
return users
@ -91,5 +107,17 @@ def update_user(
def get_user_by_username(username: str) -> Optional[UserDataUser]:
user = ACTIVE_USERS_PROVIDER.get_user_by_username(username=username)
# TODO: take ssh keys if ACTIVE_USERS_PROVIDER is KanidmUserRepository
if ACTIVE_USERS_PROVIDER != JsonUserRepository:
if username == "root":
return UserDataUser(
username="root",
origin=UserDataUserOrigin.ROOT,
ssh_keys=get_ssh_keys(user.username),
)
try:
user.ssh_keys = get_ssh_keys(user)
except UserNotFound:
pass
return user

View file

@ -26,7 +26,6 @@ class User:
user_type: UserType
displayname: Optional[str] = None
ssh_keys: list[str] = strawberry.field(default_factory=list)
uuid: Optional[str] = None
email: Optional[str] = None
directmemberof: Optional[list[str]] = strawberry.field(default_factory=list)
memberof: Optional[list[str]] = strawberry.field(default_factory=list)
@ -49,7 +48,6 @@ def get_user_by_username(username: str) -> Optional[User]:
user_type=UserType(user.origin.value),
username=user.username,
ssh_keys=user.ssh_keys,
uuid=user.uuid,
displayname=(user.displayname if user.displayname else user.username),
email=user.email,
directmemberof=user.directmemberof,
@ -65,7 +63,6 @@ def get_users() -> list[User]:
user_type=UserType(user.origin.value),
username=user.username,
ssh_keys=user.ssh_keys,
uuid=user.uuid,
displayname=(user.displayname if user.displayname else user.username),
email=user.email,
directmemberof=user.directmemberof,

View file

@ -44,7 +44,6 @@ class UserMutationInput:
password: Optional[str] = None
displayname: Optional[str] = None
email: Optional[str] = None
uuid: Optional[str] = None
directmemberof: Optional[list[str]] = strawberry.field(default_factory=list)
memberof: Optional[list[str]] = strawberry.field(default_factory=list)

View file

@ -20,7 +20,6 @@ class UserDataUser(BaseModel):
displayname: Optional[
str
] # in logic graphql will return "username" if "displayname" None
uuid: Optional[str]
email: Optional[str]
ssh_keys: Optional[list[str]]
directmemberof: Optional[list[str]]

View file

@ -4,6 +4,7 @@ import subprocess
import requests
import re
import logging
import json
from selfprivacy_api.utils import get_domain, temporary_env_var
from selfprivacy_api.utils.redis_pool import RedisPool
@ -185,7 +186,6 @@ class KanidmUserRepository(AbstractUserRepository):
continue
user_type = UserDataUser(
uuid=attrs.get("uuid", [None])[0],
username=attrs.get("name", [None])[0],
displayname=attrs.get("displayname", [None])[0],
email=attrs.get("mail", [None])[0],
@ -255,7 +255,6 @@ class KanidmUserRepository(AbstractUserRepository):
attrs = user_data["attrs"]
return UserDataUser(
uuid=attrs.get("uuid", [None])[0],
username=attrs.get("name", [None])[0],
displayname=attrs.get("displayname", [None])[0],
email=attrs.get("mail", [None])[0],
@ -277,7 +276,6 @@ class KanidmUserRepository(AbstractUserRepository):
endpoint=f"person/{username}/_credential/_update_intent",
method="GET",
)
token_information = json.loads(token_information)
# {"token":"3btDa-sR5yX-q2XqZ-68gRq","expiry_time":1732713745}
# TODO: create link
return token_information
return f"https://id{get_domain()}/ui/reset?token={token_information['token']}"