fix: handle error raised by dispatcher

This commit is contained in:
pocketW 2023-01-20 22:42:49 +11:00 committed by yuhan6665
parent dc72cf2c78
commit bf35e9dcd6

View file

@ -51,12 +51,12 @@ func (v *Dispatcher) RemoveRay(dest net.Destination) {
} }
} }
func (v *Dispatcher) getInboundRay(ctx context.Context, dest net.Destination) *connEntry { func (v *Dispatcher) getInboundRay(ctx context.Context, dest net.Destination) (*connEntry, error) {
v.Lock() v.Lock()
defer v.Unlock() defer v.Unlock()
if entry, found := v.conns[dest]; found { if entry, found := v.conns[dest]; found {
return entry return entry, nil
} }
newError("establishing new connection for ", dest).WriteToLog() newError("establishing new connection for ", dest).WriteToLog()
@ -67,7 +67,12 @@ func (v *Dispatcher) getInboundRay(ctx context.Context, dest net.Destination) *c
v.RemoveRay(dest) v.RemoveRay(dest)
} }
timer := signal.CancelAfterInactivity(ctx, removeRay, time.Minute) timer := signal.CancelAfterInactivity(ctx, removeRay, time.Minute)
link, _ := v.dispatcher.Dispatch(ctx, dest)
link, err := v.dispatcher.Dispatch(ctx, dest)
if err != nil {
return nil, newError("failed to dispatch request to ", dest).Base(err)
}
entry := &connEntry{ entry := &connEntry{
link: link, link: link,
timer: timer, timer: timer,
@ -75,14 +80,18 @@ func (v *Dispatcher) getInboundRay(ctx context.Context, dest net.Destination) *c
} }
v.conns[dest] = entry v.conns[dest] = entry
go handleInput(ctx, entry, dest, v.callback) go handleInput(ctx, entry, dest, v.callback)
return entry return entry, nil
} }
func (v *Dispatcher) Dispatch(ctx context.Context, destination net.Destination, payload *buf.Buffer) { func (v *Dispatcher) Dispatch(ctx context.Context, destination net.Destination, payload *buf.Buffer) {
// TODO: Add user to destString // TODO: Add user to destString
newError("dispatch request to: ", destination).AtDebug().WriteToLog(session.ExportIDToError(ctx)) newError("dispatch request to: ", destination).AtDebug().WriteToLog(session.ExportIDToError(ctx))
conn := v.getInboundRay(ctx, destination) conn, err := v.getInboundRay(ctx, destination)
if err != nil {
newError("failed to get inbound").Base(err).WriteToLog(session.ExportIDToError(ctx))
return
}
outputStream := conn.link.Writer outputStream := conn.link.Writer
if outputStream != nil { if outputStream != nil {
if err := outputStream.WriteMultiBuffer(buf.MultiBuffer{payload}); err != nil { if err := outputStream.WriteMultiBuffer(buf.MultiBuffer{payload}); err != nil {