From 25cb9d468d45df87ac07b0e43a81b80d1a7e44ba Mon Sep 17 00:00:00 2001 From: gammazero <11790789+gammazero@users.noreply.github.com> Date: Thu, 12 Dec 2024 07:00:06 -1000 Subject: [PATCH] revert removable of mockable time --- .../messagequeue/donthavetimeoutmgr.go | 18 ++-- .../messagequeue/donthavetimeoutmgr_test.go | 86 ++++++++++++------- 2 files changed, 66 insertions(+), 38 deletions(-) diff --git a/bitswap/client/internal/messagequeue/donthavetimeoutmgr.go b/bitswap/client/internal/messagequeue/donthavetimeoutmgr.go index 8db1ca53d..f2e1d2c4c 100644 --- a/bitswap/client/internal/messagequeue/donthavetimeoutmgr.go +++ b/bitswap/client/internal/messagequeue/donthavetimeoutmgr.go @@ -5,6 +5,7 @@ import ( "sync" "time" + "github.com/benbjohnson/clock" "github.com/gammazero/deque" cid "github.com/ipfs/go-cid" "github.com/libp2p/go-libp2p/p2p/protocol/ping" @@ -32,9 +33,12 @@ type DontHaveTimeoutConfig struct { // MessageLatencyMultiplier gives a margin for error. The timeout is calculated as // MessageLatencyMultiplier * message latency MessageLatencyMultiplier int + // timeoutsSignal used for testing -- caller-provided channel to signals // when a dont have timeout was triggered. timeoutsSignal chan<- struct{} + // clock is a mockable time api used for testing. + clock clock.Clock } func (c *DontHaveTimeoutConfig) Enabled() bool { @@ -95,7 +99,7 @@ type dontHaveTimeoutMgr struct { // ewma of message latency (time from message sent to response received) messageLatency *latencyEwma // timer used to wait until want at front of queue expires - checkForTimeoutsTimer *time.Timer + checkForTimeoutsTimer *clock.Timer } // newDontHaveTimeoutMgr creates a new dontHaveTimeoutMgr @@ -110,6 +114,9 @@ func newDontHaveTimeoutMgr(pc PeerConnection, onDontHaveTimeout func([]cid.Cid, if cfg == nil { cfg = DefaultDontHaveTimeoutConfig() } + if cfg.clock == nil { + cfg.clock = clock.New() + } ctx, shutdown := context.WithCancel(context.Background()) return &dontHaveTimeoutMgr{ ctx: ctx, @@ -221,6 +228,7 @@ func (dhtm *dontHaveTimeoutMgr) checkForTimeouts() { // Figure out which of the blocks that were wanted were not received // within the timeout + now := dhtm.config.clock.Now() expired := make([]cid.Cid, 0, len(dhtm.activeWants)) for dhtm.wantQueue.Len() > 0 { pw := dhtm.wantQueue.Front() @@ -229,7 +237,7 @@ func (dhtm *dontHaveTimeoutMgr) checkForTimeouts() { if pw.active { // The queue is in order from earliest to latest, so if we // didn't find an expired entry we can stop iterating - if time.Since(pw.sent) < dhtm.timeout { + if now.Sub(pw.sent) < dhtm.timeout { break } @@ -260,9 +268,9 @@ func (dhtm *dontHaveTimeoutMgr) checkForTimeouts() { // Schedule the next check for the moment when the oldest pending want will // timeout oldestStart := dhtm.wantQueue.Front().sent - until := time.Until(oldestStart.Add(dhtm.timeout)) + until := oldestStart.Add(dhtm.timeout).Sub(now) if dhtm.checkForTimeoutsTimer == nil { - dhtm.checkForTimeoutsTimer = time.NewTimer(until) + dhtm.checkForTimeoutsTimer = dhtm.config.clock.Timer(until) go func() { for { select { @@ -288,7 +296,7 @@ func (dhtm *dontHaveTimeoutMgr) AddPending(ks []cid.Cid) { return } - start := time.Now() + start := dhtm.config.clock.Now() dhtm.lk.Lock() defer dhtm.lk.Unlock() diff --git a/bitswap/client/internal/messagequeue/donthavetimeoutmgr_test.go b/bitswap/client/internal/messagequeue/donthavetimeoutmgr_test.go index 4e51678bd..5d0c79df8 100644 --- a/bitswap/client/internal/messagequeue/donthavetimeoutmgr_test.go +++ b/bitswap/client/internal/messagequeue/donthavetimeoutmgr_test.go @@ -7,6 +7,7 @@ import ( "testing" "time" + "github.com/benbjohnson/clock" cid "github.com/ipfs/go-cid" "github.com/ipfs/go-test/random" "github.com/libp2p/go-libp2p/p2p/protocol/ping" @@ -16,11 +17,12 @@ type mockPeerConn struct { err error latency time.Duration latencies []time.Duration + clock clock.Clock pinged chan struct{} } func (pc *mockPeerConn) Ping(ctx context.Context) ping.Result { - timer := time.NewTimer(pc.latency) + timer := pc.clock.Timer(pc.latency) pc.pinged <- struct{}{} select { case <-timer.C: @@ -77,14 +79,16 @@ func TestDontHaveTimeoutMgrTimeout(t *testing.T) { latMultiplier := 2 expProcessTime := 5 * time.Millisecond expectedTimeout := expProcessTime + latency*time.Duration(latMultiplier) + clock := clock.NewMock() pinged := make(chan struct{}) - pc := &mockPeerConn{latency: latency, pinged: pinged} + pc := &mockPeerConn{latency: latency, clock: clock, pinged: pinged} tr := timeoutRecorder{} timeoutsTriggered := make(chan struct{}) cfg := DefaultDontHaveTimeoutConfig() cfg.PingLatencyMultiplier = latMultiplier cfg.MaxExpectedWantProcessTime = expProcessTime cfg.timeoutsSignal = timeoutsTriggered + cfg.clock = clock dhtm := newDontHaveTimeoutMgr(pc, tr.onTimeout, cfg) dhtm.Start() defer dhtm.Shutdown() @@ -93,7 +97,7 @@ func TestDontHaveTimeoutMgrTimeout(t *testing.T) { dhtm.AddPending(firstks) // Wait for less than the expected timeout - time.Sleep(expectedTimeout - 10*time.Millisecond) + clock.Add(expectedTimeout - 10*time.Millisecond) // At this stage no keys should have timed out if tr.timedOutCount() > 0 { @@ -104,7 +108,7 @@ func TestDontHaveTimeoutMgrTimeout(t *testing.T) { dhtm.AddPending(secondks) // Wait until after the expected timeout - time.Sleep(30 * time.Millisecond) + clock.Add(20 * time.Millisecond) <-timeoutsTriggered @@ -116,7 +120,7 @@ func TestDontHaveTimeoutMgrTimeout(t *testing.T) { tr.clear() // Sleep until the second set of keys should have timed out - time.Sleep(expectedTimeout + 50*time.Millisecond) + clock.Add(expectedTimeout + 10*time.Millisecond) <-timeoutsTriggered @@ -130,18 +134,20 @@ func TestDontHaveTimeoutMgrTimeout(t *testing.T) { func TestDontHaveTimeoutMgrCancel(t *testing.T) { ks := random.Cids(3) - latency := time.Millisecond * 50 + latency := time.Millisecond * 10 latMultiplier := 1 expProcessTime := time.Duration(0) expectedTimeout := latency + clock := clock.NewMock() pinged := make(chan struct{}) - pc := &mockPeerConn{latency: latency, pinged: pinged} + pc := &mockPeerConn{latency: latency, clock: clock, pinged: pinged} tr := timeoutRecorder{} timeoutsTriggered := make(chan struct{}) cfg := DefaultDontHaveTimeoutConfig() cfg.PingLatencyMultiplier = latMultiplier cfg.MaxExpectedWantProcessTime = expProcessTime cfg.timeoutsSignal = timeoutsTriggered + cfg.clock = clock dhtm := newDontHaveTimeoutMgr(pc, tr.onTimeout, cfg) dhtm.Start() defer dhtm.Shutdown() @@ -149,14 +155,14 @@ func TestDontHaveTimeoutMgrCancel(t *testing.T) { // Add keys dhtm.AddPending(ks) - time.Sleep(5 * time.Millisecond) + clock.Add(5 * time.Millisecond) // Cancel keys cancelCount := 1 dhtm.CancelPending(ks[:cancelCount]) // Wait for the expected timeout - time.Sleep(expectedTimeout) + clock.Add(expectedTimeout) <-timeoutsTriggered @@ -172,14 +178,16 @@ func TestDontHaveTimeoutWantCancelWant(t *testing.T) { latMultiplier := 1 expProcessTime := time.Duration(0) expectedTimeout := latency + clock := clock.NewMock() pinged := make(chan struct{}) - pc := &mockPeerConn{latency: latency, pinged: pinged} + pc := &mockPeerConn{latency: latency, clock: clock, pinged: pinged} tr := timeoutRecorder{} timeoutsTriggered := make(chan struct{}) cfg := DefaultDontHaveTimeoutConfig() cfg.PingLatencyMultiplier = latMultiplier cfg.MaxExpectedWantProcessTime = expProcessTime cfg.timeoutsSignal = timeoutsTriggered + cfg.clock = clock dhtm := newDontHaveTimeoutMgr(pc, tr.onTimeout, cfg) dhtm.Start() defer dhtm.Shutdown() @@ -189,18 +197,18 @@ func TestDontHaveTimeoutWantCancelWant(t *testing.T) { dhtm.AddPending(ks) // Wait for a short time - time.Sleep(expectedTimeout - 10*time.Millisecond) + clock.Add(expectedTimeout - 10*time.Millisecond) // Cancel two keys dhtm.CancelPending(ks[:2]) - time.Sleep(5 * time.Millisecond) + clock.Add(5 * time.Millisecond) // Add back one cancelled key dhtm.AddPending(ks[:1]) // Wait till after initial timeout - time.Sleep(10 * time.Millisecond) + clock.Add(10 * time.Millisecond) <-timeoutsTriggered @@ -210,7 +218,7 @@ func TestDontHaveTimeoutWantCancelWant(t *testing.T) { } // Wait till after added back key should time out - time.Sleep(latency) + clock.Add(latency) <-timeoutsTriggered @@ -225,8 +233,9 @@ func TestDontHaveTimeoutRepeatedAddPending(t *testing.T) { latency := time.Millisecond * 5 latMultiplier := 1 expProcessTime := time.Duration(0) + clock := clock.NewMock() pinged := make(chan struct{}) - pc := &mockPeerConn{latency: latency, pinged: pinged} + pc := &mockPeerConn{latency: latency, clock: clock, pinged: pinged} tr := timeoutRecorder{} timeoutsTriggered := make(chan struct{}) @@ -234,6 +243,7 @@ func TestDontHaveTimeoutRepeatedAddPending(t *testing.T) { cfg.PingLatencyMultiplier = latMultiplier cfg.MaxExpectedWantProcessTime = expProcessTime cfg.timeoutsSignal = timeoutsTriggered + cfg.clock = clock dhtm := newDontHaveTimeoutMgr(pc, tr.onTimeout, cfg) dhtm.Start() defer dhtm.Shutdown() @@ -245,7 +255,7 @@ func TestDontHaveTimeoutRepeatedAddPending(t *testing.T) { } // Wait for the expected timeout - time.Sleep(latency + 5*time.Millisecond) + clock.Add(latency + 5*time.Millisecond) <-timeoutsTriggered @@ -261,8 +271,9 @@ func TestDontHaveTimeoutMgrMessageLatency(t *testing.T) { latMultiplier := 1 expProcessTime := time.Duration(0) msgLatencyMultiplier := 1 + clock := clock.NewMock() pinged := make(chan struct{}) - pc := &mockPeerConn{latency: latency, pinged: pinged} + pc := &mockPeerConn{latency: latency, clock: clock, pinged: pinged} tr := timeoutRecorder{} timeoutsTriggered := make(chan struct{}) @@ -271,6 +282,7 @@ func TestDontHaveTimeoutMgrMessageLatency(t *testing.T) { cfg.MessageLatencyMultiplier = msgLatencyMultiplier cfg.MaxExpectedWantProcessTime = expProcessTime cfg.timeoutsSignal = timeoutsTriggered + cfg.clock = clock dhtm := newDontHaveTimeoutMgr(pc, tr.onTimeout, cfg) dhtm.Start() defer dhtm.Shutdown() @@ -284,7 +296,7 @@ func TestDontHaveTimeoutMgrMessageLatency(t *testing.T) { // = 40ms // Wait for less than the expected timeout - time.Sleep(25 * time.Millisecond) + clock.Add(25 * time.Millisecond) // Receive two message latency updates dhtm.UpdateMessageLatency(time.Millisecond * 20) @@ -298,7 +310,7 @@ func TestDontHaveTimeoutMgrMessageLatency(t *testing.T) { // the keys should have timed out // Give the queue some time to process the updates - time.Sleep(5 * time.Millisecond) + clock.Add(5 * time.Millisecond) <-timeoutsTriggered @@ -309,8 +321,9 @@ func TestDontHaveTimeoutMgrMessageLatency(t *testing.T) { func TestDontHaveTimeoutMgrMessageLatencyMax(t *testing.T) { ks := random.Cids(2) + clock := clock.NewMock() pinged := make(chan struct{}) - pc := &mockPeerConn{latency: time.Second, pinged: pinged} + pc := &mockPeerConn{latency: time.Second, clock: clock, pinged: pinged} tr := timeoutRecorder{} msgLatencyMultiplier := 1 testMaxTimeout := time.Millisecond * 10 @@ -320,6 +333,7 @@ func TestDontHaveTimeoutMgrMessageLatencyMax(t *testing.T) { cfg.MessageLatencyMultiplier = msgLatencyMultiplier cfg.MaxTimeout = testMaxTimeout cfg.timeoutsSignal = timeoutsTriggered + cfg.clock = clock dhtm := newDontHaveTimeoutMgr(pc, tr.onTimeout, cfg) dhtm.Start() defer dhtm.Shutdown() @@ -332,7 +346,7 @@ func TestDontHaveTimeoutMgrMessageLatencyMax(t *testing.T) { dhtm.UpdateMessageLatency(testMaxTimeout * 4) // Sleep until just after the maximum timeout - time.Sleep(testMaxTimeout + 5*time.Millisecond) + clock.Add(testMaxTimeout + 5*time.Millisecond) <-timeoutsTriggered @@ -344,14 +358,15 @@ func TestDontHaveTimeoutMgrMessageLatencyMax(t *testing.T) { func TestDontHaveTimeoutMgrUsesDefaultTimeoutIfPingError(t *testing.T) { ks := random.Cids(2) - latency := time.Millisecond * 10 + latency := time.Millisecond latMultiplier := 2 expProcessTime := 2 * time.Millisecond defaultTimeout := 10 * time.Millisecond expectedTimeout := expProcessTime + defaultTimeout tr := timeoutRecorder{} + clock := clock.NewMock() pinged := make(chan struct{}) - pc := &mockPeerConn{latency: latency, pinged: pinged, err: errors.New("ping error")} + pc := &mockPeerConn{latency: latency, clock: clock, pinged: pinged, err: errors.New("ping error")} timeoutsTriggered := make(chan struct{}) cfg := DefaultDontHaveTimeoutConfig() @@ -359,6 +374,7 @@ func TestDontHaveTimeoutMgrUsesDefaultTimeoutIfPingError(t *testing.T) { cfg.PingLatencyMultiplier = latMultiplier cfg.MaxExpectedWantProcessTime = expProcessTime cfg.timeoutsSignal = timeoutsTriggered + cfg.clock = clock dhtm := newDontHaveTimeoutMgr(pc, tr.onTimeout, cfg) dhtm.Start() defer dhtm.Shutdown() @@ -368,7 +384,7 @@ func TestDontHaveTimeoutMgrUsesDefaultTimeoutIfPingError(t *testing.T) { dhtm.AddPending(ks) // Sleep for less than the expected timeout - time.Sleep(expectedTimeout - 5*time.Millisecond) + clock.Add(expectedTimeout - 5*time.Millisecond) // At this stage no timeout should have happened yet if tr.timedOutCount() > 0 { @@ -376,7 +392,7 @@ func TestDontHaveTimeoutMgrUsesDefaultTimeoutIfPingError(t *testing.T) { } // Sleep until after the expected timeout - time.Sleep(50 * time.Millisecond) + clock.Add(10 * time.Millisecond) <-timeoutsTriggered @@ -392,8 +408,9 @@ func TestDontHaveTimeoutMgrUsesDefaultTimeoutIfLatencyLonger(t *testing.T) { latMultiplier := 1 expProcessTime := time.Duration(0) defaultTimeout := 100 * time.Millisecond + clock := clock.NewMock() pinged := make(chan struct{}) - pc := &mockPeerConn{latency: latency, pinged: pinged} + pc := &mockPeerConn{latency: latency, clock: clock, pinged: pinged} tr := timeoutRecorder{} timeoutsTriggered := make(chan struct{}) @@ -402,6 +419,7 @@ func TestDontHaveTimeoutMgrUsesDefaultTimeoutIfLatencyLonger(t *testing.T) { cfg.PingLatencyMultiplier = latMultiplier cfg.MaxExpectedWantProcessTime = expProcessTime cfg.timeoutsSignal = timeoutsTriggered + cfg.clock = clock dhtm := newDontHaveTimeoutMgr(pc, tr.onTimeout, cfg) dhtm.Start() defer dhtm.Shutdown() @@ -411,7 +429,7 @@ func TestDontHaveTimeoutMgrUsesDefaultTimeoutIfLatencyLonger(t *testing.T) { dhtm.AddPending(ks) // Sleep for less than the default timeout - time.Sleep(defaultTimeout - 50*time.Millisecond) + clock.Add(defaultTimeout - 50*time.Millisecond) // At this stage no timeout should have happened yet if tr.timedOutCount() > 0 { @@ -419,7 +437,7 @@ func TestDontHaveTimeoutMgrUsesDefaultTimeoutIfLatencyLonger(t *testing.T) { } // Sleep until after the default timeout - time.Sleep(defaultTimeout * 2) + clock.Add(defaultTimeout * 2) <-timeoutsTriggered @@ -431,11 +449,12 @@ func TestDontHaveTimeoutMgrUsesDefaultTimeoutIfLatencyLonger(t *testing.T) { func TestDontHaveTimeoutNoTimeoutAfterShutdown(t *testing.T) { ks := random.Cids(2) - latency := time.Millisecond * 100 + latency := time.Millisecond * 10 latMultiplier := 1 expProcessTime := time.Duration(0) + clock := clock.NewMock() pinged := make(chan struct{}) - pc := &mockPeerConn{latency: latency, pinged: pinged} + pc := &mockPeerConn{latency: latency, clock: clock, pinged: pinged} tr := timeoutRecorder{} timeoutsTriggered := make(chan struct{}) @@ -443,6 +462,7 @@ func TestDontHaveTimeoutNoTimeoutAfterShutdown(t *testing.T) { cfg.PingLatencyMultiplier = latMultiplier cfg.MaxExpectedWantProcessTime = expProcessTime cfg.timeoutsSignal = timeoutsTriggered + cfg.clock = clock dhtm := newDontHaveTimeoutMgr(pc, tr.onTimeout, cfg) dhtm.Start() defer dhtm.Shutdown() @@ -452,13 +472,13 @@ func TestDontHaveTimeoutNoTimeoutAfterShutdown(t *testing.T) { dhtm.AddPending(ks) // Wait less than the timeout - time.Sleep(latency - 20*time.Millisecond) + clock.Add(latency - 5*time.Millisecond) // Shutdown the manager dhtm.Shutdown() - // Wait for the unexpected timeout - time.Sleep(200 * time.Millisecond) + // Wait for the expected timeout + clock.Add(10 * time.Millisecond) // Manager was shut down so timeout should not have fired if tr.timedOutCount() != 0 {