Skip to content
This repository has been archived by the owner on Apr 18, 2024. It is now read-only.

Commit

Permalink
fix: surface caboose pool methods for testing
Browse files Browse the repository at this point in the history
  • Loading branch information
AmeanAsad committed Sep 1, 2023
2 parents 17bb795 + 9989d73 commit 00a4344
Show file tree
Hide file tree
Showing 5 changed files with 20 additions and 13 deletions.
21 changes: 11 additions & 10 deletions caboose.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ const defaultNodeCoolOff = 5 * time.Minute

type Caboose struct {
config *Config
Pool *pool
pool *pool
logger *logger
}

Expand Down Expand Up @@ -135,7 +135,7 @@ func NewCaboose(config *Config) (*Caboose, error) {
logger := newLogger(config)
c := Caboose{
config: config,
Pool: newPool(config, logger),
pool: newPool(config, logger),
logger: logger,
}

Expand All @@ -162,12 +162,13 @@ func NewCaboose(config *Config) (*Caboose, error) {

// Set during testing to leak internal state to the harness.
if c.config.Harness != nil {
c.config.Harness.ActiveNodes = c.Pool.ActiveNodes
c.config.Harness.AllNodes = c.Pool.AllNodes
c.config.Harness.ActiveNodes = c.pool.ActiveNodes
c.config.Harness.AllNodes = c.pool.AllNodes
c.config.Harness.PoolController = c.pool
}

// start the pool
c.Pool.Start()
c.pool.Start()

return &c, nil
}
Expand All @@ -176,7 +177,7 @@ func NewCaboose(config *Config) (*Caboose, error) {
var _ ipfsblockstore.Blockstore = (*Caboose)(nil)

func (c *Caboose) Close() {
c.Pool.Close()
c.pool.Close()
if c.logger != nil {
c.logger.Close()
}
Expand All @@ -187,14 +188,14 @@ func (c *Caboose) Fetch(ctx context.Context, path string, cb DataCallback) error
ctx, span := spanTrace(ctx, "Fetch", trace.WithAttributes(attribute.String("path", path)))
defer span.End()

return c.Pool.fetchResourceWith(ctx, path, cb, c.getAffinity(ctx))
return c.pool.fetchResourceWith(ctx, path, cb, c.getAffinity(ctx))
}

func (c *Caboose) Has(ctx context.Context, it cid.Cid) (bool, error) {
ctx, span := spanTrace(ctx, "Has", trace.WithAttributes(attribute.Stringer("cid", it)))
defer span.End()

blk, err := c.Pool.fetchBlockWith(ctx, it, c.getAffinity(ctx))
blk, err := c.pool.fetchBlockWith(ctx, it, c.getAffinity(ctx))
if err != nil {
return false, err
}
Expand All @@ -205,7 +206,7 @@ func (c *Caboose) Get(ctx context.Context, it cid.Cid) (blocks.Block, error) {
ctx, span := spanTrace(ctx, "Get", trace.WithAttributes(attribute.Stringer("cid", it)))
defer span.End()

blk, err := c.Pool.fetchBlockWith(ctx, it, c.getAffinity(ctx))
blk, err := c.pool.fetchBlockWith(ctx, it, c.getAffinity(ctx))
if err != nil {
return nil, err
}
Expand All @@ -217,7 +218,7 @@ func (c *Caboose) GetSize(ctx context.Context, it cid.Cid) (int, error) {
ctx, span := spanTrace(ctx, "GetSize", trace.WithAttributes(attribute.Stringer("cid", it)))
defer span.End()

blk, err := c.Pool.fetchBlockWith(ctx, it, c.getAffinity(ctx))
blk, err := c.pool.fetchBlockWith(ctx, it, c.getAffinity(ctx))
if err != nil {
return 0, err
}
Expand Down
5 changes: 5 additions & 0 deletions internal/state/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,9 @@ package state
type State struct {
ActiveNodes any
AllNodes any
PoolController
}

type PoolController interface {
DoRefresh()
}
2 changes: 2 additions & 0 deletions internal/util/harness.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ func BuildCabooseHarness(t *testing.T, n int, maxRetries int, opts ...HarnessOpt
ch.Caboose = bs
ch.CabooseActiveNodes = conf.Harness.ActiveNodes.(*caboose.NodeRing)
ch.CabooseAllNodes = conf.Harness.AllNodes.(*caboose.NodeHeap)
ch.CaboosePool = conf.Harness.PoolController
return ch
}

Expand All @@ -87,6 +88,7 @@ type CabooseHarness struct {

CabooseActiveNodes *caboose.NodeRing
CabooseAllNodes *caboose.NodeHeap
CaboosePool state.PoolController

gol sync.Mutex
goodOrch bool
Expand Down
2 changes: 1 addition & 1 deletion pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func (p *pool) DoRefresh() {
}
}

// refreshPool is a background thread triggering `doRefresh` every `config.PoolRefresh` interval.
// refreshPool is a background thread triggering `DoRefresh` every `config.PoolRefresh` interval.
func (p *pool) refreshPool() {
t := time.NewTimer(0)
started := sync.Once{}
Expand Down
3 changes: 1 addition & 2 deletions pool_dynamics_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,7 @@ func TestPoolDynamics(t *testing.T) {
}


ch.Caboose.Pool.DoRefresh()

ch.CaboosePool.DoRefresh()
fmt.Println("Pool", ch.CabooseActiveNodes.Nodes)

for _,n := range(ch.CabooseAllNodes.Nodes) {
Expand Down

0 comments on commit 00a4344

Please sign in to comment.