Skip to content

Commit

Permalink
Client options for DontHaveTimeoutConfig
Browse files Browse the repository at this point in the history
  • Loading branch information
gammazero committed Dec 12, 2024
1 parent a23b36e commit a0ac8f0
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 12 deletions.
10 changes: 9 additions & 1 deletion bitswap/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@ func SetSimulateDontHavesOnTimeout(send bool) Option {
}
}

func WithDontHaveTimeoutConfig(cfg *bsmq.DontHaveTimeoutConfig) Option {
return func(bs *Client) {
bs.dontHaveTimeoutConfig = cfg
}

Check warning on line 75 in bitswap/client/client.go

View check run for this annotation

Codecov / codecov/patch

bitswap/client/client.go#L72-L75

Added lines #L72 - L75 were not covered by tests
}

// Configures the Client to use given tracer.
// This provides methods to access all messages sent and received by the Client.
// This interface can be used to implement various statistics (this is original intent).
Expand Down Expand Up @@ -173,8 +179,9 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, providerFinder Pr
}
}
peerQueueFactory := func(ctx context.Context, p peer.ID) bspm.PeerQueue {
return bsmq.New(ctx, p, network, onDontHaveTimeout)
return bsmq.New(ctx, p, network, onDontHaveTimeout, bsmq.WithDontHaveTimeoutConfig(bs.dontHaveTimeoutConfig))
}
bs.dontHaveTimeoutConfig = nil

sim := bssim.New()
bpm := bsbpm.New()
Expand Down Expand Up @@ -284,6 +291,7 @@ type Client struct {

// whether we should actually simulate dont haves on request timeout
simulateDontHavesOnTimeout bool
dontHaveTimeoutConfig *bsmq.DontHaveTimeoutConfig

// dupMetric will stay at 0
skipDuplicatedBlocksStats bool
Expand Down
47 changes: 36 additions & 11 deletions bitswap/client/internal/messagequeue/messagequeue.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ type MessageQueue struct {
clock clock.Clock

// Used to track things that happen asynchronously -- used only in test
events chan messageEvent
events chan<- messageEvent
}

// recallWantlist keeps a list of pending wants and a list of sent wants
Expand Down Expand Up @@ -230,15 +230,37 @@ type DontHaveTimeoutManager interface {
UpdateMessageLatency(time.Duration)
}

type optsConfig struct {
dhtConfig *DontHaveTimeoutConfig
}

type option func(*optsConfig)

func WithDontHaveTimeoutConfig(dhtConfig *DontHaveTimeoutConfig) option {
return func(cfg *optsConfig) {
cfg.dhtConfig = dhtConfig
}
}

// New creates a new MessageQueue.
func New(ctx context.Context, p peer.ID, network MessageNetwork, onDontHaveTimeout OnDontHaveTimeout) *MessageQueue {
onTimeout := func(ks []cid.Cid, timeout time.Duration) {
log.Infow("Bitswap: timeout waiting for blocks", "timeout", timeout.String(), "cids", ks, "peer", p)
onDontHaveTimeout(p, ks)
//
// If onDontHaveTimeout is nil, then the dontHaveTimeoutMrg is disabled.
func New(ctx context.Context, p peer.ID, network MessageNetwork, onDontHaveTimeout OnDontHaveTimeout, options ...option) *MessageQueue {
var opts optsConfig
for _, o := range options {
o(&opts)
}

var onTimeout func([]cid.Cid, time.Duration)
var dhTimeoutMgr DontHaveTimeoutManager
if onDontHaveTimeout != nil {
onTimeout = func(ks []cid.Cid, timeout time.Duration) {
log.Infow("Bitswap: timeout waiting for blocks", "timeout", timeout.String(), "cids", ks, "peer", p)
onDontHaveTimeout(p, ks)
}
dhTimeoutMgr = newDontHaveTimeoutMgr(newPeerConnection(p, network), onTimeout, opts.dhtConfig)
}
dhTimeoutMgr := newDontHaveTimeoutMgr(newPeerConnection(p, network), onTimeout, nil)
clock := clock.New()
return newMessageQueue(ctx, p, network, maxMessageSize, sendErrorBackoff, maxValidLatency, dhTimeoutMgr, clock, nil)
return newMessageQueue(ctx, p, network, maxMessageSize, sendErrorBackoff, maxValidLatency, dhTimeoutMgr, nil, nil)
}

type messageEvent int
Expand All @@ -258,9 +280,12 @@ func newMessageQueue(
sendErrorBackoff time.Duration,
maxValidLatency time.Duration,
dhTimeoutMgr DontHaveTimeoutManager,
clock clock.Clock,
events chan messageEvent,
clk clock.Clock,
events chan<- messageEvent,
) *MessageQueue {
if clk == nil {
clk = clock.New()
}
ctx, cancel := context.WithCancel(ctx)
return &MessageQueue{
ctx: ctx,
Expand All @@ -281,7 +306,7 @@ func newMessageQueue(
// For performance reasons we just clear out the fields of the message
// after using it, instead of creating a new one every time.
msg: bsmsg.New(false),
clock: clock,
clock: clk,
events: events,
}
}
Expand Down

0 comments on commit a0ac8f0

Please sign in to comment.