From e3276df725dc7be3f808c35e76438d81a736feed Mon Sep 17 00:00:00 2001 From: RPRX <63339210+RPRX@users.noreply.github.com> Date: Sun, 3 Nov 2024 07:25:41 +0000 Subject: [PATCH] XHTTP client: Enable XMUX for download in U-D-S (#3965) --- transport/internet/memory_settings.go | 1 + transport/internet/splithttp/dialer.go | 26 +++++++++++++++++++------- 2 files changed, 20 insertions(+), 7 deletions(-) diff --git a/transport/internet/memory_settings.go b/transport/internet/memory_settings.go index e1625f37..5add530c 100644 --- a/transport/internet/memory_settings.go +++ b/transport/internet/memory_settings.go @@ -7,6 +7,7 @@ type MemoryStreamConfig struct { SecurityType string SecuritySettings interface{} SocketSettings *SocketConfig + DownloadSettings *MemoryStreamConfig } // ToMemoryStreamConfig converts a StreamConfig to MemoryStreamConfig. It returns a default non-nil MemoryStreamConfig for nil input. diff --git a/transport/internet/splithttp/dialer.go b/transport/internet/splithttp/dialer.go index c43783ec..43ba0a46 100644 --- a/transport/internet/splithttp/dialer.go +++ b/transport/internet/splithttp/dialer.go @@ -252,18 +252,24 @@ func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.Me requestURL.Path = transportConfiguration.GetNormalizedPath() + sessionIdUuid.String() requestURL.RawQuery = transportConfiguration.GetNormalizedQuery() - httpClient, muxResource := getHTTPClient(ctx, dest, streamSettings) + httpClient, muxRes := getHTTPClient(ctx, dest, streamSettings) var httpClient2 DialerClient + var muxRes2 *muxResource var requestURL2 url.URL if transportConfiguration.DownloadSettings != nil { + globalDialerAccess.Lock() + if streamSettings.DownloadSettings == nil { + streamSettings.DownloadSettings = common.Must2(internet.ToMemoryStreamConfig(transportConfiguration.DownloadSettings)).(*internet.MemoryStreamConfig) + } + globalDialerAccess.Unlock() + memory2 := streamSettings.DownloadSettings dest2 := net.Destination{ Address: transportConfiguration.DownloadSettings.Address.AsAddress(), // just panic Port: net.Port(transportConfiguration.DownloadSettings.Port), Network: net.Network_TCP, } - memory2 := common.Must2(internet.ToMemoryStreamConfig(transportConfiguration.DownloadSettings)).(*internet.MemoryStreamConfig) - httpClient2, _ = getHTTPClient(ctx, dest2, memory2) // no multiplex + httpClient2, muxRes2 = getHTTPClient(ctx, dest2, memory2) if tls.ConfigFromStreamSettings(memory2) != nil || reality.ConfigFromStreamSettings(memory2) != nil { requestURL2.Scheme = "https" } else { @@ -284,13 +290,19 @@ func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.Me // uploadWriter wrapper, exact size limits can be enforced uploadPipeReader, uploadPipeWriter := pipe.New(pipe.WithSizeLimit(maxUploadSize - 1)) - if muxResource != nil { - muxResource.OpenRequests.Add(1) + if muxRes != nil { + muxRes.OpenRequests.Add(1) + } + if muxRes2 != nil { + muxRes2.OpenRequests.Add(1) } go func() { - if muxResource != nil { - defer muxResource.OpenRequests.Add(-1) + if muxRes != nil { + defer muxRes.OpenRequests.Add(-1) + } + if muxRes2 != nil { + defer muxRes2.OpenRequests.Add(-1) } requestsLimiter := semaphore.New(int(scMaxConcurrentPosts.roll()))