forked from apernet/quic-go
-
Notifications
You must be signed in to change notification settings - Fork 11
/
conn_id_manager.go
214 lines (189 loc) · 6.66 KB
/
conn_id_manager.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
package quic
import (
"fmt"
"github.com/metacubex/quic-go/internal/protocol"
"github.com/metacubex/quic-go/internal/qerr"
"github.com/metacubex/quic-go/internal/utils"
list "github.com/metacubex/quic-go/internal/utils/linkedlist"
"github.com/metacubex/quic-go/internal/wire"
)
type newConnID struct {
SequenceNumber uint64
ConnectionID protocol.ConnectionID
StatelessResetToken protocol.StatelessResetToken
}
type connIDManager struct {
queue list.List[newConnID]
handshakeComplete bool
activeSequenceNumber uint64
highestRetired uint64
activeConnectionID protocol.ConnectionID
activeStatelessResetToken *protocol.StatelessResetToken
// We change the connection ID after sending on average
// protocol.PacketsPerConnectionID packets. The actual value is randomized
// hide the packet loss rate from on-path observers.
rand utils.Rand
packetsSinceLastChange uint32
packetsPerConnectionID uint32
addStatelessResetToken func(protocol.StatelessResetToken)
removeStatelessResetToken func(protocol.StatelessResetToken)
queueControlFrame func(wire.Frame)
}
func newConnIDManager(
initialDestConnID protocol.ConnectionID,
addStatelessResetToken func(protocol.StatelessResetToken),
removeStatelessResetToken func(protocol.StatelessResetToken),
queueControlFrame func(wire.Frame),
) *connIDManager {
return &connIDManager{
activeConnectionID: initialDestConnID,
addStatelessResetToken: addStatelessResetToken,
removeStatelessResetToken: removeStatelessResetToken,
queueControlFrame: queueControlFrame,
}
}
func (h *connIDManager) AddFromPreferredAddress(connID protocol.ConnectionID, resetToken protocol.StatelessResetToken) error {
return h.addConnectionID(1, connID, resetToken)
}
func (h *connIDManager) Add(f *wire.NewConnectionIDFrame) error {
if err := h.add(f); err != nil {
return err
}
if h.queue.Len() >= protocol.MaxActiveConnectionIDs {
return &qerr.TransportError{ErrorCode: qerr.ConnectionIDLimitError}
}
return nil
}
func (h *connIDManager) add(f *wire.NewConnectionIDFrame) error {
// If the NEW_CONNECTION_ID frame is reordered, such that its sequence number is smaller than the currently active
// connection ID or if it was already retired, send the RETIRE_CONNECTION_ID frame immediately.
if f.SequenceNumber < h.activeSequenceNumber || f.SequenceNumber < h.highestRetired {
h.queueControlFrame(&wire.RetireConnectionIDFrame{
SequenceNumber: f.SequenceNumber,
})
return nil
}
// Retire elements in the queue.
// Doesn't retire the active connection ID.
if f.RetirePriorTo > h.highestRetired {
var next *list.Element[newConnID]
for el := h.queue.Front(); el != nil; el = next {
if el.Value.SequenceNumber >= f.RetirePriorTo {
break
}
next = el.Next()
h.queueControlFrame(&wire.RetireConnectionIDFrame{
SequenceNumber: el.Value.SequenceNumber,
})
h.queue.Remove(el)
}
h.highestRetired = f.RetirePriorTo
}
if f.SequenceNumber == h.activeSequenceNumber {
return nil
}
if err := h.addConnectionID(f.SequenceNumber, f.ConnectionID, f.StatelessResetToken); err != nil {
return err
}
// Retire the active connection ID, if necessary.
if h.activeSequenceNumber < f.RetirePriorTo {
// The queue is guaranteed to have at least one element at this point.
h.updateConnectionID()
}
return nil
}
func (h *connIDManager) addConnectionID(seq uint64, connID protocol.ConnectionID, resetToken protocol.StatelessResetToken) error {
// insert a new element at the end
if h.queue.Len() == 0 || h.queue.Back().Value.SequenceNumber < seq {
h.queue.PushBack(newConnID{
SequenceNumber: seq,
ConnectionID: connID,
StatelessResetToken: resetToken,
})
return nil
}
// insert a new element somewhere in the middle
for el := h.queue.Front(); el != nil; el = el.Next() {
if el.Value.SequenceNumber == seq {
if el.Value.ConnectionID != connID {
return fmt.Errorf("received conflicting connection IDs for sequence number %d", seq)
}
if el.Value.StatelessResetToken != resetToken {
return fmt.Errorf("received conflicting stateless reset tokens for sequence number %d", seq)
}
break
}
if el.Value.SequenceNumber > seq {
h.queue.InsertBefore(newConnID{
SequenceNumber: seq,
ConnectionID: connID,
StatelessResetToken: resetToken,
}, el)
break
}
}
return nil
}
func (h *connIDManager) updateConnectionID() {
h.queueControlFrame(&wire.RetireConnectionIDFrame{
SequenceNumber: h.activeSequenceNumber,
})
h.highestRetired = utils.Max(h.highestRetired, h.activeSequenceNumber)
if h.activeStatelessResetToken != nil {
h.removeStatelessResetToken(*h.activeStatelessResetToken)
}
front := h.queue.Remove(h.queue.Front())
h.activeSequenceNumber = front.SequenceNumber
h.activeConnectionID = front.ConnectionID
h.activeStatelessResetToken = &front.StatelessResetToken
h.packetsSinceLastChange = 0
h.packetsPerConnectionID = protocol.PacketsPerConnectionID/2 + uint32(h.rand.Int31n(protocol.PacketsPerConnectionID))
h.addStatelessResetToken(*h.activeStatelessResetToken)
}
func (h *connIDManager) Close() {
if h.activeStatelessResetToken != nil {
h.removeStatelessResetToken(*h.activeStatelessResetToken)
}
}
// is called when the server performs a Retry
// and when the server changes the connection ID in the first Initial sent
func (h *connIDManager) ChangeInitialConnID(newConnID protocol.ConnectionID) {
if h.activeSequenceNumber != 0 {
panic("expected first connection ID to have sequence number 0")
}
h.activeConnectionID = newConnID
}
// is called when the server provides a stateless reset token in the transport parameters
func (h *connIDManager) SetStatelessResetToken(token protocol.StatelessResetToken) {
if h.activeSequenceNumber != 0 {
panic("expected first connection ID to have sequence number 0")
}
h.activeStatelessResetToken = &token
h.addStatelessResetToken(token)
}
func (h *connIDManager) SentPacket() {
h.packetsSinceLastChange++
}
func (h *connIDManager) shouldUpdateConnID() bool {
if !h.handshakeComplete {
return false
}
// initiate the first change as early as possible (after handshake completion)
if h.queue.Len() > 0 && h.activeSequenceNumber == 0 {
return true
}
// For later changes, only change if
// 1. The queue of connection IDs is filled more than 50%.
// 2. We sent at least PacketsPerConnectionID packets
return 2*h.queue.Len() >= protocol.MaxActiveConnectionIDs &&
h.packetsSinceLastChange >= h.packetsPerConnectionID
}
func (h *connIDManager) Get() protocol.ConnectionID {
if h.shouldUpdateConnID() {
h.updateConnectionID()
}
return h.activeConnectionID
}
func (h *connIDManager) SetHandshakeComplete() {
h.handshakeComplete = true
}