-
Notifications
You must be signed in to change notification settings - Fork 16
/
event_store.go
169 lines (142 loc) · 4.43 KB
/
event_store.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
package kubemq
import (
"context"
"fmt"
"time"
pb "github.com/kubemq-io/protobuf/go"
)
// EventStore is an outgoing events store message plus the transport used to
// deliver it. The exported fields may be assigned directly or through the
// chainable Set*/AddTag methods defined on this type.
type EventStore struct {
	// Id is the message identifier.
	Id string
	// Channel is the name of the channel the message is sent to.
	Channel string
	// Metadata is the textual metadata attached to the message.
	Metadata string
	// Body is the binary payload of the message.
	Body []byte
	// ClientId identifies the client sending the message.
	ClientId string
	// Tags holds the key/value tags attached to the message.
	Tags map[string]string

	// transport performs the delivery in Send. While it is nil, Send returns
	// ErrNoTransportDefined.
	transport Transport
}
// NewEventStore returns a pointer to a zero-valued EventStore. The result has
// no transport attached, so Send returns ErrNoTransportDefined until one is
// set by code inside this package.
func NewEventStore() *EventStore {
	return new(EventStore)
}
// SetId stores id as the message identifier and returns es so that calls can
// be chained. According to the original note on this method, a random UUID is
// used when no id is set; that fallback happens outside this method.
func (es *EventStore) SetId(id string) *EventStore {
	es.Id = id
	return es
}
// SetClientId stores clientId as the sender's client id and returns es so
// that calls can be chained. Per the original note, a client id is mandatory
// when no default client id was configured; that check is not made here.
func (es *EventStore) SetClientId(clientId string) *EventStore {
	es.ClientId = clientId
	return es
}
// SetMetadata stores metadata on the message and returns es so that calls can
// be chained. Per the original note, metadata is mandatory when no body is
// set; that rule is not enforced by this method.
func (es *EventStore) SetMetadata(metadata string) *EventStore {
	es.Metadata = metadata
	return es
}
// SetChannel stores channel as the destination channel and returns es so that
// calls can be chained. Per the original note, a channel is mandatory when no
// default channel was configured; that check is not made here.
func (es *EventStore) SetChannel(channel string) *EventStore {
	es.Channel = channel
	return es
}
// SetBody stores body as the message payload and returns es so that calls can
// be chained. The slice is kept as passed, not copied. Per the original note,
// a body is mandatory when no metadata is set; that rule is not enforced by
// this method.
func (es *EventStore) SetBody(body []byte) *EventStore {
	es.Body = body
	return es
}
// SetTags replaces the message tags with a copy of tags and returns es so
// that calls can be chained. The caller's map is not retained. A nil or empty
// tags argument leaves es.Tags as an empty, non-nil map.
func (es *EventStore) SetTags(tags map[string]string) *EventStore {
	copied := make(map[string]string, len(tags))
	for k, v := range tags {
		copied[k] = v
	}
	es.Tags = copied
	return es
}
// AddTag sets the tag key to value on the message, overwriting any existing
// entry for key, and returns es so that calls can be chained. The tag map is
// allocated on first use.
func (es *EventStore) AddTag(key, value string) *EventStore {
	if es.Tags == nil {
		// Writing to a nil map panics, so create it lazily.
		es.Tags = make(map[string]string)
	}
	es.Tags[key] = value
	return es
}
// Send hands the message to its transport's SendEventStore and returns that
// call's result and error unchanged. It returns ErrNoTransportDefined when no
// transport is attached to es.
func (es *EventStore) Send(ctx context.Context) (*EventStoreResult, error) {
	sender := es.transport
	if sender == nil {
		return nil, ErrNoTransportDefined
	}
	return sender.SendEventStore(ctx, es)
}
// EventStoreResult is the value returned by EventStore.Send alongside its
// error. Field meanings below are inferred from the names; confirm against
// the transport implementation that fills them in.
type EventStoreResult struct {
	// Id is presumably the id of the event this result refers to.
	Id string
	// Sent presumably reports whether the event was accepted.
	Sent bool
	// Err presumably carries the error reported for the event, if any.
	Err error
}
// EventStoreReceive is an events store message as delivered to a subscriber.
// It has the same content fields as EventStore, plus Sequence and Timestamp.
type EventStoreReceive struct {
	// Id is the message identifier.
	Id string
	// Sequence is the message's sequence number.
	Sequence uint64
	// Timestamp is the time associated with the message.
	Timestamp time.Time
	// Channel is the name of the channel the message belongs to.
	Channel string
	// Metadata is the textual metadata attached to the message.
	Metadata string
	// Body is the binary payload of the message.
	Body []byte
	// ClientId identifies the client recorded on the message.
	ClientId string
	// Tags holds the key/value tags attached to the message.
	Tags map[string]string
}

