mirror of
https://git.selfprivacy.org/SelfPrivacy/selfprivacy-rest-api.git
synced 2025-01-24 09:46:41 +00:00
671 lines
20 KiB
Python
671 lines
20 KiB
Python
import pytest
|
|
import shutil
|
|
|
|
from typing import Generator
|
|
from os import mkdir
|
|
|
|
from selfprivacy_api.utils.block_devices import BlockDevices
|
|
|
|
import selfprivacy_api.services as service_module
|
|
from selfprivacy_api.services import get_service_by_id
|
|
from selfprivacy_api.services.service import Service, ServiceStatus
|
|
from selfprivacy_api.services.test_service import DummyService
|
|
|
|
from tests.common import generate_service_query
|
|
from tests.test_graphql.common import assert_empty, assert_ok, get_data
|
|
from tests.test_graphql.test_system_nixos_tasks import prepare_nixos_rebuild_calls
|
|
|
|
LSBLK_BLOCKDEVICES_DICTS = [
|
|
{
|
|
"name": "sda1",
|
|
"path": "/dev/sda1",
|
|
"fsavail": "4614107136",
|
|
"fssize": "19814920192",
|
|
"fstype": "ext4",
|
|
"fsused": "14345314304",
|
|
"mountpoints": ["/nix/store", "/"],
|
|
"label": None,
|
|
"uuid": "ec80c004-baec-4a2c-851d-0e1807135511",
|
|
"size": 20210236928,
|
|
"model": None,
|
|
"serial": None,
|
|
"type": "part",
|
|
},
|
|
{
|
|
"name": "sda2",
|
|
"path": "/dev/sda2",
|
|
"fsavail": "4614107136",
|
|
"fssize": "19814920192",
|
|
"fstype": "ext4",
|
|
"fsused": "14345314304",
|
|
"mountpoints": ["/home"],
|
|
"label": None,
|
|
"uuid": "deadbeef-baec-4a2c-851d-0e1807135511",
|
|
"size": 20210236928,
|
|
"model": None,
|
|
"serial": None,
|
|
"type": "part",
|
|
},
|
|
]
|
|
|
|
|
|
@pytest.fixture()
|
|
def mock_lsblk_devices(mocker):
|
|
mock = mocker.patch(
|
|
"selfprivacy_api.utils.block_devices.BlockDevices.lsblk_device_dicts",
|
|
autospec=True,
|
|
return_value=LSBLK_BLOCKDEVICES_DICTS,
|
|
)
|
|
BlockDevices().update()
|
|
assert BlockDevices().lsblk_device_dicts() == LSBLK_BLOCKDEVICES_DICTS
|
|
devices = BlockDevices().get_block_devices()
|
|
|
|
assert len(devices) == 2
|
|
|
|
names = [device.name for device in devices]
|
|
assert "sda1" in names
|
|
assert "sda2" in names
|
|
return mock
|
|
|
|
|
|
@pytest.fixture()
|
|
def dummy_service_with_binds(dummy_service, mock_lsblk_devices, volume_folders):
|
|
binds = dummy_service.binds()
|
|
for bind in binds:
|
|
path = bind.binding_path
|
|
shutil.move(bind.binding_path, bind.location_at_volume())
|
|
mkdir(bind.binding_path)
|
|
|
|
bind.ensure_ownership()
|
|
bind.validate()
|
|
|
|
bind.bind()
|
|
return dummy_service
|
|
|
|
|
|
@pytest.fixture()
|
|
def only_dummy_service(dummy_service) -> Generator[DummyService, None, None]:
|
|
# because queries to services that are not really there error out
|
|
back_copy = service_module.services.copy()
|
|
service_module.services.clear()
|
|
service_module.services.append(dummy_service)
|
|
yield dummy_service
|
|
service_module.services.clear()
|
|
service_module.services.extend(back_copy)
|
|
|
|
|
|
@pytest.fixture()
|
|
def mock_check_volume(mocker):
|
|
mock = mocker.patch(
|
|
"selfprivacy_api.services.service.check_volume",
|
|
autospec=True,
|
|
return_value=None,
|
|
)
|
|
return mock
|
|
|
|
|
|
API_START_MUTATION = """
|
|
mutation TestStartService($service_id: String!) {
|
|
services {
|
|
startService(serviceId: $service_id) {
|
|
success
|
|
message
|
|
code
|
|
service {
|
|
id
|
|
status
|
|
}
|
|
}
|
|
}
|
|
}
|
|
"""
|
|
|
|
API_RESTART_MUTATION = """
|
|
mutation TestRestartService($service_id: String!) {
|
|
services {
|
|
restartService(serviceId: $service_id) {
|
|
success
|
|
message
|
|
code
|
|
service {
|
|
id
|
|
status
|
|
}
|
|
}
|
|
}
|
|
}
|
|
"""
|
|
|
|
API_ENABLE_MUTATION = """
|
|
mutation TestStartService($service_id: String!) {
|
|
services {
|
|
enableService(serviceId: $service_id) {
|
|
success
|
|
message
|
|
code
|
|
service {
|
|
id
|
|
isEnabled
|
|
}
|
|
}
|
|
}
|
|
}
|
|
"""
|
|
API_DISABLE_MUTATION = """
|
|
mutation TestStartService($service_id: String!) {
|
|
services {
|
|
disableService(serviceId: $service_id) {
|
|
success
|
|
message
|
|
code
|
|
service {
|
|
id
|
|
isEnabled
|
|
}
|
|
}
|
|
}
|
|
}
|
|
"""
|
|
|
|
API_STOP_MUTATION = """
|
|
mutation TestStopService($service_id: String!) {
|
|
services {
|
|
stopService(serviceId: $service_id) {
|
|
success
|
|
message
|
|
code
|
|
service {
|
|
id
|
|
status
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
"""
|
|
API_SERVICES_QUERY = """
|
|
allServices {
|
|
id
|
|
status
|
|
isEnabled
|
|
}
|
|
"""
|
|
|
|
API_MOVE_MUTATION = """
|
|
mutation TestMoveService($input: MoveServiceInput!) {
|
|
services {
|
|
moveService(input: $input) {
|
|
success
|
|
message
|
|
code
|
|
job {
|
|
uid
|
|
status
|
|
}
|
|
service {
|
|
id
|
|
status
|
|
}
|
|
}
|
|
}
|
|
}
|
|
"""
|
|
|
|
|
|
def assert_notfound(data):
|
|
assert_errorcode(data, 404)
|
|
|
|
|
|
def assert_errorcode(data, errorcode):
|
|
assert data["code"] == errorcode
|
|
assert data["success"] is False
|
|
assert data["message"] is not None
|
|
|
|
|
|
def api_enable(client, service: Service) -> dict:
|
|
return api_enable_by_name(client, service.get_id())
|
|
|
|
|
|
def api_enable_by_name(client, service_id: str) -> dict:
|
|
response = client.post(
|
|
"/graphql",
|
|
json={
|
|
"query": API_ENABLE_MUTATION,
|
|
"variables": {"service_id": service_id},
|
|
},
|
|
)
|
|
return response
|
|
|
|
|
|
def api_disable(client, service: Service) -> dict:
|
|
return api_disable_by_name(client, service.get_id())
|
|
|
|
|
|
def api_disable_by_name(client, service_id: str) -> dict:
|
|
response = client.post(
|
|
"/graphql",
|
|
json={
|
|
"query": API_DISABLE_MUTATION,
|
|
"variables": {"service_id": service_id},
|
|
},
|
|
)
|
|
return response
|
|
|
|
|
|
def api_start(client, service: Service) -> dict:
|
|
return api_start_by_name(client, service.get_id())
|
|
|
|
|
|
def api_start_by_name(client, service_id: str) -> dict:
|
|
response = client.post(
|
|
"/graphql",
|
|
json={
|
|
"query": API_START_MUTATION,
|
|
"variables": {"service_id": service_id},
|
|
},
|
|
)
|
|
return response
|
|
|
|
|
|
def api_move(client, service: Service, location: str) -> dict:
|
|
return api_move_by_name(client, service.get_id(), location)
|
|
|
|
|
|
def api_move_by_name(client, service_id: str, location: str) -> dict:
|
|
response = client.post(
|
|
"/graphql",
|
|
json={
|
|
"query": API_MOVE_MUTATION,
|
|
"variables": {
|
|
"input": {
|
|
"serviceId": service_id,
|
|
"location": location,
|
|
}
|
|
},
|
|
},
|
|
)
|
|
return response
|
|
|
|
|
|
def api_restart(client, service: Service) -> dict:
|
|
return api_restart_by_name(client, service.get_id())
|
|
|
|
|
|
def api_restart_by_name(client, service_id: str) -> dict:
|
|
response = client.post(
|
|
"/graphql",
|
|
json={
|
|
"query": API_RESTART_MUTATION,
|
|
"variables": {"service_id": service_id},
|
|
},
|
|
)
|
|
return response
|
|
|
|
|
|
def api_stop(client, service: Service) -> dict:
|
|
return api_stop_by_name(client, service.get_id())
|
|
|
|
|
|
def api_stop_by_name(client, service_id: str) -> dict:
|
|
response = client.post(
|
|
"/graphql",
|
|
json={
|
|
"query": API_STOP_MUTATION,
|
|
"variables": {"service_id": service_id},
|
|
},
|
|
)
|
|
return response
|
|
|
|
|
|
def api_all_services(authorized_client):
|
|
response = api_all_services_raw(authorized_client)
|
|
data = get_data(response)
|
|
result = data["services"]["allServices"]
|
|
assert result is not None
|
|
return result
|
|
|
|
|
|
def api_all_services_raw(client):
|
|
return client.post(
|
|
"/graphql",
|
|
json={"query": generate_service_query([API_SERVICES_QUERY])},
|
|
)
|
|
|
|
|
|
def api_service(authorized_client, service: Service):
|
|
id = service.get_id()
|
|
for _service in api_all_services(authorized_client):
|
|
if _service["id"] == id:
|
|
return _service
|
|
|
|
|
|
def test_get_services(authorized_client, only_dummy_service):
|
|
services = api_all_services(authorized_client)
|
|
assert len(services) == 1
|
|
|
|
api_dummy_service = services[0]
|
|
assert api_dummy_service["id"] == "testservice"
|
|
assert api_dummy_service["status"] == ServiceStatus.ACTIVE.value
|
|
assert api_dummy_service["isEnabled"] is True
|
|
|
|
|
|
def test_enable_return_value(authorized_client, only_dummy_service):
|
|
dummy_service = only_dummy_service
|
|
mutation_response = api_enable(authorized_client, dummy_service)
|
|
data = get_data(mutation_response)["services"]["enableService"]
|
|
assert_ok(data)
|
|
service = data["service"]
|
|
assert service["id"] == dummy_service.get_id()
|
|
assert service["isEnabled"] == True
|
|
|
|
|
|
def test_disable_return_value(authorized_client, only_dummy_service):
|
|
dummy_service = only_dummy_service
|
|
mutation_response = api_disable(authorized_client, dummy_service)
|
|
data = get_data(mutation_response)["services"]["disableService"]
|
|
assert_ok(data)
|
|
service = data["service"]
|
|
assert service["id"] == dummy_service.get_id()
|
|
assert service["isEnabled"] == False
|
|
|
|
|
|
def test_start_return_value(authorized_client, only_dummy_service):
|
|
dummy_service = only_dummy_service
|
|
mutation_response = api_start(authorized_client, dummy_service)
|
|
data = get_data(mutation_response)["services"]["startService"]
|
|
assert_ok(data)
|
|
service = data["service"]
|
|
assert service["id"] == dummy_service.get_id()
|
|
assert service["status"] == ServiceStatus.ACTIVE.value
|
|
|
|
|
|
def test_restart(authorized_client, only_dummy_service):
|
|
dummy_service = only_dummy_service
|
|
dummy_service.set_delay(0.3)
|
|
mutation_response = api_restart(authorized_client, dummy_service)
|
|
data = get_data(mutation_response)["services"]["restartService"]
|
|
assert_ok(data)
|
|
service = data["service"]
|
|
assert service["id"] == dummy_service.get_id()
|
|
assert service["status"] == ServiceStatus.RELOADING.value
|
|
|
|
|
|
def test_stop_return_value(authorized_client, only_dummy_service):
|
|
dummy_service = only_dummy_service
|
|
mutation_response = api_stop(authorized_client, dummy_service)
|
|
data = get_data(mutation_response)["services"]["stopService"]
|
|
assert_ok(data)
|
|
service = data["service"]
|
|
assert service["id"] == dummy_service.get_id()
|
|
assert service["status"] == ServiceStatus.INACTIVE.value
|
|
|
|
|
|
def test_allservices_unauthorized(client, only_dummy_service):
|
|
dummy_service = only_dummy_service
|
|
response = api_all_services_raw(client)
|
|
|
|
assert response.status_code == 200
|
|
assert response.json().get("data") is None
|
|
|
|
|
|
def test_start_unauthorized(client, only_dummy_service):
|
|
dummy_service = only_dummy_service
|
|
response = api_start(client, dummy_service)
|
|
assert_empty(response)
|
|
|
|
|
|
def test_restart_unauthorized(client, only_dummy_service):
|
|
dummy_service = only_dummy_service
|
|
response = api_restart(client, dummy_service)
|
|
assert_empty(response)
|
|
|
|
|
|
def test_stop_unauthorized(client, only_dummy_service):
|
|
dummy_service = only_dummy_service
|
|
response = api_stop(client, dummy_service)
|
|
assert_empty(response)
|
|
|
|
|
|
def test_enable_unauthorized(client, only_dummy_service):
|
|
dummy_service = only_dummy_service
|
|
response = api_enable(client, dummy_service)
|
|
assert_empty(response)
|
|
|
|
|
|
def test_disable_unauthorized(client, only_dummy_service):
|
|
dummy_service = only_dummy_service
|
|
response = api_disable(client, dummy_service)
|
|
assert_empty(response)
|
|
|
|
|
|
def test_move_unauthorized(client, only_dummy_service):
|
|
dummy_service = only_dummy_service
|
|
response = api_move(client, dummy_service, "sda1")
|
|
assert_empty(response)
|
|
|
|
|
|
def test_start_nonexistent(authorized_client, only_dummy_service):
|
|
dummy_service = only_dummy_service
|
|
mutation_response = api_start_by_name(authorized_client, "bogus_service")
|
|
data = get_data(mutation_response)["services"]["startService"]
|
|
assert_notfound(data)
|
|
|
|
assert data["service"] is None
|
|
|
|
|
|
def test_restart_nonexistent(authorized_client, only_dummy_service):
|
|
dummy_service = only_dummy_service
|
|
mutation_response = api_restart_by_name(authorized_client, "bogus_service")
|
|
data = get_data(mutation_response)["services"]["restartService"]
|
|
assert_notfound(data)
|
|
|
|
assert data["service"] is None
|
|
|
|
|
|
def test_stop_nonexistent(authorized_client, only_dummy_service):
|
|
dummy_service = only_dummy_service
|
|
mutation_response = api_stop_by_name(authorized_client, "bogus_service")
|
|
data = get_data(mutation_response)["services"]["stopService"]
|
|
assert_notfound(data)
|
|
|
|
assert data["service"] is None
|
|
|
|
|
|
def test_enable_nonexistent(authorized_client, only_dummy_service):
|
|
dummy_service = only_dummy_service
|
|
mutation_response = api_enable_by_name(authorized_client, "bogus_service")
|
|
data = get_data(mutation_response)["services"]["enableService"]
|
|
assert_notfound(data)
|
|
|
|
assert data["service"] is None
|
|
|
|
|
|
def test_disable_nonexistent(authorized_client, only_dummy_service):
|
|
dummy_service = only_dummy_service
|
|
mutation_response = api_disable_by_name(authorized_client, "bogus_service")
|
|
data = get_data(mutation_response)["services"]["disableService"]
|
|
assert_notfound(data)
|
|
|
|
assert data["service"] is None
|
|
|
|
|
|
def test_stop_start(authorized_client, only_dummy_service):
|
|
dummy_service = only_dummy_service
|
|
|
|
api_dummy_service = api_all_services(authorized_client)[0]
|
|
assert api_dummy_service["status"] == ServiceStatus.ACTIVE.value
|
|
|
|
# attempting to start an already started service
|
|
api_start(authorized_client, dummy_service)
|
|
api_dummy_service = api_all_services(authorized_client)[0]
|
|
assert api_dummy_service["status"] == ServiceStatus.ACTIVE.value
|
|
|
|
api_stop(authorized_client, dummy_service)
|
|
api_dummy_service = api_all_services(authorized_client)[0]
|
|
assert api_dummy_service["status"] == ServiceStatus.INACTIVE.value
|
|
|
|
# attempting to stop an already stopped service
|
|
api_stop(authorized_client, dummy_service)
|
|
api_dummy_service = api_all_services(authorized_client)[0]
|
|
assert api_dummy_service["status"] == ServiceStatus.INACTIVE.value
|
|
|
|
api_start(authorized_client, dummy_service)
|
|
api_dummy_service = api_all_services(authorized_client)[0]
|
|
assert api_dummy_service["status"] == ServiceStatus.ACTIVE.value
|
|
|
|
|
|
def test_disable_enable(authorized_client, only_dummy_service):
|
|
dummy_service = only_dummy_service
|
|
|
|
api_dummy_service = api_all_services(authorized_client)[0]
|
|
assert api_dummy_service["isEnabled"] is True
|
|
|
|
# attempting to enable an already enableed service
|
|
api_enable(authorized_client, dummy_service)
|
|
api_dummy_service = api_all_services(authorized_client)[0]
|
|
assert api_dummy_service["isEnabled"] is True
|
|
assert api_dummy_service["status"] == ServiceStatus.ACTIVE.value
|
|
|
|
api_disable(authorized_client, dummy_service)
|
|
api_dummy_service = api_all_services(authorized_client)[0]
|
|
assert api_dummy_service["isEnabled"] is False
|
|
assert api_dummy_service["status"] == ServiceStatus.ACTIVE.value
|
|
|
|
# attempting to disable an already disableped service
|
|
api_disable(authorized_client, dummy_service)
|
|
api_dummy_service = api_all_services(authorized_client)[0]
|
|
assert api_dummy_service["isEnabled"] is False
|
|
assert api_dummy_service["status"] == ServiceStatus.ACTIVE.value
|
|
|
|
api_enable(authorized_client, dummy_service)
|
|
api_dummy_service = api_all_services(authorized_client)[0]
|
|
assert api_dummy_service["isEnabled"] is True
|
|
assert api_dummy_service["status"] == ServiceStatus.ACTIVE.value
|
|
|
|
|
|
def test_move_immovable(authorized_client, only_dummy_service):
|
|
dummy_service = only_dummy_service
|
|
dummy_service.set_movable(False)
|
|
root = BlockDevices().get_root_block_device()
|
|
mutation_response = api_move(authorized_client, dummy_service, root.name)
|
|
data = get_data(mutation_response)["services"]["moveService"]
|
|
assert_errorcode(data, 400)
|
|
try:
|
|
assert "not movable" in data["message"]
|
|
except AssertionError:
|
|
raise ValueError("wrong type of error?: ", data["message"])
|
|
|
|
# is there a meaning in returning the service in this?
|
|
assert data["service"] is not None
|
|
assert data["job"] is None
|
|
|
|
|
|
def test_move_no_such_service(authorized_client, only_dummy_service):
|
|
mutation_response = api_move_by_name(authorized_client, "bogus_service", "sda1")
|
|
data = get_data(mutation_response)["services"]["moveService"]
|
|
assert_errorcode(data, 404)
|
|
|
|
assert data["service"] is None
|
|
assert data["job"] is None
|
|
|
|
|
|
def test_move_no_such_volume(authorized_client, only_dummy_service):
|
|
dummy_service = only_dummy_service
|
|
mutation_response = api_move(authorized_client, dummy_service, "bogus_volume")
|
|
data = get_data(mutation_response)["services"]["moveService"]
|
|
assert_notfound(data)
|
|
|
|
assert data["service"] is None
|
|
assert data["job"] is None
|
|
|
|
|
|
def test_move_same_volume(authorized_client, dummy_service):
|
|
# dummy_service = only_dummy_service
|
|
|
|
# we need a drive that actually exists
|
|
root_volume = BlockDevices().get_root_block_device()
|
|
dummy_service.set_simulated_moves(False)
|
|
dummy_service.set_drive(root_volume.name)
|
|
|
|
mutation_response = api_move(authorized_client, dummy_service, root_volume.name)
|
|
data = get_data(mutation_response)["services"]["moveService"]
|
|
assert_errorcode(data, 400)
|
|
|
|
# is there a meaning in returning the service in this?
|
|
assert data["service"] is not None
|
|
# We do not create a job if task is not created
|
|
assert data["job"] is None
|
|
|
|
|
|
def test_graphql_move_service_without_folders_on_old_volume(
|
|
authorized_client,
|
|
generic_userdata,
|
|
mock_lsblk_devices,
|
|
dummy_service: DummyService,
|
|
):
|
|
target = "sda1"
|
|
BlockDevices().update()
|
|
assert BlockDevices().get_block_device(target) is not None
|
|
|
|
dummy_service.set_simulated_moves(False)
|
|
dummy_service.set_drive("sda2")
|
|
mutation_response = api_move(authorized_client, dummy_service, target)
|
|
|
|
data = get_data(mutation_response)["services"]["moveService"]
|
|
assert_errorcode(data, 400)
|
|
assert "sda2/test_service is not found" in data["message"]
|
|
|
|
|
|
def test_graphql_move_service(
|
|
authorized_client, generic_userdata, mock_check_volume, dummy_service_with_binds, fp
|
|
):
|
|
dummy_service = dummy_service_with_binds
|
|
|
|
origin = "sda1"
|
|
target = "sda2"
|
|
assert BlockDevices().get_block_device(target) is not None
|
|
assert BlockDevices().get_block_device(origin) is not None
|
|
|
|
dummy_service.set_drive(origin)
|
|
dummy_service.set_simulated_moves(False)
|
|
|
|
unit_name = "sp-nixos-rebuild.service"
|
|
rebuild_command = ["systemctl", "start", unit_name]
|
|
prepare_nixos_rebuild_calls(fp, unit_name)
|
|
|
|
# We will be mounting and remounting folders
|
|
mount_command = ["mount", fp.any()]
|
|
unmount_command = ["umount", fp.any()]
|
|
fp.pass_command(mount_command, 2)
|
|
fp.pass_command(unmount_command, 2)
|
|
|
|
# We will be changing ownership
|
|
chown_command = ["chown", fp.any()]
|
|
fp.pass_command(chown_command, 2)
|
|
|
|
mutation_response = api_move(authorized_client, dummy_service, target)
|
|
|
|
data = get_data(mutation_response)["services"]["moveService"]
|
|
assert_ok(data)
|
|
assert data["service"] is not None
|
|
|
|
assert fp.call_count(rebuild_command) == 1
|
|
assert fp.call_count(mount_command) == 2
|
|
assert fp.call_count(unmount_command) == 2
|
|
assert fp.call_count(chown_command) == 2
|
|
|
|
|
|
def test_mailservice_cannot_enable_disable(authorized_client):
|
|
mailservice = get_service_by_id("simple-nixos-mailserver")
|
|
|
|
mutation_response = api_enable(authorized_client, mailservice)
|
|
data = get_data(mutation_response)["services"]["enableService"]
|
|
assert_errorcode(data, 400)
|
|
# TODO?: we cannot convert mailservice to graphql Service without /var/domain yet
|
|
# assert data["service"] is not None
|
|
|
|
mutation_response = api_disable(authorized_client, mailservice)
|
|
data = get_data(mutation_response)["services"]["disableService"]
|
|
assert_errorcode(data, 400)
|
|
# assert data["service"] is not None
|