From f83f5e840bdb342c88d2f2c65af7a24f7e0da77e Mon Sep 17 00:00:00 2001 From: Inex Code Date: Mon, 29 Jul 2024 18:25:21 +0300 Subject: [PATCH] feat: Add per-service memory stats --- selfprivacy_api/graphql/queries/monitoring.py | 22 ++- selfprivacy_api/utils/monitoring.py | 149 ++++++++++++++++-- tests/test_graphql/test_api_monitoring.py | 45 ++---- 3 files changed, 162 insertions(+), 54 deletions(-) diff --git a/selfprivacy_api/graphql/queries/monitoring.py b/selfprivacy_api/graphql/queries/monitoring.py index e000c4f..a826cf2 100644 --- a/selfprivacy_api/graphql/queries/monitoring.py +++ b/selfprivacy_api/graphql/queries/monitoring.py @@ -22,7 +22,7 @@ class CpuMonitoring: if Prometheus().get_status() != ServiceStatus.ACTIVE: return MonitoringQueryError(error="Prometheus is not running") - return MonitoringQueries.cpu_usage(self.start, self.end, self.step) + return MonitoringQueries.cpu_usage_overall(self.start, self.end, self.step) @strawberry.type @@ -36,7 +36,21 @@ class MemoryMonitoring: if Prometheus().get_status() != ServiceStatus.ACTIVE: return MonitoringQueryError(error="Prometheus is not running") - return MonitoringQueries.memory_usage(self.start, self.end, self.step) + return MonitoringQueries.memory_usage_overall(self.start, self.end, self.step) + + @strawberry.field + def average_usage_by_service(self) -> MonitoringMetricsResult: + if Prometheus().get_status() != ServiceStatus.ACTIVE: + return MonitoringQueryError(error="Prometheus is not running") + + return MonitoringQueries.memory_usage_average_by_slice(self.start, self.end) + + @strawberry.field + def max_usage_by_service(self) -> MonitoringMetricsResult: + if Prometheus().get_status() != ServiceStatus.ACTIVE: + return MonitoringQueryError(error="Prometheus is not running") + + return MonitoringQueries.memory_usage_max_by_slice(self.start, self.end) @strawberry.type @@ -50,7 +64,7 @@ class DiskMonitoring: if Prometheus().get_status() != ServiceStatus.ACTIVE: return 
MonitoringQueryError(error="Prometheus is not running") - return MonitoringQueries.disk_usage(self.start, self.end, self.step) + return MonitoringQueries.disk_usage_overall(self.start, self.end, self.step) @strawberry.type @@ -64,7 +78,7 @@ class NetworkMonitoring: if Prometheus().get_status() != ServiceStatus.ACTIVE: return MonitoringQueryError(error="Prometheus is not running") - return MonitoringQueries.network_usage(self.start, self.end, self.step) + return MonitoringQueries.network_usage_overall(self.start, self.end, self.step) @strawberry.type diff --git a/selfprivacy_api/utils/monitoring.py b/selfprivacy_api/utils/monitoring.py index 164fb66..4771bfb 100644 --- a/selfprivacy_api/utils/monitoring.py +++ b/selfprivacy_api/utils/monitoring.py @@ -55,7 +55,7 @@ MonitoringMetricsResult = Annotated[ class MonitoringQueries: @staticmethod - def _send_query( + def _send_range_query( query: str, start: int, end: int, step: int, result_type: Optional[str] = None ) -> Union[dict, MonitoringQueryError]: try: @@ -83,18 +83,53 @@ class MonitoringQueries: error=f"Prometheus request failed! Error: {str(error)}" ) + @staticmethod + def _send_query( + query: str, result_type: Optional[str] = None + ) -> Union[dict, MonitoringQueryError]: + try: + response = requests.get( + f"{PROMETHEUS_URL}/api/v1/query", + params={ + "query": query, + }, + ) + if response.status_code != 200: + return MonitoringQueryError( + error="Prometheus returned unexpected HTTP status code" + ) + json = response.json() + if result_type and json["data"]["resultType"] != result_type: + return MonitoringQueryError( + error="Unexpected resultType returned from Prometheus, request failed" + ) + return json["data"] + except Exception as error: + return MonitoringQueryError( + error=f"Prometheus request failed! 
Error: {str(error)}" ) + @staticmethod def _prometheus_value_to_monitoring_value(x: Tuple[int, str]): return MonitoringValue(timestamp=datetime.fromtimestamp(x[0]), value=x[1]) + @staticmethod + def _clean_slice_id(slice_id: str, clean_id: bool) -> str: + """Slices come in the form of `/slice_name.slice`, we need to remove the `.slice` and `/` part.""" + if clean_id: + return slice_id.split(".")[0].split("/")[1] + return slice_id + @staticmethod def _prometheus_response_to_monitoring_metrics( - response: dict, id_key: str + response: dict, id_key: str, clean_id: bool = False ) -> List[MonitoringMetric]: return list( map( lambda x: MonitoringMetric( - id=x["metric"][id_key], + id=MonitoringQueries._clean_slice_id( + x["metric"][id_key], clean_id=clean_id + ), values=list( map( MonitoringQueries._prometheus_value_to_monitoring_value, @@ -107,7 +142,18 @@ class MonitoringQueries: ) @staticmethod - def cpu_usage( + def _calculate_offset_and_duration( + start: datetime, end: datetime + ) -> Tuple[int, int]: + """Calculate the offset and duration for Prometheus queries. + They must be in seconds. 
+ """ + offset = int((datetime.now() - end).total_seconds()) + duration = int((end - start).total_seconds()) + return offset, duration + + @staticmethod + def cpu_usage_overall( start: Optional[datetime] = None, end: Optional[datetime] = None, step: int = 60, # seconds @@ -134,7 +180,7 @@ class MonitoringQueries: query = '100 - (avg by (instance) (rate(node_cpu_seconds_total{mode="idle"}[5m])) * 100)' - data = MonitoringQueries._send_query( + data = MonitoringQueries._send_range_query( query, start_timestamp, end_timestamp, step, result_type="matrix" ) @@ -151,7 +197,7 @@ class MonitoringQueries: ) @staticmethod - def memory_usage( + def memory_usage_overall( start: Optional[datetime] = None, end: Optional[datetime] = None, step: int = 60, # seconds @@ -178,7 +224,7 @@ class MonitoringQueries: query = "100 - (100 * (node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes))" - data = MonitoringQueries._send_query( + data = MonitoringQueries._send_range_query( query, start_timestamp, end_timestamp, step, result_type="matrix" ) @@ -195,7 +241,79 @@ class MonitoringQueries: ) @staticmethod - def disk_usage( + def memory_usage_max_by_slice( + start: Optional[datetime] = None, + end: Optional[datetime] = None, + ) -> MonitoringMetricsResult: + """ + Get maximum memory usage for each service (i.e. systemd slice). + + Args: + start (datetime, optional): The start time. + Defaults to 20 minutes ago if not provided. + end (datetime, optional): The end time. + Defaults to current time if not provided. 
+ """ + + if start is None: + start = datetime.now() - timedelta(minutes=20) + + if end is None: + end = datetime.now() + + offset, duration = MonitoringQueries._calculate_offset_and_duration(start, end) + + query = f'max_over_time(container_memory_rss{{id!~".*slice.*slice", id=~".*slice"}}[{duration}s] offset {offset}s)' + + data = MonitoringQueries._send_query(query, result_type="vector") + + if isinstance(data, MonitoringQueryError): + return data + + return MonitoringMetrics( + metrics=MonitoringQueries._prometheus_response_to_monitoring_metrics( + data, "id", clean_id=True + ) + ) + + @staticmethod + def memory_usage_average_by_slice( + start: Optional[datetime] = None, + end: Optional[datetime] = None, + ) -> MonitoringMetricsResult: + """ + Get average memory usage for each service (i.e. systemd slice). + + Args: + start (datetime, optional): The start time. + Defaults to 20 minutes ago if not provided. + end (datetime, optional): The end time. + Defaults to current time if not provided. 
+ """ + + if start is None: + start = datetime.now() - timedelta(minutes=20) + + if end is None: + end = datetime.now() + + offset, duration = MonitoringQueries._calculate_offset_and_duration(start, end) + + query = f'avg_over_time(container_memory_rss{{id!~".*slice.*slice", id=~".*slice"}}[{duration}s] offset {offset}s)' + + data = MonitoringQueries._send_query(query, result_type="vector") + + if isinstance(data, MonitoringQueryError): + return data + + return MonitoringMetrics( + metrics=MonitoringQueries._prometheus_response_to_monitoring_metrics( + data, "id", clean_id=True + ) + ) + + @staticmethod + def disk_usage_overall( start: Optional[datetime] = None, end: Optional[datetime] = None, step: int = 60, # seconds @@ -222,7 +340,7 @@ class MonitoringQueries: query = """100 - (100 * sum by (device) (node_filesystem_avail_bytes{fstype!="rootfs"}) / sum by (device) (node_filesystem_size_bytes{fstype!="rootfs"}))""" - data = MonitoringQueries._send_query( + data = MonitoringQueries._send_range_query( query, start_timestamp, end_timestamp, step, result_type="matrix" ) @@ -236,7 +354,7 @@ class MonitoringQueries: ) @staticmethod - def network_usage( + def network_usage_overall( start: Optional[datetime] = None, end: Optional[datetime] = None, step: int = 60, # seconds @@ -262,13 +380,12 @@ class MonitoringQueries: end_timestamp = int(end.timestamp()) query = """ - ( - sum(rate(node_network_receive_bytes_total{device!="lo"}[5m])) as download, - sum(rate(node_network_transmit_bytes_total{device!="lo"}[5m])) as upload - ) + label_replace(rate(node_network_receive_bytes_total{device!="lo"}[5m]), "direction", "receive", "device", ".*") + or + label_replace(rate(node_network_transmit_bytes_total{device!="lo"}[5m]), "direction", "transmit", "device", ".*") """ - data = MonitoringQueries._send_query( + data = MonitoringQueries._send_range_query( query, start_timestamp, end_timestamp, step, result_type="matrix" ) @@ -277,6 +394,6 @@ class MonitoringQueries: return 
MonitoringMetrics( metrics=MonitoringQueries._prometheus_response_to_monitoring_metrics( - data, "device" + data, "direction" ) ) diff --git a/tests/test_graphql/test_api_monitoring.py b/tests/test_graphql/test_api_monitoring.py index 506575e..f1fdd46 100644 --- a/tests/test_graphql/test_api_monitoring.py +++ b/tests/test_graphql/test_api_monitoring.py @@ -7,7 +7,6 @@ from datetime import datetime from typing import List, Dict import pytest -from selfprivacy_api.utils.monitoring import MonitoringQueryResult from tests.test_graphql.common import ( assert_empty, get_data, @@ -37,6 +36,7 @@ MOCK_VALUES = [ [1720136948, "3.9041666667132375"], ] + @dataclass class DumbResponse: status_code: int @@ -49,44 +49,20 @@ class DumbResponse: def generate_prometheus_response(result_type: str, result: List[Dict]): return DumbResponse( status_code=200, - json_data={ - 'data': { - 'resultType': result_type, - 'result': result - } - } + json_data={"data": {"resultType": result_type, "result": result}}, ) + MOCK_SINGLE_METRIC_PROMETHEUS_RESPONSE = generate_prometheus_response( - 'matrix', - [ - { - 'values': MOCK_VALUES - } - ] + "matrix", [{"values": MOCK_VALUES}] ) MOCK_MULTIPLE_METRIC_DEVICE_PROMETHEUS_RESPONSE = generate_prometheus_response( - 'matrix', + "matrix", [ - { - 'metric': { - 'device': 'a' - }, - 'values': MOCK_VALUES - }, - { - 'metric': { - 'device': 'b' - }, - 'values': MOCK_VALUES - }, - { - 'metric': { - 'device': 'c' - }, - 'values': MOCK_VALUES - }, - ] + {"metric": {"device": "a"}, "values": MOCK_VALUES}, + {"metric": {"device": "b"}, "values": MOCK_VALUES}, + {"metric": {"device": "c"}, "values": MOCK_VALUES}, + ], ) # def generate_mock_metrics(name: str): @@ -133,7 +109,8 @@ def generate_mock_query_with_options(name): def prometheus_result_from_dict(dict): - return MonitoringQueryResult(result_type=dict["resultType"], result=dict["result"]) + # return MonitoringQueryResult(result_type=dict["resultType"], result=dict["result"]) + return dict @pytest.fixture