Rename quic session to connection

Co-authored-by: 秋のかえで <autmaple@protonmail.com>
This commit is contained in:
yuhan6665 2022-04-09 00:48:02 -04:00
parent c6550aecfc
commit 393d211d1e
3 changed files with 83 additions and 83 deletions

View file

@ -37,7 +37,7 @@ type QUICNameServer struct {
reqID uint32 reqID uint32
name string name string
destination *net.Destination destination *net.Destination
session quic.Session connection quic.Connection
} }
// NewQUICNameServer creates DNS-over-QUIC client object for local resolving // NewQUICNameServer creates DNS-over-QUIC client object for local resolving
@ -194,7 +194,7 @@ func (s *QUICNameServer) sendQuery(ctx context.Context, domain string, clientIP
conn, err := s.openStream(dnsCtx) conn, err := s.openStream(dnsCtx)
if err != nil { if err != nil {
newError("failed to open quic session").Base(err).AtError().WriteToLog() newError("failed to open quic connection").Base(err).AtError().WriteToLog()
return return
} }
@ -322,7 +322,7 @@ func (s *QUICNameServer) QueryIP(ctx context.Context, domain string, clientIP ne
} }
} }
func isActive(s quic.Session) bool { func isActive(s quic.Connection) bool {
select { select {
case <-s.Context().Done(): case <-s.Context().Done():
return false return false
@ -331,17 +331,17 @@ func isActive(s quic.Session) bool {
} }
} }
func (s *QUICNameServer) getSession() (quic.Session, error) { func (s *QUICNameServer) getConnection() (quic.Connection, error) {
var session quic.Session var conn quic.Connection
s.RLock() s.RLock()
session = s.session conn = s.connection
if session != nil && isActive(session) { if conn != nil && isActive(conn) {
s.RUnlock() s.RUnlock()
return session, nil return conn, nil
} }
if session != nil { if conn != nil {
// we're recreating the session, let's create a new one // we're recreating the connection, let's create a new one
_ = session.CloseWithError(0, "") _ = conn.CloseWithError(0, "")
} }
s.RUnlock() s.RUnlock()
@ -349,42 +349,42 @@ func (s *QUICNameServer) getSession() (quic.Session, error) {
defer s.Unlock() defer s.Unlock()
var err error var err error
session, err = s.openSession() conn, err = s.openConnection()
if err != nil { if err != nil {
// This does not look too nice, but QUIC (or maybe quic-go) // This does not look too nice, but QUIC (or maybe quic-go)
// doesn't seem stable enough. // doesn't seem stable enough.
// Maybe retransmissions aren't fully implemented in quic-go? // Maybe retransmissions aren't fully implemented in quic-go?
// Anyways, the simple solution is to make a second try when // Anyways, the simple solution is to make a second try when
// it fails to open the QUIC session. // it fails to open the QUIC connection.
session, err = s.openSession() conn, err = s.openConnection()
if err != nil { if err != nil {
return nil, err return nil, err
} }
} }
s.session = session s.connection = conn
return session, nil return conn, nil
} }
func (s *QUICNameServer) openSession() (quic.Session, error) { func (s *QUICNameServer) openConnection() (quic.Connection, error) {
tlsConfig := tls.Config{} tlsConfig := tls.Config{}
quicConfig := &quic.Config{ quicConfig := &quic.Config{
HandshakeIdleTimeout: handshakeTimeout, HandshakeIdleTimeout: handshakeTimeout,
} }
session, err := quic.DialAddrContext(context.Background(), s.destination.NetAddr(), tlsConfig.GetTLSConfig(tls.WithNextProto("http/1.1", http2.NextProtoTLS, NextProtoDQ)), quicConfig) conn, err := quic.DialAddrContext(context.Background(), s.destination.NetAddr(), tlsConfig.GetTLSConfig(tls.WithNextProto("http/1.1", http2.NextProtoTLS, NextProtoDQ)), quicConfig)
if err != nil { if err != nil {
return nil, err return nil, err
} }
return session, nil return conn, nil
} }
func (s *QUICNameServer) openStream(ctx context.Context) (quic.Stream, error) { func (s *QUICNameServer) openStream(ctx context.Context) (quic.Stream, error) {
session, err := s.getSession() conn, err := s.getConnection()
if err != nil { if err != nil {
return nil, err return nil, err
} }
// open a new stream // open a new stream
return session.OpenStreamSync(ctx) return conn.OpenStreamSync(ctx)
} }

View file

@ -15,39 +15,39 @@ import (
"github.com/xtls/xray-core/transport/internet/tls" "github.com/xtls/xray-core/transport/internet/tls"
) )
type sessionContext struct { type connectionContext struct {
rawConn *sysConn rawConn *sysConn
session quic.Session conn quic.Connection
} }
var errSessionClosed = newError("session closed") var errConnectionClosed = newError("connection closed")
func (c *sessionContext) openStream(destAddr net.Addr) (*interConn, error) { func (c *connectionContext) openStream(destAddr net.Addr) (*interConn, error) {
if !isActive(c.session) { if !isActive(c.conn) {
return nil, errSessionClosed return nil, errConnectionClosed
} }
stream, err := c.session.OpenStream() stream, err := c.conn.OpenStream()
if err != nil { if err != nil {
return nil, err return nil, err
} }
conn := &interConn{ conn := &interConn{
stream: stream, stream: stream,
local: c.session.LocalAddr(), local: c.conn.LocalAddr(),
remote: destAddr, remote: destAddr,
} }
return conn, nil return conn, nil
} }
type clientSessions struct { type clientConnections struct {
access sync.Mutex access sync.Mutex
sessions map[net.Destination][]*sessionContext conns map[net.Destination][]*connectionContext
cleanup *task.Periodic cleanup *task.Periodic
} }
func isActive(s quic.Session) bool { func isActive(s quic.Connection) bool {
select { select {
case <-s.Context().Done(): case <-s.Context().Done():
return false return false
@ -56,81 +56,81 @@ func isActive(s quic.Session) bool {
} }
} }
func removeInactiveSessions(sessions []*sessionContext) []*sessionContext { func removeInactiveConnections(conns []*connectionContext) []*connectionContext {
activeSessions := make([]*sessionContext, 0, len(sessions)) activeConnections := make([]*connectionContext, 0, len(conns))
for i, s := range sessions { for i, s := range conns {
if isActive(s.session) { if isActive(s.conn) {
activeSessions = append(activeSessions, s) activeConnections = append(activeConnections, s)
continue continue
} }
newError("closing quic session at index: ", i).WriteToLog() newError("closing quic connection at index: ", i).WriteToLog()
if err := s.session.CloseWithError(0, ""); err != nil { if err := s.conn.CloseWithError(0, ""); err != nil {
newError("failed to close session").Base(err).WriteToLog() newError("failed to close connection").Base(err).WriteToLog()
} }
if err := s.rawConn.Close(); err != nil { if err := s.rawConn.Close(); err != nil {
newError("failed to close raw connection").Base(err).WriteToLog() newError("failed to close raw connection").Base(err).WriteToLog()
} }
} }
if len(activeSessions) < len(sessions) { if len(activeConnections) < len(conns) {
newError("active quic session reduced from ", len(sessions), " to ", len(activeSessions)).WriteToLog() newError("active quic connection reduced from ", len(conns), " to ", len(activeConnections)).WriteToLog()
return activeSessions return activeConnections
} }
return sessions return conns
} }
func (s *clientSessions) cleanSessions() error { func (s *clientConnections) cleanConnections() error {
s.access.Lock() s.access.Lock()
defer s.access.Unlock() defer s.access.Unlock()
if len(s.sessions) == 0 { if len(s.conns) == 0 {
return nil return nil
} }
newSessionMap := make(map[net.Destination][]*sessionContext) newConnMap := make(map[net.Destination][]*connectionContext)
for dest, sessions := range s.sessions { for dest, conns := range s.conns {
sessions = removeInactiveSessions(sessions) conns = removeInactiveConnections(conns)
if len(sessions) > 0 { if len(conns) > 0 {
newSessionMap[dest] = sessions newConnMap[dest] = conns
} }
} }
s.sessions = newSessionMap s.conns = newConnMap
return nil return nil
} }
func (s *clientSessions) openConnection(ctx context.Context, destAddr net.Addr, config *Config, tlsConfig *tls.Config, sockopt *internet.SocketConfig) (stat.Connection, error) { func (s *clientConnections) openConnection(ctx context.Context, destAddr net.Addr, config *Config, tlsConfig *tls.Config, sockopt *internet.SocketConfig) (stat.Connection, error) {
s.access.Lock() s.access.Lock()
defer s.access.Unlock() defer s.access.Unlock()
if s.sessions == nil { if s.conns == nil {
s.sessions = make(map[net.Destination][]*sessionContext) s.conns = make(map[net.Destination][]*connectionContext)
} }
dest := net.DestinationFromAddr(destAddr) dest := net.DestinationFromAddr(destAddr)
var sessions []*sessionContext var conns []*connectionContext
if s, found := s.sessions[dest]; found { if s, found := s.conns[dest]; found {
sessions = s conns = s
} }
if len(sessions) > 0 { if len(conns) > 0 {
s := sessions[len(sessions)-1] s := conns[len(conns)-1]
if isActive(s.session) { if isActive(s.conn) {
conn, err := s.openStream(destAddr) conn, err := s.openStream(destAddr)
if err == nil { if err == nil {
return conn, nil return conn, nil
} }
newError("failed to openStream: ").Base(err).WriteToLog() newError("failed to openStream: ").Base(err).WriteToLog()
} else { } else {
newError("current quic session is not active!").WriteToLog() newError("current quic connection is not active!").WriteToLog()
} }
} }
sessions = removeInactiveSessions(sessions) conns = removeInactiveConnections(conns)
newError("dialing quic to ", dest).WriteToLog() newError("dialing quic to ", dest).WriteToLog()
rawConn, err := internet.DialSystem(ctx, dest, sockopt) rawConn, err := internet.DialSystem(ctx, dest, sockopt)
if err != nil { if err != nil {
@ -146,33 +146,33 @@ func (s *clientSessions) openConnection(ctx context.Context, destAddr net.Addr,
if udpConn == nil { if udpConn == nil {
udpConn = rawConn.(*internet.PacketConnWrapper).Conn.(*net.UDPConn) udpConn = rawConn.(*internet.PacketConnWrapper).Conn.(*net.UDPConn)
} }
conn, err := wrapSysConn(udpConn, config) sysConn, err := wrapSysConn(udpConn, config)
if err != nil { if err != nil {
rawConn.Close() rawConn.Close()
return nil, err return nil, err
} }
session, err := quic.DialContext(context.Background(), conn, destAddr, "", tlsConfig.GetTLSConfig(tls.WithDestination(dest)), quicConfig) conn, err := quic.DialContext(context.Background(), sysConn, destAddr, "", tlsConfig.GetTLSConfig(tls.WithDestination(dest)), quicConfig)
if err != nil { if err != nil {
conn.Close() sysConn.Close()
return nil, err return nil, err
} }
context := &sessionContext{ context := &connectionContext{
session: session, conn: conn,
rawConn: conn, rawConn: sysConn,
} }
s.sessions[dest] = append(sessions, context) s.conns[dest] = append(conns, context)
return context.openStream(destAddr) return context.openStream(destAddr)
} }
var client clientSessions var client clientConnections
func init() { func init() {
client.sessions = make(map[net.Destination][]*sessionContext) client.conns = make(map[net.Destination][]*connectionContext)
client.cleanup = &task.Periodic{ client.cleanup = &task.Periodic{
Interval: time.Minute, Interval: time.Minute,
Execute: client.cleanSessions, Execute: client.cleanConnections,
} }
common.Must(client.cleanup.Start()) common.Must(client.cleanup.Start())
} }

View file

@ -22,17 +22,17 @@ type Listener struct {
addConn internet.ConnHandler addConn internet.ConnHandler
} }
func (l *Listener) acceptStreams(session quic.Session) { func (l *Listener) acceptStreams(conn quic.Connection) {
for { for {
stream, err := session.AcceptStream(context.Background()) stream, err := conn.AcceptStream(context.Background())
if err != nil { if err != nil {
newError("failed to accept stream").Base(err).WriteToLog() newError("failed to accept stream").Base(err).WriteToLog()
select { select {
case <-session.Context().Done(): case <-conn.Context().Done():
return return
case <-l.done.Wait(): case <-l.done.Wait():
if err := session.CloseWithError(0, ""); err != nil { if err := conn.CloseWithError(0, ""); err != nil {
newError("failed to close session").Base(err).WriteToLog() newError("failed to close connection").Base(err).WriteToLog()
} }
return return
default: default:
@ -43,8 +43,8 @@ func (l *Listener) acceptStreams(session quic.Session) {
conn := &interConn{ conn := &interConn{
stream: stream, stream: stream,
local: session.LocalAddr(), local: conn.LocalAddr(),
remote: session.RemoteAddr(), remote: conn.RemoteAddr(),
} }
l.addConn(conn) l.addConn(conn)
@ -55,7 +55,7 @@ func (l *Listener) keepAccepting() {
for { for {
conn, err := l.listener.Accept(context.Background()) conn, err := l.listener.Accept(context.Background())
if err != nil { if err != nil {
newError("failed to accept QUIC sessions").Base(err).WriteToLog() newError("failed to accept QUIC connection").Base(err).WriteToLog()
if l.done.Done() { if l.done.Done() {
break break
} }