Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bitswap/client/messagequeue: expose dontHaveTimeoutMgr configuration #750

Merged
merged 5 commits into from
Dec 13, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 10 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,21 @@ The following emojis are used to highlight certain changes:

### Added

- `bitswap/client`: Improved timeout configuration for block requests
- Exposed `DontHaveTimeoutConfig` to hold configuration values for `dontHaveTimeoutMgr`, which controls how long to wait for a requested block before emitting a synthetic DontHave response
- Added `DefaultDontHaveTimeoutConfig()` to return a `DontHaveTimeoutConfig` populated with default values
- Added optional `WithDontHaveTimeoutConfig` to allow passing a custom `DontHaveTimeoutConfig`
- Setting `SetSendDontHaves(false)` works the same as before. Behind the scenes, it will disable `dontHaveTimeoutMgr` by passing a `nil` `onDontHaveTimeout` to `newDontHaveTimeoutMgr`.


### Changed

* 🛠 `blockstore` and `blockservice`'s `WriteThrough()` option now takes an "enabled" parameter: `WriteThrough(enabled bool)`.

### Removed



### Fixed

### Security
Expand Down Expand Up @@ -67,7 +76,7 @@ The following emojis are used to highlight certain changes:
provider := provider.New(datastore, provider.Online(contentProvider))
// A wrapped providing exchange using the previous exchange and the provider.
exch := providing.New(bswap, provider)

// Finally the blockservice
bserv := blockservice.New(blockstore, exch)
...
Expand Down
20 changes: 17 additions & 3 deletions bitswap/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,12 @@
}
}

func WithDontHaveTimeoutConfig(cfg *bsmq.DontHaveTimeoutConfig) Option {
return func(bs *Client) {
bs.dontHaveTimeoutConfig = cfg
}

Check warning on line 75 in bitswap/client/client.go

View check run for this annotation

Codecov / codecov/patch

bitswap/client/client.go#L72-L75

Added lines #L72 - L75 were not covered by tests
}

// Configures the Client to use given tracer.
// This provides methods to access all messages sent and received by the Client.
// This interface can be used to implement various statistics (this is original intent).
Expand Down Expand Up @@ -165,16 +171,23 @@
// onDontHaveTimeout is called when a want-block is sent to a peer that
// has an old version of Bitswap that doesn't support DONT_HAVE messages,
// or when no response is received within a timeout.
//
// When this is nil (because bs.simulateDontHavesOnTimeout is false), the
// dontHaveTimeoutMgr is disabled and DONT_HAVE messages are not simulated
// on timeout.
var onDontHaveTimeout func(peer.ID, []cid.Cid)