// String renders every field on a single line as comma-separated
// "Name: value" pairs. Body is printed as raw text and Tags in fmt's default
// map notation.
func (r *EventStoreReceive) String() string {
	const layout = "Id: %s, Sequence: %d, Timestamp: %s, Channel: %s, Metadata: %s, Body: %s, ClientId: %s, Tags: %s"
	stamp := r.Timestamp.String()
	return fmt.Sprintf(
		layout,
		r.Id,
		r.Sequence,
		stamp,
		r.Channel,
		r.Metadata,
		r.Body,
		r.ClientId,
		r.Tags,
	)
}
// SubscriptionOption configures where an events store subscription starts.
// Its only method is unexported, so implementations can be defined only
// inside this package; callers obtain values from the StartFrom* functions.
type SubscriptionOption interface {
	// apply writes this option's settings into o.
	apply(o *subscriptionOption)
}
// subscriptionOption is the settings record that SubscriptionOption values
// write into.
type subscriptionOption struct {
	// kind selects the events store start policy.
	kind pb.Subscribe_EventsStoreType
	// value is the numeric argument for kinds that need one. The StartFrom*
	// functions store a sequence number, a Unix time in nanoseconds, or a
	// number of seconds here.
	value int64
}
// funcSubscriptionOptions adapts a plain function to the SubscriptionOption
// interface.
type funcSubscriptionOptions struct {
	// fn is invoked by apply with the settings record to modify.
	fn func(*subscriptionOption)
}
// apply runs the wrapped function against o.
func (fo *funcSubscriptionOptions) apply(o *subscriptionOption) {
	mutate := fo.fn
	mutate(o)
}
// newFuncSubscriptionOption wraps f in a funcSubscriptionOptions so that it
// can be used as a SubscriptionOption.
func newFuncSubscriptionOption(f func(*subscriptionOption)) *funcSubscriptionOptions {
	wrapped := new(funcSubscriptionOptions)
	wrapped.fn = f
	return wrapped
}
// StartFromNewEvents returns an option that sets the subscription kind to
// pb.Subscribe_StartNewOnly, so the subscription receives only new events.
func StartFromNewEvents() SubscriptionOption {
	setKind := func(o *subscriptionOption) {
		o.kind = pb.Subscribe_StartNewOnly
	}
	return newFuncSubscriptionOption(setKind)
}
// StartFromFirstEvent returns an option that sets the subscription kind to
// pb.Subscribe_StartFromFirst: replay all stored events from the first
// available sequence, then continue streaming new events.
func StartFromFirstEvent() SubscriptionOption {
	setKind := func(o *subscriptionOption) {
		o.kind = pb.Subscribe_StartFromFirst
	}
	return newFuncSubscriptionOption(setKind)
}
// StartFromLastEvent returns an option that sets the subscription kind to
// pb.Subscribe_StartFromLast: replay the last stored event, then continue
// streaming new events.
func StartFromLastEvent() SubscriptionOption {
	setKind := func(o *subscriptionOption) {
		o.kind = pb.Subscribe_StartFromLast
	}
	return newFuncSubscriptionOption(setKind)
}
// StartFromSequence returns an option that sets the subscription kind to
// pb.Subscribe_StartAtSequence with sequence as its value: replay events
// from that sequence number, then continue streaming new events. The
// sequence is not validated here.
func StartFromSequence(sequence int) SubscriptionOption {
	setStart := func(o *subscriptionOption) {
		o.kind = pb.Subscribe_StartAtSequence
		o.value = int64(sequence)
	}
	return newFuncSubscriptionOption(setStart)
}
// StartFromTime returns an option that sets the subscription kind to
// pb.Subscribe_StartAtTime: replay events from the given time, then continue
// streaming new events. The time is passed as Unix nanoseconds
// (since.UnixNano()).
func StartFromTime(since time.Time) SubscriptionOption {
	setStart := func(o *subscriptionOption) {
		o.kind = pb.Subscribe_StartAtTime
		o.value = since.UnixNano()
	}
	return newFuncSubscriptionOption(setStart)
}
// StartFromTimeDelta returns an option that sets the subscription kind to
// pb.Subscribe_StartAtTimeDelta: replay events starting delta before the
// current time, then continue streaming new events. The delta is passed as
// whole seconds; the int64 conversion drops any fractional second, so a
// sub-second delta becomes 0.
func StartFromTimeDelta(delta time.Duration) SubscriptionOption {
	setStart := func(o *subscriptionOption) {
		o.kind = pb.Subscribe_StartAtTimeDelta
		o.value = int64(delta.Seconds())
	}
	return newFuncSubscriptionOption(setStart)
}