selfprivacy-rest-api/tests/test_graphql/test_api_monitoring.py

224 lines
6.7 KiB
Python
Raw Normal View History

2024-07-07 12:33:15 +00:00
# pylint: disable=redefined-outer-name
# pylint: disable=unused-argument
# pylint: disable=missing-function-docstring
from datetime import datetime
2024-07-07 12:33:15 +00:00
import pytest
2024-07-10 12:49:27 +00:00
from selfprivacy_api.utils.prometheus import PrometheusQueryResult
2024-07-07 12:33:15 +00:00
from tests.test_graphql.common import (
assert_empty,
get_data,
)
2024-07-08 15:18:07 +00:00
def generate_mock_metrics(name: str):
return {
"data": {
"monitoring": {
f"{name}": {
2024-07-10 12:49:27 +00:00
"resultType": "matrix",
2024-07-08 15:18:07 +00:00
"result": [
{
"metric": {"instance": "127.0.0.1:9002"},
"values": [
[1720135748, "3.75"],
[1720135808, "4.525000000139698"],
[1720135868, "4.541666666433841"],
[1720135928, "4.574999999798209"],
[1720135988, "4.579166666759804"],
[1720136048, "3.8791666664959195"],
[1720136108, "4.5458333333954215"],
[1720136168, "4.566666666651145"],
[1720136228, "4.791666666666671"],
[1720136288, "4.720833333364382"],
[1720136348, "3.9624999999068677"],
[1720136408, "4.6875"],
[1720136468, "4.404166666790843"],
[1720136528, "4.31666666680637"],
[1720136588, "4.358333333317816"],
[1720136648, "3.7083333334885538"],
[1720136708, "4.558333333116025"],
[1720136768, "4.729166666511446"],
[1720136828, "4.75416666672875"],
[1720136888, "4.624999999844775"],
[1720136948, "3.9041666667132375"],
],
}
],
}
2024-07-07 12:33:15 +00:00
}
}
}
MOCK_CPU_USAGE_RESPONSE = generate_mock_metrics("cpuUsage")
MOCK_DISK_USAGE_RESPONSE = generate_mock_metrics("diskUsage")
MOCK_MEMORY_USAGE_RESPONSE = generate_mock_metrics("memoryUsage")
2024-07-08 15:18:07 +00:00
def generate_mock_query(name):
return f"""
query Query {{
monitoring {{
{name} {{ resultType, result }}
2024-07-08 15:18:07 +00:00
}}
}}
"""
2024-07-07 12:33:15 +00:00
2024-07-08 15:18:07 +00:00
def generate_mock_query_with_options(name):
return f"""
query Query($start: DateTime, $end: DateTime, $step: Int) {{
2024-07-08 15:18:07 +00:00
monitoring {{
{name}(start: $start, end: $end, step: $step) {{ resultType, result }}
2024-07-08 15:18:07 +00:00
}}
}}
"""
2024-07-07 12:33:15 +00:00
2024-07-10 12:49:27 +00:00
def prometheus_result_from_dict(dict):
return PrometheusQueryResult(result_type=dict["resultType"], result=dict["result"])
2024-07-08 15:00:49 +00:00
@pytest.fixture
def mock_cpu_usage(mocker):
mock = mocker.patch(
"selfprivacy_api.utils.prometheus.PrometheusQueries._send_query",
return_value=prometheus_result_from_dict(
MOCK_CPU_USAGE_RESPONSE["data"]["monitoring"]["cpuUsage"]
),
2024-07-08 15:00:49 +00:00
)
return mock
2024-07-08 15:18:07 +00:00
@pytest.fixture
def mock_memory_usage(mocker):
mock = mocker.patch(
"selfprivacy_api.utils.prometheus.PrometheusQueries._send_query",
return_value=prometheus_result_from_dict(
MOCK_MEMORY_USAGE_RESPONSE["data"]["monitoring"]["memoryUsage"]
),
2024-07-08 15:18:07 +00:00
)
return mock
2024-07-08 15:00:49 +00:00
@pytest.fixture
def mock_disk_usage(mocker):
mock = mocker.patch(
"selfprivacy_api.utils.prometheus.PrometheusQueries._send_query",
return_value=prometheus_result_from_dict(
MOCK_DISK_USAGE_RESPONSE["data"]["monitoring"]["diskUsage"]
),
2024-07-08 15:00:49 +00:00
)
return mock
2024-07-08 15:18:07 +00:00
def test_graphql_get_disk_usage(client, authorized_client, mock_disk_usage):
2024-07-07 12:33:15 +00:00
response = authorized_client.post(
"/graphql",
json={"query": generate_mock_query("diskUsage")},
2024-07-07 12:33:15 +00:00
)
2024-07-08 15:00:49 +00:00
data = get_data(response)
2024-07-10 12:49:27 +00:00
assert data == MOCK_DISK_USAGE_RESPONSE["data"]
2024-07-07 12:33:15 +00:00
2024-07-08 15:18:07 +00:00
def test_graphql_get_disk_usage_with_options(
2024-07-08 15:00:49 +00:00
client, authorized_client, mock_disk_usage
):
2024-07-07 12:33:15 +00:00
response = authorized_client.post(
"/graphql",
json={
"query": generate_mock_query_with_options("diskUsage"),
2024-07-08 15:18:07 +00:00
"variables": {
"start": datetime.fromtimestamp(1720136108).isoformat(),
"end": datetime.fromtimestamp(1720137319).isoformat(),
2024-07-08 15:18:07 +00:00
"step": 90,
},
},
)
data = get_data(response)
2024-07-10 12:49:27 +00:00
assert data == MOCK_DISK_USAGE_RESPONSE["data"]
2024-07-08 15:18:07 +00:00
def test_graphql_get_disk_usage_unauthorized(client):
response = client.post(
"/graphql",
json={"query": generate_mock_query("diskUsage")},
2024-07-08 15:18:07 +00:00
)
assert_empty(response)
def test_graphql_get_memory_usage(client, authorized_client, mock_memory_usage):
response = authorized_client.post(
"/graphql",
json={"query": generate_mock_query("memoryUsage")},
2024-07-08 15:18:07 +00:00
)
data = get_data(response)
2024-07-10 12:49:27 +00:00
assert data == MOCK_MEMORY_USAGE_RESPONSE["data"]
2024-07-08 15:18:07 +00:00
def test_graphql_get_memory_usage_with_options(
client, authorized_client, mock_memory_usage
):
response = authorized_client.post(
"/graphql",
json={
"query": generate_mock_query_with_options("memoryUsage"),
2024-07-08 15:18:07 +00:00
"variables": {
"start": datetime.fromtimestamp(1720136108).isoformat(),
"end": datetime.fromtimestamp(1720137319).isoformat(),
2024-07-08 15:18:07 +00:00
"step": 90,
},
},
)
data = get_data(response)
2024-07-10 12:49:27 +00:00
assert data == MOCK_MEMORY_USAGE_RESPONSE["data"]
2024-07-08 15:18:07 +00:00
def test_graphql_get_memory_usage_unauthorized(client):
response = client.post(
"/graphql",
json={"query": generate_mock_query("memoryUsage")},
2024-07-08 15:18:07 +00:00
)
assert_empty(response)
def test_graphql_get_cpu_usage(client, authorized_client, mock_cpu_usage):
response = authorized_client.post(
"/graphql",
json={"query": generate_mock_query("cpuUsage")},
2024-07-08 15:18:07 +00:00
)
data = get_data(response)
2024-07-10 12:49:27 +00:00
assert data == MOCK_CPU_USAGE_RESPONSE["data"]
2024-07-08 15:18:07 +00:00
def test_graphql_get_cpu_usage_with_options(client, authorized_client, mock_cpu_usage):
response = authorized_client.post(
"/graphql",
json={
"query": generate_mock_query_with_options("cpuUsage"),
2024-07-07 12:33:15 +00:00
"variables": {
"start": datetime.fromtimestamp(1720136108).isoformat(),
"end": datetime.fromtimestamp(1720137319).isoformat(),
2024-07-07 12:33:15 +00:00
"step": 90,
},
},
)
2024-07-08 15:00:49 +00:00
data = get_data(response)
2024-07-10 12:49:27 +00:00
assert data == MOCK_CPU_USAGE_RESPONSE["data"]
2024-07-07 12:33:15 +00:00
2024-07-08 15:18:07 +00:00
def test_graphql_get_cpu_usage_unauthorized(client):
2024-07-07 12:33:15 +00:00
response = client.post(
"/graphql",
json={"query": generate_mock_query("cpuUsage")},
2024-07-07 12:33:15 +00:00
)
assert_empty(response)