package trafficontrol import ( "runtime" "sync" "time" C "github.com/sagernet/sing-box/constant" "github.com/sagernet/sing-box/experimental/clashapi/compatible" "github.com/sagernet/sing/common" "github.com/sagernet/sing/common/atomic" "github.com/sagernet/sing/common/json" "github.com/sagernet/sing/common/x/list" "github.com/gofrs/uuid/v5" ) type Manager struct { uploadTemp atomic.Int64 downloadTemp atomic.Int64 uploadBlip atomic.Int64 downloadBlip atomic.Int64 uploadTotal atomic.Int64 downloadTotal atomic.Int64 connections compatible.Map[uuid.UUID, Tracker] closedConnectionsAccess sync.Mutex closedConnections list.List[TrackerMetadata] ticker *time.Ticker done chan struct{} // process *process.Process memory uint64 } func NewManager() *Manager { manager := &Manager{ ticker: time.NewTicker(time.Second), done: make(chan struct{}), // process: &process.Process{Pid: int32(os.Getpid())}, } go manager.handle() return manager } func (m *Manager) Join(c Tracker) { m.connections.Store(c.Metadata().ID, c) } func (m *Manager) Leave(c Tracker) { metadata := c.Metadata() _, loaded := m.connections.LoadAndDelete(metadata.ID) if loaded { metadata.ClosedAt = time.Now() m.closedConnectionsAccess.Lock() defer m.closedConnectionsAccess.Unlock() if m.closedConnections.Len() >= 1000 { m.closedConnections.PopFront() } m.closedConnections.PushBack(metadata) } } func (m *Manager) PushUploaded(size int64) { m.uploadTemp.Add(size) m.uploadTotal.Add(size) } func (m *Manager) PushDownloaded(size int64) { m.downloadTemp.Add(size) m.downloadTotal.Add(size) } func (m *Manager) Now() (up int64, down int64) { return m.uploadBlip.Load(), m.downloadBlip.Load() } func (m *Manager) Total() (up int64, down int64) { return m.uploadTotal.Load(), m.downloadTotal.Load() } func (m *Manager) ConnectionsLen() int { return m.connections.Len() } func (m *Manager) Connections() []TrackerMetadata { var connections []TrackerMetadata m.connections.Range(func(_ uuid.UUID, value Tracker) bool { connections = append(connections, value.Metadata()) return true }) return connections } func (m *Manager) ClosedConnections() []TrackerMetadata { m.closedConnectionsAccess.Lock() defer m.closedConnectionsAccess.Unlock() return m.closedConnections.Array() } func (m *Manager) Connection(id uuid.UUID) Tracker { connection, loaded := m.connections.Load(id) if !loaded { return nil } return connection } func (m *Manager) Snapshot() *Snapshot { var connections []Tracker m.connections.Range(func(_ uuid.UUID, value Tracker) bool { if value.Metadata().OutboundType != C.TypeDNS { connections = append(connections, value) } return true }) var memStats runtime.MemStats runtime.ReadMemStats(&memStats) m.memory = memStats.StackInuse + memStats.HeapInuse + memStats.HeapIdle - memStats.HeapReleased return &Snapshot{ Upload: m.uploadTotal.Load(), Download: m.downloadTotal.Load(), Connections: connections, Memory: m.memory, } } func (m *Manager) ResetStatistic() { m.uploadTemp.Store(0) m.uploadBlip.Store(0) m.uploadTotal.Store(0) m.downloadTemp.Store(0) m.downloadBlip.Store(0) m.downloadTotal.Store(0) } func (m *Manager) handle() { var uploadTemp int64 var downloadTemp int64 for { select { case <-m.done: return case <-m.ticker.C: } uploadTemp = m.uploadTemp.Swap(0) downloadTemp = m.downloadTemp.Swap(0) m.uploadBlip.Store(uploadTemp) m.downloadBlip.Store(downloadTemp) } } func (m *Manager) Close() error { m.ticker.Stop() close(m.done) return nil } type Snapshot struct { Download int64 Upload int64 Connections []Tracker Memory uint64 } func (s *Snapshot) MarshalJSON() ([]byte, error) { return json.Marshal(map[string]any{ "downloadTotal": s.Download, "uploadTotal": s.Upload, "connections": common.Map(s.Connections, func(t Tracker) TrackerMetadata { return t.Metadata() }), "memory": s.Memory, }) }