selfprivacy-rest-api/selfprivacy_api/graphql/queries/monitoring.py

107 lines
3 KiB
Python

import strawberry
from typing import Optional
from datetime import datetime
from selfprivacy_api.models.services import ServiceStatus
from selfprivacy_api.services.prometheus import Prometheus
from selfprivacy_api.utils.monitoring import (
MonitoringQueries,
MonitoringQueryError,
MonitoringValuesResult,
MonitoringMetricsResult,
)
@strawberry.type
class CpuMonitoring:
    """GraphQL type exposing CPU usage for a requested query window."""

    # Bounds of the query window; either may be None and is passed through
    # unchanged to MonitoringQueries.
    start: Optional[datetime]
    end: Optional[datetime]
    # Query resolution (presumably seconds -- confirm against MonitoringQueries).
    step: int

    @strawberry.field
    def overall_usage(self) -> MonitoringValuesResult:
        """Resolve overall CPU usage.

        Returns a MonitoringQueryError when the Prometheus service does not
        report ServiceStatus.ACTIVE; otherwise delegates to
        MonitoringQueries.cpu_usage with this object's start, end and step.
        """
        prometheus_status = Prometheus().get_status()
        return (
            MonitoringQueryError(error="Prometheus is not running")
            if prometheus_status != ServiceStatus.ACTIVE
            else MonitoringQueries.cpu_usage(self.start, self.end, self.step)
        )
@strawberry.type
class MemoryMonitoring:
    """GraphQL type exposing memory usage for a requested query window."""

    # Bounds of the query window; either may be None and is passed through
    # unchanged to MonitoringQueries.
    start: Optional[datetime]
    end: Optional[datetime]
    # Query resolution (presumably seconds -- confirm against MonitoringQueries).
    step: int

    @strawberry.field
    def overall_usage(self) -> MonitoringValuesResult:
        """Resolve overall memory usage.

        Returns a MonitoringQueryError when the Prometheus service does not
        report ServiceStatus.ACTIVE; otherwise delegates to
        MonitoringQueries.memory_usage with this object's start, end and step.
        """
        prometheus_status = Prometheus().get_status()
        return (
            MonitoringQueryError(error="Prometheus is not running")
            if prometheus_status != ServiceStatus.ACTIVE
            else MonitoringQueries.memory_usage(self.start, self.end, self.step)
        )
@strawberry.type
class DiskMonitoring:
    """GraphQL type exposing disk usage for a requested query window."""

    # Bounds of the query window; either may be None and is passed through
    # unchanged to MonitoringQueries.
    start: Optional[datetime]
    end: Optional[datetime]
    # Query resolution (presumably seconds -- confirm against MonitoringQueries).
    step: int

    @strawberry.field
    def overall_usage(self) -> MonitoringMetricsResult:
        """Resolve overall disk usage.

        Returns a MonitoringQueryError when the Prometheus service does not
        report ServiceStatus.ACTIVE; otherwise delegates to
        MonitoringQueries.disk_usage with this object's start, end and step.
        """
        prometheus_status = Prometheus().get_status()
        return (
            MonitoringQueryError(error="Prometheus is not running")
            if prometheus_status != ServiceStatus.ACTIVE
            else MonitoringQueries.disk_usage(self.start, self.end, self.step)
        )
@strawberry.type
class NetworkMonitoring:
    """GraphQL type exposing network usage for a requested query window."""

    # Bounds of the query window; either may be None and is passed through
    # unchanged to MonitoringQueries.
    start: Optional[datetime]
    end: Optional[datetime]
    # Query resolution (presumably seconds -- confirm against MonitoringQueries).
    step: int

    @strawberry.field
    def overall_usage(self) -> MonitoringMetricsResult:
        """Resolve overall network usage.

        Returns a MonitoringQueryError when the Prometheus service does not
        report ServiceStatus.ACTIVE; otherwise delegates to
        MonitoringQueries.network_usage with this object's start, end and step.
        """
        prometheus_status = Prometheus().get_status()
        return (
            MonitoringQueryError(error="Prometheus is not running")
            if prometheus_status != ServiceStatus.ACTIVE
            else MonitoringQueries.network_usage(self.start, self.end, self.step)
        )
@strawberry.type
class Monitoring:
    """Root GraphQL type grouping the per-resource monitoring queries.

    Each field only captures the requested window (start, end, step) in the
    matching resource type; the actual metrics lookup happens when that
    type's own fields are resolved.
    """

    @strawberry.field
    def cpu_usage(
        self,
        start: Optional[datetime] = None,
        end: Optional[datetime] = None,
        step: int = 60,
    ) -> CpuMonitoring:
        """Build a CpuMonitoring object for the given window."""
        cpu = CpuMonitoring(start=start, end=end, step=step)
        return cpu

    @strawberry.field
    def memory_usage(
        self,
        start: Optional[datetime] = None,
        end: Optional[datetime] = None,
        step: int = 60,
    ) -> MemoryMonitoring:
        """Build a MemoryMonitoring object for the given window."""
        memory = MemoryMonitoring(start=start, end=end, step=step)
        return memory

    @strawberry.field
    def disk_usage(
        self,
        start: Optional[datetime] = None,
        end: Optional[datetime] = None,
        step: int = 60,
    ) -> DiskMonitoring:
        """Build a DiskMonitoring object for the given window."""
        disk = DiskMonitoring(start=start, end=end, step=step)
        return disk

    @strawberry.field
    def network_usage(
        self,
        start: Optional[datetime] = None,
        end: Optional[datetime] = None,
        step: int = 60,
    ) -> NetworkMonitoring:
        """Build a NetworkMonitoring object for the given window."""
        network = NetworkMonitoring(start=start, end=end, step=step)
        return network