Quic related improvements (#915)

* DialSystem for Quic

DialSystem() is needed in case of Android client,
where the raw conn is protected for vpn service

* Fix client dialer log

Log such as:
tunneling request to tcp:www.google.com:80 via tcp:x.x.x.x:443
the second "tcp" is misleading when using mKcp or quic transport

Remove the second "tcp" and add the correct logging for transport dialer:
- transport/internet/tcp: dialing TCP to tcp:x.x.x.x:443
- transport/internet/quic: dialing quic to udp:x.x.x.x:443

* Quic new stream allocation mode

Currently this is how Quic works: client muxing all tcp and udp traffic through a single session, when there are more than 32 running streams in the session,
the next stream request will fail and open with a new session (port). Imagine lineup the session from left to right:
 |
 |  |
 |  |  |

As the streams finishes, we still open stream from the left, original session. So the base session will always be there and new sessions on the right come and go.
However, either due to QOS or bugs in Quic implementation, the traffic "wear out" the base session. It will become slower and in the end not receiving any data from server side.
I couldn't figure out a solution for this problem at the moment, as a workaround:
       |  |
    |  |  |
 |  |  |

I came up with this new stream allocation mode, that it will never open new streams in the old sessions, but only from current or new session from right.
The keeplive config is turned off from server and client side. This way old sessions will natually close and new sessions keep generating.
Note the frequency of new session is still controlled by the server side. Server can assign a large max stream limit. In this case the new allocation mode will be similar to the current mode.
This commit is contained in:
yuhan6665 2022-01-28 18:11:30 -05:00 committed by GitHub
parent 30a40aa6f1
commit 578d903a9e
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
6 changed files with 30 additions and 37 deletions

View file

@ -74,7 +74,7 @@ func (c *Client) Process(ctx context.Context, link *transport.Link, dialer inter
if err != nil { if err != nil {
return newError("failed to find an available destination").AtWarning().Base(err) return newError("failed to find an available destination").AtWarning().Base(err)
} }
newError("tunneling request to ", destination, " via ", server.Destination()).WriteToLog(session.ExportIDToError(ctx)) newError("tunneling request to ", destination, " via ", network, ":", server.Destination().NetAddr()).WriteToLog(session.ExportIDToError(ctx))
defer conn.Close() defer conn.Close()

View file

@ -77,7 +77,7 @@ func (c *Client) Process(ctx context.Context, link *transport.Link, dialer inter
if err != nil { if err != nil {
return newError("failed to find an available destination").AtWarning().Base(err) return newError("failed to find an available destination").AtWarning().Base(err)
} }
newError("tunneling request to ", destination, " via ", server.Destination()).WriteToLog(session.ExportIDToError(ctx)) newError("tunneling request to ", destination, " via ", server.Destination().NetAddr()).WriteToLog(session.ExportIDToError(ctx))
defer conn.Close() defer conn.Close()

View file

@ -103,7 +103,7 @@ func (h *Handler) Process(ctx context.Context, link *transport.Link, dialer inte
} }
target := outbound.Target target := outbound.Target
newError("tunneling request to ", target, " via ", rec.Destination()).AtInfo().WriteToLog(session.ExportIDToError(ctx)) newError("tunneling request to ", target, " via ", rec.Destination().NetAddr()).AtInfo().WriteToLog(session.ExportIDToError(ctx))
command := protocol.RequestCommandTCP command := protocol.RequestCommandTCP
if target.Network == net.Network_UDP { if target.Network == net.Network_UDP {

View file

@ -84,7 +84,7 @@ func (h *Handler) Process(ctx context.Context, link *transport.Link, dialer inte
} }
target := outbound.Target target := outbound.Target
newError("tunneling request to ", target, " via ", rec.Destination()).WriteToLog(session.ExportIDToError(ctx)) newError("tunneling request to ", target, " via ", rec.Destination().NetAddr()).WriteToLog(session.ExportIDToError(ctx))
command := protocol.RequestCommandTCP command := protocol.RequestCommandTCP
if target.Network == net.Network_UDP { if target.Network == net.Network_UDP {

View file

@ -58,11 +58,13 @@ func isActive(s quic.Session) bool {
func removeInactiveSessions(sessions []*sessionContext) []*sessionContext { func removeInactiveSessions(sessions []*sessionContext) []*sessionContext {
activeSessions := make([]*sessionContext, 0, len(sessions)) activeSessions := make([]*sessionContext, 0, len(sessions))
for _, s := range sessions { for i, s := range sessions {
if isActive(s.session) { if isActive(s.session) {
activeSessions = append(activeSessions, s) activeSessions = append(activeSessions, s)
continue continue
} }
newError("closing quic session at index: ", i).WriteToLog()
if err := s.session.CloseWithError(0, ""); err != nil { if err := s.session.CloseWithError(0, ""); err != nil {
newError("failed to close session").Base(err).WriteToLog() newError("failed to close session").Base(err).WriteToLog()
} }
@ -72,29 +74,13 @@ func removeInactiveSessions(sessions []*sessionContext) []*sessionContext {
} }
if len(activeSessions) < len(sessions) { if len(activeSessions) < len(sessions) {
newError("active quic session reduced from ", len(sessions), " to ", len(activeSessions)).WriteToLog()
return activeSessions return activeSessions
} }
return sessions return sessions
} }
func openStream(sessions []*sessionContext, destAddr net.Addr) *interConn {
for _, s := range sessions {
if !isActive(s.session) {
continue
}
conn, err := s.openStream(destAddr)
if err != nil {
continue
}
return conn
}
return nil
}
func (s *clientSessions) cleanSessions() error { func (s *clientSessions) cleanSessions() error {
s.access.Lock() s.access.Lock()
defer s.access.Unlock() defer s.access.Unlock()
@ -116,7 +102,7 @@ func (s *clientSessions) cleanSessions() error {
return nil return nil
} }
func (s *clientSessions) openConnection(destAddr net.Addr, config *Config, tlsConfig *tls.Config, sockopt *internet.SocketConfig) (stat.Connection, error) { func (s *clientSessions) openConnection(ctx context.Context, destAddr net.Addr, config *Config, tlsConfig *tls.Config, sockopt *internet.SocketConfig) (stat.Connection, error) {
s.access.Lock() s.access.Lock()
defer s.access.Unlock() defer s.access.Unlock()
@ -131,29 +117,36 @@ func (s *clientSessions) openConnection(destAddr net.Addr, config *Config, tlsCo
sessions = s sessions = s
} }
if true { if len(sessions) > 0 {
conn := openStream(sessions, destAddr) s := sessions[len(sessions)-1]
if conn != nil { if isActive(s.session) {
return conn, nil conn, err := s.openStream(destAddr)
if err == nil {
return conn, nil
}
newError("failed to openStream: ").Base(err).WriteToLog()
} else {
newError("current quic session is not active!").WriteToLog()
} }
} }
sessions = removeInactiveSessions(sessions) sessions = removeInactiveSessions(sessions)
newError("dialing quic to ", dest).WriteToLog()
rawConn, err := internet.ListenSystemPacket(context.Background(), &net.UDPAddr{ rawConn, err := internet.DialSystem(ctx, dest, sockopt)
IP: []byte{0, 0, 0, 0},
Port: 0,
}, sockopt)
if err != nil { if err != nil {
return nil, err return nil, newError("failed to dial to dest: ", err).AtWarning().Base(err)
} }
quicConfig := &quic.Config{ quicConfig := &quic.Config{
ConnectionIDLength: 12, ConnectionIDLength: 12,
KeepAlive: true, KeepAlive: false,
} }
conn, err := wrapSysConn(rawConn.(*net.UDPConn), config) udpConn, _ := rawConn.(*net.UDPConn)
if udpConn == nil {
udpConn = rawConn.(*internet.PacketConnWrapper).Conn.(*net.UDPConn)
}
conn, err := wrapSysConn(udpConn, config)
if err != nil { if err != nil {
rawConn.Close() rawConn.Close()
return nil, err return nil, err
@ -209,7 +202,7 @@ func Dial(ctx context.Context, dest net.Destination, streamSettings *internet.Me
config := streamSettings.ProtocolSettings.(*Config) config := streamSettings.ProtocolSettings.(*Config)
return client.openConnection(destAddr, config, tlsConfig, streamSettings.SocketSettings) return client.openConnection(ctx, destAddr, config, tlsConfig, streamSettings.SocketSettings)
} }
func init() { func init() {

View file

@ -103,7 +103,7 @@ func Listen(ctx context.Context, address net.Address, port net.Port, streamSetti
quicConfig := &quic.Config{ quicConfig := &quic.Config{
ConnectionIDLength: 12, ConnectionIDLength: 12,
KeepAlive: true, KeepAlive: false,
MaxIncomingStreams: 32, MaxIncomingStreams: 32,
MaxIncomingUniStreams: -1, MaxIncomingUniStreams: -1,
} }