forked from qlik-oss/enigma-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsession_messages.go
107 lines (96 loc) · 3.08 KB
/
session_messages.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
package enigma
import (
"context"
"encoding/json"
"sync"
)
// SessionMessage is a notification regarding the session coming from Qlik Associative Engine.
// Content carries the payload as raw, undecoded JSON.
type SessionMessage struct {
	Topic   string
	Content json.RawMessage
}

// sessionMessageChannelEntry pairs a subscriber channel with the topics it was
// registered for.
type sessionMessageChannelEntry struct {
	topics  []string
	channel chan SessionMessage
}

// sessionMessages holds every message emitted so far (history) and the set of
// registered subscriber entries (channels). The methods in this file take
// mutex before touching either field.
type sessionMessages struct {
	history  []SessionMessage
	mutex    sync.Mutex
	channels map[*sessionMessageChannelEntry]bool
}
// emitSessionEvent forwards sessionEvent to the entry's channel when the entry
// is interested in it. An entry with no topics, or whose first topic is "*",
// receives every event; otherwise the event is sent once if its Topic equals
// any of the entry's topics. The send is a plain channel send, so it blocks
// while the channel's buffer is full.
func (entry *sessionMessageChannelEntry) emitSessionEvent(sessionEvent SessionMessage) {
	// No filter configured: deliver unconditionally.
	if len(entry.topics) == 0 || entry.topics[0] == "*" {
		entry.channel <- sessionEvent
		return
	}
	for _, wanted := range entry.topics {
		if wanted == sessionEvent.Topic {
			entry.channel <- sessionEvent
			// Deliver at most once even if the topic is listed repeatedly.
			return
		}
	}
}
// emitSessionMessage records a message built from topic and value in the
// history and then offers it to every registered channel entry. The mutex is
// held for the whole call, including the channel sends done by
// emitSessionEvent.
func (e *sessionMessages) emitSessionMessage(topic string, value json.RawMessage) {
	e.mutex.Lock()
	defer e.mutex.Unlock()

	message := SessionMessage{
		Topic:   topic,
		Content: value,
	}
	// Keep the message so that channels registered later can replay it.
	e.history = append(e.history, message)
	for subscriber := range e.channels {
		subscriber.emitSessionEvent(message)
	}
}
// SessionMessageChannel registers and returns a new channel that receives notifications from
// Qlik Associative Engine. Supplying topics restricts delivery to messages with one of those
// topics; with no topics every message is delivered. Messages already stored in the history
// are replayed into the channel before it is returned.
func (e *sessionMessages) SessionMessageChannel(topics ...string) chan SessionMessage {
	e.mutex.Lock()
	defer e.mutex.Unlock()

	// Room for a complete replay of the history plus 16 further messages.
	capacity := 16 + len(e.history)
	subscriber := &sessionMessageChannelEntry{
		topics:  topics,
		channel: make(chan SessionMessage, capacity),
	}
	e.channels[subscriber] = true

	for i := range e.history {
		subscriber.emitSessionEvent(e.history[i])
	}
	return subscriber.channel
}
// CloseSessionMessageChannel closes the supplied channel and removes its registration from
// the session. A channel that is not currently registered is left untouched.
func (e *sessionMessages) CloseSessionMessageChannel(channel chan SessionMessage) {
	e.mutex.Lock()
	defer e.mutex.Unlock()

	for registered := range e.channels {
		if registered.channel != channel {
			continue
		}
		close(registered.channel)
		delete(e.channels, registered)
		return
	}
}
// closeAllSessionEventChannels closes every registered channel and replaces
// the registry with a fresh, empty map.
func (e *sessionMessages) closeAllSessionEventChannels() {
	e.mutex.Lock()
	defer e.mutex.Unlock()

	for registered := range e.channels {
		close(registered.channel)
	}
	e.channels = map[*sessionMessageChannelEntry]bool{}
}
// newSessionEvents returns a sessionMessages with an empty history and an
// empty channel registry.
func newSessionEvents() *sessionMessages {
	messages := &sessionMessages{mutex: sync.Mutex{}}
	messages.history = make([]SessionMessage, 0)
	messages.channels = make(map[*sessionMessageChannelEntry]bool)
	return messages
}
// sessionMessageChannelClosedError is returned by SessionState when its
// message channel is closed before an OnConnected message arrives.
type sessionMessageChannelClosedError struct{}

// Error implements the error interface.
func (sessionMessageChannelClosedError) Error() string {
	return "session message channel closed before OnConnected was received"
}

// SessionState returns either SESSION_CREATED or SESSION_ATTACHED to describe the status of
// the current websocket session.
//
// It registers a temporary channel for the "OnConnected" topic and waits for the first such
// message (replayed from the history if it was already received) or for ctx to be done,
// whichever comes first. The temporary channel is unregistered before returning.
//
// Errors: ctx.Err() when the context ends first, the json.Unmarshal error when the message
// content cannot be decoded, and a dedicated error when the channel is closed (for example
// by closeAllSessionEventChannels) before any OnConnected message was delivered.
func (e *sessionMessages) SessionState(ctx context.Context) (string, error) {
	channel := e.SessionMessageChannel("OnConnected")
	defer e.CloseSessionMessageChannel(channel)

	select {
	case <-ctx.Done():
		return "", ctx.Err()
	case message, ok := <-channel:
		if !ok {
			// A closed channel yields a zero-value message; decoding its empty
			// content would only produce a misleading JSON syntax error.
			return "", sessionMessageChannelClosedError{}
		}
		connectedInfo := &onConnectedEvent{}
		if err := json.Unmarshal(message.Content, connectedInfo); err != nil {
			return "", err
		}
		return connectedInfo.SessionState, nil
	}
}