diff --git a/p2p/host/eventbus/basic.go b/p2p/host/eventbus/basic.go index 1294431a19..af6a74bd02 100644 --- a/p2p/host/eventbus/basic.go +++ b/p2p/host/eventbus/basic.go @@ -126,6 +126,7 @@ type wildcardSub struct { w *wildcardNode metricsTracer MetricsTracer name string + closeOnce sync.Once } func (w *wildcardSub) Out() <-chan interface{} { @@ -133,10 +134,13 @@ func (w *wildcardSub) Out() <-chan interface{} { } func (w *wildcardSub) Close() error { - w.w.removeSink(w.ch) - if w.metricsTracer != nil { - w.metricsTracer.RemoveSubscriber(reflect.TypeOf(event.WildcardSubscription)) - } + w.closeOnce.Do(func() { + w.w.removeSink(w.ch) + if w.metricsTracer != nil { + w.metricsTracer.RemoveSubscriber(reflect.TypeOf(event.WildcardSubscription)) + } + }) + return nil } diff --git a/p2p/host/eventbus/basic_test.go b/p2p/host/eventbus/basic_test.go index fd320aa623..530a46ff19 100644 --- a/p2p/host/eventbus/basic_test.go +++ b/p2p/host/eventbus/basic_test.go @@ -394,10 +394,13 @@ func TestManyWildcardSubscriptions(t *testing.T) { require.NoError(t, em1.Emit(EventA{})) require.NoError(t, em2.Emit(EventB(1))) - // the first five still have 2 events, while the other five have 4 events. - for _, s := range subs[:5] { - require.Len(t, s.Out(), 2) - } + // the first five 0 events because it was closed. The other five + // have 4 events. + require.EventuallyWithT(t, func(t *assert.CollectT) { + for _, s := range subs[:5] { + require.Len(t, s.Out(), 0, "expected closed subscription to have flushed events") + } + }, 2*time.Second, 100*time.Millisecond) for _, s := range subs[5:] { require.Len(t, s.Out(), 4) @@ -407,6 +410,10 @@ func TestManyWildcardSubscriptions(t *testing.T) { for _, s := range subs { require.NoError(t, s.Close()) } + + for _, s := range subs { + require.Zero(t, s.(*wildcardSub).w.nSinks.Load()) + } } func TestWildcardValidations(t *testing.T) {