Skip to content

Commit

Permalink
jsonrpc: start to implement the jsonrpc client side
Browse files Browse the repository at this point in the history
  • Loading branch information
msf committed Jun 4, 2024
1 parent c653d14 commit 80b45e2
Show file tree
Hide file tree
Showing 10 changed files with 317 additions and 31 deletions.
47 changes: 18 additions & 29 deletions client/duneapi/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,36 +14,20 @@ import (
"github.com/klauspost/compress/zstd"
)

type Config struct {
APIKey string
URL string

// this is used by DuneAPI to determine the logic used to decode the EVM transactions
Stack models.EVMStack
// the name of this blockchain as it will be stored in DuneAPI
BlockchainName string

// RPC json payloads can be very large, we default to compressing for better throughput
// - lowers latency
// - reduces bandwidth
DisableCompression bool
}

type RPCBlock struct {
BlockNumber string
Payload []byte
}
const (
// MaxRetries is the value assigned to the retryable HTTP client's RetryMax in New.
MaxRetries = 20 // try really hard to send the block
)

type BlockchainIngester interface {
// Sync pushes to DuneAPI the RPCBlockPayloads as they are received in an endless loop
// Sync pushes to DuneAPI the RPCBlock Payloads as they are received in an endless loop
// it will block until:
// - the context is cancelled
// - channel is closed
// - a fatal error occurs
Sync(ctx context.Context, blocksCh <-chan RPCBlock) error
Sync(ctx context.Context, blocksCh <-chan models.RPCBlock) error

// SendBlock sends a block to DuneAPI
SendBlock(payload RPCBlock) error
SendBlock(payload models.RPCBlock) error

// TODO:
// - Batching multiple blocks in a single request
Expand All @@ -67,9 +51,14 @@ func New(log *slog.Logger, cfg Config) (*client, error) { // revive:disable-line
if err != nil {
return nil, err
}
httpClient := retryablehttp.NewClient()
httpClient.RetryMax = MaxRetries
httpClient.Logger = log
httpClient.CheckRetry = retryablehttp.DefaultRetryPolicy
httpClient.Backoff = retryablehttp.LinearJitterBackoff
return &client{
log: log,
httpClient: retryablehttp.NewClient(),
httpClient: httpClient,
cfg: cfg,
compressor: comp,
bufPool: &sync.Pool{
Expand All @@ -80,7 +69,7 @@ func New(log *slog.Logger, cfg Config) (*client, error) { // revive:disable-line
}, nil
}

func (c *client) Sync(ctx context.Context, blocksCh <-chan RPCBlock) error {
func (c *client) Sync(ctx context.Context, blocksCh <-chan models.RPCBlock) error {
for {
select {
case <-ctx.Done():
Expand All @@ -99,7 +88,7 @@ func (c *client) Sync(ctx context.Context, blocksCh <-chan RPCBlock) error {
}

// SendBlock sends a block to DuneAPI
func (c *client) SendBlock(payload RPCBlock) error {
func (c *client) SendBlock(payload models.RPCBlock) error {
start := time.Now()
buffer := c.bufPool.Get().(*bytes.Buffer)
defer func() {
Expand All @@ -114,7 +103,7 @@ func (c *client) SendBlock(payload RPCBlock) error {
return c.sendRequest(request)
}

func (c *client) buildRequest(payload RPCBlock, buffer *bytes.Buffer) (BlockchainIngestRequest, error) {
func (c *client) buildRequest(payload models.RPCBlock, buffer *bytes.Buffer) (BlockchainIngestRequest, error) {
var request BlockchainIngestRequest

if c.cfg.DisableCompression {
Expand Down Expand Up @@ -182,9 +171,9 @@ func (c *client) sendRequest(request BlockchainIngestRequest) error {
return nil
}

func (c *client) idempotencyKey(rpcBlock RPCBlock) string {
// for idempotency we use the chain and block number
return fmt.Sprintf("%s-%s", c.cfg.BlockchainName, rpcBlock.BlockNumber)
func (c *client) idempotencyKey(rpcBlock models.RPCBlock) string {
// for idempotency we use the block number (should we use also the date?, or a startup timestamp?)
return fmt.Sprintf("%v", rpcBlock.BlockNumber)
}

func (c *client) Close() error {
Expand Down
17 changes: 17 additions & 0 deletions client/duneapi/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,25 @@ package duneapi

import (
"fmt"

"github.com/duneanalytics/blockchain-ingester/models"
)

// Config holds the settings used by the DuneAPI client.
type Config struct {
// APIKey and URL identify the DuneAPI endpoint and its credential
// (presumably sent with every request; confirm against sendRequest).
APIKey string
URL string

// this is used by DuneAPI to determine the logic used to decode the EVM transactions
Stack models.EVMStack
// the name of this blockchain as it will be stored in DuneAPI
BlockchainName string

// RPCBlock json payloads can be very large, we default to compressing for better throughput
// - lowers latency
// - reduces bandwidth
// Compression is on unless DisableCompression is set to true.
DisableCompression bool
}

// BlockchainIngestResponse is the JSON shape of an ingest response:
// its "tables" key is decoded into a list of IngestedTableInfo.
type BlockchainIngestResponse struct {
Tables []IngestedTableInfo `json:"tables"`
}
Expand Down
117 changes: 117 additions & 0 deletions client/jsonrpc/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
package jsonrpc

import (
"bytes"
"context"
"encoding/json"
"fmt"
"log/slog"
"net/http"
"sync"

"github.com/duneanalytics/blockchain-ingester/lib/hexutils"
"github.com/duneanalytics/blockchain-ingester/models"
"github.com/hashicorp/go-retryablehttp"
)

// BlockchainClient is the set of operations this package offers for reading
// blocks from a blockchain node over JSON-RPC.
type BlockchainClient interface {
// LatestBlockNumber returns the latest block number known to the node.
LatestBlockNumber() (int64, error)
// BlockByNumber fetches the block with the given number as a models.RPCBlock.
BlockByNumber(ctx context.Context, blockNumber int64) (models.RPCBlock, error)

// SendBlocks sends blocks from startBlockNumber to endBlockNumber to outChan, inclusive.
// If endBlockNumber is -1, it sends blocks from startBlockNumber to the tip of the chain;
// in that case it will run continuously until the context is cancelled.
SendBlocks(ctx context.Context, outChan chan models.RPCBlock, startBlockNumber, endBlockNumber int64) error

// Close shuts the client down; implementations release whatever resources they hold.
Close() error
}

const (
// MaxRetries is the value assigned to the retryable HTTP client's RetryMax in NewRPCClient.
MaxRetries = 10
)

// rpcClient is the shared JSON-RPC transport: it owns the retrying HTTP client,
// the configuration, the logger and a pool of reusable byte buffers.
type rpcClient struct {
// client performs the HTTP requests, with retries configured in NewRPCClient.
client *retryablehttp.Client
// cfg supplies the node URL that requests are POSTed to.
cfg Config
log *slog.Logger
// bufPool hands out *bytes.Buffer values; callers Reset them after Get and Put them back when done.
bufPool *sync.Pool
}

// NewRPCClient builds an rpcClient around a retrying HTTP client.
//
// The HTTP client retries up to MaxRetries times, using the library's default
// retry policy and a linear backoff with jitter, and logs through log.
// A pool of *bytes.Buffer is created for request/response buffering.
func NewRPCClient(cfg Config, log *slog.Logger) *rpcClient { // revive:disable-line:unexported-return
	httpClient := retryablehttp.NewClient()
	httpClient.Logger = log
	httpClient.RetryMax = MaxRetries
	httpClient.Backoff = retryablehttp.LinearJitterBackoff
	httpClient.CheckRetry = retryablehttp.DefaultRetryPolicy

	buffers := &sync.Pool{
		New: func() interface{} { return new(bytes.Buffer) },
	}

	return &rpcClient{
		log:     log,
		cfg:     cfg,
		client:  httpClient,
		bufPool: buffers,
	}
}

// LatestBlockNumber asks the node for its current head with the eth_blockNumber
// method and returns the hex-encoded result converted by hexutils.IntFromHex.
//
// The request runs with context.Background(), so the caller cannot cancel it.
// A JSON-RPC "error" object in the response is returned as a Go error rather
// than being ignored and leaving an empty result to be parsed.
func (c *rpcClient) LatestBlockNumber() (int64, error) {
	buf := c.bufPool.Get().(*bytes.Buffer)
	defer c.bufPool.Put(buf)
	buf.Reset()

	err := c.getResponseBody(context.Background(), "eth_blockNumber", []any{}, buf)
	if err != nil {
		c.log.Error("Failed to get response for jsonRPC",
			"method", "eth_blockNumber",
			"error", err,
		)
		return 0, err
	}
	resp := struct {
		Result string `json:"result"`
		// Error is set by the server instead of Result when the call failed,
		// even though the HTTP status is 200.
		Error *struct {
			Code    int    `json:"code"`
			Message string `json:"message"`
		} `json:"error"`
	}{}
	if err := json.NewDecoder(buf).Decode(&resp); err != nil {
		c.log.Error("Failed to decode response for jsonRPC", "error", err)
		return 0, err
	}
	if resp.Error != nil {
		err := fmt.Errorf("jsonRPC method eth_blockNumber returned error %d: %s",
			resp.Error.Code, resp.Error.Message)
		c.log.Error("Failed to get response for jsonRPC",
			"method", "eth_blockNumber",
			"error", err,
		)
		return 0, err
	}
	return hexutils.IntFromHex(resp.Result)
}

// getResponseBody sends a JSON-RPC 2.0 request for method with params to the
// configured URL and stores the raw response body in output.
//
// output is reset before the response is written, so on success it holds only
// the response body. It returns an error when the request cannot be built or
// sent, when the HTTP status is not 200, or when reading the body fails.
func (c *rpcClient) getResponseBody(
	ctx context.Context, method string, params interface{}, output *bytes.Buffer,
) error {
	reqData := map[string]interface{}{
		"jsonrpc": "2.0",
		"id":      1,
		"method":  method,
		"params":  params,
	}
	// Encode the request into its own byte slice instead of into output.
	// output is overwritten with the response below while the response body is
	// still open, and net/http forbids reusing request data before it is closed.
	payload, err := json.Marshal(reqData)
	if err != nil {
		return err
	}
	req, err := retryablehttp.NewRequestWithContext(ctx, http.MethodPost, c.cfg.URL, payload)
	if err != nil {
		return err
	}
	// JSON-RPC servers commonly reject POSTs that do not declare a JSON body.
	req.Header.Set("Content-Type", "application/json")

	resp, err := c.client.Do(req)
	if err != nil {
		return fmt.Errorf("failed to send request for method %s: %w", method, err)
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("response for method %s has status code %d", method, resp.StatusCode)
	}

	output.Reset()
	if _, err := output.ReadFrom(resp.Body); err != nil {
		return fmt.Errorf("failed to read response body for method %s: %w", method, err)
	}
	return nil
}

// Close is currently a no-op and always returns nil.
func (c *rpcClient) Close() error {
return nil
}
8 changes: 8 additions & 0 deletions client/jsonrpc/models.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
package jsonrpc

import "time"

// Config holds the settings of the JSON-RPC client.
type Config struct {
// URL is the address JSON-RPC requests are POSTed to.
URL string
// PollInterval is not read by the client code in this package yet;
// presumably the delay between polls for new blocks (TODO confirm).
PollInterval time.Duration
}
104 changes: 104 additions & 0 deletions client/jsonrpc/opstack.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package jsonrpc

import (
"bytes"
"context"
"fmt"
"log/slog"

"github.com/duneanalytics/blockchain-ingester/models"
"golang.org/x/sync/errgroup"
)

// OpStackClient is a BlockchainClient built on top of rpcClient, which it embeds
// to reuse its HTTP transport, logger and buffer pool.
type OpStackClient struct {
rpcClient
}

// Compile-time check that *OpStackClient implements BlockchainClient.
var _ BlockchainClient = &OpStackClient{}

// NewOpStackClient creates an OpStackClient whose embedded rpcClient is a copy
// of the one returned by NewRPCClient for the given configuration and logger.
func NewOpStackClient(cfg Config, log *slog.Logger) *OpStackClient {
	return &OpStackClient{rpcClient: *NewRPCClient(cfg, log)}
}

// BlockByNumber returns the block with the given blockNumber.
// It issues 3 JSON-RPC calls concurrently to collect the block data:
//  1. eth_getBlockByNumber
//  2. eth_getBlockReceipts
//  3. debug_traceBlockByNumber with tracer "callTracer"
//
// The raw responses are concatenated into the payload in this order, each
// followed by a newline (NDJSON).
//
// If any call fails, the first error is returned together with an empty
// models.RPCBlock; the shared context cancels the remaining calls.
//
// TODO: debug_traceBlockByNumber should be optional;
// we should handle the case where it is not available.
func (c *OpStackClient) BlockByNumber(ctx context.Context, blockNumber int64) (models.RPCBlock, error) {
	blockNumberHex := fmt.Sprintf("0x%x", blockNumber)

	// TODO: split this into mandatory and optional methods
	methods := []string{
		"eth_getBlockByNumber",
		"eth_getBlockReceipts",
		"debug_traceBlockByNumber",
	}
	methodArgs := map[string][]any{
		"eth_getBlockByNumber": {blockNumberHex, true},
		"eth_getBlockReceipts": {blockNumberHex},
		// The tracer configuration must be serialised as a JSON object. A Go string
		// here would be encoded as a JSON string, which is not a valid trace config.
		"debug_traceBlockByNumber": {blockNumberHex, map[string]string{"tracer": "callTracer"}},
	}
	group, ctx := errgroup.WithContext(ctx)
	results := make([]*bytes.Buffer, len(methods))
	for i, method := range methods {
		buf := c.bufPool.Get().(*bytes.Buffer)
		buf.Reset()
		results[i] = buf
		// The buffers are needed until the payload is assembled below, so they
		// are returned to the pool only when the function exits.
		defer c.bufPool.Put(buf)

		args := methodArgs[method]
		group.Go(func() error {
			err := c.getResponseBody(ctx, method, args, buf)
			if err != nil {
				c.log.Error("Failed to get response for jsonRPC",
					"method", method,
					"error", err,
				)
			}
			return err
		})
	}

	if err := group.Wait(); err != nil {
		return models.RPCBlock{}, err
	}

	// Copy the responses in order into a fresh buffer: the pooled buffers are
	// recycled on return and must not back the payload.
	size := 0
	for _, res := range results {
		size += res.Len() + 1
	}
	var buffer bytes.Buffer
	buffer.Grow(size)
	for _, res := range results {
		buffer.Write(res.Bytes())
		buffer.WriteByte('\n')
	}
	return models.RPCBlock{
		BlockNumber: blockNumber,
		Payload:     buffer.Bytes(),
	}, nil
}

// SendBlocks fetches blocks one by one, starting at startBlockNumber, and sends
// each of them to outChan.
//
// When endBlockNumber is at or above startBlockNumber the range is inclusive and
// the function returns nil after sending block endBlockNumber. When
// endBlockNumber is below startBlockNumber (e.g. -1) there is no upper bound and
// the loop runs until the context is cancelled or a fetch fails.
//
// It returns the error from BlockByNumber if a block cannot be fetched, and nil
// when the context is cancelled while waiting to send.
func (c *OpStackClient) SendBlocks(
	ctx context.Context, outChan chan models.RPCBlock, startBlockNumber, endBlockNumber int64,
) error {
	// Strictly below: start == end is a valid single-block range.
	dontStop := endBlockNumber < startBlockNumber
	// The bound is checked against the advancing blockNumber, not the fixed start.
	for blockNumber := startBlockNumber; dontStop || blockNumber <= endBlockNumber; blockNumber++ {
		block, err := c.BlockByNumber(ctx, blockNumber)
		if err != nil {
			c.log.Error("Failed to get block by number",
				"blockNumber", blockNumber,
				"error", err,
			)
			return err
		}
		select {
		case <-ctx.Done():
			return nil
		case outChan <- block:
		}
	}
	return nil
}
4 changes: 2 additions & 2 deletions cmd/main.go
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
package main

// topsql is a "synchronized" that inserts into a MeteringDB all the records present on S3.
// it has the ability to resume and continue from the last most recent record present in the DB.
// ingester is a "synchronizer" that ingests into DuneAPI the blocks from the blockchain.
// it has the ability to resume and catch up with the head of the blockchain.

import (
"context"
Expand Down
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,11 @@ module github.com/duneanalytics/blockchain-ingester
go 1.22.2

require (
github.com/go-errors/errors v1.5.1
github.com/hashicorp/go-retryablehttp v0.7.7
github.com/jessevdk/go-flags v1.5.0
github.com/klauspost/compress v1.17.8
golang.org/x/sync v0.7.0
)

require (
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM=
github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE=
github.com/go-errors/errors v1.5.1 h1:ZwEMSLRCapFLflTpT7NKaAc7ukJ8ZPEjzlxt8rPN8bk=
github.com/go-errors/errors v1.5.1/go.mod h1:sIVyrIiJhuEF+Pj9Ebtd6P/rEYROXFi3BopGUQ5a5Og=
github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ=
github.com/hashicorp/go-cleanhttp v0.5.2/go.mod h1:kO/YDlP8L1346E6Sodw+PrpBSV4/SoxCXGY6BqNFT48=
github.com/hashicorp/go-hclog v1.6.3 h1:Qr2kF+eVWjTiYmU7Y31tYlP1h0q/X3Nl3tPGdaB11/k=
Expand All @@ -14,6 +16,8 @@ github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxec
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20210320140829-1e4c9ba3b0c4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y=
golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
Loading

0 comments on commit 80b45e2

Please sign in to comment.