Skip to content

Commit

Permalink
Hold lock when setting slowConsumer timer
Browse files Browse the repository at this point in the history
  • Loading branch information
MarcoPolo committed Nov 12, 2024
1 parent 270ea36 commit 3d96e4e
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 21 deletions.
7 changes: 6 additions & 1 deletion p2p/host/eventbus/basic.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,12 @@ func (n *wildcardNode) emit(evt interface{}) {
select {
case sink.ch <- evt:
default:
n.slowConsumerTimer = emitAndLogError(n.slowConsumerTimer, wildcardType, evt, sink)
slowConsumerTimer := emitAndLogError(n.slowConsumerTimer, wildcardType, evt, sink)
defer func() {
n.Lock()
n.slowConsumerTimer = slowConsumerTimer
n.Unlock()
}()
}
}
n.RUnlock()
Expand Down
60 changes: 40 additions & 20 deletions p2p/host/eventbus/basic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,12 @@ func (m *mockLogger) Logs() []string {
return m.logs
}

func (m *mockLogger) Clear() {
m.mu.Lock()
defer m.mu.Unlock()
m.logs = nil
}

func TestEmitLogsErrorOnStall(t *testing.T) {
oldLogger := log
defer func() {
Expand All @@ -158,38 +164,52 @@ func TestEmitLogsErrorOnStall(t *testing.T) {
ml := &mockLogger{}
log = ml

bus := NewBus()
sub, err := bus.Subscribe(new(EventA))
bus1 := NewBus()
bus2 := NewBus()

eventSub, err := bus1.Subscribe(new(EventA))
if err != nil {
t.Fatal(err)
}

em, err := bus.Emitter(new(EventA))
wildcardSub, err := bus2.Subscribe(event.WildcardSubscription)
if err != nil {
t.Fatal(err)
}
defer em.Close()

go func() {
for i := 0; i < subSettingsDefault.buffer+2; i++ {
em.Emit(EventA{})
testCases := []event.Subscription{eventSub, wildcardSub}
eventBuses := []event.Bus{bus1, bus2}

for i, sub := range testCases {
bus := eventBuses[i]
em, err := bus.Emitter(new(EventA))
if err != nil {
t.Fatal(err)
}
}()
defer em.Close()

require.EventuallyWithT(t, func(collect *assert.CollectT) {
logs := ml.Logs()
found := false
for _, log := range logs {
if strings.Contains(log, "slow consumer") {
found = true
break
go func() {
for i := 0; i < subSettingsDefault.buffer+2; i++ {
em.Emit(EventA{})
}
}
assert.True(collect, found, "expected to find slow consumer log")
}, 3*time.Second, 500*time.Millisecond)
}()

// Close the subscriber so the worker can finish.
sub.Close()
require.EventuallyWithT(t, func(collect *assert.CollectT) {
logs := ml.Logs()
found := false
for _, log := range logs {
if strings.Contains(log, "slow consumer") {
found = true
break
}
}
assert.True(collect, found, "expected to find slow consumer log")
}, 3*time.Second, 500*time.Millisecond)
ml.Clear()

// Close the subscriber so the worker can finish.
sub.Close()
}
}

func TestEmitOnClosed(t *testing.T) {
Expand Down

0 comments on commit 3d96e4e

Please sign in to comment.