Skip to content

Commit

Permalink
perf: optimize Publish for 0 and 1 subscribers
Browse files Browse the repository at this point in the history
  • Loading branch information
mdawar committed Aug 13, 2024
1 parent b86f4ce commit 6fbe6aa
Showing 1 changed file with 26 additions and 12 deletions.
38 changes: 26 additions & 12 deletions broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,22 +150,36 @@ func (b *Broker[T, P]) Publish(ctx context.Context, topic T, payload P) error {
b.mu.RLock()
defer b.mu.RUnlock()

var wg sync.WaitGroup
subs := b.subs[topic]
msg := Message[T, P]{Topic: topic, Payload: payload}

wg.Add(len(b.subs[topic]))
for _, sub := range b.subs[topic] {
go func() {
defer wg.Done()
switch len(subs) {
case 0:
// Do nothing.
case 1:
select {
case <-ctx.Done():
case subs[0] <- msg:
}
default:
var wg sync.WaitGroup

wg.Add(len(subs))
for _, sub := range subs {
go func() {
defer wg.Done()

select {
case <-ctx.Done():
return
case sub <- msg:
}
}()
}

select {
case <-ctx.Done():
return
case sub <- Message[T, P]{Topic: topic, Payload: payload}:
}
}()
wg.Wait()
}

wg.Wait()
return ctx.Err()
}

Expand Down

0 comments on commit 6fbe6aa

Please sign in to comment.