-
Notifications
You must be signed in to change notification settings - Fork 60
/
stream.go
185 lines (150 loc) · 3.24 KB
/
stream.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
package dca
import (
"errors"
"github.com/bwmarrin/discordgo"
"io"
"sync"
"time"
)
var (
ErrVoiceConnClosed = errors.New("Voice connection closed")
)
// StreamingSession provides an easy way to directly transmit opus audio
// to discord from an encode session.
type StreamingSession struct {
sync.Mutex
// If this channel is not nil, an error will be sen when finished (or nil if no error)
done chan error
source OpusReader
vc *discordgo.VoiceConnection
paused bool
framesSent int
finished bool
running bool
err error // If an error occured and we had to stop
}
// Creates a new stream from an Opusreader.
// source : The source of the opus frames to be sent, either from an encoder or decoder.
// vc : The voice connecion to stream to.
// done : If not nil, an error will be sent on it when completed.
func NewStream(source OpusReader, vc *discordgo.VoiceConnection, done chan error) *StreamingSession {
session := &StreamingSession{
source: source,
vc: vc,
done: done,
}
go session.stream()
return session
}
func (s *StreamingSession) stream() {
// Check if we are already running and if so stop
s.Lock()
if s.running {
s.Unlock()
panic("Stream is already running!")
return
}
s.running = true
s.Unlock()
defer func() {
s.Lock()
s.running = false
s.Unlock()
}()
for {
s.Lock()
if s.paused {
s.Unlock()
return
}
s.Unlock()
err := s.readNext()
if err != nil {
s.Lock()
s.finished = true
if err != io.EOF {
s.err = err
}
if s.done != nil {
go func() {
s.done <- err
}()
}
s.Unlock()
break
}
}
}
func (s *StreamingSession) readNext() error {
opus, err := s.source.OpusFrame()
if err != nil {
return err
}
// Timeout after 100ms (Maybe this needs to be changed?)
timeOut := time.NewTimer(time.Second)
// This will attempt to send on the channel before the timeout, which is 1s
select {
case <-timeOut.C:
return ErrVoiceConnClosed
case s.vc.OpusSend <- opus:
timeOut.Stop()
}
s.Lock()
s.framesSent++
s.Unlock()
return nil
}
// SetPaused provides pause/unpause functionality
func (s *StreamingSession) SetPaused(paused bool) {
s.Lock()
if s.finished {
s.Unlock()
return
}
// Already running
if !paused && s.running {
if s.paused {
// Was set to stop running after next frame so undo this
s.paused = false
}
s.Unlock()
return
}
// Already stopped
if paused && !s.running {
// Not running, but starting up..
if !s.paused {
s.paused = true
}
s.Unlock()
return
}
// Time to start it up again
if !s.running && s.paused && !paused {
go s.stream()
}
s.paused = paused
s.Unlock()
}
// PlaybackPosition returns the the duration of content we have transmitted so far
func (s *StreamingSession) PlaybackPosition() time.Duration {
s.Lock()
dur := time.Duration(s.framesSent) * s.source.FrameDuration()
s.Unlock()
return dur
}
// Finished returns wether the stream finished or not, and any error that caused it to stop
func (s *StreamingSession) Finished() (bool, error) {
s.Lock()
err := s.err
fin := s.finished
s.Unlock()
return fin, err
}
// Paused returns wether the sream is paused or not
func (s *StreamingSession) Paused() bool {
s.Lock()
p := s.paused
s.Unlock()
return p
}