Skip to content

Commit

Permalink
simplify logic
Browse files Browse the repository at this point in the history
  • Loading branch information
gammazero committed Nov 26, 2024
1 parent 66013d7 commit baf4e71
Showing 1 changed file with 10 additions and 26 deletions.
36 changes: 10 additions & 26 deletions bitswap/client/internal/messagequeue/messagequeue.go
Original file line number Diff line number Diff line change
Expand Up @@ -440,12 +440,12 @@ func (mq *MessageQueue) runQueue() {
var workScheduled time.Time
for {
select {
case <-rebroadcastTimer.C:
mq.rebroadcastWantlist(false)
case now := <-rebroadcastTimer.C:
mq.rebroadcastWantlist(now, rebroadcastInterval)
rebroadcastTimer.Reset(runRebroadcastsInterval)

Check warning on line 445 in bitswap/client/internal/messagequeue/messagequeue.go

View check run for this annotation

Codecov / codecov/patch

bitswap/client/internal/messagequeue/messagequeue.go#L443-L445

Added lines #L443 - L445 were not covered by tests

case <-mq.rebroadcastNow:
mq.rebroadcastWantlist(true)
mq.rebroadcastWantlist(mq.clock.Now(), 0)

case when := <-mq.outgoingWork:
// If we have work scheduled, cancel the timer. If we
Expand Down Expand Up @@ -490,36 +490,20 @@ func (mq *MessageQueue) runQueue() {
}

// Periodically resend the list of wants to the peer
func (mq *MessageQueue) rebroadcastWantlist(immediate bool) {
func (mq *MessageQueue) rebroadcastWantlist(now time.Time, interval time.Duration) {
mq.wllock.Lock()
// Transfer wants from the rebroadcast lists into the pending lists.
toRebroadcast := mq.bcstWants.Refresh(now, interval) + mq.peerWants.Refresh(now, interval)
mq.wllock.Unlock()

// If some wants were transferred from the rebroadcast list
if toRebroadcast := mq.transferRebroadcastWants(immediate); toRebroadcast > 0 {
if toRebroadcast > 0 {
// Send them out
mq.sendMessage()
log.Infow("Rebroadcasting wants", "amount", toRebroadcast, "peer", mq.p)
}
}

// Transfer wants from the rebroadcast lists into the pending lists.
func (mq *MessageQueue) transferRebroadcastWants(immediate bool) int {
mq.wllock.Lock()
defer mq.wllock.Unlock()

if mq.bcstWants.sent.Len() == 0 && mq.peerWants.sent.Len() == 0 {
return 0
}

interval := rebroadcastInterval
if immediate {
interval = 0
}
now := mq.clock.Now()

// Transfer sent wants into pending wants lists
transferred := mq.bcstWants.Refresh(now, interval)
transferred += mq.peerWants.Refresh(now, interval)
return transferred
}

func (mq *MessageQueue) signalWorkReady() {
select {
case mq.outgoingWork <- mq.clock.Now():
Expand Down

0 comments on commit baf4e71

Please sign in to comment.