Skip to content

Commit

Permalink
sync: improve peer selection (#5208)
Browse files Browse the repository at this point in the history
previous version had several downsides:
- new peers weren't actively tried, unless they sent unique data. this version set new peer latency to 0.8 of average global latency, so that we can try it unless we are connected with a significantly faster peer.
- if fast peer started to fail, node would continue to request from it until success rate shifts significantly. current version will de-prioritize it faster by scaling average latency with increased fail rate.
  • Loading branch information
dshulyak committed Nov 2, 2023
1 parent 71eca88 commit 659193f
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 125 deletions.
6 changes: 1 addition & 5 deletions fetch/fetch.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,11 +240,7 @@ func NewFetch(
for _, opt := range opts {
opt(f)
}
popts := []peers.Opt{}
if f.cfg.PeersRateThreshold != 0 {
popts = append(popts, peers.WithRateThreshold(f.cfg.PeersRateThreshold))
}
f.peers = peers.New(popts...)
f.peers = peers.New()
// NOTE(dshulyak) this is to avoid tests refactoring.
// there is one test that covers this part.
if host != nil {
Expand Down
92 changes: 42 additions & 50 deletions fetch/peers/peers.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,57 +13,45 @@ import (
type data struct {
id peer.ID
success, failures int
rate float64
failRate float64
averageLatency float64
}

func (p *data) successRate() float64 {
return float64(p.success) / float64(p.success+p.failures)
}

func (p *data) cmp(other *data, rateThreshold float64) int {
if p == nil && other != nil {
return -1
func (d *data) latency(global float64) float64 {
if d.success+d.failures == 0 {
return 0.8 * global // to prioritize trying out new peer
}
switch {
case p.rate-other.rate > rateThreshold:
return 1
case other.rate-p.rate > rateThreshold:
return -1
if d.success == 0 {
return global + d.failRate*global
}
switch {
case p.averageLatency < other.averageLatency:
return 1
case p.averageLatency > other.averageLatency:
return -1
}
return strings.Compare(string(p.id), string(other.id))
return d.averageLatency + d.failRate*global
}

type Opt func(*Peers)

func WithRateThreshold(rate float64) Opt {
return func(p *Peers) {
p.rateThreshold = rate
func (p *data) less(other *data, global float64) bool {
peerLatency := p.latency(global)
otherLatency := other.latency(global)
if peerLatency < otherLatency {
return true
} else if peerLatency > otherLatency {
return false
}
return strings.Compare(string(p.id), string(other.id)) == -1
}

func New(opts ...Opt) *Peers {
p := &Peers{
peers: map[peer.ID]*data{},
rateThreshold: 0.1,
func New() *Peers {
return &Peers{
peers: map[peer.ID]*data{},
}
for _, opt := range opts {
opt(p)
}
return p
}

type Peers struct {
mu sync.Mutex
peers map[peer.ID]*data

rateThreshold float64
// globalLatency is the average latency of all successful responses from peers.
// It is used as a reference value for new peers.
// And to adjust average peer latency based on failure rate.
globalLatency float64
}

func (p *Peers) Add(id peer.ID) {
Expand All @@ -73,8 +61,7 @@ func (p *Peers) Add(id peer.ID) {
if exist {
return
}
peer := &data{id: id}
p.peers[id] = peer
p.peers[id] = &data{id: id}
}

func (p *Peers) Delete(id peer.ID) {
Expand All @@ -91,12 +78,10 @@ func (p *Peers) OnFailure(id peer.ID) {
return
}
peer.failures++
peer.rate = peer.successRate()
peer.failRate = float64(peer.failures) / float64(peer.success+peer.failures)
}

// OnLatency records success and latency. Latency is not reported with every success
// as some requests has different amount of work and data, and we want to measure something
// comparable.
// OnLatency updates average peer and global latency.
func (p *Peers) OnLatency(id peer.ID, latency time.Duration) {
p.mu.Lock()
defer p.mu.Unlock()
Expand All @@ -105,12 +90,17 @@ func (p *Peers) OnLatency(id peer.ID, latency time.Duration) {
return
}
peer.success++
peer.rate = peer.successRate()
peer.failRate = float64(peer.failures) / float64(peer.success+peer.failures)
if peer.averageLatency != 0 {
peer.averageLatency = 0.8*peer.averageLatency + 0.2*float64(latency)
} else {
peer.averageLatency = float64(latency)
}
if p.globalLatency != 0 {
p.globalLatency = 0.8*p.globalLatency + 0.2*float64(latency)
} else {
p.globalLatency = float64(latency)
}
}

// SelectBest peer with preferences.
Expand All @@ -123,7 +113,9 @@ func (p *Peers) SelectBestFrom(peers []peer.ID) peer.ID {
if !exist {
continue
}
if best.cmp(pdata, p.rateThreshold) == -1 {
if best == nil {
best = pdata
} else if pdata.less(best, p.globalLatency) {
best = pdata
}
}
Expand All @@ -146,21 +138,21 @@ func (p *Peers) SelectBest(n int) []peer.ID {
if lth == 0 {
return nil
}
cache := make([]*data, 0, lth)
best := make([]*data, 0, lth)
for _, peer := range p.peers {
worst := peer
for i := range cache {
if cache[i].cmp(worst, p.rateThreshold) == -1 {
cache[i], worst = worst, cache[i]
for i := range best {
if worst.less(best[i], p.globalLatency) {
best[i], worst = worst, best[i]
}
}
if len(cache) < cap(cache) {
cache = append(cache, worst)
if len(best) < cap(best) {
best = append(best, worst)
}
}
rst := make([]peer.ID, len(cache))
rst := make([]peer.ID, len(best))
for i := range rst {
rst[i] = cache[i].id
rst[i] = best[i].id
}
return rst
}
Expand Down
117 changes: 47 additions & 70 deletions fetch/peers/peers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type event struct {
}

func withEvents(events []event) *Peers {
tracker := New(WithRateThreshold(0.1))
tracker := New()
for _, ev := range events {
if ev.delete {
tracker.Delete(ev.id)
Expand All @@ -44,68 +44,31 @@ func TestSelect(t *testing.T) {
n int
expect []peer.ID

from []peer.ID
best peer.ID
selectFrom []peer.ID
best peer.ID
}{
{
desc: "ordered by rate",
desc: "latency adjusted with moving average",
events: []event{
{id: "b", success: 100, failure: 30, add: true},
{id: "c", success: 100, failure: 0, add: true},
{id: "a", success: 80, failure: 80, add: true},
{id: "a", success: 1, latency: 8, add: true},
{id: "b", success: 1, latency: 9, add: true},
{id: "a", success: 1, latency: 14, add: true},
},
n: 5,
expect: []peer.ID{"c", "b", "a"},
from: []peer.ID{"a", "b"},
best: peer.ID("b"),
},
{
desc: "ordered by rate no best",
events: []event{
{id: "b", success: 100, failure: 30, add: true},
{id: "c", success: 100, failure: 0, add: true},
{id: "a", success: 80, failure: 80, add: true},
},
n: 5,
expect: []peer.ID{"c", "b", "a"},
from: []peer.ID{"d", "e"},
best: "",
},
{
desc: "ordered by latency within threshold",
events: []event{
{id: "b", success: 100, latency: 10, add: true},
{id: "c", success: 95, latency: 5, add: true},
{id: "a", success: 90, latency: 4, add: true},
},
n: 5,
expect: []peer.ID{"a", "c", "b"},
from: []peer.ID{"c", "a"},
best: peer.ID("a"),
},
{
desc: "latency computed with moving average",
events: []event{
{id: "a", success: 100, latency: 8, add: true},
{id: "b", success: 100, latency: 9, add: true},
{id: "a", success: 1, latency: 10, add: true},
},
n: 5,
expect: []peer.ID{"a", "b"},
from: []peer.ID{"a", "b"},
best: peer.ID("a"),
n: 5,
expect: []peer.ID{"b", "a"},
selectFrom: []peer.ID{"a", "b"},
best: peer.ID("b"),
},
{
desc: "latency computed with moving average",
events: []event{
{id: "a", success: 100, latency: 8, add: true},
{id: "b", success: 100, latency: 9, add: true},
{id: "a", success: 1, latency: 14},
{id: "a", success: 2, latency: 8, add: true},
{id: "b", success: 2, latency: 9, add: true},
},
n: 5,
expect: []peer.ID{"b", "a"},
from: []peer.ID{"a", "b"},
best: peer.ID("b"),
n: 5,
expect: []peer.ID{"a", "b"},
selectFrom: []peer.ID{"b", "a"},
best: peer.ID("a"),
},
{
desc: "total number is larger then capacity",
Expand Down Expand Up @@ -139,14 +102,15 @@ func TestSelect(t *testing.T) {
{id: "b", delete: true},
{id: "a", delete: true},
},
n: 4,
expect: []peer.ID{"c", "d"},
from: []peer.ID{"a", "b", "c", "d"},
best: peer.ID("c"),
n: 4,
expect: []peer.ID{"c", "d"},
selectFrom: []peer.ID{"a", "b", "c", "d"},
best: peer.ID("c"),
},
{
desc: "empty",
n: 4,
desc: "empty",
n: 4,
selectFrom: []peer.ID{"a", "b", "c", "d"},
},
{
desc: "request empty",
Expand All @@ -156,20 +120,33 @@ func TestSelect(t *testing.T) {
n: 0,
},
{
desc: "no success rate",
desc: "events for nonexisting",
events: []event{
{id: "a", success: 100, failure: 100},
},
n: 2,
},
{
desc: "new peer",
events: []event{
{id: "a", add: true},
{id: "a", success: 1, latency: 10, add: true},
{id: "b", add: true},
},
n: 2,
expect: []peer.ID{"b", "a"},
n: 2,
expect: []peer.ID{"b", "a"},
selectFrom: []peer.ID{"a", "b"},
best: peer.ID("b"),
},
{
desc: "events for nonexisting",
desc: "unresponsive",
events: []event{
{id: "a", success: 100, failure: 100},
{id: "a", success: 1, latency: 10, add: true},
{id: "b", failure: 1, add: true},
},
n: 2,
n: 2,
expect: []peer.ID{"a", "b"},
selectFrom: []peer.ID{"a", "b"},
best: peer.ID("a"),
},
} {
t.Run(tc.desc, func(t *testing.T) {
Expand All @@ -180,14 +157,14 @@ func TestSelect(t *testing.T) {
"select best %d",
tc.n,
)
if tc.from != nil {
if tc.selectFrom != nil {
require.Equal(
t,
tc.best,
withEvents(tc.events).SelectBestFrom(tc.from),
withEvents(tc.events).SelectBestFrom(tc.selectFrom),
"select best (%v) from %v",
tc.best,
tc.from,
tc.selectFrom,
)
}
})
Expand Down

0 comments on commit 659193f

Please sign in to comment.