mirror of
https://git.selfprivacy.org/SelfPrivacy/selfprivacy-rest-api.git
synced 2025-01-19 15:26:41 +00:00
702 lines
20 KiB
Python
702 lines
20 KiB
Python
# pylint: disable=redefined-outer-name
|
|
# pylint: disable=unused-argument
|
|
import pytest
|
|
|
|
from tests.common import (
|
|
generate_users_query,
|
|
read_json,
|
|
)
|
|
from selfprivacy_api.utils import WriteUserData
|
|
from tests.test_graphql.common import (
|
|
assert_empty,
|
|
assert_errorcode,
|
|
assert_ok,
|
|
get_data,
|
|
)
|
|
|
|
invalid_usernames = [
|
|
"messagebus",
|
|
"postfix",
|
|
"polkituser",
|
|
"dovecot2",
|
|
"dovenull",
|
|
"nginx",
|
|
"postgres",
|
|
"systemd-journal-gateway",
|
|
"prosody",
|
|
"systemd-network",
|
|
"systemd-resolve",
|
|
"systemd-timesync",
|
|
"opendkim",
|
|
"rspamd",
|
|
"sshd",
|
|
"selfprivacy-api",
|
|
"restic",
|
|
"redis",
|
|
"pleroma",
|
|
"ocserv",
|
|
"nextcloud",
|
|
"memcached",
|
|
"knot-resolver",
|
|
"gitea",
|
|
"bitwarden_rs",
|
|
"vaultwarden",
|
|
"acme",
|
|
"virtualMail",
|
|
"nixbld1",
|
|
"nixbld2",
|
|
"nixbld29",
|
|
"nobody",
|
|
]
|
|
|
|
|
|
## FIXTURES ###################################################
|
|
|
|
|
|
@pytest.fixture
|
|
def no_users(mocker, datadir):
|
|
mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "no_users.json")
|
|
assert read_json(datadir / "no_users.json")["users"] == []
|
|
return datadir
|
|
|
|
|
|
@pytest.fixture
|
|
def one_user(mocker, datadir):
|
|
mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "one_user.json")
|
|
assert read_json(datadir / "one_user.json")["users"] == [
|
|
{
|
|
"username": "user1",
|
|
"hashedPassword": "HASHED_PASSWORD_1",
|
|
"sshKeys": ["ssh-rsa KEY user1@pc"],
|
|
}
|
|
]
|
|
return datadir
|
|
|
|
|
|
@pytest.fixture
|
|
def some_users(mocker, datadir):
|
|
mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "some_users.json")
|
|
assert read_json(datadir / "some_users.json")["users"] == [
|
|
{
|
|
"username": "user1",
|
|
"hashedPassword": "HASHED_PASSWORD_1",
|
|
"sshKeys": ["ssh-rsa KEY user1@pc"],
|
|
},
|
|
{"username": "user2", "hashedPassword": "HASHED_PASSWORD_2", "sshKeys": []},
|
|
{"username": "user3", "hashedPassword": "HASHED_PASSWORD_3"},
|
|
]
|
|
return datadir
|
|
|
|
|
|
@pytest.fixture
|
|
def undefined_settings(mocker, datadir):
|
|
mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "undefined.json")
|
|
assert "users" not in read_json(datadir / "undefined.json")
|
|
return datadir
|
|
|
|
|
|
@pytest.fixture
|
|
def no_users_no_admin_nobody(undefined_settings):
|
|
datadir = undefined_settings
|
|
with WriteUserData() as data:
|
|
del data["username"]
|
|
del data["sshKeys"]
|
|
return datadir
|
|
|
|
|
|
class ProcessMock:
|
|
"""Mock subprocess.Popen"""
|
|
|
|
def __init__(self, args, **kwargs):
|
|
self.args = args
|
|
self.kwargs = kwargs
|
|
|
|
def communicate(): # pylint: disable=no-method-argument
|
|
return (b"NEW_HASHED", None)
|
|
|
|
returncode = 0
|
|
|
|
|
|
@pytest.fixture
|
|
def mock_subprocess_popen(mocker):
|
|
mock = mocker.patch("subprocess.Popen", autospec=True, return_value=ProcessMock)
|
|
return mock
|
|
|
|
|
|
## TESTS ######################################################
|
|
|
|
API_USERS_INFO = """
|
|
allUsers {
|
|
username
|
|
sshKeys
|
|
}
|
|
"""
|
|
|
|
|
|
def api_all_users(authorized_client):
|
|
response = authorized_client.post(
|
|
"/graphql",
|
|
json={
|
|
"query": generate_users_query([API_USERS_INFO]),
|
|
},
|
|
)
|
|
output = get_data(response)["users"]["allUsers"]
|
|
return output
|
|
|
|
|
|
def test_graphql_get_users_unauthorized(client, some_users, mock_subprocess_popen):
|
|
"""Test wrong auth"""
|
|
response = client.post(
|
|
"/graphql",
|
|
json={
|
|
"query": generate_users_query([API_USERS_INFO]),
|
|
},
|
|
)
|
|
assert_empty(response)
|
|
|
|
|
|
def test_graphql_get_some_users(authorized_client, some_users, mock_subprocess_popen):
|
|
response = authorized_client.post(
|
|
"/graphql",
|
|
json={
|
|
"query": generate_users_query([API_USERS_INFO]),
|
|
},
|
|
)
|
|
assert response.status_code == 200
|
|
assert response.json().get("data") is not None
|
|
assert len(response.json()["data"]["users"]["allUsers"]) == 4
|
|
assert response.json()["data"]["users"]["allUsers"][0]["username"] == "user1"
|
|
assert response.json()["data"]["users"]["allUsers"][0]["sshKeys"] == [
|
|
"ssh-rsa KEY user1@pc"
|
|
]
|
|
|
|
assert response.json()["data"]["users"]["allUsers"][1]["username"] == "user2"
|
|
assert response.json()["data"]["users"]["allUsers"][1]["sshKeys"] == []
|
|
|
|
assert response.json()["data"]["users"]["allUsers"][3]["username"] == "tester"
|
|
assert response.json()["data"]["users"]["allUsers"][3]["sshKeys"] == [
|
|
"ssh-rsa KEY test@pc"
|
|
]
|
|
|
|
|
|
def test_graphql_get_no_users(authorized_client, no_users, mock_subprocess_popen):
|
|
response = authorized_client.post(
|
|
"/graphql",
|
|
json={
|
|
"query": generate_users_query([API_USERS_INFO]),
|
|
},
|
|
)
|
|
assert response.status_code == 200
|
|
assert response.json().get("data") is not None
|
|
|
|
assert len(response.json()["data"]["users"]["allUsers"]) == 1
|
|
assert response.json()["data"]["users"]["allUsers"][0]["username"] == "tester"
|
|
assert response.json()["data"]["users"]["allUsers"][0]["sshKeys"] == [
|
|
"ssh-rsa KEY test@pc"
|
|
]
|
|
|
|
|
|
def test_graphql_get_users_undefined_but_admin(authorized_client, undefined_settings):
|
|
response = authorized_client.post(
|
|
"/graphql",
|
|
json={
|
|
"query": generate_users_query([API_USERS_INFO]),
|
|
},
|
|
)
|
|
assert response.status_code == 200
|
|
assert response.json().get("data") is not None
|
|
|
|
assert len(response.json()["data"]["users"]["allUsers"]) == 1
|
|
assert response.json()["data"]["users"]["allUsers"][0]["username"] == "tester"
|
|
assert response.json()["data"]["users"]["allUsers"][0]["sshKeys"] == [
|
|
"ssh-rsa KEY test@pc"
|
|
]
|
|
|
|
|
|
def test_graphql_get_users_undefined_no_admin(
|
|
authorized_client, no_users_no_admin_nobody
|
|
):
|
|
response = authorized_client.post(
|
|
"/graphql",
|
|
json={
|
|
"query": generate_users_query([API_USERS_INFO]),
|
|
},
|
|
)
|
|
assert response.status_code == 200
|
|
assert response.json().get("data") is not None
|
|
|
|
assert len(response.json()["data"]["users"]["allUsers"]) == 0
|
|
|
|
|
|
API_GET_USERS = """
|
|
query TestUsers($username: String!) {
|
|
users {
|
|
getUser(username: $username) {
|
|
sshKeys
|
|
username
|
|
}
|
|
}
|
|
}
|
|
"""
|
|
|
|
|
|
def test_graphql_get_one_user_unauthorized(client, one_user, mock_subprocess_popen):
|
|
response = client.post(
|
|
"/graphql",
|
|
json={
|
|
"query": API_GET_USERS,
|
|
"variables": {
|
|
"username": "user1",
|
|
},
|
|
},
|
|
)
|
|
assert_empty(response)
|
|
|
|
|
|
def test_graphql_get_one_user(authorized_client, one_user, mock_subprocess_popen):
|
|
response = authorized_client.post(
|
|
"/graphql",
|
|
json={
|
|
"query": API_GET_USERS,
|
|
"variables": {
|
|
"username": "user1",
|
|
},
|
|
},
|
|
)
|
|
assert response.status_code == 200
|
|
assert response.json().get("data") is not None
|
|
|
|
assert len(response.json()["data"]["users"]["getUser"]) == 2
|
|
assert response.json()["data"]["users"]["getUser"]["username"] == "user1"
|
|
assert response.json()["data"]["users"]["getUser"]["sshKeys"] == [
|
|
"ssh-rsa KEY user1@pc"
|
|
]
|
|
|
|
|
|
def test_graphql_get_some_user_undefined(authorized_client, undefined_settings):
|
|
response = authorized_client.post(
|
|
"/graphql",
|
|
json={
|
|
"query": API_GET_USERS,
|
|
"variables": {
|
|
"username": "user1",
|
|
},
|
|
},
|
|
)
|
|
assert response.status_code == 200
|
|
assert response.json().get("data") is not None
|
|
|
|
assert response.json()["data"]["users"]["getUser"] is None
|
|
|
|
|
|
def test_graphql_get_some_user(authorized_client, some_users, mock_subprocess_popen):
|
|
response = authorized_client.post(
|
|
"/graphql",
|
|
json={
|
|
"query": API_GET_USERS,
|
|
"variables": {
|
|
"username": "user2",
|
|
},
|
|
},
|
|
)
|
|
assert response.status_code == 200
|
|
assert response.json().get("data") is not None
|
|
|
|
assert len(response.json()["data"]["users"]["getUser"]) == 2
|
|
assert response.json()["data"]["users"]["getUser"]["username"] == "user2"
|
|
assert response.json()["data"]["users"]["getUser"]["sshKeys"] == []
|
|
|
|
|
|
def test_graphql_get_root_user(authorized_client, some_users, mock_subprocess_popen):
|
|
response = authorized_client.post(
|
|
"/graphql",
|
|
json={
|
|
"query": API_GET_USERS,
|
|
"variables": {
|
|
"username": "root",
|
|
},
|
|
},
|
|
)
|
|
assert response.status_code == 200
|
|
assert response.json().get("data") is not None
|
|
|
|
assert len(response.json()["data"]["users"]["getUser"]) == 2
|
|
assert response.json()["data"]["users"]["getUser"]["username"] == "root"
|
|
assert response.json()["data"]["users"]["getUser"]["sshKeys"] == [
|
|
"ssh-ed25519 KEY test@pc"
|
|
]
|
|
|
|
|
|
def test_graphql_get_main_user(authorized_client, one_user, mock_subprocess_popen):
|
|
response = authorized_client.post(
|
|
"/graphql",
|
|
json={
|
|
"query": API_GET_USERS,
|
|
"variables": {
|
|
"username": "tester",
|
|
},
|
|
},
|
|
)
|
|
assert response.status_code == 200
|
|
assert response.json().get("data") is not None
|
|
|
|
assert len(response.json()["data"]["users"]["getUser"]) == 2
|
|
assert response.json()["data"]["users"]["getUser"]["username"] == "tester"
|
|
assert response.json()["data"]["users"]["getUser"]["sshKeys"] == [
|
|
"ssh-rsa KEY test@pc"
|
|
]
|
|
|
|
|
|
def test_graphql_get_nonexistent_user(
|
|
authorized_client, one_user, mock_subprocess_popen
|
|
):
|
|
response = authorized_client.post(
|
|
"/graphql",
|
|
json={
|
|
"query": API_GET_USERS,
|
|
"variables": {
|
|
"username": "tyler_durden",
|
|
},
|
|
},
|
|
)
|
|
assert response.status_code == 200
|
|
assert response.json().get("data") is not None
|
|
|
|
assert response.json()["data"]["users"]["getUser"] is None
|
|
|
|
|
|
API_CREATE_USERS_MUTATION = """
|
|
mutation createUser($user: UserMutationInput!) {
|
|
users {
|
|
createUser(user: $user) {
|
|
success
|
|
message
|
|
code
|
|
user {
|
|
username
|
|
sshKeys
|
|
}
|
|
}
|
|
}
|
|
}
|
|
"""
|
|
|
|
|
|
def api_add_user_json(authorized_client, user_json: dict):
|
|
# lowlevel for deeper testing of edgecases
|
|
return authorized_client.post(
|
|
"/graphql",
|
|
json={
|
|
"query": API_CREATE_USERS_MUTATION,
|
|
"variables": {
|
|
"user": user_json,
|
|
},
|
|
},
|
|
)
|
|
|
|
|
|
def api_add_user(authorized_client, username, password):
|
|
response = api_add_user_json(
|
|
authorized_client, {"username": username, "password": password}
|
|
)
|
|
output = get_data(response)["users"]["createUser"]
|
|
return output
|
|
|
|
|
|
def test_graphql_add_user_unauthorized(client, one_user, mock_subprocess_popen):
|
|
response = api_add_user_json(client, {"username": "user2", "password": "12345678"})
|
|
assert_empty(response)
|
|
|
|
|
|
def test_graphql_add_user(authorized_client, one_user, mock_subprocess_popen):
|
|
output = api_add_user(authorized_client, "user2", password="12345678")
|
|
assert_ok(output, code=201)
|
|
|
|
assert output["user"]["username"] == "user2"
|
|
assert output["user"]["sshKeys"] == []
|
|
|
|
|
|
def test_graphql_add_user_when_undefined_settings(
|
|
authorized_client, undefined_settings, mock_subprocess_popen
|
|
):
|
|
output = api_add_user(authorized_client, "user2", password="12345678")
|
|
assert_ok(output, code=201)
|
|
|
|
assert output["user"]["username"] == "user2"
|
|
assert output["user"]["sshKeys"] == []
|
|
|
|
|
|
users_witn_empty_fields = [
|
|
{"username": "user2", "password": ""},
|
|
{"username": "", "password": "12345678"},
|
|
{"username": "", "password": ""},
|
|
]
|
|
|
|
|
|
@pytest.mark.parametrize("user_json", users_witn_empty_fields)
|
|
def test_graphql_add_with_empty_fields(authorized_client, one_user, user_json):
|
|
response = api_add_user_json(authorized_client, user_json)
|
|
output = get_data(response)["users"]["createUser"]
|
|
|
|
assert_errorcode(output, 400)
|
|
assert output["user"] is None
|
|
|
|
|
|
users_witn_undefined_fields = [
|
|
{"username": "user2"},
|
|
{"password": "12345678"},
|
|
{},
|
|
]
|
|
|
|
|
|
@pytest.mark.parametrize("user_json", users_witn_undefined_fields)
|
|
def test_graphql_add_with_undefined_fields(authorized_client, one_user, user_json):
|
|
# checking that all fields are mandatory
|
|
response = api_add_user_json(authorized_client, user_json)
|
|
|
|
assert response.json()["errors"] is not None
|
|
assert response.json()["errors"] != []
|
|
|
|
|
|
@pytest.mark.parametrize("username", invalid_usernames)
|
|
def test_graphql_add_system_username(
|
|
authorized_client, one_user, mock_subprocess_popen, username
|
|
):
|
|
output = api_add_user(authorized_client, username, password="12345678")
|
|
|
|
assert_errorcode(output, code=409)
|
|
assert output["user"] is None
|
|
|
|
|
|
def test_graphql_add_existing_user(authorized_client, one_user):
|
|
output = api_add_user(authorized_client, "user1", password="12345678")
|
|
|
|
assert_errorcode(output, code=409)
|
|
assert output["user"]["username"] == "user1"
|
|
assert output["user"]["sshKeys"][0] == "ssh-rsa KEY user1@pc"
|
|
|
|
|
|
def test_graphql_add_main_user(authorized_client, one_user):
|
|
output = api_add_user(authorized_client, "tester", password="12345678")
|
|
|
|
assert_errorcode(output, code=409)
|
|
assert output["user"]["username"] == "tester"
|
|
assert output["user"]["sshKeys"][0] == "ssh-rsa KEY test@pc"
|
|
|
|
|
|
def test_graphql_add_user_when_no_admin_defined(
|
|
authorized_client, no_users_no_admin_nobody
|
|
):
|
|
output = api_add_user(authorized_client, "tester", password="12345678")
|
|
|
|
assert_errorcode(output, code=400)
|
|
assert output["user"] is None
|
|
|
|
|
|
def test_graphql_add_long_username(authorized_client, one_user, mock_subprocess_popen):
|
|
output = api_add_user(authorized_client, "a" * 32, password="12345678")
|
|
|
|
assert_errorcode(output, code=400)
|
|
assert output["user"] is None
|
|
|
|
|
|
# TODO: maybe make a username generating function to make a more comprehensive invalid username test
|
|
@pytest.mark.parametrize(
|
|
"username", ["", "1", "фыр", "user1@", "^-^", "№:%##$^&@$&^()_"]
|
|
)
|
|
def test_graphql_add_invalid_username(
|
|
authorized_client, one_user, mock_subprocess_popen, username
|
|
):
|
|
output = api_add_user(authorized_client, username, password="12345678")
|
|
|
|
assert_errorcode(output, code=400)
|
|
assert output["user"] is None
|
|
|
|
|
|
API_DELETE_USER_MUTATION = """
|
|
mutation deleteUser($username: String!) {
|
|
users {
|
|
deleteUser(username: $username) {
|
|
success
|
|
message
|
|
code
|
|
}
|
|
}
|
|
}
|
|
"""
|
|
|
|
|
|
def test_graphql_delete_user_unauthorized(client, some_users, mock_subprocess_popen):
|
|
response = client.post(
|
|
"/graphql",
|
|
json={
|
|
"query": API_DELETE_USER_MUTATION,
|
|
"variables": {"username": "user1"},
|
|
},
|
|
)
|
|
assert_empty(response)
|
|
|
|
|
|
def test_graphql_delete_user(authorized_client, some_users, mock_subprocess_popen):
|
|
response = authorized_client.post(
|
|
"/graphql",
|
|
json={
|
|
"query": API_DELETE_USER_MUTATION,
|
|
"variables": {"username": "user1"},
|
|
},
|
|
)
|
|
assert response.status_code == 200
|
|
assert response.json().get("data") is not None
|
|
|
|
assert response.json()["data"]["users"]["deleteUser"]["code"] == 200
|
|
assert response.json()["data"]["users"]["deleteUser"]["message"] is not None
|
|
assert response.json()["data"]["users"]["deleteUser"]["success"] is True
|
|
|
|
new_users = api_all_users(authorized_client)
|
|
assert len(new_users) == 3
|
|
usernames = [user["username"] for user in new_users]
|
|
assert set(usernames) == set(["user2", "user3", "tester"])
|
|
|
|
|
|
@pytest.mark.parametrize("username", ["", "def"])
|
|
def test_graphql_delete_nonexistent_users(
|
|
authorized_client, some_users, mock_subprocess_popen, username
|
|
):
|
|
response = authorized_client.post(
|
|
"/graphql",
|
|
json={
|
|
"query": API_DELETE_USER_MUTATION,
|
|
"variables": {"username": username},
|
|
},
|
|
)
|
|
assert response.status_code == 200
|
|
assert response.json().get("data") is not None
|
|
|
|
assert response.json()["data"]["users"]["deleteUser"]["code"] == 404
|
|
assert response.json()["data"]["users"]["deleteUser"]["message"] is not None
|
|
assert response.json()["data"]["users"]["deleteUser"]["success"] is False
|
|
|
|
|
|
@pytest.mark.parametrize("username", invalid_usernames)
|
|
def test_graphql_delete_system_users(
|
|
authorized_client, some_users, mock_subprocess_popen, username
|
|
):
|
|
response = authorized_client.post(
|
|
"/graphql",
|
|
json={
|
|
"query": API_DELETE_USER_MUTATION,
|
|
"variables": {"username": username},
|
|
},
|
|
)
|
|
assert response.status_code == 200
|
|
assert response.json().get("data") is not None
|
|
|
|
assert (
|
|
response.json()["data"]["users"]["deleteUser"]["code"] == 404
|
|
or response.json()["data"]["users"]["deleteUser"]["code"] == 400
|
|
)
|
|
assert response.json()["data"]["users"]["deleteUser"]["message"] is not None
|
|
assert response.json()["data"]["users"]["deleteUser"]["success"] is False
|
|
|
|
|
|
def test_graphql_delete_main_user(authorized_client, some_users, mock_subprocess_popen):
|
|
response = authorized_client.post(
|
|
"/graphql",
|
|
json={
|
|
"query": API_DELETE_USER_MUTATION,
|
|
"variables": {"username": "tester"},
|
|
},
|
|
)
|
|
assert response.status_code == 200
|
|
assert response.json().get("data") is not None
|
|
|
|
assert response.json()["data"]["users"]["deleteUser"]["code"] == 400
|
|
assert response.json()["data"]["users"]["deleteUser"]["message"] is not None
|
|
assert response.json()["data"]["users"]["deleteUser"]["success"] is False
|
|
|
|
|
|
API_UPDATE_USER_MUTATION = """
|
|
mutation updateUser($user: UserMutationInput!) {
|
|
users {
|
|
updateUser(user: $user) {
|
|
success
|
|
message
|
|
code
|
|
user {
|
|
username
|
|
sshKeys
|
|
}
|
|
}
|
|
}
|
|
}
|
|
"""
|
|
|
|
|
|
def test_graphql_update_user_unauthorized(client, some_users, mock_subprocess_popen):
|
|
response = client.post(
|
|
"/graphql",
|
|
json={
|
|
"query": API_UPDATE_USER_MUTATION,
|
|
"variables": {
|
|
"user": {
|
|
"username": "user1",
|
|
"password": "12345678",
|
|
},
|
|
},
|
|
},
|
|
)
|
|
assert_empty(response)
|
|
|
|
|
|
def test_graphql_update_user(authorized_client, some_users, mock_subprocess_popen):
|
|
response = authorized_client.post(
|
|
"/graphql",
|
|
json={
|
|
"query": API_UPDATE_USER_MUTATION,
|
|
"variables": {
|
|
"user": {
|
|
"username": "user1",
|
|
"password": "12345678",
|
|
},
|
|
},
|
|
},
|
|
)
|
|
assert response.status_code == 200
|
|
assert response.json().get("data") is not None
|
|
|
|
assert response.json()["data"]["users"]["updateUser"]["code"] == 200
|
|
assert response.json()["data"]["users"]["updateUser"]["message"] is not None
|
|
assert response.json()["data"]["users"]["updateUser"]["success"] is True
|
|
|
|
assert response.json()["data"]["users"]["updateUser"]["user"]["username"] == "user1"
|
|
assert response.json()["data"]["users"]["updateUser"]["user"]["sshKeys"] == [
|
|
"ssh-rsa KEY user1@pc"
|
|
]
|
|
assert mock_subprocess_popen.call_count == 1
|
|
|
|
|
|
def test_graphql_update_nonexistent_user(
|
|
authorized_client, some_users, mock_subprocess_popen
|
|
):
|
|
response = authorized_client.post(
|
|
"/graphql",
|
|
json={
|
|
"query": API_UPDATE_USER_MUTATION,
|
|
"variables": {
|
|
"user": {
|
|
"username": "user666",
|
|
"password": "12345678",
|
|
},
|
|
},
|
|
},
|
|
)
|
|
assert response.status_code == 200
|
|
assert response.json().get("data") is not None
|
|
|
|
assert response.json()["data"]["users"]["updateUser"]["code"] == 404
|
|
assert response.json()["data"]["users"]["updateUser"]["message"] is not None
|
|
assert response.json()["data"]["users"]["updateUser"]["success"] is False
|
|
|
|
assert response.json()["data"]["users"]["updateUser"]["user"] is None
|
|
assert mock_subprocess_popen.call_count == 1
|