Skip to content

Commit

Permalink
Merge branch 'main' into fixes-to-duneapi-zstd
Browse files Browse the repository at this point in the history
  • Loading branch information
vegarsti authored Jun 13, 2024
2 parents 4a41474 + 918a5c3 commit a75fae9
Show file tree
Hide file tree
Showing 11 changed files with 454 additions and 101 deletions.
18 changes: 9 additions & 9 deletions .github/workflows/build.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,20 @@ jobs:
steps:
- uses: actions/checkout@v3

# - name: Configure AWS Credentials
# uses: aws-actions/configure-aws-credentials@v1.6.1
# with:
# role-to-assume: arn:aws:iam::118330671040:role/duneapi-ci
# aws-region: ${{ env.AWS_REGION }}
- name: Configure AWS Credentials
uses: aws-actions/configure-aws-credentials@v1.6.1
with:
role-to-assume: arn:aws:iam::118330671040:role/node-indexer
aws-region: ${{ env.AWS_REGION }}

# - name: Login to Amazon ECR
# id: login-ecr
# uses: aws-actions/amazon-ecr-login@v1
- name: Login to Amazon ECR
id: login-ecr
uses: aws-actions/amazon-ecr-login@v1

- env:
GITHUB_TOKEN: ${{ secrets.DUNE_ENG_ACCESS_TOKEN }}
ECR_REGISTRY: public.ecr.aws/duneanalytics
ECR_REPOSITORY: blockchain-ingester
ECR_REPOSITORY: node-indexer
run: |
make image-build
make image-push
5 changes: 1 addition & 4 deletions .github/workflows/pr.yaml
Original file line number Diff line number Diff line change
@@ -1,10 +1,7 @@
---
name: "Pull Request"

on:
pull_request:
branches:
- main
on: pull_request

jobs:
test:
Expand Down
8 changes: 4 additions & 4 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
.PHONY: all setup lint build test image-build image-push

APPLICATION := ingester
APPLICATION := indexer
GITHUB_SHA ?= HEAD
REF_TAG := $(shell echo ${GITHUB_REF_NAME} | tr -cd '[:alnum:]')
IMAGE_TAG := ${ECR_REGISTRY}/${ECR_REPOSITORY}:${REF_TAG}-$(shell git rev-parse --short "${GITHUB_SHA}")-${GITHUB_RUN_NUMBER}
Expand All @@ -19,7 +19,7 @@ bin/gofumpt: bin
GOBIN=$(PWD)/bin go install mvdan.cc/gofumpt@v0.6.0

build: lint cmd/main.go
go build -o ingester cmd/main.go
go build -o indexer cmd/main.go

lint: bin/golangci-lint bin/gofumpt
go fmt ./...
Expand All @@ -38,11 +38,11 @@ gen-mocks: bin/moq ./client/jsonrpc/ ./client/duneapi/


image-build:
@echo "# Building ingester docker image..."
@echo "# Building indexer docker image..."
docker build -t $(APPLICATION) -f Dockerfile --build-arg GITHUB_TOKEN=${GITHUB_TOKEN} .

image-push: image-build
@echo "# Pushing ingester docker image..."
@echo "# Pushing indexer docker image..."
docker tag $(APPLICATION) ${IMAGE_TAG}
# docker push ${IMAGE_TAG}
docker rmi ${IMAGE_TAG}
144 changes: 144 additions & 0 deletions client/duneapi/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"context"
"encoding/json"
"fmt"
"io"
"log/slog"
"net/http"
"sync"
Expand All @@ -23,6 +24,12 @@ type BlockchainIngester interface {
// SendBlock sends a block to DuneAPI
SendBlock(ctx context.Context, payload models.RPCBlock) error

// GetProgressReport gets a progress report from DuneAPI
GetProgressReport(ctx context.Context) (*models.BlockchainIndexProgress, error)

// PostProgressReport sends a progress report to DuneAPI
PostProgressReport(ctx context.Context, progress models.BlockchainIndexProgress) error

// - API to discover the latest block number ingested
// this can also provide "next block ranges" to push to DuneAPI
// - log/metrics on catching up/falling behind, distance from tip of chain
Expand Down Expand Up @@ -159,3 +166,140 @@ func (c *client) idempotencyKey(rpcBlock models.RPCBlock) string {
func (c *client) Close() error {
return c.compressor.Close()
}

func (c *client) PostProgressReport(ctx context.Context, progress models.BlockchainIndexProgress) error {
var request BlockchainProgress
var err error
var responseStatus string
var responseBody string
start := time.Now()

// Log response
defer func() {
if err != nil {
c.log.Error("Sending progress report failed",
"lastIngestedBlockNumer", request.LastIngestedBlockNumber,
"error", err,
"statusCode", responseStatus,
"duration", time.Since(start),
"responseBody", responseBody,
)
} else {
c.log.Info("Sent progress report",
"lastIngestedBlockNumer", request.LastIngestedBlockNumber,
"latestBlockNumber", request.LatestBlockNumber,
"duration", time.Since(start),
)
}
}()

request = BlockchainProgress{
LastIngestedBlockNumber: progress.LastIngestedBlockNumber,
LatestBlockNumber: progress.LatestBlockNumber,
}
url := fmt.Sprintf("%s/api/beta/blockchain/%s/ingest/progress", c.cfg.URL, c.cfg.BlockchainName)
payload, err := json.Marshal(request)
if err != nil {
return err
}
c.log.Info("Sending request", "url", url, "payload", string(payload))
req, err := retryablehttp.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(payload))
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("x-dune-api-key", c.cfg.APIKey)
req = req.WithContext(ctx)
resp, err := c.httpClient.Do(req)
if err != nil {
return err
}
defer resp.Body.Close()
responseStatus = resp.Status

if resp.StatusCode != http.StatusOK {
bs, err := io.ReadAll(resp.Body)
responseBody = string(bs)
if err != nil {
return err
}
err = fmt.Errorf("got non-OK response, status code: %s body: %s", responseStatus, responseBody)
return err
}

return nil
}

func (c *client) GetProgressReport(ctx context.Context) (*models.BlockchainIndexProgress, error) {
var response BlockchainProgress
var err error
var responseStatus string
start := time.Now()

// Log response
defer func() {
if err != nil {
c.log.Error("Getting progress report failed",
"error", err,
"statusCode", responseStatus,
"duration", time.Since(start),
)
} else {
c.log.Info("Got progress report",
"progress", response.String(),
"duration", time.Since(start),
)
}
}()

url := fmt.Sprintf("%s/api/beta/blockchain/%s/ingest/progress", c.cfg.URL, c.cfg.BlockchainName)
c.log.Debug("Sending request", "url", url)
req, err := retryablehttp.NewRequestWithContext(ctx, "GET", url, nil) // empty body
if err != nil {
return nil, err
}
req.Header.Set("x-dune-api-key", c.cfg.APIKey)
req = req.WithContext(ctx)
resp, err := c.httpClient.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()

responseBody, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}

if resp.StatusCode != http.StatusOK {
var errorResp errorResponse
err = json.Unmarshal(responseBody, &errorResp)
if err != nil {
return nil, err
}
err = fmt.Errorf("got non-OK response, status code: %d, body: '%s'", resp.StatusCode, errorResp.Error)
// No progress yet
if resp.StatusCode == http.StatusNotFound {
return &models.BlockchainIndexProgress{
BlockchainName: c.cfg.BlockchainName,
EVMStack: c.cfg.Stack.String(),
LastIngestedBlockNumber: 0,
LatestBlockNumber: 0,
}, nil
}
return nil, err
}

err = json.Unmarshal(responseBody, &response)
if err != nil {
return nil, err
}

progress := &models.BlockchainIndexProgress{
BlockchainName: c.cfg.BlockchainName,
EVMStack: c.cfg.Stack.String(),
LastIngestedBlockNumber: response.LastIngestedBlockNumber,
LatestBlockNumber: response.LatestBlockNumber,
}
return progress, nil
}
13 changes: 13 additions & 0 deletions client/duneapi/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,3 +46,16 @@ type BlockchainIngestRequest struct {
IdempotencyKey string
Payload []byte
}

