mirror of
https://git.selfprivacy.org/SelfPrivacy/selfprivacy-rest-api.git
synced 2025-02-05 23:50:37 +00:00
feat: add _is_token_valid
This commit is contained in:
parent
a57d65ee18
commit
3b095bba5a
|
@ -33,7 +33,7 @@ redis = RedisPool().get_connection()
|
|||
logger = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class KanidmAdminToken: # TODO CHECK IS TOKEN CORRECT?
|
||||
class KanidmAdminToken:
|
||||
"""
|
||||
Manages the administrative token for Kanidm.
|
||||
|
||||
|
@ -49,13 +49,18 @@ class KanidmAdminToken: # TODO CHECK IS TOKEN CORRECT?
|
|||
|
||||
_delete_kanidm_token_from_db() -> None:
|
||||
Deletes the admin token from Redis.
|
||||
|
||||
_is_token_valid() -> bool:
|
||||
Sends a request to kanidm to check the validity of the token.
|
||||
"""
|
||||
|
||||
@staticmethod
|
||||
def get() -> str:
|
||||
kanidm_admin_token = str(redis.get(REDIS_TOKEN_KEY))
|
||||
|
||||
if kanidm_admin_token is None:
|
||||
if kanidm_admin_token is None or not KanidmAdminToken._is_token_valid(
|
||||
kanidm_admin_token
|
||||
):
|
||||
kanidm_admin_password = (
|
||||
KanidmAdminToken._reset_and_save_idm_admin_password()
|
||||
)
|
||||
|
@ -118,6 +123,26 @@ class KanidmAdminToken: # TODO CHECK IS TOKEN CORRECT?
|
|||
|
||||
return new_kanidm_admin_password
|
||||
|
||||
@staticmethod
|
||||
def _is_token_valid(token: str) -> bool:
|
||||
response = requests.get(
|
||||
"{KANIDM_URL}/v1/person/root",
|
||||
headers={
|
||||
"Authorization": f"Bearer {KanidmAdminToken.get()}",
|
||||
"Content-Type": "application/json",
|
||||
},
|
||||
timeout=1,
|
||||
verify=False, # TODO: REMOVE THIS NOT HALAL!!!!!
|
||||
)
|
||||
response_data = response.json()
|
||||
|
||||
# we do not handle the other errors, this is handled by the main function in KanidmUserRepository._send_query
|
||||
if response.status_code != 200:
|
||||
if isinstance(response_data, str) and response_data == "notauthenticated":
|
||||
logger.error("Kanidm token is not valid")
|
||||
return False
|
||||
return True
|
||||
|
||||
@staticmethod
|
||||
def _delete_kanidm_token_from_db() -> None:
|
||||
redis.delete("kanidm:token")
|
||||
|
@ -226,6 +251,10 @@ class KanidmUserRepository(AbstractUserRepository):
|
|||
raise UserNotFound # does it work only for user? hate kanidm's response
|
||||
elif response_data == "accessdenied":
|
||||
raise KanidmQueryError(error_text="Kanidm access issue")
|
||||
elif response_data == "notauthenticated":
|
||||
raise KanidmQueryError(
|
||||
error_text="Failed to get valid Kanidm token"
|
||||
)
|
||||
|
||||
logger.error(f"Kanidm query error: {response.text}")
|
||||
raise KanidmQueryError(error_text=response.text)
|
||||
|
|
|
@ -237,7 +237,9 @@ def test_adding_root_key_writes_json(generic_userdata):
|
|||
|
||||
def test_read_admin_keys_from_json(generic_userdata):
|
||||
admin_name = "tester"
|
||||
assert JsonUserRepository.get_user_by_username(admin_name).ssh_keys == ["ssh-rsa KEY test@pc"]
|
||||
assert JsonUserRepository.get_user_by_username(admin_name).ssh_keys == [
|
||||
"ssh-rsa KEY test@pc"
|
||||
]
|
||||
new_keys = ["ssh-rsa KEY test@pc", "ssh-ed25519 KEY2 test@pc"]
|
||||
|
||||
with WriteUserData() as data:
|
||||
|
|
Loading…
Reference in a new issue