2022-07-11 13:42:51 +00:00
|
|
|
# pylint: disable=redefined-outer-name
|
|
|
|
# pylint: disable=unused-argument
|
|
|
|
# pylint: disable=missing-function-docstring
|
|
|
|
import pytest
|
|
|
|
|
|
|
|
|
|
|
|
class ProcessMock:
    """Minimal stand-in for ``subprocess.Popen``.

    It records the constructor arguments and reports a successful, silent
    process: ``communicate()`` yields empty stdout with no stderr and
    ``returncode`` is 0.
    """

    # Exit status reported by the fake process.
    returncode = 0

    def __init__(self, args, **kwargs):
        """Store the command and keyword options the process was created with."""
        self.args = args
        self.kwargs = kwargs

    @staticmethod
    def communicate():
        """Return ``(stdout, stderr)`` of the fake process: ``(b"", None)``.

        Declared as a static method so it can be called both on the class
        itself (the ``mock_subprocess_popen`` fixture hands out the class as
        the patched return value) and on an instance, which previously raised
        ``TypeError`` because the method accepted no ``self``.
        """
        return (b"", None)
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.fixture
def mock_subprocess_popen(mocker):
    """Patch ``subprocess.Popen`` and hand the resulting mock to the test.

    The patch is autospec'd and calling it returns the ``ProcessMock`` class.
    """
    return mocker.patch(
        "subprocess.Popen",
        autospec=True,
        return_value=ProcessMock,
    )
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.fixture
def mock_os_chdir(mocker):
    """Patch ``os.chdir`` with an autospec'd mock and return that mock."""
    return mocker.patch("os.chdir", autospec=True)
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.fixture
def mock_subprocess_check_output(mocker):
    """Patch ``subprocess.check_output`` so it returns ``b"Testing Linux"``.

    The autospec'd mock is returned to the test.
    """
    return mocker.patch(
        "subprocess.check_output",
        autospec=True,
        return_value=b"Testing Linux",
    )
|
|
|
|
|
|
|
|
|
|
|
|
# GraphQL mutation that requests a system rebuild and selects the
# success / message / code fields of the result.
API_REBUILD_SYSTEM_MUTATION = """
mutation rebuildSystem {
    system {
        runSystemRebuild {
            success
            message
            code
        }
    }
}
"""
|
|
|
|
|
|
|
|
|
|
|
|
def test_graphql_system_rebuild_unauthorized(client, mock_subprocess_popen):
    """A rebuild request from an unauthorized client yields no data and spawns nothing."""
    payload = {"query": API_REBUILD_SYSTEM_MUTATION}
    response = client.post("/graphql", json=payload)

    assert response.status_code == 200
    assert response.json().get("data") is None
    # No process may be started on behalf of an unauthorized caller.
    assert mock_subprocess_popen.call_count == 0
|
|
|
|
|
|
|
|
|
|
|
|
def test_graphql_system_rebuild(authorized_client, mock_subprocess_popen):
    """An authorized rebuild request succeeds and starts the rebuild service."""
    payload = {"query": API_REBUILD_SYSTEM_MUTATION}
    response = authorized_client.post("/graphql", json=payload)

    assert response.status_code == 200
    body = response.json()
    assert body.get("data") is not None

    result = body["data"]["system"]["runSystemRebuild"]
    assert result["success"] is True
    assert result["message"] is not None
    assert result["code"] == 200

    # Exactly one process is spawned, with the systemctl command as its
    # first positional argument.
    assert mock_subprocess_popen.call_count == 1
    positional_args = mock_subprocess_popen.call_args[0]
    assert positional_args[0] == ["systemctl", "start", "sp-nixos-rebuild.service"]
|
|
|
|
|
|
|
|
|
|
|
|
# GraphQL mutation that requests a system upgrade and selects the
# success / message / code fields of the result.
API_UPGRADE_SYSTEM_MUTATION = """
mutation upgradeSystem {
    system {
        runSystemUpgrade {
            success
            message
            code
        }
    }
}
"""
|
|
|
|
|
|
|
|
|
|
|
|
def test_graphql_system_upgrade_unauthorized(client, mock_subprocess_popen):
    """An upgrade request from an unauthorized client yields no data and spawns nothing."""
    payload = {"query": API_UPGRADE_SYSTEM_MUTATION}
    response = client.post("/graphql", json=payload)

    assert response.status_code == 200
    assert response.json().get("data") is None
    # No process may be started on behalf of an unauthorized caller.
    assert mock_subprocess_popen.call_count == 0
