Skip to content

Commit

Permalink
revert removable of mockable time
Browse files Browse the repository at this point in the history
  • Loading branch information
gammazero committed Dec 12, 2024
1 parent e8d42d5 commit 25cb9d4
Show file tree
Hide file tree
Showing 2 changed files with 66 additions and 38 deletions.
18 changes: 13 additions & 5 deletions bitswap/client/internal/messagequeue/donthavetimeoutmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"sync"
"time"

"github.com/benbjohnson/clock"
"github.com/gammazero/deque"
cid "github.com/ipfs/go-cid"
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
Expand Down Expand Up @@ -32,9 +33,12 @@ type DontHaveTimeoutConfig struct {
// MessageLatencyMultiplier gives a margin for error. The timeout is calculated as
// MessageLatencyMultiplier * message latency
MessageLatencyMultiplier int

// timeoutsSignal used for testing -- caller-provided channel to signals
// when a dont have timeout was triggered.
timeoutsSignal chan<- struct{}
// clock is a mockable time api used for testing.
clock clock.Clock
}

func (c *DontHaveTimeoutConfig) Enabled() bool {
Expand Down Expand Up @@ -95,7 +99,7 @@ type dontHaveTimeoutMgr struct {
// ewma of message latency (time from message sent to response received)
messageLatency *latencyEwma
// timer used to wait until want at front of queue expires
checkForTimeoutsTimer *time.Timer
checkForTimeoutsTimer *clock.Timer
}

// newDontHaveTimeoutMgr creates a new dontHaveTimeoutMgr
Expand All @@ -110,6 +114,9 @@ func newDontHaveTimeoutMgr(pc PeerConnection, onDontHaveTimeout func([]cid.Cid,
if cfg == nil {
cfg = DefaultDontHaveTimeoutConfig()
}
if cfg.clock == nil {
cfg.clock = clock.New()
}
ctx, shutdown := context.WithCancel(context.Background())
return &dontHaveTimeoutMgr{
ctx: ctx,
Expand Down Expand Up @@ -221,6 +228,7 @@ func (dhtm *dontHaveTimeoutMgr) checkForTimeouts() {

// Figure out which of the blocks that were wanted were not received
// within the timeout
now := dhtm.config.clock.Now()
expired := make([]cid.Cid, 0, len(dhtm.activeWants))
for dhtm.wantQueue.Len() > 0 {
pw := dhtm.wantQueue.Front()
Expand All @@ -229,7 +237,7 @@ func (dhtm *dontHaveTimeoutMgr) checkForTimeouts() {
if pw.active {
// The queue is in order from earliest to latest, so if we
// didn't find an expired entry we can stop iterating
if time.Since(pw.sent) < dhtm.timeout {
if now.Sub(pw.sent) < dhtm.timeout {
break
}

Expand Down Expand Up @@ -260,9 +268,9 @@ func (dhtm *dontHaveTimeoutMgr) checkForTimeouts() {
// Schedule the next check for the moment when the oldest pending want will
// timeout
oldestStart := dhtm.wantQueue.Front().sent
until := time.Until(oldestStart.Add(dhtm.timeout))
until := oldestStart.Add(dhtm.timeout).Sub(now)
if dhtm.checkForTimeoutsTimer == nil {
dhtm.checkForTimeoutsTimer = time.NewTimer(until)
dhtm.checkForTimeoutsTimer = dhtm.config.clock.Timer(until)
go func() {
for {
select {
Expand All @@ -288,7 +296,7 @@ func (dhtm *dontHaveTimeoutMgr) AddPending(ks []cid.Cid) {
return
}

start := time.Now()
start := dhtm.config.clock.Now()

dhtm.lk.Lock()
defer dhtm.lk.Unlock()
Expand Down
Loading

0 comments on commit 25cb9d4

Please sign in to comment.