var sm *bssm.SessionManager
onDontHaveTimeout := func(p peer.ID, dontHaves []cid.Cid) {
if bs.simulateDontHavesOnTimeout {
// Simulate a message arriving with DONT_HAVEs
if bs.simulateDontHavesOnTimeout {
onDontHaveTimeout = func(p peer.ID, dontHaves []cid.Cid) {
sm.ReceiveFrom(ctx, p, nil, nil, dontHaves)
}
}
peerQueueFactory := func(ctx context.Context, p peer.ID) bspm.PeerQueue {
return bsmq.New(ctx, p, network, onDontHaveTimeout)
return bsmq.New(ctx, p, network, onDontHaveTimeout, bsmq.WithDontHaveTimeoutConfig(bs.dontHaveTimeoutConfig))
}
bs.dontHaveTimeoutConfig = nil

sim := bssim.New()
bpm := bsbpm.New()
Expand Down Expand Up @@ -284,6 +297,7 @@

// whether we should actually simulate dont haves on request timeout
simulateDontHavesOnTimeout bool
dontHaveTimeoutConfig *bsmq.DontHaveTimeoutConfig

// dupMetric will stay at 0
skipDuplicatedBlocksStats bool
Expand Down
183 changes: 91 additions & 92 deletions bitswap/client/internal/messagequeue/donthavetimeoutmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,33 +11,52 @@
"github.com/libp2p/go-libp2p/p2p/protocol/ping"
)

const (
// dontHaveTimeout is used to simulate a DONT_HAVE when communicating with
type DontHaveTimeoutConfig struct {
// DontHaveTimeout is used to simulate a DONT_HAVE when communicating with
// a peer whose Bitswap client doesn't support the DONT_HAVE response,
// or when the peer takes too long to respond.
// If the peer doesn't respond to a want-block within the timeout, the
// local node assumes that the peer doesn't have the block.
dontHaveTimeout = 5 * time.Second
DontHaveTimeout time.Duration

// maxExpectedWantProcessTime is the maximum amount of time we expect a
// MaxExpectedWantProcessTime is the maximum amount of time we expect a
// peer takes to process a want and initiate sending a response to us
maxExpectedWantProcessTime = 2 * time.Second
MaxExpectedWantProcessTime time.Duration

// maxTimeout is the maximum allowed timeout, regardless of latency
maxTimeout = dontHaveTimeout + maxExpectedWantProcessTime
// MaxTimeout is the maximum allowed timeout, regardless of latency
MaxTimeout time.Duration

// pingLatencyMultiplier is multiplied by the average ping time to
// PingLatencyMultiplier is multiplied by the average ping time to
// get an upper bound on how long we expect to wait for a peer's response
// to arrive
pingLatencyMultiplier = 3
PingLatencyMultiplier int

// messageLatencyAlpha is the alpha supplied to the message latency EWMA
messageLatencyAlpha = 0.5
// MessageLatencyAlpha is the alpha supplied to the message latency EWMA
MessageLatencyAlpha float64

// To give a margin for error, the timeout is calculated as
// messageLatencyMultiplier * message latency
messageLatencyMultiplier = 2
)
// MessageLatencyMultiplier gives a margin for error. The timeout is calculated as
// MessageLatencyMultiplier * message latency
MessageLatencyMultiplier int

// timeoutsSignal is used for testing -- a caller-provided channel that is
// signaled when a dont-have timeout has been triggered.
timeoutsSignal chan<- struct{}

// clock is a mockable time api used for testing.
clock clock.Clock
}

// DefaultDontHaveTimeoutConfig returns a new DontHaveTimeoutConfig populated
// with the default values. MaxTimeout is set to the sum of DontHaveTimeout
// and MaxExpectedWantProcessTime.
func DefaultDontHaveTimeoutConfig() *DontHaveTimeoutConfig {
	const (
		defaultDontHaveTimeout            = 5 * time.Second
		defaultMaxExpectedWantProcessTime = 2 * time.Second
	)

	return &DontHaveTimeoutConfig{
		DontHaveTimeout:            defaultDontHaveTimeout,
		MaxExpectedWantProcessTime: defaultMaxExpectedWantProcessTime,
		// The upper bound on any timeout is derived from the two
		// durations above rather than being an independent value.
		MaxTimeout:               defaultDontHaveTimeout + defaultMaxExpectedWantProcessTime,
		PingLatencyMultiplier:    3,
		MessageLatencyAlpha:      0.5,
		MessageLatencyMultiplier: 2,
	}
}

// PeerConnection is a connection to a peer that can be pinged, and the
// average latency measured
Expand All @@ -62,16 +81,11 @@
// we ping the peer to estimate latency. If we receive a response from the
// peer we use the response latency.
type dontHaveTimeoutMgr struct {
clock clock.Clock
ctx context.Context
shutdown func()
peerConn PeerConnection
onDontHaveTimeout func([]cid.Cid)
defaultTimeout time.Duration
maxTimeout time.Duration
pingLatencyMultiplier int
messageLatencyMultiplier int
maxExpectedWantProcessTime time.Duration
ctx context.Context
shutdown func()
peerConn PeerConnection
onDontHaveTimeout func([]cid.Cid, time.Duration)
config DontHaveTimeoutConfig

// All variables below here must be protected by the lock
lk sync.RWMutex
Expand All @@ -87,48 +101,34 @@
messageLatency *latencyEwma
// timer used to wait until want at front of queue expires
checkForTimeoutsTimer *clock.Timer
// used for testing -- timeoutsTriggered when a scheduled dont have timeouts were triggered
timeoutsTriggered chan struct{}
}

// newDontHaveTimeoutMgr creates a new dontHaveTimeoutMgr
// onDontHaveTimeout is called when pending keys expire (not cancelled before timeout)
func newDontHaveTimeoutMgr(pc PeerConnection, onDontHaveTimeout func([]cid.Cid), clock clock.Clock) *dontHaveTimeoutMgr {
return newDontHaveTimeoutMgrWithParams(pc, onDontHaveTimeout, dontHaveTimeout, maxTimeout,
pingLatencyMultiplier, messageLatencyMultiplier, maxExpectedWantProcessTime, clock, nil)
}

// newDontHaveTimeoutMgrWithParams is used by the tests
func newDontHaveTimeoutMgrWithParams(
pc PeerConnection,
onDontHaveTimeout func([]cid.Cid),
defaultTimeout time.Duration,
maxTimeout time.Duration,
pingLatencyMultiplier int,
messageLatencyMultiplier int,
maxExpectedWantProcessTime time.Duration,
clock clock.Clock,
timeoutsTriggered chan struct{},
) *dontHaveTimeoutMgr {
//
// onDontHaveTimeout is the function called when pending keys expire (not
// cancelled before timeout). If this is nil, then the dontHaveTimeoutMgr is
// disabled.
func newDontHaveTimeoutMgr(pc PeerConnection, onDontHaveTimeout func([]cid.Cid, time.Duration), cfg *DontHaveTimeoutConfig) *dontHaveTimeoutMgr {
if onDontHaveTimeout == nil {
return nil
}

Check warning on line 114 in bitswap/client/internal/messagequeue/donthavetimeoutmgr.go

View check run for this annotation

Codecov / codecov/patch

bitswap/client/internal/messagequeue/donthavetimeoutmgr.go#L113-L114

Added lines #L113 - L114 were not covered by tests
if cfg == nil {
cfg = DefaultDontHaveTimeoutConfig()
}
if cfg.clock == nil {
cfg.clock = clock.New()
}
ctx, shutdown := context.WithCancel(context.Background())
mqp := &dontHaveTimeoutMgr{
clock: clock,
ctx: ctx,
shutdown: shutdown,
peerConn: pc,
activeWants: make(map[cid.Cid]*pendingWant),
timeout: defaultTimeout,
messageLatency: &latencyEwma{alpha: messageLatencyAlpha},
defaultTimeout: defaultTimeout,
maxTimeout: maxTimeout,
pingLatencyMultiplier: pingLatencyMultiplier,
messageLatencyMultiplier: messageLatencyMultiplier,
maxExpectedWantProcessTime: maxExpectedWantProcessTime,
onDontHaveTimeout: onDontHaveTimeout,
timeoutsTriggered: timeoutsTriggered,
return &dontHaveTimeoutMgr{
ctx: ctx,
config: *cfg,
shutdown: shutdown,
peerConn: pc,
activeWants: make(map[cid.Cid]*pendingWant),
timeout: cfg.DontHaveTimeout,
messageLatency: &latencyEwma{alpha: cfg.MessageLatencyAlpha},
onDontHaveTimeout: onDontHaveTimeout,
}

return mqp
}

// Shutdown the dontHaveTimeoutMgr. Any subsequent call to Start() will be ignored
Expand Down Expand Up @@ -190,7 +190,7 @@
// measurePingLatency measures the latency to the peer by pinging it
func (dhtm *dontHaveTimeoutMgr) measurePingLatency() {
// Wait up to the default dont-have timeout for a response to the ping
ctx, cancel := context.WithTimeout(dhtm.ctx, dhtm.defaultTimeout)
ctx, cancel := context.WithTimeout(dhtm.ctx, dhtm.config.DontHaveTimeout)
defer cancel()

// Ping the peer
Expand Down Expand Up @@ -229,6 +229,7 @@

// Figure out which of the blocks that were wanted were not received
// within the timeout
now := dhtm.config.clock.Now()
expired := make([]cid.Cid, 0, len(dhtm.activeWants))
for dhtm.wantQueue.Len() > 0 {
pw := dhtm.wantQueue.Front()
Expand All @@ -237,7 +238,7 @@
if pw.active {
// The queue is in order from earliest to latest, so if we
// didn't find an expired entry we can stop iterating
if dhtm.clock.Since(pw.sent) < dhtm.timeout {
if now.Sub(pw.sent) < dhtm.timeout {
break
}

Expand All @@ -253,7 +254,7 @@

// Fire the timeout event for the expired wants
if len(expired) > 0 {
go dhtm.fireTimeout(expired)
go dhtm.fireTimeout(expired, dhtm.timeout)
}

if dhtm.wantQueue.Len() == 0 {
Expand All @@ -268,37 +269,35 @@
// Schedule the next check for the moment when the oldest pending want will
// timeout
oldestStart := dhtm.wantQueue.Front().sent
until := oldestStart.Add(dhtm.timeout).Sub(dhtm.clock.Now())
until := oldestStart.Add(dhtm.timeout).Sub(now)
if dhtm.checkForTimeoutsTimer == nil {
dhtm.checkForTimeoutsTimer = dhtm.clock.Timer(until)
go dhtm.consumeTimeouts()
dhtm.checkForTimeoutsTimer = dhtm.config.clock.Timer(until)
go func() {
for {
select {
case <-dhtm.ctx.Done():
return
case <-dhtm.checkForTimeoutsTimer.C:
dhtm.lk.Lock()
dhtm.checkForTimeouts()
dhtm.lk.Unlock()
}
}
}()
} else {
dhtm.checkForTimeoutsTimer.Stop()
dhtm.checkForTimeoutsTimer.Reset(until)
}
}

// consumeTimeouts waits on the checkForTimeoutsTimer channel in a loop. Each
// time the timer fires it acquires the lock and runs checkForTimeouts. The
// loop exits once dhtm.ctx is done.
func (dhtm *dontHaveTimeoutMgr) consumeTimeouts() {
for {
select {
case <-dhtm.ctx.Done():
// The manager's context is done, so stop waiting on the timer.
return
case <-dhtm.checkForTimeoutsTimer.C:
// checkForTimeouts is invoked with the lock held.
dhtm.lk.Lock()
dhtm.checkForTimeouts()
dhtm.lk.Unlock()
}
}
}

// AddPending adds the given keys that will expire if not cancelled before
// the timeout
func (dhtm *dontHaveTimeoutMgr) AddPending(ks []cid.Cid) {
if len(ks) == 0 {
return
}

start := dhtm.clock.Now()
start := dhtm.config.clock.Now()

dhtm.lk.Lock()
defer dhtm.lk.Unlock()
Expand Down Expand Up @@ -341,18 +340,18 @@
}

// fireTimeout fires the onDontHaveTimeout method with the timed out keys
func (dhtm *dontHaveTimeoutMgr) fireTimeout(pending []cid.Cid) {
func (dhtm *dontHaveTimeoutMgr) fireTimeout(pending []cid.Cid, timeout time.Duration) {
// Make sure the timeout manager has not been shut down
if dhtm.ctx.Err() != nil {
return
}

// Fire the timeout
dhtm.onDontHaveTimeout(pending)
dhtm.onDontHaveTimeout(pending, timeout)

// signal a timeout fired
if dhtm.timeoutsTriggered != nil {
dhtm.timeoutsTriggered <- struct{}{}
if dhtm.config.timeoutsSignal != nil {
dhtm.config.timeoutsSignal <- struct{}{}
}
}

Expand All @@ -361,18 +360,18 @@
// The maximum expected time for a response is
// the expected time to process the want + (latency * multiplier)
// The multiplier is to provide some padding for variable latency.
timeout := dhtm.maxExpectedWantProcessTime + time.Duration(dhtm.pingLatencyMultiplier)*latency
if timeout > dhtm.maxTimeout {
timeout = dhtm.maxTimeout
timeout := dhtm.config.MaxExpectedWantProcessTime + time.Duration(dhtm.config.PingLatencyMultiplier)*latency
if timeout > dhtm.config.MaxTimeout {
timeout = dhtm.config.MaxTimeout

Check warning on line 365 in bitswap/client/internal/messagequeue/donthavetimeoutmgr.go

View check run for this annotation

Codecov / codecov/patch

bitswap/client/internal/messagequeue/donthavetimeoutmgr.go#L365

Added line #L365 was not covered by tests
}
return timeout
}

// calculateTimeoutFromMessageLatency calculates a timeout derived from message latency
func (dhtm *dontHaveTimeoutMgr) calculateTimeoutFromMessageLatency() time.Duration {
timeout := dhtm.messageLatency.latency * time.Duration(dhtm.messageLatencyMultiplier)
if timeout > dhtm.maxTimeout {
timeout = dhtm.maxTimeout
timeout := dhtm.messageLatency.latency * time.Duration(dhtm.config.MessageLatencyMultiplier)
if timeout > dhtm.config.MaxTimeout {
timeout = dhtm.config.MaxTimeout
}
return timeout
}
Expand Down
Loading
Loading