-
Notifications
You must be signed in to change notification settings - Fork 0
/
merge.go
53 lines (48 loc) · 1.24 KB
/
merge.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package urx
import "reflect"
// Merge combines the given observables into a single Observable that relays
// their notifications in whatever order they arrive.
//
// Every source is subscribed to when the merged stream is subscribed to. The
// merged stream then emits one Start, forwards every notification from the
// sources that is neither a Start nor a Complete, and emits one Complete after
// all sources have completed. A source whose event channel is closed is
// treated as having completed. Calling Merge with no sources yields a stream
// that emits Start and then Complete immediately.
//
// A teardown registered on the subscriber unsubscribes from every source
// subscription that still reports IsSubscribed.
func Merge(obs ...Observable) Observable {
	return Create(func(subscriber Subscriber) {
		// Subscriptions are tracked positionally rather than in a map keyed by
		// Observable, so passing the same source twice keeps both
		// subscriptions and Observable implementations need not be hashable.
		all := make([]Subscription, 0, len(obs))
		for _, source := range obs {
			all = append(all, source.Subscribe())
		}
		defer subscriber.Notify(Complete())

		// The teardown walks a fixed list that the receive loop below never
		// mutates; the IsSubscribed check skips subscriptions that are no
		// longer active.
		subscriber.Add(func() {
			for _, sub := range all {
				if sub.IsSubscribed() {
					sub.Unsubscribe()
				}
			}
		})

		subscriber.Notify(Start())

		// One receive case per live source. The set only changes when a source
		// completes, so it is built once and shrunk in place instead of being
		// rebuilt for every notification.
		// NOTE(review): assumes Events() returns the same channel on every
		// call for a given subscription — confirm.
		cases := make([]reflect.SelectCase, 0, len(all))
		for _, sub := range all {
			cases = append(cases, reflect.SelectCase{
				Dir:  reflect.SelectRecv,
				Chan: reflect.ValueOf(sub.Events()),
			})
		}

		// reflect.Select blocks forever when given no cases, so the loop must
		// stop as soon as the last source is gone (or if there never was one).
		for len(cases) > 0 {
			from, val, ok := reflect.Select(cases)

			var notification Notification
			if !ok {
				// The source closed its channel without sending a Complete.
				notification = Complete()
			} else {
				notification, ok = val.Interface().(Notification)
				if !ok {
					panic("could not convert something known to be a notification to a notification")
				}
			}

			switch notification.Type {
			case OnComplete:
				// Drop the finished source. Case order is irrelevant to
				// reflect.Select, so swap-remove and clear the vacated slot to
				// release its channel reference.
				last := len(cases) - 1
				cases[from] = cases[last]
				cases[last] = reflect.SelectCase{}
				cases = cases[:last]
			case OnStart:
				// The merged stream has already emitted its own Start.
			default:
				subscriber.Notify(notification)
			}
		}
	})
}