|
|
|
|
|
|
|
|
|
|
|
|
def test_graphql_system_upgrade(authorized_client, mock_subprocess_popen):
    """An authorized upgrade request succeeds and starts the upgrade service."""
    payload = {"query": API_UPGRADE_SYSTEM_MUTATION}
    response = authorized_client.post("/graphql", json=payload)

    assert response.status_code == 200
    body = response.json()
    assert body.get("data") is not None

    result = body["data"]["system"]["runSystemUpgrade"]
    assert result["success"] is True
    assert result["message"] is not None
    assert result["code"] == 200

    # Exactly one process is spawned, with the systemctl command as its
    # first positional argument.
    assert mock_subprocess_popen.call_count == 1
    positional_args = mock_subprocess_popen.call_args[0]
    assert positional_args[0] == ["systemctl", "start", "sp-nixos-upgrade.service"]
|
|
|
|
|
|
|
|
|
|
|
|
# GraphQL mutation that requests a system rollback and selects the
# success / message / code fields of the result.
API_ROLLBACK_SYSTEM_MUTATION = """
mutation rollbackSystem {
    system {
        runSystemRollback {
            success
            message
            code
        }
    }
}
"""
|
|
|
|
|
|
|
|
|
|
|
|
def test_graphql_system_rollback_unauthorized(client, mock_subprocess_popen):
    """A rollback request from an unauthorized client yields no data and spawns nothing."""
    payload = {"query": API_ROLLBACK_SYSTEM_MUTATION}
    response = client.post("/graphql", json=payload)

    assert response.status_code == 200
    assert response.json().get("data") is None
    # No process may be started on behalf of an unauthorized caller.
    assert mock_subprocess_popen.call_count == 0
|
|
|
|
|
|
|
|
|
|
|
|
def test_graphql_system_rollback(authorized_client, mock_subprocess_popen):
    """An authorized rollback request succeeds and starts the rollback service."""
    payload = {"query": API_ROLLBACK_SYSTEM_MUTATION}
    response = authorized_client.post("/graphql", json=payload)

    assert response.status_code == 200
    body = response.json()
    assert body.get("data") is not None

    result = body["data"]["system"]["runSystemRollback"]
    assert result["success"] is True
    assert result["message"] is not None
    assert result["code"] == 200

    # Exactly one process is spawned, with the systemctl command as its
    # first positional argument.
    assert mock_subprocess_popen.call_count == 1
    positional_args = mock_subprocess_popen.call_args[0]
    assert positional_args[0] == ["systemctl", "start", "sp-nixos-rollback.service"]
|
|
|
|
|
|
|
|
|
|
|
|
# GraphQL mutation that requests a system reboot and selects the
# success / message / code fields of the result.
API_REBOOT_SYSTEM_MUTATION = """
mutation system {
    system {
        rebootSystem {
            success
            message
            code
        }
    }
}
"""
|
|
|
|
|
|
|
|
|
|
|
|
def test_graphql_reboot_system_unauthorized(client, mock_subprocess_popen):
    """A reboot request from an unauthorized client yields no data and spawns nothing."""
    payload = {"query": API_REBOOT_SYSTEM_MUTATION}
    response = client.post("/graphql", json=payload)

    assert response.status_code == 200
    assert response.json().get("data") is None

    # No process may be started on behalf of an unauthorized caller.
    assert mock_subprocess_popen.call_count == 0
|
|
|
|
|
|
|
|
|
|
|
|
def test_graphql_reboot_system(authorized_client, mock_subprocess_popen):
    """An authorized reboot request succeeds and runs the ``reboot`` command."""
    payload = {"query": API_REBOOT_SYSTEM_MUTATION}
    response = authorized_client.post("/graphql", json=payload)

    assert response.status_code == 200
    body = response.json()
    assert body.get("data") is not None

    result = body["data"]["system"]["rebootSystem"]
    assert result["success"] is True
    assert result["message"] is not None
    assert result["code"] == 200

    # Exactly one process is spawned, with the reboot command as its
    # first positional argument.
    assert mock_subprocess_popen.call_count == 1
    positional_args = mock_subprocess_popen.call_args[0]
    assert positional_args[0] == ["reboot"]
|