mirror of
https://git.selfprivacy.org/SelfPrivacy/selfprivacy-rest-api.git
synced 2024-11-29 07:21:27 +00:00
Add GraphQJ user and ssh management (#12)
Co-authored-by: Inex Code <inex.code@selfprivacy.org> Reviewed-on: https://git.selfprivacy.org/SelfPrivacy/selfprivacy-rest-api/pulls/12 Co-authored-by: def <dettlaff@riseup.net> Co-committed-by: def <dettlaff@riseup.net>
This commit is contained in:
parent
5be240d357
commit
337cf29884
|
@ -1,2 +1,2 @@
|
||||||
[MASTER]
|
[MASTER]
|
||||||
init-hook='import sys; sys.path.append("/path/to/root")'
|
init-hook="from pylint.config import find_pylintrc; import os, sys; sys.path.append(os.path.dirname(find_pylintrc()))"
|
||||||
|
|
|
@ -53,7 +53,6 @@ def create_app(test_config=None):
|
||||||
pass
|
pass
|
||||||
elif request.path.startswith("/auth/recovery_token/use"):
|
elif request.path.startswith("/auth/recovery_token/use"):
|
||||||
pass
|
pass
|
||||||
# TODO: REMOVE THIS
|
|
||||||
elif request.path.startswith("/graphql"):
|
elif request.path.startswith("/graphql"):
|
||||||
pass
|
pass
|
||||||
else:
|
else:
|
||||||
|
|
0
selfprivacy_api/graphql/common_types/__init__.py
Normal file
0
selfprivacy_api/graphql/common_types/__init__.py
Normal file
78
selfprivacy_api/graphql/common_types/user.py
Normal file
78
selfprivacy_api/graphql/common_types/user.py
Normal file
|
@ -0,0 +1,78 @@
|
||||||
|
import typing
|
||||||
|
from enum import Enum
|
||||||
|
import strawberry
|
||||||
|
|
||||||
|
from selfprivacy_api.utils import ReadUserData
|
||||||
|
from selfprivacy_api.graphql.mutations.mutation_interface import (
|
||||||
|
MutationReturnInterface,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@strawberry.enum
|
||||||
|
class UserType(Enum):
|
||||||
|
NORMAL = "NORMAL"
|
||||||
|
PRIMARY = "PRIMARY"
|
||||||
|
ROOT = "ROOT"
|
||||||
|
|
||||||
|
|
||||||
|
@strawberry.type
|
||||||
|
class User:
|
||||||
|
|
||||||
|
user_type: UserType
|
||||||
|
username: str
|
||||||
|
# userHomeFolderspace: UserHomeFolderUsage
|
||||||
|
ssh_keys: typing.List[str] = strawberry.field(default_factory=list)
|
||||||
|
|
||||||
|
|
||||||
|
@strawberry.type
|
||||||
|
class UserMutationReturn(MutationReturnInterface):
|
||||||
|
"""Return type for user mutation"""
|
||||||
|
|
||||||
|
user: typing.Optional[User]
|
||||||
|
|
||||||
|
|
||||||
|
def ensure_ssh_and_users_fields_exist(data):
|
||||||
|
if "ssh" not in data:
|
||||||
|
data["ssh"] = []
|
||||||
|
data["ssh"]["rootKeys"] = []
|
||||||
|
|
||||||
|
elif data["ssh"].get("rootKeys") is None:
|
||||||
|
data["ssh"]["rootKeys"] = []
|
||||||
|
|
||||||
|
if "sshKeys" not in data:
|
||||||
|
data["sshKeys"] = []
|
||||||
|
|
||||||
|
if "users" not in data:
|
||||||
|
data["users"] = []
|
||||||
|
|
||||||
|
|
||||||
|
def get_user_by_username(username: str) -> typing.Optional[User]:
|
||||||
|
with ReadUserData() as data:
|
||||||
|
ensure_ssh_and_users_fields_exist(data)
|
||||||
|
|
||||||
|
if username == "root":
|
||||||
|
return User(
|
||||||
|
user_type=UserType.ROOT,
|
||||||
|
username="root",
|
||||||
|
ssh_keys=data["ssh"]["rootKeys"],
|
||||||
|
)
|
||||||
|
|
||||||
|
if username == data["username"]:
|
||||||
|
return User(
|
||||||
|
user_type=UserType.PRIMARY,
|
||||||
|
username=username,
|
||||||
|
ssh_keys=data["sshKeys"],
|
||||||
|
)
|
||||||
|
|
||||||
|
for user in data["users"]:
|
||||||
|
if user["username"] == username:
|
||||||
|
if "sshKeys" not in user:
|
||||||
|
user["sshKeys"] = []
|
||||||
|
|
||||||
|
return User(
|
||||||
|
user_type=UserType.NORMAL,
|
||||||
|
username=username,
|
||||||
|
ssh_keys=user["sshKeys"],
|
||||||
|
)
|
||||||
|
|
||||||
|
return None
|
54
selfprivacy_api/graphql/mutations/ssh_mutations.py
Normal file
54
selfprivacy_api/graphql/mutations/ssh_mutations.py
Normal file
|
@ -0,0 +1,54 @@
|
||||||
|
#!/usr/bin/env python3
|
||||||
|
"""Users management module"""
|
||||||
|
# pylint: disable=too-few-public-methods
|
||||||
|
|
||||||
|
import strawberry
|
||||||
|
|
||||||
|
from selfprivacy_api.graphql import IsAuthenticated
|
||||||
|
from selfprivacy_api.graphql.mutations.ssh_utils import (
|
||||||
|
create_ssh_key,
|
||||||
|
remove_ssh_key,
|
||||||
|
)
|
||||||
|
from selfprivacy_api.graphql.common_types.user import (
|
||||||
|
UserMutationReturn,
|
||||||
|
get_user_by_username,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@strawberry.input
|
||||||
|
class SshMutationInput:
|
||||||
|
"""Input type for ssh mutation"""
|
||||||
|
|
||||||
|
username: str
|
||||||
|
ssh_key: str
|
||||||
|
|
||||||
|
|
||||||
|
@strawberry.type
|
||||||
|
class SshMutations:
|
||||||
|
"""Mutations ssh"""
|
||||||
|
|
||||||
|
@strawberry.mutation(permission_classes=[IsAuthenticated])
|
||||||
|
def add_ssh_key(self, ssh_input: SshMutationInput) -> UserMutationReturn:
|
||||||
|
"""Add a new ssh key"""
|
||||||
|
|
||||||
|
success, message, code = create_ssh_key(ssh_input.username, ssh_input.ssh_key)
|
||||||
|
|
||||||
|
return UserMutationReturn(
|
||||||
|
success=success,
|
||||||
|
message=message,
|
||||||
|
code=code,
|
||||||
|
user=get_user_by_username(ssh_input.username),
|
||||||
|
)
|
||||||
|
|
||||||
|
@strawberry.mutation(permission_classes=[IsAuthenticated])
|
||||||
|
def remove_ssh_key(self, ssh_input: SshMutationInput) -> UserMutationReturn:
|
||||||
|
"""Remove ssh key from user"""
|
||||||
|
|
||||||
|
success, message, code = remove_ssh_key(ssh_input.username, ssh_input.ssh_key)
|
||||||
|
|
||||||
|
return UserMutationReturn(
|
||||||
|
success=success,
|
||||||
|
message=message,
|
||||||
|
code=code,
|
||||||
|
user=get_user_by_username(ssh_input.username),
|
||||||
|
)
|
74
selfprivacy_api/graphql/mutations/ssh_utils.py
Normal file
74
selfprivacy_api/graphql/mutations/ssh_utils.py
Normal file
|
@ -0,0 +1,74 @@
|
||||||
|
from selfprivacy_api.graphql.common_types.user import ensure_ssh_and_users_fields_exist
|
||||||
|
from selfprivacy_api.utils import (
|
||||||
|
WriteUserData,
|
||||||
|
validate_ssh_public_key,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def create_ssh_key(username: str, ssh_key: str) -> tuple[bool, str, int]:
|
||||||
|
"""Create a new ssh key"""
|
||||||
|
|
||||||
|
if not validate_ssh_public_key(ssh_key):
|
||||||
|
return (
|
||||||
|
False,
|
||||||
|
"Invalid key type. Only ssh-ed25519 and ssh-rsa are supported",
|
||||||
|
400,
|
||||||
|
)
|
||||||
|
|
||||||
|
with WriteUserData() as data:
|
||||||
|
ensure_ssh_and_users_fields_exist(data)
|
||||||
|
|
||||||
|
if username == data["username"]:
|
||||||
|
if ssh_key in data["sshKeys"]:
|
||||||
|
return False, "Key already exists", 409
|
||||||
|
|
||||||
|
data["sshKeys"].append(ssh_key)
|
||||||
|
return True, "New SSH key successfully written", 201
|
||||||
|
|
||||||
|
if username == "root":
|
||||||
|
if ssh_key in data["ssh"]["rootKeys"]:
|
||||||
|
return False, "Key already exists", 409
|
||||||
|
|
||||||
|
data["ssh"]["rootKeys"].append(ssh_key)
|
||||||
|
return True, "New SSH key successfully written", 201
|
||||||
|
|
||||||
|
for user in data["users"]:
|
||||||
|
if user["username"] == username:
|
||||||
|
if ssh_key in user["sshKeys"]:
|
||||||
|
return False, "Key already exists", 409
|
||||||
|
|
||||||
|
user["sshKeys"].append(ssh_key)
|
||||||
|
return True, "New SSH key successfully written", 201
|
||||||
|
|
||||||
|
return False, "User not found", 404
|
||||||
|
|
||||||
|
|
||||||
|
def remove_ssh_key(username: str, ssh_key: str) -> tuple[bool, str, int]:
|
||||||
|
"""Delete a ssh key"""
|
||||||
|
|
||||||
|
with WriteUserData() as data:
|
||||||
|
ensure_ssh_and_users_fields_exist(data)
|
||||||
|
|
||||||
|
if username == "root":
|
||||||
|
if ssh_key in data["ssh"]["rootKeys"]:
|
||||||
|
data["ssh"]["rootKeys"].remove(ssh_key)
|
||||||
|
return True, "SSH key deleted", 200
|
||||||
|
|
||||||
|
return False, "Key not found", 404
|
||||||
|
|
||||||
|
if username == data["username"]:
|
||||||
|
if ssh_key in data["sshKeys"]:
|
||||||
|
data["sshKeys"].remove(ssh_key)
|
||||||
|
return True, "SSH key deleted", 200
|
||||||
|
|
||||||
|
return False, "Key not found", 404
|
||||||
|
|
||||||
|
for user in data["users"]:
|
||||||
|
if user["username"] == username:
|
||||||
|
if ssh_key in user["sshKeys"]:
|
||||||
|
user["sshKeys"].remove(ssh_key)
|
||||||
|
return True, "SSH key deleted", 200
|
||||||
|
|
||||||
|
return False, "Key not found", 404
|
||||||
|
|
||||||
|
return False, "User not found", 404
|
|
@ -7,7 +7,7 @@ from selfprivacy_api.graphql import IsAuthenticated
|
||||||
from selfprivacy_api.graphql.mutations.mutation_interface import (
|
from selfprivacy_api.graphql.mutations.mutation_interface import (
|
||||||
MutationReturnInterface,
|
MutationReturnInterface,
|
||||||
)
|
)
|
||||||
from selfprivacy_api.utils import ReadUserData, WriteUserData
|
from selfprivacy_api.utils import WriteUserData
|
||||||
|
|
||||||
|
|
||||||
@strawberry.type
|
@strawberry.type
|
||||||
|
|
65
selfprivacy_api/graphql/mutations/users_mutations.py
Normal file
65
selfprivacy_api/graphql/mutations/users_mutations.py
Normal file
|
@ -0,0 +1,65 @@
|
||||||
|
#!/usr/bin/env python3
|
||||||
|
"""Users management module"""
|
||||||
|
# pylint: disable=too-few-public-methods
|
||||||
|
import strawberry
|
||||||
|
from selfprivacy_api.graphql import IsAuthenticated
|
||||||
|
from selfprivacy_api.graphql.common_types.user import (
|
||||||
|
UserMutationReturn,
|
||||||
|
get_user_by_username,
|
||||||
|
)
|
||||||
|
from selfprivacy_api.graphql.mutations.mutation_interface import (
|
||||||
|
GenericMutationReturn,
|
||||||
|
)
|
||||||
|
from selfprivacy_api.graphql.mutations.users_utils import (
|
||||||
|
create_user,
|
||||||
|
delete_user,
|
||||||
|
update_user,
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
@strawberry.input
|
||||||
|
class UserMutationInput:
|
||||||
|
"""Input type for user mutation"""
|
||||||
|
|
||||||
|
username: str
|
||||||
|
password: str
|
||||||
|
|
||||||
|
|
||||||
|
@strawberry.type
|
||||||
|
class UserMutations:
|
||||||
|
"""Mutations change user settings"""
|
||||||
|
|
||||||
|
@strawberry.mutation(permission_classes=[IsAuthenticated])
|
||||||
|
def create_user(self, user: UserMutationInput) -> UserMutationReturn:
|
||||||
|
|
||||||
|
success, message, code = create_user(user.username, user.password)
|
||||||
|
|
||||||
|
return UserMutationReturn(
|
||||||
|
success=success,
|
||||||
|
message=message,
|
||||||
|
code=code,
|
||||||
|
user=get_user_by_username(user.username),
|
||||||
|
)
|
||||||
|
|
||||||
|
@strawberry.mutation(permission_classes=[IsAuthenticated])
|
||||||
|
def delete_user(self, username: str) -> GenericMutationReturn:
|
||||||
|
success, message, code = delete_user(username)
|
||||||
|
|
||||||
|
return GenericMutationReturn(
|
||||||
|
success=success,
|
||||||
|
message=message,
|
||||||
|
code=code,
|
||||||
|
)
|
||||||
|
|
||||||
|
@strawberry.mutation(permission_classes=[IsAuthenticated])
|
||||||
|
def update_user(self, user: UserMutationInput) -> UserMutationReturn:
|
||||||
|
"""Update user mutation"""
|
||||||
|
|
||||||
|
success, message, code = update_user(user.username, user.password)
|
||||||
|
|
||||||
|
return UserMutationReturn(
|
||||||
|
success=success,
|
||||||
|
message=message,
|
||||||
|
code=code,
|
||||||
|
user=get_user_by_username(user.username),
|
||||||
|
)
|
111
selfprivacy_api/graphql/mutations/users_utils.py
Normal file
111
selfprivacy_api/graphql/mutations/users_utils.py
Normal file
|
@ -0,0 +1,111 @@
|
||||||
|
import re
|
||||||
|
from selfprivacy_api.utils import (
|
||||||
|
WriteUserData,
|
||||||
|
ReadUserData,
|
||||||
|
is_username_forbidden,
|
||||||
|
)
|
||||||
|
from selfprivacy_api.utils import hash_password
|
||||||
|
|
||||||
|
|
||||||
|
def ensure_ssh_and_users_fields_exist(data):
|
||||||
|
if "ssh" not in data:
|
||||||
|
data["ssh"] = []
|
||||||
|
data["ssh"]["rootKeys"] = []
|
||||||
|
|
||||||
|
elif data["ssh"].get("rootKeys") is None:
|
||||||
|
data["ssh"]["rootKeys"] = []
|
||||||
|
|
||||||
|
if "sshKeys" not in data:
|
||||||
|
data["sshKeys"] = []
|
||||||
|
|
||||||
|
if "users" not in data:
|
||||||
|
data["users"] = []
|
||||||
|
|
||||||
|
|
||||||
|
def create_user(username: str, password: str) -> tuple[bool, str, int]:
|
||||||
|
"""Create a new user"""
|
||||||
|
|
||||||
|
# Check if password is null or none
|
||||||
|
if password == "":
|
||||||
|
return False, "Password is null", 400
|
||||||
|
|
||||||
|
# Check if username is forbidden
|
||||||
|
if is_username_forbidden(username):
|
||||||
|
return False, "Username is forbidden", 409
|
||||||
|
|
||||||
|
# Check is username passes regex
|
||||||
|
if not re.match(r"^[a-z_][a-z0-9_]+$", username):
|
||||||
|
return False, "Username must be alphanumeric", 400
|
||||||
|
|
||||||
|
# Check if username less than 32 characters
|
||||||
|
if len(username) >= 32:
|
||||||
|
return False, "Username must be less than 32 characters", 400
|
||||||
|
|
||||||
|
with ReadUserData() as data:
|
||||||
|
ensure_ssh_and_users_fields_exist(data)
|
||||||
|
|
||||||
|
# Return 409 if user already exists
|
||||||
|
if data["username"] == username:
|
||||||
|
return False, "User already exists", 409
|
||||||
|
|
||||||
|
for data_user in data["users"]:
|
||||||
|
if data_user["username"] == username:
|
||||||
|
return False, "User already exists", 409
|
||||||
|
|
||||||
|
hashed_password = hash_password(password)
|
||||||
|
|
||||||
|
with WriteUserData() as data:
|
||||||
|
ensure_ssh_and_users_fields_exist(data)
|
||||||
|
|
||||||
|
data["users"].append(
|
||||||
|
{
|
||||||
|
"username": username,
|
||||||
|
"hashedPassword": hashed_password,
|
||||||
|
"sshKeys": [],
|
||||||
|
}
|
||||||
|
)
|
||||||
|
|
||||||
|
return True, "User was successfully created!", 201
|
||||||
|
|
||||||
|
|
||||||
|
def delete_user(username: str) -> tuple[bool, str, int]:
|
||||||
|
with WriteUserData() as data:
|
||||||
|
ensure_ssh_and_users_fields_exist(data)
|
||||||
|
|
||||||
|
if username == data["username"] or username == "root":
|
||||||
|
return False, "Cannot delete main or root user", 400
|
||||||
|
|
||||||
|
# Return 404 if user does not exist
|
||||||
|
for data_user in data["users"]:
|
||||||
|
if data_user["username"] == username:
|
||||||
|
data["users"].remove(data_user)
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
return False, "User does not exist", 404
|
||||||
|
|
||||||
|
return True, "User was deleted", 200
|
||||||
|
|
||||||
|
|
||||||
|
def update_user(username: str, password: str) -> tuple[bool, str, int]:
|
||||||
|
# Check if password is null or none
|
||||||
|
if password == "":
|
||||||
|
return False, "Password is null", 400
|
||||||
|
|
||||||
|
hashed_password = hash_password(password)
|
||||||
|
|
||||||
|
with WriteUserData() as data:
|
||||||
|
ensure_ssh_and_users_fields_exist(data)
|
||||||
|
|
||||||
|
if username == data["username"]:
|
||||||
|
data["hashedMasterPassword"] = hashed_password
|
||||||
|
|
||||||
|
# Return 404 if user does not exist
|
||||||
|
else:
|
||||||
|
for data_user in data["users"]:
|
||||||
|
if data_user["username"] == username:
|
||||||
|
data_user["hashedPassword"] = hashed_password
|
||||||
|
break
|
||||||
|
else:
|
||||||
|
return False, "User does not exist", 404
|
||||||
|
|
||||||
|
return True, "User was successfully updated", 200
|
38
selfprivacy_api/graphql/queries/users.py
Normal file
38
selfprivacy_api/graphql/queries/users.py
Normal file
|
@ -0,0 +1,38 @@
|
||||||
|
"""Users"""
|
||||||
|
# pylint: disable=too-few-public-methods
|
||||||
|
import typing
|
||||||
|
import strawberry
|
||||||
|
|
||||||
|
from selfprivacy_api.graphql.common_types.user import (
|
||||||
|
User,
|
||||||
|
ensure_ssh_and_users_fields_exist,
|
||||||
|
get_user_by_username,
|
||||||
|
)
|
||||||
|
from selfprivacy_api.utils import ReadUserData
|
||||||
|
from selfprivacy_api.graphql import IsAuthenticated
|
||||||
|
|
||||||
|
|
||||||
|
def get_users() -> typing.List[User]:
|
||||||
|
"""Get users"""
|
||||||
|
user_list = []
|
||||||
|
with ReadUserData() as data:
|
||||||
|
ensure_ssh_and_users_fields_exist(data)
|
||||||
|
|
||||||
|
for user in data["users"]:
|
||||||
|
user_list.append(get_user_by_username(user["username"]))
|
||||||
|
|
||||||
|
user_list.append(get_user_by_username(data["username"]))
|
||||||
|
|
||||||
|
return user_list
|
||||||
|
|
||||||
|
|
||||||
|
@strawberry.type
|
||||||
|
class Users:
|
||||||
|
@strawberry.field(permission_classes=[IsAuthenticated])
|
||||||
|
def get_user(self, username: str) -> typing.Optional[User]:
|
||||||
|
"""Get users"""
|
||||||
|
return get_user_by_username(username)
|
||||||
|
|
||||||
|
all_users: typing.List[User] = strawberry.field(
|
||||||
|
permission_classes=[IsAuthenticated], resolver=get_users
|
||||||
|
)
|
|
@ -1,9 +1,10 @@
|
||||||
"""GraphQL API for SelfPrivacy."""
|
"""GraphQL API for SelfPrivacy."""
|
||||||
# pylint: disable=too-few-public-methods
|
# pylint: disable=too-few-public-methods
|
||||||
import typing
|
|
||||||
import strawberry
|
import strawberry
|
||||||
from selfprivacy_api.graphql import IsAuthenticated
|
from selfprivacy_api.graphql import IsAuthenticated
|
||||||
from selfprivacy_api.graphql.mutations.api_mutations import ApiMutations
|
from selfprivacy_api.graphql.mutations.api_mutations import ApiMutations
|
||||||
|
from selfprivacy_api.graphql.mutations.ssh_mutations import SshMutations
|
||||||
from selfprivacy_api.graphql.mutations.storage_mutation import StorageMutations
|
from selfprivacy_api.graphql.mutations.storage_mutation import StorageMutations
|
||||||
from selfprivacy_api.graphql.mutations.system_mutations import SystemMutations
|
from selfprivacy_api.graphql.mutations.system_mutations import SystemMutations
|
||||||
|
|
||||||
|
@ -11,6 +12,9 @@ from selfprivacy_api.graphql.queries.api_queries import Api
|
||||||
from selfprivacy_api.graphql.queries.storage import Storage
|
from selfprivacy_api.graphql.queries.storage import Storage
|
||||||
from selfprivacy_api.graphql.queries.system import System
|
from selfprivacy_api.graphql.queries.system import System
|
||||||
|
|
||||||
|
from selfprivacy_api.graphql.mutations.users_mutations import UserMutations
|
||||||
|
from selfprivacy_api.graphql.queries.users import Users
|
||||||
|
|
||||||
|
|
||||||
@strawberry.type
|
@strawberry.type
|
||||||
class Query:
|
class Query:
|
||||||
|
@ -26,6 +30,11 @@ class Query:
|
||||||
"""API access status"""
|
"""API access status"""
|
||||||
return Api()
|
return Api()
|
||||||
|
|
||||||
|
@strawberry.field(permission_classes=[IsAuthenticated])
|
||||||
|
def users(self) -> Users:
|
||||||
|
"""Users queries"""
|
||||||
|
return Users()
|
||||||
|
|
||||||
@strawberry.field(permission_classes=[IsAuthenticated])
|
@strawberry.field(permission_classes=[IsAuthenticated])
|
||||||
def storage(self) -> Storage:
|
def storage(self) -> Storage:
|
||||||
"""Storage queries"""
|
"""Storage queries"""
|
||||||
|
@ -33,7 +42,13 @@ class Query:
|
||||||
|
|
||||||
|
|
||||||
@strawberry.type
|
@strawberry.type
|
||||||
class Mutation(ApiMutations, SystemMutations, StorageMutations):
|
class Mutation(
|
||||||
|
ApiMutations,
|
||||||
|
SystemMutations,
|
||||||
|
UserMutations,
|
||||||
|
SshMutations,
|
||||||
|
StorageMutations,
|
||||||
|
):
|
||||||
"""Root schema for mutations"""
|
"""Root schema for mutations"""
|
||||||
|
|
||||||
pass
|
pass
|
||||||
|
|
|
@ -159,3 +159,17 @@ def get_dkim_key(domain):
|
||||||
dkim = cat_process.communicate()[0]
|
dkim = cat_process.communicate()[0]
|
||||||
return str(dkim, "utf-8")
|
return str(dkim, "utf-8")
|
||||||
return None
|
return None
|
||||||
|
|
||||||
|
|
||||||
|
def hash_password(password):
|
||||||
|
hashing_command = ["mkpasswd", "-m", "sha-512", password]
|
||||||
|
password_hash_process_descriptor = subprocess.Popen(
|
||||||
|
hashing_command,
|
||||||
|
shell=False,
|
||||||
|
stdout=subprocess.PIPE,
|
||||||
|
stderr=subprocess.STDOUT,
|
||||||
|
)
|
||||||
|
hashed_password = password_hash_process_descriptor.communicate()[0]
|
||||||
|
hashed_password = hashed_password.decode("ascii")
|
||||||
|
hashed_password = hashed_password.rstrip()
|
||||||
|
return hashed_password
|
||||||
|
|
|
@ -20,5 +20,9 @@ def generate_system_query(query_array):
|
||||||
return "query TestSystem {\n system {" + "\n".join(query_array) + "}\n}"
|
return "query TestSystem {\n system {" + "\n".join(query_array) + "}\n}"
|
||||||
|
|
||||||
|
|
||||||
|
def generate_users_query(query_array):
|
||||||
|
return "query TestUsers {\n users {" + "\n".join(query_array) + "}\n}"
|
||||||
|
|
||||||
|
|
||||||
def mnemonic_to_hex(mnemonic):
|
def mnemonic_to_hex(mnemonic):
|
||||||
return Mnemonic(language="english").to_entropy(mnemonic).hex()
|
return Mnemonic(language="english").to_entropy(mnemonic).hex()
|
||||||
|
|
|
@ -1,12 +1,10 @@
|
||||||
# pylint: disable=redefined-outer-name
|
# pylint: disable=redefined-outer-name
|
||||||
# pylint: disable=unused-argument
|
# pylint: disable=unused-argument
|
||||||
# pylint: disable=missing-function-docstring
|
# pylint: disable=missing-function-docstring
|
||||||
import json
|
|
||||||
import os
|
import os
|
||||||
import pytest
|
import pytest
|
||||||
import datetime
|
|
||||||
|
|
||||||
from tests.common import generate_system_query, read_json, write_json
|
from tests.common import generate_system_query, read_json
|
||||||
|
|
||||||
|
|
||||||
@pytest.fixture
|
@pytest.fixture
|
||||||
|
@ -56,7 +54,7 @@ class ProcessMock:
|
||||||
self.args = args
|
self.args = args
|
||||||
self.kwargs = kwargs
|
self.kwargs = kwargs
|
||||||
|
|
||||||
def communicate():
|
def communicate(): # pylint: disable=no-method-argument
|
||||||
return (b"", None)
|
return (b"", None)
|
||||||
|
|
||||||
returncode = 0
|
returncode = 0
|
||||||
|
@ -65,7 +63,7 @@ class ProcessMock:
|
||||||
class BrokenServiceMock(ProcessMock):
|
class BrokenServiceMock(ProcessMock):
|
||||||
"""Mock subprocess.Popen for broken service"""
|
"""Mock subprocess.Popen for broken service"""
|
||||||
|
|
||||||
def communicate():
|
def communicate(): # pylint: disable=no-method-argument
|
||||||
return (b"Testing error", None)
|
return (b"Testing error", None)
|
||||||
|
|
||||||
returncode = 3
|
returncode = 3
|
||||||
|
|
|
@ -1,7 +1,6 @@
|
||||||
# pylint: disable=redefined-outer-name
|
# pylint: disable=redefined-outer-name
|
||||||
# pylint: disable=unused-argument
|
# pylint: disable=unused-argument
|
||||||
# pylint: disable=missing-function-docstring
|
# pylint: disable=missing-function-docstring
|
||||||
import pytest
|
|
||||||
|
|
||||||
from tests.common import generate_api_query
|
from tests.common import generate_api_query
|
||||||
from tests.test_graphql.test_api_devices import API_DEVICES_QUERY
|
from tests.test_graphql.test_api_devices import API_DEVICES_QUERY
|
||||||
|
|
|
@ -1,9 +1,6 @@
|
||||||
# pylint: disable=redefined-outer-name
|
# pylint: disable=redefined-outer-name
|
||||||
# pylint: disable=unused-argument
|
# pylint: disable=unused-argument
|
||||||
# pylint: disable=missing-function-docstring
|
# pylint: disable=missing-function-docstring
|
||||||
import json
|
|
||||||
from time import strftime
|
|
||||||
import pytest
|
|
||||||
import datetime
|
import datetime
|
||||||
|
|
||||||
from tests.common import generate_api_query, mnemonic_to_hex, read_json, write_json
|
from tests.common import generate_api_query, mnemonic_to_hex, read_json, write_json
|
||||||
|
|
353
tests/test_graphql/test_ssh.py
Normal file
353
tests/test_graphql/test_ssh.py
Normal file
|
@ -0,0 +1,353 @@
|
||||||
|
# pylint: disable=redefined-outer-name
|
||||||
|
# pylint: disable=unused-argument
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from tests.common import read_json
|
||||||
|
|
||||||
|
|
||||||
|
class ProcessMock:
|
||||||
|
"""Mock subprocess.Popen"""
|
||||||
|
|
||||||
|
def __init__(self, args, **kwargs):
|
||||||
|
self.args = args
|
||||||
|
self.kwargs = kwargs
|
||||||
|
|
||||||
|
def communicate(): # pylint: disable=no-method-argument
|
||||||
|
return (b"NEW_HASHED", None)
|
||||||
|
|
||||||
|
returncode = 0
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def mock_subprocess_popen(mocker):
|
||||||
|
mock = mocker.patch("subprocess.Popen", autospec=True, return_value=ProcessMock)
|
||||||
|
return mock
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def some_users(mocker, datadir):
|
||||||
|
mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "some_users.json")
|
||||||
|
assert read_json(datadir / "some_users.json")["users"] == [
|
||||||
|
{
|
||||||
|
"username": "user1",
|
||||||
|
"hashedPassword": "HASHED_PASSWORD_1",
|
||||||
|
"sshKeys": ["ssh-rsa KEY user1@pc"],
|
||||||
|
},
|
||||||
|
{"username": "user2", "hashedPassword": "HASHED_PASSWORD_2", "sshKeys": []},
|
||||||
|
{"username": "user3", "hashedPassword": "HASHED_PASSWORD_3"},
|
||||||
|
]
|
||||||
|
return datadir
|
||||||
|
|
||||||
|
|
||||||
|
# TESTS ########################################################
|
||||||
|
|
||||||
|
|
||||||
|
API_CREATE_SSH_KEY_MUTATION = """
|
||||||
|
mutation addSshKey($sshInput: SshMutationInput!) {
|
||||||
|
addSshKey(sshInput: $sshInput) {
|
||||||
|
success
|
||||||
|
message
|
||||||
|
code
|
||||||
|
user {
|
||||||
|
username
|
||||||
|
sshKeys
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
def test_graphql_add_ssh_key_unauthorized(client, some_users, mock_subprocess_popen):
|
||||||
|
response = client.post(
|
||||||
|
"/graphql",
|
||||||
|
json={
|
||||||
|
"query": API_CREATE_SSH_KEY_MUTATION,
|
||||||
|
"variables": {
|
||||||
|
"sshInput": {
|
||||||
|
"username": "user1",
|
||||||
|
"sshKey": "ssh-rsa KEY test_key@pc",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert response.json.get("data") is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_graphql_add_ssh_key(authorized_client, some_users, mock_subprocess_popen):
|
||||||
|
response = authorized_client.post(
|
||||||
|
"/graphql",
|
||||||
|
json={
|
||||||
|
"query": API_CREATE_SSH_KEY_MUTATION,
|
||||||
|
"variables": {
|
||||||
|
"sshInput": {
|
||||||
|
"username": "user1",
|
||||||
|
"sshKey": "ssh-rsa KEY test_key@pc",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert response.json.get("data") is not None
|
||||||
|
|
||||||
|
assert response.json["data"]["addSshKey"]["code"] == 201
|
||||||
|
assert response.json["data"]["addSshKey"]["message"] is not None
|
||||||
|
assert response.json["data"]["addSshKey"]["success"] is True
|
||||||
|
|
||||||
|
assert response.json["data"]["addSshKey"]["user"]["username"] == "user1"
|
||||||
|
assert response.json["data"]["addSshKey"]["user"]["sshKeys"] == [
|
||||||
|
"ssh-rsa KEY user1@pc",
|
||||||
|
"ssh-rsa KEY test_key@pc",
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def test_graphql_add_root_ssh_key(authorized_client, some_users, mock_subprocess_popen):
|
||||||
|
response = authorized_client.post(
|
||||||
|
"/graphql",
|
||||||
|
json={
|
||||||
|
"query": API_CREATE_SSH_KEY_MUTATION,
|
||||||
|
"variables": {
|
||||||
|
"sshInput": {
|
||||||
|
"username": "root",
|
||||||
|
"sshKey": "ssh-rsa KEY test_key@pc",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert response.json.get("data") is not None
|
||||||
|
|
||||||
|
assert response.json["data"]["addSshKey"]["code"] == 201
|
||||||
|
assert response.json["data"]["addSshKey"]["message"] is not None
|
||||||
|
assert response.json["data"]["addSshKey"]["success"] is True
|
||||||
|
|
||||||
|
assert response.json["data"]["addSshKey"]["user"]["username"] == "root"
|
||||||
|
assert response.json["data"]["addSshKey"]["user"]["sshKeys"] == [
|
||||||
|
"ssh-ed25519 KEY test@pc",
|
||||||
|
"ssh-rsa KEY test_key@pc",
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def test_graphql_add_main_ssh_key(authorized_client, some_users, mock_subprocess_popen):
|
||||||
|
response = authorized_client.post(
|
||||||
|
"/graphql",
|
||||||
|
json={
|
||||||
|
"query": API_CREATE_SSH_KEY_MUTATION,
|
||||||
|
"variables": {
|
||||||
|
"sshInput": {
|
||||||
|
"username": "tester",
|
||||||
|
"sshKey": "ssh-rsa KEY test_key@pc",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert response.json.get("data") is not None
|
||||||
|
|
||||||
|
assert response.json["data"]["addSshKey"]["code"] == 201
|
||||||
|
assert response.json["data"]["addSshKey"]["message"] is not None
|
||||||
|
assert response.json["data"]["addSshKey"]["success"] is True
|
||||||
|
|
||||||
|
assert response.json["data"]["addSshKey"]["user"]["username"] == "tester"
|
||||||
|
assert response.json["data"]["addSshKey"]["user"]["sshKeys"] == [
|
||||||
|
"ssh-rsa KEY test@pc",
|
||||||
|
"ssh-rsa KEY test_key@pc",
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def test_graphql_add_bad_ssh_key(authorized_client, some_users, mock_subprocess_popen):
|
||||||
|
response = authorized_client.post(
|
||||||
|
"/graphql",
|
||||||
|
json={
|
||||||
|
"query": API_CREATE_SSH_KEY_MUTATION,
|
||||||
|
"variables": {
|
||||||
|
"sshInput": {
|
||||||
|
"username": "user1",
|
||||||
|
"sshKey": "trust me, this is the ssh key",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert response.json.get("data") is not None
|
||||||
|
|
||||||
|
assert response.json["data"]["addSshKey"]["code"] == 400
|
||||||
|
assert response.json["data"]["addSshKey"]["message"] is not None
|
||||||
|
assert response.json["data"]["addSshKey"]["success"] is False
|
||||||
|
|
||||||
|
|
||||||
|
def test_graphql_add_ssh_key_nonexistent_user(
|
||||||
|
authorized_client, some_users, mock_subprocess_popen
|
||||||
|
):
|
||||||
|
response = authorized_client.post(
|
||||||
|
"/graphql",
|
||||||
|
json={
|
||||||
|
"query": API_CREATE_SSH_KEY_MUTATION,
|
||||||
|
"variables": {
|
||||||
|
"sshInput": {
|
||||||
|
"username": "user666",
|
||||||
|
"sshKey": "ssh-rsa KEY test_key@pc",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert response.json.get("data") is not None
|
||||||
|
|
||||||
|
assert response.json["data"]["addSshKey"]["code"] == 404
|
||||||
|
assert response.json["data"]["addSshKey"]["message"] is not None
|
||||||
|
assert response.json["data"]["addSshKey"]["success"] is False
|
||||||
|
|
||||||
|
|
||||||
|
API_REMOVE_SSH_KEY_MUTATION = """
|
||||||
|
mutation removeSshKey($sshInput: SshMutationInput!) {
|
||||||
|
removeSshKey(sshInput: $sshInput) {
|
||||||
|
success
|
||||||
|
message
|
||||||
|
code
|
||||||
|
user {
|
||||||
|
username
|
||||||
|
sshKeys
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
def test_graphql_remove_ssh_key_unauthorized(client, some_users, mock_subprocess_popen):
|
||||||
|
response = client.post(
|
||||||
|
"/graphql",
|
||||||
|
json={
|
||||||
|
"query": API_REMOVE_SSH_KEY_MUTATION,
|
||||||
|
"variables": {
|
||||||
|
"sshInput": {
|
||||||
|
"username": "user1",
|
||||||
|
"sshKey": "ssh-rsa KEY test_key@pc",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert response.json.get("data") is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_graphql_remove_ssh_key(authorized_client, some_users, mock_subprocess_popen):
|
||||||
|
response = authorized_client.post(
|
||||||
|
"/graphql",
|
||||||
|
json={
|
||||||
|
"query": API_REMOVE_SSH_KEY_MUTATION,
|
||||||
|
"variables": {
|
||||||
|
"sshInput": {
|
||||||
|
"username": "user1",
|
||||||
|
"sshKey": "ssh-rsa KEY user1@pc",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert response.json.get("data") is not None
|
||||||
|
|
||||||
|
assert response.json["data"]["removeSshKey"]["code"] == 200
|
||||||
|
assert response.json["data"]["removeSshKey"]["message"] is not None
|
||||||
|
assert response.json["data"]["removeSshKey"]["success"] is True
|
||||||
|
|
||||||
|
assert response.json["data"]["removeSshKey"]["user"]["username"] == "user1"
|
||||||
|
assert response.json["data"]["removeSshKey"]["user"]["sshKeys"] == []
|
||||||
|
|
||||||
|
|
||||||
|
def test_graphql_remove_root_ssh_key(
|
||||||
|
authorized_client, some_users, mock_subprocess_popen
|
||||||
|
):
|
||||||
|
response = authorized_client.post(
|
||||||
|
"/graphql",
|
||||||
|
json={
|
||||||
|
"query": API_REMOVE_SSH_KEY_MUTATION,
|
||||||
|
"variables": {
|
||||||
|
"sshInput": {
|
||||||
|
"username": "root",
|
||||||
|
"sshKey": "ssh-ed25519 KEY test@pc",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert response.json.get("data") is not None
|
||||||
|
|
||||||
|
assert response.json["data"]["removeSshKey"]["code"] == 200
|
||||||
|
assert response.json["data"]["removeSshKey"]["message"] is not None
|
||||||
|
assert response.json["data"]["removeSshKey"]["success"] is True
|
||||||
|
|
||||||
|
assert response.json["data"]["removeSshKey"]["user"]["username"] == "root"
|
||||||
|
assert response.json["data"]["removeSshKey"]["user"]["sshKeys"] == []
|
||||||
|
|
||||||
|
|
||||||
|
def test_graphql_remove_main_ssh_key(
|
||||||
|
authorized_client, some_users, mock_subprocess_popen
|
||||||
|
):
|
||||||
|
response = authorized_client.post(
|
||||||
|
"/graphql",
|
||||||
|
json={
|
||||||
|
"query": API_REMOVE_SSH_KEY_MUTATION,
|
||||||
|
"variables": {
|
||||||
|
"sshInput": {
|
||||||
|
"username": "tester",
|
||||||
|
"sshKey": "ssh-rsa KEY test@pc",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert response.json.get("data") is not None
|
||||||
|
|
||||||
|
assert response.json["data"]["removeSshKey"]["code"] == 200
|
||||||
|
assert response.json["data"]["removeSshKey"]["message"] is not None
|
||||||
|
assert response.json["data"]["removeSshKey"]["success"] is True
|
||||||
|
|
||||||
|
assert response.json["data"]["removeSshKey"]["user"]["username"] == "tester"
|
||||||
|
assert response.json["data"]["removeSshKey"]["user"]["sshKeys"] == []
|
||||||
|
|
||||||
|
|
||||||
|
def test_graphql_remove_nonexistent_ssh_key(
|
||||||
|
authorized_client, some_users, mock_subprocess_popen
|
||||||
|
):
|
||||||
|
response = authorized_client.post(
|
||||||
|
"/graphql",
|
||||||
|
json={
|
||||||
|
"query": API_REMOVE_SSH_KEY_MUTATION,
|
||||||
|
"variables": {
|
||||||
|
"sshInput": {
|
||||||
|
"username": "user1",
|
||||||
|
"sshKey": "ssh-rsa KEY test_key@pc",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert response.json.get("data") is not None
|
||||||
|
|
||||||
|
assert response.json["data"]["removeSshKey"]["code"] == 404
|
||||||
|
assert response.json["data"]["removeSshKey"]["message"] is not None
|
||||||
|
assert response.json["data"]["removeSshKey"]["success"] is False
|
||||||
|
|
||||||
|
|
||||||
|
def test_graphql_remove_ssh_key_nonexistent_user(
|
||||||
|
authorized_client, some_users, mock_subprocess_popen
|
||||||
|
):
|
||||||
|
response = authorized_client.post(
|
||||||
|
"/graphql",
|
||||||
|
json={
|
||||||
|
"query": API_REMOVE_SSH_KEY_MUTATION,
|
||||||
|
"variables": {
|
||||||
|
"sshInput": {
|
||||||
|
"username": "user666",
|
||||||
|
"sshKey": "ssh-rsa KEY test_key@pc",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert response.json.get("data") is not None
|
||||||
|
|
||||||
|
assert response.json["data"]["removeSshKey"]["code"] == 404
|
||||||
|
assert response.json["data"]["removeSshKey"]["message"] is not None
|
||||||
|
assert response.json["data"]["removeSshKey"]["success"] is False
|
71
tests/test_graphql/test_ssh/some_users.json
Normal file
71
tests/test_graphql/test_ssh/some_users.json
Normal file
|
@ -0,0 +1,71 @@
|
||||||
|
{
|
||||||
|
"backblaze": {
|
||||||
|
"accountId": "ID",
|
||||||
|
"accountKey": "KEY",
|
||||||
|
"bucket": "selfprivacy"
|
||||||
|
},
|
||||||
|
"api": {
|
||||||
|
"token": "TEST_TOKEN",
|
||||||
|
"enableSwagger": false
|
||||||
|
},
|
||||||
|
"bitwarden": {
|
||||||
|
"enable": false
|
||||||
|
},
|
||||||
|
"cloudflare": {
|
||||||
|
"apiKey": "TOKEN"
|
||||||
|
},
|
||||||
|
"databasePassword": "PASSWORD",
|
||||||
|
"domain": "test.tld",
|
||||||
|
"hashedMasterPassword": "HASHED_PASSWORD",
|
||||||
|
"hostname": "test-instance",
|
||||||
|
"nextcloud": {
|
||||||
|
"adminPassword": "ADMIN",
|
||||||
|
"databasePassword": "ADMIN",
|
||||||
|
"enable": true
|
||||||
|
},
|
||||||
|
"resticPassword": "PASS",
|
||||||
|
"ssh": {
|
||||||
|
"enable": true,
|
||||||
|
"passwordAuthentication": true,
|
||||||
|
"rootKeys": [
|
||||||
|
"ssh-ed25519 KEY test@pc"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"username": "tester",
|
||||||
|
"gitea": {
|
||||||
|
"enable": false
|
||||||
|
},
|
||||||
|
"ocserv": {
|
||||||
|
"enable": true
|
||||||
|
},
|
||||||
|
"pleroma": {
|
||||||
|
"enable": true
|
||||||
|
},
|
||||||
|
"autoUpgrade": {
|
||||||
|
"enable": true,
|
||||||
|
"allowReboot": true
|
||||||
|
},
|
||||||
|
"timezone": "Europe/Moscow",
|
||||||
|
"sshKeys": [
|
||||||
|
"ssh-rsa KEY test@pc"
|
||||||
|
],
|
||||||
|
"users": [
|
||||||
|
{
|
||||||
|
"username": "user1",
|
||||||
|
"hashedPassword": "HASHED_PASSWORD_1",
|
||||||
|
"sshKeys": [
|
||||||
|
"ssh-rsa KEY user1@pc"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"username": "user2",
|
||||||
|
"hashedPassword": "HASHED_PASSWORD_2",
|
||||||
|
"sshKeys": [
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"username": "user3",
|
||||||
|
"hashedPassword": "HASHED_PASSWORD_3"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
733
tests/test_graphql/test_users.py
Normal file
733
tests/test_graphql/test_users.py
Normal file
|
@ -0,0 +1,733 @@
|
||||||
|
# pylint: disable=redefined-outer-name
|
||||||
|
# pylint: disable=unused-argument
|
||||||
|
import pytest
|
||||||
|
|
||||||
|
from tests.common import (
|
||||||
|
generate_users_query,
|
||||||
|
read_json,
|
||||||
|
)
|
||||||
|
|
||||||
|
invalid_usernames = [
|
||||||
|
"messagebus",
|
||||||
|
"postfix",
|
||||||
|
"polkituser",
|
||||||
|
"dovecot2",
|
||||||
|
"dovenull",
|
||||||
|
"nginx",
|
||||||
|
"postgres",
|
||||||
|
"systemd-journal-gateway",
|
||||||
|
"prosody",
|
||||||
|
"systemd-network",
|
||||||
|
"systemd-resolve",
|
||||||
|
"systemd-timesync",
|
||||||
|
"opendkim",
|
||||||
|
"rspamd",
|
||||||
|
"sshd",
|
||||||
|
"selfprivacy-api",
|
||||||
|
"restic",
|
||||||
|
"redis",
|
||||||
|
"pleroma",
|
||||||
|
"ocserv",
|
||||||
|
"nextcloud",
|
||||||
|
"memcached",
|
||||||
|
"knot-resolver",
|
||||||
|
"gitea",
|
||||||
|
"bitwarden_rs",
|
||||||
|
"vaultwarden",
|
||||||
|
"acme",
|
||||||
|
"virtualMail",
|
||||||
|
"nixbld1",
|
||||||
|
"nixbld2",
|
||||||
|
"nixbld29",
|
||||||
|
"nobody",
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
## FIXTURES ###################################################
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def no_users(mocker, datadir):
|
||||||
|
mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "no_users.json")
|
||||||
|
assert read_json(datadir / "no_users.json")["users"] == []
|
||||||
|
return datadir
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def one_user(mocker, datadir):
|
||||||
|
mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "one_user.json")
|
||||||
|
assert read_json(datadir / "one_user.json")["users"] == [
|
||||||
|
{
|
||||||
|
"username": "user1",
|
||||||
|
"hashedPassword": "HASHED_PASSWORD_1",
|
||||||
|
"sshKeys": ["ssh-rsa KEY user1@pc"],
|
||||||
|
}
|
||||||
|
]
|
||||||
|
return datadir
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def some_users(mocker, datadir):
|
||||||
|
mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "some_users.json")
|
||||||
|
assert read_json(datadir / "some_users.json")["users"] == [
|
||||||
|
{
|
||||||
|
"username": "user1",
|
||||||
|
"hashedPassword": "HASHED_PASSWORD_1",
|
||||||
|
"sshKeys": ["ssh-rsa KEY user1@pc"],
|
||||||
|
},
|
||||||
|
{"username": "user2", "hashedPassword": "HASHED_PASSWORD_2", "sshKeys": []},
|
||||||
|
{"username": "user3", "hashedPassword": "HASHED_PASSWORD_3"},
|
||||||
|
]
|
||||||
|
return datadir
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def undefined_settings(mocker, datadir):
|
||||||
|
mocker.patch("selfprivacy_api.utils.USERDATA_FILE", new=datadir / "undefined.json")
|
||||||
|
assert "users" not in read_json(datadir / "undefined.json")
|
||||||
|
return datadir
|
||||||
|
|
||||||
|
|
||||||
|
class ProcessMock:
|
||||||
|
"""Mock subprocess.Popen"""
|
||||||
|
|
||||||
|
def __init__(self, args, **kwargs):
|
||||||
|
self.args = args
|
||||||
|
self.kwargs = kwargs
|
||||||
|
|
||||||
|
def communicate(): # pylint: disable=no-method-argument
|
||||||
|
return (b"NEW_HASHED", None)
|
||||||
|
|
||||||
|
returncode = 0
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.fixture
|
||||||
|
def mock_subprocess_popen(mocker):
|
||||||
|
mock = mocker.patch("subprocess.Popen", autospec=True, return_value=ProcessMock)
|
||||||
|
return mock
|
||||||
|
|
||||||
|
|
||||||
|
## TESTS ######################################################
|
||||||
|
|
||||||
|
API_USERS_INFO = """
|
||||||
|
allUsers {
|
||||||
|
username
|
||||||
|
sshKeys
|
||||||
|
}
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
def test_graphql_get_users_unauthorized(client, some_users, mock_subprocess_popen):
|
||||||
|
"""Test wrong auth"""
|
||||||
|
response = client.get(
|
||||||
|
"/graphql",
|
||||||
|
json={
|
||||||
|
"query": generate_users_query([API_USERS_INFO]),
|
||||||
|
},
|
||||||
|
)
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert response.json.get("data") is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_graphql_get_some_users(authorized_client, some_users, mock_subprocess_popen):
|
||||||
|
response = authorized_client.get(
|
||||||
|
"/graphql",
|
||||||
|
json={
|
||||||
|
"query": generate_users_query([API_USERS_INFO]),
|
||||||
|
},
|
||||||
|
)
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert response.json.get("data") is not None
|
||||||
|
assert len(response.json["data"]["users"]["allUsers"]) == 4
|
||||||
|
assert response.json["data"]["users"]["allUsers"][0]["username"] == "user1"
|
||||||
|
assert response.json["data"]["users"]["allUsers"][0]["sshKeys"] == [
|
||||||
|
"ssh-rsa KEY user1@pc"
|
||||||
|
]
|
||||||
|
|
||||||
|
assert response.json["data"]["users"]["allUsers"][1]["username"] == "user2"
|
||||||
|
assert response.json["data"]["users"]["allUsers"][1]["sshKeys"] == []
|
||||||
|
|
||||||
|
assert response.json["data"]["users"]["allUsers"][3]["username"] == "tester"
|
||||||
|
assert response.json["data"]["users"]["allUsers"][3]["sshKeys"] == [
|
||||||
|
"ssh-rsa KEY test@pc"
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def test_graphql_get_no_users(authorized_client, no_users, mock_subprocess_popen):
|
||||||
|
response = authorized_client.get(
|
||||||
|
"/graphql",
|
||||||
|
json={
|
||||||
|
"query": generate_users_query([API_USERS_INFO]),
|
||||||
|
},
|
||||||
|
)
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert response.json.get("data") is not None
|
||||||
|
|
||||||
|
assert len(response.json["data"]["users"]["allUsers"]) == 1
|
||||||
|
assert response.json["data"]["users"]["allUsers"][0]["username"] == "tester"
|
||||||
|
assert response.json["data"]["users"]["allUsers"][0]["sshKeys"] == [
|
||||||
|
"ssh-rsa KEY test@pc"
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
API_GET_USERS = """
|
||||||
|
query TestUsers($username: String!) {
|
||||||
|
users {
|
||||||
|
getUser(username: $username) {
|
||||||
|
sshKeys
|
||||||
|
username
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
def test_graphql_get_one_user_unauthorized(client, one_user, mock_subprocess_popen):
|
||||||
|
response = client.get(
|
||||||
|
"/graphql",
|
||||||
|
json={
|
||||||
|
"query": API_GET_USERS,
|
||||||
|
"variables": {
|
||||||
|
"username": "user1",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert response.json.get("data") is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_graphql_get_one_user(authorized_client, one_user, mock_subprocess_popen):
|
||||||
|
|
||||||
|
response = authorized_client.get(
|
||||||
|
"/graphql",
|
||||||
|
json={
|
||||||
|
"query": API_GET_USERS,
|
||||||
|
"variables": {
|
||||||
|
"username": "user1",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert response.json.get("data") is not None
|
||||||
|
|
||||||
|
assert len(response.json["data"]["users"]["getUser"]) == 2
|
||||||
|
assert response.json["data"]["users"]["getUser"]["username"] == "user1"
|
||||||
|
assert response.json["data"]["users"]["getUser"]["sshKeys"] == [
|
||||||
|
"ssh-rsa KEY user1@pc"
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def test_graphql_get_some_user(authorized_client, some_users, mock_subprocess_popen):
|
||||||
|
response = authorized_client.get(
|
||||||
|
"/graphql",
|
||||||
|
json={
|
||||||
|
"query": API_GET_USERS,
|
||||||
|
"variables": {
|
||||||
|
"username": "user2",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert response.json.get("data") is not None
|
||||||
|
|
||||||
|
assert len(response.json["data"]["users"]["getUser"]) == 2
|
||||||
|
assert response.json["data"]["users"]["getUser"]["username"] == "user2"
|
||||||
|
assert response.json["data"]["users"]["getUser"]["sshKeys"] == []
|
||||||
|
|
||||||
|
|
||||||
|
def test_graphql_get_root_user(authorized_client, some_users, mock_subprocess_popen):
|
||||||
|
response = authorized_client.get(
|
||||||
|
"/graphql",
|
||||||
|
json={
|
||||||
|
"query": API_GET_USERS,
|
||||||
|
"variables": {
|
||||||
|
"username": "root",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert response.json.get("data") is not None
|
||||||
|
|
||||||
|
assert len(response.json["data"]["users"]["getUser"]) == 2
|
||||||
|
assert response.json["data"]["users"]["getUser"]["username"] == "root"
|
||||||
|
assert response.json["data"]["users"]["getUser"]["sshKeys"] == [
|
||||||
|
"ssh-ed25519 KEY test@pc"
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def test_graphql_get_main_user(authorized_client, one_user, mock_subprocess_popen):
|
||||||
|
response = authorized_client.get(
|
||||||
|
"/graphql",
|
||||||
|
json={
|
||||||
|
"query": API_GET_USERS,
|
||||||
|
"variables": {
|
||||||
|
"username": "tester",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert response.json.get("data") is not None
|
||||||
|
|
||||||
|
assert len(response.json["data"]["users"]["getUser"]) == 2
|
||||||
|
assert response.json["data"]["users"]["getUser"]["username"] == "tester"
|
||||||
|
assert response.json["data"]["users"]["getUser"]["sshKeys"] == [
|
||||||
|
"ssh-rsa KEY test@pc"
|
||||||
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def test_graphql_get_nonexistent_user(
|
||||||
|
authorized_client, one_user, mock_subprocess_popen
|
||||||
|
):
|
||||||
|
response = authorized_client.get(
|
||||||
|
"/graphql",
|
||||||
|
json={
|
||||||
|
"query": API_GET_USERS,
|
||||||
|
"variables": {
|
||||||
|
"username": "tyler_durden",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert response.json.get("data") is not None
|
||||||
|
|
||||||
|
assert response.json["data"]["users"]["getUser"] is None
|
||||||
|
|
||||||
|
|
||||||
|
API_CREATE_USERS_MUTATION = """
|
||||||
|
mutation createUser($user: UserMutationInput!) {
|
||||||
|
createUser(user: $user) {
|
||||||
|
success
|
||||||
|
message
|
||||||
|
code
|
||||||
|
user {
|
||||||
|
username
|
||||||
|
sshKeys
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
def test_graphql_add_user_unauthorize(client, one_user, mock_subprocess_popen):
|
||||||
|
response = client.post(
|
||||||
|
"/graphql",
|
||||||
|
json={
|
||||||
|
"query": API_CREATE_USERS_MUTATION,
|
||||||
|
"variables": {
|
||||||
|
"user": {
|
||||||
|
"username": "user2",
|
||||||
|
"password": "12345678",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert response.json.get("data") is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_graphql_add_user(authorized_client, one_user, mock_subprocess_popen):
|
||||||
|
response = authorized_client.post(
|
||||||
|
"/graphql",
|
||||||
|
json={
|
||||||
|
"query": API_CREATE_USERS_MUTATION,
|
||||||
|
"variables": {
|
||||||
|
"user": {
|
||||||
|
"username": "user2",
|
||||||
|
"password": "12345678",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert response.json.get("data") is not None
|
||||||
|
|
||||||
|
assert response.json["data"]["createUser"]["message"] is not None
|
||||||
|
assert response.json["data"]["createUser"]["code"] == 201
|
||||||
|
assert response.json["data"]["createUser"]["success"] is True
|
||||||
|
|
||||||
|
assert response.json["data"]["createUser"]["user"]["username"] == "user2"
|
||||||
|
assert response.json["data"]["createUser"]["user"]["sshKeys"] == []
|
||||||
|
|
||||||
|
|
||||||
|
def test_graphql_add_undefined_settings(
|
||||||
|
authorized_client, undefined_settings, mock_subprocess_popen
|
||||||
|
):
|
||||||
|
response = authorized_client.post(
|
||||||
|
"/graphql",
|
||||||
|
json={
|
||||||
|
"query": API_CREATE_USERS_MUTATION,
|
||||||
|
"variables": {
|
||||||
|
"user": {
|
||||||
|
"username": "user2",
|
||||||
|
"password": "12345678",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert response.json.get("data") is not None
|
||||||
|
|
||||||
|
assert response.json["data"]["createUser"]["message"] is not None
|
||||||
|
assert response.json["data"]["createUser"]["code"] == 201
|
||||||
|
assert response.json["data"]["createUser"]["success"] is True
|
||||||
|
|
||||||
|
assert response.json["data"]["createUser"]["user"]["username"] == "user2"
|
||||||
|
assert response.json["data"]["createUser"]["user"]["sshKeys"] == []
|
||||||
|
|
||||||
|
|
||||||
|
def test_graphql_add_without_password(
|
||||||
|
authorized_client, one_user, mock_subprocess_popen
|
||||||
|
):
|
||||||
|
response = authorized_client.post(
|
||||||
|
"/graphql",
|
||||||
|
json={
|
||||||
|
"query": API_CREATE_USERS_MUTATION,
|
||||||
|
"variables": {
|
||||||
|
"user": {
|
||||||
|
"username": "user2",
|
||||||
|
"password": "",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert response.json.get("data") is not None
|
||||||
|
|
||||||
|
assert response.json["data"]["createUser"]["message"] is not None
|
||||||
|
assert response.json["data"]["createUser"]["code"] == 400
|
||||||
|
assert response.json["data"]["createUser"]["success"] is False
|
||||||
|
|
||||||
|
assert response.json["data"]["createUser"]["user"] is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_graphql_add_without_both(authorized_client, one_user, mock_subprocess_popen):
|
||||||
|
response = authorized_client.post(
|
||||||
|
"/graphql",
|
||||||
|
json={
|
||||||
|
"query": API_CREATE_USERS_MUTATION,
|
||||||
|
"variables": {
|
||||||
|
"user": {
|
||||||
|
"username": "",
|
||||||
|
"password": "",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert response.json.get("data") is not None
|
||||||
|
|
||||||
|
assert response.json["data"]["createUser"]["message"] is not None
|
||||||
|
assert response.json["data"]["createUser"]["code"] == 400
|
||||||
|
assert response.json["data"]["createUser"]["success"] is False
|
||||||
|
|
||||||
|
assert response.json["data"]["createUser"]["user"] is None
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize("username", invalid_usernames)
|
||||||
|
def test_graphql_add_system_username(
|
||||||
|
authorized_client, one_user, mock_subprocess_popen, username
|
||||||
|
):
|
||||||
|
response = authorized_client.post(
|
||||||
|
"/graphql",
|
||||||
|
json={
|
||||||
|
"query": API_CREATE_USERS_MUTATION,
|
||||||
|
"variables": {
|
||||||
|
"user": {
|
||||||
|
"username": username,
|
||||||
|
"password": "12345678",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert response.json.get("data") is not None
|
||||||
|
|
||||||
|
assert response.json["data"]["createUser"]["message"] is not None
|
||||||
|
assert response.json["data"]["createUser"]["code"] == 409
|
||||||
|
assert response.json["data"]["createUser"]["success"] is False
|
||||||
|
|
||||||
|
assert response.json["data"]["createUser"]["user"] is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_graphql_add_existing_user(authorized_client, one_user, mock_subprocess_popen):
|
||||||
|
response = authorized_client.post(
|
||||||
|
"/graphql",
|
||||||
|
json={
|
||||||
|
"query": API_CREATE_USERS_MUTATION,
|
||||||
|
"variables": {
|
||||||
|
"user": {
|
||||||
|
"username": "user1",
|
||||||
|
"password": "12345678",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert response.json.get("data") is not None
|
||||||
|
|
||||||
|
assert response.json["data"]["createUser"]["message"] is not None
|
||||||
|
assert response.json["data"]["createUser"]["code"] == 409
|
||||||
|
assert response.json["data"]["createUser"]["success"] is False
|
||||||
|
|
||||||
|
assert response.json["data"]["createUser"]["user"]["username"] == "user1"
|
||||||
|
assert (
|
||||||
|
response.json["data"]["createUser"]["user"]["sshKeys"][0]
|
||||||
|
== "ssh-rsa KEY user1@pc"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_graphql_add_main_user(authorized_client, one_user, mock_subprocess_popen):
|
||||||
|
response = authorized_client.post(
|
||||||
|
"/graphql",
|
||||||
|
json={
|
||||||
|
"query": API_CREATE_USERS_MUTATION,
|
||||||
|
"variables": {
|
||||||
|
"user": {
|
||||||
|
"username": "tester",
|
||||||
|
"password": "12345678",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert response.json.get("data") is not None
|
||||||
|
|
||||||
|
assert response.json["data"]["createUser"]["message"] is not None
|
||||||
|
assert response.json["data"]["createUser"]["code"] == 409
|
||||||
|
assert response.json["data"]["createUser"]["success"] is False
|
||||||
|
|
||||||
|
assert response.json["data"]["createUser"]["user"]["username"] == "tester"
|
||||||
|
assert (
|
||||||
|
response.json["data"]["createUser"]["user"]["sshKeys"][0]
|
||||||
|
== "ssh-rsa KEY test@pc"
|
||||||
|
)
|
||||||
|
|
||||||
|
|
||||||
|
def test_graphql_add_long_username(authorized_client, one_user, mock_subprocess_popen):
|
||||||
|
response = authorized_client.post(
|
||||||
|
"/graphql",
|
||||||
|
json={
|
||||||
|
"query": API_CREATE_USERS_MUTATION,
|
||||||
|
"variables": {
|
||||||
|
"user": {
|
||||||
|
"username": "a" * 32,
|
||||||
|
"password": "12345678",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert response.json.get("data") is not None
|
||||||
|
|
||||||
|
assert response.json["data"]["createUser"]["message"] is not None
|
||||||
|
assert response.json["data"]["createUser"]["code"] == 400
|
||||||
|
assert response.json["data"]["createUser"]["success"] is False
|
||||||
|
|
||||||
|
assert response.json["data"]["createUser"]["user"] is None
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize("username", ["", "1", "фыр", "user1@", "^-^"])
|
||||||
|
def test_graphql_add_invalid_username(
|
||||||
|
authorized_client, one_user, mock_subprocess_popen, username
|
||||||
|
):
|
||||||
|
response = authorized_client.post(
|
||||||
|
"/graphql",
|
||||||
|
json={
|
||||||
|
"query": API_CREATE_USERS_MUTATION,
|
||||||
|
"variables": {
|
||||||
|
"user": {
|
||||||
|
"username": username,
|
||||||
|
"password": "12345678",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert response.json.get("data") is not None
|
||||||
|
|
||||||
|
assert response.json["data"]["createUser"]["message"] is not None
|
||||||
|
assert response.json["data"]["createUser"]["code"] == 400
|
||||||
|
assert response.json["data"]["createUser"]["success"] is False
|
||||||
|
|
||||||
|
assert response.json["data"]["createUser"]["user"] is None
|
||||||
|
|
||||||
|
|
||||||
|
API_DELETE_USER_MUTATION = """
|
||||||
|
mutation deleteUser($username: String!) {
|
||||||
|
deleteUser(username: $username) {
|
||||||
|
success
|
||||||
|
message
|
||||||
|
code
|
||||||
|
}
|
||||||
|
}
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
def test_graphql_delete_user_unauthorized(client, some_users, mock_subprocess_popen):
|
||||||
|
response = client.post(
|
||||||
|
"/graphql",
|
||||||
|
json={
|
||||||
|
"query": API_DELETE_USER_MUTATION,
|
||||||
|
"variables": {"username": "user1"},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert response.json.get("data") is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_graphql_delete_user(authorized_client, some_users, mock_subprocess_popen):
|
||||||
|
response = authorized_client.post(
|
||||||
|
"/graphql",
|
||||||
|
json={
|
||||||
|
"query": API_DELETE_USER_MUTATION,
|
||||||
|
"variables": {"username": "user1"},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert response.json.get("data") is not None
|
||||||
|
|
||||||
|
assert response.json["data"]["deleteUser"]["code"] == 200
|
||||||
|
assert response.json["data"]["deleteUser"]["message"] is not None
|
||||||
|
assert response.json["data"]["deleteUser"]["success"] is True
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize("username", ["", "def"])
|
||||||
|
def test_graphql_delete_nonexistent_users(
|
||||||
|
authorized_client, some_users, mock_subprocess_popen, username
|
||||||
|
):
|
||||||
|
response = authorized_client.post(
|
||||||
|
"/graphql",
|
||||||
|
json={
|
||||||
|
"query": API_DELETE_USER_MUTATION,
|
||||||
|
"variables": {"username": username},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert response.json.get("data") is not None
|
||||||
|
|
||||||
|
assert response.json["data"]["deleteUser"]["code"] == 404
|
||||||
|
assert response.json["data"]["deleteUser"]["message"] is not None
|
||||||
|
assert response.json["data"]["deleteUser"]["success"] is False
|
||||||
|
|
||||||
|
|
||||||
|
@pytest.mark.parametrize("username", invalid_usernames)
|
||||||
|
def test_graphql_delete_system_users(
|
||||||
|
authorized_client, some_users, mock_subprocess_popen, username
|
||||||
|
):
|
||||||
|
response = authorized_client.post(
|
||||||
|
"/graphql",
|
||||||
|
json={
|
||||||
|
"query": API_DELETE_USER_MUTATION,
|
||||||
|
"variables": {"username": username},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert response.json.get("data") is not None
|
||||||
|
|
||||||
|
assert (
|
||||||
|
response.json["data"]["deleteUser"]["code"] == 404
|
||||||
|
or response.json["data"]["deleteUser"]["code"] == 400
|
||||||
|
)
|
||||||
|
assert response.json["data"]["deleteUser"]["message"] is not None
|
||||||
|
assert response.json["data"]["deleteUser"]["success"] is False
|
||||||
|
|
||||||
|
|
||||||
|
def test_graphql_delete_main_user(authorized_client, some_users, mock_subprocess_popen):
|
||||||
|
response = authorized_client.post(
|
||||||
|
"/graphql",
|
||||||
|
json={
|
||||||
|
"query": API_DELETE_USER_MUTATION,
|
||||||
|
"variables": {"username": "tester"},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert response.json.get("data") is not None
|
||||||
|
|
||||||
|
assert response.json["data"]["deleteUser"]["code"] == 400
|
||||||
|
assert response.json["data"]["deleteUser"]["message"] is not None
|
||||||
|
assert response.json["data"]["deleteUser"]["success"] is False
|
||||||
|
|
||||||
|
|
||||||
|
API_UPDATE_USER_MUTATION = """
|
||||||
|
mutation updateUser($user: UserMutationInput!) {
|
||||||
|
updateUser(user: $user) {
|
||||||
|
success
|
||||||
|
message
|
||||||
|
code
|
||||||
|
user {
|
||||||
|
username
|
||||||
|
sshKeys
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
"""
|
||||||
|
|
||||||
|
|
||||||
|
def test_graphql_update_user_unauthorized(client, some_users, mock_subprocess_popen):
|
||||||
|
response = client.post(
|
||||||
|
"/graphql",
|
||||||
|
json={
|
||||||
|
"query": API_UPDATE_USER_MUTATION,
|
||||||
|
"variables": {
|
||||||
|
"user": {
|
||||||
|
"username": "user1",
|
||||||
|
"password": "12345678",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert response.json.get("data") is None
|
||||||
|
|
||||||
|
|
||||||
|
def test_graphql_update_user(authorized_client, some_users, mock_subprocess_popen):
|
||||||
|
response = authorized_client.post(
|
||||||
|
"/graphql",
|
||||||
|
json={
|
||||||
|
"query": API_UPDATE_USER_MUTATION,
|
||||||
|
"variables": {
|
||||||
|
"user": {
|
||||||
|
"username": "user1",
|
||||||
|
"password": "12345678",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert response.json.get("data") is not None
|
||||||
|
|
||||||
|
assert response.json["data"]["updateUser"]["code"] == 200
|
||||||
|
assert response.json["data"]["updateUser"]["message"] is not None
|
||||||
|
assert response.json["data"]["updateUser"]["success"] is True
|
||||||
|
|
||||||
|
assert response.json["data"]["updateUser"]["user"]["username"] == "user1"
|
||||||
|
assert response.json["data"]["updateUser"]["user"]["sshKeys"] == [
|
||||||
|
"ssh-rsa KEY user1@pc"
|
||||||
|
]
|
||||||
|
assert mock_subprocess_popen.call_count == 1
|
||||||
|
|
||||||
|
|
||||||
|
def test_graphql_update_nonexistent_user(
|
||||||
|
authorized_client, some_users, mock_subprocess_popen
|
||||||
|
):
|
||||||
|
response = authorized_client.post(
|
||||||
|
"/graphql",
|
||||||
|
json={
|
||||||
|
"query": API_UPDATE_USER_MUTATION,
|
||||||
|
"variables": {
|
||||||
|
"user": {
|
||||||
|
"username": "user666",
|
||||||
|
"password": "12345678",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
)
|
||||||
|
assert response.status_code == 200
|
||||||
|
assert response.json.get("data") is not None
|
||||||
|
|
||||||
|
assert response.json["data"]["updateUser"]["code"] == 404
|
||||||
|
assert response.json["data"]["updateUser"]["message"] is not None
|
||||||
|
assert response.json["data"]["updateUser"]["success"] is False
|
||||||
|
|
||||||
|
assert response.json["data"]["updateUser"]["user"] is None
|
||||||
|
assert mock_subprocess_popen.call_count == 1
|
54
tests/test_graphql/test_users/no_users.json
Normal file
54
tests/test_graphql/test_users/no_users.json
Normal file
|
@ -0,0 +1,54 @@
|
||||||
|
{
|
||||||
|
"backblaze": {
|
||||||
|
"accountId": "ID",
|
||||||
|
"accountKey": "KEY",
|
||||||
|
"bucket": "selfprivacy"
|
||||||
|
},
|
||||||
|
"api": {
|
||||||
|
"token": "TEST_TOKEN",
|
||||||
|
"enableSwagger": false
|
||||||
|
},
|
||||||
|
"bitwarden": {
|
||||||
|
"enable": false
|
||||||
|
},
|
||||||
|
"cloudflare": {
|
||||||
|
"apiKey": "TOKEN"
|
||||||
|
},
|
||||||
|
"databasePassword": "PASSWORD",
|
||||||
|
"domain": "test.tld",
|
||||||
|
"hashedMasterPassword": "HASHED_PASSWORD",
|
||||||
|
"hostname": "test-instance",
|
||||||
|
"nextcloud": {
|
||||||
|
"adminPassword": "ADMIN",
|
||||||
|
"databasePassword": "ADMIN",
|
||||||
|
"enable": true
|
||||||
|
},
|
||||||
|
"resticPassword": "PASS",
|
||||||
|
"ssh": {
|
||||||
|
"enable": true,
|
||||||
|
"passwordAuthentication": true,
|
||||||
|
"rootKeys": [
|
||||||
|
"ssh-ed25519 KEY test@pc"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"username": "tester",
|
||||||
|
"gitea": {
|
||||||
|
"enable": false
|
||||||
|
},
|
||||||
|
"ocserv": {
|
||||||
|
"enable": true
|
||||||
|
},
|
||||||
|
"pleroma": {
|
||||||
|
"enable": true
|
||||||
|
},
|
||||||
|
"autoUpgrade": {
|
||||||
|
"enable": true,
|
||||||
|
"allowReboot": true
|
||||||
|
},
|
||||||
|
"timezone": "Europe/Moscow",
|
||||||
|
"sshKeys": [
|
||||||
|
"ssh-rsa KEY test@pc"
|
||||||
|
],
|
||||||
|
"users": [
|
||||||
|
]
|
||||||
|
}
|
61
tests/test_graphql/test_users/one_user.json
Normal file
61
tests/test_graphql/test_users/one_user.json
Normal file
|
@ -0,0 +1,61 @@
|
||||||
|
{
|
||||||
|
"backblaze": {
|
||||||
|
"accountId": "ID",
|
||||||
|
"accountKey": "KEY",
|
||||||
|
"bucket": "selfprivacy"
|
||||||
|
},
|
||||||
|
"api": {
|
||||||
|
"token": "TEST_TOKEN",
|
||||||
|
"enableSwagger": false
|
||||||
|
},
|
||||||
|
"bitwarden": {
|
||||||
|
"enable": false
|
||||||
|
},
|
||||||
|
"cloudflare": {
|
||||||
|
"apiKey": "TOKEN"
|
||||||
|
},
|
||||||
|
"databasePassword": "PASSWORD",
|
||||||
|
"domain": "test.tld",
|
||||||
|
"hashedMasterPassword": "HASHED_PASSWORD",
|
||||||
|
"hostname": "test-instance",
|
||||||
|
"nextcloud": {
|
||||||
|
"adminPassword": "ADMIN",
|
||||||
|
"databasePassword": "ADMIN",
|
||||||
|
"enable": true
|
||||||
|
},
|
||||||
|
"resticPassword": "PASS",
|
||||||
|
"ssh": {
|
||||||
|
"enable": true,
|
||||||
|
"passwordAuthentication": true,
|
||||||
|
"rootKeys": [
|
||||||
|
"ssh-ed25519 KEY test@pc"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"username": "tester",
|
||||||
|
"gitea": {
|
||||||
|
"enable": false
|
||||||
|
},
|
||||||
|
"ocserv": {
|
||||||
|
"enable": true
|
||||||
|
},
|
||||||
|
"pleroma": {
|
||||||
|
"enable": true
|
||||||
|
},
|
||||||
|
"autoUpgrade": {
|
||||||
|
"enable": true,
|
||||||
|
"allowReboot": true
|
||||||
|
},
|
||||||
|
"timezone": "Europe/Moscow",
|
||||||
|
"sshKeys": [
|
||||||
|
"ssh-rsa KEY test@pc"
|
||||||
|
],
|
||||||
|
"users": [
|
||||||
|
{
|
||||||
|
"username": "user1",
|
||||||
|
"hashedPassword": "HASHED_PASSWORD_1",
|
||||||
|
"sshKeys": [
|
||||||
|
"ssh-rsa KEY user1@pc"
|
||||||
|
]
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
71
tests/test_graphql/test_users/some_users.json
Normal file
71
tests/test_graphql/test_users/some_users.json
Normal file
|
@ -0,0 +1,71 @@
|
||||||
|
{
|
||||||
|
"backblaze": {
|
||||||
|
"accountId": "ID",
|
||||||
|
"accountKey": "KEY",
|
||||||
|
"bucket": "selfprivacy"
|
||||||
|
},
|
||||||
|
"api": {
|
||||||
|
"token": "TEST_TOKEN",
|
||||||
|
"enableSwagger": false
|
||||||
|
},
|
||||||
|
"bitwarden": {
|
||||||
|
"enable": false
|
||||||
|
},
|
||||||
|
"cloudflare": {
|
||||||
|
"apiKey": "TOKEN"
|
||||||
|
},
|
||||||
|
"databasePassword": "PASSWORD",
|
||||||
|
"domain": "test.tld",
|
||||||
|
"hashedMasterPassword": "HASHED_PASSWORD",
|
||||||
|
"hostname": "test-instance",
|
||||||
|
"nextcloud": {
|
||||||
|
"adminPassword": "ADMIN",
|
||||||
|
"databasePassword": "ADMIN",
|
||||||
|
"enable": true
|
||||||
|
},
|
||||||
|
"resticPassword": "PASS",
|
||||||
|
"ssh": {
|
||||||
|
"enable": true,
|
||||||
|
"passwordAuthentication": true,
|
||||||
|
"rootKeys": [
|
||||||
|
"ssh-ed25519 KEY test@pc"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"username": "tester",
|
||||||
|
"gitea": {
|
||||||
|
"enable": false
|
||||||
|
},
|
||||||
|
"ocserv": {
|
||||||
|
"enable": true
|
||||||
|
},
|
||||||
|
"pleroma": {
|
||||||
|
"enable": true
|
||||||
|
},
|
||||||
|
"autoUpgrade": {
|
||||||
|
"enable": true,
|
||||||
|
"allowReboot": true
|
||||||
|
},
|
||||||
|
"timezone": "Europe/Moscow",
|
||||||
|
"sshKeys": [
|
||||||
|
"ssh-rsa KEY test@pc"
|
||||||
|
],
|
||||||
|
"users": [
|
||||||
|
{
|
||||||
|
"username": "user1",
|
||||||
|
"hashedPassword": "HASHED_PASSWORD_1",
|
||||||
|
"sshKeys": [
|
||||||
|
"ssh-rsa KEY user1@pc"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"username": "user2",
|
||||||
|
"hashedPassword": "HASHED_PASSWORD_2",
|
||||||
|
"sshKeys": [
|
||||||
|
]
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"username": "user3",
|
||||||
|
"hashedPassword": "HASHED_PASSWORD_3"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
52
tests/test_graphql/test_users/undefined.json
Normal file
52
tests/test_graphql/test_users/undefined.json
Normal file
|
@ -0,0 +1,52 @@
|
||||||
|
{
|
||||||
|
"backblaze": {
|
||||||
|
"accountId": "ID",
|
||||||
|
"accountKey": "KEY",
|
||||||
|
"bucket": "selfprivacy"
|
||||||
|
},
|
||||||
|
"api": {
|
||||||
|
"token": "TEST_TOKEN",
|
||||||
|
"enableSwagger": false
|
||||||
|
},
|
||||||
|
"bitwarden": {
|
||||||
|
"enable": false
|
||||||
|
},
|
||||||
|
"cloudflare": {
|
||||||
|
"apiKey": "TOKEN"
|
||||||
|
},
|
||||||
|
"databasePassword": "PASSWORD",
|
||||||
|
"domain": "test.tld",
|
||||||
|
"hashedMasterPassword": "HASHED_PASSWORD",
|
||||||
|
"hostname": "test-instance",
|
||||||
|
"nextcloud": {
|
||||||
|
"adminPassword": "ADMIN",
|
||||||
|
"databasePassword": "ADMIN",
|
||||||
|
"enable": true
|
||||||
|
},
|
||||||
|
"resticPassword": "PASS",
|
||||||
|
"ssh": {
|
||||||
|
"enable": true,
|
||||||
|
"passwordAuthentication": true,
|
||||||
|
"rootKeys": [
|
||||||
|
"ssh-ed25519 KEY test@pc"
|
||||||
|
]
|
||||||
|
},
|
||||||
|
"username": "tester",
|
||||||
|
"gitea": {
|
||||||
|
"enable": false
|
||||||
|
},
|
||||||
|
"ocserv": {
|
||||||
|
"enable": true
|
||||||
|
},
|
||||||
|
"pleroma": {
|
||||||
|
"enable": true
|
||||||
|
},
|
||||||
|
"autoUpgrade": {
|
||||||
|
"enable": true,
|
||||||
|
"allowReboot": true
|
||||||
|
},
|
||||||
|
"timezone": "Europe/Moscow",
|
||||||
|
"sshKeys": [
|
||||||
|
"ssh-rsa KEY test@pc"
|
||||||
|
]
|
||||||
|
}
|
Loading…
Reference in a new issue