From 3d96e4efa15b61ca4331848fa7c90c6216e25c8f Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Mon, 11 Nov 2024 20:29:05 -0800 Subject: [PATCH] Hold lock when setting slowConsumer timer --- p2p/host/eventbus/basic.go | 7 +++- p2p/host/eventbus/basic_test.go | 60 ++++++++++++++++++++++----------- 2 files changed, 46 insertions(+), 21 deletions(-) diff --git a/p2p/host/eventbus/basic.go b/p2p/host/eventbus/basic.go index ed07c3eac8..af3e83a9da 100644 --- a/p2p/host/eventbus/basic.go +++ b/p2p/host/eventbus/basic.go @@ -383,7 +383,12 @@ func (n *wildcardNode) emit(evt interface{}) { select { case sink.ch <- evt: default: - n.slowConsumerTimer = emitAndLogError(n.slowConsumerTimer, wildcardType, evt, sink) + slowConsumerTimer := emitAndLogError(n.slowConsumerTimer, wildcardType, evt, sink) + defer func() { + n.Lock() + n.slowConsumerTimer = slowConsumerTimer + n.Unlock() + }() } } n.RUnlock() diff --git a/p2p/host/eventbus/basic_test.go b/p2p/host/eventbus/basic_test.go index ad70bb8137..15768cda5c 100644 --- a/p2p/host/eventbus/basic_test.go +++ b/p2p/host/eventbus/basic_test.go @@ -150,6 +150,12 @@ func (m *mockLogger) Logs() []string { return m.logs } +func (m *mockLogger) Clear() { + m.mu.Lock() + defer m.mu.Unlock() + m.logs = nil +} + func TestEmitLogsErrorOnStall(t *testing.T) { oldLogger := log defer func() { @@ -158,38 +164,52 @@ func TestEmitLogsErrorOnStall(t *testing.T) { ml := &mockLogger{} log = ml - bus := NewBus() - sub, err := bus.Subscribe(new(EventA)) + bus1 := NewBus() + bus2 := NewBus() + + eventSub, err := bus1.Subscribe(new(EventA)) if err != nil { t.Fatal(err) } - em, err := bus.Emitter(new(EventA)) + wildcardSub, err := bus2.Subscribe(event.WildcardSubscription) if err != nil { t.Fatal(err) } - defer em.Close() - go func() { - for i := 0; i < subSettingsDefault.buffer+2; i++ { - em.Emit(EventA{}) + testCases := []event.Subscription{eventSub, wildcardSub} + eventBuses := []event.Bus{bus1, bus2} + + for i, sub := range testCases { + bus := eventBuses[i] + em, err := bus.Emitter(new(EventA)) + if err != nil { + t.Fatal(err) } - }() + defer em.Close() - require.EventuallyWithT(t, func(collect *assert.CollectT) { - logs := ml.Logs() - found := false - for _, log := range logs { - if strings.Contains(log, "slow consumer") { - found = true - break + go func() { + for i := 0; i < subSettingsDefault.buffer+2; i++ { + em.Emit(EventA{}) } - } - assert.True(collect, found, "expected to find slow consumer log") - }, 3*time.Second, 500*time.Millisecond) + }() - // Close the subscriber so the worker can finish. - sub.Close() + require.EventuallyWithT(t, func(collect *assert.CollectT) { + logs := ml.Logs() + found := false + for _, log := range logs { + if strings.Contains(log, "slow consumer") { + found = true + break + } + } + assert.True(collect, found, "expected to find slow consumer log") + }, 3*time.Second, 500*time.Millisecond) + ml.Clear() + + // Close the subscriber so the worker can finish. + sub.Close() + } } func TestEmitOnClosed(t *testing.T) {