# pylint: disable=redefined-outer-name # pylint: disable=unused-argument # pylint: disable=missing-function-docstring from datetime import datetime import pytest from selfprivacy_api.utils.prometheus import MonitoringQueryResult from tests.test_graphql.common import ( assert_empty, get_data, ) def generate_mock_metrics(name: str): return { "data": { "monitoring": { f"{name}": { "resultType": "matrix", "result": [ { "metric": {"instance": "127.0.0.1:9002"}, "values": [ [1720135748, "3.75"], [1720135808, "4.525000000139698"], [1720135868, "4.541666666433841"], [1720135928, "4.574999999798209"], [1720135988, "4.579166666759804"], [1720136048, "3.8791666664959195"], [1720136108, "4.5458333333954215"], [1720136168, "4.566666666651145"], [1720136228, "4.791666666666671"], [1720136288, "4.720833333364382"], [1720136348, "3.9624999999068677"], [1720136408, "4.6875"], [1720136468, "4.404166666790843"], [1720136528, "4.31666666680637"], [1720136588, "4.358333333317816"], [1720136648, "3.7083333334885538"], [1720136708, "4.558333333116025"], [1720136768, "4.729166666511446"], [1720136828, "4.75416666672875"], [1720136888, "4.624999999844775"], [1720136948, "3.9041666667132375"], ], } ], } } } } MOCK_CPU_USAGE_RESPONSE = generate_mock_metrics("cpuUsage") MOCK_DISK_USAGE_RESPONSE = generate_mock_metrics("diskUsage") MOCK_MEMORY_USAGE_RESPONSE = generate_mock_metrics("memoryUsage") def generate_mock_query(name): return f""" query Query {{ monitoring {{ {name} {{ resultType, result }} }} }} """ def generate_mock_query_with_options(name): return f""" query Query($start: DateTime, $end: DateTime, $step: Int) {{ monitoring {{ {name}(start: $start, end: $end, step: $step) {{ resultType, result }} }} }} """ def prometheus_result_from_dict(dict): return MonitoringQueryResult(result_type=dict["resultType"], result=dict["result"]) @pytest.fixture def mock_cpu_usage(mocker): mock = mocker.patch( "selfprivacy_api.utils.prometheus.PrometheusQueries._send_query", return_value=prometheus_result_from_dict( MOCK_CPU_USAGE_RESPONSE["data"]["monitoring"]["cpuUsage"] ), ) return mock @pytest.fixture def mock_memory_usage(mocker): mock = mocker.patch( "selfprivacy_api.utils.prometheus.PrometheusQueries._send_query", return_value=prometheus_result_from_dict( MOCK_MEMORY_USAGE_RESPONSE["data"]["monitoring"]["memoryUsage"] ), ) return mock @pytest.fixture def mock_disk_usage(mocker): mock = mocker.patch( "selfprivacy_api.utils.prometheus.PrometheusQueries._send_query", return_value=prometheus_result_from_dict( MOCK_DISK_USAGE_RESPONSE["data"]["monitoring"]["diskUsage"] ), ) return mock def test_graphql_get_disk_usage(client, authorized_client, mock_disk_usage): response = authorized_client.post( "/graphql", json={"query": generate_mock_query("diskUsage")}, ) data = get_data(response) assert data == MOCK_DISK_USAGE_RESPONSE["data"] def test_graphql_get_disk_usage_with_options( client, authorized_client, mock_disk_usage ): response = authorized_client.post( "/graphql", json={ "query": generate_mock_query_with_options("diskUsage"), "variables": { "start": datetime.fromtimestamp(1720136108).isoformat(), "end": datetime.fromtimestamp(1720137319).isoformat(), "step": 90, }, }, ) data = get_data(response) assert data == MOCK_DISK_USAGE_RESPONSE["data"] def test_graphql_get_disk_usage_unauthorized(client): response = client.post( "/graphql", json={"query": generate_mock_query("diskUsage")}, ) assert_empty(response) def test_graphql_get_memory_usage(client, authorized_client, mock_memory_usage): response = authorized_client.post( "/graphql", json={"query": generate_mock_query("memoryUsage")}, ) data = get_data(response) assert data == MOCK_MEMORY_USAGE_RESPONSE["data"] def test_graphql_get_memory_usage_with_options( client, authorized_client, mock_memory_usage ): response = authorized_client.post( "/graphql", json={ "query": generate_mock_query_with_options("memoryUsage"), "variables": { "start": datetime.fromtimestamp(1720136108).isoformat(), "end": datetime.fromtimestamp(1720137319).isoformat(), "step": 90, }, }, ) data = get_data(response) assert data == MOCK_MEMORY_USAGE_RESPONSE["data"] def test_graphql_get_memory_usage_unauthorized(client): response = client.post( "/graphql", json={"query": generate_mock_query("memoryUsage")}, ) assert_empty(response) def test_graphql_get_cpu_usage(client, authorized_client, mock_cpu_usage): response = authorized_client.post( "/graphql", json={"query": generate_mock_query("cpuUsage")}, ) data = get_data(response) assert data == MOCK_CPU_USAGE_RESPONSE["data"] def test_graphql_get_cpu_usage_with_options(client, authorized_client, mock_cpu_usage): response = authorized_client.post( "/graphql", json={ "query": generate_mock_query_with_options("cpuUsage"), "variables": { "start": datetime.fromtimestamp(1720136108).isoformat(), "end": datetime.fromtimestamp(1720137319).isoformat(), "step": 90, }, }, ) data = get_data(response) assert data == MOCK_CPU_USAGE_RESPONSE["data"] def test_graphql_get_cpu_usage_unauthorized(client): response = client.post( "/graphql", json={"query": generate_mock_query("cpuUsage")}, ) assert_empty(response)