feat: make query result typed (WIP, tests are broken)

This commit is contained in:
nhnn 2024-07-27 15:37:38 +03:00
parent 57c5b9781d
commit 2d07505b4d
4 changed files with 206 additions and 91 deletions

View file

@ -6,23 +6,24 @@ from selfprivacy_api.services.prometheus import Prometheus
from selfprivacy_api.utils.monitoring import (
MonitoringQueries,
MonitoringQueryError,
MonitoringResponse,
MonitoringValuesResult,
MonitoringMetricsResult,
)
@strawberry.type
class Monitoring:
@strawberry.field
def disk_usage(
def cpu_usage(
self,
start: Optional[datetime] = None,
end: Optional[datetime] = None,
step: int = 60,
) -> MonitoringResponse:
) -> MonitoringValuesResult:
if Prometheus().get_status() != ServiceStatus.ACTIVE:
return MonitoringQueryError(error="Prometheus is not running")
return MonitoringQueries.disk_usage(start, end, step)
return MonitoringQueries.cpu_usage(start, end, step)
@strawberry.field
def memory_usage(
@ -30,23 +31,23 @@ class Monitoring:
start: Optional[datetime] = None,
end: Optional[datetime] = None,
step: int = 60,
) -> MonitoringResponse:
) -> MonitoringValuesResult:
if Prometheus().get_status() != ServiceStatus.ACTIVE:
return MonitoringQueryError(error="Prometheus is not running")
return MonitoringQueries.memory_usage(start, end, step)
@strawberry.field
def cpu_usage(
def disk_usage(
self,
start: Optional[datetime] = None,
end: Optional[datetime] = None,
step: int = 60,
) -> MonitoringResponse:
) -> MonitoringMetricsResult:
if Prometheus().get_status() != ServiceStatus.ACTIVE:
return MonitoringQueryError(error="Prometheus is not running")
return MonitoringQueries.cpu_usage(start, end, step)
return MonitoringQueries.disk_usage(start, end, step)
@strawberry.field
def network_usage(
@ -54,8 +55,8 @@ class Monitoring:
start: Optional[datetime] = None,
end: Optional[datetime] = None,
step: int = 60,
) -> MonitoringResponse:
) -> MonitoringMetricsResult:
if Prometheus().get_status() != ServiceStatus.ACTIVE:
return MonitoringQueryError(error="Prometheus is not running")
return MonitoringQueries.cpu_usage(start, end, step)
return MonitoringQueries.network_usage(start, end, step)

View file

