From 2a26bf9eaa59188f1d3cddafb202bc5c9488c0ee Mon Sep 17 00:00:00 2001 From: Andrew Gillis <11790789+gammazero@users.noreply.github.com> Date: Fri, 13 Dec 2024 12:07:03 -1000 Subject: [PATCH] bitswap/client/messagequeue: expose dontHaveTimeoutMgr configuration (#750) * expose dontHaveTimeoutMgr configuration - Create an exported DontHaveTimeoutConfig to hold configuration values for dontHaveTimeoutMrg - Provide function DefaultDontHaveTimeoutConfig to return a DontHaveTimeoutConfig populated with default valued. - Bitswap client New has new option for caller to supply a DontHaveTimeoutConfig - If the required onHaveTimeout function is nil, then disable the dontaHaveTimeoutMgr --- CHANGELOG.md | 11 +- bitswap/client/client.go | 20 +- .../messagequeue/donthavetimeoutmgr.go | 183 +++++++++--------- .../messagequeue/donthavetimeoutmgr_test.go | 80 ++++++-- .../internal/messagequeue/messagequeue.go | 73 +++++-- 5 files changed, 230 insertions(+), 137 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 41ec2392d..e2f2a10ee 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -16,12 +16,21 @@ The following emojis are used to highlight certain changes: ### Added +- `bitswap/client`: Improved timeout configuration for block requests + - Exposed `DontHaveTimeoutConfig` to hold configuration values for `dontHaveTimeoutMgr` which controls how long to wait for requested block before emitting a synthetic DontHave response + - Added `DefaultDontHaveTimeoutConfig()` to return a `DontHaveTimeoutConfig` populated with default values + - Added optional `WithDontHaveTimeoutConfig` to allow passing a custom `DontHaveTimeoutConfig` + - Setting `SetSendDontHaves(false)` works the same as before. Behind the scenes, it will disable `dontHaveTimeoutMgr` by passing a `nil` `onDontHaveTimeout` to `newDontHaveTimeoutMgr`. + + ### Changed * 🛠 `blockstore` and `blockservice`'s `WriteThrough()` option now takes an "enabled" parameter: `WriteThrough(enabled bool)`. ### Removed + + ### Fixed ### Security @@ -67,7 +76,7 @@ The following emojis are used to highlight certain changes: provider := provider.New(datastore, provider.Online(contentProvider) // A wrapped providing exchange using the previous exchange and the provider. exch := providing.New(bswap, provider) - + // Finally the blockservice bserv := blockservice.New(blockstore, exch) ... diff --git a/bitswap/client/client.go b/bitswap/client/client.go index a115d07f6..2e83949cb 100644 --- a/bitswap/client/client.go +++ b/bitswap/client/client.go @@ -69,6 +69,12 @@ func SetSimulateDontHavesOnTimeout(send bool) Option { } } +func WithDontHaveTimeoutConfig(cfg *bsmq.DontHaveTimeoutConfig) Option { + return func(bs *Client) { + bs.dontHaveTimeoutConfig = cfg + } +} + // Configures the Client to use given tracer. // This provides methods to access all messages sent and received by the Client. // This interface can be used to implement various statistics (this is original intent). @@ -165,16 +171,23 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, providerFinder Pr // onDontHaveTimeout is called when a want-block is sent to a peer that // has an old version of Bitswap that doesn't support DONT_HAVE messages, // or when no response is received within a timeout. + // + // When set to nil (when bs.simulateDontHavesOnTimeout is false), then + // disable the dontHaveTimoutMgr and do not simulate DONT_HAVE messages on + // timeout. + var onDontHaveTimeout func(peer.ID, []cid.Cid) + var sm *bssm.SessionManager - onDontHaveTimeout := func(p peer.ID, dontHaves []cid.Cid) { + if bs.simulateDontHavesOnTimeout { // Simulate a message arriving with DONT_HAVEs - if bs.simulateDontHavesOnTimeout { + onDontHaveTimeout = func(p peer.ID, dontHaves []cid.Cid) { sm.ReceiveFrom(ctx, p, nil, nil, dontHaves) } } peerQueueFactory := func(ctx context.Context, p peer.ID) bspm.PeerQueue { - return bsmq.New(ctx, p, network, onDontHaveTimeout) + return bsmq.New(ctx, p, network, onDontHaveTimeout, bsmq.WithDontHaveTimeoutConfig(bs.dontHaveTimeoutConfig)) } + bs.dontHaveTimeoutConfig = nil sim := bssim.New() bpm := bsbpm.New() @@ -284,6 +297,7 @@ type Client struct { // whether we should actually simulate dont haves on request timeout simulateDontHavesOnTimeout bool + dontHaveTimeoutConfig *bsmq.DontHaveTimeoutConfig // dupMetric will stay at 0 skipDuplicatedBlocksStats bool diff --git a/bitswap/client/internal/messagequeue/donthavetimeoutmgr.go b/bitswap/client/internal/messagequeue/donthavetimeoutmgr.go index a6180a5d8..c90a9f227 100644 --- a/bitswap/client/internal/messagequeue/donthavetimeoutmgr.go +++ b/bitswap/client/internal/messagequeue/donthavetimeoutmgr.go @@ -11,33 +11,52 @@ import ( "github.com/libp2p/go-libp2p/p2p/protocol/ping" ) -const ( - // dontHaveTimeout is used to simulate a DONT_HAVE when communicating with +type DontHaveTimeoutConfig struct { + // DontHaveTimeout is used to simulate a DONT_HAVE when communicating with // a peer whose Bitswap client doesn't support the DONT_HAVE response, // or when the peer takes too long to respond. // If the peer doesn't respond to a want-block within the timeout, the // local node assumes that the peer doesn't have the block. - dontHaveTimeout = 5 * time.Second + DontHaveTimeout time.Duration - // maxExpectedWantProcessTime is the maximum amount of time we expect a + // MaxExpectedWantProcessTime is the maximum amount of time we expect a // peer takes to process a want and initiate sending a response to us - maxExpectedWantProcessTime = 2 * time.Second + MaxExpectedWantProcessTime time.Duration - // maxTimeout is the maximum allowed timeout, regardless of latency - maxTimeout = dontHaveTimeout + maxExpectedWantProcessTime + // MaxTimeout is the maximum allowed timeout, regardless of latency + MaxTimeout time.Duration - // pingLatencyMultiplier is multiplied by the average ping time to + // PingLatencyMultiplier is multiplied by the average ping time to // get an upper bound on how long we expect to wait for a peer's response // to arrive - pingLatencyMultiplier = 3 + PingLatencyMultiplier int - // messageLatencyAlpha is the alpha supplied to the message latency EWMA - messageLatencyAlpha = 0.5 + // MessageLatencyAlpha is the alpha supplied to the message latency EWMA + MessageLatencyAlpha float64 - // To give a margin for error, the timeout is calculated as - // messageLatencyMultiplier * message latency - messageLatencyMultiplier = 2 -) + // MessageLatencyMultiplier gives a margin for error. The timeout is calculated as + // MessageLatencyMultiplier * message latency + MessageLatencyMultiplier int + + // timeoutsSignal used for testing -- caller-provided channel to signals + // when a dont have timeout was triggered. + timeoutsSignal chan<- struct{} + + // clock is a mockable time api used for testing. + clock clock.Clock +} + +func DefaultDontHaveTimeoutConfig() *DontHaveTimeoutConfig { + cfg := DontHaveTimeoutConfig{ + DontHaveTimeout: 5 * time.Second, + MaxExpectedWantProcessTime: 2 * time.Second, + PingLatencyMultiplier: 3, + MessageLatencyAlpha: 0.5, + MessageLatencyMultiplier: 2, + } + cfg.MaxTimeout = cfg.DontHaveTimeout + cfg.MaxExpectedWantProcessTime + return &cfg +} // PeerConnection is a connection to a peer that can be pinged, and the // average latency measured @@ -62,16 +81,11 @@ type pendingWant struct { // we ping the peer to estimate latency. If we receive a response from the // peer we use the response latency. type dontHaveTimeoutMgr struct { - clock clock.Clock - ctx context.Context - shutdown func() - peerConn PeerConnection - onDontHaveTimeout func([]cid.Cid) - defaultTimeout time.Duration - maxTimeout time.Duration - pingLatencyMultiplier int - messageLatencyMultiplier int - maxExpectedWantProcessTime time.Duration + ctx context.Context + shutdown func() + peerConn PeerConnection + onDontHaveTimeout func([]cid.Cid, time.Duration) + config DontHaveTimeoutConfig // All variables below here must be protected by the lock lk sync.RWMutex @@ -87,48 +101,34 @@ type dontHaveTimeoutMgr struct { messageLatency *latencyEwma // timer used to wait until want at front of queue expires checkForTimeoutsTimer *clock.Timer - // used for testing -- timeoutsTriggered when a scheduled dont have timeouts were triggered - timeoutsTriggered chan struct{} } // newDontHaveTimeoutMgr creates a new dontHaveTimeoutMgr -// onDontHaveTimeout is called when pending keys expire (not cancelled before timeout) -func newDontHaveTimeoutMgr(pc PeerConnection, onDontHaveTimeout func([]cid.Cid), clock clock.Clock) *dontHaveTimeoutMgr { - return newDontHaveTimeoutMgrWithParams(pc, onDontHaveTimeout, dontHaveTimeout, maxTimeout, - pingLatencyMultiplier, messageLatencyMultiplier, maxExpectedWantProcessTime, clock, nil) -} - -// newDontHaveTimeoutMgrWithParams is used by the tests -func newDontHaveTimeoutMgrWithParams( - pc PeerConnection, - onDontHaveTimeout func([]cid.Cid), - defaultTimeout time.Duration, - maxTimeout time.Duration, - pingLatencyMultiplier int, - messageLatencyMultiplier int, - maxExpectedWantProcessTime time.Duration, - clock clock.Clock, - timeoutsTriggered chan struct{}, -) *dontHaveTimeoutMgr { +// +// onDontHaveTimeout is the function called when pending keys expire (not +// cancelled before timeout). If this is nil, then DontHaveTimeoutMgm is +// disabled. +func newDontHaveTimeoutMgr(pc PeerConnection, onDontHaveTimeout func([]cid.Cid, time.Duration), cfg *DontHaveTimeoutConfig) *dontHaveTimeoutMgr { + if onDontHaveTimeout == nil { + return nil + } + if cfg == nil { + cfg = DefaultDontHaveTimeoutConfig() + } + if cfg.clock == nil { + cfg.clock = clock.New() + } ctx, shutdown := context.WithCancel(context.Background()) - mqp := &dontHaveTimeoutMgr{ - clock: clock, - ctx: ctx, - shutdown: shutdown, - peerConn: pc, - activeWants: make(map[cid.Cid]*pendingWant), - timeout: defaultTimeout, - messageLatency: &latencyEwma{alpha: messageLatencyAlpha}, - defaultTimeout: defaultTimeout, - maxTimeout: maxTimeout, - pingLatencyMultiplier: pingLatencyMultiplier, - messageLatencyMultiplier: messageLatencyMultiplier, - maxExpectedWantProcessTime: maxExpectedWantProcessTime, - onDontHaveTimeout: onDontHaveTimeout, - timeoutsTriggered: timeoutsTriggered, + return &dontHaveTimeoutMgr{ + ctx: ctx, + config: *cfg, + shutdown: shutdown, + peerConn: pc, + activeWants: make(map[cid.Cid]*pendingWant), + timeout: cfg.DontHaveTimeout, + messageLatency: &latencyEwma{alpha: cfg.MessageLatencyAlpha}, + onDontHaveTimeout: onDontHaveTimeout, } - - return mqp } // Shutdown the dontHaveTimeoutMgr. Any subsequent call to Start() will be ignored @@ -190,7 +190,7 @@ func (dhtm *dontHaveTimeoutMgr) UpdateMessageLatency(elapsed time.Duration) { // measurePingLatency measures the latency to the peer by pinging it func (dhtm *dontHaveTimeoutMgr) measurePingLatency() { // Wait up to defaultTimeout for a response to the ping - ctx, cancel := context.WithTimeout(dhtm.ctx, dhtm.defaultTimeout) + ctx, cancel := context.WithTimeout(dhtm.ctx, dhtm.config.DontHaveTimeout) defer cancel() // Ping the peer @@ -229,6 +229,7 @@ func (dhtm *dontHaveTimeoutMgr) checkForTimeouts() { // Figure out which of the blocks that were wanted were not received // within the timeout + now := dhtm.config.clock.Now() expired := make([]cid.Cid, 0, len(dhtm.activeWants)) for dhtm.wantQueue.Len() > 0 { pw := dhtm.wantQueue.Front() @@ -237,7 +238,7 @@ func (dhtm *dontHaveTimeoutMgr) checkForTimeouts() { if pw.active { // The queue is in order from earliest to latest, so if we // didn't find an expired entry we can stop iterating - if dhtm.clock.Since(pw.sent) < dhtm.timeout { + if now.Sub(pw.sent) < dhtm.timeout { break } @@ -253,7 +254,7 @@ func (dhtm *dontHaveTimeoutMgr) checkForTimeouts() { // Fire the timeout event for the expired wants if len(expired) > 0 { - go dhtm.fireTimeout(expired) + go dhtm.fireTimeout(expired, dhtm.timeout) } if dhtm.wantQueue.Len() == 0 { @@ -268,29 +269,27 @@ func (dhtm *dontHaveTimeoutMgr) checkForTimeouts() { // Schedule the next check for the moment when the oldest pending want will // timeout oldestStart := dhtm.wantQueue.Front().sent - until := oldestStart.Add(dhtm.timeout).Sub(dhtm.clock.Now()) + until := oldestStart.Add(dhtm.timeout).Sub(now) if dhtm.checkForTimeoutsTimer == nil { - dhtm.checkForTimeoutsTimer = dhtm.clock.Timer(until) - go dhtm.consumeTimeouts() + dhtm.checkForTimeoutsTimer = dhtm.config.clock.Timer(until) + go func() { + for { + select { + case <-dhtm.ctx.Done(): + return + case <-dhtm.checkForTimeoutsTimer.C: + dhtm.lk.Lock() + dhtm.checkForTimeouts() + dhtm.lk.Unlock() + } + } + }() } else { dhtm.checkForTimeoutsTimer.Stop() dhtm.checkForTimeoutsTimer.Reset(until) } } -func (dhtm *dontHaveTimeoutMgr) consumeTimeouts() { - for { - select { - case <-dhtm.ctx.Done(): - return - case <-dhtm.checkForTimeoutsTimer.C: - dhtm.lk.Lock() - dhtm.checkForTimeouts() - dhtm.lk.Unlock() - } - } -} - // AddPending adds the given keys that will expire if not cancelled before // the timeout func (dhtm *dontHaveTimeoutMgr) AddPending(ks []cid.Cid) { @@ -298,7 +297,7 @@ func (dhtm *dontHaveTimeoutMgr) AddPending(ks []cid.Cid) { return } - start := dhtm.clock.Now() + start := dhtm.config.clock.Now() dhtm.lk.Lock() defer dhtm.lk.Unlock() @@ -341,18 +340,18 @@ func (dhtm *dontHaveTimeoutMgr) CancelPending(ks []cid.Cid) { } // fireTimeout fires the onDontHaveTimeout method with the timed out keys -func (dhtm *dontHaveTimeoutMgr) fireTimeout(pending []cid.Cid) { +func (dhtm *dontHaveTimeoutMgr) fireTimeout(pending []cid.Cid, timeout time.Duration) { // Make sure the timeout manager has not been shut down if dhtm.ctx.Err() != nil { return } // Fire the timeout - dhtm.onDontHaveTimeout(pending) + dhtm.onDontHaveTimeout(pending, timeout) // signal a timeout fired - if dhtm.timeoutsTriggered != nil { - dhtm.timeoutsTriggered <- struct{}{} + if dhtm.config.timeoutsSignal != nil { + dhtm.config.timeoutsSignal <- struct{}{} } } @@ -361,18 +360,18 @@ func (dhtm *dontHaveTimeoutMgr) calculateTimeoutFromPingLatency(latency time.Dur // The maximum expected time for a response is // the expected time to process the want + (latency * multiplier) // The multiplier is to provide some padding for variable latency. - timeout := dhtm.maxExpectedWantProcessTime + time.Duration(dhtm.pingLatencyMultiplier)*latency - if timeout > dhtm.maxTimeout { - timeout = dhtm.maxTimeout + timeout := dhtm.config.MaxExpectedWantProcessTime + time.Duration(dhtm.config.PingLatencyMultiplier)*latency + if timeout > dhtm.config.MaxTimeout { + timeout = dhtm.config.MaxTimeout } return timeout } // calculateTimeoutFromMessageLatency calculates a timeout derived from message latency func (dhtm *dontHaveTimeoutMgr) calculateTimeoutFromMessageLatency() time.Duration { - timeout := dhtm.messageLatency.latency * time.Duration(dhtm.messageLatencyMultiplier) - if timeout > dhtm.maxTimeout { - timeout = dhtm.maxTimeout + timeout := dhtm.messageLatency.latency * time.Duration(dhtm.config.MessageLatencyMultiplier) + if timeout > dhtm.config.MaxTimeout { + timeout = dhtm.config.MaxTimeout } return timeout } diff --git a/bitswap/client/internal/messagequeue/donthavetimeoutmgr_test.go b/bitswap/client/internal/messagequeue/donthavetimeoutmgr_test.go index ee478e605..5d0c79df8 100644 --- a/bitswap/client/internal/messagequeue/donthavetimeoutmgr_test.go +++ b/bitswap/client/internal/messagequeue/donthavetimeoutmgr_test.go @@ -51,7 +51,7 @@ type timeoutRecorder struct { lk sync.Mutex } -func (tr *timeoutRecorder) onTimeout(tks []cid.Cid) { +func (tr *timeoutRecorder) onTimeout(tks []cid.Cid, _ time.Duration) { tr.lk.Lock() defer tr.lk.Unlock() @@ -84,8 +84,12 @@ func TestDontHaveTimeoutMgrTimeout(t *testing.T) { pc := &mockPeerConn{latency: latency, clock: clock, pinged: pinged} tr := timeoutRecorder{} timeoutsTriggered := make(chan struct{}) - dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout, - dontHaveTimeout, maxTimeout, latMultiplier, messageLatencyMultiplier, expProcessTime, clock, timeoutsTriggered) + cfg := DefaultDontHaveTimeoutConfig() + cfg.PingLatencyMultiplier = latMultiplier + cfg.MaxExpectedWantProcessTime = expProcessTime + cfg.timeoutsSignal = timeoutsTriggered + cfg.clock = clock + dhtm := newDontHaveTimeoutMgr(pc, tr.onTimeout, cfg) dhtm.Start() defer dhtm.Shutdown() <-pinged @@ -139,8 +143,12 @@ func TestDontHaveTimeoutMgrCancel(t *testing.T) { pc := &mockPeerConn{latency: latency, clock: clock, pinged: pinged} tr := timeoutRecorder{} timeoutsTriggered := make(chan struct{}) - dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout, - dontHaveTimeout, maxTimeout, latMultiplier, messageLatencyMultiplier, expProcessTime, clock, timeoutsTriggered) + cfg := DefaultDontHaveTimeoutConfig() + cfg.PingLatencyMultiplier = latMultiplier + cfg.MaxExpectedWantProcessTime = expProcessTime + cfg.timeoutsSignal = timeoutsTriggered + cfg.clock = clock + dhtm := newDontHaveTimeoutMgr(pc, tr.onTimeout, cfg) dhtm.Start() defer dhtm.Shutdown() <-pinged @@ -175,9 +183,12 @@ func TestDontHaveTimeoutWantCancelWant(t *testing.T) { pc := &mockPeerConn{latency: latency, clock: clock, pinged: pinged} tr := timeoutRecorder{} timeoutsTriggered := make(chan struct{}) - - dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout, - dontHaveTimeout, maxTimeout, latMultiplier, messageLatencyMultiplier, expProcessTime, clock, timeoutsTriggered) + cfg := DefaultDontHaveTimeoutConfig() + cfg.PingLatencyMultiplier = latMultiplier + cfg.MaxExpectedWantProcessTime = expProcessTime + cfg.timeoutsSignal = timeoutsTriggered + cfg.clock = clock + dhtm := newDontHaveTimeoutMgr(pc, tr.onTimeout, cfg) dhtm.Start() defer dhtm.Shutdown() <-pinged @@ -228,8 +239,12 @@ func TestDontHaveTimeoutRepeatedAddPending(t *testing.T) { tr := timeoutRecorder{} timeoutsTriggered := make(chan struct{}) - dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout, - dontHaveTimeout, maxTimeout, latMultiplier, messageLatencyMultiplier, expProcessTime, clock, timeoutsTriggered) + cfg := DefaultDontHaveTimeoutConfig() + cfg.PingLatencyMultiplier = latMultiplier + cfg.MaxExpectedWantProcessTime = expProcessTime + cfg.timeoutsSignal = timeoutsTriggered + cfg.clock = clock + dhtm := newDontHaveTimeoutMgr(pc, tr.onTimeout, cfg) dhtm.Start() defer dhtm.Shutdown() <-pinged @@ -262,8 +277,13 @@ func TestDontHaveTimeoutMgrMessageLatency(t *testing.T) { tr := timeoutRecorder{} timeoutsTriggered := make(chan struct{}) - dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout, - dontHaveTimeout, maxTimeout, latMultiplier, msgLatencyMultiplier, expProcessTime, clock, timeoutsTriggered) + cfg := DefaultDontHaveTimeoutConfig() + cfg.PingLatencyMultiplier = latMultiplier + cfg.MessageLatencyMultiplier = msgLatencyMultiplier + cfg.MaxExpectedWantProcessTime = expProcessTime + cfg.timeoutsSignal = timeoutsTriggered + cfg.clock = clock + dhtm := newDontHaveTimeoutMgr(pc, tr.onTimeout, cfg) dhtm.Start() defer dhtm.Shutdown() <-pinged @@ -309,8 +329,12 @@ func TestDontHaveTimeoutMgrMessageLatencyMax(t *testing.T) { testMaxTimeout := time.Millisecond * 10 timeoutsTriggered := make(chan struct{}) - dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout, - dontHaveTimeout, testMaxTimeout, pingLatencyMultiplier, msgLatencyMultiplier, maxExpectedWantProcessTime, clock, timeoutsTriggered) + cfg := DefaultDontHaveTimeoutConfig() + cfg.MessageLatencyMultiplier = msgLatencyMultiplier + cfg.MaxTimeout = testMaxTimeout + cfg.timeoutsSignal = timeoutsTriggered + cfg.clock = clock + dhtm := newDontHaveTimeoutMgr(pc, tr.onTimeout, cfg) dhtm.Start() defer dhtm.Shutdown() <-pinged @@ -334,7 +358,7 @@ func TestDontHaveTimeoutMgrMessageLatencyMax(t *testing.T) { func TestDontHaveTimeoutMgrUsesDefaultTimeoutIfPingError(t *testing.T) { ks := random.Cids(2) - latency := time.Millisecond * 1 + latency := time.Millisecond latMultiplier := 2 expProcessTime := 2 * time.Millisecond defaultTimeout := 10 * time.Millisecond @@ -345,8 +369,13 @@ func TestDontHaveTimeoutMgrUsesDefaultTimeoutIfPingError(t *testing.T) { pc := &mockPeerConn{latency: latency, clock: clock, pinged: pinged, err: errors.New("ping error")} timeoutsTriggered := make(chan struct{}) - dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout, - defaultTimeout, dontHaveTimeout, latMultiplier, messageLatencyMultiplier, expProcessTime, clock, timeoutsTriggered) + cfg := DefaultDontHaveTimeoutConfig() + cfg.DontHaveTimeout = defaultTimeout + cfg.PingLatencyMultiplier = latMultiplier + cfg.MaxExpectedWantProcessTime = expProcessTime + cfg.timeoutsSignal = timeoutsTriggered + cfg.clock = clock + dhtm := newDontHaveTimeoutMgr(pc, tr.onTimeout, cfg) dhtm.Start() defer dhtm.Shutdown() <-pinged @@ -385,8 +414,13 @@ func TestDontHaveTimeoutMgrUsesDefaultTimeoutIfLatencyLonger(t *testing.T) { tr := timeoutRecorder{} timeoutsTriggered := make(chan struct{}) - dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout, - defaultTimeout, dontHaveTimeout, latMultiplier, messageLatencyMultiplier, expProcessTime, clock, timeoutsTriggered) + cfg := DefaultDontHaveTimeoutConfig() + cfg.DontHaveTimeout = defaultTimeout + cfg.PingLatencyMultiplier = latMultiplier + cfg.MaxExpectedWantProcessTime = expProcessTime + cfg.timeoutsSignal = timeoutsTriggered + cfg.clock = clock + dhtm := newDontHaveTimeoutMgr(pc, tr.onTimeout, cfg) dhtm.Start() defer dhtm.Shutdown() <-pinged @@ -424,8 +458,12 @@ func TestDontHaveTimeoutNoTimeoutAfterShutdown(t *testing.T) { tr := timeoutRecorder{} timeoutsTriggered := make(chan struct{}) - dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout, - dontHaveTimeout, maxTimeout, latMultiplier, messageLatencyMultiplier, expProcessTime, clock, timeoutsTriggered) + cfg := DefaultDontHaveTimeoutConfig() + cfg.PingLatencyMultiplier = latMultiplier + cfg.MaxExpectedWantProcessTime = expProcessTime + cfg.timeoutsSignal = timeoutsTriggered + cfg.clock = clock + dhtm := newDontHaveTimeoutMgr(pc, tr.onTimeout, cfg) dhtm.Start() defer dhtm.Shutdown() <-pinged diff --git a/bitswap/client/internal/messagequeue/messagequeue.go b/bitswap/client/internal/messagequeue/messagequeue.go index 0b9dc249e..5c5bb5a17 100644 --- a/bitswap/client/internal/messagequeue/messagequeue.go +++ b/bitswap/client/internal/messagequeue/messagequeue.go @@ -104,7 +104,7 @@ type MessageQueue struct { clock clock.Clock // Used to track things that happen asynchronously -- used only in test - events chan messageEvent + events chan<- messageEvent } // recallWantlist keeps a list of pending wants and a list of sent wants @@ -230,15 +230,37 @@ type DontHaveTimeoutManager interface { UpdateMessageLatency(time.Duration) } +type optsConfig struct { + dhtConfig *DontHaveTimeoutConfig +} + +type option func(*optsConfig) + +func WithDontHaveTimeoutConfig(dhtConfig *DontHaveTimeoutConfig) option { + return func(cfg *optsConfig) { + cfg.dhtConfig = dhtConfig + } +} + // New creates a new MessageQueue. -func New(ctx context.Context, p peer.ID, network MessageNetwork, onDontHaveTimeout OnDontHaveTimeout) *MessageQueue { - onTimeout := func(ks []cid.Cid) { - log.Infow("Bitswap: timeout waiting for blocks", "cids", ks, "peer", p) - onDontHaveTimeout(p, ks) +// +// If onDontHaveTimeout is nil, then the dontHaveTimeoutMrg is disabled. +func New(ctx context.Context, p peer.ID, network MessageNetwork, onDontHaveTimeout OnDontHaveTimeout, options ...option) *MessageQueue { + var opts optsConfig + for _, o := range options { + o(&opts) + } + + var onTimeout func([]cid.Cid, time.Duration) + var dhTimeoutMgr DontHaveTimeoutManager + if onDontHaveTimeout != nil { + onTimeout = func(ks []cid.Cid, timeout time.Duration) { + log.Infow("Bitswap: timeout waiting for blocks", "timeout", timeout.String(), "cids", ks, "peer", p) + onDontHaveTimeout(p, ks) + } + dhTimeoutMgr = newDontHaveTimeoutMgr(newPeerConnection(p, network), onTimeout, opts.dhtConfig) } - clock := clock.New() - dhTimeoutMgr := newDontHaveTimeoutMgr(newPeerConnection(p, network), onTimeout, clock) - return newMessageQueue(ctx, p, network, maxMessageSize, sendErrorBackoff, maxValidLatency, dhTimeoutMgr, clock, nil) + return newMessageQueue(ctx, p, network, maxMessageSize, sendErrorBackoff, maxValidLatency, dhTimeoutMgr, nil, nil) } type messageEvent int @@ -258,9 +280,12 @@ func newMessageQueue( sendErrorBackoff time.Duration, maxValidLatency time.Duration, dhTimeoutMgr DontHaveTimeoutManager, - clock clock.Clock, - events chan messageEvent, + clk clock.Clock, + events chan<- messageEvent, ) *MessageQueue { + if clk == nil { + clk = clock.New() + } ctx, cancel := context.WithCancel(ctx) return &MessageQueue{ ctx: ctx, @@ -281,7 +306,7 @@ func newMessageQueue( // For performance reasons we just clear out the fields of the message // after using it, instead of creating a new one every time. msg: bsmsg.New(false), - clock: clock, + clock: clk, events: events, } } @@ -345,7 +370,9 @@ func (mq *MessageQueue) AddCancels(cancelKs []cid.Cid) { } // Cancel any outstanding DONT_HAVE timers - mq.dhTimeoutMgr.CancelPending(cancelKs) + if mq.dhTimeoutMgr != nil { + mq.dhTimeoutMgr.CancelPending(cancelKs) + } mq.wllock.Lock() @@ -412,8 +439,10 @@ func (mq *MessageQueue) Shutdown() { } func (mq *MessageQueue) onShutdown() { - // Shut down the DONT_HAVE timeout manager - mq.dhTimeoutMgr.Shutdown() + if mq.dhTimeoutMgr != nil { + // Shut down the DONT_HAVE timeout manager + mq.dhTimeoutMgr.Shutdown() + } // Reset the streamMessageSender if mq.sender != nil { @@ -527,9 +556,11 @@ func (mq *MessageQueue) sendMessage() { return } - // Make sure the DONT_HAVE timeout manager has started - // Note: Start is idempotent - mq.dhTimeoutMgr.Start() + if mq.dhTimeoutMgr != nil { + // Make sure the DONT_HAVE timeout manager has started + // Note: Start is idempotent + mq.dhTimeoutMgr.Start() + } // Convert want lists to a Bitswap Message message, onSent := mq.extractOutgoingMessage(mq.sender.SupportsHave()) @@ -589,8 +620,10 @@ func (mq *MessageQueue) simulateDontHaveWithTimeout(wantlist []bsmsg.Entry) { mq.wllock.Unlock() - // Add wants to DONT_HAVE timeout manager - mq.dhTimeoutMgr.AddPending(wants) + if mq.dhTimeoutMgr != nil { + // Add wants to DONT_HAVE timeout manager + mq.dhTimeoutMgr.AddPending(wants) + } } // handleResponse is called when a response is received from the peer, @@ -632,7 +665,7 @@ func (mq *MessageQueue) handleResponse(ks []cid.Cid) { mq.wllock.Unlock() - if !earliest.IsZero() { + if !earliest.IsZero() && mq.dhTimeoutMgr != nil { // Inform the timeout manager of the calculated latency mq.dhTimeoutMgr.UpdateMessageLatency(now.Sub(earliest)) }