autorelay: Add a context.Context to WithPeerSource callback (#1736)
Jorropo authored Sep 16, 2022
1 parent 5ab6d3f commit 70c1f81
Showing 3 changed files with 19 additions and 15 deletions.
19 changes: 10 additions & 9 deletions p2p/host/autorelay/autorelay_test.go
@@ -1,6 +1,7 @@
package autorelay_test

import (
"context"
"strings"
"sync/atomic"
"testing"
@@ -114,7 +115,7 @@ func TestSingleCandidate(t *testing.T) {
func TestSingleCandidate(t *testing.T) {
var counter int
h := newPrivateNode(t,
autorelay.WithPeerSource(func(num int) <-chan peer.AddrInfo {
autorelay.WithPeerSource(func(_ context.Context, num int) <-chan peer.AddrInfo {
counter++
require.Equal(t, 1, num)
peerChan := make(chan peer.AddrInfo, num)
@@ -148,7 +149,7 @@ func TestSingleRelay(t *testing.T) {
close(peerChan)

h := newPrivateNode(t,
autorelay.WithPeerSource(func(num int) <-chan peer.AddrInfo {
autorelay.WithPeerSource(func(_ context.Context, num int) <-chan peer.AddrInfo {
require.False(t, called, "expected the peer source callback to only have been called once")
called = true
require.Equal(t, numCandidates, num)
@@ -175,7 +176,7 @@ func TestPreferRelayV2(t *testing.T) {
})

h := newPrivateNode(t,
autorelay.WithPeerSource(func(int) <-chan peer.AddrInfo {
autorelay.WithPeerSource(func(context.Context, int) <-chan peer.AddrInfo {
peerChan := make(chan peer.AddrInfo, 1)
defer close(peerChan)
peerChan <- peer.AddrInfo{ID: r.ID(), Addrs: r.Addrs()}
@@ -193,7 +194,7 @@ func TestPreferRelayV2(t *testing.T) {
func TestWaitForCandidates(t *testing.T) {
peerChan := make(chan peer.AddrInfo)
h := newPrivateNode(t,
autorelay.WithPeerSource(func(int) <-chan peer.AddrInfo { return peerChan }, time.Hour),
autorelay.WithPeerSource(func(context.Context, int) <-chan peer.AddrInfo { return peerChan }, time.Hour),
autorelay.WithMinCandidates(2),
autorelay.WithNumRelays(1),
autorelay.WithBootDelay(time.Hour),
@@ -241,7 +242,7 @@ func TestBackoff(t *testing.T) {

var counter int32 // to be used atomically
h := newPrivateNode(t,
autorelay.WithPeerSource(func(int) <-chan peer.AddrInfo {
autorelay.WithPeerSource(func(context.Context, int) <-chan peer.AddrInfo {
// always return the same node, and make sure we don't try to connect to it too frequently
atomic.AddInt32(&counter, 1)
peerChan := make(chan peer.AddrInfo, 1)
@@ -295,7 +296,7 @@ func TestRelayV1(t *testing.T) {
close(peerChan)

h := newPrivateNode(t,
autorelay.WithPeerSource(func(int) <-chan peer.AddrInfo { return peerChan }, time.Hour),
autorelay.WithPeerSource(func(context.Context, int) <-chan peer.AddrInfo { return peerChan }, time.Hour),
autorelay.WithBootDelay(0),
)
defer h.Close()
@@ -311,7 +312,7 @@ func TestRelayV1(t *testing.T) {
close(peerChan)

h := newPrivateNode(t,
autorelay.WithPeerSource(func(int) <-chan peer.AddrInfo { return peerChan }, time.Hour),
autorelay.WithPeerSource(func(context.Context, int) <-chan peer.AddrInfo { return peerChan }, time.Hour),
autorelay.WithBootDelay(0),
autorelay.WithCircuitV1Support(),
)
@@ -332,7 +333,7 @@ func TestConnectOnDisconnect(t *testing.T) {
relays = append(relays, r)
}
h := newPrivateNode(t,
autorelay.WithPeerSource(func(int) <-chan peer.AddrInfo { return peerChan }, time.Hour),
autorelay.WithPeerSource(func(context.Context, int) <-chan peer.AddrInfo { return peerChan }, time.Hour),
autorelay.WithMinCandidates(1),
autorelay.WithMaxCandidates(num),
autorelay.WithNumRelays(1),
@@ -381,7 +382,7 @@ func TestMaxAge(t *testing.T) {
close(peerChans)

h := newPrivateNode(t,
autorelay.WithPeerSource(func(int) <-chan peer.AddrInfo {
autorelay.WithPeerSource(func(context.Context, int) <-chan peer.AddrInfo {
c, ok := <-peerChans
if !ok {
t.Fatal("unexpected call to PeerSource")
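The test changes above are purely mechanical: each callback gains a leading context.Context parameter, which a source that returns immediately can simply ignore. A minimal sketch of such a callback outside the test suite (not part of this commit; `candidates` and `peerSourceOption` are hypothetical names):

```go
package main

import (
	"context"
	"time"

	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/libp2p/go-libp2p/p2p/host/autorelay"
)

// candidates is a hypothetical, pre-populated list of relay candidates.
var candidates []peer.AddrInfo

func peerSourceOption() autorelay.Option {
	// The callback now receives a context.Context as its first argument;
	// a source that returns immediately can ignore it, as the tests do.
	return autorelay.WithPeerSource(func(_ context.Context, num int) <-chan peer.AddrInfo {
		peerChan := make(chan peer.AddrInfo, num)
		defer close(peerChan)
		for i, pi := range candidates {
			if i == num {
				break
			}
			peerChan <- pi
		}
		return peerChan
	}, time.Hour)
}

func main() { _ = peerSourceOption() }
```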
7 changes: 5 additions & 2 deletions p2p/host/autorelay/options.go
@@ -1,6 +1,7 @@
package autorelay

import (
"context"
"errors"
"fmt"
"time"
@@ -12,7 +13,7 @@ import (

type config struct {
clock clock.Clock
peerSource func(num int) <-chan peer.AddrInfo
peerSource func(ctx context.Context, num int) <-chan peer.AddrInfo
// minimum interval used to call the peerSource callback
minInterval time.Duration
staticRelays []peer.AddrInfo
@@ -102,7 +103,9 @@ func WithDefaultStaticRelays() Option {
// Implementations should send new peers, but may send peers they sent before. AutoRelay implements
// a per-peer backoff (see WithBackoff).
// minInterval is the minimum interval this callback is called with, even if AutoRelay needs new candidates.
func WithPeerSource(f func(numPeers int) <-chan peer.AddrInfo, minInterval time.Duration) Option {
// The context.Context passed MAY be canceled when AutoRelay feels satisfied; it will be canceled when the node is shutting down.
// If the context is canceled you MUST close the output channel at some point.
func WithPeerSource(f func(ctx context.Context, numPeers int) <-chan peer.AddrInfo, minInterval time.Duration) Option {
return func(c *config) error {
if len(c.staticRelays) > 0 {
return errStaticRelaysPeerSource
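To make the documented contract concrete, here is a hedged sketch (not part of this commit) of a peer source that keeps producing candidates until the passed context is canceled and then closes its output channel, as the new comment requires. `nextCandidate` is a hypothetical blocking helper, e.g. backed by a DHT walk, not an autorelay API:

```go
package peersource

import (
	"context"

	"github.com/libp2p/go-libp2p/core/peer"
)

// newPeerSource wraps a hypothetical nextCandidate helper into a callback
// matching the new WithPeerSource signature.
func newPeerSource(nextCandidate func(context.Context) (peer.AddrInfo, error)) func(context.Context, int) <-chan peer.AddrInfo {
	return func(ctx context.Context, num int) <-chan peer.AddrInfo {
		out := make(chan peer.AddrInfo)
		go func() {
			// Always close the channel, including when AutoRelay cancels ctx
			// because it is satisfied or the node is shutting down.
			defer close(out)
			for i := 0; i < num; i++ {
				pi, err := nextCandidate(ctx)
				if err != nil { // typically ctx.Err() after cancellation
					return
				}
				select {
				case out <- pi:
				case <-ctx.Done():
					return
				}
			}
		}()
		return out
	}
}
```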
8 changes: 4 additions & 4 deletions p2p/host/autorelay/relay_finder.go
@@ -59,7 +59,7 @@ type relayFinder struct {
ctxCancel context.CancelFunc
ctxCancelMx sync.Mutex

peerSource func(int) <-chan peer.AddrInfo
peerSource func(context.Context, int) <-chan peer.AddrInfo

candidateFound chan struct{} // receives every time we find a new relay candidate
candidateMx sync.Mutex
@@ -82,7 +82,7 @@ type relayFinder struct {
cachedAddrsExpiry time.Time
}

func newRelayFinder(host *basic.BasicHost, peerSource func(int) <-chan peer.AddrInfo, conf *config) *relayFinder {
func newRelayFinder(host *basic.BasicHost, peerSource func(context.Context, int) <-chan peer.AddrInfo, conf *config) *relayFinder {
return &relayFinder{
bootTime: conf.clock.Now(),
host: host,
@@ -206,7 +206,7 @@ func (rf *relayFinder) background(ctx context.Context) {
// It garbage collects old entries, so that nodes don't overflow.
// This makes sure that as soon as we need to find relay candidates, we have them available.
func (rf *relayFinder) findNodes(ctx context.Context) {
peerChan := rf.peerSource(rf.conf.maxCandidates)
peerChan := rf.peerSource(ctx, rf.conf.maxCandidates)
var wg sync.WaitGroup
lastCallToPeerSource := rf.conf.clock.Now()

@@ -235,7 +235,7 @@ func (rf *relayFinder) findNodes(ctx context.Context) {
continue
}
lastCallToPeerSource = now
peerChan = rf.peerSource(rf.conf.maxCandidates)
peerChan = rf.peerSource(ctx, rf.conf.maxCandidates)
case pi, ok := <-peerChan:
if !ok {
wg.Wait()
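For completeness, a sketch of how a caller might wire such a source into a host, assuming the top-level libp2p.EnableAutoRelay option accepts autorelay options at this point in the history; the peer-source body below is a placeholder, not the library's behaviour:

```go
package main

import (
	"context"
	"time"

	"github.com/libp2p/go-libp2p"
	"github.com/libp2p/go-libp2p/core/peer"
	"github.com/libp2p/go-libp2p/p2p/host/autorelay"
)

func main() {
	// Placeholder peer source: a real one would send relay candidates
	// discovered via the DHT or a static list before returning.
	findRelays := func(ctx context.Context, num int) <-chan peer.AddrInfo {
		out := make(chan peer.AddrInfo)
		go func() {
			defer close(out) // close once AutoRelay cancels the context
			<-ctx.Done()
		}()
		return out
	}

	h, err := libp2p.New(
		libp2p.EnableAutoRelay(
			autorelay.WithPeerSource(findRelays, time.Minute),
		),
	)
	if err != nil {
		panic(err)
	}
	defer h.Close()
}
```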
