From e4f9d03bef5a50e2b11887125adde85b0af2403c Mon Sep 17 00:00:00 2001 From: maskedeken <52683904+maskedeken@users.noreply.github.com> Date: Mon, 24 Jun 2024 23:09:24 +0800 Subject: [PATCH] splithttp Read() using blocking mode (#3473) * blocking splithttp read * Add testcase * simplify conditions --------- Co-authored-by: mmmray <142015632+mmmray@users.noreply.github.com> --- transport/internet/splithttp/upload_queue.go | 28 +++++++++---------- .../internet/splithttp/upload_queue_test.go | 22 +++++++++++++++ 2 files changed, 36 insertions(+), 14 deletions(-) create mode 100644 transport/internet/splithttp/upload_queue_test.go diff --git a/transport/internet/splithttp/upload_queue.go b/transport/internet/splithttp/upload_queue.go index 8b2c8f36..62c69411 100644 --- a/transport/internet/splithttp/upload_queue.go +++ b/transport/internet/splithttp/upload_queue.go @@ -47,13 +47,19 @@ func (h *UploadQueue) Close() error { } func (h *UploadQueue) Read(b []byte) (int, error) { - if h.closed && len(h.heap) == 0 && len(h.pushedPackets) == 0 { + if h.closed { return 0, io.EOF } - needMorePackets := false + if len(h.heap) == 0 { + packet, more := <-h.pushedPackets + if !more { + return 0, io.EOF + } + heap.Push(&h.heap, packet) + } - if len(h.heap) > 0 { + for len(h.heap) > 0 { packet := heap.Pop(&h.heap).(Packet) n := 0 @@ -81,18 +87,12 @@ func (h *UploadQueue) Read(b []byte) (int, error) { return 0, newError("packet queue is too large") } heap.Push(&h.heap, packet) - needMorePackets = true + packet2, more := <-h.pushedPackets + if !more { + return 0, io.EOF + } + heap.Push(&h.heap, packet2) } - } else { - needMorePackets = true - } - - if needMorePackets { - packet, more := <-h.pushedPackets - if !more { - return 0, io.EOF - } - heap.Push(&h.heap, packet) } return 0, nil diff --git a/transport/internet/splithttp/upload_queue_test.go b/transport/internet/splithttp/upload_queue_test.go new file mode 100644 index 00000000..8185cd8f --- /dev/null +++ b/transport/internet/splithttp/upload_queue_test.go @@ -0,0 +1,22 @@ +package splithttp_test + +import ( + "testing" + + "github.com/xtls/xray-core/common" + . "github.com/xtls/xray-core/transport/internet/splithttp" +) + +func Test_regression_readzero(t *testing.T) { + q := NewUploadQueue(10) + q.Push(Packet{ + Payload: []byte("x"), + Seq: 0, + }) + buf := make([]byte, 20) + n, err := q.Read(buf) + common.Must(err) + if n != 1 { + t.Error("n=", n) + } +}