@ -26,9 +26,9 @@ class AddMonitoring(Migration):
def migrate(self) -> None:
with FlakeServiceManager() as manager:
if "monitoring" not in manager.services:
manager.services[
"monitoring"
] = "git+https://git.selfprivacy.org/SelfPrivacy/selfprivacy-nixos-config.git?ref=flakes&dir=sp-modules/monitoring"
manager.services["monitoring"] = (
"git+https://git.selfprivacy.org/SelfPrivacy/selfprivacy-nixos-config.git?ref=flakes&dir=sp-modules/monitoring"
)
with WriteUserData() as data:
if "monitoring" not in data["modules"]:
data["modules"]["monitoring"] = {

View file

@ -4,10 +4,9 @@
import requests
import strawberry
from strawberry.scalars import JSON
from dataclasses import dataclass
from typing import Optional, Annotated, Union
from typing import Optional, Annotated, Union, List, Tuple
from datetime import datetime, timedelta
PROMETHEUS_URL = "http://localhost:9001"
@ -15,9 +14,16 @@ PROMETHEUS_URL = "http://localhost:9001"
@strawberry.type
@dataclass
class MonitoringQueryResult:
result_type: str
result: JSON
class MonitoringValue:
timestamp: datetime
value: str
@strawberry.type
@dataclass
class MonitoringMetric:
id: str
values: List[MonitoringValue]
@strawberry.type
@ -25,15 +31,23 @@ class MonitoringQueryError:
error: str
MonitoringResponse = Annotated[
Union[MonitoringQueryResult, MonitoringQueryError],
strawberry.union("MonitoringQueryResponse"),
MonitoringValuesResult = Annotated[
Union[List[MonitoringValue], MonitoringQueryError],
strawberry.union("MonitoringValuesResult"),
]
MonitoringMetricsResult = Annotated[
Union[List[MonitoringMetric], MonitoringQueryError],
strawberry.union("MonitoringMetricsResult"),
]
class MonitoringQueries:
@staticmethod
def _send_query(query: str, start: int, end: int, step: int) -> MonitoringResponse:
def _send_query(
query: str, start: int, end: int, step: int, result_type: Optional[str] = None
) -> Union[dict, MonitoringQueryError]:
try:
response = requests.get(
f"{PROMETHEUS_URL}/api/v1/query_range",
@ -49,20 +63,45 @@ class MonitoringQueries:
error="Prometheus returned unexpected HTTP status code"
)
json = response.json()
return MonitoringQueryResult(
result_type=json["data"]["resultType"], result=json["data"]["result"]
if result_type and json["data"]["resultType"] != result_type:
return MonitoringQueryError(
error="Unexpected resultType returned from Prometheus, request failed"
)
return json["data"]
except Exception as error:
return MonitoringQueryError(
error=f"Prometheus request failed! Error: {str(error)}"
)
@staticmethod
def _prometheus_value_to_monitoring_value(x: Tuple[int, str]):
return MonitoringValue(timestamp=datetime.fromtimestamp(x[0]), value=x[1])
@staticmethod
def _prometheus_respone_to_monitoring_metrics(
responese: dict, id_key: str
) -> List[MonitoringMetric]:
return list(
map(
lambda x: MonitoringMetric(
id=x["metric"][id_key],
values=list(
map(
MonitoringQueries._prometheus_value_to_monitoring_value,
x["values"],
)
),
),
responese["result"],
)
)
@staticmethod
def cpu_usage(
start: Optional[datetime] = None,
end: Optional[datetime] = None,
step: int = 60, # seconds
) -> MonitoringResponse:
) -> MonitoringValuesResult:
"""
Get CPU information.
@ -85,11 +124,18 @@ class MonitoringQueries:
query = '100 - (avg by (instance) (rate(node_cpu_seconds_total{mode="idle"}[5m])) * 100)'
return MonitoringQueries._send_query(
query,
start_timestamp,
end_timestamp,
step,
data = MonitoringQueries._send_query(
query, start_timestamp, end_timestamp, step, result_type="matrix"
)
if isinstance(data, MonitoringQueryError):
return data
return list(
map(
MonitoringQueries._prometheus_value_to_monitoring_value,
data["result"][0]["values"],
)
)
@staticmethod
@ -97,7 +143,7 @@ class MonitoringQueries:
start: Optional[datetime] = None,
end: Optional[datetime] = None,
step: int = 60, # seconds
) -> MonitoringResponse:
) -> MonitoringValuesResult:
"""
Get memory usage.
@ -120,11 +166,18 @@ class MonitoringQueries:
query = "100 - (100 * (node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes))"
return MonitoringQueries._send_query(
query,
start_timestamp,
end_timestamp,
step,
data = MonitoringQueries._send_query(
query, start_timestamp, end_timestamp, step, result_type="matrix"
)
if isinstance(data, MonitoringQueryError):
return data
return list(
map(
MonitoringQueries._prometheus_value_to_monitoring_value,
data["result"][0]["values"],
)
)
@staticmethod
@ -132,7 +185,7 @@ class MonitoringQueries:
start: Optional[datetime] = None,
end: Optional[datetime] = None,
step: int = 60, # seconds
) -> MonitoringResponse:
) -> MonitoringMetricsResult:
"""
Get disk usage information.
@ -155,11 +208,15 @@ class MonitoringQueries:
query = """100 - (100 * sum by (device) (node_filesystem_avail_bytes{fstype!="rootfs"}) / sum by (device) (node_filesystem_size_bytes{fstype!="rootfs"}))"""
return MonitoringQueries._send_query(
query,
start_timestamp,
end_timestamp,
step,
data = MonitoringQueries._send_query(
query, start_timestamp, end_timestamp, step, result_type="matrix"
)
if isinstance(data, MonitoringQueryError):
return data
return MonitoringQueries._prometheus_respone_to_monitoring_metrics(
data, "device"
)
@staticmethod
@ -167,7 +224,7 @@ class MonitoringQueries:
start: Optional[datetime] = None,
end: Optional[datetime] = None,
step: int = 60, # seconds
) -> MonitoringResponse:
) -> MonitoringMetricsResult:
"""
Get network usage information for both download and upload.
@ -195,9 +252,13 @@ class MonitoringQueries:
)
"""
return MonitoringQueries._send_query(
query,
start_timestamp,
end_timestamp,
step,
data = MonitoringQueries._send_query(
query, start_timestamp, end_timestamp, step, result_type="matrix"
)
if isinstance(data, MonitoringQueryError):
return data
return MonitoringQueries._prometheus_respone_to_monitoring_metrics(
data, "device"
)

View file

@ -2,7 +2,9 @@
# pylint: disable=unused-argument
# pylint: disable=missing-function-docstring
from dataclasses import dataclass
from datetime import datetime
from typing import List, Dict
import pytest
from selfprivacy_api.utils.monitoring import MonitoringQueryResult
@ -11,17 +13,7 @@ from tests.test_graphql.common import (
get_data,
)
def generate_mock_metrics(name: str):
return {
"data": {
"monitoring": {
f"{name}": {
"resultType": "matrix",
"result": [
{
"metric": {"instance": "127.0.0.1:9002"},
"values": [
MOCK_VALUES = [
[1720135748, "3.75"],
[1720135808, "4.525000000139698"],
[1720135868, "4.541666666433841"],
@ -43,18 +35,81 @@ def generate_mock_metrics(name: str):
[1720136828, "4.75416666672875"],
[1720136888, "4.624999999844775"],
[1720136948, "3.9041666667132375"],
],
}
],
}
}
}
}
]
@dataclass
class DumbResponse:
status_code: int
json_data: dict
def json(self):
return self.json_data
MOCK_CPU_USAGE_RESPONSE = generate_mock_metrics("cpuUsage")
MOCK_DISK_USAGE_RESPONSE = generate_mock_metrics("diskUsage")
MOCK_MEMORY_USAGE_RESPONSE = generate_mock_metrics("memoryUsage")
def generate_prometheus_response(result_type: str, result: List[Dict]):
return DumbResponse(
status_code=200,
json_data={
'data': {
'resultType': result_type,
'result': result
}
}
)
MOCK_SINGLE_METRIC_PROMETHEUS_RESPONSE = generate_prometheus_response(
'matrix',
[
{
'values': MOCK_VALUES
}
]
)
MOCK_MULTIPLE_METRIC_DEVICE_PROMETHEUS_RESPONSE = generate_prometheus_response(
'matrix',
[
{
'metric': {
'device': 'a'
},
'values': MOCK_VALUES
},
{
'metric': {
'device': 'b'
},
'values': MOCK_VALUES
},
{
'metric': {
'device': 'c'
},
'values': MOCK_VALUES
},
]
)
# def generate_mock_metrics(name: str):
# return {
# "data": {
# "monitoring": {
# f"{name}": {
# "resultType": "matrix",
# "result": [
# {
# "metric": {"instance": "127.0.0.1:9002"},
# "values": ,
# }
# ],
# }
# }
# }
# }
# MOCK_CPU_USAGE_RESPONSE = generate_mock_metrics("cpuUsage")
# MOCK_DISK_USAGE_RESPONSE = generate_mock_metrics("diskUsage")
# MOCK_MEMORY_USAGE_RESPONSE = generate_mock_metrics("memoryUsage")
def generate_mock_query(name):
@ -85,9 +140,7 @@ def prometheus_result_from_dict(dict):
def mock_cpu_usage(mocker):
mock = mocker.patch(
"selfprivacy_api.utils.prometheus.PrometheusQueries._send_query",
return_value=prometheus_result_from_dict(
MOCK_CPU_USAGE_RESPONSE["data"]["monitoring"]["cpuUsage"]
),
return_value=MOCK_CPU_USAGE_RESPONSE["data"]["monitoring"]["cpuUsage"],
)
return mock