Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: eventbus: log error on slow consumers #3031

Merged
merged 3 commits into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 48 additions & 2 deletions p2p/host/eventbus/basic.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,20 @@ import (
"reflect"
"sync"
"sync/atomic"
"time"

logging "github.com/ipfs/go-log/v2"
"github.com/libp2p/go-libp2p/core/event"
)

// logInterface is the minimal logging surface this package relies on.
// Declaring it as an interface lets the package-level logger be replaced
// by any value that provides a printf-style Errorf method.
type logInterface interface {
	// Errorf records an error-level message built from format and args.
	Errorf(format string, args ...interface{})
}

// log is the package logger, registered under the "eventbus" name. It is
// held as a logInterface rather than a concrete logger type so that it can
// be reassigned to another implementation.
var log logInterface = logging.Logger("eventbus")

// slowConsumerWarningTimeout is how long emitAndLogError waits on a blocked
// send to a subscriber before logging that the subscriber is a slow consumer.
const slowConsumerWarningTimeout = time.Second

// /////////////////////
// BUS

Expand Down Expand Up @@ -322,6 +332,8 @@ type wildcardNode struct {
nSinks atomic.Int32
sinks []*namedSink
metricsTracer MetricsTracer

slowConsumerTimer *time.Timer
}

func (n *wildcardNode) addSink(sink *namedSink) {
Expand All @@ -348,6 +360,8 @@ func (n *wildcardNode) removeSink(ch chan interface{}) {
n.Unlock()
}

// wildcardType is the reflect.Type of event.WildcardSubscription, computed
// once at package initialization. wildcardNode.emit passes it to
// emitAndLogError as the event type named in the slow-consumer log message.
var wildcardType = reflect.TypeOf(event.WildcardSubscription)

func (n *wildcardNode) emit(evt interface{}) {
if n.nSinks.Load() == 0 {
return
Expand All @@ -360,7 +374,11 @@ func (n *wildcardNode) emit(evt interface{}) {
// record channel full events before blocking
sendSubscriberMetrics(n.metricsTracer, sink)

sink.ch <- evt
select {
case sink.ch <- evt:
default:
n.slowConsumerTimer = emitAndLogError(n.slowConsumerTimer, wildcardType, evt, sink)
}
}
n.RUnlock()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This won't work with an RLock, right? We need a full Lock, since we're updating state on n.

I think it's fine to create the timer right when we create the node. I don't think there are that many nodes in total.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't realize this was an RLock. Thanks for highlighting that. I've updated the code to set the timer within a lock. It's technically possible to allocate two or more stopped timers, but that should be fine as it'll get cleaned up by the GC.

I think it's fine to create the timer right when we create the node.

I considered this, but there's no way to create a stopped timer. You must create an armed one. So if you are going through the trouble of setting one, you may as well use it.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm fine with the current state of the code.

The way I create a stopped timer is to create one with a MaxInt64 duration and then Reset it when it is needed. Never resetting such a timer is also fine.

}
Expand All @@ -379,6 +397,8 @@ type node struct {

sinks []*namedSink
metricsTracer MetricsTracer

slowConsumerTimer *time.Timer
}

func newNode(typ reflect.Type, metricsTracer MetricsTracer) *node {
Expand All @@ -404,11 +424,37 @@ func (n *node) emit(evt interface{}) {
// Sending metrics before sending on channel allows us to
// record channel full events before blocking
sendSubscriberMetrics(n.metricsTracer, sink)
sink.ch <- evt
select {
case sink.ch <- evt:
default:
n.slowConsumerTimer = emitAndLogError(n.slowConsumerTimer, n.typ, evt, sink)
}
}
n.lk.Unlock()
}

// emitAndLogError performs a blocking send of evt on sink.ch and logs an
// error if the send has not completed within slowConsumerWarningTimeout.
//
// timer may be nil, in which case a new timer is allocated; otherwise the
// given timer is re-armed with Reset. The timer that was used is returned so
// the caller can store it and pass it back in on a later call.
//
// If the timeout elapses first, an error naming the subscriber (sink.name)
// and the event type (typ) is logged, and the function then keeps blocking on
// the send with no further timeout. It returns only once evt is delivered.
func emitAndLogError(timer *time.Timer, typ reflect.Type, evt interface{}, sink *namedSink) *time.Timer {
	// Arm the stall timer, reusing the caller's timer when one exists.
	if timer != nil {
		timer.Reset(slowConsumerWarningTimeout)
	} else {
		timer = time.NewTimer(slowConsumerWarningTimeout)
	}

	select {
	case <-timer.C:
		// The subscriber did not accept the event within the timeout.
		log.Errorf("subscriber named \"%s\" is a slow consumer of %s. This can lead to libp2p stalling and hard to debug issues.", sink.name, typ)
		// There is no alternative to delivering the event, so keep waiting.
		sink.ch <- evt
	case sink.ch <- evt:
		// Delivered before the warning was due. If Stop reports that the
		// timer was no longer active, receive from its channel so no stale
		// tick is left behind for the next Reset.
		if stopped := timer.Stop(); !stopped {
			<-timer.C
		}
	}

	return timer
}

func sendSubscriberMetrics(metricsTracer MetricsTracer, sink *namedSink) {
if metricsTracer != nil {
metricsTracer.SubscriberQueueLength(sink.name, len(sink.ch)+1)
Expand Down
61 changes: 61 additions & 0 deletions p2p/host/eventbus/basic_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"reflect"
"strings"
"sync"
"sync/atomic"
"testing"
Expand All @@ -13,6 +14,7 @@ import (

"github.com/libp2p/go-libp2p-testing/race"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

Expand Down Expand Up @@ -131,6 +133,65 @@ func TestEmitNoSubNoBlock(t *testing.T) {
em.Emit(EventA{})
}

// mockLogger is an in-memory logger for tests. It records every formatted
// message passed to Errorf so that a test can later inspect what was logged.
// All access to the recorded messages is serialized by mu, so it may be used
// from several goroutines at once.
type mockLogger struct {
	mu   sync.Mutex
	logs []string
}

// Errorf formats the message with fmt.Sprintf and appends it to the recorded
// log lines.
func (m *mockLogger) Errorf(format string, args ...interface{}) {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.logs = append(m.logs, fmt.Sprintf(format, args...))
}

// Logs returns a snapshot of the messages recorded so far, in the order they
// were logged. The returned slice is a copy: the caller may keep or modify it
// without affecting the logger, and later Errorf calls do not change it.
// It returns nil when nothing has been logged.
func (m *mockLogger) Logs() []string {
	m.mu.Lock()
	defer m.mu.Unlock()
	// Copy while holding the lock so the mutex-guarded backing array is never
	// shared with callers.
	return append([]string(nil), m.logs...)
}

// TestEmitLogsErrorOnStall verifies that emitting to a subscriber that never
// reads eventually produces a "slow consumer" error log.
//
// The package-level logger is replaced with a mockLogger for the duration of
// the test and restored on exit.
func TestEmitLogsErrorOnStall(t *testing.T) {
	oldLogger := log
	defer func() {
		log = oldLogger
	}()
	ml := &mockLogger{}
	log = ml

	bus := NewBus()
	sub, err := bus.Subscribe(new(EventA))
	require.NoError(t, err)

	em, err := bus.Emitter(new(EventA))
	require.NoError(t, err)
	defer em.Close()

	// Emit more events than the default subscription buffer can hold while
	// nothing reads from the subscription, so that the emitter stalls.
	emitDone := make(chan struct{})
	go func() {
		defer close(emitDone)
		for i := 0; i < subSettingsDefault.buffer+2; i++ {
			em.Emit(EventA{})
		}
	}()

	require.EventuallyWithT(t, func(collect *assert.CollectT) {
		found := false
		// The loop variable is deliberately not called "log": that would
		// shadow the package-level logger this test has just replaced.
		for _, line := range ml.Logs() {
			if strings.Contains(line, "slow consumer") {
				found = true
				break
			}
		}
		assert.True(collect, found, "expected to find slow consumer log")
	}, 3*time.Second, 500*time.Millisecond)

	// Close the subscriber so the worker can finish.
	sub.Close()

	// Wait for the emitting goroutine before returning, so that the deferred
	// em.Close() and logger restore do not run while an Emit is in flight.
	// NOTE(review): relies on sub.Close() unblocking pending emits — confirm.
	<-emitDone
}

func TestEmitOnClosed(t *testing.T) {
bus := NewBus()

Expand Down
Loading