bitswap(client/messageque): expose donthavetimeout config #703

Closed
wants to merge 1 commit
11 changes: 10 additions & 1 deletion bitswap/client/client.go
@@ -40,6 +40,8 @@

var log = logging.Logger("bitswap/client")

type DontHaveTimeoutConfig = bsmq.DontHaveTimeoutConfig

// Option defines the functional option type that can be used to configure
// bitswap instances
type Option func(*Client)
@@ -71,6 +73,12 @@
}
}

func WithDontHaveTimeoutConfig(cfg *DontHaveTimeoutConfig) Option {
return func(bs *Client) {
bs.dontHaveTimeoutConfig = cfg
}
}

// Configures the Client to use given tracer.
// This provides methods to access all messages sent and received by the Client.
// This interface can be used to implement various statistics (this is original intent).
@@ -133,7 +141,7 @@
}
}
peerQueueFactory := func(ctx context.Context, p peer.ID) bspm.PeerQueue {
return bsmq.New(ctx, p, network, onDontHaveTimeout)
return bsmq.New(ctx, p, network, onDontHaveTimeout, bs.dontHaveTimeoutConfig)
}

sim := bssim.New()
@@ -242,6 +250,7 @@

// whether we should actually simulate dont haves on request timeout
simulateDontHavesOnTimeout bool
dontHaveTimeoutConfig *DontHaveTimeoutConfig

// dupMetric will stay at 0
skipDuplicatedBlocksStats bool
122 changes: 60 additions & 62 deletions bitswap/client/internal/messagequeue/donthavetimeoutmgr.go
@@ -10,33 +10,41 @@
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
)

const (
// dontHaveTimeout is used to simulate a DONT_HAVE when communicating with
func defaultDontHaveTimeoutConfig() *DontHaveTimeoutConfig {
cfg := &DontHaveTimeoutConfig{
DontHaveTimeout: 5 * time.Second,
MaxExpectedWantProcessTime: 2 * time.Second,
PingLatencyMultiplier: 3,
MessageLatencyAlpha: 0.5,
MessageLatencyMultiplier: 2,
}

cfg.MaxTimeout = cfg.DontHaveTimeout + cfg.MaxExpectedWantProcessTime
return cfg
Comment on lines +13 to +23
Member


Two concerns with this approach:

  1. We expose a way for overriding the defaults without exposing the implicit default values themselves
    • people will keep hardcoding the current values instead of referencing constants that may change in the future
  2. We use a struct, which forces overriding of all values
    • This makes it tedious to override a single value
    • This means if we add a new knob, people who already override the struct will override the default with the implicit zero value for that type (likely causing bugs)

My mental model for making things like this configurable is to

  • have WithFoo configuration options which allow overriding each knob individually
  • and expose DefaultFoo to allow people to programmatically refer to the implicit default, rather than hardcoding it in their own code.

This may be more work in this PR upfront, but will save us from headaches in the future. Thoughts?

Member Author


I suggest we expose the default struct for people to override necessary knobs this way:

cfg := DefaultDontHaveTimeoutConfig()
// example of granular customization
cfg.MaxExpectedWantProcessTime = 5 * time.Second
// use the global option.
WithDontHaveTimeoutConfig(cfg)

This solves your concern without creating "options hell". This is equivalent to having multi-level options, yet it's cleaner. Also, it doesn't require forwarding all the options from the client to bitswap when you want to add more options horizontally, which I tried initially and got lost in.

Member Author

@Wondertan Wondertan Oct 29, 2024


Basically, the compromise I propose is to expose the default configuration, solving your concern while keeping things straightforward.
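
For illustration, a minimal sketch of the usage pattern proposed above, assuming the defaults constructor is exported as DefaultDontHaveTimeoutConfig (in this diff it is still the unexported defaultDontHaveTimeoutConfig) and that the option is passed alongside the Client's other Options; the import path and names beyond those in the diff are assumptions, not the final API:

package example

import (
	"time"

	bsclient "github.com/ipfs/boxo/bitswap/client"
)

// tunedDontHaveTimeout builds the option with a single knob overridden.
func tunedDontHaveTimeout() bsclient.Option {
	// Start from the hypothetical exported defaults constructor.
	cfg := bsclient.DefaultDontHaveTimeoutConfig()
	// Granular customization: override one field, keep every other default.
	cfg.MaxExpectedWantProcessTime = 5 * time.Second
	// The whole config travels into the Client through one functional option.
	return bsclient.WithDontHaveTimeoutConfig(cfg)
}

The returned Option would then be supplied together with any other bitswap client options at construction time.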

}

type DontHaveTimeoutConfig struct {
// DontHaveTimeout is used to simulate a DONT_HAVE when communicating with
// a peer whose Bitswap client doesn't support the DONT_HAVE response,
// or when the peer takes too long to respond.
// If the peer doesn't respond to a want-block within the timeout, the
// local node assumes that the peer doesn't have the block.
dontHaveTimeout = 5 * time.Second

// maxExpectedWantProcessTime is the maximum amount of time we expect a
DontHaveTimeout time.Duration
// MaxExpectedWantProcessTime is the maximum amount of time we expect a
// peer takes to process a want and initiate sending a response to us
maxExpectedWantProcessTime = 2 * time.Second

// maxTimeout is the maximum allowed timeout, regardless of latency
maxTimeout = dontHaveTimeout + maxExpectedWantProcessTime

// pingLatencyMultiplier is multiplied by the average ping time to
MaxExpectedWantProcessTime time.Duration
// MaxTimeout is the maximum allowed timeout, regardless of latency
MaxTimeout time.Duration
// PingLatencyMultiplier is multiplied by the average ping time to
// get an upper bound on how long we expect to wait for a peer's response
// to arrive
pingLatencyMultiplier = 3

// messageLatencyAlpha is the alpha supplied to the message latency EWMA
messageLatencyAlpha = 0.5

PingLatencyMultiplier int
// MessageLatencyAlpha is the alpha supplied to the message latency EWMA
MessageLatencyAlpha float64
// To give a margin for error, the timeout is calculated as
// messageLatencyMultiplier * message latency
messageLatencyMultiplier = 2
)
// MessageLatencyMultiplier * message latency
MessageLatencyMultiplier int
}

// PeerConnection is a connection to a peer that can be pinged, and the
// average latency measured
@@ -61,16 +69,12 @@
// we ping the peer to estimate latency. If we receive a response from the
// peer we use the response latency.
type dontHaveTimeoutMgr struct {
clock clock.Clock
ctx context.Context
shutdown func()
peerConn PeerConnection
onDontHaveTimeout func([]cid.Cid)
defaultTimeout time.Duration
maxTimeout time.Duration
pingLatencyMultiplier int
messageLatencyMultiplier int
maxExpectedWantProcessTime time.Duration
clock clock.Clock
ctx context.Context
shutdown func()
peerConn PeerConnection
onDontHaveTimeout func([]cid.Cid, time.Duration)
config *DontHaveTimeoutConfig

// All variables below here must be protected by the lock
lk sync.RWMutex
@@ -92,39 +96,33 @@

// newDontHaveTimeoutMgr creates a new dontHaveTimeoutMgr
// onDontHaveTimeout is called when pending keys expire (not cancelled before timeout)
func newDontHaveTimeoutMgr(pc PeerConnection, onDontHaveTimeout func([]cid.Cid), clock clock.Clock) *dontHaveTimeoutMgr {
return newDontHaveTimeoutMgrWithParams(pc, onDontHaveTimeout, dontHaveTimeout, maxTimeout,
pingLatencyMultiplier, messageLatencyMultiplier, maxExpectedWantProcessTime, clock, nil)
func newDontHaveTimeoutMgr(pc PeerConnection, onDontHaveTimeout func([]cid.Cid, time.Duration), cfg *DontHaveTimeoutConfig, clock clock.Clock) *dontHaveTimeoutMgr {
if cfg == nil {
cfg = defaultDontHaveTimeoutConfig()
}
return newDontHaveTimeoutMgrWithParams(pc, onDontHaveTimeout, cfg, clock, nil)
}

// newDontHaveTimeoutMgrWithParams is used by the tests
func newDontHaveTimeoutMgrWithParams(
pc PeerConnection,
onDontHaveTimeout func([]cid.Cid),
defaultTimeout time.Duration,
maxTimeout time.Duration,
pingLatencyMultiplier int,
messageLatencyMultiplier int,
maxExpectedWantProcessTime time.Duration,
onDontHaveTimeout func([]cid.Cid, time.Duration),
cfg *DontHaveTimeoutConfig,
clock clock.Clock,
timeoutsTriggered chan struct{},
) *dontHaveTimeoutMgr {
ctx, shutdown := context.WithCancel(context.Background())
mqp := &dontHaveTimeoutMgr{
clock: clock,
ctx: ctx,
shutdown: shutdown,
peerConn: pc,
activeWants: make(map[cid.Cid]*pendingWant),
timeout: defaultTimeout,
messageLatency: &latencyEwma{alpha: messageLatencyAlpha},
defaultTimeout: defaultTimeout,
maxTimeout: maxTimeout,
pingLatencyMultiplier: pingLatencyMultiplier,
messageLatencyMultiplier: messageLatencyMultiplier,
maxExpectedWantProcessTime: maxExpectedWantProcessTime,
onDontHaveTimeout: onDontHaveTimeout,
timeoutsTriggered: timeoutsTriggered,
clock: clock,
ctx: ctx,
shutdown: shutdown,
peerConn: pc,
activeWants: make(map[cid.Cid]*pendingWant),
timeout: cfg.DontHaveTimeout,
messageLatency: &latencyEwma{alpha: cfg.MessageLatencyAlpha},
onDontHaveTimeout: onDontHaveTimeout,
Contributor


The onDontHaveTimeout function seems to accompany the config in many/most places. So, the onDontHaveTimeout callback may be a candidate for moving into the DontHaveTimeoutConfig type.

Member Author


I guess the question is: "Do we want users to register a custom handler there?". I would be opposed to that unless there is a prominent use case.
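
For clarity, the alternative being discussed would look roughly like the sketch below; this shape is hypothetical and is not what the PR implements:

// Hypothetical variant only: the callback rides inside the config struct
// instead of being a separate constructor argument.
type DontHaveTimeoutConfig struct {
	DontHaveTimeout            time.Duration
	MaxExpectedWantProcessTime time.Duration
	MaxTimeout                 time.Duration
	PingLatencyMultiplier      int
	MessageLatencyAlpha        float64
	MessageLatencyMultiplier   int

	// OnDontHaveTimeout would receive the expired keys and the timeout that
	// fired, replacing the standalone onDontHaveTimeout parameter.
	OnDontHaveTimeout func([]cid.Cid, time.Duration)
}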

config: cfg,
timeoutsTriggered: timeoutsTriggered,
}

return mqp
@@ -189,7 +187,7 @@
// measurePingLatency measures the latency to the peer by pinging it
func (dhtm *dontHaveTimeoutMgr) measurePingLatency() {
// Wait up to defaultTimeout for a response to the ping
ctx, cancel := context.WithTimeout(dhtm.ctx, dhtm.defaultTimeout)
ctx, cancel := context.WithTimeout(dhtm.ctx, dhtm.config.DontHaveTimeout)
defer cancel()

// Ping the peer
@@ -252,7 +250,7 @@

// Fire the timeout event for the expired wants
if len(expired) > 0 {
go dhtm.fireTimeout(expired)
go dhtm.fireTimeout(expired, dhtm.timeout)
}

if len(dhtm.wantQueue) == 0 {
@@ -340,14 +338,14 @@
}

// fireTimeout fires the onDontHaveTimeout method with the timed out keys
func (dhtm *dontHaveTimeoutMgr) fireTimeout(pending []cid.Cid) {
func (dhtm *dontHaveTimeoutMgr) fireTimeout(pending []cid.Cid, timeout time.Duration) {
// Make sure the timeout manager has not been shut down
if dhtm.ctx.Err() != nil {
return
}

// Fire the timeout
dhtm.onDontHaveTimeout(pending)
dhtm.onDontHaveTimeout(pending, timeout)

// signal a timeout fired
if dhtm.timeoutsTriggered != nil {
@@ -360,18 +358,18 @@
// The maximum expected time for a response is
// the expected time to process the want + (latency * multiplier)
// The multiplier is to provide some padding for variable latency.
timeout := dhtm.maxExpectedWantProcessTime + time.Duration(dhtm.pingLatencyMultiplier)*latency
if timeout > dhtm.maxTimeout {
timeout = dhtm.maxTimeout
timeout := dhtm.config.MaxExpectedWantProcessTime + time.Duration(dhtm.config.PingLatencyMultiplier)*latency
if timeout > dhtm.config.MaxTimeout {
timeout = dhtm.config.MaxTimeout
}
return timeout
}

// calculateTimeoutFromMessageLatency calculates a timeout derived from message latency
func (dhtm *dontHaveTimeoutMgr) calculateTimeoutFromMessageLatency() time.Duration {
timeout := dhtm.messageLatency.latency * time.Duration(dhtm.messageLatencyMultiplier)
if timeout > dhtm.maxTimeout {
timeout = dhtm.maxTimeout
timeout := dhtm.messageLatency.latency * time.Duration(dhtm.config.MessageLatencyMultiplier)
if timeout > dhtm.config.MaxTimeout {
timeout = dhtm.config.MaxTimeout
}
return timeout
}
59 changes: 40 additions & 19 deletions bitswap/client/internal/messagequeue/donthavetimeoutmgr_test.go
@@ -51,7 +51,7 @@ type timeoutRecorder struct {
lk sync.Mutex
}

func (tr *timeoutRecorder) onTimeout(tks []cid.Cid) {
func (tr *timeoutRecorder) onTimeout(tks []cid.Cid, _ time.Duration) {
tr.lk.Lock()
defer tr.lk.Unlock()

@@ -84,8 +84,10 @@ func TestDontHaveTimeoutMgrTimeout(t *testing.T) {
pc := &mockPeerConn{latency: latency, clock: clock, pinged: pinged}
tr := timeoutRecorder{}
timeoutsTriggered := make(chan struct{})
dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout,
dontHaveTimeout, maxTimeout, latMultiplier, messageLatencyMultiplier, expProcessTime, clock, timeoutsTriggered)
cfg := defaultDontHaveTimeoutConfig()
cfg.PingLatencyMultiplier = latMultiplier
cfg.MaxExpectedWantProcessTime = expProcessTime
dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout, cfg, clock, timeoutsTriggered)
dhtm.Start()
defer dhtm.Shutdown()
<-pinged
@@ -139,8 +141,10 @@ func TestDontHaveTimeoutMgrCancel(t *testing.T) {
pc := &mockPeerConn{latency: latency, clock: clock, pinged: pinged}
tr := timeoutRecorder{}
timeoutsTriggered := make(chan struct{})
dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout,
dontHaveTimeout, maxTimeout, latMultiplier, messageLatencyMultiplier, expProcessTime, clock, timeoutsTriggered)
cfg := defaultDontHaveTimeoutConfig()
cfg.PingLatencyMultiplier = latMultiplier
cfg.MaxExpectedWantProcessTime = expProcessTime
dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout, cfg, clock, timeoutsTriggered)
dhtm.Start()
defer dhtm.Shutdown()
<-pinged
@@ -176,8 +180,10 @@ func TestDontHaveTimeoutWantCancelWant(t *testing.T) {
tr := timeoutRecorder{}
timeoutsTriggered := make(chan struct{})

dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout,
dontHaveTimeout, maxTimeout, latMultiplier, messageLatencyMultiplier, expProcessTime, clock, timeoutsTriggered)
cfg := defaultDontHaveTimeoutConfig()
cfg.PingLatencyMultiplier = latMultiplier
cfg.MaxExpectedWantProcessTime = expProcessTime
dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout, cfg, clock, timeoutsTriggered)
dhtm.Start()
defer dhtm.Shutdown()
<-pinged
@@ -228,8 +234,10 @@ func TestDontHaveTimeoutRepeatedAddPending(t *testing.T) {
tr := timeoutRecorder{}
timeoutsTriggered := make(chan struct{})

dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout,
dontHaveTimeout, maxTimeout, latMultiplier, messageLatencyMultiplier, expProcessTime, clock, timeoutsTriggered)
cfg := defaultDontHaveTimeoutConfig()
cfg.PingLatencyMultiplier = latMultiplier
cfg.MaxExpectedWantProcessTime = expProcessTime
dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout, cfg, clock, timeoutsTriggered)
dhtm.Start()
defer dhtm.Shutdown()
<-pinged
@@ -262,8 +270,11 @@ func TestDontHaveTimeoutMgrMessageLatency(t *testing.T) {
tr := timeoutRecorder{}
timeoutsTriggered := make(chan struct{})

dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout,
dontHaveTimeout, maxTimeout, latMultiplier, msgLatencyMultiplier, expProcessTime, clock, timeoutsTriggered)
cfg := defaultDontHaveTimeoutConfig()
cfg.PingLatencyMultiplier = latMultiplier
cfg.MessageLatencyMultiplier = msgLatencyMultiplier
cfg.MaxExpectedWantProcessTime = expProcessTime
dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout, cfg, clock, timeoutsTriggered)
dhtm.Start()
defer dhtm.Shutdown()
<-pinged
@@ -309,8 +320,10 @@ func TestDontHaveTimeoutMgrMessageLatencyMax(t *testing.T) {
testMaxTimeout := time.Millisecond * 10
timeoutsTriggered := make(chan struct{})

dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout,
dontHaveTimeout, testMaxTimeout, pingLatencyMultiplier, msgLatencyMultiplier, maxExpectedWantProcessTime, clock, timeoutsTriggered)
cfg := defaultDontHaveTimeoutConfig()
cfg.MessageLatencyMultiplier = msgLatencyMultiplier
cfg.MaxTimeout = testMaxTimeout
dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout, cfg, clock, timeoutsTriggered)
dhtm.Start()
defer dhtm.Shutdown()
<-pinged
@@ -345,8 +358,11 @@ func TestDontHaveTimeoutMgrUsesDefaultTimeoutIfPingError(t *testing.T) {
pc := &mockPeerConn{latency: latency, clock: clock, pinged: pinged, err: errors.New("ping error")}
timeoutsTriggered := make(chan struct{})

dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout,
defaultTimeout, dontHaveTimeout, latMultiplier, messageLatencyMultiplier, expProcessTime, clock, timeoutsTriggered)
cfg := defaultDontHaveTimeoutConfig()
cfg.DontHaveTimeout = defaultTimeout
cfg.PingLatencyMultiplier = latMultiplier
cfg.MaxExpectedWantProcessTime = expProcessTime
dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout, cfg, clock, timeoutsTriggered)
dhtm.Start()
defer dhtm.Shutdown()
<-pinged
@@ -385,8 +401,11 @@ func TestDontHaveTimeoutMgrUsesDefaultTimeoutIfLatencyLonger(t *testing.T) {
tr := timeoutRecorder{}
timeoutsTriggered := make(chan struct{})

dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout,
defaultTimeout, dontHaveTimeout, latMultiplier, messageLatencyMultiplier, expProcessTime, clock, timeoutsTriggered)
cfg := defaultDontHaveTimeoutConfig()
cfg.DontHaveTimeout = defaultTimeout
cfg.PingLatencyMultiplier = latMultiplier
cfg.MaxExpectedWantProcessTime = expProcessTime
dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout, cfg, clock, timeoutsTriggered)
dhtm.Start()
defer dhtm.Shutdown()
<-pinged
@@ -424,8 +443,10 @@ func TestDontHaveTimeoutNoTimeoutAfterShutdown(t *testing.T) {
tr := timeoutRecorder{}
timeoutsTriggered := make(chan struct{})

dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout,
dontHaveTimeout, maxTimeout, latMultiplier, messageLatencyMultiplier, expProcessTime, clock, timeoutsTriggered)
cfg := defaultDontHaveTimeoutConfig()
cfg.PingLatencyMultiplier = latMultiplier
cfg.MaxExpectedWantProcessTime = expProcessTime
dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout, cfg, clock, timeoutsTriggered)
dhtm.Start()
defer dhtm.Shutdown()
<-pinged
9 changes: 5 additions & 4 deletions bitswap/client/internal/messagequeue/messagequeue.go
@@ -214,13 +214,14 @@ type DontHaveTimeoutManager interface {
}

// New creates a new MessageQueue.
func New(ctx context.Context, p peer.ID, network MessageNetwork, onDontHaveTimeout OnDontHaveTimeout) *MessageQueue {
onTimeout := func(ks []cid.Cid) {
log.Infow("Bitswap: timeout waiting for blocks", "cids", ks, "peer", p)
func New(ctx context.Context, p peer.ID, network MessageNetwork, onDontHaveTimeout OnDontHaveTimeout,
dontHaveTimeoutConfig *DontHaveTimeoutConfig) *MessageQueue {
onTimeout := func(ks []cid.Cid, duration time.Duration) {
log.Infow("Bitswap: timeout waiting for blocks", "timeout", duration.String(), "cids", ks, "peer", p)
onDontHaveTimeout(p, ks)
}
clock := clock.New()
dhTimeoutMgr := newDontHaveTimeoutMgr(newPeerConnection(p, network), onTimeout, clock)
dhTimeoutMgr := newDontHaveTimeoutMgr(newPeerConnection(p, network), onTimeout, dontHaveTimeoutConfig, clock)
return newMessageQueue(ctx, p, network, maxMessageSize, sendErrorBackoff, maxValidLatency, dhTimeoutMgr, clock, nil)
}
