Xray-core/transport/internet/splithttp/upload_queue.go
mmmray 2becdd6414
SplitHTTP server: Fix panic during concurrent Close and Push (#3593)
When Close and Push are called concurrently, it may happen that Push attempts to write to an already-closed channel, and trigger a panic.

From a user perspective, it results in logs like this:

    http: panic serving 172.19.0.6:50476: send on closed channel

It's probably triggered when download is closed at the same time an upload packet is submitted.

These panics don't crash the server and the inbound is still usable.
2024-07-26 04:36:55 +02:00

131 lines
2.6 KiB
Go

package splithttp
// upload_queue is a specialized priorityqueue + channel to reorder generic
// packets by a sequence number
import (
"container/heap"
"io"
"sync"
"github.com/xtls/xray-core/common/errors"
)
type Packet struct {
Payload []byte
Seq uint64
}
type uploadQueue struct {
pushedPackets chan Packet
writeCloseMutex sync.Mutex
heap uploadHeap
nextSeq uint64
closed bool
maxPackets int
}
func NewUploadQueue(maxPackets int) *uploadQueue {
return &uploadQueue{
pushedPackets: make(chan Packet, maxPackets),
heap: uploadHeap{},
nextSeq: 0,
closed: false,
maxPackets: maxPackets,
}
}
func (h *uploadQueue) Push(p Packet) error {
h.writeCloseMutex.Lock()
defer h.writeCloseMutex.Unlock()
if h.closed {
return errors.New("splithttp packet queue closed")
}
h.pushedPackets <- p
return nil
}
func (h *uploadQueue) Close() error {
h.writeCloseMutex.Lock()
defer h.writeCloseMutex.Unlock()
h.closed = true
close(h.pushedPackets)
return nil
}
func (h *uploadQueue) Read(b []byte) (int, error) {
if h.closed {
return 0, io.EOF
}
if len(h.heap) == 0 {
packet, more := <-h.pushedPackets
if !more {
return 0, io.EOF
}
heap.Push(&h.heap, packet)
}
for len(h.heap) > 0 {
packet := heap.Pop(&h.heap).(Packet)
n := 0
if packet.Seq == h.nextSeq {
copy(b, packet.Payload)
n = min(len(b), len(packet.Payload))
if n < len(packet.Payload) {
// partial read
packet.Payload = packet.Payload[n:]
heap.Push(&h.heap, packet)
} else {
h.nextSeq = packet.Seq + 1
}
return n, nil
}
// misordered packet
if packet.Seq > h.nextSeq {
if len(h.heap) > h.maxPackets {
// the "reassembly buffer" is too large, and we want to
// constrain memory usage somehow. let's tear down the
// connection, and hope the application retries.
return 0, errors.New("packet queue is too large")
}
heap.Push(&h.heap, packet)
packet2, more := <-h.pushedPackets
if !more {
return 0, io.EOF
}
heap.Push(&h.heap, packet2)
}
}
return 0, nil
}
// heap code directly taken from https://pkg.go.dev/container/heap
type uploadHeap []Packet
func (h uploadHeap) Len() int { return len(h) }
func (h uploadHeap) Less(i, j int) bool { return h[i].Seq < h[j].Seq }
func (h uploadHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
func (h *uploadHeap) Push(x any) {
// Push and Pop use pointer receivers because they modify the slice's length,
// not just its contents.
*h = append(*h, x.(Packet))
}
func (h *uploadHeap) Pop() any {
old := *h
n := len(old)
x := old[n-1]
*h = old[0 : n-1]
return x
}