Skip to content

Commit

Permalink
bitswap/client/messagequeue: expose dontHaveTimeoutMgr configuration (#…
Browse files Browse the repository at this point in the history
…750)

* expose dontHaveTimeoutMgr configuration

- Create an exported DontHaveTimeoutConfig to hold configuration values for dontHaveTimeoutMrg
- Provide function DefaultDontHaveTimeoutConfig to return a DontHaveTimeoutConfig populated with default valued.
- Bitswap client New has new option for caller to supply a DontHaveTimeoutConfig
- If the required onHaveTimeout function is nil, then disable the dontaHaveTimeoutMgr
  • Loading branch information
gammazero authored Dec 13, 2024
1 parent 058422d commit 2a26bf9
Show file tree
Hide file tree
Showing 5 changed files with 230 additions and 137 deletions.
11 changes: 10 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,21 @@ The following emojis are used to highlight certain changes:

### Added

- `bitswap/client`: Improved timeout configuration for block requests
- Exposed `DontHaveTimeoutConfig` to hold configuration values for `dontHaveTimeoutMgr` which controls how long to wait for requested block before emitting a synthetic DontHave response
- Added `DefaultDontHaveTimeoutConfig()` to return a `DontHaveTimeoutConfig` populated with default values
- Added optional `WithDontHaveTimeoutConfig` to allow passing a custom `DontHaveTimeoutConfig`
- Setting `SetSendDontHaves(false)` works the same as before. Behind the scenes, it will disable `dontHaveTimeoutMgr` by passing a `nil` `onDontHaveTimeout` to `newDontHaveTimeoutMgr`.


### Changed

* 🛠 `blockstore` and `blockservice`'s `WriteThrough()` option now takes an "enabled" parameter: `WriteThrough(enabled bool)`.

### Removed



### Fixed

### Security
Expand Down Expand Up @@ -67,7 +76,7 @@ The following emojis are used to highlight certain changes:
provider := provider.New(datastore, provider.Online(contentProvider)
// A wrapped providing exchange using the previous exchange and the provider.
exch := providing.New(bswap, provider)

// Finally the blockservice
bserv := blockservice.New(blockstore, exch)
...
Expand Down
20 changes: 17 additions & 3 deletions bitswap/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@ func SetSimulateDontHavesOnTimeout(send bool) Option {
}
}

func WithDontHaveTimeoutConfig(cfg *bsmq.DontHaveTimeoutConfig) Option {
return func(bs *Client) {
bs.dontHaveTimeoutConfig = cfg
}
}

