improve 'tools prometheus-exporter'

rename flags `lookup_interval` and `lookup_timeout` to `--interval` and `--timeout`
support a relative block height (the default is now -1) and no longer set the 'final-blocks-only' flag on requests
add a `--max-freshness` flag to check block age (when using a relative block height)
add a `substreams_healthcheck_block_age_ms` prometheus metric
make `--block-height` a flag instead of a positional argument
improve logging
remove the built-in "3 retries" behavior, which was causing confusion
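For context, a hedged usage sketch of the reworked command is shown below; the endpoint, package, module name, and listen address are placeholders rather than values from this commit, and only the command name and flags come from the diff.

    substreams tools prometheus-exporter \
        mainnet.eth.streamingfast.io:443 substreams.yaml map_block_stats \
        --block-height=-1 --max-freshness=2m --interval=30s --timeout=15s \
        --listen-addr=:9102

    # Gauges exported per endpoint: substreams_healthcheck_status,
    # substreams_healthcheck_duration_ms and substreams_healthcheck_block_age_ms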
sduchesneau committed Jan 13, 2025
1 parent 36c2750 commit 67cdfa0
Showing 6 changed files with 102 additions and 89 deletions.
2 changes: 1 addition & 1 deletion cmd/substreams/root.go
@@ -14,7 +14,7 @@ var rootCmd = &cobra.Command{
Long: "Any place where <package> is specified, a 'substreams.yaml', a local '.spkg' file or an https://...spkg file can be specified",
SilenceUsage: true,
PersistentPreRun: func(cmd *cobra.Command, _ []string) {
setup(cmd, zapcore.WarnLevel)
Setup(cmd, zapcore.WarnLevel)
},
}

2 changes: 1 addition & 1 deletion cmd/substreams/service-serve.go
@@ -38,7 +38,7 @@ var serveCmd = &cobra.Command{
Listens for "deploy" requests, allowing you to test your deployable units to a local docker-based dev environment.
`),
PersistentPreRun: func(cmd *cobra.Command, _ []string) {
setup(cmd, zapcore.InfoLevel)
Setup(cmd, zapcore.InfoLevel)
},
RunE: serveE,
Args: cobra.ExactArgs(0),
2 changes: 1 addition & 1 deletion cmd/substreams/setup.go
@@ -12,7 +12,7 @@ import (
"go.uber.org/zap/zapcore"
)

func setup(cmd *cobra.Command, loglevel zapcore.Level) {
func Setup(cmd *cobra.Command, loglevel zapcore.Level) {
setupProfiler()
manifest.IPFSURL = sflags.MustGetString(cmd, "ipfs-url")
logging.InstantiateLoggers(logging.WithLogLevelSwitcherServerAutoStart(), logging.WithDefaultLevel(loglevel))
9 changes: 8 additions & 1 deletion cmd/substreams/tools.go
@@ -1,7 +1,14 @@
package main

import "github.com/streamingfast/substreams/tools"
import (
"github.com/spf13/cobra"
"github.com/streamingfast/substreams/tools"
"go.uber.org/zap/zapcore"
)

func init() {
tools.Cmd.PersistentPreRun = func(cmd *cobra.Command, _ []string) {
Setup(cmd, zapcore.InfoLevel)
}
rootCmd.AddCommand(tools.Cmd)
}
11 changes: 11 additions & 0 deletions docs/release-notes/change-log.md
@@ -17,6 +17,17 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
* Fix detection of accepted gzip compression when multiple values are sent in the `Grpc-Accept-Encoding` header (ex: Python library)
* Allow setting subscription channel max capacity via `SOURCE_CHAN_SIZE` env var (default: 100)

### Client-side

* Improvements to 'tools prometheus-exporter':
- Rename flags `lookup_interval` and `lookup_timeout` to `--interval` and `--timeout`
- Support a relative block height (the default is now -1) and no longer set the 'final-blocks-only' flag on requests
- Add a `--max-freshness` flag to check block age (when using a relative block height)
- Add a `substreams_healthcheck_block_age_ms` prometheus metric
- `--block-height` is now a flag instead of a positional argument
- Improve logging
- Remove the built-in "3 retries" behavior, which was causing confusion

## v1.11.3

### Server-side
165 changes: 80 additions & 85 deletions tools/prometheus-exporter.go
@@ -3,7 +3,6 @@ package tools
import (
"context"
"fmt"
"io"
"net/http"
"strconv"
"strings"
@@ -29,18 +28,21 @@ import (

var status = prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: "substreams_healthcheck_status", Help: "Either 1 for successful substreams request, or 0 for failure"}, []string{"endpoint"})
var requestDurationMs = prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: "substreams_healthcheck_duration_ms", Help: "Request full processing time in millisecond"}, []string{"endpoint"})
var blockAgeMs = prometheus.NewGaugeVec(prometheus.GaugeOpts{Name: "substreams_healthcheck_block_age_ms", Help: "Age of returned block"}, []string{"endpoint"})

var lastStatus = map[string]bool{}
var lock = &sync.Mutex{}

var prometheusCmd = &cobra.Command{
Use: "prometheus-exporter <endpoint[,endpoint[,endpoint[@<block_height>]],[,...]]> [<manifest>] <module_name> <block_height>",
Use: "prometheus-exporter <endpoint[,endpoint[,endpoint[@<block_height>]],[,...]]> <manifest> <module_name>",
Short: "run substreams client periodically on a single block, exporting the values in prometheus format",
Long: cli.Dedent(`
Run substreams client periodically on a single block, exporting the values in prometheus format. The manifest is optional as it will try to find a file named
'substreams.yaml' in current working directory if nothing entered. You may enter a directory that contains a 'substreams.yaml'
file in place of '<manifest_file>, or a link to a remote .spkg file, using urls gs://, http(s)://, ipfs://, etc.'.
Run substreams client periodically on a single block, exporting the values in prometheus format.
The manifest can be a local file or a URL to an spkg.
You can specify a start-block on some endpoints by appending '@<block_height>' to the endpoint URL.
`),
RunE: runPrometheus,
Args: cobra.RangeArgs(3, 4),
Args: cobra.ExactArgs(3),
SilenceUsage: true,
}

@@ -50,27 +52,21 @@ func init() {
prometheusCmd.Flags().String("substreams-api-key-envvar", "SUBSTREAMS_API_KEY", "Name of variable containing Substreams Api Key")
prometheusCmd.Flags().BoolP("insecure", "k", false, "Skip certificate validation on GRPC connection")
prometheusCmd.Flags().BoolP("plaintext", "p", false, "Establish GRPC connection in plaintext")
prometheusCmd.Flags().Duration("lookup_interval", time.Second*20, "endpoints will be polled at this interval")
prometheusCmd.Flags().Duration("lookup_timeout", time.Second*10, "endpoints will be considered 'failing' if they don't complete in that duration")
prometheusCmd.Flags().Int64("block-height", -1, "Block number to request (defaults to -1, which means the HEAD)")
prometheusCmd.Flags().Duration("max-freshness", time.Minute*2, "(only used if block-height is relative, i.e. below 0) check the age of the received blocks, fail an endpoint if it is older than this duration")
prometheusCmd.Flags().Duration("interval", time.Second*20, "endpoints will be polled at this interval")
prometheusCmd.Flags().Duration("timeout", time.Second*10, "endpoints will be considered 'failing' if they don't complete in that duration")

Cmd.AddCommand(prometheusCmd)
}

func runPrometheus(cmd *cobra.Command, args []string) error {

endpoints := strings.Split(args[0], ",")
manifestPath := ""
if len(args) == 4 {
manifestPath = args[1]
args = args[1:]
}
moduleName := args[1]
blockHeight := args[2]
manifestPath := args[1]
moduleName := args[2]

blockNum, err := strconv.ParseInt(blockHeight, 10, 64)
if err != nil {
return err
}
blockNum := sflags.MustGetInt64(cmd, "block-height")
addr := sflags.MustGetString(cmd, "listen-addr")

manifestReader, err := manifest.NewReader(manifestPath)
@@ -92,20 +88,18 @@ func runPrometheus(cmd *cobra.Command, args []string) error {
authToken, authType := GetAuth(cmd, "substreams-api-key-envvar", "substreams-api-token-envvar")
insecure := sflags.MustGetBool(cmd, "insecure")
plaintext := sflags.MustGetBool(cmd, "plaintext")
interval := sflags.MustGetDuration(cmd, "lookup_interval")
timeout := sflags.MustGetDuration(cmd, "lookup_timeout")
interval := sflags.MustGetDuration(cmd, "interval")
timeout := sflags.MustGetDuration(cmd, "timeout")
maxFreshness := sflags.MustGetDuration(cmd, "max-freshness")
for _, endpoint := range endpoints {

startBlock := blockNum

if parts := strings.Split(endpoint, "@"); len(parts) == 2 {
start, err := strconv.ParseUint(parts[1], 10, 64)
start, err := strconv.ParseInt(parts[1], 10, 64)
if err != nil {
return fmt.Errorf("invalid endpoint @startBlock format for %q: %w", endpoint, err)
}

endpoint = parts[0]
startBlock = int64(start)
startBlock = start
}

substreamsClientConfig := client.NewSubstreamsClientConfig(
@@ -115,12 +109,18 @@ func runPrometheus(cmd *cobra.Command, args []string) error {
insecure,
plaintext,
)
go launchSubstreamsPoller(endpoint, substreamsClientConfig, pkgBundle.Package.Modules, outputStreamName, startBlock, interval, timeout)
var fresh *time.Duration
if startBlock < 0 {
fresh = &maxFreshness
}

go launchSubstreamsPoller(endpoint, substreamsClientConfig, pkgBundle.Package.Modules, outputStreamName, startBlock, interval, timeout, fresh)
}

promReg := prometheus.NewRegistry()
promReg.MustRegister(status)
promReg.MustRegister(requestDurationMs)
promReg.MustRegister(blockAgeMs)

handler := promhttp.HandlerFor(
promReg,
@@ -136,27 +136,31 @@ func runPrometheus(cmd *cobra.Command, args []string) error {
return nil
}

func markSuccess(endpoint string, begin time.Time, counter *failCounter) {
counter.Reset()

func markSuccess(endpoint string, begin time.Time) {
lock.Lock()
defer lock.Unlock()
if !lastStatus[endpoint] {
zlog.Info("endpoint now marked as available", zap.String("endpoint", endpoint))
}
lastStatus[endpoint] = true
status.With(prometheus.Labels{"endpoint": endpoint}).Set(1)
requestDurationMs.With(prometheus.Labels{"endpoint": endpoint}).Set(float64(time.Since(begin).Milliseconds()))
}

func maybeMarkFailure(endpoint string, begin time.Time, counter *failCounter) {
counter.Inc()
if counter.Get() < 3 {
return
func markFailure(endpoint string, begin time.Time, err error) {
lock.Lock()
defer lock.Unlock()
if val, ok := lastStatus[endpoint]; !ok || val {
zlog.Info("endpoint now marked as unavailable", zap.String("endpoint", endpoint), zap.Error(err))
lastStatus[endpoint] = false
}

status.With(prometheus.Labels{"endpoint": endpoint}).Set(0)
requestDurationMs.With(prometheus.Labels{"endpoint": endpoint}).Set(float64(time.Since(begin).Milliseconds()))
}

func launchSubstreamsPoller(endpoint string, substreamsClientConfig *client.SubstreamsClientConfig, modules *pbsubstreams.Modules, outputStreamName string, blockNum int64, pollingInterval, pollingTimeout time.Duration) {
func launchSubstreamsPoller(endpoint string, substreamsClientConfig *client.SubstreamsClientConfig, modules *pbsubstreams.Modules, outputStreamName string, blockNum int64, pollingInterval, pollingTimeout time.Duration, maxFreshness *time.Duration) {

sleep := time.Duration(0)
counter := newFailCounter()
for {
time.Sleep(sleep)
sleep = pollingInterval
@@ -166,7 +170,7 @@ func launchSubstreamsPoller(endpoint string, substreamsClientConfig *client.Subs
ssClient, connClose, callOpts, headers, err := client.NewSubstreamsClient(substreamsClientConfig)
if err != nil {
zlog.Error("substreams client setup", zap.Error(err))
maybeMarkFailure(endpoint, begin, counter)
markFailure(endpoint, begin, err)
cancel()
continue
}
@@ -175,48 +179,69 @@ func launchSubstreamsPoller(endpoint string, substreamsClientConfig *client.Subs
ctx = metadata.AppendToOutgoingContext(ctx, headers.ToArray()...)
}

var stopBlockNum uint64
if blockNum > 0 {
stopBlockNum = uint64(blockNum + 1)
}
subReq := &pbsubstreamsrpc.Request{
StartBlockNum: blockNum,
StopBlockNum: uint64(blockNum + 1),
FinalBlocksOnly: true,
Modules: modules,
OutputModule: outputStreamName,
StartBlockNum: blockNum,
StopBlockNum: stopBlockNum,
Modules: modules,
OutputModule: outputStreamName,
}

if err := subReq.Validate(); err != nil {
zlog.Error("validate request", zap.Error(err))
maybeMarkFailure(endpoint, begin, counter)
markFailure(endpoint, begin, err)
connClose()
cancel()
continue
}
callOpts = append(callOpts, grpc.WaitForReady(false))
zlog.Debug("calling sf.substreams.rpc.v2.Stream/Blocks", zap.String("endpoint", endpoint), zap.String("output_module", outputStreamName), zap.Int64("start_block", blockNum), zap.Uint64("stop_block", stopBlockNum))
cli, err := ssClient.Blocks(ctx, subReq, callOpts...)
if err != nil {
zlog.Error("call sf.substreams.rpc.v2.Stream/Blocks", zap.Error(err))
maybeMarkFailure(endpoint, begin, counter)
zlog.Error("call sf.substreams.rpc.v2.Stream/Blocks", zap.String("endpoint", endpoint), zap.Error(err))
markFailure(endpoint, begin, err)
connClose()
cancel()
continue
}

var gotResp bool
forloop:
for {
resp, err := cli.Recv()
if resp != nil {
switch resp.Message.(type) {
case *pbsubstreamsrpc.Response_BlockScopedData:
fmt.Println(resp.Message.(*pbsubstreamsrpc.Response_BlockScopedData).BlockScopedData.Output)
gotResp = true
if maxFreshness == nil {
zlog.Debug("marking endpoint with success",
zap.String("endpoint", endpoint),
zap.Duration("duration", time.Since(begin)),
zap.Uint64("block_num", resp.Message.(*pbsubstreamsrpc.Response_BlockScopedData).BlockScopedData.Clock.Number),
)
markSuccess(endpoint, begin)
break forloop
}
blockTime := resp.Message.(*pbsubstreamsrpc.Response_BlockScopedData).BlockScopedData.Clock.Timestamp.AsTime()
blockAgeMs.With(prometheus.Labels{"endpoint": endpoint}).Set(float64(time.Since(blockTime).Milliseconds()))
if age := time.Since(blockTime); age > *maxFreshness {
markFailure(endpoint, begin, fmt.Errorf("block is too old: %s", age))
zlog.Debug("marking endpoint with failure because of freshness", zap.String("endpoint", endpoint), zap.Duration("duration", time.Since(begin)), zap.Duration("block_age", time.Since(blockTime)))
} else {
markSuccess(endpoint, begin)
zlog.Debug("marking endpoint with success",
zap.String("endpoint", endpoint),
zap.Duration("duration", time.Since(begin)),
zap.Duration("block_age", time.Since(blockTime)),
zap.Uint64("block_num", resp.Message.(*pbsubstreamsrpc.Response_BlockScopedData).BlockScopedData.Clock.Number),
)
}
break forloop
}
}
if err != nil {
if err == io.EOF && gotResp {
markSuccess(endpoint, begin, counter)
} else {
zlog.Error("received error from substreams", zap.Error(err))
maybeMarkFailure(endpoint, begin, counter)
}
markFailure(endpoint, begin, err)
break
}
}
@@ -225,33 +250,3 @@ func launchSubstreamsPoller(endpoint string, substreamsClientConfig *client.Subs
cancel()
}
}

type failCounter struct {
failCount int
mu sync.Mutex
}

func newFailCounter() *failCounter {
return &failCounter{}
}

func (f *failCounter) Inc() {
f.mu.Lock()
defer f.mu.Unlock()

f.failCount++
}

func (f *failCounter) Get() int {
f.mu.Lock()
defer f.mu.Unlock()

return f.failCount
}

func (f *failCounter) Reset() {
f.mu.Lock()
defer f.mu.Unlock()

f.failCount = 0
}
