-
Notifications
You must be signed in to change notification settings - Fork 0
/
observable.go
128 lines (105 loc) · 2.44 KB
/
observable.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
package urx
import (
"sync"
)
// FunctionOperator is a function type that receives a Subscriber and a
// Notification. Its Notify method calls the function itself, so a bare
// function can be converted to FunctionOperator and handed to Lift, as Map,
// Filter and Buffered do in this file.
type FunctionOperator func(Subscriber, Notification)
// Notify calls the underlying function with the given subscriber and
// notification. It exists so that a plain function value carries a
// Notify(Subscriber, Notification) method.
func (fn FunctionOperator) Notify(s Subscriber, n Notification) {
	fn(s, n)
}
// Observable is the public handle for a stream of notifications. Because it
// includes the unexported getObs method, it can only be implemented by types
// declared in this package.
type Observable interface {
// Publish returns a PublishedObservable for this observable.
Publish() PublishedObservable
// Lift returns a new Observable with the given Operator applied.
Lift(Operator) Observable
// Map returns an Observable built from m; the bObservable implementation
// in this file replaces the Body of OnNext notifications with m(Body).
Map(m func(interface{}) interface{}) Observable
// Filter returns an Observable built from the predicate; the bObservable
// implementation in this file drops OnNext notifications for which the
// predicate returns false.
Filter(func(interface{}) bool) Observable
// Buffered returns an Observable that hands notifications to subscribers
// through a channel of capacity buffer (per the bObservable implementation
// in this file).
Buffered(buffer int) Observable
// Subscribe starts a subscription and returns its Subscription handle.
Subscribe() Subscription
// getObs exposes the underlying privObservable to code in this package.
getObs() privObservable
}
// PublishedObservable is an Observable that additionally exposes
// subscription control and hook registration.
type PublishedObservable interface {
Observable
// Unsubscribe delegates to the underlying published observable.
Unsubscribe()
// IsSubscribed reports the underlying published observable's subscribed
// state.
IsSubscribed() bool
// Add registers a CompleteHook with the underlying published observable.
// NOTE(review): presumably the hook runs on completion; confirm against
// the CompleteHook definition.
Add(CompleteHook)
}
// pObservable embeds bObservable and adds the PublishedObservable methods.
// Those methods type-assert the embedded privObservable field to
// *publishedObservable, so the field is expected to hold that concrete type.
type pObservable struct {
bObservable
}
// IsSubscribed returns the result of IsSubscribed on the wrapped
// *publishedObservable. The type assertion is unchecked, so it panics if the
// wrapped privObservable holds any other type.
func (p pObservable) IsSubscribed() bool {
	pub := p.privObservable.(*publishedObservable)
	return pub.IsSubscribed()
}
// Unsubscribe forwards to Unsubscribe on the wrapped *publishedObservable.
// The type assertion is unchecked, so it panics if the wrapped privObservable
// holds any other type.
func (p pObservable) Unsubscribe() {
	pub := p.privObservable.(*publishedObservable)
	pub.Unsubscribe()
}
// Publish returns the receiver unchanged: a pObservable is already a
// PublishedObservable, so no further wrapping is done.
func (p pObservable) Publish() PublishedObservable {
return p
}
// Add forwards the hook to Add on the wrapped *publishedObservable. The type
// assertion is unchecked, so it panics if the wrapped privObservable holds
// any other type.
func (p pObservable) Add(h CompleteHook) {
	pub := p.privObservable.(*publishedObservable)
	pub.Add(h)
}
// bObservable is the basic Observable implementation in this file. It wraps a
// privObservable and builds the public operators (Lift, Map, Filter,
// Buffered, Subscribe, Publish) on top of it.
type bObservable struct {
// privObservable is the wrapped internal observable that every method
// delegates to.
privObservable privObservable
}
// Publish passes the wrapped privObservable through published and returns the
// result wrapped in a pObservable. The receiver is a value, so the original
// bObservable is left untouched.
func (o bObservable) Publish() PublishedObservable {
	pub := published(o.privObservable)
	return pObservable{bObservable{privObservable: pub}}
}
// Lift applies operator to the wrapped privObservable and returns the lifted
// result as a new bObservable.
func (o bObservable) Lift(operator Operator) Observable {
	lifted := o.privObservable.Lift(operator)
	return bObservable{privObservable: lifted}
}
// Map returns an Observable whose OnNext notifications carry m(Body) in place
// of the original Body. Notifications of every other type are forwarded to
// the subscriber as they arrived.
func (o bObservable) Map(m func(interface{}) interface{}) Observable {
	transform := FunctionOperator(func(sub Subscriber, n Notification) {
		if n.Type == OnNext {
			n.Body = m(n.Body)
		}
		sub.Notify(n)
	})
	return o.Lift(transform)
}
// getObs returns the wrapped privObservable; it satisfies the unexported
// method of the Observable interface.
func (o bObservable) getObs() privObservable {
return o.privObservable
}
// Filter returns an Observable that forwards an OnNext notification only when
// f returns true for its Body. Notifications of any other type are always
// forwarded, and f is not called for them.
func (o bObservable) Filter(f func(interface{}) bool) Observable {
	keep := FunctionOperator(func(sub Subscriber, n Notification) {
		// Short-circuit keeps f from running on non-OnNext notifications.
		if n.Type != OnNext || f(n.Body) {
			sub.Notify(n)
		}
	})
	return o.Lift(keep)
}
// Buffered returns an Observable that decouples the producer from its
// subscribers through a channel of capacity buffer.
//
// On an OnStart notification a fresh channel is created and a pump goroutine
// is started to drain it; every notification (including OnStart itself) is
// then queued on the current channel and delivered by the pump via
// Subscriber.Notify. After an OnComplete notification has been queued the
// channel is closed, which ends the pump.
//
// The channel variable is shared by every notification flowing through the
// returned Observable, so it is guarded by a sync.RWMutex: OnStart replaces
// it under the write lock, all other notifications read it under the read
// lock. Each call captures the channel it will use while holding the lock and
// releases the lock before sending, so a send that blocks on a full buffer
// never prevents a later OnStart from installing a new channel.
//
// NOTE(review): the pump stops as soon as it sees a message whose subscriber
// is no longer subscribed; notifications queued afterwards are not drained,
// so a producer that keeps sending will block once the buffer is full.
// A notification that arrives before any OnStart finds a nil channel and
// blocks forever.
func (o bObservable) Buffered(buffer int) Observable {
	type buffered struct {
		to   Subscriber
		body Notification
	}

	var (
		mu sync.RWMutex
		c  chan buffered // current delivery channel; guarded by mu
	)

	// pump delivers queued notifications until the channel is closed or a
	// message targets a subscriber that is no longer subscribed. It takes the
	// channel as an argument so it never reads the shared variable.
	pump := func(in <-chan buffered) {
		for msg := range in {
			if !msg.to.IsSubscribed() {
				return
			}
			msg.to.Notify(msg.body)
		}
	}

	return o.Lift(FunctionOperator(func(sub Subscriber, n Notification) {
		var out chan buffered
		if n.Type == OnStart {
			// Install the new channel while holding the write lock so that
			// concurrent readers observe either the old or the new channel,
			// never a torn or unsynchronized write.
			mu.Lock()
			c = make(chan buffered, buffer)
			out = c
			mu.Unlock()
			go pump(out)
		} else {
			mu.RLock()
			out = c
			mu.RUnlock()
		}

		out <- buffered{sub, n}
		if n.Type == OnComplete {
			// Closing lets the pump's range loop terminate once drained.
			close(out)
		}
	}))
}
// Subscribe calls privSubscribe on the wrapped privObservable, wraps the
// result in a wrappedSubscription and returns whatever its init method
// yields.
func (o bObservable) Subscribe() Subscription {
	inner := o.privObservable.privSubscribe()
	wrapped := wrappedSubscription{sub: inner}
	return wrapped.init()
}