mirror of
https://github.com/XTLS/Xray-core.git
synced 2024-11-14 04:33:18 +00:00
017f53b5fc
* Add session context outbounds as slice slice is needed for dialer proxy where two outbounds work on top of each other There are two sets of target addr for example It also enable Xtls to correctly do splice copy by checking both outbounds are ready to do direct copy * Fill outbound tag info * Splice now checks capalibility from all outbounds * Fix unit tests
187 lines
6.1 KiB
Go
187 lines
6.1 KiB
Go
package grpc
|
|
|
|
import (
|
|
"context"
|
|
gonet "net"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/xtls/xray-core/common"
|
|
"github.com/xtls/xray-core/common/net"
|
|
"github.com/xtls/xray-core/common/session"
|
|
"github.com/xtls/xray-core/transport/internet"
|
|
"github.com/xtls/xray-core/transport/internet/grpc/encoding"
|
|
"github.com/xtls/xray-core/transport/internet/reality"
|
|
"github.com/xtls/xray-core/transport/internet/stat"
|
|
"github.com/xtls/xray-core/transport/internet/tls"
|
|
"google.golang.org/grpc"
|
|
"google.golang.org/grpc/backoff"
|
|
"google.golang.org/grpc/connectivity"
|
|
"google.golang.org/grpc/credentials/insecure"
|
|
"google.golang.org/grpc/keepalive"
|
|
)
|
|
|
|
func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.MemoryStreamConfig) (stat.Connection, error) {
|
|
newError("creating connection to ", dest).WriteToLog(session.ExportIDToError(ctx))
|
|
|
|
conn, err := dialgRPC(ctx, dest, streamSettings)
|
|
if err != nil {
|
|
return nil, newError("failed to dial gRPC").Base(err)
|
|
}
|
|
return stat.Connection(conn), nil
|
|
}
|
|
|
|
func init() {
|
|
common.Must(internet.RegisterTransportDialer(protocolName, Dial))
|
|
}
|
|
|
|
type dialerConf struct {
|
|
net.Destination
|
|
*internet.MemoryStreamConfig
|
|
}
|
|
|
|
var (
|
|
globalDialerMap map[dialerConf]*grpc.ClientConn
|
|
globalDialerAccess sync.Mutex
|
|
)
|
|
|
|
func dialgRPC(ctx context.Context, dest net.Destination, streamSettings *internet.MemoryStreamConfig) (net.Conn, error) {
|
|
grpcSettings := streamSettings.ProtocolSettings.(*Config)
|
|
|
|
conn, err := getGrpcClient(ctx, dest, streamSettings)
|
|
if err != nil {
|
|
return nil, newError("Cannot dial gRPC").Base(err)
|
|
}
|
|
client := encoding.NewGRPCServiceClient(conn)
|
|
if grpcSettings.MultiMode {
|
|
newError("using gRPC multi mode service name: `" + grpcSettings.getServiceName() + "` stream name: `" + grpcSettings.getTunMultiStreamName() + "`").AtDebug().WriteToLog()
|
|
grpcService, err := client.(encoding.GRPCServiceClientX).TunMultiCustomName(ctx, grpcSettings.getServiceName(), grpcSettings.getTunMultiStreamName())
|
|
if err != nil {
|
|
return nil, newError("Cannot dial gRPC").Base(err)
|
|
}
|
|
return encoding.NewMultiHunkConn(grpcService, nil), nil
|
|
}
|
|
|
|
newError("using gRPC tun mode service name: `" + grpcSettings.getServiceName() + "` stream name: `" + grpcSettings.getTunStreamName() + "`").AtDebug().WriteToLog()
|
|
grpcService, err := client.(encoding.GRPCServiceClientX).TunCustomName(ctx, grpcSettings.getServiceName(), grpcSettings.getTunStreamName())
|
|
if err != nil {
|
|
return nil, newError("Cannot dial gRPC").Base(err)
|
|
}
|
|
|
|
return encoding.NewHunkConn(grpcService, nil), nil
|
|
}
|
|
|
|
func getGrpcClient(ctx context.Context, dest net.Destination, streamSettings *internet.MemoryStreamConfig) (*grpc.ClientConn, error) {
|
|
globalDialerAccess.Lock()
|
|
defer globalDialerAccess.Unlock()
|
|
|
|
if globalDialerMap == nil {
|
|
globalDialerMap = make(map[dialerConf]*grpc.ClientConn)
|
|
}
|
|
tlsConfig := tls.ConfigFromStreamSettings(streamSettings)
|
|
realityConfig := reality.ConfigFromStreamSettings(streamSettings)
|
|
sockopt := streamSettings.SocketSettings
|
|
grpcSettings := streamSettings.ProtocolSettings.(*Config)
|
|
|
|
if client, found := globalDialerMap[dialerConf{dest, streamSettings}]; found && client.GetState() != connectivity.Shutdown {
|
|
return client, nil
|
|
}
|
|
|
|
dialOptions := []grpc.DialOption{
|
|
grpc.WithConnectParams(grpc.ConnectParams{
|
|
Backoff: backoff.Config{
|
|
BaseDelay: 500 * time.Millisecond,
|
|
Multiplier: 1.5,
|
|
Jitter: 0.2,
|
|
MaxDelay: 19 * time.Second,
|
|
},
|
|
MinConnectTimeout: 5 * time.Second,
|
|
}),
|
|
grpc.WithContextDialer(func(gctx context.Context, s string) (gonet.Conn, error) {
|
|
select {
|
|
case <-gctx.Done():
|
|
return nil, gctx.Err()
|
|
default:
|
|
}
|
|
|
|
rawHost, rawPort, err := net.SplitHostPort(s)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if len(rawPort) == 0 {
|
|
rawPort = "443"
|
|
}
|
|
port, err := net.PortFromString(rawPort)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
address := net.ParseAddress(rawHost)
|
|
|
|
gctx = session.ContextWithID(gctx, session.IDFromContext(ctx))
|
|
gctx = session.ContextWithOutbounds(gctx, session.OutboundsFromContext(ctx))
|
|
gctx = session.ContextWithTimeoutOnly(gctx, true)
|
|
|
|
c, err := internet.DialSystem(gctx, net.TCPDestination(address, port), sockopt)
|
|
if err == nil {
|
|
if tlsConfig != nil {
|
|
config := tlsConfig.GetTLSConfig()
|
|
if config.ServerName == "" && address.Family().IsDomain() {
|
|
config.ServerName = address.Domain()
|
|
}
|
|
if fingerprint := tls.GetFingerprint(tlsConfig.Fingerprint); fingerprint != nil {
|
|
return tls.UClient(c, config, fingerprint), nil
|
|
} else { // Fallback to normal gRPC TLS
|
|
return tls.Client(c, config), nil
|
|
}
|
|
}
|
|
if realityConfig != nil {
|
|
return reality.UClient(c, realityConfig, gctx, dest)
|
|
}
|
|
}
|
|
return c, err
|
|
}),
|
|
}
|
|
|
|
dialOptions = append(dialOptions, grpc.WithTransportCredentials(insecure.NewCredentials()))
|
|
|
|
authority := ""
|
|
if grpcSettings.Authority != "" {
|
|
authority = grpcSettings.Authority
|
|
} else if tlsConfig != nil && tlsConfig.ServerName != "" {
|
|
authority = tlsConfig.ServerName
|
|
} else if realityConfig == nil && dest.Address.Family().IsDomain() {
|
|
authority = dest.Address.Domain()
|
|
}
|
|
dialOptions = append(dialOptions, grpc.WithAuthority(authority))
|
|
|
|
if grpcSettings.IdleTimeout > 0 || grpcSettings.HealthCheckTimeout > 0 || grpcSettings.PermitWithoutStream {
|
|
dialOptions = append(dialOptions, grpc.WithKeepaliveParams(keepalive.ClientParameters{
|
|
Time: time.Second * time.Duration(grpcSettings.IdleTimeout),
|
|
Timeout: time.Second * time.Duration(grpcSettings.HealthCheckTimeout),
|
|
PermitWithoutStream: grpcSettings.PermitWithoutStream,
|
|
}))
|
|
}
|
|
|
|
if grpcSettings.InitialWindowsSize > 0 {
|
|
dialOptions = append(dialOptions, grpc.WithInitialWindowSize(grpcSettings.InitialWindowsSize))
|
|
}
|
|
|
|
if grpcSettings.UserAgent != "" {
|
|
dialOptions = append(dialOptions, grpc.WithUserAgent(grpcSettings.UserAgent))
|
|
}
|
|
|
|
var grpcDestHost string
|
|
if dest.Address.Family().IsDomain() {
|
|
grpcDestHost = dest.Address.Domain()
|
|
} else {
|
|
grpcDestHost = dest.Address.IP().String()
|
|
}
|
|
|
|
conn, err := grpc.Dial(
|
|
gonet.JoinHostPort(grpcDestHost, dest.Port.String()),
|
|
dialOptions...,
|
|
)
|
|
globalDialerMap[dialerConf{dest, streamSettings}] = conn
|
|
return conn, err
|
|
}
|