Skip to content

Commit

Permalink
Add Get and Post for ProgressReport
Browse files Browse the repository at this point in the history
  • Loading branch information
vegarsti committed Jun 11, 2024
1 parent 3ff81f1 commit e3cd9d3
Show file tree
Hide file tree
Showing 5 changed files with 258 additions and 1 deletion.
136 changes: 136 additions & 0 deletions client/duneapi/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"log/slog"
"net/http"
"sync"
"time"

Expand All @@ -22,6 +24,12 @@ type BlockchainIngester interface {
// SendBlock sends a block to DuneAPI
SendBlock(ctx context.Context, payload models.RPCBlock) error

// GetProgressReport gets a progress report from DuneAPI
GetProgressReport(ctx context.Context) (*models.Progress, error)

// PostProgressReport sends a progress report to DuneAPI
PostProgressReport(ctx context.Context, progress models.Progress) error

// - API to discover the latest block number ingested
// this can also provide "next block ranges" to push to DuneAPI
// - log/metrics on catching up/falling behind, distance from tip of chain
Expand Down Expand Up @@ -155,3 +163,131 @@ func (c *client) idempotencyKey(rpcBlock models.RPCBlock) string {
func (c *client) Close() error {
return c.compressor.Close()
}

func (c *client) PostProgressReport(ctx context.Context, progress models.Progress) error {
var request BlockchainProgress
var err error
var responseStatus string
var responseBody string
start := time.Now()

// Log response
defer func() {
if err != nil {
c.log.Error("Sending progress report failed",
"lastIngestedBlockNumer", request.LastIngestedBlockNumber,
"error", err,
"statusCode", responseStatus,
"duration", time.Since(start),
"responseBody", responseBody,
)
} else {
c.log.Info("Sent progress report",
"lastIngestedBlockNumer", request.LastIngestedBlockNumber,
"duration", time.Since(start),
)
}
}()

url := fmt.Sprintf("%s/api/beta/blockchain/%s/ingest/progress", c.cfg.URL, c.cfg.BlockchainName)
c.log.Debug("Sending request", "url", url)
payload, err := json.Marshal(progress)
if err != nil {
return err
}
req, err := retryablehttp.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(payload))
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("x-dune-api-key", c.cfg.APIKey)
req = req.WithContext(ctx)
resp, err := c.httpClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
responseStatus = resp.Status

if resp.StatusCode != http.StatusOK {
bs, err := io.ReadAll(resp.Body)
responseBody = string(bs)
if err != nil {
return err
}
err = fmt.Errorf("got non-OK response, status code: %s body: %s", responseStatus, responseBody)
return err
}

return nil
}

func (c *client) GetProgressReport(ctx context.Context) (*models.Progress, error) {
var response BlockchainProgress
var err error
var responseStatus string
start := time.Now()

// Log response
defer func() {
if err != nil {
c.log.Error("Getting progress report failed",
"error", err,
"statusCode", responseStatus,
"duration", time.Since(start),
)
} else {
c.log.Info("Got progress report",
"progress", response.String(),
"duration", time.Since(start),
)
}
}()

url := fmt.Sprintf("%s/api/beta/blockchain/%s/ingest/progress", c.cfg.URL, c.cfg.BlockchainName)
c.log.Debug("Sending request", "url", url)
req, err := retryablehttp.NewRequestWithContext(ctx, "GET", url, nil) // empty body
if err != nil {
return nil, err
}
req.Header.Set("x-dune-api-key", c.cfg.APIKey)
req = req.WithContext(ctx)
resp, err := c.httpClient.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()

responseBody, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}

if resp.StatusCode != http.StatusOK {
var errorResp errorResponse
err = json.Unmarshal(responseBody, &errorResp)
if err != nil {
return nil, err
}
err = fmt.Errorf("got non-OK response, status code: %d, body: '%s'", resp.StatusCode, errorResp.Error)
// No progress yet
if resp.StatusCode == http.StatusNotFound {
return &models.Progress{
LastIngestedBlockNumber: 0,
LatestBlockNumber: 0,
}, nil
}
return nil, err
}

err = json.Unmarshal(responseBody, &response)
if err != nil {
return nil, err
}

progress := &models.Progress{
LastIngestedBlockNumber: response.LastIngestedBlockNumber,
LatestBlockNumber: response.LatestBlockNumber,
}
return progress, nil
}
13 changes: 13 additions & 0 deletions client/duneapi/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,16 @@ type BlockchainIngestRequest struct {
IdempotencyKey string
Payload []byte
}

type BlockchainProgress struct {
LastIngestedBlockNumber int64 `json:"last_ingested_block_number"`
LatestBlockNumber int64 `json:"latest_block_number"`
}

func (p *BlockchainProgress) String() string {
return fmt.Sprintf("%+v", *p)
}

type errorResponse struct {
Error string `json:"error"`
}
8 changes: 8 additions & 0 deletions ingester/mainloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,14 @@ func (i *ingester) ReportProgress(ctx context.Context) error {
previousIngested = lastIngested
previousDistance = newDistance
previousTime = tNow

err := i.dune.PostProgressReport(ctx, models.Progress{
LastIngestedBlockNumber: lastIngested,
LatestBlockNumber: latest,
})
if err != nil {
i.log.Error("Failed to post progress report", "error", err)
}
}
}
}
Expand Down
96 changes: 95 additions & 1 deletion mocks/duneapi/client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions models/progress.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
package models

type Progress struct {
LastIngestedBlockNumber int64
LatestBlockNumber int64
}

0 comments on commit e3cd9d3

Please sign in to comment.