diff --git a/caboose.go b/caboose.go index 7abd0aa..5bbfdd8 100644 --- a/caboose.go +++ b/caboose.go @@ -2,11 +2,11 @@ package caboose import ( "context" + "encoding/json" "io" "net/http" "net/url" "os" - "strings" "time" requestcontext "github.com/willscott/go-requestcontext" @@ -34,7 +34,7 @@ type Config struct { // OrchestratorClient is the HTTP client to use when communicating with the orchestrator. OrchestratorClient *http.Client // OrchestratorOverride replaces calls to the orchestrator with a fixed response. - OrchestratorOverride []string + OrchestratorOverride []state.NodeInfo // LoggingEndpoint is the URL of the logging endpoint where we submit logs pertaining to retrieval requests. LoggingEndpoint url.URL @@ -81,6 +81,9 @@ type Config struct { // Harness is an internal test harness that is set during testing. Harness *state.State + + // ComplianceCidPeriod controls how many requests caboose makes on average before requesting a compliance cid + ComplianceCidPeriod int64 } const DefaultLoggingInterval = 5 * time.Second @@ -95,10 +98,12 @@ const defaultMaxRetries = 3 // default percentage of requests to mirror for tracking how nodes perform unless overridden by MirrorFraction const defaultMirrorFraction = 0.01 -const DefaultOrchestratorEndpoint = "https://orchestrator.strn.pl/nodes/nearby?count=200" +const DefaultOrchestratorEndpoint = "https://orchestrator.strn.pl/nodes?maxNodes=200" const DefaultPoolRefreshInterval = 5 * time.Minute const DefaultPoolTargetSize = 30 +const DefaultComplianceCidPeriod = int64(100) + // we cool off sending requests for a cid for a certain duration // if we've seen a certain number of failures for it already in a given duration. 
// NOTE: before getting creative here, make sure you dont break end user flow @@ -137,7 +142,13 @@ func NewCaboose(config *Config) (*Caboose, error) { config.MirrorFraction = defaultMirrorFraction } if override := os.Getenv(BackendOverrideKey); len(override) > 0 { - config.OrchestratorOverride = strings.Split(override, ",") + var overrideNodes []state.NodeInfo + err := json.Unmarshal([]byte(override), &overrideNodes) + if err != nil { + goLogger.Warnw("error parsing BackendOverrideKey", "err", err) + return nil, err + } + config.OrchestratorOverride = overrideNodes } if config.PoolTargetSize == 0 { config.PoolTargetSize = DefaultPoolTargetSize @@ -166,6 +177,10 @@ func NewCaboose(config *Config) (*Caboose, error) { } } + if c.config.ComplianceCidPeriod == 0 { + c.config.ComplianceCidPeriod = DefaultComplianceCidPeriod + } + if c.config.PoolRefresh == 0 { c.config.PoolRefresh = DefaultPoolRefreshInterval } diff --git a/fetcher.go b/fetcher.go index 728fd37..b7b3c80 100644 --- a/fetcher.go +++ b/fetcher.go @@ -55,7 +55,11 @@ func (p *pool) fetchResource(ctx context.Context, from *Node, resource string, m return ce } - ctx, span := spanTrace(ctx, "Pool.FetchResource", trace.WithAttributes(attribute.String("from", from.URL), attribute.String("of", resource), attribute.String("mime", mime))) + p.ActiveNodes.lk.RLock() + isCore := p.ActiveNodes.IsCore(from) + p.ActiveNodes.lk.RUnlock() + + ctx, span := spanTrace(ctx, "Pool.FetchResource", trace.WithAttributes(attribute.String("from", from.URL), attribute.String("of", resource), attribute.String("mime", mime), attribute.Bool("core", isCore))) defer span.End() requestId := uuid.NewString() diff --git a/internal/util/harness.go b/internal/util/harness.go index e89b6d2..16e0745 100644 --- a/internal/util/harness.go +++ b/internal/util/harness.go @@ -33,7 +33,7 @@ func BuildCabooseHarness(t *testing.T, n int, maxRetries int, opts ...HarnessOpt ch.Endpoints[i].Setup() ip := strings.TrimPrefix(ch.Endpoints[i].Server.URL, 
"https://") - cid, _ := cid.V1Builder{Codec: uint64(multicodec.Raw), MhType: uint64(multicodec.Sha2_256)}.Sum([]byte(ip)) + cid, _ := cid.V1Builder{Codec: uint64(multicodec.Raw), MhType: uint64(multicodec.Sha2_256)}.Sum([]byte(testBlock)) purls[i] = state.NodeInfo{ IP: ip, @@ -77,6 +77,8 @@ func BuildCabooseHarness(t *testing.T, n int, maxRetries int, opts ...HarnessOpt PoolRefresh: time.Second * 50, MaxRetrievalAttempts: maxRetries, Harness: &state.State{}, + + MirrorFraction: 1.0, } for _, opt := range opts { @@ -257,6 +259,18 @@ func WithMaxFailuresBeforeCoolDown(max int) func(config *caboose.Config) { } } +func WithComplianceCidPeriod(n int64) func(config *caboose.Config) { + return func(config *caboose.Config) { + config.ComplianceCidPeriod = n + } +} + +func WithMirrorFraction(n float64) func(config *caboose.Config) { + return func(config *caboose.Config) { + config.MirrorFraction = n + } +} + func WithCidCoolDownDuration(duration time.Duration) func(config *caboose.Config) { return func(config *caboose.Config) { config.FetchKeyCoolDownDuration = duration diff --git a/metrics.go b/metrics.go index 2b82c49..4350a8d 100644 --- a/metrics.go +++ b/metrics.go @@ -133,6 +133,10 @@ var ( mirroredTrafficTotalMetric = prometheus.NewCounterVec(prometheus.CounterOpts{ Name: prometheus.BuildFQName("ipfs", "caboose", "mirrored_traffic_total"), }, []string{"error_status"}) + + complianceCidCallsTotalMetric = prometheus.NewCounterVec(prometheus.CounterOpts{ + Name: prometheus.BuildFQName("ipfs", "caboose", "compliance_cids_total"), + }, []string{"error_status"}) ) var CabooseMetrics = prometheus.NewRegistry() @@ -163,6 +167,7 @@ func init() { CabooseMetrics.MustRegister(saturnCallsTotalMetric) CabooseMetrics.MustRegister(saturnCallsFailureTotalMetric) CabooseMetrics.MustRegister(saturnConnectionFailureTotalMetric) + CabooseMetrics.MustRegister(complianceCidCallsTotalMetric) CabooseMetrics.MustRegister(saturnCallsSuccessTotalMetric) diff --git a/node.go b/node.go index 
7424b41..f21f1b2 100644 --- a/node.go +++ b/node.go @@ -5,6 +5,7 @@ import ( "sync" "time" + "github.com/filecoin-saturn/caboose/internal/state" "github.com/zyedidia/generic/queue" ) @@ -14,7 +15,9 @@ const ( ) type Node struct { - URL string + URL string + ComplianceCid string + Core bool PredictedLatency float64 PredictedThroughput float64 @@ -25,10 +28,12 @@ type Node struct { lk sync.RWMutex } -func NewNode(url string) *Node { +func NewNode(info state.NodeInfo) *Node { return &Node{ - URL: url, - Samples: queue.New[NodeSample](), + URL: info.IP, + ComplianceCid: info.ComplianceCid, + Core: info.Core, + Samples: queue.New[NodeSample](), } } diff --git a/node_heap.go b/node_heap.go index 1d6092f..c71771f 100644 --- a/node_heap.go +++ b/node_heap.go @@ -2,6 +2,7 @@ package caboose import ( "container/heap" + "math/rand" "sync" ) @@ -45,8 +46,13 @@ func (nh *NodeHeap) Best() *Node { func (nh *NodeHeap) PeekRandom() *Node { nh.lk.RLock() defer nh.lk.RUnlock() - // TODO - return nil + + if len(nh.Nodes) == 0 { + return nil + } + + randIdx := rand.Intn(len(nh.Nodes)) + return nh.Nodes[randIdx] } func (nh *NodeHeap) TopN(n int) []*Node { diff --git a/node_ring.go b/node_ring.go index dfe6b31..eafbbf3 100644 --- a/node_ring.go +++ b/node_ring.go @@ -137,6 +137,17 @@ func (nr *NodeRing) Contains(n *Node) bool { return ok } +func (nr *NodeRing) IsCore(n *Node) bool { + nr.lk.RLock() + defer nr.lk.RUnlock() + + nd, ok := nr.Nodes[n.URL] + if !ok { + return false + } + return nd.Core +} + func (nr *NodeRing) GetNodes(key string, number int) ([]*Node, error) { nr.lk.RLock() defer nr.lk.RUnlock() diff --git a/pool.go b/pool.go index 8ff4aac..64dca91 100644 --- a/pool.go +++ b/pool.go @@ -2,16 +2,19 @@ package caboose import ( "context" + cryptoRand "crypto/rand" "encoding/json" "errors" "fmt" - "github.com/filecoin-saturn/caboose/internal/state" "io" + "math/big" "math/rand" "net/url" "sync" "time" + "github.com/filecoin-saturn/caboose/internal/state" + 
"github.com/patrickmn/go-cache" "github.com/ipfs/boxo/path" @@ -25,9 +28,11 @@ const ( defaultMirroredConcurrency = 5 ) +var complianceCidReqTemplate = "/ipfs/%s?format=raw" + // loadPool refreshes the set of endpoints in the pool by fetching an updated list of nodes from the // Orchestrator. -func (p *pool) loadPool() ([]string, error) { +func (p *pool) loadPool() ([]state.NodeInfo, error) { if p.config.OrchestratorOverride != nil { return p.config.OrchestratorOverride, nil } @@ -48,13 +53,7 @@ func (p *pool) loadPool() ([]string, error) { goLogger.Infow("got backends from orchestrators", "cnt", len(responses), "endpoint", p.config.OrchestratorEndpoint.String()) - var ips []string - - for _, r := range responses { - ips = append(ips, r.IP) - } - - return ips, nil + return responses, nil } type mirroredPoolRequest struct { @@ -149,6 +148,20 @@ func (p *pool) refreshPool() { } } +func (p *pool) fetchComplianceCid(node *Node) error { + sc := node.ComplianceCid + if len(node.ComplianceCid) == 0 { + goLogger.Warnw("failed to find compliance cid ", "for node", node) + return fmt.Errorf("compliance cid doesn't exist for node: %s ", node) + } + trialTimeout, cancel := context.WithTimeout(context.Background(), 30*time.Second) + reqUrl := fmt.Sprintf(complianceCidReqTemplate, sc) + goLogger.Debugw("fetching compliance cid", "cid", reqUrl, "from", node) + err := p.fetchResourceAndUpdate(trialTimeout, node, reqUrl, 0, p.mirrorValidator) + cancel() + return err +} + func (p *pool) checkPool() { sem := make(chan struct{}, defaultMirroredConcurrency) @@ -156,7 +169,6 @@ func (p *pool) checkPool() { select { case msg := <-p.mirrorSamples: sem <- struct{}{} - go func(msg mirroredPoolRequest) { defer func() { <-sem }() @@ -169,11 +181,26 @@ func (p *pool) checkPool() { return } if p.ActiveNodes.Contains(testNode) { + rand := big.NewInt(1) + if p.config.ComplianceCidPeriod > 0 { + rand, _ = cryptoRand.Int(cryptoRand.Reader, big.NewInt(p.config.ComplianceCidPeriod)) + } + + if 
rand.Cmp(big.NewInt(0)) == 0 { + err := p.fetchComplianceCid(testNode) + if err != nil { + goLogger.Warnw("failed to fetch compliance cid ", "err", err) + complianceCidCallsTotalMetric.WithLabelValues("error").Add(1) + } else { + complianceCidCallsTotalMetric.WithLabelValues("success").Add(1) + } + } return } trialTimeout, cancel := context.WithTimeout(context.Background(), 30*time.Second) err := p.fetchResourceAndUpdate(trialTimeout, testNode, msg.path, 0, p.mirrorValidator) + cancel() if err != nil { mirroredTrafficTotalMetric.WithLabelValues("error").Inc() diff --git a/pool_refresh_test.go b/pool_refresh_test.go index e6318bf..5ae4ad4 100644 --- a/pool_refresh_test.go +++ b/pool_refresh_test.go @@ -1,8 +1,12 @@ package caboose import ( + "math/rand" "testing" + "github.com/filecoin-saturn/caboose/internal/state" + "github.com/ipfs/go-cid" + "github.com/multiformats/go-multicodec" "github.com/stretchr/testify/require" ) @@ -29,8 +33,25 @@ func TestPoolRefresh(t *testing.T) { } func addAndAssertPool(t *testing.T, p *pool, nodes []string, expectedTotal int) { - for _, n := range nodes { + nodeStructs := genNodeStructs(nodes) + for _, n := range nodeStructs { p.AllNodes.AddIfNotPresent(NewNode(n)) } require.Equal(t, expectedTotal, p.AllNodes.Len()) } + +func genNodeStructs(nodes []string) []state.NodeInfo { + var nodeStructs []state.NodeInfo + + for _, node := range nodes { + cid, _ := cid.V1Builder{Codec: uint64(multicodec.Raw), MhType: uint64(multicodec.Sha2_256)}.Sum([]byte(node)) + nodeStructs = append(nodeStructs, state.NodeInfo{ + IP: node, + ID: node, + Weight: rand.Intn(100), + Distance: rand.Float32(), + ComplianceCid: cid.String(), + }) + } + return nodeStructs +} diff --git a/pool_test.go b/pool_test.go index 683dca2..543078b 100644 --- a/pool_test.go +++ b/pool_test.go @@ -3,15 +3,10 @@ package caboose_test import ( "bytes" "context" - "crypto/tls" - "net/http" - "net/url" - "strings" "testing" "time" "unsafe" - "github.com/filecoin-saturn/caboose" 
"github.com/filecoin-saturn/caboose/internal/util" "github.com/ipfs/go-cid" "github.com/ipld/go-car/v2" @@ -28,15 +23,7 @@ func TestPoolMirroring(t *testing.T) { t.Skip("skipping for 32bit architectures because too slow") } - saturnClient := &http.Client{ - Transport: &http.Transport{ - TLSClientConfig: &tls.Config{ - InsecureSkipVerify: true, - }, - }, - } - - data := []byte("hello world") + data := []byte("hello World") ls := cidlink.DefaultLinkSystem() lsm := memstore.Store{} ls.SetReadStorage(&lsm) @@ -50,32 +37,8 @@ func TestPoolMirroring(t *testing.T) { t.Fatal(err) } - e := util.Endpoint{} - e.Setup() - e.SetResp(carBytes.Bytes(), false) - eURL := strings.TrimPrefix(e.Server.URL, "https://") - - e2 := util.Endpoint{} - e2.Setup() - e2.SetResp(carBytes.Bytes(), false) - e2URL := strings.TrimPrefix(e2.Server.URL, "https://") - - conf := caboose.Config{ - OrchestratorEndpoint: &url.URL{}, - OrchestratorClient: http.DefaultClient, - OrchestratorOverride: []string{eURL, e2URL}, - LoggingEndpoint: url.URL{}, - LoggingClient: http.DefaultClient, - LoggingInterval: time.Hour, - - Client: saturnClient, - DoValidation: false, - PoolRefresh: time.Minute, - MaxRetrievalAttempts: 1, - MirrorFraction: 1.0, - } + ch := util.BuildCabooseHarness(t, 2, 3) - c, err := caboose.NewCaboose(&conf) if err != nil { t.Fatal(err) } @@ -84,7 +47,7 @@ func TestPoolMirroring(t *testing.T) { // Make 10 requests, and expect some fraction trigger a mirror. 
for i := 0; i < 10; i++ { - _, err = c.Get(context.Background(), finalC) + _, err = ch.Caboose.Get(context.Background(), finalC) if err != nil { t.Fatal(err) } @@ -92,11 +55,43 @@ func TestPoolMirroring(t *testing.T) { } time.Sleep(100 * time.Millisecond) - c.Close() + ch.Caboose.Close() + + ec := ch.Endpoints[0].Count() - ec := e.Count() - e2c := e2.Count() + e2c := ch.Endpoints[1].Count() if ec+e2c < 10 { t.Fatalf("expected at least 10 fetches, got %d", ec+e2c) } } + +func TestFetchComplianceCid(t *testing.T) { + if unsafe.Sizeof(unsafe.Pointer(nil)) <= 4 { + t.Skip("skipping for 32bit architectures because too slow") + } + + ch := util.BuildCabooseHarness(t, 1, 1, util.WithComplianceCidPeriod(1), util.WithMirrorFraction(1.0)) + + ch.CaboosePool.DoRefresh() + + ls := cidlink.DefaultLinkSystem() + lsm := memstore.Store{} + ls.SetReadStorage(&lsm) + ls.SetWriteStorage(&lsm) + finalCL := ls.MustStore(ipld.LinkContext{}, cidlink.LinkPrototype{Prefix: cid.NewPrefixV1(uint64(multicodec.Raw), uint64(multicodec.Sha2_256))}, basicnode.NewBytes(testBlock)) + finalC := finalCL.(cidlink.Link).Cid + + _, err := ch.Caboose.Get(context.Background(), finalC) + if err != nil { + t.Fatal(err) + } + + time.Sleep(100 * time.Millisecond) + ch.Caboose.Close() + + e := ch.Endpoints[0] + + if e.Count() != 2 { + t.Fatalf("expected 2 primary fetch, got %d", e.Count()) + } +}