From c240f1b359b908d661eb6af331596f7480636b7c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E4=B8=96=E7=95=8C?= Date: Wed, 27 Jul 2022 21:57:21 +0800 Subject: [PATCH] Add shadowsocks-multiuser control api --- common/badjson/array.go | 18 ++-- common/badjson/json.go | 9 +- common/pipelistener/listener.go | 57 ++++++++++++ common/trafficcontrol/manager.go | 145 +++++++++++++++++++++++++++++++ inbound/shadowsocks_control.go | 94 ++++++++++++++++++++ inbound/shadowsocks_multi.go | 51 ++++++++++- option/shadowsocks.go | 11 +-- 7 files changed, 363 insertions(+), 22 deletions(-) create mode 100644 common/pipelistener/listener.go create mode 100644 common/trafficcontrol/manager.go create mode 100644 inbound/shadowsocks_control.go diff --git a/common/badjson/array.go b/common/badjson/array.go index f64b8b95..2a731687 100644 --- a/common/badjson/array.go +++ b/common/badjson/array.go @@ -2,19 +2,18 @@ package badjson import ( "bytes" - "reflect" "github.com/sagernet/sing-box/common/json" E "github.com/sagernet/sing/common/exceptions" ) -type JSONArray[T any] []T +type JSONArray []any -func (a JSONArray[T]) MarshalJSON() ([]byte, error) { - return json.Marshal([]T(a)) +func (a JSONArray) MarshalJSON() ([]byte, error) { + return json.Marshal([]any(a)) } -func (a *JSONArray[T]) UnmarshalJSON(content []byte) error { +func (a *JSONArray) UnmarshalJSON(content []byte) error { decoder := json.NewDecoder(bytes.NewReader(content)) arrayStart, err := decoder.Token() if err != nil { @@ -35,17 +34,12 @@ func (a *JSONArray[T]) UnmarshalJSON(content []byte) error { return nil } -func (a *JSONArray[T]) decodeJSON(decoder *json.Decoder) error { +func (a *JSONArray) decodeJSON(decoder *json.Decoder) error { for decoder.More() { - value, err := decodeJSON(decoder) + item, err := decodeJSON(decoder) if err != nil { return err } - item, ok := value.(T) - if !ok { - var defValue T - return E.New("can't cast ", value, " to ", reflect.TypeOf(defValue)) - } *a = append(*a, item) } return nil diff --git a/common/badjson/json.go b/common/badjson/json.go index 7f00f09b..e2185b86 100644 --- a/common/badjson/json.go +++ b/common/badjson/json.go @@ -1,10 +1,17 @@ package badjson import ( + "bytes" + "github.com/sagernet/sing-box/common/json" E "github.com/sagernet/sing/common/exceptions" ) +func Decode(content []byte) (any, error) { + decoder := json.NewDecoder(bytes.NewReader(content)) + return decodeJSON(decoder) +} + func decodeJSON(decoder *json.Decoder) (any, error) { rawToken, err := decoder.Token() if err != nil { @@ -27,7 +34,7 @@ func decodeJSON(decoder *json.Decoder) (any, error) { } return &object, nil case '[': - var array JSONArray[any] + var array JSONArray err = array.decodeJSON(decoder) if err != nil { return nil, err diff --git a/common/pipelistener/listener.go b/common/pipelistener/listener.go new file mode 100644 index 00000000..68de4d90 --- /dev/null +++ b/common/pipelistener/listener.go @@ -0,0 +1,57 @@ +package pipelistener + +import ( + "io" + "net" +) + +var _ net.Listener = (*Listener)(nil) + +type Listener struct { + pipe chan net.Conn + done chan struct{} +} + +func New(channelSize int) *Listener { + return &Listener{ + pipe: make(chan net.Conn, channelSize), + done: make(chan struct{}), + } +} + +func (l *Listener) Serve(conn net.Conn) { + l.pipe <- conn +} + +func (l *Listener) Accept() (net.Conn, error) { + select { + case conn := <-l.pipe: + return conn, nil + case <-l.done: + return nil, io.ErrClosedPipe + } +} + +func (l *Listener) Close() error { + select { + case <-l.done: + return io.ErrClosedPipe + default: + } + close(l.done) + return nil +} + +func (l *Listener) Addr() net.Addr { + return addr{} +} + +type addr struct{} + +func (a addr) Network() string { + return "pipe" +} + +func (a addr) String() string { + return "pipe" +} diff --git a/common/trafficcontrol/manager.go b/common/trafficcontrol/manager.go new file mode 100644 index 00000000..86134a77 --- /dev/null +++ b/common/trafficcontrol/manager.go @@ -0,0 +1,145 @@ +package trafficcontrol + +import ( + "io" + "net" + "sync" + "sync/atomic" + + "github.com/sagernet/sing/common/buf" + "github.com/sagernet/sing/common/bufio" + M "github.com/sagernet/sing/common/metadata" + N "github.com/sagernet/sing/common/network" +) + +type Manager[U comparable] struct { + access sync.Mutex + users map[U]*Traffic +} + +type Traffic struct { + Upload uint64 + Download uint64 +} + +func NewManager[U comparable]() *Manager[U] { + return &Manager[U]{ + users: make(map[U]*Traffic), + } +} + +func (m *Manager[U]) Reset() { + m.users = make(map[U]*Traffic) +} + +func (m *Manager[U]) TrackConnection(user U, conn net.Conn) net.Conn { + m.access.Lock() + defer m.access.Unlock() + var traffic *Traffic + if t, loaded := m.users[user]; loaded { + traffic = t + } else { + traffic = new(Traffic) + m.users[user] = traffic + } + return &TrackConn{conn, traffic} +} + +func (m *Manager[U]) TrackPacketConnection(user U, conn N.PacketConn) N.PacketConn { + m.access.Lock() + defer m.access.Unlock() + var traffic *Traffic + if t, loaded := m.users[user]; loaded { + traffic = t + } else { + traffic = new(Traffic) + m.users[user] = traffic + } + return &TrackPacketConn{conn, traffic} +} + +func (m *Manager[U]) ReadTraffics() map[U]Traffic { + m.access.Lock() + defer m.access.Unlock() + + trafficMap := make(map[U]Traffic) + for user, traffic := range m.users { + upload := atomic.SwapUint64(&traffic.Upload, 0) + download := atomic.SwapUint64(&traffic.Download, 0) + if upload == 0 && download == 0 { + continue + } + trafficMap[user] = Traffic{ + Upload: upload, + Download: download, + } + } + return trafficMap +} + +type TrackConn struct { + net.Conn + *Traffic +} + +func (c *TrackConn) Read(p []byte) (n int, err error) { + n, err = c.Conn.Read(p) + if n > 0 { + atomic.AddUint64(&c.Upload, uint64(n)) + } + return +} + +func (c *TrackConn) Write(p []byte) (n int, err error) { + n, err = c.Conn.Write(p) + if n > 0 { + atomic.AddUint64(&c.Download, uint64(n)) + } + return +} + +func (c *TrackConn) WriteTo(w io.Writer) (n int64, err error) { + n, err = bufio.Copy(w, c.Conn) + if n > 0 { + atomic.AddUint64(&c.Upload, uint64(n)) + } + return +} + +func (c *TrackConn) ReadFrom(r io.Reader) (n int64, err error) { + n, err = bufio.Copy(c.Conn, r) + if n > 0 { + atomic.AddUint64(&c.Download, uint64(n)) + } + return +} + +func (c *TrackConn) Upstream() any { + return c.Conn +} + +type TrackPacketConn struct { + N.PacketConn + *Traffic +} + +func (c *TrackPacketConn) ReadPacket(buffer *buf.Buffer) (M.Socksaddr, error) { + destination, err := c.PacketConn.ReadPacket(buffer) + if err == nil { + atomic.AddUint64(&c.Upload, uint64(buffer.Len())) + } + return destination, err +} + +func (c *TrackPacketConn) WritePacket(buffer *buf.Buffer, destination M.Socksaddr) error { + n := buffer.Len() + err := c.PacketConn.WritePacket(buffer, destination) + if err == nil { + atomic.AddUint64(&c.Download, uint64(n)) + } + return err +} + +func (c *TrackPacketConn) Upstream() any { + return c.PacketConn +} diff --git a/inbound/shadowsocks_control.go b/inbound/shadowsocks_control.go new file mode 100644 index 00000000..1a6c02ae --- /dev/null +++ b/inbound/shadowsocks_control.go @@ -0,0 +1,94 @@ +package inbound + +import ( + "encoding/json" + "io" + "net/http" + + C "github.com/sagernet/sing-box/constant" + "github.com/sagernet/sing-box/option" + "github.com/sagernet/sing/common" + E "github.com/sagernet/sing/common/exceptions" + F "github.com/sagernet/sing/common/format" + + "github.com/go-chi/chi/v5" + "github.com/go-chi/render" +) + +func (h *ShadowsocksMulti) createHandler() http.Handler { + router := chi.NewRouter() + router.Get("/", h.handleHello) + router.Put("/users", h.handleUpdateUsers) + router.Get("/traffics", h.handleReadTraffics) + return router +} + +func (h *ShadowsocksMulti) handleHello(writer http.ResponseWriter, request *http.Request) { + render.JSON(writer, request, render.M{ + "server": "sing-box", + "version": C.Version, + }) +} + +func (h *ShadowsocksMulti) handleUpdateUsers(writer http.ResponseWriter, request *http.Request) { + var users []option.ShadowsocksUser + err := readRequest(request, &users) + if err != nil { + h.newError(E.Cause(err, "controller: update users: parse request")) + writer.WriteHeader(http.StatusBadRequest) + writer.Write([]byte(F.ToString(err))) + return + } + users = append([]option.ShadowsocksUser{{ + Name: "control", + Password: h.users[0].Password, + }}, users...) + err = h.service.UpdateUsersWithPasswords(common.MapIndexed(users, func(index int, user option.ShadowsocksUser) int { + return index + }), common.Map(users, func(user option.ShadowsocksUser) string { + return user.Password + })) + if err != nil { + h.newError(E.Cause(err, "controller: update users")) + writer.WriteHeader(http.StatusBadRequest) + writer.Write([]byte(F.ToString(err))) + return + } + h.users = users + h.trafficManager.Reset() + writer.WriteHeader(http.StatusNoContent) + h.logger.Info("controller: updated ", len(users)-1, " users") +} + +type ShadowsocksUserTraffic struct { + Name string `json:"name,omitempty"` + Upload uint64 `json:"upload,omitempty"` + Download uint64 `json:"download,omitempty"` +} + +func (h *ShadowsocksMulti) handleReadTraffics(writer http.ResponseWriter, request *http.Request) { + h.logger.Debug("controller: traffics sent") + trafficMap := h.trafficManager.ReadTraffics() + if len(trafficMap) == 0 { + writer.WriteHeader(http.StatusNoContent) + return + } + traffics := make([]ShadowsocksUserTraffic, 0, len(trafficMap)) + for user, traffic := range trafficMap { + traffics = append(traffics, ShadowsocksUserTraffic{ + Name: h.users[user].Name, + Upload: traffic.Upload, + Download: traffic.Download, + }) + } + render.JSON(writer, request, traffics) +} + +func readRequest(request *http.Request, v any) error { + defer request.Body.Close() + content, err := io.ReadAll(request.Body) + if err != nil { + return err + } + return json.Unmarshal(content, v) +} diff --git a/inbound/shadowsocks_multi.go b/inbound/shadowsocks_multi.go index 7e7309d5..85743f8e 100644 --- a/inbound/shadowsocks_multi.go +++ b/inbound/shadowsocks_multi.go @@ -3,9 +3,12 @@ package inbound import ( "context" "net" + "net/http" "os" "github.com/sagernet/sing-box/adapter" + "github.com/sagernet/sing-box/common/pipelistener" + "github.com/sagernet/sing-box/common/trafficcontrol" C "github.com/sagernet/sing-box/constant" "github.com/sagernet/sing-box/log" "github.com/sagernet/sing-box/option" @@ -13,6 +16,7 @@ import ( "github.com/sagernet/sing/common" "github.com/sagernet/sing/common/auth" "github.com/sagernet/sing/common/buf" + E "github.com/sagernet/sing/common/exceptions" F "github.com/sagernet/sing/common/format" N "github.com/sagernet/sing/common/network" ) @@ -21,8 +25,12 @@ var _ adapter.Inbound = (*ShadowsocksMulti)(nil) type ShadowsocksMulti struct { myInboundAdapter - service *shadowaead_2022.MultiService[int] - users []option.ShadowsocksUser + service *shadowaead_2022.MultiService[int] + users []option.ShadowsocksUser + controlEnabled bool + controller *http.Server + controllerPipe *pipelistener.Listener + trafficManager *trafficcontrol.Manager[int] } func newShadowsocksMulti(ctx context.Context, router adapter.Router, logger log.ContextLogger, tag string, options option.ShadowsocksInboundOptions) (*ShadowsocksMulti, error) { @@ -36,7 +44,6 @@ func newShadowsocksMulti(ctx context.Context, router adapter.Router, logger log. tag: tag, listenOptions: options.ListenOptions, }, - users: options.Users, } inbound.connHandler = inbound inbound.packetHandler = inbound @@ -52,10 +59,20 @@ func newShadowsocksMulti(ctx context.Context, router adapter.Router, logger log. udpTimeout, adapter.NewUpstreamContextHandler(inbound.newConnection, inbound.newPacketConnection, inbound), ) + users := options.Users + if options.ControlPassword != "" { + inbound.controlEnabled = true + users = append([]option.ShadowsocksUser{{ + Name: "control", + Password: options.ControlPassword, + }}, users...) + inbound.controller = &http.Server{Handler: inbound.createHandler()} + inbound.trafficManager = trafficcontrol.NewManager[int]() + } if err != nil { return nil, err } - err = service.UpdateUsersWithPasswords(common.MapIndexed(options.Users, func(index int, user option.ShadowsocksUser) int { + err = service.UpdateUsersWithPasswords(common.MapIndexed(users, func(index int, user option.ShadowsocksUser) int { return index }), common.Map(options.Users, func(user option.ShadowsocksUser) string { return user.Password @@ -65,9 +82,30 @@ func newShadowsocksMulti(ctx context.Context, router adapter.Router, logger log. } inbound.service = service inbound.packetUpstream = service + inbound.users = users return inbound, err } +func (h *ShadowsocksMulti) Start() error { + if h.controlEnabled { + h.controllerPipe = pipelistener.New(16) + go func() { + err := h.controller.Serve(h.controllerPipe) + if err != nil { + h.newError(E.Cause(err, "controller serve error")) + } + }() + } + return h.myInboundAdapter.Start() +} + +func (h *ShadowsocksMulti) Close() error { + if h.controlEnabled { + h.controllerPipe.Close() + } + return h.myInboundAdapter.Close() +} + func (h *ShadowsocksMulti) NewConnection(ctx context.Context, conn net.Conn, metadata adapter.InboundContext) error { return h.service.NewConnection(adapter.WithContext(log.ContextWithNewID(ctx), &metadata), conn, adapter.UpstreamMetadata(metadata)) } @@ -81,6 +119,11 @@ func (h *ShadowsocksMulti) newConnection(ctx context.Context, conn net.Conn, met if !loaded { return os.ErrInvalid } + if userIndex == 0 && h.controlEnabled { + h.logger.InfoContext(ctx, "inbound control connection") + h.controllerPipe.Serve(conn) + return nil + } user := h.users[userIndex].Name if user == "" { user = F.ToString(userIndex) diff --git a/option/shadowsocks.go b/option/shadowsocks.go index 881f9cb1..0e39dbdd 100644 --- a/option/shadowsocks.go +++ b/option/shadowsocks.go @@ -2,11 +2,12 @@ package option type ShadowsocksInboundOptions struct { ListenOptions - Network NetworkList `json:"network,omitempty"` - Method string `json:"method"` - Password string `json:"password"` - Users []ShadowsocksUser `json:"users,omitempty"` - Destinations []ShadowsocksDestination `json:"destinations,omitempty"` + Network NetworkList `json:"network,omitempty"` + Method string `json:"method"` + Password string `json:"password"` + ControlPassword string `json:"control_password,omitempty"` + Users []ShadowsocksUser `json:"users,omitempty"` + Destinations []ShadowsocksDestination `json:"destinations,omitempty"` } type ShadowsocksUser struct {