Skip to content

Commit

Permalink
feat: eventbus: log error on slow consumers (#3031)
Browse files Browse the repository at this point in the history
* feat: eventbus: log error on slow consumers

Slow consumers can stall libp2p in hard to debug ways. For example, a
slow consumer of Identify events can prevent NewStreams from opening. In
some cases we do want this back-pressure, but in other cases it is a bug
in the user's application (or even in go-libp2p). The recommended
approach has been to use metrics with Grafana and Prometheus to identify
these issues. In practice, that's been a non-trivial task for users to
setup. A simple log would help identify these issues.

Note that the codepath is relatively unchanged in the normal case. Only
if a producer is stalled will the extra work be put in.

Co-authored-by: Wondertan <hlibwondertan@gmail.com>

* fix: eventbus: fix deadlock in closing a stalled wildcard subscriber

* Hold lock when setting slowConsumer timer

---------

Co-authored-by: Wondertan <hlibwondertan@gmail.com>
  • Loading branch information
MarcoPolo and Wondertan authored Nov 13, 2024
1 parent 10045d1 commit f3fa48e
Show file tree
Hide file tree
Showing 2 changed files with 140 additions and 2 deletions.
61 changes: 59 additions & 2 deletions p2p/host/eventbus/basic.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,20 @@ import (
"reflect"
"sync"
"sync/atomic"
"time"

logging "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p/core/event"
)

type logInterface interface {
Errorf(string, ...interface{})
}

var log logInterface = logging.Logger("eventbus")

const slowConsumerWarningTimeout = time.Second

// /////////////////////
// BUS

Expand Down Expand Up @@ -322,6 +332,8 @@ type wildcardNode struct {
nSinks atomic.Int32
sinks []*namedSink
metricsTracer MetricsTracer

slowConsumerTimer *time.Timer
}

func (n *wildcardNode) addSink(sink *namedSink) {
Expand All @@ -336,6 +348,12 @@ func (n *wildcardNode) addSink(sink *namedSink) {
}

func (n *wildcardNode) removeSink(ch chan interface{}) {
go func() {
// drain the event channel, will return when closed and drained.
// this is necessary to unblock publishes to this channel.
for range ch {
}
}()
n.nSinks.Add(-1) // ok to do outside the lock
n.Lock()
for i := 0; i < len(n.sinks); i++ {
Expand All @@ -348,6 +366,8 @@ func (n *wildcardNode) removeSink(ch chan interface{}) {
n.Unlock()
}

var wildcardType = reflect.TypeOf(event.WildcardSubscription)

func (n *wildcardNode) emit(evt interface{}) {
if n.nSinks.Load() == 0 {
return
Expand All @@ -360,7 +380,16 @@ func (n *wildcardNode) emit(evt interface{}) {
// record channel full events before blocking
sendSubscriberMetrics(n.metricsTracer, sink)

sink.ch <- evt
select {
case sink.ch <- evt:
default:
slowConsumerTimer := emitAndLogError(n.slowConsumerTimer, wildcardType, evt, sink)
defer func() {
n.Lock()
n.slowConsumerTimer = slowConsumerTimer
n.Unlock()
}()
}
}
n.RUnlock()
}
Expand All @@ -379,6 +408,8 @@ type node struct {

sinks []*namedSink
metricsTracer MetricsTracer

slowConsumerTimer *time.Timer
}

func newNode(typ reflect.Type, metricsTracer MetricsTracer) *node {
Expand All @@ -404,11 +435,37 @@ func (n *node) emit(evt interface{}) {
// Sending metrics before sending on channel allows us to
// record channel full events before blocking
sendSubscriberMetrics(n.metricsTracer, sink)
sink.ch <- evt
select {
case sink.ch <- evt:
default:
n.slowConsumerTimer = emitAndLogError(n.slowConsumerTimer, n.typ, evt, sink)
}
}
n.lk.Unlock()
}

func emitAndLogError(timer *time.Timer, typ reflect.Type, evt interface{}, sink *namedSink) *time.Timer {
// Slow consumer. Log a warning if stalled for the timeout
if timer == nil {
timer = time.NewTimer(slowConsumerWarningTimeout)
} else {
timer.Reset(slowConsumerWarningTimeout)
}

select {
case sink.ch <- evt:
if !timer.Stop() {
<-timer.C
}
case <-timer.C:
log.Errorf("subscriber named \"%s\" is a slow consumer of %s. This can lead to libp2p stalling and hard to debug issues.", sink.name, typ)
// Continue to stall since there's nothing else we can do.
sink.ch <- evt
}

return timer
}

func sendSubscriberMetrics(metricsTracer MetricsTracer, sink *namedSink) {
if metricsTracer != nil {
metricsTracer.SubscriberQueueLength(sink.name, len(sink.ch)+1)
Expand Down
81 changes: 81 additions & 0 deletions p2p/host/eventbus/basic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"reflect"
"strings"
"sync"
"sync/atomic"
"testing"
Expand All @@ -13,6 +14,7 @@ import (

"github.com/libp2p/go-libp2p-testing/race"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -131,6 +133,85 @@ func TestEmitNoSubNoBlock(t *testing.T) {
em.Emit(EventA{})
}

type mockLogger struct {
mu sync.Mutex
logs []string
}

func (m *mockLogger) Errorf(format string, args ...interface{}) {
m.mu.Lock()
defer m.mu.Unlock()
m.logs = append(m.logs, fmt.Sprintf(format, args...))
}

func (m *mockLogger) Logs() []string {
m.mu.Lock()
defer m.mu.Unlock()
return m.logs
}

func (m *mockLogger) Clear() {
m.mu.Lock()
defer m.mu.Unlock()
m.logs = nil
}

func TestEmitLogsErrorOnStall(t *testing.T) {
oldLogger := log
defer func() {
log = oldLogger
}()
ml := &mockLogger{}
log = ml

bus1 := NewBus()
bus2 := NewBus()

eventSub, err := bus1.Subscribe(new(EventA))
if err != nil {
t.Fatal(err)
}

wildcardSub, err := bus2.Subscribe(event.WildcardSubscription)
if err != nil {
t.Fatal(err)
}

testCases := []event.Subscription{eventSub, wildcardSub}
eventBuses := []event.Bus{bus1, bus2}

for i, sub := range testCases {
bus := eventBuses[i]
em, err := bus.Emitter(new(EventA))
if err != nil {
t.Fatal(err)
}
defer em.Close()

go func() {
for i := 0; i < subSettingsDefault.buffer+2; i++ {
em.Emit(EventA{})
}
}()

require.EventuallyWithT(t, func(collect *assert.CollectT) {
logs := ml.Logs()
found := false
for _, log := range logs {
if strings.Contains(log, "slow consumer") {
found = true
break
}
}
assert.True(collect, found, "expected to find slow consumer log")
}, 3*time.Second, 500*time.Millisecond)
ml.Clear()

// Close the subscriber so the worker can finish.
sub.Close()
}
}

func TestEmitOnClosed(t *testing.T) {
bus := NewBus()

Expand Down

0 comments on commit f3fa48e

Please sign in to comment.