forked from davyxu/cellnet
-
Notifications
You must be signed in to change notification settings - Fork 0
/
queue.go
158 lines (119 loc) · 2.58 KB
/
queue.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
package cellnet
import (
"fmt"
"log"
"runtime/debug"
"sync"
"time"
)
// 事件队列
type EventQueue interface {
// 事件队列开始工作
StartLoop() EventQueue
// 停止事件队列
StopLoop() EventQueue
// 等待退出
Wait()
// 投递事件, 通过队列到达消费者端
Post(callback func())
// 是否捕获异常
EnableCapturePanic(v bool)
}
type CapturePanicNotifyFunc func(interface{}, EventQueue)
type eventQueue struct {
*Pipe
endSignal sync.WaitGroup
capturePanic bool
onPanic CapturePanicNotifyFunc
}
// 启动崩溃捕获
func (self *eventQueue) EnableCapturePanic(v bool) {
self.capturePanic = v
}
// 设置捕获崩溃通知
func (self *eventQueue) SetCapturePanicNotify(callback CapturePanicNotifyFunc) {
self.onPanic = callback
}
// 派发事件处理回调到队列中
func (self *eventQueue) Post(callback func()) {
if callback == nil {
return
}
self.Add(callback)
}
// 保护调用用户函数
func (self *eventQueue) protectedCall(callback func()) {
if self.capturePanic {
defer func() {
if err := recover(); err != nil {
self.onPanic(err, self)
}
}()
}
callback()
}
// 开启事件循环
func (self *eventQueue) StartLoop() EventQueue {
self.endSignal.Add(1)
go func() {
var writeList []interface{}
for {
writeList = writeList[0:0]
exit := self.Pick(&writeList)
// 遍历要发送的数据
for _, msg := range writeList {
switch t := msg.(type) {
case func():
self.protectedCall(t)
case nil:
break
default:
log.Printf("unexpected type %T", t)
}
}
if exit {
break
}
}
self.endSignal.Done()
}()
return self
}
// 停止事件循环
func (self *eventQueue) StopLoop() EventQueue {
self.Add(nil)
return self
}
// 等待退出消息
func (self *eventQueue) Wait() {
self.endSignal.Wait()
}
// 创建默认长度的队列
func NewEventQueue() EventQueue {
return &eventQueue{
Pipe: NewPipe(),
// 默认的崩溃捕获打印
onPanic: func(raw interface{}, queue EventQueue) {
fmt.Printf("%s: %v \n%s\n", time.Now().Format("2006-01-02 15:04:05"), raw, string(debug.Stack()))
debug.PrintStack()
},
}
}
// 在会话对应的Peer上的事件队列中执行callback,如果没有队列,则马上执行
func SessionQueuedCall(ses Session, callback func()) {
if ses == nil {
return
}
q := ses.Peer().(interface {
Queue() EventQueue
}).Queue()
QueuedCall(q, callback)
}
// 有队列时队列调用,无队列时直接调用
func QueuedCall(queue EventQueue, callback func()) {
if queue == nil {
callback()
} else {
queue.Post(callback)
}
}