-
Notifications
You must be signed in to change notification settings - Fork 27
/
client.go
301 lines (249 loc) · 8.1 KB
/
client.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
/*
Package gohlslib is a HLS client and muxer library for the Go programming language.
Examples are available at https://github.com/bluenviron/gohlslib/tree/main/examples
*/
package gohlslib
import (
"context"
"errors"
"fmt"
"log"
"net/http"
"net/url"
"time"
)
const (
clientMaxTracksPerStream = 10
clientMPEGTSSampleQueueSize = 100
clientLiveInitialDistance = 3
clientLiveMaxDistanceFromEnd = 5
clientMaxDTSRTCDiff = 10 * time.Second
)
// ErrClientEOS is returned by Wait() when the stream has ended.
var ErrClientEOS = errors.New("end of stream")
// ClientOnDownloadPrimaryPlaylistFunc is the prototype of Client.OnDownloadPrimaryPlaylist.
type ClientOnDownloadPrimaryPlaylistFunc func(url string)
// ClientOnDownloadStreamPlaylistFunc is the prototype of Client.OnDownloadStreamPlaylist.
type ClientOnDownloadStreamPlaylistFunc func(url string)
// ClientOnDownloadSegmentFunc is the prototype of Client.OnDownloadSegment.
type ClientOnDownloadSegmentFunc func(url string)
// ClientOnDownloadPartFunc is the prototype of Client.OnDownloadPart.
type ClientOnDownloadPartFunc func(url string)
// ClientOnDecodeErrorFunc is the prototype of Client.OnDecodeError.
type ClientOnDecodeErrorFunc func(err error)
// ClientOnRequestFunc is the prototype of the function passed to OnRequest().
type ClientOnRequestFunc func(*http.Request)
// ClientOnTracksFunc is the prototype of the function passed to OnTracks().
type ClientOnTracksFunc func([]*Track) error
// ClientOnDataAV1Func is the prototype of the function passed to OnDataAV1().
type ClientOnDataAV1Func func(pts int64, tu [][]byte)
// ClientOnDataVP9Func is the prototype of the function passed to OnDataVP9().
type ClientOnDataVP9Func func(pts int64, frame []byte)
// ClientOnDataH26xFunc is the prototype of the function passed to OnDataH26x().
type ClientOnDataH26xFunc func(pts int64, dts int64, au [][]byte)
// ClientOnDataMPEG4AudioFunc is the prototype of the function passed to OnDataMPEG4Audio().
type ClientOnDataMPEG4AudioFunc func(pts int64, aus [][]byte)
// ClientOnDataOpusFunc is the prototype of the function passed to OnDataOpus().
type ClientOnDataOpusFunc func(pts int64, packets [][]byte)
type clientOnStreamTracksFunc func(ctx context.Context, isLeading bool, tracks []*Track) ([]*clientTrack, bool)
type clientOnDataFunc func(pts int64, dts int64, data [][]byte)
func clientAbsoluteURL(base *url.URL, relative string) (*url.URL, error) {
u, err := url.Parse(relative)
if err != nil {
return nil, err
}
return base.ResolveReference(u), nil
}
// Client is a HLS client.
type Client struct {
//
// parameters (all optional except URI)
//
// URI of the playlist.
URI string
// HTTP client.
// It defaults to http.DefaultClient.
HTTPClient *http.Client
//
// callbacks (all optional)
//
// called when sending a request to the server.
OnRequest ClientOnRequestFunc
// called when tracks are available.
OnTracks ClientOnTracksFunc
// called before downloading a primary playlist.
OnDownloadPrimaryPlaylist ClientOnDownloadPrimaryPlaylistFunc
// called before downloading a stream playlist.
OnDownloadStreamPlaylist ClientOnDownloadStreamPlaylistFunc
// called before downloading a segment.
OnDownloadSegment ClientOnDownloadSegmentFunc
// called before downloading a part.
OnDownloadPart ClientOnDownloadPartFunc
// called when a non-fatal decode error occurs.
OnDecodeError ClientOnDecodeErrorFunc
//
// private
//
ctx context.Context
ctxCancel func()
playlistURL *url.URL
primaryDownloader *clientPrimaryDownloader
leadingTimeConv clientTimeConv
tracks map[*Track]*clientTrack
// out
outErr chan error
leadingTimeConvReady chan struct{}
}
// Start starts the client.
func (c *Client) Start() error {
if c.HTTPClient == nil {
c.HTTPClient = http.DefaultClient
}
if c.OnRequest == nil {
c.OnRequest = func(_ *http.Request) {}
}
if c.OnTracks == nil {
c.OnTracks = func(_ []*Track) error {
return nil
}
}
if c.OnDownloadPrimaryPlaylist == nil {
c.OnDownloadPrimaryPlaylist = func(u string) {
log.Printf("downloading primary playlist %v", u)
}
}
if c.OnDownloadStreamPlaylist == nil {
c.OnDownloadStreamPlaylist = func(u string) {
log.Printf("downloading stream playlist %v", u)
}
}
if c.OnDownloadSegment == nil {
c.OnDownloadSegment = func(u string) {
log.Printf("downloading segment %v", u)
}
}
if c.OnDownloadPart == nil {
c.OnDownloadPart = func(u string) {
log.Printf("downloading part %v", u)
}
}
if c.OnDecodeError == nil {
c.OnDecodeError = func(err error) {
log.Println(err.Error())
}
}
var err error
c.playlistURL, err = url.Parse(c.URI)
if err != nil {
return err
}
c.ctx, c.ctxCancel = context.WithCancel(context.Background())
c.outErr = make(chan error, 1)
c.leadingTimeConvReady = make(chan struct{})
go c.run()
return nil
}
// Close closes all the Client resources.
func (c *Client) Close() {
c.ctxCancel()
}
// Wait waits for any error of the Client.
func (c *Client) Wait() chan error {
return c.outErr
}
// OnDataAV1 sets a callback that is called when data from an AV1 track is received.
func (c *Client) OnDataAV1(track *Track, cb ClientOnDataAV1Func) {
c.tracks[track].onData = func(pts int64, _ int64, data [][]byte) {
cb(pts, data)
}
}
// OnDataVP9 sets a callback that is called when data from a VP9 track is received.
func (c *Client) OnDataVP9(track *Track, cb ClientOnDataVP9Func) {
c.tracks[track].onData = func(pts int64, _ int64, data [][]byte) {
cb(pts, data[0])
}
}
// OnDataH26x sets a callback that is called when data from an H26x track is received.
func (c *Client) OnDataH26x(track *Track, cb ClientOnDataH26xFunc) {
c.tracks[track].onData = func(pts int64, dts int64, data [][]byte) {
cb(pts, dts, data)
}
}
// OnDataMPEG4Audio sets a callback that is called when data from a MPEG-4 Audio track is received.
func (c *Client) OnDataMPEG4Audio(track *Track, cb ClientOnDataMPEG4AudioFunc) {
c.tracks[track].onData = func(pts int64, _ int64, data [][]byte) {
cb(pts, data)
}
}
// OnDataOpus sets a callback that is called when data from an Opus track is received.
func (c *Client) OnDataOpus(track *Track, cb ClientOnDataOpusFunc) {
c.tracks[track].onData = func(pts int64, _ int64, data [][]byte) {
cb(pts, data)
}
}
var zero time.Time
// AbsoluteTime returns the absolute timestamp of the last sample.
func (c *Client) AbsoluteTime(track *Track) (time.Time, bool) {
return c.tracks[track].absoluteTime()
}
func (c *Client) run() {
c.outErr <- c.runInner()
}
func (c *Client) runInner() error {
rp := &clientRoutinePool{}
rp.initialize()
c.primaryDownloader = &clientPrimaryDownloader{
primaryPlaylistURL: c.playlistURL,
httpClient: c.HTTPClient,
onRequest: c.OnRequest,
onDownloadPrimaryPlaylist: c.OnDownloadPrimaryPlaylist,
onDownloadStreamPlaylist: c.OnDownloadStreamPlaylist,
onDownloadSegment: c.OnDownloadSegment,
onDownloadPart: c.OnDownloadPart,
onDecodeError: c.OnDecodeError,
rp: rp,
setTracks: c.setTracks,
setLeadingTimeConv: c.setLeadingTimeConv,
getLeadingTimeConv: c.getLeadingTimeConv,
}
c.primaryDownloader.initialize()
rp.add(c.primaryDownloader)
select {
case err := <-rp.errorChan():
rp.close()
return err
case <-c.ctx.Done():
rp.close()
return fmt.Errorf("terminated")
}
}
func (c *Client) setTracks(tracks []*Track) (map[*Track]*clientTrack, error) {
c.tracks = make(map[*Track]*clientTrack)
for _, track := range tracks {
c.tracks[track] = &clientTrack{
track: track,
onData: func(_, _ int64, _ [][]byte) {},
}
}
err := c.OnTracks(tracks)
if err != nil {
return nil, err
}
return c.tracks, nil
}
func (c *Client) setLeadingTimeConv(ts clientTimeConv) {
c.leadingTimeConv = ts
startRTC := time.Now()
for _, track := range c.tracks {
track.startRTC = startRTC
}
close(c.leadingTimeConvReady)
}
func (c *Client) getLeadingTimeConv(ctx context.Context) (clientTimeConv, bool) {
select {
case <-c.leadingTimeConvReady:
case <-ctx.Done():
return nil, false
}
return c.leadingTimeConv, true
}