-
Notifications
You must be signed in to change notification settings - Fork 6
/
peer_connection.go
454 lines (388 loc) · 12.3 KB
/
peer_connection.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
//////////////////////////////////////////////////////////////////////////////
//
// PeerConnection implements a WebRTC native client modeled after the W3C API.
//
// Copyright (c) 2019 Lanikai Labs. All rights reserved.
//
//////////////////////////////////////////////////////////////////////////////
package alohartc
import (
"bytes"
"context"
"crypto"
"crypto/rand"
"crypto/x509"
"encoding/base64"
"fmt"
"strconv"
"strings"
"time"
"github.com/lanikai/alohartc/internal/dtls" // subtree merged pions/dtls
"github.com/lanikai/alohartc/internal/ice"
"github.com/lanikai/alohartc/internal/media"
"github.com/lanikai/alohartc/internal/mux"
"github.com/lanikai/alohartc/internal/rtp"
"github.com/lanikai/alohartc/internal/sdp"
)
const (
sdpUsername = "lanikai"
nalTypeSingleTimeAggregationPacketA = 24
nalReferenceIndicatorPriority1 = 1 << 5
nalReferenceIndicatorPriority2 = 2 << 5
nalReferenceIndicatorPriority3 = 3 << 5
naluBufferSize = 2 * 1024 * 1024
keyLen = 16
saltLen = 14
maxSRTCPSize = 65536
connectTimeout = 10 * time.Second
)
type PeerConnection struct {
// Local context (for signaling)
ctx context.Context
cancel context.CancelFunc
// Local session description
localDescription sdp.Session
// Remote peer session description
remoteDescription sdp.Session
// RTP payload type (negotiated via SDP)
DynamicType uint8
iceAgent *ice.Agent
remoteCandidates chan ice.Candidate
// Callback when a local ICE candidate is available.
OnIceCandidate func(*ice.Candidate)
// Local certificate
certificate *x509.Certificate // Public key
privateKey crypto.PrivateKey // Private key
fingerprint string
// Media tracks
localAudio media.AudioSource
localVideo media.VideoSource
}
// Must is a helper that wraps a call to a function returning
// (*PeerConnection, error) and panics if the error is non-nil. It is intended
// for use in variable initializations such as
// var pc = alohartc.Must(alohartc.NewPeerConnection(config))
func Must(pc *PeerConnection, err error) *PeerConnection {
if err != nil {
panic(err)
}
return pc
}
// NewPeerConnection creates a new peer connection object
func NewPeerConnection(config Config) (*PeerConnection, error) {
return NewPeerConnectionWithContext(context.Background(), config)
}
// NewPeerConnectionWithContext creates a new peer connection object
func NewPeerConnectionWithContext(ctx context.Context, config Config) (*PeerConnection, error) {
// Create cancelable context, derived from upstream context
ctx, cancel := context.WithCancel(ctx)
// Create new peer connection (with local audio and video)
pc := &PeerConnection{
ctx: ctx,
cancel: cancel,
localAudio: config.LocalAudio,
localVideo: config.LocalVideo,
iceAgent: ice.NewAgent(),
remoteCandidates: make(chan ice.Candidate, 4),
// Set initial dummy handler for local ICE candidates.
OnIceCandidate: func(c *ice.Candidate) {
log.Warn("No OnICECandidate handler: %v", c)
},
}
var err error
// Dynamically generate a certificate for the peer connection
if pc.certificate, pc.privateKey, err = dtls.GenerateSelfSigned(); err != nil {
return nil, err
}
// Compute certificate fingerprint for later inclusion in SDP offer/answer
if pc.fingerprint, err = dtls.Fingerprint(pc.certificate, dtls.HashAlgorithmSHA256); err != nil {
return nil, err
}
return pc, nil
}
// Create SDP answer. Only needs SDP offer, no ICE candidates.
func (pc *PeerConnection) createAnswer() (sdp.Session, error) {
s := sdp.Session{
Version: 0,
Origin: sdp.Origin{
Username: sdpUsername,
SessionId: strconv.FormatInt(time.Now().UnixNano(), 10),
SessionVersion: 2,
NetworkType: "IN",
AddressType: "IP4",
Address: "127.0.0.1",
},
Name: "-",
Time: []sdp.Time{
{nil, nil},
},
Attributes: []sdp.Attribute{
{"group", pc.remoteDescription.GetAttr("group")},
},
}
for _, remoteMedia := range pc.remoteDescription.Media {
type payloadTypeAttributes struct {
nack bool
pli bool
fmtp string
codec string
reject bool
}
supportedPayloadTypes := make(map[int]*payloadTypeAttributes)
// Search attributes for supported codecs
for _, attr := range remoteMedia.Attributes {
var pt int
var text string
// Parse payload type from attribute. Will bin by payload type.
pt = -1
switch attr.Key {
case "fmtp", "rtcp-fb", "rtpmap":
if _, err := fmt.Sscanf(
attr.Value, "%3d %s", &pt, &text,
); err != nil {
log.Warn(fmt.Sprintf("malformed %s", attr.Key))
break // switch
}
}
if pt < 0 {
continue // Ignore unsupported attributes
}
if _, ok := supportedPayloadTypes[pt]; !ok {
supportedPayloadTypes[pt] = &payloadTypeAttributes{}
}
switch attr.Key {
case "rtpmap":
switch text {
case "H264/90000":
supportedPayloadTypes[pt].codec = text
}
case "rtcp-fb":
switch text {
case "nack":
supportedPayloadTypes[pt].nack = true
}
case "fmtp":
supportedPayloadTypes[pt].fmtp = text
if !strings.Contains(text, "packetization-mode=1") {
supportedPayloadTypes[pt].reject = true
}
if !strings.Contains(text, "profile-level-id=42") {
supportedPayloadTypes[pt].reject = true
}
}
}
// Require 24 and 128 bits of randomness for ufrag and pwd, respectively
rnd := make([]byte, 3+16)
if _, err := rand.Read(rnd); err != nil {
return sdp.Session{}, err
}
// Base64 encode ice-ufrag and ice-pwd
ufrag := base64.StdEncoding.EncodeToString(rnd[0:3])
pwd := base64.StdEncoding.EncodeToString(rnd[3:])
// Media description with first part of attributes
m := sdp.Media{
Type: "video",
Port: 9,
Proto: "UDP/TLS/RTP/SAVPF",
Connection: &sdp.Connection{
NetworkType: "IN",
AddressType: "IP4",
Address: "0.0.0.0",
},
Attributes: []sdp.Attribute{
{"mid", remoteMedia.GetAttr("mid")},
{"rtcp", "9 IN IP4 0.0.0.0"},
{"ice-ufrag", ufrag},
{"ice-pwd", pwd},
{"ice-options", "trickle"},
{"ice-options", "ice2"},
{"fingerprint", "sha-256 " + strings.ToUpper(pc.fingerprint)},
{"setup", "active"},
{"sendonly", ""},
{"rtcp-mux", ""},
{"rtcp-rsize", ""},
},
}
// Additional attributes per payload type
for pt, a := range supportedPayloadTypes {
switch {
case "H264/90000" == a.codec && "" != a.fmtp && !a.reject:
m.Attributes = append(
m.Attributes,
sdp.Attribute{"rtpmap", fmt.Sprintf("%d %s", pt, a.codec)},
)
if a.nack {
m.Attributes = append(
m.Attributes,
sdp.Attribute{"rtcp-fb", fmt.Sprintf("%d nack", pt)},
)
}
m.Attributes = append(
m.Attributes,
sdp.Attribute{"fmtp", fmt.Sprintf("%d %s", pt, a.fmtp)},
)
m.Format = append(m.Format, strconv.Itoa(pt))
// TODO [chris] Fix payload type selection. Currently we're
// rejecting essentially all but one payload type, assigned
// here. However, we should be prepared to receive RTP flows
// for each accepted payload type.
pc.DynamicType = uint8(pt)
}
}
// Final attributes
m.Attributes = append(
m.Attributes,
[]sdp.Attribute{
{"ssrc", "2541098696 cname:cYhx/N8U7h7+3GW3"},
{"ssrc", "2541098696 msid:SdWLKyaNRoUSWQ7BzkKGcbCWcuV7rScYxCAv e9b60276-a415-4a66-8395-28a893918d4c"},
{"ssrc", "2541098696 mslabel:SdWLKyaNRoUSWQ7BzkKGcbCWcuV7rScYxCAv"},
{"ssrc", "2541098696 label:e9b60276-a415-4a66-8395-28a893918d4c"},
}...,
)
s.Media = append(s.Media, m)
}
pc.localDescription = s
return s, nil
}
// Set remote SDP offer. Return SDP answer.
func (pc *PeerConnection) SetRemoteDescription(sdpOffer string) (sdpAnswer string, err error) {
offer, err := sdp.ParseSession(sdpOffer)
if err != nil {
return
}
pc.remoteDescription = offer
answer, err := pc.createAnswer()
if err != nil {
return
}
mid := offer.Media[0].GetAttr("mid")
remoteUfrag := offer.Media[0].GetAttr("ice-ufrag")
localUfrag := answer.Media[0].GetAttr("ice-ufrag")
username := remoteUfrag + ":" + localUfrag
localPassword := answer.Media[0].GetAttr("ice-pwd")
remotePassword := offer.Media[0].GetAttr("ice-pwd")
pc.iceAgent.Configure(mid, username, localPassword, remotePassword)
// ICE gathering begins implicitly after offer/answer exchange.
go pc.startGathering()
return answer.String(), nil
}
func (pc *PeerConnection) startGathering() {
log.Debug("Starting ICE gathering")
lcand := pc.iceAgent.Start(pc.ctx, pc.remoteCandidates)
for {
select {
case c, more := <-lcand:
if !more {
// Signal end-of-candidates.
pc.OnIceCandidate(nil)
return
}
pc.OnIceCandidate(&c)
case <-pc.ctx.Done():
return
}
}
}
// AddIceCandidate adds a remote ICE candidate.
func (pc *PeerConnection) AddIceCandidate(c *ice.Candidate) {
if c == nil {
// nil means end-of-candidates.
close(pc.remoteCandidates)
pc.remoteCandidates = nil
} else {
select {
case pc.remoteCandidates <- *c:
case <-pc.ctx.Done():
}
}
}
// Stream establishes a connection to the remote peer, and streams media to/from
// the configured tracks. Blocks until an error occurs, or until the
// PeerConnection is closed.
func (pc *PeerConnection) Stream() error {
// Wait for ICE agent to establish a connection.
timeoutCtx, _ := context.WithTimeout(pc.ctx, connectTimeout)
dataStream, err := pc.iceAgent.GetDataStream(timeoutCtx)
if err != nil {
return err
}
defer dataStream.Close()
// Instantiate a new net.Conn multiplexer
dataMux := mux.NewMux(dataStream, 8192)
defer dataMux.Close()
// Instantiate a new endpoint for DTLS from multiplexer
dtlsEndpoint := dataMux.NewEndpoint(mux.MatchDTLS)
// Instantiate a new endpoint for SRTP from multiplexer
srtpEndpoint := dataMux.NewEndpoint(func(b []byte) bool {
// First byte looks like 10??????, representing RTP version 2.
return b[0]&0xb0 == 0x80
})
// Configuration for DTLS handshake, namely certificate and private key
config := &dtls.Config{Certificate: pc.certificate, PrivateKey: pc.privateKey}
// Initiate a DTLS handshake as a client
dtlsConn, err := dtls.Client(dtlsEndpoint, config)
if err != nil {
return err
}
// Create SRTP keys from DTLS handshake (see RFC5764 Section 4.2)
keys, err := dtlsConn.ExportKeyingMaterial("EXTRACTOR-dtls_srtp", nil, 2*keyLen+2*saltLen)
if err != nil {
return err
}
keyReader := bytes.NewBuffer(keys)
writeKey := keyReader.Next(keyLen)
readKey := keyReader.Next(keyLen)
writeSalt := keyReader.Next(saltLen)
readSalt := keyReader.Next(saltLen)
rtpSession := rtp.NewSession(rtp.SessionOptions{
MuxConn: srtpEndpoint, // rtcp-mux assumed
ReadKey: readKey,
ReadSalt: readSalt,
WriteKey: writeKey,
WriteSalt: writeSalt,
})
videoStreamOpts := rtp.StreamOptions{
Direction: "sendonly",
}
for _, m := range pc.localDescription.Media {
if m.Type == "video" {
fmt.Sscanf(m.GetAttr("ssrc"), "%d cname:%s", &videoStreamOpts.LocalSSRC, &videoStreamOpts.LocalCNAME)
break
}
}
for _, m := range pc.remoteDescription.Media {
if m.Type == "video" {
fmt.Sscanf(m.GetAttr("ssrc"), "%d cname:%s", &videoStreamOpts.RemoteSSRC, &videoStreamOpts.RemoteCNAME)
break
}
}
videoStream := rtpSession.AddStream(videoStreamOpts)
go videoStream.SendVideo(pc.ctx.Done(), pc.DynamicType, pc.localVideo)
//rtpSession, err := rtp.NewSecureSession(rtpEndpoint, readKey, readSalt, writeKey, writeSalt)
//go streamH264(pc.ctx, pc.localVideoTrack, rtpSession.NewH264Stream(ssrc, cname))
// Start goroutine for processing incoming SRTCP packets
//go srtcpReaderRunloop(dataMux, readKey, readSalt)
// Begin a new SRTP session
//srtpSession, err := srtp.NewSession(srtpEndpoint, pc.DynamicType, writeKey, writeSalt)
//if err != nil {
// return err
//}
// There are two termination conditions that we need to deal with here:
// 1. Context cancellation. If Close() is called explicitly, or if the
// parent context is canceled, we should terminate cleanly.
// 2. Connection timeout. If the remote peer disconnects unexpectedly, the
// read loop on the underlying net.UDPConn will time out. The associated
// ice.DataStream will then be marked dead, which we check for here.
select {
case <-pc.ctx.Done():
return nil
case <-dataStream.Done():
return dataStream.Err()
}
}
// Close the peer connection
func (pc *PeerConnection) Close() {
log.Info("Closing peer connection")
// Cancel context to notify goroutines to exit.
pc.cancel()
}