Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

better configs for retries and log when we have errors on requests #33

Merged
merged 1 commit into from
Jun 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 13 additions & 2 deletions client/duneapi/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import (

const (
MaxRetries = 20 // try really hard to send the block
MinWaitDur = 100 * time.Millisecond
MaxWaitDur = 5 * time.Second
)

type BlockchainIngester interface {
Expand Down Expand Up @@ -53,8 +55,18 @@ func New(log *slog.Logger, cfg Config) (*client, error) { // revive:disable-line
httpClient := retryablehttp.NewClient()
httpClient.RetryMax = MaxRetries
httpClient.Logger = log
httpClient.CheckRetry = retryablehttp.DefaultRetryPolicy
checkRetry := func(ctx context.Context, resp *http.Response, err error) (bool, error) {
yes, err2 := retryablehttp.DefaultRetryPolicy(ctx, resp, err)
if yes {
log.Warn("Retrying request", "statusCode", resp.Status, "error", err)
}
return yes, err2
}

httpClient.CheckRetry = checkRetry
httpClient.Backoff = retryablehttp.LinearJitterBackoff
httpClient.RetryWaitMin = MinWaitDur
httpClient.RetryWaitMax = MaxWaitDur
return &client{
log: log.With("module", "duneapi"),
httpClient: httpClient,
Expand Down Expand Up @@ -154,7 +166,6 @@ func (c *client) sendRequest(ctx context.Context, request BlockchainIngestReques
if err != nil {
return err
}

return nil
}

Expand Down
10 changes: 3 additions & 7 deletions ingester/mainloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,6 @@ func (i *ingester) ConsumeBlocks(
case blockNumber := <-blockNumbers:
startTime := time.Now()

i.log.Info("Getting block by number", "blockNumber", blockNumber)
block, err := i.node.BlockByNumber(ctx, blockNumber)
if err != nil {
if errors.Is(err, context.Canceled) {
Expand Down Expand Up @@ -154,7 +153,6 @@ func (i *ingester) ConsumeBlocks(
i.log.Info("ConsumeBlocks: Channel is closed, not sending block to channel", "blockNumber", block.BlockNumber)
return ctx.Err()
case blocks <- block:
i.log.Info("Sent block")
}
}
}
Expand All @@ -178,11 +176,9 @@ func (i *ingester) SendBlocks(ctx context.Context, blocksCh <-chan models.RPCBlo
}

blockMap[block.BlockNumber] = block
i.log.Info("Received block", "blockNumber", block.BlockNumber)

// Send this block only if we have sent all previous blocks
for block, ok := blockMap[next]; ok; block, ok = blockMap[next] {
i.log.Info("SendBlocks: Sending block to DuneAPI", "blockNumber", block.BlockNumber)
if err := i.dune.SendBlock(ctx, block); err != nil {
if errors.Is(err, context.Canceled) {
i.log.Info("SendBlocks: Context canceled, stopping")
Expand All @@ -197,7 +193,6 @@ func (i *ingester) SendBlocks(ctx context.Context, blocksCh <-chan models.RPCBlo
Error: err,
})
} else {
i.log.Info("Updating latest ingested block number", "blockNumber", block.BlockNumber)
atomic.StoreInt64(&i.info.IngestedBlockNumber, block.BlockNumber)
}

Expand Down Expand Up @@ -250,7 +245,8 @@ func (i *ingester) ReportProgress(ctx context.Context) error {
fields = append(fields, "fallingBehind", fallingBehind)
}
if newDistance > 1 {
fields = append(fields, "distanceFromLatest", newDistance)
etaHours := time.Duration(float64(newDistance) / blocksPerSec * float64(time.Second)).Hours()
fields = append(fields, "hoursToCatchUp", fmt.Sprintf("%.1f", etaHours))
}
if rpcErrors > 0 {
fields = append(fields, "rpcErrors", rpcErrors)
Expand All @@ -259,7 +255,7 @@ func (i *ingester) ReportProgress(ctx context.Context) error {
fields = append(fields, "duneErrors", duneErrors)
}

i.log.Info("ProgressReport", fields...)
i.log.Info("PROGRESS REPORT", fields...)
previousIngested = lastIngested
previousDistance = newDistance
previousTime = tNow
Expand Down
Loading