diff --git a/bitswap/client/client.go b/bitswap/client/client.go index fc735f448..18845420a 100644 --- a/bitswap/client/client.go +++ b/bitswap/client/client.go @@ -40,6 +40,8 @@ import ( var log = logging.Logger("bitswap/client") +type DontHaveTimeoutConfig = bsmq.DontHaveTimeoutConfig + // Option defines the functional option type that can be used to configure // bitswap instances type Option func(*Client) @@ -71,6 +73,12 @@ func SetSimulateDontHavesOnTimeout(send bool) Option { } } +func WithDontHaveTimeoutConfig(cfg *DontHaveTimeoutConfig) Option { + return func(bs *Client) { + bs.dontHaveTimeoutConfig = cfg + } +} + // Configures the Client to use given tracer. // This provides methods to access all messages sent and received by the Client. // This interface can be used to implement various statistics (this is original intent). @@ -133,7 +141,7 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, bstore blockstore } } peerQueueFactory := func(ctx context.Context, p peer.ID) bspm.PeerQueue { - return bsmq.New(ctx, p, network, onDontHaveTimeout) + return bsmq.New(ctx, p, network, onDontHaveTimeout, bs.dontHaveTimeoutConfig) } sim := bssim.New() @@ -242,6 +250,7 @@ type Client struct { // whether we should actually simulate dont haves on request timeout simulateDontHavesOnTimeout bool + dontHaveTimeoutConfig *DontHaveTimeoutConfig // dupMetric will stay at 0 skipDuplicatedBlocksStats bool diff --git a/bitswap/client/internal/messagequeue/donthavetimeoutmgr.go b/bitswap/client/internal/messagequeue/donthavetimeoutmgr.go index cdeee68ec..96b502d31 100644 --- a/bitswap/client/internal/messagequeue/donthavetimeoutmgr.go +++ b/bitswap/client/internal/messagequeue/donthavetimeoutmgr.go @@ -10,33 +10,41 @@ import ( "github.com/libp2p/go-libp2p/p2p/protocol/ping" ) -const ( - // dontHaveTimeout is used to simulate a DONT_HAVE when communicating with +func defaultDontHaveTimeoutConfig() *DontHaveTimeoutConfig { + cfg := &DontHaveTimeoutConfig{ + DontHaveTimeout: 5 * time.Second, + MaxExpectedWantProcessTime: 2 * time.Second, + PingLatencyMultiplier: 3, + MessageLatencyAlpha: 0.5, + MessageLatencyMultiplier: 2, + } + + cfg.MaxTimeout = cfg.DontHaveTimeout + cfg.MaxExpectedWantProcessTime + return cfg +} + +type DontHaveTimeoutConfig struct { + // DontHaveTimeout is used to simulate a DONT_HAVE when communicating with // a peer whose Bitswap client doesn't support the DONT_HAVE response, // or when the peer takes too long to respond. // If the peer doesn't respond to a want-block within the timeout, the // local node assumes that the peer doesn't have the block. - dontHaveTimeout = 5 * time.Second - - // maxExpectedWantProcessTime is the maximum amount of time we expect a + DontHaveTimeout time.Duration + // MaxExpectedWantProcessTime is the maximum amount of time we expect a // peer takes to process a want and initiate sending a response to us - maxExpectedWantProcessTime = 2 * time.Second - - // maxTimeout is the maximum allowed timeout, regardless of latency - maxTimeout = dontHaveTimeout + maxExpectedWantProcessTime - - // pingLatencyMultiplier is multiplied by the average ping time to + MaxExpectedWantProcessTime time.Duration + // MaxTimeout is the maximum allowed timeout, regardless of latency + MaxTimeout time.Duration + // PingLatencyMultiplier is multiplied by the average ping time to // get an upper bound on how long we expect to wait for a peer's response // to arrive - pingLatencyMultiplier = 3 - - // messageLatencyAlpha is the alpha supplied to the message latency EWMA - messageLatencyAlpha = 0.5 - + PingLatencyMultiplier int + // MessageLatencyAlpha is the alpha supplied to the message latency EWMA + MessageLatencyAlpha float64 // To give a margin for error, the timeout is calculated as - // messageLatencyMultiplier * message latency - messageLatencyMultiplier = 2 -) + // MessageLatencyMultiplier * message latency + MessageLatencyMultiplier int +} // PeerConnection is a connection to a peer that can be pinged, and the // average latency measured @@ -61,16 +69,12 @@ type pendingWant struct { // we ping the peer to estimate latency. If we receive a response from the // peer we use the response latency. type dontHaveTimeoutMgr struct { - clock clock.Clock - ctx context.Context - shutdown func() - peerConn PeerConnection - onDontHaveTimeout func([]cid.Cid) - defaultTimeout time.Duration - maxTimeout time.Duration - pingLatencyMultiplier int - messageLatencyMultiplier int - maxExpectedWantProcessTime time.Duration + clock clock.Clock + ctx context.Context + shutdown func() + peerConn PeerConnection + onDontHaveTimeout func([]cid.Cid, time.Duration) + config *DontHaveTimeoutConfig // All variables below here must be protected by the lock lk sync.RWMutex @@ -92,39 +96,33 @@ type dontHaveTimeoutMgr struct { // newDontHaveTimeoutMgr creates a new dontHaveTimeoutMgr // onDontHaveTimeout is called when pending keys expire (not cancelled before timeout) -func newDontHaveTimeoutMgr(pc PeerConnection, onDontHaveTimeout func([]cid.Cid), clock clock.Clock) *dontHaveTimeoutMgr { - return newDontHaveTimeoutMgrWithParams(pc, onDontHaveTimeout, dontHaveTimeout, maxTimeout, - pingLatencyMultiplier, messageLatencyMultiplier, maxExpectedWantProcessTime, clock, nil) +func newDontHaveTimeoutMgr(pc PeerConnection, onDontHaveTimeout func([]cid.Cid, time.Duration), cfg *DontHaveTimeoutConfig, clock clock.Clock) *dontHaveTimeoutMgr { + if cfg == nil { + cfg = defaultDontHaveTimeoutConfig() + } + return newDontHaveTimeoutMgrWithParams(pc, onDontHaveTimeout, cfg, clock, nil) } // newDontHaveTimeoutMgrWithParams is used by the tests func newDontHaveTimeoutMgrWithParams( pc PeerConnection, - onDontHaveTimeout func([]cid.Cid), - defaultTimeout time.Duration, - maxTimeout time.Duration, - pingLatencyMultiplier int, - messageLatencyMultiplier int, - maxExpectedWantProcessTime time.Duration, + onDontHaveTimeout func([]cid.Cid, time.Duration), + cfg *DontHaveTimeoutConfig, clock clock.Clock, timeoutsTriggered chan struct{}, ) *dontHaveTimeoutMgr { ctx, shutdown := context.WithCancel(context.Background()) mqp := &dontHaveTimeoutMgr{ - clock: clock, - ctx: ctx, - shutdown: shutdown, - peerConn: pc, - activeWants: make(map[cid.Cid]*pendingWant), - timeout: defaultTimeout, - messageLatency: &latencyEwma{alpha: messageLatencyAlpha}, - defaultTimeout: defaultTimeout, - maxTimeout: maxTimeout, - pingLatencyMultiplier: pingLatencyMultiplier, - messageLatencyMultiplier: messageLatencyMultiplier, - maxExpectedWantProcessTime: maxExpectedWantProcessTime, - onDontHaveTimeout: onDontHaveTimeout, - timeoutsTriggered: timeoutsTriggered, + clock: clock, + ctx: ctx, + shutdown: shutdown, + peerConn: pc, + activeWants: make(map[cid.Cid]*pendingWant), + timeout: cfg.DontHaveTimeout, + messageLatency: &latencyEwma{alpha: cfg.MessageLatencyAlpha}, + onDontHaveTimeout: onDontHaveTimeout, + config: cfg, + timeoutsTriggered: timeoutsTriggered, } return mqp @@ -189,7 +187,7 @@ func (dhtm *dontHaveTimeoutMgr) UpdateMessageLatency(elapsed time.Duration) { // measurePingLatency measures the latency to the peer by pinging it func (dhtm *dontHaveTimeoutMgr) measurePingLatency() { // Wait up to defaultTimeout for a response to the ping - ctx, cancel := context.WithTimeout(dhtm.ctx, dhtm.defaultTimeout) + ctx, cancel := context.WithTimeout(dhtm.ctx, dhtm.config.DontHaveTimeout) defer cancel() // Ping the peer @@ -252,7 +250,7 @@ func (dhtm *dontHaveTimeoutMgr) checkForTimeouts() { // Fire the timeout event for the expired wants if len(expired) > 0 { - go dhtm.fireTimeout(expired) + go dhtm.fireTimeout(expired, dhtm.timeout) } if len(dhtm.wantQueue) == 0 { @@ -340,14 +338,14 @@ func (dhtm *dontHaveTimeoutMgr) CancelPending(ks []cid.Cid) { } // fireTimeout fires the onDontHaveTimeout method with the timed out keys -func (dhtm *dontHaveTimeoutMgr) fireTimeout(pending []cid.Cid) { +func (dhtm *dontHaveTimeoutMgr) fireTimeout(pending []cid.Cid, timeout time.Duration) { // Make sure the timeout manager has not been shut down if dhtm.ctx.Err() != nil { return } // Fire the timeout - dhtm.onDontHaveTimeout(pending) + dhtm.onDontHaveTimeout(pending, timeout) // signal a timeout fired if dhtm.timeoutsTriggered != nil { @@ -360,18 +358,18 @@ func (dhtm *dontHaveTimeoutMgr) calculateTimeoutFromPingLatency(latency time.Dur // The maximum expected time for a response is // the expected time to process the want + (latency * multiplier) // The multiplier is to provide some padding for variable latency. - timeout := dhtm.maxExpectedWantProcessTime + time.Duration(dhtm.pingLatencyMultiplier)*latency - if timeout > dhtm.maxTimeout { - timeout = dhtm.maxTimeout + timeout := dhtm.config.MaxExpectedWantProcessTime + time.Duration(dhtm.config.PingLatencyMultiplier)*latency + if timeout > dhtm.config.MaxTimeout { + timeout = dhtm.config.MaxTimeout } return timeout } // calculateTimeoutFromMessageLatency calculates a timeout derived from message latency func (dhtm *dontHaveTimeoutMgr) calculateTimeoutFromMessageLatency() time.Duration { - timeout := dhtm.messageLatency.latency * time.Duration(dhtm.messageLatencyMultiplier) - if timeout > dhtm.maxTimeout { - timeout = dhtm.maxTimeout + timeout := dhtm.messageLatency.latency * time.Duration(dhtm.config.MessageLatencyMultiplier) + if timeout > dhtm.config.MaxTimeout { + timeout = dhtm.config.MaxTimeout } return timeout } diff --git a/bitswap/client/internal/messagequeue/donthavetimeoutmgr_test.go b/bitswap/client/internal/messagequeue/donthavetimeoutmgr_test.go index ee478e605..8332fc56a 100644 --- a/bitswap/client/internal/messagequeue/donthavetimeoutmgr_test.go +++ b/bitswap/client/internal/messagequeue/donthavetimeoutmgr_test.go @@ -51,7 +51,7 @@ type timeoutRecorder struct { lk sync.Mutex } -func (tr *timeoutRecorder) onTimeout(tks []cid.Cid) { +func (tr *timeoutRecorder) onTimeout(tks []cid.Cid, _ time.Duration) { tr.lk.Lock() defer tr.lk.Unlock() @@ -84,8 +84,10 @@ func TestDontHaveTimeoutMgrTimeout(t *testing.T) { pc := &mockPeerConn{latency: latency, clock: clock, pinged: pinged} tr := timeoutRecorder{} timeoutsTriggered := make(chan struct{}) - dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout, - dontHaveTimeout, maxTimeout, latMultiplier, messageLatencyMultiplier, expProcessTime, clock, timeoutsTriggered) + cfg := defaultDontHaveTimeoutConfig() + cfg.PingLatencyMultiplier = latMultiplier + cfg.MaxExpectedWantProcessTime = expProcessTime + dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout, cfg, clock, timeoutsTriggered) dhtm.Start() defer dhtm.Shutdown() <-pinged @@ -139,8 +141,10 @@ func TestDontHaveTimeoutMgrCancel(t *testing.T) { pc := &mockPeerConn{latency: latency, clock: clock, pinged: pinged} tr := timeoutRecorder{} timeoutsTriggered := make(chan struct{}) - dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout, - dontHaveTimeout, maxTimeout, latMultiplier, messageLatencyMultiplier, expProcessTime, clock, timeoutsTriggered) + cfg := defaultDontHaveTimeoutConfig() + cfg.PingLatencyMultiplier = latMultiplier + cfg.MaxExpectedWantProcessTime = expProcessTime + dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout, cfg, clock, timeoutsTriggered) dhtm.Start() defer dhtm.Shutdown() <-pinged @@ -176,8 +180,10 @@ func TestDontHaveTimeoutWantCancelWant(t *testing.T) { tr := timeoutRecorder{} timeoutsTriggered := make(chan struct{}) - dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout, - dontHaveTimeout, maxTimeout, latMultiplier, messageLatencyMultiplier, expProcessTime, clock, timeoutsTriggered) + cfg := defaultDontHaveTimeoutConfig() + cfg.PingLatencyMultiplier = latMultiplier + cfg.MaxExpectedWantProcessTime = expProcessTime + dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout, cfg, clock, timeoutsTriggered) dhtm.Start() defer dhtm.Shutdown() <-pinged @@ -228,8 +234,10 @@ func TestDontHaveTimeoutRepeatedAddPending(t *testing.T) { tr := timeoutRecorder{} timeoutsTriggered := make(chan struct{}) - dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout, - dontHaveTimeout, maxTimeout, latMultiplier, messageLatencyMultiplier, expProcessTime, clock, timeoutsTriggered) + cfg := defaultDontHaveTimeoutConfig() + cfg.PingLatencyMultiplier = latMultiplier + cfg.MaxExpectedWantProcessTime = expProcessTime + dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout, cfg, clock, timeoutsTriggered) dhtm.Start() defer dhtm.Shutdown() <-pinged @@ -262,8 +270,11 @@ func TestDontHaveTimeoutMgrMessageLatency(t *testing.T) { tr := timeoutRecorder{} timeoutsTriggered := make(chan struct{}) - dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout, - dontHaveTimeout, maxTimeout, latMultiplier, msgLatencyMultiplier, expProcessTime, clock, timeoutsTriggered) + cfg := defaultDontHaveTimeoutConfig() + cfg.PingLatencyMultiplier = latMultiplier + cfg.MessageLatencyMultiplier = msgLatencyMultiplier + cfg.MaxExpectedWantProcessTime = expProcessTime + dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout, cfg, clock, timeoutsTriggered) dhtm.Start() defer dhtm.Shutdown() <-pinged @@ -309,8 +320,10 @@ func TestDontHaveTimeoutMgrMessageLatencyMax(t *testing.T) { testMaxTimeout := time.Millisecond * 10 timeoutsTriggered := make(chan struct{}) - dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout, - dontHaveTimeout, testMaxTimeout, pingLatencyMultiplier, msgLatencyMultiplier, maxExpectedWantProcessTime, clock, timeoutsTriggered) + cfg := defaultDontHaveTimeoutConfig() + cfg.MessageLatencyMultiplier = msgLatencyMultiplier + cfg.MaxTimeout = testMaxTimeout + dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout, cfg, clock, timeoutsTriggered) dhtm.Start() defer dhtm.Shutdown() <-pinged @@ -345,8 +358,11 @@ func TestDontHaveTimeoutMgrUsesDefaultTimeoutIfPingError(t *testing.T) { pc := &mockPeerConn{latency: latency, clock: clock, pinged: pinged, err: errors.New("ping error")} timeoutsTriggered := make(chan struct{}) - dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout, - defaultTimeout, dontHaveTimeout, latMultiplier, messageLatencyMultiplier, expProcessTime, clock, timeoutsTriggered) + cfg := defaultDontHaveTimeoutConfig() + cfg.DontHaveTimeout = defaultTimeout + cfg.PingLatencyMultiplier = latMultiplier + cfg.MaxExpectedWantProcessTime = expProcessTime + dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout, cfg, clock, timeoutsTriggered) dhtm.Start() defer dhtm.Shutdown() <-pinged @@ -385,8 +401,11 @@ func TestDontHaveTimeoutMgrUsesDefaultTimeoutIfLatencyLonger(t *testing.T) { tr := timeoutRecorder{} timeoutsTriggered := make(chan struct{}) - dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout, - defaultTimeout, dontHaveTimeout, latMultiplier, messageLatencyMultiplier, expProcessTime, clock, timeoutsTriggered) + cfg := defaultDontHaveTimeoutConfig() + cfg.DontHaveTimeout = defaultTimeout + cfg.PingLatencyMultiplier = latMultiplier + cfg.MaxExpectedWantProcessTime = expProcessTime + dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout, cfg, clock, timeoutsTriggered) dhtm.Start() defer dhtm.Shutdown() <-pinged @@ -424,8 +443,10 @@ func TestDontHaveTimeoutNoTimeoutAfterShutdown(t *testing.T) { tr := timeoutRecorder{} timeoutsTriggered := make(chan struct{}) - dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout, - dontHaveTimeout, maxTimeout, latMultiplier, messageLatencyMultiplier, expProcessTime, clock, timeoutsTriggered) + cfg := defaultDontHaveTimeoutConfig() + cfg.PingLatencyMultiplier = latMultiplier + cfg.MaxExpectedWantProcessTime = expProcessTime + dhtm := newDontHaveTimeoutMgrWithParams(pc, tr.onTimeout, cfg, clock, timeoutsTriggered) dhtm.Start() defer dhtm.Shutdown() <-pinged diff --git a/bitswap/client/internal/messagequeue/messagequeue.go b/bitswap/client/internal/messagequeue/messagequeue.go index edea20b9c..03f211f70 100644 --- a/bitswap/client/internal/messagequeue/messagequeue.go +++ b/bitswap/client/internal/messagequeue/messagequeue.go @@ -214,13 +214,14 @@ type DontHaveTimeoutManager interface { } // New creates a new MessageQueue. -func New(ctx context.Context, p peer.ID, network MessageNetwork, onDontHaveTimeout OnDontHaveTimeout) *MessageQueue { - onTimeout := func(ks []cid.Cid) { - log.Infow("Bitswap: timeout waiting for blocks", "cids", ks, "peer", p) +func New(ctx context.Context, p peer.ID, network MessageNetwork, onDontHaveTimeout OnDontHaveTimeout, + dontHaveTimeoutConfig *DontHaveTimeoutConfig) *MessageQueue { + onTimeout := func(ks []cid.Cid, duration time.Duration) { + log.Infow("Bitswap: timeout waiting for blocks", "timeout", duration.String(), "cids", ks, "peer", p) onDontHaveTimeout(p, ks) } clock := clock.New() - dhTimeoutMgr := newDontHaveTimeoutMgr(newPeerConnection(p, network), onTimeout, clock) + dhTimeoutMgr := newDontHaveTimeoutMgr(newPeerConnection(p, network), onTimeout, dontHaveTimeoutConfig, clock) return newMessageQueue(ctx, p, network, maxMessageSize, sendErrorBackoff, maxValidLatency, dhTimeoutMgr, clock, nil) } diff --git a/bitswap/client/internal/messagequeue/messagequeue_test.go b/bitswap/client/internal/messagequeue/messagequeue_test.go index 3a9c21309..86410f5e3 100644 --- a/bitswap/client/internal/messagequeue/messagequeue_test.go +++ b/bitswap/client/internal/messagequeue/messagequeue_test.go @@ -168,7 +168,7 @@ func TestStartupAndShutdown(t *testing.T) { fakeSender := newFakeMessageSender(resetChan, messagesSent, true) fakenet := &fakeMessageNetwork{nil, nil, fakeSender} peerID := random.Peers(1)[0] - messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb) + messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb, defaultDontHaveTimeoutConfig()) bcstwh := random.Cids(10) messageQueue.Startup() @@ -206,7 +206,7 @@ func TestSendingMessagesDeduped(t *testing.T) { fakeSender := newFakeMessageSender(resetChan, messagesSent, true) fakenet := &fakeMessageNetwork{nil, nil, fakeSender} peerID := random.Peers(1)[0] - messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb) + messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb, defaultDontHaveTimeoutConfig()) wantHaves := random.Cids(10) wantBlocks := random.Cids(10) @@ -227,7 +227,7 @@ func TestSendingMessagesPartialDupe(t *testing.T) { fakeSender := newFakeMessageSender(resetChan, messagesSent, true) fakenet := &fakeMessageNetwork{nil, nil, fakeSender} peerID := random.Peers(1)[0] - messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb) + messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb, defaultDontHaveTimeoutConfig()) wantHaves := random.Cids(10) wantBlocks := random.Cids(10) @@ -248,7 +248,7 @@ func TestSendingMessagesPriority(t *testing.T) { fakeSender := newFakeMessageSender(resetChan, messagesSent, true) fakenet := &fakeMessageNetwork{nil, nil, fakeSender} peerID := random.Peers(1)[0] - messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb) + messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb, defaultDontHaveTimeoutConfig()) wantHaves1 := random.Cids(5) wantHaves2 := random.Cids(5) wantHaves := append(wantHaves1, wantHaves2...) @@ -315,7 +315,7 @@ func TestCancelOverridesPendingWants(t *testing.T) { fakeSender := newFakeMessageSender(resetChan, messagesSent, true) fakenet := &fakeMessageNetwork{nil, nil, fakeSender} peerID := random.Peers(1)[0] - messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb) + messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb, defaultDontHaveTimeoutConfig()) wantHaves := random.Cids(2) wantBlocks := random.Cids(2) @@ -365,7 +365,7 @@ func TestWantOverridesPendingCancels(t *testing.T) { fakeSender := newFakeMessageSender(resetChan, messagesSent, true) fakenet := &fakeMessageNetwork{nil, nil, fakeSender} peerID := random.Peers(1)[0] - messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb) + messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb, defaultDontHaveTimeoutConfig()) cids := random.Cids(3) wantBlocks := cids[:1] @@ -549,7 +549,7 @@ func TestSendToPeerThatDoesntSupportHave(t *testing.T) { fakenet := &fakeMessageNetwork{nil, nil, fakeSender} peerID := random.Peers(1)[0] - messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb) + messageQueue := New(ctx, peerID, fakenet, mockTimeoutCb, defaultDontHaveTimeoutConfig()) messageQueue.Startup() // If the remote peer doesn't support HAVE / DONT_HAVE messages