Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Send a batch of blocks #35

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
86 changes: 64 additions & 22 deletions client/duneapi/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"io"
"log/slog"
"net/http"
"strings"
"sync"
"time"

Expand All @@ -26,6 +27,9 @@ type BlockchainIngester interface {
// SendBlock sends a block to DuneAPI
SendBlock(ctx context.Context, payload models.RPCBlock) error

// SendBlocks sends a batch of blocks to DuneAPI
vegarsti marked this conversation as resolved.
Show resolved Hide resolved
SendBlocks(ctx context.Context, payload []models.RPCBlock) error
vegarsti marked this conversation as resolved.
Show resolved Hide resolved

// GetProgressReport gets a progress report from DuneAPI
GetProgressReport(ctx context.Context) (*models.BlockchainIndexProgress, error)

Expand Down Expand Up @@ -85,37 +89,56 @@ func (c *client) SendBlock(ctx context.Context, payload models.RPCBlock) error {
buffer := c.bufPool.Get().(*bytes.Buffer)
defer c.bufPool.Put(buffer)

request, err := c.buildRequest(payload, buffer)
request, err := c.buildRequest([]models.RPCBlock{payload}, buffer)
if err != nil {
return err
}
return c.sendRequest(ctx, request)
}

func (c *client) buildRequest(payload models.RPCBlock, buffer *bytes.Buffer) (BlockchainIngestRequest, error) {
var request BlockchainIngestRequest
func (c *client) buildRequest(payload []models.RPCBlock, buffer *bytes.Buffer) (*BlockchainIngestRequest, error) {
request := &BlockchainIngestRequest{}

// not thread safe, multiple calls to the compressor here
if c.cfg.DisableCompression {
request.Payload = payload.Payload
buffer.Reset()
for _, block := range payload {
_, err := buffer.Write(block.Payload)
if err != nil {
return nil, err
}
}
request.Payload = buffer.Bytes()
} else {
// not thread safe, multiple calls to the compressor here
buffer.Reset()
c.compressor.Reset(buffer)
_, err := c.compressor.Write(payload.Payload)
for _, block := range payload {
_, err := c.compressor.Write(block.Payload)
if err != nil {
return nil, err
}
}
err := c.compressor.Close()
if err != nil {
return request, err
return nil, err
}
c.compressor.Close()
request.ContentEncoding = "application/zstd"
request.Payload = buffer.Bytes()
}
request.BlockNumber = payload.BlockNumber
request.IdempotencyKey = c.idempotencyKey(payload)

blockNumbers := make([]string, len(payload))
for i, block := range payload {
blockNumbers[i] = fmt.Sprintf("%d", block.BlockNumber)
}
request.FirstBlockNumber = payload[0].BlockNumber
vegarsti marked this conversation as resolved.
Show resolved Hide resolved
request.LastBlockNumber = payload[len(payload)-1].BlockNumber
request.BlockNumbers = blockNumbers
request.IdempotencyKey = strings.Join(blockNumbers, ",") // TODO encapsulate in a method
vegarsti marked this conversation as resolved.
Show resolved Hide resolved
request.EVMStack = c.cfg.Stack.String()
return request, nil
}

func (c *client) sendRequest(ctx context.Context, request BlockchainIngestRequest) error {
func (c *client) sendRequest(ctx context.Context, request *BlockchainIngestRequest) error {
// TODO: implement timeouts (context with deadline)
start := time.Now()
var err error
Expand All @@ -124,15 +147,17 @@ func (c *client) sendRequest(ctx context.Context, request BlockchainIngestReques
defer func() {
if err != nil {
c.log.Error("INGEST FAILED",
"blockNumber", request.BlockNumber,
"firstBlockNumber", request.FirstBlockNumber,
"lastBlockNumber", request.LastBlockNumber,
"error", err,
"statusCode", responseStatus,
"payloadSize", len(request.Payload),
"duration", time.Since(start),
)
} else {
c.log.Info("BLOCK SENT",
"blockNumber", request.BlockNumber,
c.log.Info("INGEST SUCCESS",
"firstBlockNumber", request.FirstBlockNumber,
"lastBlockNumber", request.LastBlockNumber,
"response", response.String(),
"payloadSize", len(request.Payload),
"duration", time.Since(start),
Expand All @@ -159,19 +184,20 @@ func (c *client) sendRequest(ctx context.Context, request BlockchainIngestReques
return err
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("unexpected status code: %v, %v", resp.StatusCode, resp.Status)
}
responseStatus = resp.Status
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i thought you need a header to say:
req.Header.Set(x-dune-batch-size", len(blocks)) or something, to simplify the server side, otherwise the server side needs to...

humm.. you're right, we don't need it, because "opstack" means 3 messages per block, so the server can derive how many blocks it has received.

when we use non-opstack, we we might need to pass additional information in the headers.

Copy link
Member Author

@vegarsti vegarsti Jun 24, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm, doesn't hurt to declare this in a header, that way we know what to expect!


err = json.NewDecoder(resp.Body).Decode(&response)
if err != nil {
return err
}
return nil
}

func (c *client) idempotencyKey(rpcBlock models.RPCBlock) string {
// for idempotency we use the block number (should we use also the date?, or a startup timestamp?)
return fmt.Sprintf("%v", rpcBlock.BlockNumber)
if resp.StatusCode != http.StatusOK {
// We mutate global err here because we have deferred a log message where we check for non-nil err
err = fmt.Errorf("unexpected status code: %v, %v", resp.StatusCode, resp.Status)
return err
}

return nil
}

func (c *client) Close() error {
Expand Down Expand Up @@ -314,3 +340,19 @@ func (c *client) GetProgressReport(ctx context.Context) (*models.BlockchainIndex
}
return progress, nil
}

// SendBlock sends a block to DuneAPI
func (c *client) SendBlocks(ctx context.Context, payload []models.RPCBlock) error {
if len(payload) == 0 {
return nil
}

buffer := c.bufPool.Get().(*bytes.Buffer)
defer c.bufPool.Put(buffer)

request, err := c.buildRequest(payload[:1], buffer) // TODO
if err != nil {
return err
}
return c.sendRequest(ctx, request)
}
15 changes: 9 additions & 6 deletions client/duneapi/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,8 @@ type Config struct {
}

type BlockchainIngestResponse struct {
Tables []IngestedTableInfo `json:"tables"`
Error string `json:"error,omitempty"`
Tables []IngestedTableInfo `json:"tables,omitempty"`
}

type IngestedTableInfo struct {
Expand All @@ -49,11 +50,13 @@ func (b *BlockchainIngestResponse) String() string {
}

type BlockchainIngestRequest struct {
BlockNumber int64
ContentEncoding string
EVMStack string
IdempotencyKey string
Payload []byte
FirstBlockNumber int64
LastBlockNumber int64
BlockNumbers []string
vegarsti marked this conversation as resolved.
Show resolved Hide resolved
ContentEncoding string
EVMStack string
IdempotencyKey string
Payload []byte
}

type BlockchainProgress struct {
Expand Down
1 change: 1 addition & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ func main() {
PollInterval: cfg.PollInterval,
Stack: cfg.RPCStack,
BlockchainName: cfg.BlockchainName,
BatchRequestInterval: cfg.BatchRequestInterval,
},
)

Expand Down
5 changes: 3 additions & 2 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,9 @@ type Config struct {
PollInterval time.Duration `long:"rpc-poll-interval" env:"RPC_POLL_INTERVAL" description:"Interval to poll the blockchain node" default:"300ms"` // nolint:lll
ReportProgressInterval time.Duration `long:"report-progress-interval" env:"REPORT_PROGRESS_INTERVAL" description:"Interval to report progress" default:"30s"` // nolint:lll
RPCNode RPCClient
RPCStack models.EVMStack `long:"rpc-stack" env:"RPC_STACK" description:"Stack for the RPC client" default:"opstack"` // nolint:lll
Concurrency int `long:"concurrency" env:"CONCURRENCY" description:"Number of concurrent workers" default:"5"` // nolint:lll
RPCStack models.EVMStack `long:"rpc-stack" env:"RPC_STACK" description:"Stack for the RPC client" default:"opstack"` // nolint:lll
Concurrency int `long:"concurrency" env:"CONCURRENCY" description:"Number of concurrent workers" default:"5"` // nolint:lll
BatchRequestInterval time.Duration `long:"batch-request-interval" env:"BATCH_REQUEST_INTERVAL" description:"Interval at which to send batch requests to Dune" default:"1s"` // nolint:lll
}

func (c Config) HasError() error {
Expand Down
2 changes: 2 additions & 0 deletions ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ const (
defaultMaxBatchSize = 5
defaultPollInterval = 1 * time.Second
defaultReportProgressInterval = 30 * time.Second
defaultBatchRequestInterval = 1 * time.Second
)

type Config struct {
Expand All @@ -49,6 +50,7 @@ type Config struct {
ReportProgressInterval time.Duration
Stack models.EVMStack
BlockchainName string
BatchRequestInterval time.Duration
}

type Info struct {
Expand Down
84 changes: 61 additions & 23 deletions ingester/mainloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,19 @@ func (i *ingester) SendBlocks(ctx context.Context, blocksCh <-chan models.RPCBlo
i.log.Info("SendBlocks: Starting to receive blocks")
blocks := make(map[int64]models.RPCBlock) // Buffer for temporarily storing blocks that have arrived out of order
nextNumberToSend := startBlockNumber
batchTimer := time.NewTicker(i.cfg.BatchRequestInterval)
defer batchTimer.Stop()
for {
// Check if we can send any blocks before checking if we should shut down
select {
case <-batchTimer.C:
nextNumberToSend = i.trySendCompletedBlocks(blocks, nextNumberToSend)
i.log.Info("SendBlocks: Sent completed blocks to DuneAPI", "nextNumberToSend", nextNumberToSend)
default:
}

// Receive a block or exit if the context is done or the channel is closed
// To avoid blocking on the select, we also select on the batchTimer tick
select {
case <-ctx.Done():
i.log.Info("SendBlocks: Context canceled, stopping")
Expand All @@ -176,46 +188,72 @@ func (i *ingester) SendBlocks(ctx context.Context, blocksCh <-chan models.RPCBlo
i.log.Info("SendBlocks: Channel is closed, returning")
return nil
}

blocks[block.BlockNumber] = block
i.log.Info("SendBlocks: Received block", "blockNumber", block.BlockNumber, "bufferSize", len(blocks))

nextNumberToSend = i.trySendCompletedBlocks(ctx, blocks, nextNumberToSend)
i.log.Info("SendBlocks: Sent any completed blocks to DuneAPI", "nextNumberToSend", nextNumberToSend)
case <-batchTimer.C:
nextNumberToSend = i.trySendCompletedBlocks(blocks, nextNumberToSend)
i.log.Info("SendBlocks: Sent completed blocks to DuneAPI", "nextNumberToSend", nextNumberToSend)
}
}
}

// trySendCompletedBlocks sends all blocks that can be sent in order from the blockMap.
// Once we have sent all blocks, if any, we return with the nextNumberToSend.
// We return the next numberToSend such that the caller can continue from there.
// We use a new context so we're able to flush the buffer even if the main context is canceled.
func (i *ingester) trySendCompletedBlocks(
ctx context.Context,
blocks map[int64]models.RPCBlock,
nextNumberToSend int64,
) int64 {
// Send this block only if we have sent all previous blocks
// Collect a batch of blocks to send, only send those which are in order
batch := make([]models.RPCBlock, 0, len(blocks))
for block, ok := blocks[nextNumberToSend]; ok; block, ok = blocks[nextNumberToSend] {
if err := i.dune.SendBlock(ctx, block); err != nil {
if errors.Is(err, context.Canceled) {
i.log.Info("SendBlocks: Context canceled, stopping")
return nextNumberToSend
}
// this will leave a "block gap" in DuneAPI, TODO: implement a way to fill this gap
i.log.Error("SendBlocks: Failed, continuing", "blockNumber", block.BlockNumber, "error", err)
i.info.DuneErrors = append(i.info.DuneErrors, ErrorInfo{
Timestamp: time.Now(),
BlockNumber: block.BlockNumber,
Error: err,
})
} else {
i.log.Info("Updating latest ingested block number", "blockNumber", block.BlockNumber)
atomic.StoreInt64(&i.info.IngestedBlockNumber, block.BlockNumber)
}
// We've sent block N, so increment the pointer
batch = append(batch, block)
delete(blocks, nextNumberToSend)
nextNumberToSend++
}

if len(batch) == 0 {
return nextNumberToSend
}

blockNumbers := make([]string, len(batch))
for i, block := range batch {
blockNumbers[i] = fmt.Sprintf("%d", block.BlockNumber)
}

i.log.Info(
"SendBlocks: Sending batch",
"blockNumberFirst", batch[0].BlockNumber,
"blockNumberLast", batch[len(batch)-1].BlockNumber,
"batchSize", len(batch),
)

// Send the batch, with a new context so we're able to flush the buffer even if the context is canceled
ctx, cancel := context.WithTimeout(context.Background(), time.Second) // Some timeout
defer cancel()
if err := i.dune.SendBlocks(ctx, batch); err != nil {
if errors.Is(err, context.Canceled) {
i.log.Info("SendBlocks: Context canceled, stopping")
return nextNumberToSend
}
// this will leave a "block gap" in DuneAPI, TODO: implement a way to fill this gap
i.log.Error(
"SendBlocks: Failed to send batch, continuing",
"blockNumberFirst", batch[0].BlockNumber,
"blockNumberLast", batch[len(batch)-1].BlockNumber,
"error", err,
)
i.info.DuneErrors = append(i.info.DuneErrors, ErrorInfo{
Timestamp: time.Now(),
BlockNumber: batch[0].BlockNumber, // TODO?
Error: err,
})
} else {
i.log.Info("Updating latest ingested block number", "blockNumber", batch[len(batch)-1].BlockNumber)
atomic.StoreInt64(&i.info.IngestedBlockNumber, batch[len(batch)-1].BlockNumber)
}

return nextNumberToSend
}

Expand Down
Loading
Loading