-
Notifications
You must be signed in to change notification settings - Fork 0
/
core.go
174 lines (153 loc) · 4.03 KB
/
core.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
package mangokit
import (
	"os"
	"sync"
	"time"

	"github.com/morganhein/mangokit/events"
	"github.com/morganhein/mangokit/plugins"
	"github.com/morganhein/stacknqueue"
)
// core is the central dispatcher of the package. It owns the buffer of
// outgoing events and the table that maps event types to subscribed plugins.
type core struct {
// output holds events pushed by bufferResponse until respond pops them.
output *stacknqueue.StackNQueue
// subscribedEvents maps an event type to the plugins registered for it
// through AddEventTriggers; distribute reads it to route incoming events.
subscribedEvents map[int][]plugins.Plugineer
}
// Control codes sent over the worker control channels.
// Only QUIT is acted on by the workers in this file; PAUSE and RESUME are
// declared but have no handler here.
const (
QUIT = iota
PAUSE
RESUME
)
// Core is the package-wide core instance. It is assigned in init, which also
// publishes the same value as plugins.Core.
var Core *core
func init() {
Core = &core{
output: stacknqueue.NewStackNQueue(true),
// create the event types to skill association map
subscribedEvents: make(map[int][]plugins.Plugineer),
}
plugins.Core = Core
}
// Loop starts the receive, bufferResponse and respond workers, each on its
// own goroutine with its own unbuffered control channel, and blocks until all
// three have returned. (A "think" stage is not started at present.)
//
// NOTE(review): the control channels are not retained anywhere after they are
// handed to the workers, so nothing visible here can send QUIT to them.
func (c *core) Loop() {
	stages := []func(*sync.WaitGroup, chan int){
		c.receive,
		c.bufferResponse,
		c.respond,
	}

	var workers sync.WaitGroup
	workers.Add(len(stages))
	for _, stage := range stages {
		// The channel is created here, in the calling goroutine, before the
		// worker starts running.
		go stage(&workers, make(chan int))
	}

	log.Debug("Core is ready...")
	workers.Wait()
}
// AddEventTriggers registers p as a subscriber for every event type that
// p.Events() reports, so that distribute routes matching events to it.
func (c *core) AddEventTriggers(p plugins.Plugineer) {
	log.Debug("Adding event triggers.")
	for _, eventType := range p.Events() {
		// append on a missing key starts from a nil slice, so first-time and
		// repeat registrations take the same path.
		c.subscribedEvents[eventType] = append(c.subscribedEvents[eventType], p)
	}
}
// receive polls the network plugins for inbound events and hands each one to
// distribute on its own goroutine. It returns, marking wg done, once QUIT
// arrives on control; any other control value is ignored.
//
// NOTE(review): each select waits on one plugin's channel (plus control), so
// the loop does not advance to the next plugin until the current one delivers
// an event or a control message arrives.
func (c *core) receive(wg *sync.WaitGroup, control chan int) {
	defer wg.Done()
	log.Debug("Starting up receiver.")
	for {
		for _, network := range plugins.NetworkPlugins {
			select {
			case signal := <-control:
				// QUIT is the only control code honored here.
				if signal == QUIT {
					return
				}
			case incoming := <-network.FromPlugin():
				log.Debug("Received network event: " + incoming.Raw)
				// Presumably tags the event as a bot command when applicable;
				// whatever PopulateCmd returns is deliberately discarded.
				_ = plugins.PopulateCmd(&incoming)
				// Low-level control commands are not pre-processed here yet.
				// Hand off on a separate goroutine so delivery to subscribers
				// does not hold up the receiver.
				go c.distribute(&incoming)
			}
		}
	}
}
// distribute delivers a copy of e first to every plugin subscribed to
// events.ALL and then to every plugin subscribed to e.Type. A plugin is
// skipped when notFull reports false for its inbound channel.
//
// todo: a plugin subscribed to both ALL and e.Type receives the event twice.
func (c *core) distribute(e *plugins.Event) {
	log.Debug("Distributing event: " + e.Raw)

	// deliver sends *e to each subscriber whose inbound channel has room.
	deliver := func(subscribers []plugins.Plugineer) {
		for _, sub := range subscribers {
			if !notFull(sub.ToPlugin()) {
				continue
			}
			log.Debug("Sending event to: " + sub.Name())
			sub.ToPlugin() <- *e
		}
	}

	deliver(c.subscribedEvents[events.ALL])
	deliver(c.subscribedEvents[e.Type])
}
// bufferResponse polls the skill plugins for outgoing responses and pushes
// each one onto c.output, from which respond later pops it. It returns,
// marking wg done, once QUIT arrives on control; any other control value is
// ignored.
//
// NOTE(review): each select waits on one plugin's channel (plus control), so
// the loop does not advance to the next plugin until the current one produces
// a response or a control message arrives.
func (c *core) bufferResponse(wg *sync.WaitGroup, control chan int) {
	defer wg.Done()
	log.Debug("Starting up responder queue.")
	for {
		for _, skill := range plugins.SkillPlugins {
			select {
			case signal := <-control:
				// QUIT is the only control code honored here.
				if signal == QUIT {
					log.Debug("Responder queue shutting down.")
					return
				}
			case reply := <-skill.FromPlugin():
				log.Debug("Received skill response: " + reply.Message)
				// Queue the response for the respond worker.
				c.output.Push(reply)
			}
		}
	}
}
// respond drains the response buffer: every event popped from c.output is
// sent to the ToPlugin channel of its Source. It returns, marking wg done,
// once QUIT arrives on control; any other control value is ignored.
//
// The control channel is polled without blocking. Waiting on it with a
// single-case select would park this goroutine after at most one pop until a
// control message arrived, leaving queued responses undelivered.
func (c *core) respond(wg *sync.WaitGroup, control chan int) {
	// idleDelay is how long to back off when the buffer is empty, so an idle
	// responder does not spin.
	const idleDelay = 10 * time.Millisecond

	defer wg.Done()
	log.Debug("Responder starting up.")
	for {
		next := c.output.Pop()
		if next != nil {
			event := next.(plugins.Event)
			// send the event back up the connection through the ToPlugin chan
			event.Source.ToPlugin() <- event
		}

		select {
		case con := <-control:
			if con == QUIT {
				log.Debug("Going silent. Responder shut down.")
				return
			}
		default:
			// No control message pending; only pause when there was nothing
			// to send, so a backlog is drained at full speed.
			if next == nil {
				time.Sleep(idleDelay)
			}
		}
	}
}
// Leave is currently a no-op stub: it accepts a plugins.Contexter and does
// nothing with it.
func (c *core) Leave(pc plugins.Contexter) {
//todo: implement this stub
}
// Quit asks every network plugin to disconnect and then terminates the
// process with exit status 0. It does not return, and it does not send QUIT
// to the worker goroutines before exiting.
func (c *core) Quit() {
	for _, network := range plugins.NetworkPlugins {
		network.Disconnect()
	}
	os.Exit(0)
}
// notFull reports whether fewer than ten events are currently buffered in c.
// The limit is fixed and independent of the channel's actual capacity.
func notFull(c chan plugins.Event) bool {
	const backlogLimit = 10
	return len(c) < backlogLimit
}