""" Prometheus monitoring queries. """
# pylint: disable=too-few-public-methods
import requests
import strawberry
from dataclasses import dataclass
from typing import Optional , Annotated , Union , List , Tuple
from datetime import datetime , timedelta
PROMETHEUS_URL = " http://localhost:9001 "

@strawberry.type
@dataclass
class MonitoringValue:
    timestamp: datetime
    value: str


@strawberry.type
@dataclass
class MonitoringMetric:
    metric_id: str
    values: List[MonitoringValue]


@strawberry.type
class MonitoringQueryError:
    error: str


@strawberry.type
class MonitoringValues:
    values: List[MonitoringValue]


@strawberry.type
class MonitoringMetrics:
    metrics: List[MonitoringMetric]


MonitoringValuesResult = Annotated[
    Union[MonitoringValues, MonitoringQueryError],
    strawberry.union("MonitoringValuesResult"),
]

MonitoringMetricsResult = Annotated[
    Union[MonitoringMetrics, MonitoringQueryError],
    strawberry.union("MonitoringMetricsResult"),
]

class MonitoringQueries:
    @staticmethod
    def _send_range_query(
        query: str, start: int, end: int, step: int, result_type: Optional[str] = None
    ) -> Union[dict, MonitoringQueryError]:
        try:
            response = requests.get(
                f"{PROMETHEUS_URL}/api/v1/query_range",
                params={
                    "query": query,
                    "start": start,
                    "end": end,
                    "step": step,
                },
                timeout=0.8,
            )
            if response.status_code != 200:
                return MonitoringQueryError(
                    error=f"Prometheus returned unexpected HTTP status code. Error: {response.text}. The query was {query}"
                )
            json = response.json()
            if result_type and json["data"]["resultType"] != result_type:
                return MonitoringQueryError(
                    error="Unexpected resultType returned from Prometheus, request failed"
                )
            return json["data"]
        except Exception as error:
            return MonitoringQueryError(
                error=f"Prometheus request failed! Error: {str(error)}"
            )
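
    # For reference (standard Prometheus HTTP API response shape, not defined in
    # this module): on success, the returned json["data"] of a range query looks like
    #   {"resultType": "matrix",
    #    "result": [{"metric": {...}, "values": [[<unix_ts>, "<str_value>"], ...]}]}
    # which is what the helper methods below unpack into Monitoring* types.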

    @staticmethod
    def _send_query(
        query: str, result_type: Optional[str] = None
    ) -> Union[dict, MonitoringQueryError]:
        try:
            response = requests.get(
                f"{PROMETHEUS_URL}/api/v1/query",
                params={
                    "query": query,
                },
                timeout=0.8,
            )
            if response.status_code != 200:
                return MonitoringQueryError(
                    error=f"Prometheus returned unexpected HTTP status code. Error: {response.text}. The query was {query}"
                )
            json = response.json()
            if result_type and json["data"]["resultType"] != result_type:
                return MonitoringQueryError(
                    error="Unexpected resultType returned from Prometheus, request failed"
                )
            return json["data"]
        except Exception as error:
            return MonitoringQueryError(
                error=f"Prometheus request failed! Error: {str(error)}"
            )

    @staticmethod
    def _prometheus_value_to_monitoring_value(x: Tuple[int, str]) -> MonitoringValue:
        return MonitoringValue(timestamp=datetime.fromtimestamp(x[0]), value=x[1])

    @staticmethod
    def _clean_slice_id(slice_id: str, clean_id: bool) -> str:
        """Slice IDs come in the form `/slice_name.slice`; strip the leading `/` and the `.slice` suffix."""
        if clean_id:
            return slice_id.split(".")[0].split("/")[1]
        return slice_id
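
    # Illustrative example (hypothetical input): with clean_id=True,
    # "/system.slice" is reduced to "system"; with clean_id=False the id is
    # returned unchanged.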

    @staticmethod
    def _prometheus_response_to_monitoring_metrics(
        response: dict, id_key: str, clean_id: bool = False
    ) -> List[MonitoringMetric]:
        if response["resultType"] == "vector":
            return list(
                map(
                    lambda x: MonitoringMetric(
                        metric_id=MonitoringQueries._clean_slice_id(
                            x["metric"][id_key], clean_id=clean_id
                        ),
                        values=[
                            MonitoringQueries._prometheus_value_to_monitoring_value(
                                x["value"]
                            )
                        ],
                    ),
                    response["result"],
                )
            )
        else:
            return list(
                map(
                    lambda x: MonitoringMetric(
                        metric_id=MonitoringQueries._clean_slice_id(
                            x["metric"][id_key], clean_id=clean_id
                        ),
                        values=list(
                            map(
                                MonitoringQueries._prometheus_value_to_monitoring_value,
                                x["values"],
                            )
                        ),
                    ),
                    response["result"],
                )
            )

    @staticmethod
    def _calculate_offset_and_duration(
        start: datetime, end: datetime
    ) -> Tuple[int, int]:
        """Calculate the offset and duration for Prometheus queries.

        Both values are returned in seconds.
        """
        offset = int((datetime.now() - end).total_seconds())
        duration = int((end - start).total_seconds())
        return offset, duration
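
    # Worked example (illustrative numbers): if `end` is 5 minutes ago and the
    # window spans 20 minutes, this yields (offset=300, duration=1200), which the
    # by-slice queries below render as "[1200s:] offset 300s".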

    @staticmethod
    def cpu_usage_overall(
        start: Optional[datetime] = None,
        end: Optional[datetime] = None,
        step: int = 60,  # seconds
    ) -> MonitoringValuesResult:
        """
        Get overall CPU usage.

        Args:
            start (datetime, optional): The start time.
                Defaults to 20 minutes ago if not provided.
            end (datetime, optional): The end time.
                Defaults to current time if not provided.
            step (int): Interval in seconds for querying CPU usage data.
        """
        if start is None:
            start = datetime.now() - timedelta(minutes=20)

        if end is None:
            end = datetime.now()

        start_timestamp = int(start.timestamp())
        end_timestamp = int(end.timestamp())

        query = '100 - (avg by (instance) (rate(node_cpu_seconds_total{mode="idle"}[5m])) * 100)'

        data = MonitoringQueries._send_range_query(
            query, start_timestamp, end_timestamp, step, result_type="matrix"
        )

        if isinstance(data, MonitoringQueryError):
            return data

        return MonitoringValues(
            values=list(
                map(
                    MonitoringQueries._prometheus_value_to_monitoring_value,
                    data["result"][0]["values"],
                )
            )
        )

    @staticmethod
    def memory_usage_overall(
        start: Optional[datetime] = None,
        end: Optional[datetime] = None,
        step: int = 60,  # seconds
    ) -> MonitoringValuesResult:
        """
        Get memory usage.

        Args:
            start (datetime, optional): The start time.
                Defaults to 20 minutes ago if not provided.
            end (datetime, optional): The end time.
                Defaults to current time if not provided.
            step (int): Interval in seconds for querying memory usage data.
        """
        if start is None:
            start = datetime.now() - timedelta(minutes=20)

        if end is None:
            end = datetime.now()

        start_timestamp = int(start.timestamp())
        end_timestamp = int(end.timestamp())

        query = "100 - (100 * (node_memory_MemAvailable_bytes / node_memory_MemTotal_bytes))"

        data = MonitoringQueries._send_range_query(
            query, start_timestamp, end_timestamp, step, result_type="matrix"
        )

        if isinstance(data, MonitoringQueryError):
            return data

        return MonitoringValues(
            values=list(
                map(
                    MonitoringQueries._prometheus_value_to_monitoring_value,
                    data["result"][0]["values"],
                )
            )
        )

    @staticmethod
    def memory_usage_max_by_slice(
        start: Optional[datetime] = None,
        end: Optional[datetime] = None,
    ) -> MonitoringMetricsResult:
        """
        Get maximum memory usage for each service (i.e. systemd slice).

        Args:
            start (datetime, optional): The start time.
                Defaults to 20 minutes ago if not provided.
            end (datetime, optional): The end time.
                Defaults to current time if not provided.
        """
        if start is None:
            start = datetime.now() - timedelta(minutes=20)

        if end is None:
            end = datetime.now()

        offset, duration = MonitoringQueries._calculate_offset_and_duration(start, end)

        if offset == 0:
            query = f'max_over_time((container_memory_rss{{id!~".*slice.*slice", id=~".*slice"}}+container_memory_swap{{id!~".*slice.*slice", id=~".*slice"}})[{duration}s:])'
        else:
            query = f'max_over_time((container_memory_rss{{id!~".*slice.*slice", id=~".*slice"}}+container_memory_swap{{id!~".*slice.*slice", id=~".*slice"}})[{duration}s:] offset {offset}s)'

        data = MonitoringQueries._send_query(query, result_type="vector")

        if isinstance(data, MonitoringQueryError):
            return data

        return MonitoringMetrics(
            metrics=MonitoringQueries._prometheus_response_to_monitoring_metrics(
                data, "id", clean_id=True
            )
        )

    @staticmethod
    def memory_usage_average_by_slice(
        start: Optional[datetime] = None,
        end: Optional[datetime] = None,
    ) -> MonitoringMetricsResult:
        """
        Get average memory usage for each service (i.e. systemd slice).

        Args:
            start (datetime, optional): The start time.
                Defaults to 20 minutes ago if not provided.
            end (datetime, optional): The end time.
                Defaults to current time if not provided.
        """
        if start is None:
            start = datetime.now() - timedelta(minutes=20)

        if end is None:
            end = datetime.now()

        offset, duration = MonitoringQueries._calculate_offset_and_duration(start, end)

        if offset == 0:
            query = f'avg_over_time((container_memory_rss{{id!~".*slice.*slice", id=~".*slice"}}+container_memory_swap{{id!~".*slice.*slice", id=~".*slice"}})[{duration}s:])'
        else:
            query = f'avg_over_time((container_memory_rss{{id!~".*slice.*slice", id=~".*slice"}}+container_memory_swap{{id!~".*slice.*slice", id=~".*slice"}})[{duration}s:] offset {offset}s)'

        data = MonitoringQueries._send_query(query, result_type="vector")

        if isinstance(data, MonitoringQueryError):
            return data

        return MonitoringMetrics(
            metrics=MonitoringQueries._prometheus_response_to_monitoring_metrics(
                data, "id", clean_id=True
            )
        )

    @staticmethod
    def disk_usage_overall(
        start: Optional[datetime] = None,
        end: Optional[datetime] = None,
        step: int = 60,  # seconds
    ) -> MonitoringMetricsResult:
        """
        Get disk usage information.

        Args:
            start (datetime, optional): The start time.
                Defaults to 20 minutes ago if not provided.
            end (datetime, optional): The end time.
                Defaults to current time if not provided.
            step (int): Interval in seconds for querying disk usage data.
        """
        if start is None:
            start = datetime.now() - timedelta(minutes=20)

        if end is None:
            end = datetime.now()

        start_timestamp = int(start.timestamp())
        end_timestamp = int(end.timestamp())

        query = """100 - (100 * sum by (device) (node_filesystem_avail_bytes{fstype!="rootfs",fstype!="ramfs",fstype!="tmpfs",mountpoint!="/efi"}) / sum by (device) (node_filesystem_size_bytes{fstype!="rootfs",fstype!="ramfs",fstype!="tmpfs",mountpoint!="/efi"}))"""

        data = MonitoringQueries._send_range_query(
            query, start_timestamp, end_timestamp, step, result_type="matrix"
        )

        if isinstance(data, MonitoringQueryError):
            return data

        return MonitoringMetrics(
            metrics=MonitoringQueries._prometheus_response_to_monitoring_metrics(
                data, "device"
            )
        )

    @staticmethod
    def network_usage_overall(
        start: Optional[datetime] = None,
        end: Optional[datetime] = None,
        step: int = 60,  # seconds
    ) -> MonitoringMetricsResult:
        """
        Get network usage information for both download and upload.

        Args:
            start (datetime, optional): The start time.
                Defaults to 20 minutes ago if not provided.
            end (datetime, optional): The end time.
                Defaults to current time if not provided.
            step (int): Interval in seconds for querying network data.
        """
        if start is None:
            start = datetime.now() - timedelta(minutes=20)

        if end is None:
            end = datetime.now()

        start_timestamp = int(start.timestamp())
        end_timestamp = int(end.timestamp())

        query = """
            label_replace(rate(node_network_receive_bytes_total{device!="lo"}[5m]), "direction", "receive", "device", ".*")
            or
            label_replace(rate(node_network_transmit_bytes_total{device!="lo"}[5m]), "direction", "transmit", "device", ".*")
        """

        data = MonitoringQueries._send_range_query(
            query, start_timestamp, end_timestamp, step, result_type="matrix"
        )

        if isinstance(data, MonitoringQueryError):
            return data

        return MonitoringMetrics(
            metrics=MonitoringQueries._prometheus_response_to_monitoring_metrics(
                data, "direction"
            )
        )
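

# Minimal manual smoke-test sketch (not part of the original module): it assumes a
# node_exporter-backed Prometheus instance is reachable at PROMETHEUS_URL. If it is
# not, the query returns a MonitoringQueryError rather than raising, which this
# branch also demonstrates.
if __name__ == "__main__":
    result = MonitoringQueries.cpu_usage_overall()
    if isinstance(result, MonitoringQueryError):
        print(f"Query failed: {result.error}")
    else:
        for point in result.values:
            print(f"{point.timestamp.isoformat()} -> {point.value}")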