type BlockchainProgress struct {
LastIngestedBlockNumber int64 `json:"last_ingested_block_number"`
LatestBlockNumber int64 `json:"latest_block_number"`
}

func (p *BlockchainProgress) String() string {
return fmt.Sprintf("%+v", *p)
}

type errorResponse struct {
Error string `json:"error"`
}
24 changes: 21 additions & 3 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,22 @@ func main() {
stdlog.Fatal(err)
}

ctx, cancelFn := context.WithCancel(context.Background())

// Get stored progress unless config indicates we should start from 0
var startBlockNumber int64
// Default to -1 to start where the ingester left off
if cfg.BlockHeight == -1 {
progress, err := duneClient.GetProgressReport(ctx)
if err != nil {
stdlog.Fatal(err)
}
startBlockNumber = progress.LastIngestedBlockNumber + 1
} else {
startBlockNumber = cfg.BlockHeight
}

maxCount := int64(0) // 0 means ingest until cancelled
ingester := ingester.New(
logger,
rpcClient,
Expand All @@ -68,18 +84,20 @@ func main() {
MaxBatchSize: 1,
ReportProgressInterval: cfg.ReportProgressInterval,
PollInterval: cfg.PollInterval,
Stack: cfg.RPCStack,
BlockchainName: cfg.BlockchainName,
},
)

ctx, cancelFn := context.WithCancel(context.Background())

wg.Add(1)
go func() {
defer wg.Done()
err := ingester.Run(ctx, cfg.BlockHeight, 0 /* maxCount */)
err := ingester.Run(ctx, startBlockNumber, maxCount)
logger.Info("Ingester finished", "err", err)
}()

defer ingester.Close()

// TODO: add a metrics exporter or healthcheck http endpoint ?

quit := make(chan os.Signal, 1)
Expand Down
8 changes: 6 additions & 2 deletions ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ import (

type Ingester interface {
// Run starts the ingester and blocks until the context is cancelled or maxCount blocks are ingested
Run(ctx context.Context, startBlockNumber, maxCount int64) error
Run(ctx context.Context, startBlockNumber int64, maxCount int64) error

// ConsumeBlocks sends blocks from startBlockNumber to endBlockNumber to outChan, inclusive.
// If endBlockNumber is -1, it sends blocks from startBlockNumber to the tip of the chain
// it will run continuously until the context is cancelled
ConsumeBlocks(ctx context.Context, outChan chan models.RPCBlock, startBlockNumber, endBlockNumber int64) error
ConsumeBlocks(ctx context.Context, outChan chan models.RPCBlock, startBlockNumber int64, endBlockNumber int64) error

// SendBlocks pushes to DuneAPI the RPCBlock Payloads as they are received in an endless loop
// it will block until:
Expand All @@ -28,6 +28,8 @@ type Ingester interface {

// This is just a placeholder for now
Info() Info

Close() error
}

const (
Expand All @@ -40,6 +42,8 @@ type Config struct {
MaxBatchSize int
PollInterval time.Duration
ReportProgressInterval time.Duration
Stack models.EVMStack
BlockchainName string
}

type Info struct {
Expand Down
Loading

0 comments on commit a75fae9

Please sign in to comment.