From de2453fce9652136e8843d9328fe94022a2b1089 Mon Sep 17 00:00:00 2001 From: Hellojack <106379370+H1JK@users.noreply.github.com> Date: Sat, 27 Aug 2022 21:05:15 +0800 Subject: [PATCH] Add gun-lite gRPC implementation (#44) --- option/v2ray_transport.go | 1 + test/v2ray_transport_test.go | 69 ++++++- transport/v2ray/grpc.go | 10 +- .../v2ray/{grpc_stub.go => grpc_lite.go} | 9 +- transport/v2ray/transport.go | 2 +- transport/v2raygrpclite/client.go | 77 ++++++++ transport/v2raygrpclite/conn.go | 168 ++++++++++++++++++ transport/v2raygrpclite/server.go | 96 ++++++++++ transport/v2rayhttp/client.go | 1 + 9 files changed, 418 insertions(+), 15 deletions(-) rename transport/v2ray/{grpc_stub.go => grpc_lite.go} (60%) create mode 100644 transport/v2raygrpclite/client.go create mode 100644 transport/v2raygrpclite/conn.go create mode 100644 transport/v2raygrpclite/server.go diff --git a/option/v2ray_transport.go b/option/v2ray_transport.go index 6c45b178..f5c6b962 100644 --- a/option/v2ray_transport.go +++ b/option/v2ray_transport.go @@ -78,4 +78,5 @@ type V2RayQUICOptions struct{} type V2RayGRPCOptions struct { ServiceName string `json:"service_name,omitempty"` + ForceLite bool `json:"-"` // for test } diff --git a/test/v2ray_transport_test.go b/test/v2ray_transport_test.go index 326b6e0f..ec8237dc 100644 --- a/test/v2ray_transport_test.go +++ b/test/v2ray_transport_test.go @@ -20,6 +20,52 @@ func TestV2RayGRPCSelf(t *testing.T) { }) } +func TestV2RayGRPCLite(t *testing.T) { + t.Run("server", func(t *testing.T) { + testV2RayTransportSelfWith(t, &option.V2RayTransportOptions{ + Type: C.V2RayTransportTypeGRPC, + GRPCOptions: option.V2RayGRPCOptions{ + ServiceName: "TunService", + ForceLite: true, + }, + }, &option.V2RayTransportOptions{ + Type: C.V2RayTransportTypeGRPC, + GRPCOptions: option.V2RayGRPCOptions{ + ServiceName: "TunService", + }, + }) + }) + t.Run("client", func(t *testing.T) { + testV2RayTransportSelfWith(t, &option.V2RayTransportOptions{ + Type: C.V2RayTransportTypeGRPC, + GRPCOptions: option.V2RayGRPCOptions{ + ServiceName: "TunService", + }, + }, &option.V2RayTransportOptions{ + Type: C.V2RayTransportTypeGRPC, + GRPCOptions: option.V2RayGRPCOptions{ + ServiceName: "TunService", + ForceLite: true, + }, + }) + }) + t.Run("self", func(t *testing.T) { + testV2RayTransportSelfWith(t, &option.V2RayTransportOptions{ + Type: C.V2RayTransportTypeGRPC, + GRPCOptions: option.V2RayGRPCOptions{ + ServiceName: "TunService", + ForceLite: true, + }, + }, &option.V2RayTransportOptions{ + Type: C.V2RayTransportTypeGRPC, + GRPCOptions: option.V2RayGRPCOptions{ + ServiceName: "TunService", + ForceLite: true, + }, + }) + }) +} + func TestV2RayWebscoketSelf(t *testing.T) { t.Run("basic", func(t *testing.T) { testV2RayTransportSelf(t, &option.V2RayTransportOptions{ @@ -48,6 +94,9 @@ func TestV2RayWebscoketSelf(t *testing.T) { func TestV2RayHTTPSelf(t *testing.T) { testV2RayTransportSelf(t, &option.V2RayTransportOptions{ Type: C.V2RayTransportTypeHTTP, + HTTPOptions: option.V2RayHTTPOptions{ + Method: "POST", + }, }) } @@ -58,15 +107,19 @@ func TestV2RayHTTPPlainSelf(t *testing.T) { } func testV2RayTransportSelf(t *testing.T, transport *option.V2RayTransportOptions) { + testV2RayTransportSelfWith(t, transport, transport) +} + +func testV2RayTransportSelfWith(t *testing.T, server, client *option.V2RayTransportOptions) { t.Run("vmess", func(t *testing.T) { - testVMessTransportSelf(t, transport) + testVMessTransportSelf(t, server, client) }) t.Run("trojan", func(t *testing.T) { - testTrojanTransportSelf(t, transport) + testTrojanTransportSelf(t, server, client) }) } -func testVMessTransportSelf(t *testing.T, transport *option.V2RayTransportOptions) { +func testVMessTransportSelf(t *testing.T, server *option.V2RayTransportOptions, client *option.V2RayTransportOptions) { user, err := uuid.DefaultGenerator.NewV4() require.NoError(t, err) _, certPem, keyPem := createSelfSignedCertificate(t, "example.org") @@ -104,7 +157,7 @@ func testVMessTransportSelf(t *testing.T, transport *option.V2RayTransportOption CertificatePath: certPem, KeyPath: keyPem, }, - Transport: transport, + Transport: server, }, }, }, @@ -127,7 +180,7 @@ func testVMessTransportSelf(t *testing.T, transport *option.V2RayTransportOption ServerName: "example.org", CertificatePath: certPem, }, - Transport: transport, + Transport: client, }, }, }, @@ -145,7 +198,7 @@ func testVMessTransportSelf(t *testing.T, transport *option.V2RayTransportOption testSuit(t, clientPort, testPort) } -func testTrojanTransportSelf(t *testing.T, transport *option.V2RayTransportOptions) { +func testTrojanTransportSelf(t *testing.T, server *option.V2RayTransportOptions, client *option.V2RayTransportOptions) { user, err := uuid.DefaultGenerator.NewV4() require.NoError(t, err) _, certPem, keyPem := createSelfSignedCertificate(t, "example.org") @@ -183,7 +236,7 @@ func testTrojanTransportSelf(t *testing.T, transport *option.V2RayTransportOptio CertificatePath: certPem, KeyPath: keyPem, }, - Transport: transport, + Transport: server, }, }, }, @@ -205,7 +258,7 @@ func testTrojanTransportSelf(t *testing.T, transport *option.V2RayTransportOptio ServerName: "example.org", CertificatePath: certPem, }, - Transport: transport, + Transport: client, }, }, }, diff --git a/transport/v2ray/grpc.go b/transport/v2ray/grpc.go index 41e70bbe..a6f031f8 100644 --- a/transport/v2ray/grpc.go +++ b/transport/v2ray/grpc.go @@ -9,14 +9,22 @@ import ( "github.com/sagernet/sing-box/adapter" "github.com/sagernet/sing-box/option" "github.com/sagernet/sing-box/transport/v2raygrpc" + "github.com/sagernet/sing-box/transport/v2raygrpclite" + E "github.com/sagernet/sing/common/exceptions" M "github.com/sagernet/sing/common/metadata" N "github.com/sagernet/sing/common/network" ) -func NewGRPCServer(ctx context.Context, options option.V2RayGRPCOptions, tlsConfig *tls.Config, handler N.TCPConnectionHandler) (adapter.V2RayServerTransport, error) { +func NewGRPCServer(ctx context.Context, options option.V2RayGRPCOptions, tlsConfig *tls.Config, handler N.TCPConnectionHandler, errorHandler E.Handler) (adapter.V2RayServerTransport, error) { + if options.ForceLite { + return v2raygrpclite.NewServer(ctx, options, tlsConfig, handler, errorHandler), nil + } return v2raygrpc.NewServer(ctx, options, tlsConfig, handler), nil } func NewGRPCClient(ctx context.Context, dialer N.Dialer, serverAddr M.Socksaddr, options option.V2RayGRPCOptions, tlsConfig *tls.Config) (adapter.V2RayClientTransport, error) { + if options.ForceLite { + return v2raygrpclite.NewClient(ctx, dialer, serverAddr, options, tlsConfig), nil + } return v2raygrpc.NewClient(ctx, dialer, serverAddr, options, tlsConfig), nil } diff --git a/transport/v2ray/grpc_stub.go b/transport/v2ray/grpc_lite.go similarity index 60% rename from transport/v2ray/grpc_stub.go rename to transport/v2ray/grpc_lite.go index 971492f5..35eb4462 100644 --- a/transport/v2ray/grpc_stub.go +++ b/transport/v2ray/grpc_lite.go @@ -8,17 +8,16 @@ import ( "github.com/sagernet/sing-box/adapter" "github.com/sagernet/sing-box/option" + "github.com/sagernet/sing-box/transport/v2raygrpclite" E "github.com/sagernet/sing/common/exceptions" M "github.com/sagernet/sing/common/metadata" N "github.com/sagernet/sing/common/network" ) -var errGRPCNotIncluded = E.New("gRPC is not included in this build, rebuild with -tags with_grpc") - -func NewGRPCServer(ctx context.Context, options option.V2RayGRPCOptions, tlsConfig *tls.Config, handler N.TCPConnectionHandler) (adapter.V2RayServerTransport, error) { - return nil, errGRPCNotIncluded +func NewGRPCServer(ctx context.Context, options option.V2RayGRPCOptions, tlsConfig *tls.Config, handler N.TCPConnectionHandler, errorHandler E.Handler) (adapter.V2RayServerTransport, error) { + return v2raygrpclite.NewServer(ctx, options, tlsConfig, handler, errorHandler), nil } func NewGRPCClient(ctx context.Context, dialer N.Dialer, serverAddr M.Socksaddr, options option.V2RayGRPCOptions, tlsConfig *tls.Config) (adapter.V2RayClientTransport, error) { - return nil, errGRPCNotIncluded + return v2raygrpclite.NewClient(ctx, dialer, serverAddr, options, tlsConfig), nil } diff --git a/transport/v2ray/transport.go b/transport/v2ray/transport.go index 9411baaf..fc4f02e0 100644 --- a/transport/v2ray/transport.go +++ b/transport/v2ray/transport.go @@ -29,7 +29,7 @@ func NewServerTransport(ctx context.Context, options option.V2RayTransportOption } return NewQUICServer(ctx, options.QUICOptions, tlsConfig, handler, errorHandler) case C.V2RayTransportTypeGRPC: - return NewGRPCServer(ctx, options.GRPCOptions, tlsConfig, handler) + return NewGRPCServer(ctx, options.GRPCOptions, tlsConfig, handler, errorHandler) default: return nil, E.New("unknown transport type: " + options.Type) } diff --git a/transport/v2raygrpclite/client.go b/transport/v2raygrpclite/client.go new file mode 100644 index 00000000..1fc36bbc --- /dev/null +++ b/transport/v2raygrpclite/client.go @@ -0,0 +1,77 @@ +package v2raygrpclite + +import ( + "context" + "crypto/tls" + "fmt" + "io" + "net" + "net/http" + "net/url" + + "github.com/sagernet/sing-box/adapter" + "github.com/sagernet/sing-box/option" + M "github.com/sagernet/sing/common/metadata" + N "github.com/sagernet/sing/common/network" +) + +var _ adapter.V2RayClientTransport = (*Client)(nil) + +var defaultClientHeader = http.Header{ + "Content-Type": []string{"application/grpc"}, + "User-Agent": []string{"grpc-go/1.48.0"}, + "TE": []string{"trailers"}, +} + +type Client struct { + ctx context.Context + dialer N.Dialer + serverAddr M.Socksaddr + transport *http.Transport + options option.V2RayGRPCOptions + url *url.URL +} + +func NewClient(ctx context.Context, dialer N.Dialer, serverAddr M.Socksaddr, options option.V2RayGRPCOptions, tlsConfig *tls.Config) adapter.V2RayClientTransport { + return &Client{ + ctx: ctx, + dialer: dialer, + serverAddr: serverAddr, + options: options, + transport: &http.Transport{ + DialContext: func(ctx context.Context, network, addr string) (net.Conn, error) { + return dialer.DialContext(ctx, network, M.ParseSocksaddr(addr)) + }, + ForceAttemptHTTP2: true, + TLSClientConfig: tlsConfig, + }, + url: &url.URL{ + Scheme: "https", + Host: serverAddr.String(), + Path: fmt.Sprintf("/%s/Tun", url.QueryEscape(options.ServiceName)), + }, + } +} + +func (c *Client) DialContext(ctx context.Context) (net.Conn, error) { + pipeInReader, pipeInWriter := io.Pipe() + request := &http.Request{ + Method: http.MethodPost, + Body: pipeInReader, + URL: c.url, + Proto: "HTTP/2", + ProtoMajor: 2, + Header: defaultClientHeader, + } + request = request.WithContext(ctx) + conn := newLateGunConn(pipeInWriter) + go func() { + response, err := c.transport.RoundTrip(request) + if err == nil { + conn.setup(response.Body, nil) + } else { + conn.setup(nil, err) + } + }() + return conn, nil +} diff --git a/transport/v2raygrpclite/conn.go b/transport/v2raygrpclite/conn.go new file mode 100644 index 00000000..b0607176 --- /dev/null +++ b/transport/v2raygrpclite/conn.go @@ -0,0 +1,168 @@ +package v2raygrpclite + +import ( + std_bufio "bufio" + "bytes" + "encoding/binary" + "io" + "net" + "net/http" + "os" + "time" + + "github.com/sagernet/sing/common" + "github.com/sagernet/sing/common/buf" + "github.com/sagernet/sing/common/bufio" + E "github.com/sagernet/sing/common/exceptions" + "github.com/sagernet/sing/common/rw" +) + +// kanged from: https://github.com/Qv2ray/gun-lite + +var _ net.Conn = (*GunConn)(nil) + +type GunConn struct { + reader *std_bufio.Reader + writer io.Writer + flusher http.Flusher + create chan struct{} + err error + readRemaining int +} + +func newGunConn(reader io.Reader, writer io.Writer, flusher http.Flusher) *GunConn { + return &GunConn{ + reader: std_bufio.NewReader(reader), + writer: writer, + flusher: flusher, + } +} + +func newLateGunConn(writer io.Writer) *GunConn { + return &GunConn{ + create: make(chan struct{}), + writer: writer, + } +} + +func (c *GunConn) setup(reader io.Reader, err error) { + c.reader = std_bufio.NewReader(reader) + c.err = err + close(c.create) +} + +func (c *GunConn) Read(b []byte) (n int, err error) { + n, err = c.read(b) + return n, wrapError(err) +} + +func (c *GunConn) read(b []byte) (n int, err error) { + if c.reader == nil { + <-c.create + if c.err != nil { + return 0, c.err + } + } + + if c.readRemaining > 0 { + if len(b) > c.readRemaining { + b = b[:c.readRemaining] + } + n, err = c.reader.Read(b) + c.readRemaining -= n + return + } + + _, err = c.reader.Discard(6) + if err != nil { + return + } + + dataLen, err := binary.ReadUvarint(c.reader) + if err != nil { + return + } + + readLen := int(dataLen) + c.readRemaining = readLen + if len(b) > readLen { + b = b[:readLen] + } + + n, err = c.reader.Read(b) + c.readRemaining -= n + return +} + +func (c *GunConn) Write(b []byte) (n int, err error) { + protobufHeader := [1 + binary.MaxVarintLen64]byte{0x0A} + varuintLen := binary.PutUvarint(protobufHeader[1:], uint64(len(b))) + grpcHeader := buf.Get(5) + grpcPayloadLen := uint32(1 + varuintLen + len(b)) + binary.BigEndian.PutUint32(grpcHeader[1:5], grpcPayloadLen) + _, err = bufio.Copy(c.writer, io.MultiReader(bytes.NewReader(grpcHeader), bytes.NewReader(protobufHeader[:varuintLen+1]), bytes.NewReader(b))) + buf.Put(grpcHeader) + if f, ok := c.writer.(http.Flusher); ok { + f.Flush() + } + return len(b), wrapError(err) +} + +func uLen(x uint64) int { + i := 0 + for x >= 0x80 { + x >>= 7 + i++ + } + return i + 1 +} + +func (c *GunConn) WriteBuffer(buffer *buf.Buffer) error { + defer buffer.Release() + dataLen := buffer.Len() + varLen := uLen(uint64(dataLen)) + header := buffer.ExtendHeader(6 + varLen) + binary.BigEndian.PutUint32(header[1:5], uint32(1+varLen+dataLen)) + header[5] = 0x0A + binary.PutUvarint(header[6:], uint64(dataLen)) + err := rw.WriteBytes(c.writer, buffer.Bytes()) + if c.flusher != nil { + c.flusher.Flush() + } + return wrapError(err) +} + +func (c *GunConn) FrontHeadroom() int { + return 6 + binary.MaxVarintLen64 +} + +func (c *GunConn) Close() error { + return common.Close(c.reader, c.writer) +} + +func (c *GunConn) LocalAddr() net.Addr { + return nil +} + +func (c *GunConn) RemoteAddr() net.Addr { + return nil +} + +func (c *GunConn) SetDeadline(t time.Time) error { + return os.ErrInvalid +} + +func (c *GunConn) SetReadDeadline(t time.Time) error { + return os.ErrInvalid +} + +func (c *GunConn) SetWriteDeadline(t time.Time) error { + return os.ErrInvalid +} + +func wrapError(err error) error { + if E.IsMulti(err, io.ErrUnexpectedEOF) { + return io.EOF + } + return err +} diff --git a/transport/v2raygrpclite/server.go b/transport/v2raygrpclite/server.go new file mode 100644 index 00000000..0af41593 --- /dev/null +++ b/transport/v2raygrpclite/server.go @@ -0,0 +1,96 @@ +package v2raygrpclite + +import ( + "context" + "crypto/tls" + "fmt" + "net" + "net/http" + "net/url" + "os" + "strings" + + "github.com/sagernet/sing-box/adapter" + "github.com/sagernet/sing-box/option" + "github.com/sagernet/sing/common" + E "github.com/sagernet/sing/common/exceptions" + M "github.com/sagernet/sing/common/metadata" + N "github.com/sagernet/sing/common/network" + sHttp "github.com/sagernet/sing/protocol/http" +) + +var _ adapter.V2RayServerTransport = (*Server)(nil) + +type Server struct { + handler N.TCPConnectionHandler + errorHandler E.Handler + httpServer *http.Server + path string +} + +func (s *Server) Network() []string { + return []string{N.NetworkTCP} +} + +func NewServer(ctx context.Context, options option.V2RayGRPCOptions, tlsConfig *tls.Config, handler N.TCPConnectionHandler, errorHandler E.Handler) *Server { + server := &Server{ + handler: handler, + errorHandler: errorHandler, + path: fmt.Sprintf("/%s/Tun", url.QueryEscape(options.ServiceName)), + } + if tlsConfig != nil { + if !common.Contains(tlsConfig.NextProtos, "h2") { + tlsConfig.NextProtos = append(tlsConfig.NextProtos, "h2") + } + } + server.httpServer = &http.Server{ + Handler: server, + TLSConfig: tlsConfig, + } + return server +} + +func (s *Server) ServeHTTP(writer http.ResponseWriter, request *http.Request) { + if request.URL.Path != s.path { + writer.WriteHeader(http.StatusNotFound) + s.badRequest(request, E.New("bad path: ", request.URL.Path)) + return + } + if request.Method != http.MethodPost { + writer.WriteHeader(http.StatusNotFound) + s.badRequest(request, E.New("bad method: ", request.Method)) + return + } + if ct := request.Header.Get("Content-Type"); !strings.HasPrefix(ct, "application/grpc") { + writer.WriteHeader(http.StatusNotFound) + s.badRequest(request, E.New("bad content type: ", ct)) + return + } + writer.Header().Set("Content-Type", "application/grpc") + writer.Header().Set("TE", "trailers") + writer.WriteHeader(http.StatusOK) + var metadata M.Metadata + metadata.Source = sHttp.SourceAddress(request) + conn := newGunConn(request.Body, writer, writer.(http.Flusher)) + s.handler.NewConnection(request.Context(), conn, metadata) +} + +func (s *Server) badRequest(request *http.Request, err error) { + s.errorHandler.NewError(request.Context(), E.Cause(err, "process connection from ", request.RemoteAddr)) +} + +func (s *Server) Serve(listener net.Listener) error { + if s.httpServer.TLSConfig == nil { + return s.httpServer.Serve(listener) + } else { + return s.httpServer.ServeTLS(listener, "", "") + } +} + +func (s *Server) ServePacket(listener net.PacketConn) error { + return os.ErrInvalid +} + +func (s *Server) Close() error { + return common.Close(common.PtrOrNil(s.httpServer)) +} diff --git a/transport/v2rayhttp/client.go b/transport/v2rayhttp/client.go index 56616bd7..7e06e18e 100644 --- a/transport/v2rayhttp/client.go +++ b/transport/v2rayhttp/client.go @@ -141,6 +141,7 @@ func (c *Client) dialHTTP2(ctx context.Context) (net.Conn, error) { return nil, err } if response.StatusCode != 200 { + pipeInWriter.Close() return nil, E.New("unexpected status: ", response.StatusCode, " ", response.Status) } return &HTTPConn{