Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Get and Post for ProgressReport #16

Merged
merged 1 commit into from
Jun 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 140 additions & 0 deletions client/duneapi/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"log/slog"
"net/http"
"sync"
"time"

Expand All @@ -22,6 +24,12 @@ type BlockchainIngester interface {
// SendBlock sends a block to DuneAPI
SendBlock(ctx context.Context, payload models.RPCBlock) error

// GetProgressReport gets a progress report from DuneAPI
GetProgressReport(ctx context.Context) (*models.BlockchainIndexProgress, error)

// PostProgressReport sends a progress report to DuneAPI
PostProgressReport(ctx context.Context, progress models.BlockchainIndexProgress) error

// - API to discover the latest block number ingested
// this can also provide "next block ranges" to push to DuneAPI
// - log/metrics on catching up/falling behind, distance from tip of chain
Expand Down Expand Up @@ -155,3 +163,135 @@ func (c *client) idempotencyKey(rpcBlock models.RPCBlock) string {
func (c *client) Close() error {
return c.compressor.Close()
}

func (c *client) PostProgressReport(ctx context.Context, progress models.BlockchainIndexProgress) error {
var request BlockchainProgress
var err error
var responseStatus string
var responseBody string
start := time.Now()

// Log response
defer func() {
if err != nil {
c.log.Error("Sending progress report failed",
"lastIngestedBlockNumer", request.LastIngestedBlockNumber,
"error", err,
"statusCode", responseStatus,
"duration", time.Since(start),
"responseBody", responseBody,
)
} else {
c.log.Info("Sent progress report",
"lastIngestedBlockNumer", request.LastIngestedBlockNumber,
"duration", time.Since(start),
)
}
}()

url := fmt.Sprintf("%s/api/beta/blockchain/%s/ingest/progress", c.cfg.URL, c.cfg.BlockchainName)
c.log.Debug("Sending request", "url", url)
payload, err := json.Marshal(progress)
if err != nil {
return err
}
req, err := retryablehttp.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(payload))
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("x-dune-api-key", c.cfg.APIKey)
req = req.WithContext(ctx)
resp, err := c.httpClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
responseStatus = resp.Status

if resp.StatusCode != http.StatusOK {
bs, err := io.ReadAll(resp.Body)
responseBody = string(bs)
if err != nil {
return err
}
err = fmt.Errorf("got non-OK response, status code: %s body: %s", responseStatus, responseBody)
return err
}

return nil
}

func (c *client) GetProgressReport(ctx context.Context) (*models.BlockchainIndexProgress, error) {
var response BlockchainProgress
var err error
var responseStatus string
start := time.Now()

// Log response
defer func() {
if err != nil {
c.log.Error("Getting progress report failed",
"error", err,
"statusCode", responseStatus,
"duration", time.Since(start),
)
} else {
c.log.Info("Got progress report",
"progress", response.String(),
"duration", time.Since(start),
)
}
}()

url := fmt.Sprintf("%s/api/beta/blockchain/%s/ingest/progress", c.cfg.URL, c.cfg.BlockchainName)
c.log.Debug("Sending request", "url", url)
req, err := retryablehttp.NewRequestWithContext(ctx, "GET", url, nil) // empty body
if err != nil {
return nil, err
}
req.Header.Set("x-dune-api-key", c.cfg.APIKey)
req = req.WithContext(ctx)
resp, err := c.httpClient.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()

responseBody, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}

if resp.StatusCode != http.StatusOK {
var errorResp errorResponse
err = json.Unmarshal(responseBody, &errorResp)
if err != nil {
return nil, err
}
err = fmt.Errorf("got non-OK response, status code: %d, body: '%s'", resp.StatusCode, errorResp.Error)
// No progress yet
if resp.StatusCode == http.StatusNotFound {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice 👍

return &models.BlockchainIndexProgress{
BlockchainName: c.cfg.BlockchainName,
EVMStack: c.cfg.Stack.String(),
LastIngestedBlockNumber: 0,
LatestBlockNumber: 0,
}, nil
}
return nil, err
}

err = json.Unmarshal(responseBody, &response)
if err != nil {
return nil, err
}

progress := &models.BlockchainIndexProgress{
BlockchainName: c.cfg.BlockchainName,
EVMStack: c.cfg.Stack.String(),
LastIngestedBlockNumber: response.LastIngestedBlockNumber,
LatestBlockNumber: response.LatestBlockNumber,
}
return progress, nil
}
13 changes: 13 additions & 0 deletions client/duneapi/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,16 @@ type BlockchainIngestRequest struct {
IdempotencyKey string
Payload []byte
}

type BlockchainProgress struct {
LastIngestedBlockNumber int64 `json:"last_ingested_block_number"`
LatestBlockNumber int64 `json:"latest_block_number"`
}
helanto marked this conversation as resolved.
Show resolved Hide resolved

func (p *BlockchainProgress) String() string {
return fmt.Sprintf("%+v", *p)
}

type errorResponse struct {
Error string `json:"error"`
}
2 changes: 2 additions & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ func main() {
MaxBatchSize: 1,
ReportProgressInterval: cfg.ReportProgressInterval,
PollInterval: cfg.PollInterval,
Stack: cfg.RPCStack,
BlockchainName: cfg.BlockchainName,
},
)

Expand Down
2 changes: 2 additions & 0 deletions ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ type Config struct {
MaxBatchSize int
PollInterval time.Duration
ReportProgressInterval time.Duration
Stack models.EVMStack
BlockchainName string
}

type Info struct {
Expand Down
96 changes: 95 additions & 1 deletion mocks/duneapi/client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions models/progress.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package models

type BlockchainIndexProgress struct {
BlockchainName string
EVMStack string
LastIngestedBlockNumber int64
LatestBlockNumber int64
}
Loading