Skip to content

Commit

Permalink
Add error reporting to progress report API call (#44)
Browse files Browse the repository at this point in the history
This PR makes it so we collect errors and send them to Dune as part of
the progress report request.

I added two models in this PR, one in the `models` package and one in
the DuneAPI package. This means we have three structs for representing
the same kind of error (there's also one in `ingester). Is that too
much? 😅
  • Loading branch information
vegarsti authored Jun 28, 2024
1 parent 68b10f4 commit 85403d1
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 11 deletions.
21 changes: 16 additions & 5 deletions client/duneapi/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ func New(log *slog.Logger, cfg Config) (*client, error) { // revive:disable-line
yes, err2 := retryablehttp.DefaultRetryPolicy(ctx, resp, err)
if yes {
if resp == nil {
log.Warn("Retrying request", "error", err)
log.Warn("Retrying request to Dune API", "error", err)
} else {
log.Warn("Retrying request", "statusCode", resp.Status, "error", err)
log.Warn("Retrying request to Dune API", "statusCode", resp.Status, "error", err)
}
}
return yes, err2
Expand Down Expand Up @@ -214,7 +214,7 @@ func (c *client) Close() error {
}

func (c *client) PostProgressReport(ctx context.Context, progress models.BlockchainIndexProgress) error {
var request BlockchainProgress
var request PostBlockchainProgressRequest
var err error
var responseStatus string
var responseBody string
Expand All @@ -234,14 +234,25 @@ func (c *client) PostProgressReport(ctx context.Context, progress models.Blockch
c.log.Info("Sent progress report",
"lastIngestedBlockNumber", request.LastIngestedBlockNumber,
"latestBlockNumber", request.LatestBlockNumber,
"errors", len(request.Errors),
"duration", time.Since(start),
)
}
}()

request = BlockchainProgress{
errors := make([]BlockchainError, len(progress.Errors))
for i, e := range progress.Errors {
errors[i] = BlockchainError{
Timestamp: e.Timestamp,
BlockNumbers: e.BlockNumbers,
Error: e.Error,
Source: e.Source,
}
}
request = PostBlockchainProgressRequest{
LastIngestedBlockNumber: progress.LastIngestedBlockNumber,
LatestBlockNumber: progress.LatestBlockNumber,
Errors: errors,
}
url := fmt.Sprintf("%s/api/beta/blockchain/%s/ingest/progress", c.cfg.URL, c.cfg.BlockchainName)
payload, err := json.Marshal(request)
Expand Down Expand Up @@ -275,7 +286,7 @@ func (c *client) PostProgressReport(ctx context.Context, progress models.Blockch
}

func (c *client) GetProgressReport(ctx context.Context) (*models.BlockchainIndexProgress, error) {
var response BlockchainProgress
var response GetBlockchainProgressResponse
var err error
var responseStatus string
start := time.Now()
Expand Down
18 changes: 16 additions & 2 deletions client/duneapi/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"sort"
"strings"
"time"

"github.com/duneanalytics/blockchain-ingester/models"
)
Expand Down Expand Up @@ -58,11 +59,24 @@ type BlockchainIngestRequest struct {
Payload []byte
}

type BlockchainProgress struct {
type GetBlockchainProgressResponse struct {
LastIngestedBlockNumber int64 `json:"last_ingested_block_number,omitempty"`
LatestBlockNumber int64 `json:"latest_block_number,omitempty"`
}

func (p *BlockchainProgress) String() string {
func (p *GetBlockchainProgressResponse) String() string {
return fmt.Sprintf("%+v", *p)
}

type PostBlockchainProgressRequest struct {
LastIngestedBlockNumber int64 `json:"last_ingested_block_number,omitempty"`
LatestBlockNumber int64 `json:"latest_block_number,omitempty"`
Errors []BlockchainError `json:"errors,omitempty"`
}

type BlockchainError struct {
Timestamp time.Time `json:"timestamp"`
BlockNumbers string `json:"block_numbers"`
Error string `json:"error"`
Source string `json:"source"`
}
4 changes: 2 additions & 2 deletions client/jsonrpc/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ func NewClient(log *slog.Logger, cfg Config) (*rpcClient, error) { // revive:dis
yes, err2 := retryablehttp.DefaultRetryPolicy(ctx, resp, err)
if yes {
if resp == nil {
log.Warn("Retrying request", "error", err)
log.Warn("Retrying request to RPC client", "error", err)
} else {
log.Warn("Retrying request", "statusCode", resp.Status, "error", err)
log.Warn("Retrying request to RPC client", "statusCode", resp.Status, "error", err)
}
}
return yes, err2
Expand Down
22 changes: 22 additions & 0 deletions ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,28 @@ type Info struct {
DuneErrors []ErrorInfo
}

// Errors returns a combined list of errors from RPC requests and Dune requests, for use in progress reporting
func (info Info) Errors() []models.BlockchainIndexError {
errors := make([]models.BlockchainIndexError, 0, len(info.RPCErrors)+len(info.DuneErrors))
for _, e := range info.RPCErrors {
errors = append(errors, models.BlockchainIndexError{
Timestamp: e.Timestamp,
BlockNumbers: e.BlockNumbers,
Error: e.Error.Error(),
Source: "rpc",
})
}
for _, e := range info.DuneErrors {
errors = append(errors, models.BlockchainIndexError{
Timestamp: e.Timestamp,
BlockNumbers: e.BlockNumbers,
Error: e.Error.Error(),
Source: "dune",
})
}
return errors
}

type ErrorInfo struct {
Timestamp time.Time
BlockNumbers string
Expand Down
7 changes: 6 additions & 1 deletion ingester/mainloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -212,15 +212,20 @@ func (i *ingester) ReportProgress(ctx context.Context) error {
previousIngested = lastIngested
previousTime = tNow

// TODO: include errors in the report, reset the error list
err := i.dune.PostProgressReport(ctx, models.BlockchainIndexProgress{
BlockchainName: i.cfg.BlockchainName,
EVMStack: i.cfg.Stack.String(),
LastIngestedBlockNumber: lastIngested,
LatestBlockNumber: latest,
Errors: i.info.Errors(),
})
if err != nil {
i.log.Error("Failed to post progress report", "error", err)
} else {
i.log.Debug("Posted progress report")
// Reset errors after reporting
i.info.RPCErrors = []ErrorInfo{}
i.info.DuneErrors = []ErrorInfo{}
}
}
}
Expand Down
14 changes: 13 additions & 1 deletion ingester/send.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package ingester
import (
"context"
"fmt"
"strings"
"sync/atomic"
"time"

Expand Down Expand Up @@ -122,7 +123,18 @@ func (i *ingester) trySendBlockBatch(
i.log.Info("SendBlocks: Context canceled, stopping")
return nextBlockToSend, nil
}
// TODO: handle errors of duneAPI, save the blockRange impacted and report this back for later retries

// Store error for reporting
blocknumbers := make([]string, len(blockBatch))
for i, block := range blockBatch {
blocknumbers[i] = fmt.Sprintf("%d", block.BlockNumber)
}
i.info.DuneErrors = append(i.info.DuneErrors, ErrorInfo{
Timestamp: time.Now(),
Error: err,
BlockNumbers: strings.Join(blocknumbers, ","),
})

err := errors.Errorf("failed to send batch: %w", err)
i.log.Error("SendBlocks: Failed to send batch, exiting", "error", err)
return nextBlockToSend, err
Expand Down
12 changes: 12 additions & 0 deletions models/progress.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,20 @@
package models

import (
"time"
)

type BlockchainIndexProgress struct {
BlockchainName string
EVMStack string
LastIngestedBlockNumber int64
LatestBlockNumber int64
Errors []BlockchainIndexError
}

type BlockchainIndexError struct {
Timestamp time.Time
BlockNumbers string
Error string
Source string
}

0 comments on commit 85403d1

Please sign in to comment.