package udp_test import ( "context" "sync/atomic" "testing" "time" "github.com/xtls/xray-core/v1/common" "github.com/xtls/xray-core/v1/common/buf" "github.com/xtls/xray-core/v1/common/net" "github.com/xtls/xray-core/v1/common/protocol/udp" "github.com/xtls/xray-core/v1/features/routing" "github.com/xtls/xray-core/v1/transport" . "github.com/xtls/xray-core/v1/transport/internet/udp" "github.com/xtls/xray-core/v1/transport/pipe" ) type TestDispatcher struct { OnDispatch func(ctx context.Context, dest net.Destination) (*transport.Link, error) } func (d *TestDispatcher) Dispatch(ctx context.Context, dest net.Destination) (*transport.Link, error) { return d.OnDispatch(ctx, dest) } func (d *TestDispatcher) Start() error { return nil } func (d *TestDispatcher) Close() error { return nil } func (*TestDispatcher) Type() interface{} { return routing.DispatcherType() } func TestSameDestinationDispatching(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) uplinkReader, uplinkWriter := pipe.New(pipe.WithSizeLimit(1024)) downlinkReader, downlinkWriter := pipe.New(pipe.WithSizeLimit(1024)) go func() { for { data, err := uplinkReader.ReadMultiBuffer() if err != nil { break } err = downlinkWriter.WriteMultiBuffer(data) common.Must(err) } }() var count uint32 td := &TestDispatcher{ OnDispatch: func(ctx context.Context, dest net.Destination) (*transport.Link, error) { atomic.AddUint32(&count, 1) return &transport.Link{Reader: downlinkReader, Writer: uplinkWriter}, nil }, } dest := net.UDPDestination(net.LocalHostIP, 53) b := buf.New() b.WriteString("abcd") var msgCount uint32 dispatcher := NewDispatcher(td, func(ctx context.Context, packet *udp.Packet) { atomic.AddUint32(&msgCount, 1) }) dispatcher.Dispatch(ctx, dest, b) for i := 0; i < 5; i++ { dispatcher.Dispatch(ctx, dest, b) } time.Sleep(time.Second) cancel() if count != 1 { t.Error("count: ", count) } if v := atomic.LoadUint32(&msgCount); v != 6 { t.Error("msgCount: ", v) } }