mirror of
https://git.selfprivacy.org/SelfPrivacy/selfprivacy-rest-api.git
synced 2025-01-30 20:56:39 +00:00
tests: fix
This commit is contained in:
parent
413521463b
commit
be545a71df
|
@ -142,8 +142,6 @@ class KanidmUserRepository(AbstractUserRepository):
|
||||||
KanidmReturnUnknownResponseType: If the response data is not of the expected type.
|
KanidmReturnUnknownResponseType: If the response data is not of the expected type.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
logging.info(response_data)
|
|
||||||
|
|
||||||
if not response_data and response_data is None:
|
if not response_data and response_data is None:
|
||||||
raise KanidmReturnEmptyResponse
|
raise KanidmReturnEmptyResponse
|
||||||
|
|
||||||
|
@ -216,16 +214,18 @@ class KanidmUserRepository(AbstractUserRepository):
|
||||||
raise KanidmQueryError(error_text=str(error))
|
raise KanidmQueryError(error_text=str(error))
|
||||||
|
|
||||||
response_data = response.json()
|
response_data = response.json()
|
||||||
logger.info(str(response))
|
|
||||||
|
|
||||||
if response.status_code != 200:
|
if response.status_code != 200:
|
||||||
if isinstance(response_data, dict):
|
if isinstance(response_data, dict):
|
||||||
plugin_error = response_data.get("plugin", {})
|
plugin_error = response_data.get("plugin", {})
|
||||||
if plugin_error.get("attrunique") == "duplicate value detected":
|
if plugin_error.get("attrunique") == "duplicate value detected":
|
||||||
raise UserAlreadyExists # TODO only user ?
|
raise UserAlreadyExists # does it work only for user? NO ONE KNOWS
|
||||||
|
|
||||||
if isinstance(response_data, str) and response_data == "nomatchingentries":
|
if isinstance(response_data, str):
|
||||||
raise UserNotFound # does it work only for user?
|
if response_data == "nomatchingentries":
|
||||||
|
raise UserNotFound # does it work only for user? hate kanidm's response
|
||||||
|
elif response_data == "accessdenied":
|
||||||
|
raise KanidmQueryError(error_text="Kanidm access issue")
|
||||||
|
|
||||||
logger.error(f"Kanidm query error: {response.text}")
|
logger.error(f"Kanidm query error: {response.text}")
|
||||||
raise KanidmQueryError(error_text=response.text)
|
raise KanidmQueryError(error_text=response.text)
|
||||||
|
|
|
@ -3,9 +3,6 @@
|
||||||
import pytest
|
import pytest
|
||||||
from typing import Optional
|
from typing import Optional
|
||||||
|
|
||||||
from selfprivacy_api.graphql.mutations.system_mutations import SystemMutations
|
|
||||||
from selfprivacy_api.graphql.queries.system import System
|
|
||||||
|
|
||||||
# only allowed in fixtures and utils
|
# only allowed in fixtures and utils
|
||||||
from selfprivacy_api.actions.ssh import remove_ssh_key, get_ssh_settings
|
from selfprivacy_api.actions.ssh import remove_ssh_key, get_ssh_settings
|
||||||
from selfprivacy_api.actions.users import get_users, UserDataUserOrigin
|
from selfprivacy_api.actions.users import get_users, UserDataUserOrigin
|
||||||
|
@ -17,7 +14,7 @@ from tests.test_graphql.common import (
|
||||||
get_data,
|
get_data,
|
||||||
assert_errorcode,
|
assert_errorcode,
|
||||||
)
|
)
|
||||||
from tests.test_graphql.test_users import API_USERS_INFO
|
from tests.test_graphql.test_users_json_repository import API_USERS_INFO
|
||||||
|
|
||||||
key_users = ["root", "tester", "user1", "user2", "user3"]
|
key_users = ["root", "tester", "user1", "user2", "user3"]
|
||||||
|
|
||||||
|
|
|
@ -1,5 +1,5 @@
|
||||||
from selfprivacy_api.utils import ReadUserData
|
from selfprivacy_api.utils import ReadUserData
|
||||||
from selfprivacy_api.repositories.users.json_user_repository import delete_user
|
from selfprivacy_api.repositories.users.json_user_repository import JsonUserRepository
|
||||||
|
|
||||||
"""
|
"""
|
||||||
A place for user storage tests and other user tests that are not Graphql-specific.
|
A place for user storage tests and other user tests that are not Graphql-specific.
|
||||||
|
@ -11,7 +11,7 @@ from selfprivacy_api.repositories.users.json_user_repository import delete_user
|
||||||
|
|
||||||
|
|
||||||
def test_delete_user_writes_json(generic_userdata):
|
def test_delete_user_writes_json(generic_userdata):
|
||||||
delete_user("user2")
|
JsonUserRepository.delete_user("user2")
|
||||||
with ReadUserData() as data:
|
with ReadUserData() as data:
|
||||||
assert data["users"] == [
|
assert data["users"] == [
|
||||||
{
|
{
|
||||||
|
|
|
@ -14,10 +14,8 @@ from selfprivacy_api.actions.ssh import (
|
||||||
KeyNotFound,
|
KeyNotFound,
|
||||||
UserNotFound,
|
UserNotFound,
|
||||||
)
|
)
|
||||||
from selfprivacy_api.repositories.users.json_user_repository import (
|
from selfprivacy_api.repositories.users.json_user_repository import JsonUserRepository
|
||||||
get_users,
|
|
||||||
get_user_by_username,
|
|
||||||
)
|
|
||||||
from selfprivacy_api.utils import WriteUserData, ReadUserData
|
from selfprivacy_api.utils import WriteUserData, ReadUserData
|
||||||
from selfprivacy_api.models.user import UserDataUserOrigin
|
from selfprivacy_api.models.user import UserDataUserOrigin
|
||||||
|
|
||||||
|
@ -71,7 +69,7 @@ def password_auth_spectrum(request):
|
||||||
|
|
||||||
|
|
||||||
def admin_name() -> Optional[str]:
|
def admin_name() -> Optional[str]:
|
||||||
users = get_users()
|
users = JsonUserRepository.get_users()
|
||||||
for user in users:
|
for user in users:
|
||||||
if user.user_type == UserDataUserOrigin.PRIMARY:
|
if user.user_type == UserDataUserOrigin.PRIMARY:
|
||||||
return user.username
|
return user.username
|
||||||
|
@ -239,18 +237,18 @@ def test_adding_root_key_writes_json(generic_userdata):
|
||||||
|
|
||||||
def test_read_admin_keys_from_json(generic_userdata):
|
def test_read_admin_keys_from_json(generic_userdata):
|
||||||
admin_name = "tester"
|
admin_name = "tester"
|
||||||
assert get_user_by_username(admin_name).ssh_keys == ["ssh-rsa KEY test@pc"]
|
assert JsonUserRepository.get_user_by_username(admin_name).ssh_keys == ["ssh-rsa KEY test@pc"]
|
||||||
new_keys = ["ssh-rsa KEY test@pc", "ssh-ed25519 KEY2 test@pc"]
|
new_keys = ["ssh-rsa KEY test@pc", "ssh-ed25519 KEY2 test@pc"]
|
||||||
|
|
||||||
with WriteUserData() as data:
|
with WriteUserData() as data:
|
||||||
data["sshKeys"] = new_keys
|
data["sshKeys"] = new_keys
|
||||||
|
|
||||||
assert get_user_by_username(admin_name).ssh_keys == new_keys
|
assert JsonUserRepository.get_user_by_username(admin_name).ssh_keys == new_keys
|
||||||
|
|
||||||
with WriteUserData() as data:
|
with WriteUserData() as data:
|
||||||
del data["sshKeys"]
|
del data["sshKeys"]
|
||||||
|
|
||||||
assert get_user_by_username(admin_name).ssh_keys == []
|
assert JsonUserRepository.get_user_by_username(admin_name).ssh_keys == []
|
||||||
|
|
||||||
|
|
||||||
def test_adding_admin_key_writes_json(generic_userdata):
|
def test_adding_admin_key_writes_json(generic_userdata):
|
||||||
|
@ -278,13 +276,13 @@ def test_removing_admin_key_writes_json(generic_userdata):
|
||||||
# generic userdata has a a single admin key
|
# generic userdata has a a single admin key
|
||||||
admin_name = "tester"
|
admin_name = "tester"
|
||||||
|
|
||||||
admin_keys = get_user_by_username(admin_name).ssh_keys
|
admin_keys = JsonUserRepository.get_user_by_username(admin_name).ssh_keys
|
||||||
assert len(admin_keys) == 1
|
assert len(admin_keys) == 1
|
||||||
key1 = admin_keys[0]
|
key1 = admin_keys[0]
|
||||||
key2 = "ssh-rsa MYSUPERKEY admin@pc"
|
key2 = "ssh-rsa MYSUPERKEY admin@pc"
|
||||||
|
|
||||||
create_ssh_key(admin_name, key2)
|
create_ssh_key(admin_name, key2)
|
||||||
admin_keys = get_user_by_username(admin_name).ssh_keys
|
admin_keys = JsonUserRepository.get_user_by_username(admin_name).ssh_keys
|
||||||
assert len(admin_keys) == 2
|
assert len(admin_keys) == 2
|
||||||
|
|
||||||
remove_ssh_key(admin_name, key2)
|
remove_ssh_key(admin_name, key2)
|
||||||
|
@ -303,7 +301,7 @@ def test_remove_admin_key_on_undefined(generic_userdata):
|
||||||
# generic userdata has a a single admin key
|
# generic userdata has a a single admin key
|
||||||
admin_name = "tester"
|
admin_name = "tester"
|
||||||
|
|
||||||
admin_keys = get_user_by_username(admin_name).ssh_keys
|
admin_keys = JsonUserRepository.get_user_by_username(admin_name).ssh_keys
|
||||||
assert len(admin_keys) == 1
|
assert len(admin_keys) == 1
|
||||||
key1 = admin_keys[0]
|
key1 = admin_keys[0]
|
||||||
|
|
||||||
|
@ -312,7 +310,7 @@ def test_remove_admin_key_on_undefined(generic_userdata):
|
||||||
|
|
||||||
with pytest.raises(KeyNotFound):
|
with pytest.raises(KeyNotFound):
|
||||||
remove_ssh_key(admin_name, key1)
|
remove_ssh_key(admin_name, key1)
|
||||||
admin_keys = get_user_by_username(admin_name).ssh_keys
|
admin_keys = JsonUserRepository.get_user_by_username(admin_name).ssh_keys
|
||||||
assert len(admin_keys) == 0
|
assert len(admin_keys) == 0
|
||||||
|
|
||||||
|
|
||||||
|
@ -331,20 +329,20 @@ def find_user_index_in_json_users(users: list, username: str) -> Optional[int]:
|
||||||
@pytest.mark.parametrize("username", regular_users)
|
@pytest.mark.parametrize("username", regular_users)
|
||||||
def test_read_user_keys_from_json(generic_userdata, username):
|
def test_read_user_keys_from_json(generic_userdata, username):
|
||||||
old_keys = [f"ssh-rsa KEY {username}@pc"]
|
old_keys = [f"ssh-rsa KEY {username}@pc"]
|
||||||
assert get_user_by_username(username).ssh_keys == old_keys
|
assert JsonUserRepository.get_user_by_username(username).ssh_keys == old_keys
|
||||||
new_keys = ["ssh-rsa KEY test@pc", "ssh-ed25519 KEY2 test@pc"]
|
new_keys = ["ssh-rsa KEY test@pc", "ssh-ed25519 KEY2 test@pc"]
|
||||||
|
|
||||||
with WriteUserData() as data:
|
with WriteUserData() as data:
|
||||||
user_index = find_user_index_in_json_users(data["users"], username)
|
user_index = find_user_index_in_json_users(data["users"], username)
|
||||||
data["users"][user_index]["sshKeys"] = new_keys
|
data["users"][user_index]["sshKeys"] = new_keys
|
||||||
|
|
||||||
assert get_user_by_username(username).ssh_keys == new_keys
|
assert JsonUserRepository.get_user_by_username(username).ssh_keys == new_keys
|
||||||
|
|
||||||
with WriteUserData() as data:
|
with WriteUserData() as data:
|
||||||
user_index = find_user_index_in_json_users(data["users"], username)
|
user_index = find_user_index_in_json_users(data["users"], username)
|
||||||
del data["users"][user_index]["sshKeys"]
|
del data["users"][user_index]["sshKeys"]
|
||||||
|
|
||||||
assert get_user_by_username(username).ssh_keys == []
|
assert JsonUserRepository.get_user_by_username(username).ssh_keys == []
|
||||||
|
|
||||||
# deeper deletions are for user getter tests, not here
|
# deeper deletions are for user getter tests, not here
|
||||||
|
|
||||||
|
@ -376,13 +374,13 @@ def test_adding_user_key_writes_json(generic_userdata, username):
|
||||||
def test_removing_user_key_writes_json(generic_userdata, username):
|
def test_removing_user_key_writes_json(generic_userdata, username):
|
||||||
# generic userdata has a a single user key
|
# generic userdata has a a single user key
|
||||||
|
|
||||||
user_keys = get_user_by_username(username).ssh_keys
|
user_keys = JsonUserRepository.get_user_by_username(username).ssh_keys
|
||||||
assert len(user_keys) == 1
|
assert len(user_keys) == 1
|
||||||
key1 = user_keys[0]
|
key1 = user_keys[0]
|
||||||
key2 = "ssh-rsa MYSUPERKEY admin@pc"
|
key2 = "ssh-rsa MYSUPERKEY admin@pc"
|
||||||
|
|
||||||
create_ssh_key(username, key2)
|
create_ssh_key(username, key2)
|
||||||
user_keys = get_user_by_username(username).ssh_keys
|
user_keys = JsonUserRepository.get_user_by_username(username).ssh_keys
|
||||||
assert len(user_keys) == 2
|
assert len(user_keys) == 2
|
||||||
|
|
||||||
remove_ssh_key(username, key2)
|
remove_ssh_key(username, key2)
|
||||||
|
@ -402,7 +400,7 @@ def test_removing_user_key_writes_json(generic_userdata, username):
|
||||||
@pytest.mark.parametrize("username", regular_users)
|
@pytest.mark.parametrize("username", regular_users)
|
||||||
def test_remove_user_key_on_undefined(generic_userdata, username):
|
def test_remove_user_key_on_undefined(generic_userdata, username):
|
||||||
# generic userdata has a a single user key
|
# generic userdata has a a single user key
|
||||||
user_keys = get_user_by_username(username).ssh_keys
|
user_keys = JsonUserRepository.get_user_by_username(username).ssh_keys
|
||||||
assert len(user_keys) == 1
|
assert len(user_keys) == 1
|
||||||
key1 = user_keys[0]
|
key1 = user_keys[0]
|
||||||
|
|
||||||
|
@ -413,7 +411,7 @@ def test_remove_user_key_on_undefined(generic_userdata, username):
|
||||||
with pytest.raises(KeyNotFound):
|
with pytest.raises(KeyNotFound):
|
||||||
remove_ssh_key(username, key1)
|
remove_ssh_key(username, key1)
|
||||||
|
|
||||||
user_keys = get_user_by_username(username).ssh_keys
|
user_keys = JsonUserRepository.get_user_by_username(username).ssh_keys
|
||||||
assert len(user_keys) == 0
|
assert len(user_keys) == 0
|
||||||
|
|
||||||
with WriteUserData() as data:
|
with WriteUserData() as data:
|
||||||
|
|
Loading…
Reference in a new issue