From e5b739e78a2fbc266ce095e69eb2990a62c0b57b Mon Sep 17 00:00:00 2001 From: Marco Munizaga Date: Mon, 18 Nov 2024 11:21:03 -0800 Subject: [PATCH] fix(eventbus): Idempotent wildcardSub close The tests expects the wildcardSub .Close to be idempotent, but it never checked this. The tests also weirdly asserted that there were orphaned events on the channel. That was wrong as the events should be flushed on close as to not indefinitely block an emitter. --- p2p/host/eventbus/basic.go | 12 ++++++++---- p2p/host/eventbus/basic_test.go | 15 +++++++++++---- 2 files changed, 19 insertions(+), 8 deletions(-) diff --git a/p2p/host/eventbus/basic.go b/p2p/host/eventbus/basic.go index 1294431a19..af6a74bd02 100644 --- a/p2p/host/eventbus/basic.go +++ b/p2p/host/eventbus/basic.go @@ -126,6 +126,7 @@ type wildcardSub struct { w *wildcardNode metricsTracer MetricsTracer name string + closeOnce sync.Once } func (w *wildcardSub) Out() <-chan interface{} { @@ -133,10 +134,13 @@ func (w *wildcardSub) Out() <-chan interface{} { } func (w *wildcardSub) Close() error { - w.w.removeSink(w.ch) - if w.metricsTracer != nil { - w.metricsTracer.RemoveSubscriber(reflect.TypeOf(event.WildcardSubscription)) - } + w.closeOnce.Do(func() { + w.w.removeSink(w.ch) + if w.metricsTracer != nil { + w.metricsTracer.RemoveSubscriber(reflect.TypeOf(event.WildcardSubscription)) + } + }) + return nil } diff --git a/p2p/host/eventbus/basic_test.go b/p2p/host/eventbus/basic_test.go index fd320aa623..530a46ff19 100644 --- a/p2p/host/eventbus/basic_test.go +++ b/p2p/host/eventbus/basic_test.go @@ -394,10 +394,13 @@ func TestManyWildcardSubscriptions(t *testing.T) { require.NoError(t, em1.Emit(EventA{})) require.NoError(t, em2.Emit(EventB(1))) - // the first five still have 2 events, while the other five have 4 events. - for _, s := range subs[:5] { - require.Len(t, s.Out(), 2) - } + // the first five 0 events because it was closed. The other five + // have 4 events. + require.EventuallyWithT(t, func(t *assert.CollectT) { + for _, s := range subs[:5] { + require.Len(t, s.Out(), 0, "expected closed subscription to have flushed events") + } + }, 2*time.Second, 100*time.Millisecond) for _, s := range subs[5:] { require.Len(t, s.Out(), 4) @@ -407,6 +410,10 @@ func TestManyWildcardSubscriptions(t *testing.T) { for _, s := range subs { require.NoError(t, s.Close()) } + + for _, s := range subs { + require.Zero(t, s.(*wildcardSub).w.nSinks.Load()) + } } func TestWildcardValidations(t *testing.T) {