From c8be27d4a5030711a516877039f8dd7e344b08a6 Mon Sep 17 00:00:00 2001 From: Will Scott Date: Tue, 5 Sep 2023 16:32:46 +0200 Subject: [PATCH 1/3] pool-target-size through config to better test dynamics --- caboose.go | 7 +++++++ node_heap.go | 5 +++-- node_ring.go | 12 +++++++----- node_ring_test.go | 2 +- pool.go | 2 +- pool_dynamics_test.go | 5 +++-- pool_tier_promotion.go | 9 ++++----- 7 files changed, 26 insertions(+), 16 deletions(-) diff --git a/caboose.go b/caboose.go index e14fe94..302ada7 100644 --- a/caboose.go +++ b/caboose.go @@ -55,6 +55,9 @@ type Config struct { // PoolRefresh is the interval at which we refresh the pool of upstreams from the orchestrator. PoolRefresh time.Duration + // PoolTargetSize is a baseline size for the pool - the pool will accept decrements in performance to reach and maintain at least this size. + PoolTargetSize int + // MirrorFraction is what fraction of requests will be mirrored to another random node in order to track metrics / determine the current best nodes. MirrorFraction float64 @@ -90,6 +93,7 @@ const defaultMirrorFraction = 0.01 const DefaultOrchestratorEndpoint = "https://orchestrator.strn.pl/nodes/nearby?count=200" const DefaultPoolRefreshInterval = 5 * time.Minute +const DefaultPoolTargetSize = 30 // we cool off sending requests for a cid for a certain duration // if we've seen a certain number of failures for it already in a given duration. 
@@ -155,6 +159,9 @@ func NewCaboose(config *Config) (*Caboose, error) { if c.config.PoolRefresh == 0 { c.config.PoolRefresh = DefaultPoolRefreshInterval } + if c.config.PoolTargetSize == 0 { + c.config.PoolTargetSize = DefaultPoolTargetSize + } if c.config.MaxRetrievalAttempts == 0 { c.config.MaxRetrievalAttempts = defaultMaxRetries diff --git a/node_heap.go b/node_heap.go index 0cce5f6..6067abf 100644 --- a/node_heap.go +++ b/node_heap.go @@ -51,8 +51,9 @@ func (nh *NodeHeap) PeekRandom() *Node { func (nh *NodeHeap) TopN(n int) []*Node { m := make([]*Node, 0, n) - nh.lk.RLock() - defer nh.lk.RUnlock() + nh.lk.Lock() + defer nh.lk.Unlock() + heap.Init(nh) for i := 0; i < n && i < len(nh.Nodes); i++ { node := nh.Nodes[i] m = append(m, node) diff --git a/node_ring.go b/node_ring.go index a02a1de..30cf4e1 100644 --- a/node_ring.go +++ b/node_ring.go @@ -8,16 +8,18 @@ import ( // NodeRing represents a set of nodes organized for stable hashing. type NodeRing struct { - Nodes map[string]*Node - ring hashring.HashRing + Nodes map[string]*Node + ring hashring.HashRing + targetSize int lk sync.RWMutex } -func NewNodeRing() *NodeRing { +func NewNodeRing(targetSize int) *NodeRing { return &NodeRing{ - Nodes: map[string]*Node{}, - ring: *hashring.New([]string{}), + Nodes: map[string]*Node{}, + ring: *hashring.New([]string{}), + targetSize: targetSize, } } diff --git a/node_ring_test.go b/node_ring_test.go index a5ff11a..c24e14d 100644 --- a/node_ring_test.go +++ b/node_ring_test.go @@ -8,7 +8,7 @@ import ( ) func TestNodeRing(t *testing.T) { - nr := caboose.NewNodeRing() + nr := caboose.NewNodeRing(30) nodes := make([]*caboose.Node, 0) for i := 0; i < 100; i++ { nodes = append(nodes, &caboose.Node{URL: fmt.Sprintf("node%d", i)}) diff --git a/pool.go b/pool.go index 940f15e..294c187 100644 --- a/pool.go +++ b/pool.go @@ -80,7 +80,7 @@ func newPool(c *Config, logger *logger) *pool { fetchKeyCoolDownCache: cache.New(c.FetchKeyCoolDownDuration, 1*time.Minute), 
fetchKeyFailureCache: cache.New(c.FetchKeyCoolDownDuration, 1*time.Minute), - ActiveNodes: NewNodeRing(), + ActiveNodes: NewNodeRing(c.PoolTargetSize), AllNodes: NewNodeHeap(), } diff --git a/pool_dynamics_test.go b/pool_dynamics_test.go index f0ac675..fcc2deb 100644 --- a/pool_dynamics_test.go +++ b/pool_dynamics_test.go @@ -30,7 +30,6 @@ are picked randomly in the beginning of each test. At the end of each test, the always be converging to the "good" nodes. */ func TestPoolDynamics(t *testing.T) { - baseStatSize := 100000 baseStatLatency := 100 poolRefreshNo := 10 @@ -241,7 +240,9 @@ func TestPoolDynamics(t *testing.T) { } func getHarnessAndControlGroup(t *testing.T, nodesSize int, poolSize int) (*util.CabooseHarness, map[string]string) { - ch := util.BuildCabooseHarness(t, nodesSize, 3) + ch := util.BuildCabooseHarness(t, nodesSize, 3, func(config *caboose.Config) { + config.PoolTargetSize = 3 + }) ch.StartOrchestrator() diff --git a/pool_tier_promotion.go b/pool_tier_promotion.go index e0be6f6..36cf98a 100644 --- a/pool_tier_promotion.go +++ b/pool_tier_promotion.go @@ -1,19 +1,18 @@ package caboose -const ( - PoolConsiderationCount = 30 - activationThreshold = 0 +var ( + activationThreshold = 0 ) func updateActiveNodes(active *NodeRing, all *NodeHeap) error { - candidates := all.TopN(PoolConsiderationCount) + candidates := all.TopN(active.targetSize) added := 0 for _, c := range candidates { if active.Contains(c) { continue } activeSize := active.Len() - discount := PoolConsiderationCount - activeSize + discount := active.targetSize - activeSize if discount < 0 { discount = 0 } From d52ef6ed0e85e5f35807386dac2b675656ceae38 Mon Sep 17 00:00:00 2001 From: Will Scott Date: Wed, 6 Sep 2023 23:23:27 +0200 Subject: [PATCH 2/3] down to flakiness --- caboose.go | 6 +++--- node_ring.go | 5 ++++- pool_dynamics_test.go | 40 +++++----------------------------------- 3 files changed, 12 insertions(+), 39 deletions(-) diff --git a/caboose.go b/caboose.go index 
302ada7..de549b0 100644 --- a/caboose.go +++ b/caboose.go @@ -135,6 +135,9 @@ func NewCaboose(config *Config) (*Caboose, error) { if override := os.Getenv(BackendOverrideKey); len(override) > 0 { config.OrchestratorOverride = strings.Split(override, ",") } + if config.PoolTargetSize == 0 { + config.PoolTargetSize = DefaultPoolTargetSize + } logger := newLogger(config) c := Caboose{ @@ -159,9 +162,6 @@ func NewCaboose(config *Config) (*Caboose, error) { if c.config.PoolRefresh == 0 { c.config.PoolRefresh = DefaultPoolRefreshInterval } - if c.config.PoolTargetSize == 0 { - c.config.PoolTargetSize = DefaultPoolTargetSize - } if c.config.MaxRetrievalAttempts == 0 { c.config.MaxRetrievalAttempts = defaultMaxRetries diff --git a/node_ring.go b/node_ring.go index 30cf4e1..0611687 100644 --- a/node_ring.go +++ b/node_ring.go @@ -54,9 +54,12 @@ func (nr *NodeRing) MaybeSubstituteOrAdd(candidate *Node, activationThreshold in for n, v := range overlapEstimate { neighbor = nr.Nodes[n] neighborVolume := neighbor.Rate() + if neighborVolume < 1 { + neighborVolume = 1 + } // how much worse is candidate? 
- diff := candidate.Priority() - neighbor.Priority() + diff := neighbor.Priority() - candidate.Priority() delta += diff * neighborVolume * float64(v) } diff --git a/pool_dynamics_test.go b/pool_dynamics_test.go index fcc2deb..eca678e 100644 --- a/pool_dynamics_test.go +++ b/pool_dynamics_test.go @@ -2,7 +2,6 @@ package caboose_test import ( "context" - "fmt" "math/rand" "net/url" "testing" @@ -43,41 +42,20 @@ func TestPoolDynamics(t *testing.T) { ch.FetchAndAssertSuccess(t, ctx, testCid) goodNodes := make([]*caboose.Node, 0) - badNodes := make([]*caboose.Node, 0) for _, n := range ch.CabooseAllNodes.Nodes { _, ok := controlGroup[n.URL] if ok { goodNodes = append(goodNodes, n) - } else { - badNodes = append(badNodes, n) } } for i := 0; i < 1; i++ { - nodes := make([]string, 0) - for _, n := range ch.CabooseAllNodes.Nodes { - nodes = append(nodes, n.URL) - } - fmt.Println("All nodes", nodes) - goodStats := util.NodeStats{ Start: time.Now().Add(-time.Second * 2), Latency: float64(baseStatLatency) / float64(10), Size: float64(baseStatSize) * float64(10), } - bn := make([]string, 0) - gn := make([]string, 0) - for _, n := range goodNodes { - gn = append(gn, n.URL) - } - - for _, n := range badNodes { - bn = append(bn, n.URL) - } - fmt.Println("Good Nodes", gn) - fmt.Println("Bad nodes", bn) - ch.RecordSuccesses(t, goodNodes, goodStats, 1000) ch.CaboosePool.DoRefresh() } @@ -85,22 +63,11 @@ func TestPoolDynamics(t *testing.T) { for n := range controlGroup { assert.Contains(t, ch.CabooseActiveNodes.Nodes, n) } - - np := make([]string, 0) - for _, n := range ch.CabooseActiveNodes.Nodes { - np = append(np, n.URL) - } - - fmt.Println("Final Node Pool", np) - - for _, n := range ch.CabooseAllNodes.Nodes { - fmt.Println("Node", n.URL, "Priority", n.Priority(), "Rate", n.Rate(), "samples ", len(n.Samples.PeekAll())) - } - }) t.Run("pool converges to good nodes vs nodes with worse stats", func(t *testing.T) { ch, controlGroup := getHarnessAndControlGroup(t, nodesSize, 
nodesSize/2) + ch.FetchAndAssertSuccess(t, ctx, testCid) goodNodes := make([]*caboose.Node, 0) badNodes := make([]*caboose.Node, 0) @@ -134,13 +101,14 @@ func TestPoolDynamics(t *testing.T) { for n := range controlGroup { assert.Contains(t, ch.CabooseActiveNodes.Nodes, n) } - }) // When new nodes join, if they start consistently performing better than the nodes in the current pool, // then those nodes should replace the nodes in the current pool. t.Run("pool converges to new nodes that are better than the current pool", func(t *testing.T) { ch, controlGroup := getHarnessAndControlGroup(t, nodesSize, nodesSize/2) + ch.FetchAndAssertSuccess(t, ctx, testCid) + goodNodes := make([]*caboose.Node, 0) badNodes := make([]*caboose.Node, 0) @@ -187,6 +155,8 @@ func TestPoolDynamics(t *testing.T) { // to nodes that are not failing. t.Run("pool converges to other nodes if the current ones start failing", func(t *testing.T) { ch, controlGroup := getHarnessAndControlGroup(t, nodesSize, nodesSize/2) + ch.FetchAndAssertSuccess(t, ctx, testCid) + goodNodes := make([]*caboose.Node, 0) badNodes := make([]*caboose.Node, 0) From 61c82da432e09ca54e18752c1c9eebd4ceb66c14 Mon Sep 17 00:00:00 2001 From: Will Scott Date: Sun, 10 Sep 2023 15:24:42 +0200 Subject: [PATCH 3/3] add substitution (rough) --- node.go | 4 +++ node_ring.go | 79 +++++++++++++++++++++++++++++++++++++++++++--------- 2 files changed, 70 insertions(+), 13 deletions(-) diff --git a/node.go b/node.go index 4fc27ce..7424b41 100644 --- a/node.go +++ b/node.go @@ -131,3 +131,7 @@ func (n *Node) Rate() float64 { last := n.Samples.Peek() return float64(len) / float64(time.Since(last.Start)) } + +func (n *Node) String() string { + return n.URL +} diff --git a/node_ring.go b/node_ring.go index 0611687..dfe6b31 100644 --- a/node_ring.go +++ b/node_ring.go @@ -1,6 +1,8 @@ package caboose import ( + "fmt" + "strings" "sync" "github.com/willscott/hashring" @@ -34,6 +36,40 @@ func (nr *NodeRing) updateRing() error { return nil } +// A 
score of '0' ==> overall experience is the same as the current state +// A positive score ==> overall experience is better than the current state +// A negative score ==> overall experience is worse than the current state +func (nr *NodeRing) getScoreForUpdate(candidate string, priority float64, weight int) float64 { + changes := nr.ring.ConsiderUpdateWeightedNode(candidate, weight) + delta := float64(0) + var neighbor *Node + + for n, v := range changes { + neighbor = nr.Nodes[n] + neighborVolume := neighbor.Rate() + if neighborVolume < 1 { + neighborVolume = 1 + } + + amntChanged := v + // for now, add some bounds + if amntChanged < -1 { + amntChanged = -1 + } else if amntChanged > 1 { + amntChanged = 1 + } + // a negative amntChanged means that we're replacing the neighbor with the candidate. + amntChanged *= -1 + + // how much better is the candidate than this neighbor? + diff := priority - neighbor.Priority() + cs := diff * neighborVolume * float64(amntChanged) + delta += cs + // fmt.Printf("+%f (n %s: diff %f=(n %f - candidate %f) * volume %f * v = %f)", cs, neighbor.URL, diff, neighbor.Priority(), priority, neighborVolume, amntChanged) + } + return delta +} + func (nr *NodeRing) MaybeSubstituteOrAdd(candidate *Node, activationThreshold int64) (bool, error) { nr.lk.Lock() defer nr.lk.Unlock() @@ -46,27 +82,32 @@ func (nr *NodeRing) MaybeSubstituteOrAdd(candidate *Node, activationThreshold in } // how much space is being claimed? 
- overlapEstimate := nr.ring.ConsiderUpdateWeightedNode(candidate.URL, 1) + delta := nr.getScoreForUpdate(candidate.URL, candidate.Priority(), 1) - var neighbor *Node - delta := float64(0) + if delta >= float64(activationThreshold) { + nr.Nodes[candidate.URL] = candidate + return true, nr.updateRing() + } - for n, v := range overlapEstimate { - neighbor = nr.Nodes[n] - neighborVolume := neighbor.Rate() - if neighborVolume < 1 { - neighborVolume = 1 + // not a clear benefit to add, but maybe acceptable for substitution: + worst := candidate.Priority() + worstN := "" + for _, n := range nr.Nodes { + if n.Priority() < worst { + worst = n.Priority() + worstN = n.URL } - - // how much worse is candidate? - diff := neighbor.Priority() - candidate.Priority() - delta += diff * neighborVolume * float64(v) } - if delta > float64(activationThreshold) { + // todo: the '+1' is an arbitrary threshold to prevent thrashing. it should be configurable. + if worstN != "" && candidate.Priority()-worst > float64(activationThreshold)+1 { nr.Nodes[candidate.URL] = candidate + delete(nr.Nodes, worstN) return true, nr.updateRing() + } + + // fmt.Printf("did not add - delta %f activation %d, node priority %f\n", delta, activationThreshold, candidate.Priority()) return false, nil } @@ -121,3 +162,15 @@ func (nr *NodeRing) Len() int { defer nr.lk.RUnlock() return nr.ring.Size() } + +func (nr *NodeRing) String() string { + nr.lk.RLock() + defer nr.lk.RUnlock() + + ns := make([]string, 0, len(nr.Nodes)) + for _, n := range nr.Nodes { + ns = append(ns, n.String()) + } + + return fmt.Sprintf("NodeRing[len %d]{%s}", nr.ring.Size(), strings.Join(ns, ",")) +}