mirror of
https://github.com/XTLS/Xray-core.git
synced 2024-11-25 10:01:28 +00:00
2becdd6414
When Close and Push are called concurrently, it may happen that Push attempts to write to an already-closed channel, and trigger a panic. From a user perspective, it results in logs like this: http: panic serving 172.19.0.6:50476: send on closed channel It's probably triggered when download is closed at the same time an upload packet is submitted. These panics don't crash the server and the inbound is still usable.
131 lines
2.6 KiB
Go
131 lines
2.6 KiB
Go
package splithttp
|
|
|
|
// upload_queue is a specialized priorityqueue + channel to reorder generic
|
|
// packets by a sequence number
|
|
|
|
import (
|
|
"container/heap"
|
|
"io"
|
|
"sync"
|
|
|
|
"github.com/xtls/xray-core/common/errors"
|
|
)
|
|
|
|
type Packet struct {
|
|
Payload []byte
|
|
Seq uint64
|
|
}
|
|
|
|
type uploadQueue struct {
|
|
pushedPackets chan Packet
|
|
writeCloseMutex sync.Mutex
|
|
heap uploadHeap
|
|
nextSeq uint64
|
|
closed bool
|
|
maxPackets int
|
|
}
|
|
|
|
func NewUploadQueue(maxPackets int) *uploadQueue {
|
|
return &uploadQueue{
|
|
pushedPackets: make(chan Packet, maxPackets),
|
|
heap: uploadHeap{},
|
|
nextSeq: 0,
|
|
closed: false,
|
|
maxPackets: maxPackets,
|
|
}
|
|
}
|
|
|
|
func (h *uploadQueue) Push(p Packet) error {
|
|
h.writeCloseMutex.Lock()
|
|
defer h.writeCloseMutex.Unlock()
|
|
|
|
if h.closed {
|
|
return errors.New("splithttp packet queue closed")
|
|
}
|
|
|
|
h.pushedPackets <- p
|
|
return nil
|
|
}
|
|
|
|
func (h *uploadQueue) Close() error {
|
|
h.writeCloseMutex.Lock()
|
|
defer h.writeCloseMutex.Unlock()
|
|
|
|
h.closed = true
|
|
close(h.pushedPackets)
|
|
return nil
|
|
}
|
|
|
|
func (h *uploadQueue) Read(b []byte) (int, error) {
|
|
if h.closed {
|
|
return 0, io.EOF
|
|
}
|
|
|
|
if len(h.heap) == 0 {
|
|
packet, more := <-h.pushedPackets
|
|
if !more {
|
|
return 0, io.EOF
|
|
}
|
|
heap.Push(&h.heap, packet)
|
|
}
|
|
|
|
for len(h.heap) > 0 {
|
|
packet := heap.Pop(&h.heap).(Packet)
|
|
n := 0
|
|
|
|
if packet.Seq == h.nextSeq {
|
|
copy(b, packet.Payload)
|
|
n = min(len(b), len(packet.Payload))
|
|
|
|
if n < len(packet.Payload) {
|
|
// partial read
|
|
packet.Payload = packet.Payload[n:]
|
|
heap.Push(&h.heap, packet)
|
|
} else {
|
|
h.nextSeq = packet.Seq + 1
|
|
}
|
|
|
|
return n, nil
|
|
}
|
|
|
|
// misordered packet
|
|
if packet.Seq > h.nextSeq {
|
|
if len(h.heap) > h.maxPackets {
|
|
// the "reassembly buffer" is too large, and we want to
|
|
// constrain memory usage somehow. let's tear down the
|
|
// connection, and hope the application retries.
|
|
return 0, errors.New("packet queue is too large")
|
|
}
|
|
heap.Push(&h.heap, packet)
|
|
packet2, more := <-h.pushedPackets
|
|
if !more {
|
|
return 0, io.EOF
|
|
}
|
|
heap.Push(&h.heap, packet2)
|
|
}
|
|
}
|
|
|
|
return 0, nil
|
|
}
|
|
|
|
// heap code directly taken from https://pkg.go.dev/container/heap
|
|
type uploadHeap []Packet
|
|
|
|
func (h uploadHeap) Len() int { return len(h) }
|
|
func (h uploadHeap) Less(i, j int) bool { return h[i].Seq < h[j].Seq }
|
|
func (h uploadHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
|
|
|
|
func (h *uploadHeap) Push(x any) {
|
|
// Push and Pop use pointer receivers because they modify the slice's length,
|
|
// not just its contents.
|
|
*h = append(*h, x.(Packet))
|
|
}
|
|
|
|
func (h *uploadHeap) Pop() any {
|
|
old := *h
|
|
n := len(old)
|
|
x := old[n-1]
|
|
*h = old[0 : n-1]
|
|
return x
|
|
}
|