package reverse

import (
	"context"
	"sync"
	"time"

	"github.com/xtls/xray-core/common"
	"github.com/xtls/xray-core/common/buf"
	"github.com/xtls/xray-core/common/errors"
	"github.com/xtls/xray-core/common/mux"
	"github.com/xtls/xray-core/common/net"
	"github.com/xtls/xray-core/common/session"
	"github.com/xtls/xray-core/common/task"
	"github.com/xtls/xray-core/features/outbound"
	"github.com/xtls/xray-core/transport"
	"github.com/xtls/xray-core/transport/pipe"
	"google.golang.org/protobuf/proto"
)
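
// Portal is the portal side of a reverse proxy setup. It accepts mux
// connections opened by remote bridges, keeps them in a worker pool, and
// relays regular outbound traffic through those workers.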
type Portal struct {
	ohm    outbound.Manager
	tag    string
	domain string
	picker *StaticMuxPicker
	client *mux.ClientManager
}
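
// NewPortal creates a Portal from its config. Both Tag and Domain are
// required: the tag names the outbound handler, and the domain identifies
// control connections arriving from bridges.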
func NewPortal(config *PortalConfig, ohm outbound.Manager) (*Portal, error) {
	if config.Tag == "" {
		return nil, errors.New("portal tag is empty")
	}

	if config.Domain == "" {
		return nil, errors.New("portal domain is empty")
	}

	picker, err := NewStaticMuxPicker()
	if err != nil {
		return nil, err
	}

	return &Portal{
		ohm:    ohm,
		tag:    config.Tag,
		domain: config.Domain,
		picker: picker,
		client: &mux.ClientManager{
			Picker: picker,
		},
	}, nil
}
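
// Start registers the portal as an outbound handler under its tag.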
func (p *Portal) Start() error {
	return p.ohm.AddHandler(context.Background(), &Outbound{
		portal: p,
		tag:    p.tag,
	})
}
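
// Close removes the portal's outbound handler.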
func (p *Portal) Close() error {
	return p.ohm.RemoveHandler(context.Background(), p.tag)
}
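
// HandleConnection inspects the target of an incoming link. Connections
// addressed to the portal domain are control connections from a bridge and
// become new workers; anything else is relayed through an existing worker.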
func (p *Portal) HandleConnection(ctx context.Context, link *transport.Link) error {
	outbounds := session.OutboundsFromContext(ctx)
	ob := outbounds[len(outbounds)-1]
	if ob == nil {
		return errors.New("outbound metadata not found").AtError()
	}

	if isDomain(ob.Target, p.domain) {
		// A connection to the portal domain is a bridge registering itself:
		// turn it into a mux client and add it to the worker pool.
		muxClient, err := mux.NewClientWorker(*link, mux.ClientStrategy{})
		if err != nil {
			return errors.New("failed to create mux client worker").Base(err).AtWarning()
		}

		worker, err := NewPortalWorker(muxClient)
		if err != nil {
			return errors.New("failed to create portal worker").Base(err)
		}

		p.picker.AddWorker(worker)
		return nil
	}

	// Everything else is user traffic: relay it through one of the bridges.
	return p.client.Dispatch(ctx, link)
}
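
// Outbound is the outbound handler that feeds dispatched links into a Portal.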
type Outbound struct {
	portal *Portal
	tag    string
}
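
// Tag returns the tag of the portal's outbound handler.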
func (o *Outbound) Tag() string {
	return o.tag
}
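
// Dispatch hands an incoming link to the portal, interrupting the link's
// writer if the portal fails to process it.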
func (o *Outbound) Dispatch(ctx context.Context, link *transport.Link) {
	if err := o.portal.HandleConnection(ctx, link); err != nil {
		errors.LogInfoInner(ctx, err, "failed to process reverse connection")
		common.Interrupt(link.Writer)
	}
}
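
// Start is a no-op.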
func (o *Outbound) Start() error {
	return nil
}
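
// Close is a no-op; the portal is closed through Portal.Close.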
func (o *Outbound) Close() error {
	return nil
}
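
// StaticMuxPicker picks among a fixed pool of portal workers and
// periodically drops the ones whose mux clients have closed.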
type StaticMuxPicker struct {
	access  sync.Mutex
	workers []*PortalWorker
	cTask   *task.Periodic
}
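
// NewStaticMuxPicker creates an empty picker and starts its cleanup task,
// which prunes closed workers every 30 seconds.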
func NewStaticMuxPicker() (*StaticMuxPicker, error) {
	p := &StaticMuxPicker{}
	p.cTask = &task.Periodic{
		Execute:  p.cleanup,
		Interval: time.Second * 30,
	}
	p.cTask.Start()
	return p, nil
}
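
// cleanup removes workers whose underlying mux clients have closed.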
func (p *StaticMuxPicker) cleanup() error {
	p.access.Lock()
	defer p.access.Unlock()

	var activeWorkers []*PortalWorker
	for _, w := range p.workers {
		if !w.Closed() {
			activeWorkers = append(activeWorkers, w)
		}
	}

	if len(activeWorkers) != len(p.workers) {
		p.workers = activeWorkers
	}

	return nil
}
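
// PickAvailable returns the mux client with the fewest active connections.
// Draining and closed workers are skipped first; if that leaves no
// candidate, any worker that is not yet full may be picked.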
func (p *StaticMuxPicker) PickAvailable() (*mux.ClientWorker, error) {
	p.access.Lock()
	defer p.access.Unlock()

	if len(p.workers) == 0 {
		return nil, errors.New("empty worker list")
	}

	// First pass: among workers that are neither draining nor closed, find
	// the one with the fewest active connections.
	var minIdx int = -1
	var minConn uint32 = 9999
	for i, w := range p.workers {
		if w.draining {
			continue
		}
		if w.client.Closed() {
			continue
		}
		if w.client.ActiveConnections() < minConn {
			minConn = w.client.ActiveConnections()
			minIdx = i
		}
	}

	// Second pass: if the first pass found nothing, fall back to any worker
	// that still has capacity.
	if minIdx == -1 {
		for i, w := range p.workers {
			if w.IsFull() {
				continue
			}
			if w.client.ActiveConnections() < minConn {
				minConn = w.client.ActiveConnections()
				minIdx = i
			}
		}
	}

	if minIdx != -1 {
		return p.workers[minIdx].client, nil
	}

	return nil, errors.New("no mux client worker available")
}
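
// AddWorker appends a newly established worker to the pool.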
func (p *StaticMuxPicker) AddWorker(worker *PortalWorker) {
	p.access.Lock()
	defer p.access.Unlock()

	p.workers = append(p.workers, worker)
}
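
// PortalWorker wraps a mux client obtained from a bridge together with the
// control link used to send heartbeat messages back to that bridge.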
type PortalWorker struct {
	client   *mux.ClientWorker
	control  *task.Periodic
	writer   buf.Writer
	reader   buf.Reader
	draining bool
}
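
// NewPortalWorker opens a control connection to the bridge over the given
// mux client, addressed to the reserved internal domain, and starts the
// periodic heartbeat on it.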
func NewPortalWorker(client *mux.ClientWorker) (*PortalWorker, error) {
	opt := []pipe.Option{pipe.WithSizeLimit(16 * 1024)}
	uplinkReader, uplinkWriter := pipe.New(opt...)
	downlinkReader, downlinkWriter := pipe.New(opt...)

	// Address the control connection to the internal domain so the bridge
	// can tell it apart from relayed traffic.
	ctx := context.Background()
	outbounds := []*session.Outbound{{
		Target: net.UDPDestination(net.DomainAddress(internalDomain), 0),
	}}
	ctx = session.ContextWithOutbounds(ctx, outbounds)
	f := client.Dispatch(ctx, &transport.Link{
		Reader: uplinkReader,
		Writer: downlinkWriter,
	})
	if !f {
		return nil, errors.New("unable to dispatch control connection")
	}
	w := &PortalWorker{
		client: client,
		reader: downlinkReader,
		writer: uplinkWriter,
	}
	w.control = &task.Periodic{
		Execute:  w.heartbeat,
		Interval: time.Second * 2,
	}
	w.control.Start()
	return w, nil
}
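
// heartbeat sends a random-filled Control message to the bridge every two
// seconds. Once the client has carried more than 256 connections in total,
// the worker signals DRAIN and disposes of its control link.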
func (w *PortalWorker) heartbeat() error {
	if w.client.Closed() {
		return errors.New("client worker stopped")
	}

	if w.draining || w.writer == nil {
		return errors.New("already disposed")
	}

	msg := &Control{}
	msg.FillInRandom()

	// Retire the worker after 256 total connections: tell the bridge to
	// drain, and release the control link once the message below is written.
	if w.client.TotalConnections() > 256 {
		w.draining = true
		msg.State = Control_DRAIN

		defer func() {
			common.Close(w.writer)
			common.Interrupt(w.reader)
			w.writer = nil
		}()
	}

	b, err := proto.Marshal(msg)
	common.Must(err)
	mb := buf.MergeBytes(nil, b)
	return w.writer.WriteMultiBuffer(mb)
}
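
// IsFull reports whether the underlying mux client can take no more connections.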
func (w *PortalWorker) IsFull() bool {
	return w.client.IsFull()
}
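
// Closed reports whether the underlying mux client has closed.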
func (w *PortalWorker) Closed() bool {
	return w.client.Closed()
}