diff --git a/cmd/substreams/root.go b/cmd/substreams/root.go
index eed811ef..a49a7871 100644
--- a/cmd/substreams/root.go
+++ b/cmd/substreams/root.go
@@ -14,7 +14,7 @@ var rootCmd = &cobra.Command{
 	Long:         "Any place where <manifest> is specified, a 'substreams.yaml', a local '.spkg' file or an https://...spkg file can be specified",
 	SilenceUsage: true,
 	PersistentPreRun: func(cmd *cobra.Command, _ []string) {
-		setup(cmd, zapcore.WarnLevel)
+		Setup(cmd, zapcore.WarnLevel)
 	},
 }
diff --git a/cmd/substreams/service-serve.go b/cmd/substreams/service-serve.go
index 7c32e610..3eed0717 100644
--- a/cmd/substreams/service-serve.go
+++ b/cmd/substreams/service-serve.go
@@ -38,7 +38,7 @@ var serveCmd = &cobra.Command{
 		Listens for "deploy" requests, allowing you to test your deployable units to a local docker-based dev environment.
 	`),
 	PersistentPreRun: func(cmd *cobra.Command, _ []string) {
-		setup(cmd, zapcore.InfoLevel)
+		Setup(cmd, zapcore.InfoLevel)
 	},
 	RunE: serveE,
 	Args: cobra.ExactArgs(0),
diff --git a/cmd/substreams/setup.go b/cmd/substreams/setup.go
index 69e50c1e..11a0b1f8 100644
--- a/cmd/substreams/setup.go
+++ b/cmd/substreams/setup.go
@@ -12,7 +12,7 @@ import (
 	"go.uber.org/zap/zapcore"
 )

-func setup(cmd *cobra.Command, loglevel zapcore.Level) {
+func Setup(cmd *cobra.Command, loglevel zapcore.Level) {
 	setupProfiler()
 	manifest.IPFSURL = sflags.MustGetString(cmd, "ipfs-url")
 	logging.InstantiateLoggers(logging.WithLogLevelSwitcherServerAutoStart(), logging.WithDefaultLevel(loglevel))
diff --git a/cmd/substreams/tools.go b/cmd/substreams/tools.go
index a7078354..ca6b5331 100644
--- a/cmd/substreams/tools.go
+++ b/cmd/substreams/tools.go
@@ -1,7 +1,14 @@
 package main

-import "github.com/streamingfast/substreams/tools"
+import (
+	"github.com/spf13/cobra"
+	"github.com/streamingfast/substreams/tools"
+	"go.uber.org/zap/zapcore"
+)

 func init() {
+	tools.Cmd.PersistentPreRun = func(cmd *cobra.Command, _ []string) {
+		Setup(cmd, zapcore.InfoLevel)
+	}
 	rootCmd.AddCommand(tools.Cmd)
 }
diff --git a/docs/release-notes/change-log.md b/docs/release-notes/change-log.md
index fae2e785..a2622557 100644
--- a/docs/release-notes/change-log.md
+++ b/docs/release-notes/change-log.md
@@ -17,6 +17,17 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 * Fix detection of accepted gzip compression when multiple values are sent in the `Grpc-Accept-Encoding` header (ex: Python library)
 * Allow setting subscription channel max capacity via `SOURCE_CHAN_SIZE` env var (default: 100)

+### Client-side
+
+* Improvements to 'tools prometheus-exporter':
+  - Change the `lookup_interval` and `lookup_timeout` flags to `--interval` and `--timeout`
+  - Support a relative block number (the default is now -1) and no longer set the 'final-blocks-only' flag on the request
+  - Add a `--max-freshness` flag to check block age (when using a relative block)
+  - Add the `substreams_healthcheck_block_age_ms` Prometheus metric
+  - `--block-height` is now a flag instead of a positional argument
+  - Improve logging
+  - Remove the built-in "3 retries" behavior, which was causing confusion
+
 ## v1.11.3

 ### Server-side
diff --git a/tools/prometheus-exporter.go b/tools/prometheus-exporter.go
index 3bd23688..9753d550 100644
--- a/tools/prometheus-exporter.go
+++ b/tools/prometheus-exporter.go
@@ -3,7 +3,6 @@ package tools
 import (
 	"context"
 	"fmt"
-	"io"
 	"net/http"
 	"strconv"
 	"strings"
@@ -29,18 +28,21 @@ import (
 var status = prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: "substreams_healthcheck_status", Help: "Either 1 for successful subtreams request, or 0 for failure"}, []string{"endpoint"})
 var requestDurationMs = prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: "substreams_healthcheck_duration_ms", Help: "Request full processing time in millisecond"}, []string{"endpoint"})
+var blockAgeMs = prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: "substreams_healthcheck_block_age_ms", Help: "Age of returned block"}, []string{"endpoint"})
+
+var lastStatus = map[string]bool{}
+var lock = &sync.Mutex{}

 var prometheusCmd = &cobra.Command{
-	Use:   "prometheus-exporter <endpoint[@<start-block>][,<endpoint>[@<start-block>],...]> [<manifest>] <module_name> <block_height>",
+	Use:   "prometheus-exporter <endpoint[@<start-block>][,<endpoint>[@<start-block>],...]> <manifest> <module_name>",
 	Short: "run substreams client periodically on a single block, exporting the values in prometheus format",
 	Long: cli.Dedent(`
-		Run substreams client periodically on a single block, exporting the values in prometheus format. The manifest is optional as it will try to find a file named
-		'substreams.yaml' in current working directory if nothing entered. You may enter a directory that contains a 'substreams.yaml'
-		file in place of '<manifest_file>', or a link to a remote .spkg file, using urls gs://, http(s)://, ipfs://, etc.'.
+		Run substreams client periodically on a single block, exporting the values in prometheus format.
+		The manifest can be a local file or a URL to an spkg. You can specify a start-block on some endpoints by appending '@<start-block>' to the endpoint URL.
 	`),
 	RunE:         runPrometheus,
-	Args:         cobra.RangeArgs(3, 4),
+	Args:         cobra.ExactArgs(3),
 	SilenceUsage: true,
 }

@@ -50,8 +52,10 @@ func init() {
 	prometheusCmd.Flags().String("substreams-api-key-envvar", "SUBSTREAMS_API_KEY", "Name of variable containing Substreams Api Key")
 	prometheusCmd.Flags().BoolP("insecure", "k", false, "Skip certificate validation on GRPC connection")
 	prometheusCmd.Flags().BoolP("plaintext", "p", false, "Establish GRPC connection in plaintext")
-	prometheusCmd.Flags().Duration("lookup_interval", time.Second*20, "endpoints will be polled at this interval")
-	prometheusCmd.Flags().Duration("lookup_timeout", time.Second*10, "endpoints will be considered 'failing' if they don't complete in that duration")
+	prometheusCmd.Flags().Int64("block-height", -1, "Block number to request (defaults to -1, which means the HEAD)")
+	prometheusCmd.Flags().Duration("max-freshness", time.Minute*2, "(only used if block-height is relative, i.e. below 0) check the age of the received blocks, fail an endpoint if it is older than this duration")
+	prometheusCmd.Flags().Duration("interval", time.Second*20, "endpoints will be polled at this interval")
+	prometheusCmd.Flags().Duration("timeout", time.Second*10, "endpoints will be considered 'failing' if they don't complete in that duration")

 	Cmd.AddCommand(prometheusCmd)
 }
@@ -59,18 +63,10 @@ func runPrometheus(cmd *cobra.Command, args []string) error {
 	endpoints := strings.Split(args[0], ",")

-	manifestPath := ""
-	if len(args) == 4 {
-		manifestPath = args[1]
-		args = args[1:]
-	}
-	moduleName := args[1]
-	blockHeight := args[2]
+	manifestPath := args[1]
+	moduleName := args[2]

-	blockNum, err := strconv.ParseInt(blockHeight, 10, 64)
-	if err != nil {
-		return err
-	}
+	blockNum := sflags.MustGetInt64(cmd, "block-height")

 	addr := sflags.MustGetString(cmd, "listen-addr")

 	manifestReader, err := manifest.NewReader(manifestPath)
@@ -92,20 +88,18 @@ func runPrometheus(cmd *cobra.Command, args []string) error {
 	authToken, authType := GetAuth(cmd, "substreams-api-key-envvar", "substreams-api-token-envvar")
 	insecure := sflags.MustGetBool(cmd, "insecure")
 	plaintext := sflags.MustGetBool(cmd, "plaintext")
-	interval := sflags.MustGetDuration(cmd, "lookup_interval")
-	timeout := sflags.MustGetDuration(cmd, "lookup_timeout")
+	interval := sflags.MustGetDuration(cmd, "interval")
+	timeout := sflags.MustGetDuration(cmd, "timeout")
+	maxFreshness := sflags.MustGetDuration(cmd, "max-freshness")

 	for _, endpoint := range endpoints {
-
 		startBlock := blockNum
-
 		if parts := strings.Split(endpoint, "@"); len(parts) == 2 {
-			start, err := strconv.ParseUint(parts[1], 10, 64)
+			start, err := strconv.ParseInt(parts[1], 10, 64)
 			if err != nil {
 				return fmt.Errorf("invalid endpoint @startBlock format for %q: %w", endpoint, err)
 			}
-
 			endpoint = parts[0]
-			startBlock = int64(start)
+			startBlock = start
 		}

 		substreamsClientConfig := client.NewSubstreamsClientConfig(
@@ -115,12 +109,18 @@ func runPrometheus(cmd *cobra.Command, args []string) error {
 			insecure,
 			plaintext,
 		)
-		go launchSubstreamsPoller(endpoint, substreamsClientConfig, pkgBundle.Package.Modules, outputStreamName, startBlock, interval, timeout)
+		var fresh *time.Duration
+		if startBlock < 0 {
+			fresh = &maxFreshness
+		}
+
+		go launchSubstreamsPoller(endpoint, substreamsClientConfig, pkgBundle.Package.Modules, outputStreamName, startBlock, interval, timeout, fresh)
 	}

 	promReg := prometheus.NewRegistry()
 	promReg.MustRegister(status)
 	promReg.MustRegister(requestDurationMs)
+	promReg.MustRegister(blockAgeMs)

 	handler := promhttp.HandlerFor(
 		promReg,
@@ -136,27 +136,31 @@ func runPrometheus(cmd *cobra.Command, args []string) error {
 	return nil
 }

-func markSuccess(endpoint string, begin time.Time, counter *failCounter) {
-	counter.Reset()
-
+func markSuccess(endpoint string, begin time.Time) {
+	lock.Lock()
+	defer lock.Unlock()
+	if !lastStatus[endpoint] {
+		zlog.Info("endpoint now marked as available", zap.String("endpoint", endpoint))
+	}
+	lastStatus[endpoint] = true
 	status.With(prometheus.Labels{"endpoint": endpoint}).Set(1)
 	requestDurationMs.With(prometheus.Labels{"endpoint": endpoint}).Set(float64(time.Since(begin).Milliseconds()))
 }

-func maybeMarkFailure(endpoint string, begin time.Time, counter *failCounter) {
-	counter.Inc()
-	if counter.Get() < 3 {
-		return
+func markFailure(endpoint string, begin time.Time, err error) {
+	lock.Lock()
+	defer lock.Unlock()
+	if val, ok := lastStatus[endpoint]; !ok || val {
as unavailable", zap.String("endpoint", endpoint), zap.Error(err)) + lastStatus[endpoint] = false } - status.With(prometheus.Labels{"endpoint": endpoint}).Set(0) requestDurationMs.With(prometheus.Labels{"endpoint": endpoint}).Set(float64(time.Since(begin).Milliseconds())) } -func launchSubstreamsPoller(endpoint string, substreamsClientConfig *client.SubstreamsClientConfig, modules *pbsubstreams.Modules, outputStreamName string, blockNum int64, pollingInterval, pollingTimeout time.Duration) { +func launchSubstreamsPoller(endpoint string, substreamsClientConfig *client.SubstreamsClientConfig, modules *pbsubstreams.Modules, outputStreamName string, blockNum int64, pollingInterval, pollingTimeout time.Duration, maxFreshness *time.Duration) { sleep := time.Duration(0) - counter := newFailCounter() for { time.Sleep(sleep) sleep = pollingInterval @@ -166,7 +170,7 @@ func launchSubstreamsPoller(endpoint string, substreamsClientConfig *client.Subs ssClient, connClose, callOpts, headers, err := client.NewSubstreamsClient(substreamsClientConfig) if err != nil { zlog.Error("substreams client setup", zap.Error(err)) - maybeMarkFailure(endpoint, begin, counter) + markFailure(endpoint, begin, err) cancel() continue } @@ -175,48 +179,69 @@ func launchSubstreamsPoller(endpoint string, substreamsClientConfig *client.Subs ctx = metadata.AppendToOutgoingContext(ctx, headers.ToArray()...) } + var stopBlockNum uint64 + if blockNum > 0 { + stopBlockNum = uint64(blockNum + 1) + } subReq := &pbsubstreamsrpc.Request{ - StartBlockNum: blockNum, - StopBlockNum: uint64(blockNum + 1), - FinalBlocksOnly: true, - Modules: modules, - OutputModule: outputStreamName, + StartBlockNum: blockNum, + StopBlockNum: stopBlockNum, + Modules: modules, + OutputModule: outputStreamName, } if err := subReq.Validate(); err != nil { zlog.Error("validate request", zap.Error(err)) - maybeMarkFailure(endpoint, begin, counter) + markFailure(endpoint, begin, err) connClose() cancel() continue } callOpts = append(callOpts, grpc.WaitForReady(false)) + zlog.Debug("calling sf.substreams.rpc.v2.Stream/Blocks", zap.String("endpoint", endpoint), zap.String("output_module", outputStreamName), zap.Int64("start_block", blockNum), zap.Uint64("stop_block", stopBlockNum)) cli, err := ssClient.Blocks(ctx, subReq, callOpts...) 
 		if err != nil {
-			zlog.Error("call sf.substreams.rpc.v2.Stream/Blocks", zap.Error(err))
-			maybeMarkFailure(endpoint, begin, counter)
+			zlog.Error("call sf.substreams.rpc.v2.Stream/Blocks", zap.String("endpoint", endpoint), zap.Error(err))
+			markFailure(endpoint, begin, err)
 			connClose()
 			cancel()
 			continue
 		}

-		var gotResp bool
+	forloop:
 		for {
 			resp, err := cli.Recv()
 			if resp != nil {
 				switch resp.Message.(type) {
 				case *pbsubstreamsrpc.Response_BlockScopedData:
-					fmt.Println(resp.Message.(*pbsubstreamsrpc.Response_BlockScopedData).BlockScopedData.Output)
-					gotResp = true
+					if maxFreshness == nil {
+						zlog.Debug("marking endpoint with success",
+							zap.String("endpoint", endpoint),
+							zap.Duration("duration", time.Since(begin)),
+							zap.Uint64("block_num", resp.Message.(*pbsubstreamsrpc.Response_BlockScopedData).BlockScopedData.Clock.Number),
+						)
+						markSuccess(endpoint, begin)
+						break forloop
+					}
+					blockTime := resp.Message.(*pbsubstreamsrpc.Response_BlockScopedData).BlockScopedData.Clock.Timestamp.AsTime()
+					blockAgeMs.With(prometheus.Labels{"endpoint": endpoint}).Set(float64(time.Since(blockTime).Milliseconds()))
+					if age := time.Since(blockTime); age > *maxFreshness {
+						markFailure(endpoint, begin, fmt.Errorf("block is too old: %s", age))
+						zlog.Debug("marking endpoint with failure because of freshness", zap.String("endpoint", endpoint), zap.Duration("duration", time.Since(begin)), zap.Duration("block_age", time.Since(blockTime)))
+					} else {
+						markSuccess(endpoint, begin)
+						zlog.Debug("marking endpoint with success",
+							zap.String("endpoint", endpoint),
+							zap.Duration("duration", time.Since(begin)),
+							zap.Duration("block_age", time.Since(blockTime)),
+							zap.Uint64("block_num", resp.Message.(*pbsubstreamsrpc.Response_BlockScopedData).BlockScopedData.Clock.Number),
+						)
+					}
+					break forloop
 				}
 			}
 			if err != nil {
-				if err == io.EOF && gotResp {
-					markSuccess(endpoint, begin, counter)
-				} else {
-					zlog.Error("received error from substreams", zap.Error(err))
-					maybeMarkFailure(endpoint, begin, counter)
-				}
+				markFailure(endpoint, begin, err)
 				break
 			}
 		}
@@ -225,33 +250,3 @@
 		cancel()
 	}
 }
-
-type failCounter struct {
-	failCount int
-	mu        sync.Mutex
-}
-
-func newFailCounter() *failCounter {
-	return &failCounter{}
-}
-
-func (f *failCounter) Inc() {
-	f.mu.Lock()
-	defer f.mu.Unlock()
-
-	f.failCount++
-}
-
-func (f *failCounter) Get() int {
-	f.mu.Lock()
-	defer f.mu.Unlock()
-
-	return f.failCount
-}
-
-func (f *failCounter) Reset() {
-	f.mu.Lock()
-	defer f.mu.Unlock()
-
-	f.failCount = 0
-}
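
Not part of the patch — an illustrative sketch of the per-endpoint `@<start-block>` parsing that `runPrometheus` performs in the diff above. `splitEndpoint` and the example endpoint URL are hypothetical names used only here; the real code does this inline, with the `--block-height` flag value as the default.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// splitEndpoint mirrors the per-endpoint "@<start-block>" handling in
// runPrometheus: a trailing "@<number>" overrides the --block-height value
// for that endpoint. The helper name is illustrative, not part of the patch.
func splitEndpoint(raw string, defaultStart int64) (endpoint string, startBlock int64, err error) {
	endpoint, startBlock = raw, defaultStart
	if parts := strings.Split(raw, "@"); len(parts) == 2 {
		start, perr := strconv.ParseInt(parts[1], 10, 64)
		if perr != nil {
			return "", 0, fmt.Errorf("invalid endpoint @startBlock format for %q: %w", raw, perr)
		}
		endpoint, startBlock = parts[0], start
	}
	return endpoint, startBlock, nil
}

func main() {
	// Hypothetical endpoint value; any host:port works the same way.
	ep, start, _ := splitEndpoint("mainnet.eth.streamingfast.io:443@17000000", -1)
	fmt.Println(ep, start) // mainnet.eth.streamingfast.io:443 17000000
}
```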
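Also not part of the patch — a minimal, standard-library-only sketch of the freshness decision `launchSubstreamsPoller` now makes when `--block-height` is relative: the received block's clock timestamp is turned into an age (exported as `substreams_healthcheck_block_age_ms`) and the endpoint is failed when that age exceeds `--max-freshness`. `checkFreshness` is a hypothetical helper name.

```go
package main

import (
	"fmt"
	"time"
)

// checkFreshness captures the decision added to launchSubstreamsPoller for a
// relative --block-height: compute the received block's age from its clock
// timestamp and fail the endpoint when the age exceeds --max-freshness.
// The function name is illustrative, not part of the patch.
func checkFreshness(blockTime time.Time, maxFreshness time.Duration) (age time.Duration, fresh bool) {
	age = time.Since(blockTime)
	return age, age <= maxFreshness
}

func main() {
	// A block produced 90 seconds ago, checked against the default 2m threshold.
	blockTime := time.Now().Add(-90 * time.Second)
	if age, fresh := checkFreshness(blockTime, 2*time.Minute); fresh {
		fmt.Printf("marking endpoint with success, block age %s\n", age)
	} else {
		fmt.Printf("marking endpoint with failure, block is too old: %s\n", age)
	}
}
```

In the patch itself the threshold is passed as a `*time.Duration`, so a nil value means "no freshness check" when an absolute block height is requested.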