mirror of
https://git.selfprivacy.org/SelfPrivacy/selfprivacy-rest-api.git
synced 2024-11-29 15:31:28 +00:00
599 lines
15 KiB
Python
599 lines
15 KiB
Python
import pytest
|
|
|
|
|
|
def get_service_by_id_return_none_mock():
|
|
return None
|
|
|
|
|
|
def get_service_by_id_mock():
|
|
return "nextcloud"
|
|
|
|
|
|
def service_to_graphql_service_mock():
|
|
pass
|
|
|
|
|
|
class BlockDevicesMock:
|
|
def get_block_device(self, name: str):
|
|
pass
|
|
|
|
|
|
class BlockDevicesReturnNoneMock:
|
|
def get_block_device(self, name: str):
|
|
return None
|
|
|
|
|
|
class NextcloudMock:
|
|
def __init__(self, args, **kwargs):
|
|
self.args = args
|
|
self.kwargs = kwargs
|
|
|
|
def enable(self):
|
|
pass
|
|
|
|
def disable(self):
|
|
pass
|
|
|
|
def stop(self):
|
|
pass
|
|
|
|
def is_movable(self):
|
|
return True
|
|
|
|
def move_to_volume(self):
|
|
pass
|
|
|
|
returncode = 0
|
|
|
|
|
|
class NextcloudReturnFalseMock:
|
|
def __init__(self, args, **kwargs):
|
|
self.args = args
|
|
self.kwargs = kwargs
|
|
|
|
def enable(self):
|
|
pass
|
|
|
|
def disable(self):
|
|
pass
|
|
|
|
def stop(self):
|
|
pass
|
|
|
|
def is_movable(self):
|
|
return False
|
|
|
|
def move_to_volume(self):
|
|
pass
|
|
|
|
returncode = 0
|
|
|
|
|
|
@pytest.fixture
|
|
def mock_service_to_graphql_service(mocker):
|
|
mock = mocker.patch(
|
|
"selfprivacy_api.graphql.common_types.service.service_to_graphql_service",
|
|
autospec=True,
|
|
return_value=service_to_graphql_service_mock,
|
|
)
|
|
return mock
|
|
|
|
|
|
@pytest.fixture
|
|
def mock_nextcloud(mocker):
|
|
mock = mocker.patch(
|
|
"selfprivacy_api.services.nextcloud.__init__.Nextcloud",
|
|
autospec=True,
|
|
return_value=NextcloudMock,
|
|
)
|
|
return mock
|
|
|
|
|
|
@pytest.fixture
|
|
def mock_block_devices_return_none(mocker):
|
|
mock = mocker.patch(
|
|
"selfprivacy_api.utils.block_devices.BlockDevices",
|
|
autospec=True,
|
|
return_value=BlockDevicesReturnNoneMock,
|
|
)
|
|
return mock
|
|
|
|
|
|
@pytest.fixture
|
|
def mock_block_devices(mocker):
|
|
mock = mocker.patch(
|
|
"selfprivacy_api.utils.block_devices.BlockDevices",
|
|
autospec=True,
|
|
return_value=BlockDevicesMock,
|
|
)
|
|
return mock
|
|
|
|
|
|
@pytest.fixture
|
|
def mock_nextcloud_return_false(mocker):
|
|
mock = mocker.patch(
|
|
"selfprivacy_api.services.nextcloud.__init__.Nextcloud",
|
|
autospec=True,
|
|
return_value=NextcloudReturnFalseMock,
|
|
)
|
|
return mock
|
|
|
|
|
|
@pytest.fixture
|
|
def mock_get_service_by_id_return_none(mocker):
|
|
mock = mocker.patch(
|
|
"selfprivacy_api.services.__init__.get_service_by_id",
|
|
autospec=True,
|
|
return_value=mock_get_service_by_id_return_none,
|
|
)
|
|
return mock
|
|
|
|
|
|
@pytest.fixture
|
|
def mock_get_service_by_id(mocker):
|
|
mock = mocker.patch(
|
|
"selfprivacy_api.services.__init__.get_service_by_id",
|
|
autospec=True,
|
|
return_value=mock_get_service_by_id,
|
|
)
|
|
return mock
|
|
|
|
|
|
####################################################################
|
|
|
|
|
|
API_ENABLE_SERVICE_MUTATION = """
|
|
mutation enableService($service_id: String!) {
|
|
enableService(service_id: $service_id) {
|
|
success
|
|
message
|
|
code
|
|
}
|
|
}
|
|
"""
|
|
|
|
|
|
def test_graphql_enable_service_unathorized_client(
|
|
client, mock_get_service_by_id_return_none, mock_nextcloud
|
|
):
|
|
response = client.post(
|
|
"/graphql",
|
|
json={
|
|
"query": API_ENABLE_SERVICE_MUTATION,
|
|
"variables": {"service_id": "nextcloud"},
|
|
},
|
|
)
|
|
assert response.status_code == 200
|
|
assert response.json().get("data") is None
|
|
|
|
|
|
def test_graphql_enable_not_found_service(
|
|
authorized_client, mock_get_service_by_id_return_none, mock_nextcloud
|
|
):
|
|
response = authorized_client.post(
|
|
"/graphql",
|
|
json={
|
|
"query": API_ENABLE_SERVICE_MUTATION,
|
|
"variables": {"service_id": "nextcloud"},
|
|
},
|
|
)
|
|
assert response.status_code == 200
|
|
assert response.json().get("data") is not None
|
|
|
|
assert response.json()["data"]["enableService"]["code"] == 404
|
|
assert response.json()["data"]["enableService"]["message"] is not None
|
|
assert response.json()["data"]["enableService"]["success"] is False
|
|
|
|
|
|
def test_graphql_enable_service(
|
|
authorized_client, mock_get_service_by_id, mock_nextcloud
|
|
):
|
|
response = authorized_client.post(
|
|
"/graphql",
|
|
json={
|
|
"query": API_ENABLE_SERVICE_MUTATION,
|
|
"variables": {"service_id": "nextcloud"},
|
|
},
|
|
)
|
|
assert response.status_code == 200
|
|
assert response.json().get("data") is not None
|
|
|
|
assert response.json()["data"]["enableService"]["code"] == 200
|
|
assert response.json()["data"]["enableService"]["message"] is not None
|
|
assert response.json()["data"]["enableService"]["success"] is True
|
|
|
|
|
|
API_DISABLE_SERVICE_MUTATION = """
|
|
mutation disableService($service_id: String!) {
|
|
disableService(service_id: $service_id) {
|
|
success
|
|
message
|
|
code
|
|
}
|
|
}
|
|
"""
|
|
|
|
|
|
def test_graphql_disable_service_unathorized_client(
|
|
client, mock_get_service_by_id_return_none, mock_nextcloud
|
|
):
|
|
response = client.post(
|
|
"/graphql",
|
|
json={
|
|
"query": API_DISABLE_SERVICE_MUTATION,
|
|
"variables": {"service_id": "nextcloud"},
|
|
},
|
|
)
|
|
assert response.status_code == 200
|
|
assert response.json().get("data") is None
|
|
|
|
|
|
def test_graphql_disable_not_found_service(
|
|
authorized_client, mock_get_service_by_id_return_none, mock_nextcloud
|
|
):
|
|
response = authorized_client.post(
|
|
"/graphql",
|
|
json={
|
|
"query": API_DISABLE_SERVICE_MUTATION,
|
|
"variables": {"service_id": "nextcloud"},
|
|
},
|
|
)
|
|
assert response.status_code == 200
|
|
assert response.json().get("data") is not None
|
|
|
|
assert response.json()["data"]["disableService"]["code"] == 404
|
|
assert response.json()["data"]["disableService"]["message"] is not None
|
|
assert response.json()["data"]["disableService"]["success"] is False
|
|
|
|
|
|
def test_graphql_disable_services(
|
|
authorized_client, mock_get_service_by_id, mock_nextcloud
|
|
):
|
|
response = authorized_client.post(
|
|
"/graphql",
|
|
json={
|
|
"query": API_DISABLE_SERVICE_MUTATION,
|
|
"variables": {"service_id": "nextcloud"},
|
|
},
|
|
)
|
|
assert response.status_code == 200
|
|
assert response.json().get("data") is not None
|
|
|
|
assert response.json()["data"]["disableService"]["code"] == 200
|
|
assert response.json()["data"]["disableService"]["message"] is not None
|
|
assert response.json()["data"]["disableService"]["success"] is True
|
|
|
|
|
|
API_STOP_SERVICE_MUTATION = """
|
|
mutation stopService($service_id: String!) {
|
|
stopService(service_id: $service_id) {
|
|
success
|
|
message
|
|
code
|
|
}
|
|
}
|
|
"""
|
|
|
|
|
|
def test_graphql_stop_service_unathorized_client(
|
|
client,
|
|
mock_get_service_by_id_return_none,
|
|
mock_nextcloud,
|
|
mock_service_to_graphql_service,
|
|
):
|
|
response = client.post(
|
|
"/graphql",
|
|
json={
|
|
"query": API_STOP_SERVICE_MUTATION,
|
|
"variables": {"service_id": "nextcloud"},
|
|
},
|
|
)
|
|
assert response.status_code == 200
|
|
assert response.json().get("data") is None
|
|
|
|
|
|
def test_graphql_stop_not_found_service(
|
|
authorized_client,
|
|
mock_get_service_by_id_return_none,
|
|
mock_nextcloud,
|
|
mock_service_to_graphql_service,
|
|
):
|
|
response = authorized_client.post(
|
|
"/graphql",
|
|
json={
|
|
"query": API_STOP_SERVICE_MUTATION,
|
|
"variables": {"service_id": "nextcloud"},
|
|
},
|
|
)
|
|
assert response.status_code == 200
|
|
assert response.json().get("data") is not None
|
|
|
|
assert response.json()["data"]["stopService"]["code"] == 404
|
|
assert response.json()["data"]["stopService"]["message"] is not None
|
|
assert response.json()["data"]["stopService"]["success"] is False
|
|
|
|
|
|
def test_graphql_stop_services(
|
|
authorized_client,
|
|
mock_get_service_by_id,
|
|
mock_nextcloud,
|
|
mock_service_to_graphql_service,
|
|
):
|
|
response = authorized_client.post(
|
|
"/graphql",
|
|
json={
|
|
"query": API_STOP_SERVICE_MUTATION,
|
|
"variables": {"service_id": "nextcloud"},
|
|
},
|
|
)
|
|
assert response.status_code == 200
|
|
assert response.json().get("data") is not None
|
|
|
|
assert response.json()["data"]["stopService"]["code"] == 200
|
|
assert response.json()["data"]["stopService"]["message"] is not None
|
|
assert response.json()["data"]["stopService"]["success"] is True
|
|
|
|
|
|
API_START_SERVICE_MUTATION = """
|
|
mutation startService($service_id: String!) {
|
|
startService(service_id: $service_id) {
|
|
success
|
|
message
|
|
code
|
|
}
|
|
}
|
|
"""
|
|
|
|
|
|
def test_graphql_start_service_unathorized_client(
|
|
client,
|
|
mock_get_service_by_id_return_none,
|
|
mock_nextcloud,
|
|
mock_service_to_graphql_service,
|
|
):
|
|
response = client.post(
|
|
"/graphql",
|
|
json={
|
|
"query": API_START_SERVICE_MUTATION,
|
|
"variables": {"service_id": "nextcloud"},
|
|
},
|
|
)
|
|
assert response.status_code == 200
|
|
assert response.json().get("data") is None
|
|
|
|
|
|
def test_graphql_start_not_found_service(
|
|
authorized_client,
|
|
mock_get_service_by_id_return_none,
|
|
mock_nextcloud,
|
|
mock_service_to_graphql_service,
|
|
):
|
|
response = authorized_client.post(
|
|
"/graphql",
|
|
json={
|
|
"query": API_START_SERVICE_MUTATION,
|
|
"variables": {"service_id": "nextcloud"},
|
|
},
|
|
)
|
|
assert response.status_code == 200
|
|
assert response.json().get("data") is not None
|
|
|
|
assert response.json()["data"]["startService"]["code"] == 404
|
|
assert response.json()["data"]["startService"]["message"] is not None
|
|
assert response.json()["data"]["startService"]["success"] is False
|
|
|
|
|
|
def test_graphql_start_services(
|
|
authorized_client,
|
|
mock_get_service_by_id,
|
|
mock_nextcloud,
|
|
mock_service_to_graphql_service,
|
|
):
|
|
response = authorized_client.post(
|
|
"/graphql",
|
|
json={
|
|
"query": API_START_SERVICE_MUTATION,
|
|
"variables": {"service_id": "nextcloud"},
|
|
},
|
|
)
|
|
assert response.status_code == 200
|
|
assert response.json().get("data") is not None
|
|
|
|
assert response.json()["data"]["startService"]["code"] == 200
|
|
assert response.json()["data"]["startService"]["message"] is not None
|
|
assert response.json()["data"]["startService"]["success"] is True
|
|
|
|
|
|
API_RESTART_SERVICE_MUTATION = """
|
|
mutation restartService($service_id: String!) {
|
|
restartService(service_id: $service_id) {
|
|
success
|
|
message
|
|
code
|
|
}
|
|
}
|
|
"""
|
|
|
|
|
|
def test_graphql_restart_service_unathorized_client(
|
|
client,
|
|
mock_get_service_by_id_return_none,
|
|
mock_nextcloud,
|
|
mock_service_to_graphql_service,
|
|
):
|
|
response = client.post(
|
|
"/graphql",
|
|
json={
|
|
"query": API_RESTART_SERVICE_MUTATION,
|
|
"variables": {"service_id": "nextcloud"},
|
|
},
|
|
)
|
|
assert response.status_code == 200
|
|
assert response.json().get("data") is None
|
|
|
|
|
|
def test_graphql_restart_not_found_service(
|
|
authorized_client,
|
|
mock_get_service_by_id_return_none,
|
|
mock_nextcloud,
|
|
mock_service_to_graphql_service,
|
|
):
|
|
response = authorized_client.post(
|
|
"/graphql",
|
|
json={
|
|
"query": API_RESTART_SERVICE_MUTATION,
|
|
"variables": {"service_id": "nextcloud"},
|
|
},
|
|
)
|
|
assert response.status_code == 200
|
|
assert response.json().get("data") is not None
|
|
|
|
assert response.json()["data"]["restartService"]["code"] == 404
|
|
assert response.json()["data"]["restartService"]["message"] is not None
|
|
assert response.json()["data"]["restartService"]["success"] is False
|
|
|
|
|
|
def test_graphql_restart_service(
|
|
authorized_client,
|
|
mock_get_service_by_id,
|
|
mock_nextcloud,
|
|
mock_service_to_graphql_service,
|
|
):
|
|
response = authorized_client.post(
|
|
"/graphql",
|
|
json={
|
|
"query": API_RESTART_SERVICE_MUTATION,
|
|
"variables": {"service_id": "nextcloud"},
|
|
},
|
|
)
|
|
assert response.status_code == 200
|
|
assert response.json().get("data") is not None
|
|
|
|
assert response.json()["data"]["restartService"]["code"] == 200
|
|
assert response.json()["data"]["restartService"]["message"] is not None
|
|
assert response.json()["data"]["restartService"]["success"] is True
|
|
|
|
|
|
API_MOVE_SERVICE_MUTATION = """
|
|
mutation moveService($input: MoveServiceInput!) {
|
|
moveService(input: $input) {
|
|
success
|
|
message
|
|
code
|
|
}
|
|
}
|
|
"""
|
|
|
|
|
|
def test_graphql_move_service_unathorized_client(
|
|
client,
|
|
mock_get_service_by_id_return_none,
|
|
mock_nextcloud,
|
|
mock_service_to_graphql_service,
|
|
):
|
|
response = client.post(
|
|
"/graphql",
|
|
json={
|
|
"query": API_MOVE_SERVICE_MUTATION,
|
|
"variables": {
|
|
"input": {"service_id": "nextcloud", "location": "sdx"},
|
|
},
|
|
},
|
|
)
|
|
assert response.status_code == 200
|
|
assert response.json().get("data") is None
|
|
|
|
|
|
def test_graphql_move_not_found_service(
|
|
authorized_client,
|
|
mock_get_service_by_id_return_none,
|
|
mock_nextcloud,
|
|
mock_service_to_graphql_service,
|
|
):
|
|
response = authorized_client.post(
|
|
"/graphql",
|
|
json={
|
|
"query": API_MOVE_SERVICE_MUTATION,
|
|
"variables": {
|
|
"input": {"service_id": "nextcloud", "location": "sdx"},
|
|
},
|
|
},
|
|
)
|
|
assert response.status_code == 200
|
|
assert response.json().get("data") is not None
|
|
|
|
assert response.json()["data"]["moveService"]["code"] == 404
|
|
assert response.json()["data"]["moveService"]["message"] is not None
|
|
assert response.json()["data"]["moveService"]["success"] is False
|
|
|
|
|
|
def test_graphql_move_not_moveble_service(
|
|
authorized_client,
|
|
mock_get_service_by_id,
|
|
mock_nextcloud_return_false,
|
|
mock_service_to_graphql_service,
|
|
):
|
|
response = authorized_client.post(
|
|
"/graphql",
|
|
json={
|
|
"query": API_MOVE_SERVICE_MUTATION,
|
|
"variables": {
|
|
"input": {"service_id": "nextcloud", "location": "sdx"},
|
|
},
|
|
},
|
|
)
|
|
assert response.status_code == 200
|
|
assert response.json().get("data") is not None
|
|
|
|
assert response.json()["data"]["moveService"]["code"] == 400
|
|
assert response.json()["data"]["moveService"]["message"] is not None
|
|
assert response.json()["data"]["moveService"]["success"] is False
|
|
|
|
|
|
def test_graphql_move_service_volume_not_found(
|
|
authorized_client,
|
|
mock_get_service_by_id,
|
|
mock_nextcloud,
|
|
mock_service_to_graphql_service,
|
|
mock_block_devices_return_none,
|
|
):
|
|
response = authorized_client.post(
|
|
"/graphql",
|
|
json={
|
|
"query": API_MOVE_SERVICE_MUTATION,
|
|
"variables": {
|
|
"input": {"service_id": "nextcloud", "location": "sdx"},
|
|
},
|
|
},
|
|
)
|
|
assert response.status_code == 200
|
|
assert response.json().get("data") is not None
|
|
|
|
assert response.json()["data"]["moveService"]["code"] == 400
|
|
assert response.json()["data"]["moveService"]["message"] is not None
|
|
assert response.json()["data"]["moveService"]["success"] is False
|
|
|
|
|
|
def test_graphql_move_service(
|
|
authorized_client,
|
|
mock_get_service_by_id,
|
|
mock_nextcloud,
|
|
mock_service_to_graphql_service,
|
|
mock_block_devices,
|
|
):
|
|
response = authorized_client.post(
|
|
"/graphql",
|
|
json={
|
|
"query": API_MOVE_SERVICE_MUTATION,
|
|
"variables": {
|
|
"input": {"service_id": "nextcloud", "location": "sdx"},
|
|
},
|
|
},
|
|
)
|
|
assert response.status_code == 200
|
|
assert response.json().get("data") is not None
|
|
|
|
assert response.json()["data"]["moveService"]["code"] == 200
|
|
assert response.json()["data"]["moveService"]["message"] is not None
|
|
assert response.json()["data"]["moveService"]["success"] is True
|