Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: eventbus: log error on slow consumers #3031

Merged
merged 3 commits into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 59 additions & 2 deletions p2p/host/eventbus/basic.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,20 @@ import (
"reflect"
"sync"
"sync/atomic"
"time"

logging "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p/core/event"
)

type logInterface interface {
Errorf(string, ...interface{})
}

var log logInterface = logging.Logger("eventbus")

const slowConsumerWarningTimeout = time.Second

// /////////////////////
// BUS

Expand Down Expand Up @@ -322,6 +332,8 @@ type wildcardNode struct {
nSinks atomic.Int32
sinks []*namedSink
metricsTracer MetricsTracer

slowConsumerTimer *time.Timer
}

func (n *wildcardNode) addSink(sink *namedSink) {
Expand All @@ -336,6 +348,12 @@ func (n *wildcardNode) addSink(sink *namedSink) {
}

func (n *wildcardNode) removeSink(ch chan interface{}) {
go func() {
// drain the event channel, will return when closed and drained.
// this is necessary to unblock publishes to this channel.
for range ch {
}
}()
n.nSinks.Add(-1) // ok to do outside the lock
n.Lock()
for i := 0; i < len(n.sinks); i++ {
Expand All @@ -348,6 +366,8 @@ func (n *wildcardNode) removeSink(ch chan interface{}) {
n.Unlock()
}

var wildcardType = reflect.TypeOf(event.WildcardSubscription)

func (n *wildcardNode) emit(evt interface{}) {
if n.nSinks.Load() == 0 {
return
Expand All @@ -360,7 +380,16 @@ func (n *wildcardNode) emit(evt interface{}) {
// record channel full events before blocking
sendSubscriberMetrics(n.metricsTracer, sink)

sink.ch <- evt
select {
case sink.ch <- evt:
default:
slowConsumerTimer := emitAndLogError(n.slowConsumerTimer, wildcardType, evt, sink)
defer func() {
n.Lock()
n.slowConsumerTimer = slowConsumerTimer
n.Unlock()
}()
}
}
n.RUnlock()
}
Expand All @@ -379,6 +408,8 @@ type node struct {

sinks []*namedSink
metricsTracer MetricsTracer

slowConsumerTimer *time.Timer
}

func newNode(typ reflect.Type, metricsTracer MetricsTracer) *node {
Expand All @@ -404,11 +435,37 @@ func (n *node) emit(evt interface{}) {
// Sending metrics before sending on channel allows us to
// record channel full events before blocking
sendSubscriberMetrics(n.metricsTracer, sink)
sink.ch <- evt
select {
case sink.ch <- evt:
default:
n.slowConsumerTimer = emitAndLogError(n.slowConsumerTimer, n.typ, evt, sink)
}
}
n.lk.Unlock()
}

func emitAndLogError(timer *time.Timer, typ reflect.Type, evt interface{}, sink *namedSink) *time.Timer {
// Slow consumer. Log a warning if stalled for the timeout
if timer == nil {
timer = time.NewTimer(slowConsumerWarningTimeout)
} else {
timer.Reset(slowConsumerWarningTimeout)
}

select {
case sink.ch <- evt:
if !timer.Stop() {
<-timer.C
}
case <-timer.C:
log.Errorf("subscriber named \"%s\" is a slow consumer of %s. This can lead to libp2p stalling and hard to debug issues.", sink.name, typ)
// Continue to stall since there's nothing else we can do.
sink.ch <- evt
}

return timer
}

func sendSubscriberMetrics(metricsTracer MetricsTracer, sink *namedSink) {
if metricsTracer != nil {
metricsTracer.SubscriberQueueLength(sink.name, len(sink.ch)+1)
Expand Down
81 changes: 81 additions & 0 deletions p2p/host/eventbus/basic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"reflect"
"strings"
"sync"
"sync/atomic"
"testing"
Expand All @@ -13,6 +14,7 @@ import (

"github.com/libp2p/go-libp2p-testing/race"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -131,6 +133,85 @@ func TestEmitNoSubNoBlock(t *testing.T) {
em.Emit(EventA{})
}

type mockLogger struct {
mu sync.Mutex
logs []string
}

func (m *mockLogger) Errorf(format string, args ...interface{}) {
m.mu.Lock()
defer m.mu.Unlock()
m.logs = append(m.logs, fmt.Sprintf(format, args...))
}

func (m *mockLogger) Logs() []string {
m.mu.Lock()
defer m.mu.Unlock()
return m.logs
}

func (m *mockLogger) Clear() {
m.mu.Lock()
defer m.mu.Unlock()
m.logs = nil
}

func TestEmitLogsErrorOnStall(t *testing.T) {
oldLogger := log
defer func() {
log = oldLogger
}()
ml := &mockLogger{}
log = ml

bus1 := NewBus()
bus2 := NewBus()

eventSub, err := bus1.Subscribe(new(EventA))
if err != nil {
t.Fatal(err)
}

wildcardSub, err := bus2.Subscribe(event.WildcardSubscription)
if err != nil {
t.Fatal(err)
}

testCases := []event.Subscription{eventSub, wildcardSub}
eventBuses := []event.Bus{bus1, bus2}

for i, sub := range testCases {
bus := eventBuses[i]
em, err := bus.Emitter(new(EventA))
if err != nil {
t.Fatal(err)
}
defer em.Close()

go func() {
for i := 0; i < subSettingsDefault.buffer+2; i++ {
em.Emit(EventA{})
}
}()

require.EventuallyWithT(t, func(collect *assert.CollectT) {
logs := ml.Logs()
found := false
for _, log := range logs {
if strings.Contains(log, "slow consumer") {
found = true
break
}
}
assert.True(collect, found, "expected to find slow consumer log")
}, 3*time.Second, 500*time.Millisecond)
ml.Clear()

// Close the subscriber so the worker can finish.
sub.Close()
}
}

func TestEmitOnClosed(t *testing.T) {
bus := NewBus()

Expand Down
Loading