mirror of
https://github.com/XTLS/Xray-core.git
synced 2025-01-05 23:54:16 +00:00
Add observatory / latestPing balancing strategy
Co-authored-by: Shelikhoo <xiaokangwang@outlook.com>
This commit is contained in:
parent
77d0419aca
commit
5c366db847
|
@ -199,8 +199,8 @@ func shouldOverride(ctx context.Context, result SniffResult, request session.Sni
|
|||
}
|
||||
if fkr0, ok := fakeDNSEngine.(dns.FakeDNSEngineRev0); ok && protocolString != "bittorrent" && p == "fakedns" &&
|
||||
destination.Address.Family().IsIP() && fkr0.IsIPInIPPool(destination.Address) {
|
||||
newError("Using sniffer ", protocolString, " since the fake DNS missed").WriteToLog(session.ExportIDToError(ctx))
|
||||
return true
|
||||
newError("Using sniffer ", protocolString, " since the fake DNS missed").WriteToLog(session.ExportIDToError(ctx))
|
||||
return true
|
||||
}
|
||||
if resultSubset, ok := result.(SnifferIsProtoSubsetOf); ok {
|
||||
if resultSubset.IsProtoSubsetOf(p) {
|
||||
|
@ -399,7 +399,18 @@ func (d *DefaultDispatcher) routedDispatch(ctx context.Context, link *transport.
|
|||
|
||||
var handler outbound.Handler
|
||||
|
||||
if d.router != nil {
|
||||
if forcedOutboundTag := session.GetForcedOutboundTagFromContext(ctx); forcedOutboundTag != "" {
|
||||
ctx = session.SetForcedOutboundTagToContext(ctx, "")
|
||||
if h := d.ohm.GetHandler(forcedOutboundTag); h != nil {
|
||||
newError("taking platform initialized detour [", forcedOutboundTag, "] for [", destination, "]").WriteToLog(session.ExportIDToError(ctx))
|
||||
handler = h
|
||||
} else {
|
||||
newError("non existing tag for platform initialized detour: ", forcedOutboundTag).AtError().WriteToLog(session.ExportIDToError(ctx))
|
||||
common.Close(link.Writer)
|
||||
common.Interrupt(link.Reader)
|
||||
return
|
||||
}
|
||||
} else if d.router != nil {
|
||||
if route, err := d.router.PickRoute(routing_session.AsRoutingContext(ctx)); err == nil {
|
||||
tag := route.GetOutboundTag()
|
||||
if h := d.ohm.GetHandler(tag); h != nil {
|
||||
|
|
530
app/observatory/config.pb.go
Normal file
530
app/observatory/config.pb.go
Normal file
|
@ -0,0 +1,530 @@
|
|||
// Code generated by protoc-gen-go. DO NOT EDIT.
|
||||
// versions:
|
||||
// protoc-gen-go v1.27.1
|
||||
// protoc v3.18.0
|
||||
// source: app/observatory/config.proto
|
||||
|
||||
package observatory
|
||||
|
||||
import (
|
||||
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
|
||||
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
|
||||
reflect "reflect"
|
||||
sync "sync"
|
||||
)
|
||||
|
||||
const (
|
||||
// Verify that this generated code is sufficiently up-to-date.
|
||||
_ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion)
|
||||
// Verify that runtime/protoimpl is sufficiently up-to-date.
|
||||
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
|
||||
)
|
||||
|
||||
type ObservationResult struct {
|
||||
state protoimpl.MessageState
|
||||
sizeCache protoimpl.SizeCache
|
||||
unknownFields protoimpl.UnknownFields
|
||||
|
||||
Status []*OutboundStatus `protobuf:"bytes,1,rep,name=status,proto3" json:"status,omitempty"`
|
||||
}
|
||||
|
||||
func (x *ObservationResult) Reset() {
|
||||
*x = ObservationResult{}
|
||||
if protoimpl.UnsafeEnabled {
|
||||
mi := &file_app_observatory_config_proto_msgTypes[0]
|
||||
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||
ms.StoreMessageInfo(mi)
|
||||
}
|
||||
}
|
||||
|
||||
func (x *ObservationResult) String() string {
|
||||
return protoimpl.X.MessageStringOf(x)
|
||||
}
|
||||
|
||||
func (*ObservationResult) ProtoMessage() {}
|
||||
|
||||
func (x *ObservationResult) ProtoReflect() protoreflect.Message {
|
||||
mi := &file_app_observatory_config_proto_msgTypes[0]
|
||||
if protoimpl.UnsafeEnabled && x != nil {
|
||||
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||
if ms.LoadMessageInfo() == nil {
|
||||
ms.StoreMessageInfo(mi)
|
||||
}
|
||||
return ms
|
||||
}
|
||||
return mi.MessageOf(x)
|
||||
}
|
||||
|
||||
// Deprecated: Use ObservationResult.ProtoReflect.Descriptor instead.
|
||||
func (*ObservationResult) Descriptor() ([]byte, []int) {
|
||||
return file_app_observatory_config_proto_rawDescGZIP(), []int{0}
|
||||
}
|
||||
|
||||
func (x *ObservationResult) GetStatus() []*OutboundStatus {
|
||||
if x != nil {
|
||||
return x.Status
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
type OutboundStatus struct {
|
||||
state protoimpl.MessageState
|
||||
sizeCache protoimpl.SizeCache
|
||||
unknownFields protoimpl.UnknownFields
|
||||
|
||||
// @Document Whether this outbound is usable
|
||||
//@Restriction ReadOnlyForUser
|
||||
Alive bool `protobuf:"varint,1,opt,name=alive,proto3" json:"alive,omitempty"`
|
||||
// @Document The time for probe request to finish.
|
||||
//@Type time.ms
|
||||
//@Restriction ReadOnlyForUser
|
||||
Delay int64 `protobuf:"varint,2,opt,name=delay,proto3" json:"delay,omitempty"`
|
||||
// @Document The last error caused this outbound failed to relay probe request
|
||||
//@Restriction NotMachineReadable
|
||||
LastErrorReason string `protobuf:"bytes,3,opt,name=last_error_reason,json=lastErrorReason,proto3" json:"last_error_reason,omitempty"`
|
||||
// @Document The outbound tag for this Server
|
||||
//@Type id.outboundTag
|
||||
OutboundTag string `protobuf:"bytes,4,opt,name=outbound_tag,json=outboundTag,proto3" json:"outbound_tag,omitempty"`
|
||||
// @Document The time this outbound is known to be alive
|
||||
//@Type id.outboundTag
|
||||
LastSeenTime int64 `protobuf:"varint,5,opt,name=last_seen_time,json=lastSeenTime,proto3" json:"last_seen_time,omitempty"`
|
||||
// @Document The time this outbound is tried
|
||||
//@Type id.outboundTag
|
||||
LastTryTime int64 `protobuf:"varint,6,opt,name=last_try_time,json=lastTryTime,proto3" json:"last_try_time,omitempty"`
|
||||
}
|
||||
|
||||
func (x *OutboundStatus) Reset() {
|
||||
*x = OutboundStatus{}
|
||||
if protoimpl.UnsafeEnabled {
|
||||
mi := &file_app_observatory_config_proto_msgTypes[1]
|
||||
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||
ms.StoreMessageInfo(mi)
|
||||
}
|
||||
}
|
||||
|
||||
func (x *OutboundStatus) String() string {
|
||||
return protoimpl.X.MessageStringOf(x)
|
||||
}
|
||||
|
||||
func (*OutboundStatus) ProtoMessage() {}
|
||||
|
||||
func (x *OutboundStatus) ProtoReflect() protoreflect.Message {
|
||||
mi := &file_app_observatory_config_proto_msgTypes[1]
|
||||
if protoimpl.UnsafeEnabled && x != nil {
|
||||
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||
if ms.LoadMessageInfo() == nil {
|
||||
ms.StoreMessageInfo(mi)
|
||||
}
|
||||
return ms
|
||||
}
|
||||
return mi.MessageOf(x)
|
||||
}
|
||||
|
||||
// Deprecated: Use OutboundStatus.ProtoReflect.Descriptor instead.
|
||||
func (*OutboundStatus) Descriptor() ([]byte, []int) {
|
||||
return file_app_observatory_config_proto_rawDescGZIP(), []int{1}
|
||||
}
|
||||
|
||||
func (x *OutboundStatus) GetAlive() bool {
|
||||
if x != nil {
|
||||
return x.Alive
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (x *OutboundStatus) GetDelay() int64 {
|
||||
if x != nil {
|
||||
return x.Delay
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (x *OutboundStatus) GetLastErrorReason() string {
|
||||
if x != nil {
|
||||
return x.LastErrorReason
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func (x *OutboundStatus) GetOutboundTag() string {
|
||||
if x != nil {
|
||||
return x.OutboundTag
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func (x *OutboundStatus) GetLastSeenTime() int64 {
|
||||
if x != nil {
|
||||
return x.LastSeenTime
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (x *OutboundStatus) GetLastTryTime() int64 {
|
||||
if x != nil {
|
||||
return x.LastTryTime
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
type ProbeResult struct {
|
||||
state protoimpl.MessageState
|
||||
sizeCache protoimpl.SizeCache
|
||||
unknownFields protoimpl.UnknownFields
|
||||
|
||||
// @Document Whether this outbound is usable
|
||||
//@Restriction ReadOnlyForUser
|
||||
Alive bool `protobuf:"varint,1,opt,name=alive,proto3" json:"alive,omitempty"`
|
||||
// @Document The time for probe request to finish.
|
||||
//@Type time.ms
|
||||
//@Restriction ReadOnlyForUser
|
||||
Delay int64 `protobuf:"varint,2,opt,name=delay,proto3" json:"delay,omitempty"`
|
||||
// @Document The error caused this outbound failed to relay probe request
|
||||
//@Restriction NotMachineReadable
|
||||
LastErrorReason string `protobuf:"bytes,3,opt,name=last_error_reason,json=lastErrorReason,proto3" json:"last_error_reason,omitempty"`
|
||||
}
|
||||
|
||||
func (x *ProbeResult) Reset() {
|
||||
*x = ProbeResult{}
|
||||
if protoimpl.UnsafeEnabled {
|
||||
mi := &file_app_observatory_config_proto_msgTypes[2]
|
||||
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||
ms.StoreMessageInfo(mi)
|
||||
}
|
||||
}
|
||||
|
||||
func (x *ProbeResult) String() string {
|
||||
return protoimpl.X.MessageStringOf(x)
|
||||
}
|
||||
|
||||
func (*ProbeResult) ProtoMessage() {}
|
||||
|
||||
func (x *ProbeResult) ProtoReflect() protoreflect.Message {
|
||||
mi := &file_app_observatory_config_proto_msgTypes[2]
|
||||
if protoimpl.UnsafeEnabled && x != nil {
|
||||
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||
if ms.LoadMessageInfo() == nil {
|
||||
ms.StoreMessageInfo(mi)
|
||||
}
|
||||
return ms
|
||||
}
|
||||
return mi.MessageOf(x)
|
||||
}
|
||||
|
||||
// Deprecated: Use ProbeResult.ProtoReflect.Descriptor instead.
|
||||
func (*ProbeResult) Descriptor() ([]byte, []int) {
|
||||
return file_app_observatory_config_proto_rawDescGZIP(), []int{2}
|
||||
}
|
||||
|
||||
func (x *ProbeResult) GetAlive() bool {
|
||||
if x != nil {
|
||||
return x.Alive
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
func (x *ProbeResult) GetDelay() int64 {
|
||||
if x != nil {
|
||||
return x.Delay
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (x *ProbeResult) GetLastErrorReason() string {
|
||||
if x != nil {
|
||||
return x.LastErrorReason
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
type Intensity struct {
|
||||
state protoimpl.MessageState
|
||||
sizeCache protoimpl.SizeCache
|
||||
unknownFields protoimpl.UnknownFields
|
||||
|
||||
// @Document The time interval for a probe request in ms.
|
||||
//@Type time.ms
|
||||
ProbeInterval uint32 `protobuf:"varint,1,opt,name=probe_interval,json=probeInterval,proto3" json:"probe_interval,omitempty"`
|
||||
}
|
||||
|
||||
func (x *Intensity) Reset() {
|
||||
*x = Intensity{}
|
||||
if protoimpl.UnsafeEnabled {
|
||||
mi := &file_app_observatory_config_proto_msgTypes[3]
|
||||
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||
ms.StoreMessageInfo(mi)
|
||||
}
|
||||
}
|
||||
|
||||
func (x *Intensity) String() string {
|
||||
return protoimpl.X.MessageStringOf(x)
|
||||
}
|
||||
|
||||
func (*Intensity) ProtoMessage() {}
|
||||
|
||||
func (x *Intensity) ProtoReflect() protoreflect.Message {
|
||||
mi := &file_app_observatory_config_proto_msgTypes[3]
|
||||
if protoimpl.UnsafeEnabled && x != nil {
|
||||
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||
if ms.LoadMessageInfo() == nil {
|
||||
ms.StoreMessageInfo(mi)
|
||||
}
|
||||
return ms
|
||||
}
|
||||
return mi.MessageOf(x)
|
||||
}
|
||||
|
||||
// Deprecated: Use Intensity.ProtoReflect.Descriptor instead.
|
||||
func (*Intensity) Descriptor() ([]byte, []int) {
|
||||
return file_app_observatory_config_proto_rawDescGZIP(), []int{3}
|
||||
}
|
||||
|
||||
func (x *Intensity) GetProbeInterval() uint32 {
|
||||
if x != nil {
|
||||
return x.ProbeInterval
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
state protoimpl.MessageState
|
||||
sizeCache protoimpl.SizeCache
|
||||
unknownFields protoimpl.UnknownFields
|
||||
|
||||
// @Document The selectors for outbound under observation
|
||||
SubjectSelector []string `protobuf:"bytes,2,rep,name=subject_selector,json=subjectSelector,proto3" json:"subject_selector,omitempty"`
|
||||
ProbeUrl string `protobuf:"bytes,3,opt,name=probe_url,json=probeUrl,proto3" json:"probe_url,omitempty"`
|
||||
ProbeInterval int64 `protobuf:"varint,4,opt,name=probe_interval,json=probeInterval,proto3" json:"probe_interval,omitempty"`
|
||||
EnableConcurrency bool `protobuf:"varint,5,opt,name=enable_concurrency,json=enableConcurrency,proto3" json:"enable_concurrency,omitempty"`
|
||||
}
|
||||
|
||||
func (x *Config) Reset() {
|
||||
*x = Config{}
|
||||
if protoimpl.UnsafeEnabled {
|
||||
mi := &file_app_observatory_config_proto_msgTypes[4]
|
||||
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||
ms.StoreMessageInfo(mi)
|
||||
}
|
||||
}
|
||||
|
||||
func (x *Config) String() string {
|
||||
return protoimpl.X.MessageStringOf(x)
|
||||
}
|
||||
|
||||
func (*Config) ProtoMessage() {}
|
||||
|
||||
func (x *Config) ProtoReflect() protoreflect.Message {
|
||||
mi := &file_app_observatory_config_proto_msgTypes[4]
|
||||
if protoimpl.UnsafeEnabled && x != nil {
|
||||
ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x))
|
||||
if ms.LoadMessageInfo() == nil {
|
||||
ms.StoreMessageInfo(mi)
|
||||
}
|
||||
return ms
|
||||
}
|
||||
return mi.MessageOf(x)
|
||||
}
|
||||
|
||||
// Deprecated: Use Config.ProtoReflect.Descriptor instead.
|
||||
func (*Config) Descriptor() ([]byte, []int) {
|
||||
return file_app_observatory_config_proto_rawDescGZIP(), []int{4}
|
||||
}
|
||||
|
||||
func (x *Config) GetSubjectSelector() []string {
|
||||
if x != nil {
|
||||
return x.SubjectSelector
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (x *Config) GetProbeUrl() string {
|
||||
if x != nil {
|
||||
return x.ProbeUrl
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
func (x *Config) GetProbeInterval() int64 {
|
||||
if x != nil {
|
||||
return x.ProbeInterval
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (x *Config) GetEnableConcurrency() bool {
|
||||
if x != nil {
|
||||
return x.EnableConcurrency
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
var File_app_observatory_config_proto protoreflect.FileDescriptor
|
||||
|
||||
var file_app_observatory_config_proto_rawDesc = []byte{
|
||||
0x0a, 0x1c, 0x61, 0x70, 0x70, 0x2f, 0x6f, 0x62, 0x73, 0x65, 0x72, 0x76, 0x61, 0x74, 0x6f, 0x72,
|
||||
0x79, 0x2f, 0x63, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x19,
|
||||
0x78, 0x72, 0x61, 0x79, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x61, 0x70, 0x70, 0x2e, 0x6f, 0x62,
|
||||
0x73, 0x65, 0x72, 0x76, 0x61, 0x74, 0x6f, 0x72, 0x79, 0x22, 0x56, 0x0a, 0x11, 0x4f, 0x62, 0x73,
|
||||
0x65, 0x72, 0x76, 0x61, 0x74, 0x69, 0x6f, 0x6e, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, 0x41,
|
||||
0x0a, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75, 0x73, 0x18, 0x01, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x29,
|
||||
0x2e, 0x78, 0x72, 0x61, 0x79, 0x2e, 0x63, 0x6f, 0x72, 0x65, 0x2e, 0x61, 0x70, 0x70, 0x2e, 0x6f,
|
||||
0x62, 0x73, 0x65, 0x72, 0x76, 0x61, 0x74, 0x6f, 0x72, 0x79, 0x2e, 0x4f, 0x75, 0x74, 0x62, 0x6f,
|
||||
0x75, 0x6e, 0x64, 0x53, 0x74, 0x61, 0x74, 0x75, 0x73, 0x52, 0x06, 0x73, 0x74, 0x61, 0x74, 0x75,
|
||||
0x73, 0x22, 0xd5, 0x01, 0x0a, 0x0e, 0x4f, 0x75, 0x74, 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x53, 0x74,
|
||||
0x61, 0x74, 0x75, 0x73, 0x12, 0x14, 0x0a, 0x05, 0x61, 0x6c, 0x69, 0x76, 0x65, 0x18, 0x01, 0x20,
|
||||
0x01, 0x28, 0x08, 0x52, 0x05, 0x61, 0x6c, 0x69, 0x76, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x64, 0x65,
|
||||
0x6c, 0x61, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x05, 0x64, 0x65, 0x6c, 0x61, 0x79,
|
||||
0x12, 0x2a, 0x0a, 0x11, 0x6c, 0x61, 0x73, 0x74, 0x5f, 0x65, 0x72, 0x72, 0x6f, 0x72, 0x5f, 0x72,
|
||||
0x65, 0x61, 0x73, 0x6f, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x0f, 0x6c, 0x61, 0x73,
|
||||
0x74, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x52, 0x65, 0x61, 0x73, 0x6f, 0x6e, 0x12, 0x21, 0x0a, 0x0c,
|
||||
0x6f, 0x75, 0x74, 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x5f, 0x74, 0x61, 0x67, 0x18, 0x04, 0x20, 0x01,
|
||||
0x28, 0x09, 0x52, 0x0b, 0x6f, 0x75, 0x74, 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x54, 0x61, 0x67, 0x12,
|
||||
0x24, 0x0a, 0x0e, 0x6c, 0x61, 0x73, 0x74, 0x5f, 0x73, 0x65, 0x65, 0x6e, 0x5f, 0x74, 0x69, 0x6d,
|
||||
0x65, 0x18, 0x05, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0c, 0x6c, 0x61, 0x73, 0x74, 0x53, 0x65, 0x65,
|
||||
0x6e, 0x54, 0x69, 0x6d, 0x65, 0x12, 0x22, 0x0a, 0x0d, 0x6c, 0x61, 0x73, 0x74, 0x5f, 0x74, 0x72,
|
||||
0x79, 0x5f, 0x74, 0x69, 0x6d, 0x65, 0x18, 0x06, 0x20, 0x01, 0x28, 0x03, 0x52, 0x0b, 0x6c, 0x61,
|
||||
0x73, 0x74, 0x54, 0x72, 0x79, 0x54, 0x69, 0x6d, 0x65, 0x22, 0x65, 0x0a, 0x0b, 0x50, 0x72, 0x6f,
|
||||
0x62, 0x65, 0x52, 0x65, 0x73, 0x75, 0x6c, 0x74, 0x12, 0x14, 0x0a, 0x05, 0x61, 0x6c, 0x69, 0x76,
|
||||
0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x05, 0x61, 0x6c, 0x69, 0x76, 0x65, 0x12, 0x14,
|
||||
0x0a, 0x05, 0x64, 0x65, 0x6c, 0x61, 0x79, 0x18, 0x02, 0x20, 0x01, 0x28, 0x03, 0x52, 0x05, 0x64,
|
||||
0x65, 0x6c, 0x61, 0x79, 0x12, 0x2a, 0x0a, 0x11, 0x6c, 0x61, 0x73, 0x74, 0x5f, 0x65, 0x72, 0x72,
|
||||
0x6f, 0x72, 0x5f, 0x72, 0x65, 0x61, 0x73, 0x6f, 0x6e, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52,
|
||||
0x0f, 0x6c, 0x61, 0x73, 0x74, 0x45, 0x72, 0x72, 0x6f, 0x72, 0x52, 0x65, 0x61, 0x73, 0x6f, 0x6e,
|
||||
0x22, 0x32, 0x0a, 0x09, 0x49, 0x6e, 0x74, 0x65, 0x6e, 0x73, 0x69, 0x74, 0x79, 0x12, 0x25, 0x0a,
|
||||
0x0e, 0x70, 0x72, 0x6f, 0x62, 0x65, 0x5f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x76, 0x61, 0x6c, 0x18,
|
||||
0x01, 0x20, 0x01, 0x28, 0x0d, 0x52, 0x0d, 0x70, 0x72, 0x6f, 0x62, 0x65, 0x49, 0x6e, 0x74, 0x65,
|
||||
0x72, 0x76, 0x61, 0x6c, 0x22, 0xa6, 0x01, 0x0a, 0x06, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12,
|
||||
0x29, 0x0a, 0x10, 0x73, 0x75, 0x62, 0x6a, 0x65, 0x63, 0x74, 0x5f, 0x73, 0x65, 0x6c, 0x65, 0x63,
|
||||
0x74, 0x6f, 0x72, 0x18, 0x02, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0f, 0x73, 0x75, 0x62, 0x6a, 0x65,
|
||||
0x63, 0x74, 0x53, 0x65, 0x6c, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x12, 0x1b, 0x0a, 0x09, 0x70, 0x72,
|
||||
0x6f, 0x62, 0x65, 0x5f, 0x75, 0x72, 0x6c, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x08, 0x70,
|
||||
0x72, 0x6f, 0x62, 0x65, 0x55, 0x72, 0x6c, 0x12, 0x25, 0x0a, 0x0e, 0x70, 0x72, 0x6f, 0x62, 0x65,
|
||||
0x5f, 0x69, 0x6e, 0x74, 0x65, 0x72, 0x76, 0x61, 0x6c, 0x18, 0x04, 0x20, 0x01, 0x28, 0x03, 0x52,
|
||||
0x0d, 0x70, 0x72, 0x6f, 0x62, 0x65, 0x49, 0x6e, 0x74, 0x65, 0x72, 0x76, 0x61, 0x6c, 0x12, 0x2d,
|
||||
0x0a, 0x12, 0x65, 0x6e, 0x61, 0x62, 0x6c, 0x65, 0x5f, 0x63, 0x6f, 0x6e, 0x63, 0x75, 0x72, 0x72,
|
||||
0x65, 0x6e, 0x63, 0x79, 0x18, 0x05, 0x20, 0x01, 0x28, 0x08, 0x52, 0x11, 0x65, 0x6e, 0x61, 0x62,
|
||||
0x6c, 0x65, 0x43, 0x6f, 0x6e, 0x63, 0x75, 0x72, 0x72, 0x65, 0x6e, 0x63, 0x79, 0x42, 0x5e, 0x0a,
|
||||
0x18, 0x63, 0x6f, 0x6d, 0x2e, 0x78, 0x72, 0x61, 0x79, 0x2e, 0x61, 0x70, 0x70, 0x2e, 0x6f, 0x62,
|
||||
0x73, 0x65, 0x72, 0x76, 0x61, 0x74, 0x6f, 0x72, 0x79, 0x50, 0x01, 0x5a, 0x29, 0x67, 0x69, 0x74,
|
||||
0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x78, 0x74, 0x6c, 0x73, 0x2f, 0x78, 0x72, 0x61,
|
||||
0x79, 0x2d, 0x63, 0x6f, 0x72, 0x65, 0x2f, 0x61, 0x70, 0x70, 0x2f, 0x6f, 0x62, 0x73, 0x65, 0x72,
|
||||
0x76, 0x61, 0x74, 0x6f, 0x72, 0x79, 0xaa, 0x02, 0x14, 0x58, 0x72, 0x61, 0x79, 0x2e, 0x41, 0x70,
|
||||
0x70, 0x2e, 0x4f, 0x62, 0x73, 0x65, 0x72, 0x76, 0x61, 0x74, 0x6f, 0x72, 0x79, 0x62, 0x06, 0x70,
|
||||
0x72, 0x6f, 0x74, 0x6f, 0x33,
|
||||
}
|
||||
|
||||
var (
|
||||
file_app_observatory_config_proto_rawDescOnce sync.Once
|
||||
file_app_observatory_config_proto_rawDescData = file_app_observatory_config_proto_rawDesc
|
||||
)
|
||||
|
||||
func file_app_observatory_config_proto_rawDescGZIP() []byte {
|
||||
file_app_observatory_config_proto_rawDescOnce.Do(func() {
|
||||
file_app_observatory_config_proto_rawDescData = protoimpl.X.CompressGZIP(file_app_observatory_config_proto_rawDescData)
|
||||
})
|
||||
return file_app_observatory_config_proto_rawDescData
|
||||
}
|
||||
|
||||
var file_app_observatory_config_proto_msgTypes = make([]protoimpl.MessageInfo, 5)
|
||||
var file_app_observatory_config_proto_goTypes = []interface{}{
|
||||
(*ObservationResult)(nil), // 0: xray.core.app.observatory.ObservationResult
|
||||
(*OutboundStatus)(nil), // 1: xray.core.app.observatory.OutboundStatus
|
||||
(*ProbeResult)(nil), // 2: xray.core.app.observatory.ProbeResult
|
||||
(*Intensity)(nil), // 3: xray.core.app.observatory.Intensity
|
||||
(*Config)(nil), // 4: xray.core.app.observatory.Config
|
||||
}
|
||||
var file_app_observatory_config_proto_depIdxs = []int32{
|
||||
1, // 0: xray.core.app.observatory.ObservationResult.status:type_name -> xray.core.app.observatory.OutboundStatus
|
||||
1, // [1:1] is the sub-list for method output_type
|
||||
1, // [1:1] is the sub-list for method input_type
|
||||
1, // [1:1] is the sub-list for extension type_name
|
||||
1, // [1:1] is the sub-list for extension extendee
|
||||
0, // [0:1] is the sub-list for field type_name
|
||||
}
|
||||
|
||||
func init() { file_app_observatory_config_proto_init() }
|
||||
func file_app_observatory_config_proto_init() {
|
||||
if File_app_observatory_config_proto != nil {
|
||||
return
|
||||
}
|
||||
if !protoimpl.UnsafeEnabled {
|
||||
file_app_observatory_config_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} {
|
||||
switch v := v.(*ObservationResult); i {
|
||||
case 0:
|
||||
return &v.state
|
||||
case 1:
|
||||
return &v.sizeCache
|
||||
case 2:
|
||||
return &v.unknownFields
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
file_app_observatory_config_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} {
|
||||
switch v := v.(*OutboundStatus); i {
|
||||
case 0:
|
||||
return &v.state
|
||||
case 1:
|
||||
return &v.sizeCache
|
||||
case 2:
|
||||
return &v.unknownFields
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
file_app_observatory_config_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} {
|
||||
switch v := v.(*ProbeResult); i {
|
||||
case 0:
|
||||
return &v.state
|
||||
case 1:
|
||||
return &v.sizeCache
|
||||
case 2:
|
||||
return &v.unknownFields
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
file_app_observatory_config_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} {
|
||||
switch v := v.(*Intensity); i {
|
||||
case 0:
|
||||
return &v.state
|
||||
case 1:
|
||||
return &v.sizeCache
|
||||
case 2:
|
||||
return &v.unknownFields
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
file_app_observatory_config_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} {
|
||||
switch v := v.(*Config); i {
|
||||
case 0:
|
||||
return &v.state
|
||||
case 1:
|
||||
return &v.sizeCache
|
||||
case 2:
|
||||
return &v.unknownFields
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
}
|
||||
type x struct{}
|
||||
out := protoimpl.TypeBuilder{
|
||||
File: protoimpl.DescBuilder{
|
||||
GoPackagePath: reflect.TypeOf(x{}).PkgPath(),
|
||||
RawDescriptor: file_app_observatory_config_proto_rawDesc,
|
||||
NumEnums: 0,
|
||||
NumMessages: 5,
|
||||
NumExtensions: 0,
|
||||
NumServices: 0,
|
||||
},
|
||||
GoTypes: file_app_observatory_config_proto_goTypes,
|
||||
DependencyIndexes: file_app_observatory_config_proto_depIdxs,
|
||||
MessageInfos: file_app_observatory_config_proto_msgTypes,
|
||||
}.Build()
|
||||
File_app_observatory_config_proto = out.File
|
||||
file_app_observatory_config_proto_rawDesc = nil
|
||||
file_app_observatory_config_proto_goTypes = nil
|
||||
file_app_observatory_config_proto_depIdxs = nil
|
||||
}
|
73
app/observatory/config.proto
Normal file
73
app/observatory/config.proto
Normal file
|
@ -0,0 +1,73 @@
|
|||
syntax = "proto3";
|
||||
|
||||
package xray.core.app.observatory;
|
||||
option csharp_namespace = "Xray.App.Observatory";
|
||||
option go_package = "github.com/xtls/xray-core/app/observatory";
|
||||
option java_package = "com.xray.app.observatory";
|
||||
option java_multiple_files = true;
|
||||
|
||||
message ObservationResult {
|
||||
repeated OutboundStatus status = 1;
|
||||
}
|
||||
|
||||
message OutboundStatus{
|
||||
/* @Document Whether this outbound is usable
|
||||
@Restriction ReadOnlyForUser
|
||||
*/
|
||||
bool alive = 1;
|
||||
/* @Document The time for probe request to finish.
|
||||
@Type time.ms
|
||||
@Restriction ReadOnlyForUser
|
||||
*/
|
||||
int64 delay = 2;
|
||||
/* @Document The last error caused this outbound failed to relay probe request
|
||||
@Restriction NotMachineReadable
|
||||
*/
|
||||
string last_error_reason = 3;
|
||||
/* @Document The outbound tag for this Server
|
||||
@Type id.outboundTag
|
||||
*/
|
||||
string outbound_tag = 4;
|
||||
/* @Document The time this outbound is known to be alive
|
||||
@Type id.outboundTag
|
||||
*/
|
||||
int64 last_seen_time = 5;
|
||||
/* @Document The time this outbound is tried
|
||||
@Type id.outboundTag
|
||||
*/
|
||||
int64 last_try_time = 6;
|
||||
}
|
||||
|
||||
message ProbeResult{
|
||||
/* @Document Whether this outbound is usable
|
||||
@Restriction ReadOnlyForUser
|
||||
*/
|
||||
bool alive = 1;
|
||||
/* @Document The time for probe request to finish.
|
||||
@Type time.ms
|
||||
@Restriction ReadOnlyForUser
|
||||
*/
|
||||
int64 delay = 2;
|
||||
/* @Document The error caused this outbound failed to relay probe request
|
||||
@Restriction NotMachineReadable
|
||||
*/
|
||||
string last_error_reason = 3;
|
||||
}
|
||||
|
||||
message Intensity{
|
||||
/* @Document The time interval for a probe request in ms.
|
||||
@Type time.ms
|
||||
*/
|
||||
uint32 probe_interval = 1;
|
||||
}
|
||||
message Config {
|
||||
/* @Document The selectors for outbound under observation
|
||||
*/
|
||||
repeated string subject_selector = 2;
|
||||
|
||||
string probe_url = 3;
|
||||
|
||||
int64 probe_interval = 4;
|
||||
|
||||
bool enable_concurrency = 5;
|
||||
}
|
9
app/observatory/errors.generated.go
Normal file
9
app/observatory/errors.generated.go
Normal file
|
@ -0,0 +1,9 @@
|
|||
package observatory
|
||||
|
||||
import "github.com/xtls/xray-core/common/errors"
|
||||
|
||||
type errPathObjHolder struct{}
|
||||
|
||||
func newError(values ...interface{}) *errors.Error {
|
||||
return errors.New(values...).WithPathObj(errPathObjHolder{})
|
||||
}
|
26
app/observatory/explainErrors.go
Normal file
26
app/observatory/explainErrors.go
Normal file
|
@ -0,0 +1,26 @@
|
|||
package observatory
|
||||
|
||||
import "github.com/xtls/xray-core/common/errors"
|
||||
|
||||
type errorCollector struct {
|
||||
errors *errors.Error
|
||||
}
|
||||
|
||||
func (e *errorCollector) SubmitError(err error) {
|
||||
if e.errors == nil {
|
||||
e.errors = newError("underlying connection error").Base(err)
|
||||
return
|
||||
}
|
||||
e.errors = e.errors.Base(newError("underlying connection error").Base(err))
|
||||
}
|
||||
|
||||
func newErrorCollector() *errorCollector {
|
||||
return &errorCollector{}
|
||||
}
|
||||
|
||||
func (e *errorCollector) UnderlyingError() error {
|
||||
if e.errors == nil {
|
||||
return newError("failed to produce report")
|
||||
}
|
||||
return e.errors
|
||||
}
|
3
app/observatory/observatory.go
Normal file
3
app/observatory/observatory.go
Normal file
|
@ -0,0 +1,3 @@
|
|||
package observatory
|
||||
|
||||
//go:generate go run github.com/xtls/xray-core/common/errors/errorgen
|
216
app/observatory/observer.go
Normal file
216
app/observatory/observer.go
Normal file
|
@ -0,0 +1,216 @@
|
|||
package observatory
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/xtls/xray-core/core"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"sort"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
|
||||
"github.com/xtls/xray-core/common"
|
||||
v2net "github.com/xtls/xray-core/common/net"
|
||||
"github.com/xtls/xray-core/common/session"
|
||||
"github.com/xtls/xray-core/common/signal/done"
|
||||
"github.com/xtls/xray-core/common/task"
|
||||
"github.com/xtls/xray-core/features/extension"
|
||||
"github.com/xtls/xray-core/features/outbound"
|
||||
"github.com/xtls/xray-core/transport/internet/tagged"
|
||||
)
|
||||
|
||||
type Observer struct {
|
||||
config *Config
|
||||
ctx context.Context
|
||||
|
||||
statusLock sync.Mutex
|
||||
status []*OutboundStatus
|
||||
|
||||
finished *done.Instance
|
||||
|
||||
ohm outbound.Manager
|
||||
}
|
||||
|
||||
func (o *Observer) GetObservation(ctx context.Context) (proto.Message, error) {
|
||||
return &ObservationResult{Status: o.status}, nil
|
||||
}
|
||||
|
||||
func (o *Observer) Type() interface{} {
|
||||
return extension.ObservatoryType()
|
||||
}
|
||||
|
||||
func (o *Observer) Start() error {
|
||||
if o.config != nil && len(o.config.SubjectSelector) != 0 {
|
||||
o.finished = done.New()
|
||||
go o.background()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (o *Observer) Close() error {
|
||||
if o.finished != nil {
|
||||
return o.finished.Close()
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (o *Observer) background() {
|
||||
for !o.finished.Done() {
|
||||
hs, ok := o.ohm.(outbound.HandlerSelector)
|
||||
if !ok {
|
||||
newError("outbound.Manager is not a HandlerSelector").WriteToLog()
|
||||
return
|
||||
}
|
||||
|
||||
outbounds := hs.Select(o.config.SubjectSelector)
|
||||
sort.Strings(outbounds)
|
||||
|
||||
o.updateStatus(outbounds)
|
||||
|
||||
for _, v := range outbounds {
|
||||
result := o.probe(v)
|
||||
o.updateStatusForResult(v, &result)
|
||||
if o.finished.Done() {
|
||||
return
|
||||
}
|
||||
sleepTime := time.Second * 10
|
||||
if o.config.ProbeInterval != 0 {
|
||||
sleepTime = time.Duration(o.config.ProbeInterval)
|
||||
}
|
||||
time.Sleep(sleepTime)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (o *Observer) updateStatus(outbounds []string) {
|
||||
o.statusLock.Lock()
|
||||
defer o.statusLock.Unlock()
|
||||
// TODO should remove old inbound that is removed
|
||||
_ = outbounds
|
||||
}
|
||||
|
||||
func (o *Observer) probe(outbound string) ProbeResult {
|
||||
errorCollectorForRequest := newErrorCollector()
|
||||
|
||||
httpTransport := http.Transport{
|
||||
Proxy: func(*http.Request) (*url.URL, error) {
|
||||
return nil, nil
|
||||
},
|
||||
DialContext: func(ctx context.Context, network string, addr string) (net.Conn, error) {
|
||||
var connection net.Conn
|
||||
taskErr := task.Run(ctx, func() error {
|
||||
// MUST use V2Fly's built in context system
|
||||
dest, err := v2net.ParseDestination(network + ":" + addr)
|
||||
if err != nil {
|
||||
return newError("cannot understand address").Base(err)
|
||||
}
|
||||
trackedCtx := session.TrackedConnectionError(o.ctx, errorCollectorForRequest)
|
||||
conn, err := tagged.Dialer(trackedCtx, dest, outbound)
|
||||
if err != nil {
|
||||
return newError("cannot dial remote address ", dest).Base(err)
|
||||
}
|
||||
connection = conn
|
||||
return nil
|
||||
})
|
||||
if taskErr != nil {
|
||||
return nil, newError("cannot finish connection").Base(taskErr)
|
||||
}
|
||||
return connection, nil
|
||||
},
|
||||
TLSHandshakeTimeout: time.Second * 5,
|
||||
}
|
||||
httpClient := &http.Client{
|
||||
Transport: &httpTransport,
|
||||
CheckRedirect: func(req *http.Request, via []*http.Request) error {
|
||||
return http.ErrUseLastResponse
|
||||
},
|
||||
Jar: nil,
|
||||
Timeout: time.Second * 5,
|
||||
}
|
||||
var GETTime time.Duration
|
||||
err := task.Run(o.ctx, func() error {
|
||||
startTime := time.Now()
|
||||
probeURL := "https://api.v2fly.org/checkConnection.svgz"
|
||||
if o.config.ProbeUrl != "" {
|
||||
probeURL = o.config.ProbeUrl
|
||||
}
|
||||
response, err := httpClient.Get(probeURL)
|
||||
if err != nil {
|
||||
return newError("outbound failed to relay connection").Base(err)
|
||||
}
|
||||
if response.Body != nil {
|
||||
response.Body.Close()
|
||||
}
|
||||
endTime := time.Now()
|
||||
GETTime = endTime.Sub(startTime)
|
||||
return nil
|
||||
})
|
||||
if err != nil {
|
||||
fullerr := newError("underlying connection failed").Base(errorCollectorForRequest.UnderlyingError())
|
||||
fullerr = newError("with outbound handler report").Base(fullerr)
|
||||
fullerr = newError("GET request failed:", err).Base(fullerr)
|
||||
fullerr = newError("the outbound ", outbound, " is dead:").Base(fullerr)
|
||||
fullerr = fullerr.AtInfo()
|
||||
fullerr.WriteToLog()
|
||||
return ProbeResult{Alive: false, LastErrorReason: fullerr.Error()}
|
||||
}
|
||||
newError("the outbound ", outbound, " is alive:", GETTime.Seconds()).AtInfo().WriteToLog()
|
||||
return ProbeResult{Alive: true, Delay: GETTime.Milliseconds()}
|
||||
}
|
||||
|
||||
func (o *Observer) updateStatusForResult(outbound string, result *ProbeResult) {
|
||||
o.statusLock.Lock()
|
||||
defer o.statusLock.Unlock()
|
||||
var status *OutboundStatus
|
||||
if location := o.findStatusLocationLockHolderOnly(outbound); location != -1 {
|
||||
status = o.status[location]
|
||||
} else {
|
||||
status = &OutboundStatus{}
|
||||
o.status = append(o.status, status)
|
||||
}
|
||||
|
||||
status.LastTryTime = time.Now().Unix()
|
||||
status.OutboundTag = outbound
|
||||
status.Alive = result.Alive
|
||||
if result.Alive {
|
||||
status.Delay = result.Delay
|
||||
status.LastSeenTime = status.LastTryTime
|
||||
status.LastErrorReason = ""
|
||||
} else {
|
||||
status.LastErrorReason = result.LastErrorReason
|
||||
status.Delay = 99999999
|
||||
}
|
||||
}
|
||||
|
||||
func (o *Observer) findStatusLocationLockHolderOnly(outbound string) int {
|
||||
for i, v := range o.status {
|
||||
if v.OutboundTag == outbound {
|
||||
return i
|
||||
}
|
||||
}
|
||||
return -1
|
||||
}
|
||||
|
||||
func New(ctx context.Context, config *Config) (*Observer, error) {
|
||||
var outboundManager outbound.Manager
|
||||
err := core.RequireFeatures(ctx, func(om outbound.Manager) {
|
||||
outboundManager = om
|
||||
})
|
||||
if err != nil {
|
||||
return nil, newError("Cannot get depended features").Base(err)
|
||||
}
|
||||
return &Observer{
|
||||
config: config,
|
||||
ctx: ctx,
|
||||
ohm: outboundManager,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func init() {
|
||||
common.Must(common.RegisterConfig((*Config)(nil), func(ctx context.Context, config interface{}) (interface{}, error) {
|
||||
return New(ctx, config.(*Config))
|
||||
}))
|
||||
}
|
|
@ -1,7 +1,10 @@
|
|||
package router
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/xtls/xray-core/common/dice"
|
||||
"github.com/xtls/xray-core/features/extension"
|
||||
"github.com/xtls/xray-core/features/outbound"
|
||||
)
|
||||
|
||||
|
@ -41,3 +44,8 @@ func (b *Balancer) PickOutbound() (string, error) {
|
|||
}
|
||||
return tag, nil
|
||||
}
|
||||
func (b *Balancer) InjectContext(ctx context.Context) {
|
||||
if contextReceiver, ok := b.strategy.(extension.ContextReceiver); ok {
|
||||
contextReceiver.InjectContext(ctx)
|
||||
}
|
||||
}
|
||||
|
|
|
@ -708,6 +708,7 @@ type BalancingRule struct {
|
|||
|
||||
Tag string `protobuf:"bytes,1,opt,name=tag,proto3" json:"tag,omitempty"`
|
||||
OutboundSelector []string `protobuf:"bytes,2,rep,name=outbound_selector,json=outboundSelector,proto3" json:"outbound_selector,omitempty"`
|
||||
Strategy string `protobuf:"bytes,3,opt,name=strategy,proto3" json:"strategy,omitempty"`
|
||||
}
|
||||
|
||||
func (x *BalancingRule) Reset() {
|
||||
|
@ -756,6 +757,13 @@ func (x *BalancingRule) GetOutboundSelector() []string {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (x *BalancingRule) GetStrategy() string {
|
||||
if x != nil {
|
||||
return x.Strategy
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
type Config struct {
|
||||
state protoimpl.MessageState
|
||||
sizeCache protoimpl.SizeCache
|
||||
|
@ -1010,36 +1018,38 @@ var file_app_router_config_proto_rawDesc = []byte{
|
|||
0x74, 0x74, 0x72, 0x69, 0x62, 0x75, 0x74, 0x65, 0x73, 0x12, 0x25, 0x0a, 0x0e, 0x64, 0x6f, 0x6d,
|
||||
0x61, 0x69, 0x6e, 0x5f, 0x6d, 0x61, 0x74, 0x63, 0x68, 0x65, 0x72, 0x18, 0x11, 0x20, 0x01, 0x28,
|
||||
0x09, 0x52, 0x0d, 0x64, 0x6f, 0x6d, 0x61, 0x69, 0x6e, 0x4d, 0x61, 0x74, 0x63, 0x68, 0x65, 0x72,
|
||||
0x42, 0x0c, 0x0a, 0x0a, 0x74, 0x61, 0x72, 0x67, 0x65, 0x74, 0x5f, 0x74, 0x61, 0x67, 0x22, 0x4e,
|
||||
0x42, 0x0c, 0x0a, 0x0a, 0x74, 0x61, 0x72, 0x67, 0x65, 0x74, 0x5f, 0x74, 0x61, 0x67, 0x22, 0x6a,
|
||||
0x0a, 0x0d, 0x42, 0x61, 0x6c, 0x61, 0x6e, 0x63, 0x69, 0x6e, 0x67, 0x52, 0x75, 0x6c, 0x65, 0x12,
|
||||
0x10, 0x0a, 0x03, 0x74, 0x61, 0x67, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x74, 0x61,
|
||||
0x67, 0x12, 0x2b, 0x0a, 0x11, 0x6f, 0x75, 0x74, 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x5f, 0x73, 0x65,
|
||||
0x6c, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x18, 0x02, 0x20, 0x03, 0x28, 0x09, 0x52, 0x10, 0x6f, 0x75,
|
||||
0x74, 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x53, 0x65, 0x6c, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x22, 0x9b,
|
||||
0x02, 0x0a, 0x06, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x4f, 0x0a, 0x0f, 0x64, 0x6f, 0x6d,
|
||||
0x61, 0x69, 0x6e, 0x5f, 0x73, 0x74, 0x72, 0x61, 0x74, 0x65, 0x67, 0x79, 0x18, 0x01, 0x20, 0x01,
|
||||
0x28, 0x0e, 0x32, 0x26, 0x2e, 0x78, 0x72, 0x61, 0x79, 0x2e, 0x61, 0x70, 0x70, 0x2e, 0x72, 0x6f,
|
||||
0x75, 0x74, 0x65, 0x72, 0x2e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x44, 0x6f, 0x6d, 0x61,
|
||||
0x69, 0x6e, 0x53, 0x74, 0x72, 0x61, 0x74, 0x65, 0x67, 0x79, 0x52, 0x0e, 0x64, 0x6f, 0x6d, 0x61,
|
||||
0x69, 0x6e, 0x53, 0x74, 0x72, 0x61, 0x74, 0x65, 0x67, 0x79, 0x12, 0x30, 0x0a, 0x04, 0x72, 0x75,
|
||||
0x6c, 0x65, 0x18, 0x02, 0x20, 0x03, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x78, 0x72, 0x61, 0x79, 0x2e,
|
||||
0x61, 0x70, 0x70, 0x2e, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x72, 0x2e, 0x52, 0x6f, 0x75, 0x74, 0x69,
|
||||
0x6e, 0x67, 0x52, 0x75, 0x6c, 0x65, 0x52, 0x04, 0x72, 0x75, 0x6c, 0x65, 0x12, 0x45, 0x0a, 0x0e,
|
||||
0x62, 0x61, 0x6c, 0x61, 0x6e, 0x63, 0x69, 0x6e, 0x67, 0x5f, 0x72, 0x75, 0x6c, 0x65, 0x18, 0x03,
|
||||
0x20, 0x03, 0x28, 0x0b, 0x32, 0x1e, 0x2e, 0x78, 0x72, 0x61, 0x79, 0x2e, 0x61, 0x70, 0x70, 0x2e,
|
||||
0x72, 0x6f, 0x75, 0x74, 0x65, 0x72, 0x2e, 0x42, 0x61, 0x6c, 0x61, 0x6e, 0x63, 0x69, 0x6e, 0x67,
|
||||
0x52, 0x75, 0x6c, 0x65, 0x52, 0x0d, 0x62, 0x61, 0x6c, 0x61, 0x6e, 0x63, 0x69, 0x6e, 0x67, 0x52,
|
||||
0x75, 0x6c, 0x65, 0x22, 0x47, 0x0a, 0x0e, 0x44, 0x6f, 0x6d, 0x61, 0x69, 0x6e, 0x53, 0x74, 0x72,
|
||||
0x61, 0x74, 0x65, 0x67, 0x79, 0x12, 0x08, 0x0a, 0x04, 0x41, 0x73, 0x49, 0x73, 0x10, 0x00, 0x12,
|
||||
0x09, 0x0a, 0x05, 0x55, 0x73, 0x65, 0x49, 0x70, 0x10, 0x01, 0x12, 0x10, 0x0a, 0x0c, 0x49, 0x70,
|
||||
0x49, 0x66, 0x4e, 0x6f, 0x6e, 0x4d, 0x61, 0x74, 0x63, 0x68, 0x10, 0x02, 0x12, 0x0e, 0x0a, 0x0a,
|
||||
0x49, 0x70, 0x4f, 0x6e, 0x44, 0x65, 0x6d, 0x61, 0x6e, 0x64, 0x10, 0x03, 0x42, 0x4f, 0x0a, 0x13,
|
||||
0x63, 0x6f, 0x6d, 0x2e, 0x78, 0x72, 0x61, 0x79, 0x2e, 0x61, 0x70, 0x70, 0x2e, 0x72, 0x6f, 0x75,
|
||||
0x74, 0x65, 0x72, 0x50, 0x01, 0x5a, 0x24, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f,
|
||||
0x6d, 0x2f, 0x78, 0x74, 0x6c, 0x73, 0x2f, 0x78, 0x72, 0x61, 0x79, 0x2d, 0x63, 0x6f, 0x72, 0x65,
|
||||
0x2f, 0x61, 0x70, 0x70, 0x2f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x72, 0xaa, 0x02, 0x0f, 0x58, 0x72,
|
||||
0x61, 0x79, 0x2e, 0x41, 0x70, 0x70, 0x2e, 0x52, 0x6f, 0x75, 0x74, 0x65, 0x72, 0x62, 0x06, 0x70,
|
||||
0x72, 0x6f, 0x74, 0x6f, 0x33,
|
||||
0x74, 0x62, 0x6f, 0x75, 0x6e, 0x64, 0x53, 0x65, 0x6c, 0x65, 0x63, 0x74, 0x6f, 0x72, 0x12, 0x1a,
|
||||
0x0a, 0x08, 0x73, 0x74, 0x72, 0x61, 0x74, 0x65, 0x67, 0x79, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09,
|
||||
0x52, 0x08, 0x73, 0x74, 0x72, 0x61, 0x74, 0x65, 0x67, 0x79, 0x22, 0x9b, 0x02, 0x0a, 0x06, 0x43,
|
||||
0x6f, 0x6e, 0x66, 0x69, 0x67, 0x12, 0x4f, 0x0a, 0x0f, 0x64, 0x6f, 0x6d, 0x61, 0x69, 0x6e, 0x5f,
|
||||
0x73, 0x74, 0x72, 0x61, 0x74, 0x65, 0x67, 0x79, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0e, 0x32, 0x26,
|
||||
0x2e, 0x78, 0x72, 0x61, 0x79, 0x2e, 0x61, 0x70, 0x70, 0x2e, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x72,
|
||||
0x2e, 0x43, 0x6f, 0x6e, 0x66, 0x69, 0x67, 0x2e, 0x44, 0x6f, 0x6d, 0x61, 0x69, 0x6e, 0x53, 0x74,
|
||||
0x72, 0x61, 0x74, 0x65, 0x67, 0x79, 0x52, 0x0e, 0x64, 0x6f, 0x6d, 0x61, 0x69, 0x6e, 0x53, 0x74,
|
||||
0x72, 0x61, 0x74, 0x65, 0x67, 0x79, 0x12, 0x30, 0x0a, 0x04, 0x72, 0x75, 0x6c, 0x65, 0x18, 0x02,
|
||||
0x20, 0x03, 0x28, 0x0b, 0x32, 0x1c, 0x2e, 0x78, 0x72, 0x61, 0x79, 0x2e, 0x61, 0x70, 0x70, 0x2e,
|
||||
0x72, 0x6f, 0x75, 0x74, 0x65, 0x72, 0x2e, 0x52, 0x6f, 0x75, 0x74, 0x69, 0x6e, 0x67, 0x52, 0x75,
|
||||
0x6c, 0x65, 0x52, 0x04, 0x72, 0x75, 0x6c, 0x65, 0x12, 0x45, 0x0a, 0x0e, 0x62, 0x61, 0x6c, 0x61,
|
||||
0x6e, 0x63, 0x69, 0x6e, 0x67, 0x5f, 0x72, 0x75, 0x6c, 0x65, 0x18, 0x03, 0x20, 0x03, 0x28, 0x0b,
|
||||
0x32, 0x1e, 0x2e, 0x78, 0x72, 0x61, 0x79, 0x2e, 0x61, 0x70, 0x70, 0x2e, 0x72, 0x6f, 0x75, 0x74,
|
||||
0x65, 0x72, 0x2e, 0x42, 0x61, 0x6c, 0x61, 0x6e, 0x63, 0x69, 0x6e, 0x67, 0x52, 0x75, 0x6c, 0x65,
|
||||
0x52, 0x0d, 0x62, 0x61, 0x6c, 0x61, 0x6e, 0x63, 0x69, 0x6e, 0x67, 0x52, 0x75, 0x6c, 0x65, 0x22,
|
||||
0x47, 0x0a, 0x0e, 0x44, 0x6f, 0x6d, 0x61, 0x69, 0x6e, 0x53, 0x74, 0x72, 0x61, 0x74, 0x65, 0x67,
|
||||
0x79, 0x12, 0x08, 0x0a, 0x04, 0x41, 0x73, 0x49, 0x73, 0x10, 0x00, 0x12, 0x09, 0x0a, 0x05, 0x55,
|
||||
0x73, 0x65, 0x49, 0x70, 0x10, 0x01, 0x12, 0x10, 0x0a, 0x0c, 0x49, 0x70, 0x49, 0x66, 0x4e, 0x6f,
|
||||
0x6e, 0x4d, 0x61, 0x74, 0x63, 0x68, 0x10, 0x02, 0x12, 0x0e, 0x0a, 0x0a, 0x49, 0x70, 0x4f, 0x6e,
|
||||
0x44, 0x65, 0x6d, 0x61, 0x6e, 0x64, 0x10, 0x03, 0x42, 0x4f, 0x0a, 0x13, 0x63, 0x6f, 0x6d, 0x2e,
|
||||
0x78, 0x72, 0x61, 0x79, 0x2e, 0x61, 0x70, 0x70, 0x2e, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x72, 0x50,
|
||||
0x01, 0x5a, 0x24, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x78, 0x74,
|
||||
0x6c, 0x73, 0x2f, 0x78, 0x72, 0x61, 0x79, 0x2d, 0x63, 0x6f, 0x72, 0x65, 0x2f, 0x61, 0x70, 0x70,
|
||||
0x2f, 0x72, 0x6f, 0x75, 0x74, 0x65, 0x72, 0xaa, 0x02, 0x0f, 0x58, 0x72, 0x61, 0x79, 0x2e, 0x41,
|
||||
0x70, 0x70, 0x2e, 0x52, 0x6f, 0x75, 0x74, 0x65, 0x72, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f,
|
||||
0x33,
|
||||
}
|
||||
|
||||
var (
|
||||
|
|
|
@ -127,6 +127,7 @@ message RoutingRule {
|
|||
message BalancingRule {
|
||||
string tag = 1;
|
||||
repeated string outbound_selector = 2;
|
||||
string strategy = 3;
|
||||
}
|
||||
|
||||
message Config {
|
||||
|
|
57
app/router/strategy_leastping.go
Normal file
57
app/router/strategy_leastping.go
Normal file
|
@ -0,0 +1,57 @@
|
|||
package router
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/xtls/xray-core/core"
|
||||
|
||||
"github.com/xtls/xray-core/app/observatory"
|
||||
"github.com/xtls/xray-core/common"
|
||||
"github.com/xtls/xray-core/features/extension"
|
||||
)
|
||||
|
||||
type LeastPingStrategy struct {
|
||||
ctx context.Context
|
||||
observatory extension.Observatory
|
||||
}
|
||||
|
||||
func (l *LeastPingStrategy) InjectContext(ctx context.Context) {
|
||||
common.Must(core.RequireFeatures(ctx, func(observatory extension.Observatory) error {
|
||||
l.observatory = observatory
|
||||
return nil
|
||||
}))
|
||||
l.ctx = ctx
|
||||
}
|
||||
|
||||
func (l *LeastPingStrategy) PickOutbound(strings []string) string {
|
||||
observeReport, err := l.observatory.GetObservation(l.ctx)
|
||||
if err != nil {
|
||||
newError("cannot get observe report").Base(err).WriteToLog()
|
||||
return ""
|
||||
}
|
||||
outboundsList := outboundList(strings)
|
||||
if result, ok := observeReport.(*observatory.ObservationResult); ok {
|
||||
status := result.Status
|
||||
leastPing := int64(99999999)
|
||||
selectedOutboundName := ""
|
||||
for _, v := range status {
|
||||
if outboundsList.contains(v.OutboundTag) && v.Alive && v.Delay < leastPing {
|
||||
selectedOutboundName = v.OutboundTag
|
||||
}
|
||||
}
|
||||
return selectedOutboundName
|
||||
}
|
||||
|
||||
//No way to understand observeReport
|
||||
return ""
|
||||
}
|
||||
|
||||
type outboundList []string
|
||||
|
||||
func (o outboundList) contains(name string) bool {
|
||||
for _, v := range o {
|
||||
if v == name {
|
||||
return true
|
||||
}
|
||||
}
|
||||
return false
|
||||
}
|
|
@ -11,6 +11,7 @@ const (
|
|||
contentSessionKey
|
||||
muxPreferedSessionKey
|
||||
sockoptSessionKey
|
||||
trackedConnectionErrorKey
|
||||
)
|
||||
|
||||
// ContextWithID returns a new context with the given ID.
|
||||
|
@ -84,3 +85,33 @@ func SockoptFromContext(ctx context.Context) *Sockopt {
|
|||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func GetForcedOutboundTagFromContext(ctx context.Context) string {
|
||||
if ContentFromContext(ctx) == nil {
|
||||
return ""
|
||||
}
|
||||
return ContentFromContext(ctx).Attribute("forcedOutboundTag")
|
||||
}
|
||||
|
||||
func SetForcedOutboundTagToContext(ctx context.Context, tag string) context.Context {
|
||||
if contentFromContext := ContentFromContext(ctx); contentFromContext == nil {
|
||||
ctx = ContextWithContent(ctx, &Content{})
|
||||
}
|
||||
ContentFromContext(ctx).SetAttribute("forcedOutboundTag", tag)
|
||||
return ctx
|
||||
}
|
||||
|
||||
type TrackedRequestErrorFeedback interface {
|
||||
SubmitError(err error)
|
||||
}
|
||||
|
||||
func SubmitOutboundErrorToOriginator(ctx context.Context, err error) {
|
||||
if errorTracker := ctx.Value(trackedConnectionErrorKey); errorTracker != nil {
|
||||
errorTracker := errorTracker.(TrackedRequestErrorFeedback)
|
||||
errorTracker.SubmitError(err)
|
||||
}
|
||||
}
|
||||
|
||||
func TrackedConnectionError(ctx context.Context, tracker TrackedRequestErrorFeedback) context.Context {
|
||||
return context.WithValue(ctx, trackedConnectionErrorKey, tracker)
|
||||
}
|
||||
|
|
7
features/extension/contextreceiver.go
Normal file
7
features/extension/contextreceiver.go
Normal file
|
@ -0,0 +1,7 @@
|
|||
package extension
|
||||
|
||||
import "context"
|
||||
|
||||
type ContextReceiver interface {
|
||||
InjectContext(ctx context.Context)
|
||||
}
|
19
features/extension/observatory.go
Normal file
19
features/extension/observatory.go
Normal file
|
@ -0,0 +1,19 @@
|
|||
package extension
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/golang/protobuf/proto"
|
||||
|
||||
"github.com/xtls/xray-core/features"
|
||||
)
|
||||
|
||||
type Observatory interface {
|
||||
features.Feature
|
||||
|
||||
GetObservation(ctx context.Context) (proto.Message, error)
|
||||
}
|
||||
|
||||
func ObservatoryType() interface{} {
|
||||
return (*Observatory)(nil)
|
||||
}
|
14
infra/conf/observatory.go
Normal file
14
infra/conf/observatory.go
Normal file
|
@ -0,0 +1,14 @@
|
|||
package conf
|
||||
|
||||
import (
|
||||
"github.com/golang/protobuf/proto"
|
||||
"github.com/xtls/xray-core/app/observatory"
|
||||
)
|
||||
|
||||
type ObservatoryConfig struct {
|
||||
SubjectSelector []string `json:"subjectSelector"`
|
||||
}
|
||||
|
||||
func (o ObservatoryConfig) Build() (proto.Message, error) {
|
||||
return &observatory.Config{SubjectSelector: o.SubjectSelector}, nil
|
||||
}
|
|
@ -6,12 +6,12 @@ import (
|
|||
"os"
|
||||
"strings"
|
||||
|
||||
"github.com/xtls/xray-core/transport/internet"
|
||||
|
||||
"github.com/xtls/xray-core/app/dispatcher"
|
||||
"github.com/xtls/xray-core/app/proxyman"
|
||||
"github.com/xtls/xray-core/app/stats"
|
||||
"github.com/xtls/xray-core/common/serial"
|
||||
"github.com/xtls/xray-core/transport/internet"
|
||||
|
||||
core "github.com/xtls/xray-core/core"
|
||||
"github.com/xtls/xray-core/transport/internet/xtls"
|
||||
)
|
||||
|
@ -405,6 +405,7 @@ type Config struct {
|
|||
Stats *StatsConfig `json:"stats"`
|
||||
Reverse *ReverseConfig `json:"reverse"`
|
||||
FakeDNS *FakeDNSConfig `json:"fakeDns"`
|
||||
Observatory *ObservatoryConfig `json:"observatory"`
|
||||
}
|
||||
|
||||
func (c *Config) findInboundTag(tag string) int {
|
||||
|
@ -611,6 +612,14 @@ func (c *Config) Build() (*core.Config, error) {
|
|||
config.App = append(config.App, serial.ToTypedMessage(r))
|
||||
}
|
||||
|
||||
if c.Observatory != nil {
|
||||
r, err := c.Observatory.Build()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
config.App = append(config.App, serial.ToTypedMessage(r))
|
||||
}
|
||||
|
||||
var inbounds []InboundDetourConfig
|
||||
|
||||
if c.InboundConfig != nil {
|
||||
|
|
|
@ -23,6 +23,9 @@ import (
|
|||
_ "github.com/xtls/xray-core/app/router"
|
||||
_ "github.com/xtls/xray-core/app/stats"
|
||||
|
||||
// Fix dependency cycle caused by core import in internet package
|
||||
_ "github.com/xtls/xray-core/transport/internet/tagged/taggedimpl"
|
||||
|
||||
// Inbound and outbound proxies.
|
||||
_ "github.com/xtls/xray-core/proxy/blackhole"
|
||||
_ "github.com/xtls/xray-core/proxy/dns"
|
||||
|
|
11
transport/internet/tagged/tagged.go
Normal file
11
transport/internet/tagged/tagged.go
Normal file
|
@ -0,0 +1,11 @@
|
|||
package tagged
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/xtls/xray-core/common/net"
|
||||
)
|
||||
|
||||
type DialFunc func(ctx context.Context, dest net.Destination, tag string) (net.Conn, error)
|
||||
|
||||
var Dialer DialFunc
|
9
transport/internet/tagged/taggedimpl/errors.generated.go
Normal file
9
transport/internet/tagged/taggedimpl/errors.generated.go
Normal file
|
@ -0,0 +1,9 @@
|
|||
package taggedimpl
|
||||
|
||||
import "github.com/xtls/xray-core/common/errors"
|
||||
|
||||
type errPathObjHolder struct{}
|
||||
|
||||
func newError(values ...interface{}) *errors.Error {
|
||||
return errors.New(values...).WithPathObj(errPathObjHolder{})
|
||||
}
|
46
transport/internet/tagged/taggedimpl/impl.go
Normal file
46
transport/internet/tagged/taggedimpl/impl.go
Normal file
|
@ -0,0 +1,46 @@
|
|||
package taggedimpl
|
||||
|
||||
import (
|
||||
"context"
|
||||
"github.com/xtls/xray-core/common/net/cnc"
|
||||
"github.com/xtls/xray-core/core"
|
||||
|
||||
"github.com/xtls/xray-core/common/net"
|
||||
"github.com/xtls/xray-core/common/session"
|
||||
"github.com/xtls/xray-core/features/routing"
|
||||
"github.com/xtls/xray-core/transport/internet/tagged"
|
||||
)
|
||||
|
||||
func DialTaggedOutbound(ctx context.Context, dest net.Destination, tag string) (net.Conn, error) {
|
||||
var dispatcher routing.Dispatcher
|
||||
if core.FromContext(ctx) == nil {
|
||||
return nil, newError("Instance context variable is not in context, dial denied. ")
|
||||
}
|
||||
if err := core.RequireFeatures(ctx, func(dispatcherInstance routing.Dispatcher) {
|
||||
dispatcher = dispatcherInstance
|
||||
}); err != nil {
|
||||
return nil, newError("Required Feature dispatcher not resolved").Base(err)
|
||||
}
|
||||
|
||||
content := new(session.Content)
|
||||
content.SkipDNSResolve = true
|
||||
|
||||
ctx = session.ContextWithContent(ctx, content)
|
||||
ctx = session.SetForcedOutboundTagToContext(ctx, tag)
|
||||
|
||||
r, err := dispatcher.Dispatch(ctx, dest)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
var readerOpt cnc.ConnectionOption
|
||||
if dest.Network == net.Network_TCP {
|
||||
readerOpt = cnc.ConnectionOutputMulti(r.Reader)
|
||||
} else {
|
||||
readerOpt = cnc.ConnectionOutputMultiUDP(r.Reader)
|
||||
}
|
||||
return cnc.NewConnection(cnc.ConnectionInputMulti(r.Writer), readerOpt), nil
|
||||
}
|
||||
|
||||
func init() {
|
||||
tagged.Dialer = DialTaggedOutbound
|
||||
}
|
3
transport/internet/tagged/taggedimpl/taggedimpl.go
Normal file
3
transport/internet/tagged/taggedimpl/taggedimpl.go
Normal file
|
@ -0,0 +1,3 @@
|
|||
package taggedimpl
|
||||
|
||||
//go:generate go run github.com/xtls/xray-core/common/errors/errorgen
|
Loading…
Reference in a new issue