Skip to content
This repository has been archived by the owner on Apr 18, 2024. It is now read-only.

Commit

Permalink
feat: modify pool tests
Browse files Browse the repository at this point in the history
  • Loading branch information
AmeanAsad committed Aug 31, 2023
1 parent 5f2ab23 commit 17bb795
Show file tree
Hide file tree
Showing 6 changed files with 135 additions and 28 deletions.
20 changes: 10 additions & 10 deletions caboose.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ const defaultNodeCoolOff = 5 * time.Minute

type Caboose struct {
config *Config
pool *pool
Pool *pool
logger *logger
}

Expand Down Expand Up @@ -135,7 +135,7 @@ func NewCaboose(config *Config) (*Caboose, error) {
logger := newLogger(config)
c := Caboose{
config: config,
pool: newPool(config, logger),
Pool: newPool(config, logger),
logger: logger,
}

Expand All @@ -162,12 +162,12 @@ func NewCaboose(config *Config) (*Caboose, error) {

// Set during testing to leak internal state to the harness.
if c.config.Harness != nil {
c.config.Harness.ActiveNodes = c.pool.ActiveNodes
c.config.Harness.AllNodes = c.pool.AllNodes
c.config.Harness.ActiveNodes = c.Pool.ActiveNodes
c.config.Harness.AllNodes = c.Pool.AllNodes
}

// start the pool
c.pool.Start()
c.Pool.Start()

return &c, nil
}
Expand All @@ -176,7 +176,7 @@ func NewCaboose(config *Config) (*Caboose, error) {
var _ ipfsblockstore.Blockstore = (*Caboose)(nil)

func (c *Caboose) Close() {
c.pool.Close()
c.Pool.Close()
if c.logger != nil {
c.logger.Close()
}
Expand All @@ -187,14 +187,14 @@ func (c *Caboose) Fetch(ctx context.Context, path string, cb DataCallback) error
ctx, span := spanTrace(ctx, "Fetch", trace.WithAttributes(attribute.String("path", path)))
defer span.End()

return c.pool.fetchResourceWith(ctx, path, cb, c.getAffinity(ctx))
return c.Pool.fetchResourceWith(ctx, path, cb, c.getAffinity(ctx))
}

func (c *Caboose) Has(ctx context.Context, it cid.Cid) (bool, error) {
ctx, span := spanTrace(ctx, "Has", trace.WithAttributes(attribute.Stringer("cid", it)))
defer span.End()

blk, err := c.pool.fetchBlockWith(ctx, it, c.getAffinity(ctx))
blk, err := c.Pool.fetchBlockWith(ctx, it, c.getAffinity(ctx))
if err != nil {
return false, err
}
Expand All @@ -205,7 +205,7 @@ func (c *Caboose) Get(ctx context.Context, it cid.Cid) (blocks.Block, error) {
ctx, span := spanTrace(ctx, "Get", trace.WithAttributes(attribute.Stringer("cid", it)))
defer span.End()

blk, err := c.pool.fetchBlockWith(ctx, it, c.getAffinity(ctx))
blk, err := c.Pool.fetchBlockWith(ctx, it, c.getAffinity(ctx))
if err != nil {
return nil, err
}
Expand All @@ -217,7 +217,7 @@ func (c *Caboose) GetSize(ctx context.Context, it cid.Cid) (int, error) {
ctx, span := spanTrace(ctx, "GetSize", trace.WithAttributes(attribute.Stringer("cid", it)))
defer span.End()

blk, err := c.pool.fetchBlockWith(ctx, it, c.getAffinity(ctx))
blk, err := c.Pool.fetchBlockWith(ctx, it, c.getAffinity(ctx))
if err != nil {
return 0, err
}
Expand Down
24 changes: 24 additions & 0 deletions internal/util/harness.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,12 @@ type CabooseHarness struct {
goodOrch bool
}

type NodeStats struct {
Start time.Time
Latency float64
Size float64
}

func (ch *CabooseHarness) RunFetchesForRandCids(n int) {
for i := 0; i < n; i++ {
randCid, _ := cid.V1Builder{Codec: uint64(multicodec.Raw), MhType: uint64(multicodec.Sha2_256)}.Sum([]byte{uint8(i)})
Expand Down Expand Up @@ -122,6 +128,24 @@ func (ch *CabooseHarness) FetchAndAssertSuccess(t *testing.T, ctx context.Contex
require.NotEmpty(t, blk)
}

func (ch *CabooseHarness) RecordSuccesses(t *testing.T, nodes []*caboose.Node, s NodeStats, n int) {
for _, node := range(nodes) {
s.Start = time.Now().Add(-time.Second*5)
for i := 0; i < n; i++ {
node.RecordSuccess(s.Start, s.Latency, s.Size)
}
}
}

func (ch *CabooseHarness) RecordFailures(t *testing.T, nodes []*caboose.Node, n int) {
for _, node := range(nodes) {
for i := 0; i < n; i++ {
node.RecordFailure()
}
}
}


func (ch *CabooseHarness) FailNodesWithCode(t *testing.T, selectorF func(ep *Endpoint) bool, code int) {
for _, n := range ch.Endpoints {
if selectorF(n) {
Expand Down
22 changes: 11 additions & 11 deletions node_ring.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,23 @@ import (

// NodeRing represents a set of nodes organized for stable hashing.
type NodeRing struct {
nodes map[string]*Node
Nodes map[string]*Node
ring hashring.HashRing

lk sync.RWMutex
}

func NewNodeRing() *NodeRing {
return &NodeRing{
nodes: map[string]*Node{},
Nodes: map[string]*Node{},
ring: *hashring.New([]string{}),
}
}

func (nr *NodeRing) updateRing() error {
// this method expects that the lk is held when called.
rs := make(map[string]int)
for _, n := range nr.nodes {
for _, n := range nr.Nodes {
// TODO: weight multiples
rs[n.URL] = 1
}
Expand All @@ -39,7 +39,7 @@ func (nr *NodeRing) MaybeSubstituteOrAdd(candidate *Node, activationThreshold in
_, ok := nr.ring.GetNode(candidate.URL)
if !ok {
// ring is empty. in this case we always want to add.
nr.nodes[candidate.URL] = candidate
nr.Nodes[candidate.URL] = candidate
return true, nr.updateRing()
}

Expand All @@ -50,7 +50,7 @@ func (nr *NodeRing) MaybeSubstituteOrAdd(candidate *Node, activationThreshold in
delta := float64(0)

for n, v := range overlapEstimate {
neighbor = nr.nodes[n]
neighbor = nr.Nodes[n]
neighborVolume := neighbor.Rate()

// how much worse is candidate?
Expand All @@ -59,7 +59,7 @@ func (nr *NodeRing) MaybeSubstituteOrAdd(candidate *Node, activationThreshold in
}

if delta > float64(activationThreshold) {
nr.nodes[candidate.URL] = candidate
nr.Nodes[candidate.URL] = candidate
return true, nr.updateRing()
}
return false, nil
Expand All @@ -68,16 +68,16 @@ func (nr *NodeRing) MaybeSubstituteOrAdd(candidate *Node, activationThreshold in
func (nr *NodeRing) Add(n *Node) error {
nr.lk.Lock()
defer nr.lk.Unlock()
nr.nodes[n.URL] = n
nr.Nodes[n.URL] = n
return nr.updateRing()
}

func (nr *NodeRing) Remove(n *Node) error {
nr.lk.Lock()
defer nr.lk.Unlock()

if _, ok := nr.nodes[n.URL]; ok {
delete(nr.nodes, n.URL)
if _, ok := nr.Nodes[n.URL]; ok {
delete(nr.Nodes, n.URL)
return nr.updateRing()
}
return ErrNoBackend
Expand All @@ -87,7 +87,7 @@ func (nr *NodeRing) Contains(n *Node) bool {
nr.lk.RLock()
defer nr.lk.RUnlock()

_, ok := nr.nodes[n.URL]
_, ok := nr.Nodes[n.URL]
return ok
}

Expand All @@ -104,7 +104,7 @@ func (nr *NodeRing) GetNodes(key string, number int) ([]*Node, error) {
}
nodes := make([]*Node, 0, len(keys))
for _, k := range keys {
if n, ok := nr.nodes[k]; ok {
if n, ok := nr.Nodes[k]; ok {
nodes = append(nodes, n)
}
}
Expand Down
6 changes: 3 additions & 3 deletions pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (p *pool) Start() {
go p.checkPool()
}

func (p *pool) doRefresh() {
func (p *pool) DoRefresh() {
newEP, err := p.loadPool()
if err == nil {
for _, n := range newEP {
Expand All @@ -114,14 +114,14 @@ func (p *pool) refreshPool() {
for {
select {
case <-t.C:
p.doRefresh()
p.DoRefresh()
started.Do(func() {
close(p.started)
})

t.Reset(p.config.PoolRefresh)
case <-p.refresh:
p.doRefresh()
p.DoRefresh()
started.Do(func() {
close(p.started)
})
Expand Down
80 changes: 79 additions & 1 deletion pool_dynamics_test.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,91 @@
package caboose_test

import (
"context"
"fmt"
"math/rand"
"net/url"
"testing"
"time"

"github.com/filecoin-saturn/caboose"
"github.com/filecoin-saturn/caboose/internal/util"
"github.com/ipfs/go-cid"
"github.com/multiformats/go-multicodec"
)


const (
nodesSize = 10
nodesPoolSize = caboose.PoolConsiderationCount
)


func TestPoolDynamics(t *testing.T) {
ch := util.BuildCabooseHarness(t, 3, 3)

ch := util.BuildCabooseHarness(t, nodesSize , 3)
ch.StartOrchestrator()
baseStatSize := 100
baseStatLatency := 100
ctx := context.Background()

testCid, _ := cid.V1Builder{Codec: uint64(multicodec.Raw), MhType: uint64(multicodec.Sha2_256)}.Sum(testBlock)

ch.FetchAndAssertSuccess(t, ctx, testCid)

rand.New(rand.NewSource(0))
eps := ch.Endpoints
controlGroup := make(map[string]string)

rand.Shuffle(len(eps), func(i, j int) {
eps[i], eps[j] = eps[j], eps[i]
})

for _,ep := range(eps[:nodesPoolSize]) {
url, _ := url.Parse(ep.Server.URL)
controlGroup[url.Host] = ep.Server.URL

}

for i := 0; i < 1; i++ {
nodes := ch.CabooseAllNodes
goodNodes := make([]*caboose.Node, 0)
badNodes := make([]*caboose.Node, 0)
goodStats := util.NodeStats{
Start: time.Now().Add(-time.Second*6),
Latency: float64(baseStatLatency) / float64(10),
Size: float64(baseStatSize) * float64(10),
}
badStats := util.NodeStats{
Start: time.Now().Add(-time.Second*6),
Latency: float64(baseStatLatency) * float64(10),
Size: float64(baseStatSize) / float64(10),
}
for _,n := range(nodes.Nodes) {
_, ok := controlGroup[n.URL]
if ok {
fmt.Println("Good", n.URL)

goodNodes = append(goodNodes, n)
} else {
fmt.Println("Bad", n.URL)

badNodes = append(badNodes, n)
}
}

ch.RecordSuccesses(t, goodNodes, goodStats, 10)
ch.RecordSuccesses(t, badNodes, badStats, 10)

}


ch.Caboose.Pool.DoRefresh()

fmt.Println("Pool", ch.CabooseActiveNodes.Nodes)

for _,n := range(ch.CabooseAllNodes.Nodes) {
fmt.Println("Node", n.URL, "size", n.PredictedThroughput)
}

}
11 changes: 8 additions & 3 deletions pool_tier_promotion.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,24 @@
package caboose

import "fmt"

const (
poolConsiderationCount = 30
PoolConsiderationCount = 3
activationThreshold = 0
)

func updateActiveNodes(active *NodeRing, all *NodeHeap) error {
candidates := all.TopN(poolConsiderationCount)
candidates := all.TopN(PoolConsiderationCount)
for _, c := range(candidates) {
fmt.Println("Candidates", c.URL, c.PredictedThroughput)
}
added := 0
for _, c := range candidates {
if active.Contains(c) {
continue
}
activeSize := active.Len()
discount := poolConsiderationCount - activeSize
discount := PoolConsiderationCount - activeSize
if discount < 0 {
discount = 0
}
Expand Down

0 comments on commit 17bb795

Please sign in to comment.