// Configures the Client to use given tracer.
// This provides methods to access all messages sent and received by the Client.
// This interface can be used to implement various statistics (this is original intent).
Expand Down Expand Up @@ -165,16 +171,23 @@ func New(parent context.Context, network bsnet.BitSwapNetwork, providerFinder Pr
// onDontHaveTimeout is called when a want-block is sent to a peer that
// has an old version of Bitswap that doesn't support DONT_HAVE messages,
// or when no response is received within a timeout.
//
// When set to nil (when bs.simulateDontHavesOnTimeout is false), then
// disable the dontHaveTimoutMgr and do not simulate DONT_HAVE messages on
// timeout.
var onDontHaveTimeout func(peer.ID, []cid.Cid)

var sm *bssm.SessionManager
onDontHaveTimeout := func(p peer.ID, dontHaves []cid.Cid) {
if bs.simulateDontHavesOnTimeout {
// Simulate a message arriving with DONT_HAVEs
if bs.simulateDontHavesOnTimeout {
onDontHaveTimeout = func(p peer.ID, dontHaves []cid.Cid) {
sm.ReceiveFrom(ctx, p, nil, nil, dontHaves)
}
}
peerQueueFactory := func(ctx context.Context, p peer.ID) bspm.PeerQueue {
return bsmq.New(ctx, p, network, onDontHaveTimeout)
return bsmq.New(ctx, p, network, onDontHaveTimeout, bsmq.WithDontHaveTimeoutConfig(bs.dontHaveTimeoutConfig))
}
bs.dontHaveTimeoutConfig = nil

sim := bssim.New()
bpm := bsbpm.New()
Expand Down Expand Up @@ -284,6 +297,7 @@ type Client struct {

// whether we should actually simulate dont haves on request timeout
simulateDontHavesOnTimeout bool
dontHaveTimeoutConfig *bsmq.DontHaveTimeoutConfig

// dupMetric will stay at 0
skipDuplicatedBlocksStats bool
Expand Down
183 changes: 91 additions & 92 deletions bitswap/client/internal/messagequeue/donthavetimeoutmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,33 +11,52 @@ import (
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
)

const (
// dontHaveTimeout is used to simulate a DONT_HAVE when communicating with
type DontHaveTimeoutConfig struct {
// DontHaveTimeout is used to simulate a DONT_HAVE when communicating with
// a peer whose Bitswap client doesn't support the DONT_HAVE response,
// or when the peer takes too long to respond.
// If the peer doesn't respond to a want-block within the timeout, the
// local node assumes that the peer doesn't have the block.
dontHaveTimeout = 5 * time.Second
DontHaveTimeout time.Duration

// maxExpectedWantProcessTime is the maximum amount of time we expect a
// MaxExpectedWantProcessTime is the maximum amount of time we expect a
// peer takes to process a want and initiate sending a response to us
maxExpectedWantProcessTime = 2 * time.Second
MaxExpectedWantProcessTime time.Duration

// maxTimeout is the maximum allowed timeout, regardless of latency
maxTimeout = dontHaveTimeout + maxExpectedWantProcessTime
// MaxTimeout is the maximum allowed timeout, regardless of latency
MaxTimeout time.Duration

// pingLatencyMultiplier is multiplied by the average ping time to
// PingLatencyMultiplier is multiplied by the average ping time to
// get an upper bound on how long we expect to wait for a peer's response
// to arrive
pingLatencyMultiplier = 3
PingLatencyMultiplier int

// messageLatencyAlpha is the alpha supplied to the message latency EWMA
messageLatencyAlpha = 0.5
// MessageLatencyAlpha is the alpha supplied to the message latency EWMA
MessageLatencyAlpha float64

// To give a margin for error, the timeout is calculated as
// messageLatencyMultiplier * message latency
messageLatencyMultiplier = 2
)
// MessageLatencyMultiplier gives a margin for error. The timeout is calculated as
// MessageLatencyMultiplier * message latency
MessageLatencyMultiplier int

// timeoutsSignal used for testing -- caller-provided channel to signals
// when a dont have timeout was triggered.
timeoutsSignal chan<- struct{}

// clock is a mockable time api used for testing.
clock clock.Clock
}

func DefaultDontHaveTimeoutConfig() *DontHaveTimeoutConfig {
cfg := DontHaveTimeoutConfig{
DontHaveTimeout: 5 * time.Second,
MaxExpectedWantProcessTime: 2 * time.Second,
PingLatencyMultiplier: 3,
MessageLatencyAlpha: 0.5,
MessageLatencyMultiplier: 2,
}
cfg.MaxTimeout = cfg.DontHaveTimeout + cfg.MaxExpectedWantProcessTime
return &cfg
}

// PeerConnection is a connection to a peer that can be pinged, and the
// average latency measured
Expand All @@ -62,16 +81,11 @@ type pendingWant struct {
// we ping the peer to estimate latency. If we receive a response from the
// peer we use the response latency.
type dontHaveTimeoutMgr struct {
clock clock.Clock
ctx context.Context
shutdown func()
peerConn PeerConnection
onDontHaveTimeout func([]cid.Cid)
defaultTimeout time.Duration
maxTimeout time.Duration
pingLatencyMultiplier int
messageLatencyMultiplier int
maxExpectedWantProcessTime time.Duration
ctx context.Context
shutdown func()
peerConn PeerConnection
onDontHaveTimeout func([]cid.Cid, time.Duration)
config DontHaveTimeoutConfig

// All variables below here must be protected by the lock
lk sync.RWMutex
Expand All @@ -87,48 +101,34 @@ type dontHaveTimeoutMgr struct {
messageLatency *latencyEwma
// timer used to wait until want at front of queue expires
checkForTimeoutsTimer *clock.Timer
// used for testing -- timeoutsTriggered when a scheduled dont have timeouts were triggered
timeoutsTriggered chan struct{}
}

// newDontHaveTimeoutMgr creates a new dontHaveTimeoutMgr
// onDontHaveTimeout is called when pending keys expire (not cancelled before timeout)
func newDontHaveTimeoutMgr(pc PeerConnection, onDontHaveTimeout func([]cid.Cid), clock clock.Clock) *dontHaveTimeoutMgr {
return newDontHaveTimeoutMgrWithParams(pc, onDontHaveTimeout, dontHaveTimeout, maxTimeout,
pingLatencyMultiplier, messageLatencyMultiplier, maxExpectedWantProcessTime, clock, nil)
}

// newDontHaveTimeoutMgrWithParams is used by the tests
func newDontHaveTimeoutMgrWithParams(
pc PeerConnection,
onDontHaveTimeout func([]cid.Cid),
defaultTimeout time.Duration,
maxTimeout time.Duration,
pingLatencyMultiplier int,
messageLatencyMultiplier int,
maxExpectedWantProcessTime time.Duration,
clock clock.Clock,
timeoutsTriggered chan struct{},
) *dontHaveTimeoutMgr {
//
// onDontHaveTimeout is the function called when pending keys expire (not
// cancelled before timeout). If this is nil, then DontHaveTimeoutMgm is
// disabled.
func newDontHaveTimeoutMgr(pc PeerConnection, onDontHaveTimeout func([]cid.Cid, time.Duration), cfg *DontHaveTimeoutConfig) *dontHaveTimeoutMgr {
if onDontHaveTimeout == nil {
return nil
}
if cfg == nil {
cfg = DefaultDontHaveTimeoutConfig()
}
if cfg.clock == nil {
cfg.clock = clock.New()
}
ctx, shutdown := context.WithCancel(context.Background())
mqp := &dontHaveTimeoutMgr{
clock: clock,
ctx: ctx,
shutdown: shutdown,
peerConn: pc,
activeWants: make(map[cid.Cid]*pendingWant),
timeout: defaultTimeout,
messageLatency: &latencyEwma{alpha: messageLatencyAlpha},
defaultTimeout: defaultTimeout,
maxTimeout: maxTimeout,
pingLatencyMultiplier: pingLatencyMultiplier,
messageLatencyMultiplier: messageLatencyMultiplier,
maxExpectedWantProcessTime: maxExpectedWantProcessTime,
onDontHaveTimeout: onDontHaveTimeout,
timeoutsTriggered: timeoutsTriggered,
return &dontHaveTimeoutMgr{
ctx: ctx,
config: *cfg,
shutdown: shutdown,
peerConn: pc,
activeWants: make(map[cid.Cid]*pendingWant),
timeout: cfg.DontHaveTimeout,
messageLatency: &latencyEwma{alpha: cfg.MessageLatencyAlpha},
onDontHaveTimeout: onDontHaveTimeout,
}

return mqp
}

// Shutdown the dontHaveTimeoutMgr. Any subsequent call to Start() will be ignored
Expand Down Expand Up @@ -190,7 +190,7 @@ func (dhtm *dontHaveTimeoutMgr) UpdateMessageLatency(elapsed time.Duration) {
// measurePingLatency measures the latency to the peer by pinging it
func (dhtm *dontHaveTimeoutMgr) measurePingLatency() {
// Wait up to defaultTimeout for a response to the ping
ctx, cancel := context.WithTimeout(dhtm.ctx, dhtm.defaultTimeout)
ctx, cancel := context.WithTimeout(dhtm.ctx, dhtm.config.DontHaveTimeout)
defer cancel()

// Ping the peer
Expand Down Expand Up @@ -229,6 +229,7 @@ func (dhtm *dontHaveTimeoutMgr) checkForTimeouts() {

// Figure out which of the blocks that were wanted were not received
// within the timeout
now := dhtm.config.clock.Now()
expired := make([]cid.Cid, 0, len(dhtm.activeWants))
for dhtm.wantQueue.Len() > 0 {
pw := dhtm.wantQueue.Front()
Expand All @@ -237,7 +238,7 @@ func (dhtm *dontHaveTimeoutMgr) checkForTimeouts() {
if pw.active {
// The queue is in order from earliest to latest, so if we
// didn't find an expired entry we can stop iterating
if dhtm.clock.Since(pw.sent) < dhtm.timeout {
if now.Sub(pw.sent) < dhtm.timeout {
break
}

Expand All @@ -253,7 +254,7 @@ func (dhtm *dontHaveTimeoutMgr) checkForTimeouts() {

// Fire the timeout event for the expired wants
if len(expired) > 0 {
go dhtm.fireTimeout(expired)
go dhtm.fireTimeout(expired, dhtm.timeout)
}

if dhtm.wantQueue.Len() == 0 {
Expand All @@ -268,37 +269,35 @@ func (dhtm *dontHaveTimeoutMgr) checkForTimeouts() {
// Schedule the next check for the moment when the oldest pending want will
// timeout
oldestStart := dhtm.wantQueue.Front().sent
until := oldestStart.Add(dhtm.timeout).Sub(dhtm.clock.Now())
until := oldestStart.Add(dhtm.timeout).Sub(now)
if dhtm.checkForTimeoutsTimer == nil {
dhtm.checkForTimeoutsTimer = dhtm.clock.Timer(until)
go dhtm.consumeTimeouts()
dhtm.checkForTimeoutsTimer = dhtm.config.clock.Timer(until)
go func() {
for {
select {
case <-dhtm.ctx.Done():
return
case <-dhtm.checkForTimeoutsTimer.C:
dhtm.lk.Lock()
dhtm.checkForTimeouts()
dhtm.lk.Unlock()
}
}
}()
} else {
dhtm.checkForTimeoutsTimer.Stop()
dhtm.checkForTimeoutsTimer.Reset(until)
}
}

func (dhtm *dontHaveTimeoutMgr) consumeTimeouts() {
for {
select {
case <-dhtm.ctx.Done():
return
case <-dhtm.checkForTimeoutsTimer.C:
dhtm.lk.Lock()
dhtm.checkForTimeouts()
dhtm.lk.Unlock()
}
}
}

// AddPending adds the given keys that will expire if not cancelled before
// the timeout
func (dhtm *dontHaveTimeoutMgr) AddPending(ks []cid.Cid) {
if len(ks) == 0 {
return
}

start := dhtm.clock.Now()
start := dhtm.config.clock.Now()

dhtm.lk.Lock()
defer dhtm.lk.Unlock()
Expand Down Expand Up @@ -341,18 +340,18 @@ func (dhtm *dontHaveTimeoutMgr) CancelPending(ks []cid.Cid) {
}

// fireTimeout fires the onDontHaveTimeout method with the timed out keys
func (dhtm *dontHaveTimeoutMgr) fireTimeout(pending []cid.Cid) {
func (dhtm *dontHaveTimeoutMgr) fireTimeout(pending []cid.Cid, timeout time.Duration) {
// Make sure the timeout manager has not been shut down
if dhtm.ctx.Err() != nil {
return
}

// Fire the timeout
dhtm.onDontHaveTimeout(pending)
dhtm.onDontHaveTimeout(pending, timeout)

// signal a timeout fired
if dhtm.timeoutsTriggered != nil {
dhtm.timeoutsTriggered <- struct{}{}
if dhtm.config.timeoutsSignal != nil {
dhtm.config.timeoutsSignal <- struct{}{}
}
}

Expand All @@ -361,18 +360,18 @@ func (dhtm *dontHaveTimeoutMgr) calculateTimeoutFromPingLatency(latency time.Dur
// The maximum expected time for a response is
// the expected time to process the want + (latency * multiplier)
// The multiplier is to provide some padding for variable latency.
timeout := dhtm.maxExpectedWantProcessTime + time.Duration(dhtm.pingLatencyMultiplier)*latency
if timeout > dhtm.maxTimeout {
timeout = dhtm.maxTimeout
timeout := dhtm.config.MaxExpectedWantProcessTime + time.Duration(dhtm.config.PingLatencyMultiplier)*latency
if timeout > dhtm.config.MaxTimeout {
timeout = dhtm.config.MaxTimeout
}
return timeout
}

// calculateTimeoutFromMessageLatency calculates a timeout derived from message latency
func (dhtm *dontHaveTimeoutMgr) calculateTimeoutFromMessageLatency() time.Duration {
timeout := dhtm.messageLatency.latency * time.Duration(dhtm.messageLatencyMultiplier)
if timeout > dhtm.maxTimeout {
timeout = dhtm.maxTimeout
timeout := dhtm.messageLatency.latency * time.Duration(dhtm.config.MessageLatencyMultiplier)
if timeout > dhtm.config.MaxTimeout {
timeout = dhtm.config.MaxTimeout
}
return timeout
}
Expand Down
Loading

0 comments on commit 2a26bf9

Please sign in to comment.