platform: Add log update interval

This commit is contained in:
世界 2024-06-18 17:49:06 +08:00
parent 31f98525be
commit 969f7e02cd
No known key found for this signature in database
GPG key ID: CD109927C34A63C4
3 changed files with 71 additions and 47 deletions

View file

@ -25,8 +25,8 @@ type CommandClientOptions struct {
type CommandClientHandler interface {
Connected()
Disconnected(message string)
ClearLog()
WriteLog(message string)
ClearLogs()
WriteLogs(messageList StringIterator)
WriteStatus(message *StatusMessage)
WriteGroups(message OutboundGroupIterator)
InitializeClashMode(modeList StringIterator, currentMode string)
@ -84,6 +84,10 @@ func (c *CommandClient) Connect() error {
}
switch c.options.Command {
case CommandLog:
err = binary.Write(conn, binary.BigEndian, c.options.StatusInterval)
if err != nil {
return E.Cause(err, "write interval")
}
c.handler.Connected()
go c.handleLogConn(conn)
case CommandStatus:

View file

@ -1,10 +1,14 @@
package libbox
import (
"bufio"
"context"
"encoding/binary"
"io"
"net"
"time"
"github.com/sagernet/sing/common/binary"
E "github.com/sagernet/sing/common/exceptions"
)
func (s *CommandServer) WriteMessage(message string) {
@ -17,43 +21,39 @@ func (s *CommandServer) WriteMessage(message string) {
s.access.Unlock()
}
func readLog(reader io.Reader) ([]byte, error) {
var messageLength uint16
err := binary.Read(reader, binary.BigEndian, &messageLength)
if err != nil {
return nil, err
}
if messageLength == 0 {
return nil, nil
}
data := make([]byte, messageLength)
_, err = io.ReadFull(reader, data)
if err != nil {
return nil, err
}
return data, nil
}
func writeLog(writer io.Writer, message []byte) error {
func writeLog(writer *bufio.Writer, messages []string) error {
err := binary.Write(writer, binary.BigEndian, uint8(0))
if err != nil {
return err
}
err = binary.Write(writer, binary.BigEndian, uint16(len(message)))
err = binary.WriteData(writer, binary.BigEndian, messages)
if err != nil {
return err
}
if len(message) > 0 {
_, err = writer.Write(message)
}
return err
return writer.Flush()
}
func writeClearLog(writer io.Writer) error {
return binary.Write(writer, binary.BigEndian, uint8(1))
func writeClearLog(writer *bufio.Writer) error {
err := binary.Write(writer, binary.BigEndian, uint8(1))
if err != nil {
return err
}
return writer.Flush()
}
func (s *CommandServer) handleLogConn(conn net.Conn) error {
var (
interval int64
timer *time.Timer
)
err := binary.Read(conn, binary.BigEndian, &interval)
if err != nil {
return E.Cause(err, "read interval")
}
timer = time.NewTimer(time.Duration(interval))
if !timer.Stop() {
<-timer.C
}
var savedLines []string
s.access.Lock()
savedLines = make([]string, 0, s.savedLines.Len())
@ -66,52 +66,67 @@ func (s *CommandServer) handleLogConn(conn net.Conn) error {
return err
}
defer s.observer.UnSubscribe(subscription)
for _, line := range savedLines {
err = writeLog(conn, []byte(line))
writer := bufio.NewWriter(conn)
if len(savedLines) > 0 {
err = writeLog(writer, savedLines)
if err != nil {
return err
}
}
ctx := connKeepAlive(conn)
var logLines []string
for {
select {
case <-ctx.Done():
return ctx.Err()
case message := <-subscription:
err = writeLog(conn, []byte(message))
if err != nil {
return err
}
case <-s.logReset:
err = writeClearLog(conn)
err = writeClearLog(writer)
if err != nil {
return err
}
case <-done:
return nil
case logLine := <-subscription:
logLines = logLines[:0]
logLines = append(logLines, logLine)
timer.Reset(time.Duration(interval))
loopLogs:
for {
select {
case logLine = <-subscription:
logLines = append(logLines, logLine)
case <-timer.C:
break loopLogs
}
}
err = writeLog(writer, logLines)
if err != nil {
return err
}
}
}
}
func (c *CommandClient) handleLogConn(conn net.Conn) {
reader := bufio.NewReader(conn)
for {
var messageType uint8
err := binary.Read(conn, binary.BigEndian, &messageType)
err := binary.Read(reader, binary.BigEndian, &messageType)
if err != nil {
c.handler.Disconnected(err.Error())
return
}
var message []byte
var messages []string
switch messageType {
case 0:
message, err = readLog(conn)
err = binary.ReadData(reader, binary.BigEndian, &messages)
if err != nil {
c.handler.Disconnected(err.Error())
return
}
c.handler.WriteLog(string(message))
c.handler.WriteLogs(newIterator(messages))
case 1:
c.handler.ClearLog()
c.handler.ClearLogs()
}
}
}
@ -120,7 +135,7 @@ func connKeepAlive(reader io.Reader) context.Context {
ctx, cancel := context.WithCancelCause(context.Background())
go func() {
for {
_, err := readLog(reader)
_, err := reader.Read(make([]byte, 1))
if err != nil {
cancel(err)
return

View file

@ -3,8 +3,9 @@ package libbox
import "github.com/sagernet/sing/common"
type StringIterator interface {
Next() string
Len() int32
HasNext() bool
Next() string
}
var _ StringIterator = (*iterator[string])(nil)
@ -21,6 +22,14 @@ func newPtrIterator[T any](values []T) *iterator[*T] {
return &iterator[*T]{common.Map(values, func(value T) *T { return &value })}
}
func (i *iterator[T]) Len() int32 {
return int32(len(i.values))
}
func (i *iterator[T]) HasNext() bool {
return len(i.values) > 0
}
func (i *iterator[T]) Next() T {
if len(i.values) == 0 {
return common.DefaultValue[T]()
@ -30,10 +39,6 @@ func (i *iterator[T]) Next() T {
return nextValue
}
func (i *iterator[T]) HasNext() bool {
return len(i.values) > 0
}
type abstractIterator[T any] interface {
Next() T
HasNext() bool