-
Notifications
You must be signed in to change notification settings - Fork 19
/
hub_benchmark_test.go
75 lines (60 loc) · 1.44 KB
/
hub_benchmark_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package hub
import (
"math/rand"
"strconv"
"sync"
"testing"
)
// BenchmarkPublishOnNonBlockingSubscribers runs the shared publish benchmark
// with subscribers registered through Hub.NonBlockingSubscribe.
func BenchmarkPublishOnNonBlockingSubscribers(b *testing.B) {
	const (
		items       = 100
		threads     = 4
		subscribers = 30
	)
	runBenchmark(b, items, threads, subscribers, false)
}
// BenchmarkPublishOnBlockingSubscribers runs the shared publish benchmark
// with subscribers registered through Hub.Subscribe.
func BenchmarkPublishOnBlockingSubscribers(b *testing.B) {
	const (
		items       = 100
		threads     = 4
		subscribers = 30
	)
	runBenchmark(b, items, threads, subscribers, true)
}
// runBenchmark drives concurrent Publish calls against a fresh Hub.
//
// It creates numSubscribers subscriptions (blocking or non-blocking, per the
// blocking flag), starts one draining goroutine per subscription, and then,
// for each of the b.N iterations, launches numThreads publisher goroutines.
// Publisher j publishes one Message for every key in element j of the value
// returned by generateTopics(numThreads, numItems).
//
// The timer is reset after setup and stopped only after the hub has been
// closed and every draining goroutine has finished, so teardown is part of
// the measured time.
func runBenchmark(b *testing.B, numItems, numThreads, numSubscribers int, blocking bool) {
	hub := New()
	subscriptions := createSubscribers(hub, numSubscribers, blocking)
	// NOTE(review): generateTopics is not defined in this file; presumably it
	// returns one collection of topic names per publisher — confirm.
	topicsByPublisher := generateTopics(numThreads, numItems)

	var (
		publishers sync.WaitGroup
		consumers  sync.WaitGroup
	)
	consumers.Add(len(subscriptions))
	processSubscriptionsForBench(subscriptions, &consumers)

	b.ResetTimer()
	for iter := 0; iter < b.N; iter++ {
		publishers.Add(numThreads)
		for worker := 0; worker < numThreads; worker++ {
			go func(idx int) {
				for _, topic := range topicsByPublisher[idx] {
					hub.Publish(Message{Name: topic})
				}
				publishers.Done()
			}(worker)
		}
		// All publishers of this iteration must finish before the next begins.
		publishers.Wait()
	}

	hub.Close()
	// Wait for every draining goroutine to return before stopping the timer.
	consumers.Wait()
	b.StopTimer()
}
// createSubscribers registers qtd subscriptions on h and returns them in
// creation order.
//
// Each subscription uses a randomly chosen topic of the form "<a>.<b>.*",
// where a is drawn from [0,10) and b from [0,50) using the package-level
// math/rand source. When blocking is true the subscription is made with
// h.Subscribe, otherwise with h.NonBlockingSubscribe.
func createSubscribers(h *Hub, qtd int, blocking bool) []Subscription {
	// First argument handed to both subscribe calls.
	// NOTE(review): presumably the receiver buffer capacity — confirm against Hub.
	const capacity = 20

	subs := make([]Subscription, qtd)
	for i := range subs {
		// Draw in this order so the random sequence is consumed as before.
		first := rand.Intn(10)
		second := rand.Intn(50)
		topic := strconv.Itoa(first) + "." + strconv.Itoa(second) + ".*"

		if blocking {
			subs[i] = h.Subscribe(capacity, topic)
			continue
		}
		subs[i] = h.NonBlockingSubscribe(capacity, topic)
	}
	return subs
}
// processSubscriptionsForBench starts one goroutine per subscription that
// discards everything received on the subscription's Receiver and calls
// wg.Done once the range loop over Receiver ends.
//
// The caller is responsible for calling wg.Add(len(subs)) beforehand.
func processSubscriptionsForBench(subs []Subscription, wg *sync.WaitGroup) {
	drain := func(s Subscription) {
		defer wg.Done()
		// NOTE(review): presumably Receiver is a channel that is closed when
		// the hub is closed, which is what ends this loop — confirm.
		for range s.Receiver {
		}
	}

	for i := range subs {
		go drain(subs[i])
	}
}