Skip to content
This repository has been archived by the owner on Apr 18, 2024. It is now read-only.

Adaptive Pool #116

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
200 changes: 89 additions & 111 deletions caboose.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,65 +2,70 @@ package caboose

import (
"context"
"errors"
"fmt"
"encoding/json"
"io"
"net/http"
"net/url"
"os"
"strings"
"time"

"github.com/filecoin-saturn/caboose/tieredhashing"
requestcontext "github.com/willscott/go-requestcontext"

ipfsblockstore "github.com/ipfs/boxo/blockstore"
ipath "github.com/ipfs/boxo/coreiface/path"
gateway "github.com/ipfs/boxo/gateway"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-cid"

"go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

"github.com/filecoin-saturn/caboose/internal/state"
)

const (
SaturnEnvKey = "STRN_ENV_TAG"
BackendOverrideKey = "CABOOSE_BACKEND_OVERRIDE"
)

type Config struct {
// OrchestratorEndpoint is the URL of the Saturn orchestrator.
// OrchestratorEndpoint is the URL for fetching upstream nodes.
OrchestratorEndpoint *url.URL
// OrchestratorClient is the HTTP client to use when communicating with the Saturn orchestrator.
// OrchestratorClient is the HTTP client to use when communicating with the orchestrator.
OrchestratorClient *http.Client
// OrchestratorOverride replaces calls to the orchestrator with a fixed response.
OrchestratorOverride []string
OrchestratorOverride []state.NodeInfo

// LoggingEndpoint is the URL of the logging endpoint where we submit logs pertaining to our Saturn retrieval requests.
// LoggingEndpoint is the URL of the logging endpoint where we submit logs pertaining to retrieval requests.
LoggingEndpoint url.URL
// LoggingClient is the HTTP client to use when communicating with the logging endpoint.
LoggingClient *http.Client
// LoggingInterval is the interval at which we submit logs to the logging endpoint.
LoggingInterval time.Duration

// SaturnClient is the HTTP client to use when retrieving content from the Saturn network.
SaturnClient *http.Client
// Client is the HTTP client to use when retrieving content from upstream nodes.
Client *http.Client
ExtraHeaders *http.Header

// DoValidation is used to determine if we should validate the blocks recieved from the Saturn network.
// DoValidation is used to determine if we should validate the blocks recieved from the upstream.
DoValidation bool

// If set, AffinityKey is used instead of the block CID as the key on the
// Saturn node pool to determine which Saturn node to retrieve the block from.
// pool to determine which upstream to retrieve the request from.
// NOTE: If gateway.ContentPathKey is present in request context,
// it will be used as AffinityKey automatically.
AffinityKey string

// PoolRefresh is the interval at which we refresh the pool of Saturn nodes.
// PoolRefresh is the interval at which we refresh the pool of upstreams from the orchestrator.
PoolRefresh time.Duration

// PoolTargetSize is a baseline size for the pool - the pool will accept decrements in performance to reach maintain at least this size.
PoolTargetSize int

// MirrorFraction is what fraction of requests will be mirrored to another random node in order to track metrics / determine the current best nodes.
MirrorFraction float64

// MaxRetrievalAttempts determines the number of times we will attempt to retrieve a block from the Saturn network before failing.
// MaxRetrievalAttempts determines the number of times we will attempt to retrieve a block from upstreams before failing.
MaxRetrievalAttempts int

// MaxFetchFailuresBeforeCoolDown is the maximum number of retrieval failures across the pool for a url before we auto-reject subsequent
Expand All @@ -71,103 +76,44 @@ type Config struct {
// before we start making retrieval attempts for it.
FetchKeyCoolDownDuration time.Duration

// SaturnNodeCoolOff is the cool off duration for a saturn node once we determine that we shouldn't be sending requests to it for a while.
SaturnNodeCoolOff time.Duration
// CoolOff is the cool off duration for a node once we determine that we shouldn't be sending requests to it for a while.
CoolOff time.Duration

TieredHashingOpts []tieredhashing.Option
// Harness is an internal test harness that is set during testing.
Harness *state.State

// ComplianceCidPeriod controls how many requests caboose makes on average before requesting a compliance cid
ComplianceCidPeriod int64
}

const DefaultLoggingInterval = 5 * time.Second
const DefaultSaturnOrchestratorRequestTimeout = 30 * time.Second
const DefaultOrchestratorRequestTimeout = 30 * time.Second

const DefaultSaturnBlockRequestTimeout = 19 * time.Second
const DefaultSaturnCarRequestTimeout = 30 * time.Minute
const DefaultBlockRequestTimeout = 19 * time.Second
const DefaultCarRequestTimeout = 30 * time.Minute

// default retries before failure unless overridden by MaxRetrievalAttempts
const defaultMaxRetries = 3

// default percentage of requests to mirror for tracking how nodes perform unless overridden by MirrorFraction
const defaultMirrorFraction = 0.01

const maxBlockSize = 4194305 // 4 Mib + 1 byte
const DefaultOrchestratorEndpoint = "https://orchestrator.strn.pl/nodes/nearby?count=200"
const DefaultOrchestratorEndpoint = "https://orchestrator.strn.pl/nodes?maxNodes=200"
const DefaultPoolRefreshInterval = 5 * time.Minute
const DefaultPoolTargetSize = 30

const DefaultComplianceCidPeriod = int64(100)

// we cool off sending requests to Saturn for a cid for a certain duration
// we cool off sending requests for a cid for a certain duration
// if we've seen a certain number of failures for it already in a given duration.
// NOTE: before getting creative here, make sure you dont break end user flow
// described in https://github.com/ipni/storetheindex/pull/1344
const defaultMaxFetchFailures = 3 * defaultMaxRetries // this has to fail more than DefaultMaxRetries done for a single gateway request
const defaultFetchKeyCoolDownDuration = 1 * time.Minute // how long will a sane person wait and stare at blank screen with "retry later" error before hitting F5?

// we cool off sending requests to a Saturn node if it returns transient errors rather than immediately downvoting it;
// we cool off sending requests to a node if it returns transient errors rather than immediately downvoting it;
// however, only upto a certain max number of cool-offs.
const defaultSaturnNodeCoolOff = 5 * time.Minute

var ErrNotImplemented error = errors.New("not implemented")
var ErrNoBackend error = errors.New("no available saturn backend")
var ErrContentProviderNotFound error = errors.New("saturn failed to find content providers")
var ErrSaturnTimeout error = errors.New("saturn backend timed out")

type ErrSaturnTooManyRequests struct {
Node string
retryAfter time.Duration
}

func (e *ErrSaturnTooManyRequests) Error() string {
return fmt.Sprintf("saturn node %s returned Too Many Requests error, please retry after %s", e.Node, humanRetry(e.retryAfter))
}

func (e *ErrSaturnTooManyRequests) RetryAfter() time.Duration {
return e.retryAfter
}

type ErrCoolDown struct {
Cid cid.Cid
Path string
retryAfter time.Duration
}

func (e *ErrCoolDown) Error() string {
switch true {
case e.Cid != cid.Undef && e.Path != "":
return fmt.Sprintf("multiple saturn retrieval failures seen for CID %q and Path %q, please retry after %s", e.Cid, e.Path, humanRetry(e.retryAfter))
case e.Path != "":
return fmt.Sprintf("multiple saturn retrieval failures seen for Path %q, please retry after %s", e.Path, humanRetry(e.retryAfter))
case e.Cid != cid.Undef:
return fmt.Sprintf("multiple saturn retrieval failures seen for CID %q, please retry after %s", e.Cid, humanRetry(e.retryAfter))
default:
return fmt.Sprintf("multiple saturn retrieval failures for unknown CID/Path (BUG), please retry after %s", humanRetry(e.retryAfter))
}
}

func (e *ErrCoolDown) RetryAfter() time.Duration {
return e.retryAfter
}

func humanRetry(d time.Duration) string {
return d.Truncate(time.Second).String()
}

// ErrPartialResponse can be returned from a DataCallback to indicate that some of the requested resource
// was successfully fetched, and that instead of retrying the full resource, that there are
// one or more more specific resources that should be fetched (via StillNeed) to complete the request.
type ErrPartialResponse struct {
error
StillNeed []string
}

func (epr ErrPartialResponse) Error() string {
if epr.error != nil {
return fmt.Sprintf("partial response: %s", epr.error.Error())
}
return "caboose received a partial response"
}

// ErrInvalidResponse can be returned from a DataCallback to indicate that the data provided for the
// requested resource was explicitly 'incorrect' - that blocks not in the requested dag, or non-car-conforming
// data was returned.
type ErrInvalidResponse error
const defaultNodeCoolOff = 5 * time.Minute

type Caboose struct {
config *Config
Expand All @@ -189,28 +135,40 @@ func NewCaboose(config *Config) (*Caboose, error) {
config.MaxFetchFailuresBeforeCoolDown = defaultMaxFetchFailures
}

if config.SaturnNodeCoolOff == 0 {
config.SaturnNodeCoolOff = defaultSaturnNodeCoolOff
if config.CoolOff == 0 {
config.CoolOff = defaultNodeCoolOff
}
if config.MirrorFraction == 0 {
config.MirrorFraction = defaultMirrorFraction
}
if override := os.Getenv(BackendOverrideKey); len(override) > 0 {
config.OrchestratorOverride = strings.Split(override, ",")
var overrideNodes []state.NodeInfo
err := json.Unmarshal([]byte(override), &overrideNodes)
if err != nil {
goLogger.Warnf("Error parsing BackendOverrideKey:", "err", err)
return nil, err
}
config.OrchestratorOverride = overrideNodes
}
if config.PoolTargetSize == 0 {
config.PoolTargetSize = DefaultPoolTargetSize
}

logger := newLogger(config)
c := Caboose{
config: config,
pool: newPool(config),
logger: newLogger(config),
pool: newPool(config, logger),
logger: logger,
}
c.pool.logger = c.logger

if c.config.SaturnClient == nil {
c.config.SaturnClient = &http.Client{
Timeout: DefaultSaturnCarRequestTimeout,
if c.config.Client == nil {
c.config.Client = &http.Client{
Timeout: DefaultCarRequestTimeout,
}
}

c.config.Client.Transport = otelhttp.NewTransport(c.config.Client.Transport)

if c.config.OrchestratorEndpoint == nil {
var err error
c.config.OrchestratorEndpoint, err = url.Parse(DefaultOrchestratorEndpoint)
Expand All @@ -219,6 +177,10 @@ func NewCaboose(config *Config) (*Caboose, error) {
}
}

if c.config.ComplianceCidPeriod == 0 {
c.config.ComplianceCidPeriod = DefaultComplianceCidPeriod
}

if c.config.PoolRefresh == 0 {
c.config.PoolRefresh = DefaultPoolRefreshInterval
}
Expand All @@ -227,6 +189,13 @@ func NewCaboose(config *Config) (*Caboose, error) {
c.config.MaxRetrievalAttempts = defaultMaxRetries
}

// Set during testing to leak internal state to the harness.
if c.config.Harness != nil {
c.config.Harness.ActiveNodes = c.pool.ActiveNodes
c.config.Harness.AllNodes = c.pool.AllNodes
c.config.Harness.PoolController = c.pool
}

// start the pool
c.pool.Start()

Expand All @@ -238,38 +207,47 @@ var _ ipfsblockstore.Blockstore = (*Caboose)(nil)

func (c *Caboose) Close() {
c.pool.Close()
c.logger.Close()
if c.logger != nil {
c.logger.Close()
}
}

// Fetch allows fetching car archives by a path of the form `/ipfs/<cid>[/path/to/file]`
func (c *Caboose) Fetch(ctx context.Context, path string, cb DataCallback) error {
traceID := requestcontext.IDFromContext(ctx)
tid, err := trace.TraceIDFromHex(traceID)

ctx, span := spanTrace(ctx, "Fetch", trace.WithAttributes(attribute.String("path", path)))
defer span.End()

return c.pool.fetchResourceWith(ctx, path, cb, c.getAffinity(ctx))
if err == nil {
sc := trace.NewSpanContext(trace.SpanContextConfig{
TraceID: tid,
SpanID: span.SpanContext().SpanID(),
Remote: true,
})
ctx = trace.ContextWithRemoteSpanContext(ctx, sc)
}

return c.pool.fetchResourceWith(ctx, path, cb, c.GetAffinity(ctx))
}

func (c *Caboose) Has(ctx context.Context, it cid.Cid) (bool, error) {
ctx, span := spanTrace(ctx, "Has", trace.WithAttributes(attribute.Stringer("cid", it)))
defer span.End()

blk, err := c.pool.fetchBlockWith(ctx, it, c.getAffinity(ctx))
blk, err := c.pool.fetchBlockWith(ctx, it, c.GetAffinity(ctx))
if err != nil {
return false, err
}
return blk != nil, nil
}

// for testing only
func (c *Caboose) GetPoolPerf() map[string]*tieredhashing.NodePerf {
return c.pool.th.GetPerf()
}

func (c *Caboose) Get(ctx context.Context, it cid.Cid) (blocks.Block, error) {
ctx, span := spanTrace(ctx, "Get", trace.WithAttributes(attribute.Stringer("cid", it)))
defer span.End()

blk, err := c.pool.fetchBlockWith(ctx, it, c.getAffinity(ctx))
blk, err := c.pool.fetchBlockWith(ctx, it, c.GetAffinity(ctx))
if err != nil {
return nil, err
}
Expand All @@ -281,14 +259,14 @@ func (c *Caboose) GetSize(ctx context.Context, it cid.Cid) (int, error) {
ctx, span := spanTrace(ctx, "GetSize", trace.WithAttributes(attribute.Stringer("cid", it)))
defer span.End()

blk, err := c.pool.fetchBlockWith(ctx, it, c.getAffinity(ctx))
blk, err := c.pool.fetchBlockWith(ctx, it, c.GetAffinity(ctx))
if err != nil {
return 0, err
}
return len(blk.RawData()), nil
}

func (c *Caboose) getAffinity(ctx context.Context) string {
func (c *Caboose) GetAffinity(ctx context.Context) string {
// https://github.com/ipfs/bifrost-gateway/issues/53#issuecomment-1442732865
if affG := ctx.Value(gateway.ContentPathKey); affG != nil {
contentPath := affG.(ipath.Path).String()
Expand Down
Loading