Skip to content

Commit

Permalink
eventbus: dont panic on closing Subscription twice (#3034)
Browse files Browse the repository at this point in the history
  • Loading branch information
sukunrt authored Nov 13, 2024
1 parent 88e2f7a commit 7268c98
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 19 deletions.
40 changes: 21 additions & 19 deletions p2p/host/eventbus/basic.go
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ type sub struct {
dropper func(reflect.Type)
metricsTracer MetricsTracer
name string
closeOnce sync.Once
}

func (s *sub) Name() string {
Expand All @@ -172,31 +173,32 @@ func (s *sub) Close() error {
for range s.ch {
}
}()

for _, n := range s.nodes {
n.lk.Lock()

for i := 0; i < len(n.sinks); i++ {
if n.sinks[i].ch == s.ch {
n.sinks[i], n.sinks[len(n.sinks)-1] = n.sinks[len(n.sinks)-1], nil
n.sinks = n.sinks[:len(n.sinks)-1]

if s.metricsTracer != nil {
s.metricsTracer.RemoveSubscriber(n.typ)
s.closeOnce.Do(func() {
for _, n := range s.nodes {
n.lk.Lock()

for i := 0; i < len(n.sinks); i++ {
if n.sinks[i].ch == s.ch {
n.sinks[i], n.sinks[len(n.sinks)-1] = n.sinks[len(n.sinks)-1], nil
n.sinks = n.sinks[:len(n.sinks)-1]

if s.metricsTracer != nil {
s.metricsTracer.RemoveSubscriber(n.typ)
}
break
}
break
}
}

tryDrop := len(n.sinks) == 0 && n.nEmitters.Load() == 0
tryDrop := len(n.sinks) == 0 && n.nEmitters.Load() == 0

n.lk.Unlock()
n.lk.Unlock()

if tryDrop {
s.dropper(n.typ)
if tryDrop {
s.dropper(n.typ)
}
}
}
close(s.ch)
close(s.ch)
})
return nil
}

Expand Down
11 changes: 11 additions & 0 deletions p2p/host/eventbus/basic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -562,6 +562,17 @@ func TestSubFailFully(t *testing.T) {
}
}

func TestSubCloseMultiple(t *testing.T) {
bus := NewBus()

sub, err := bus.Subscribe([]interface{}{new(EventB)})
require.NoError(t, err)
err = sub.Close()
require.NoError(t, err)
err = sub.Close()
require.NoError(t, err)
}

func testMany(t testing.TB, subs, emits, msgs int, stateful bool) {
if race.WithRace() && subs+emits > 5000 {
t.SkipNow()
Expand Down

0 comments on commit 7268c98

Please sign in to comment.