Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add config option: max-batch-size #68

Merged
merged 1 commit into from
Jul 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ func main() {
ingester.Config{
MaxConcurrentRequests: cfg.BlockConcurrency,
MaxConcurrentRequestsDLQ: cfg.DLQBlockConcurrency,
MaxBatchSize: cfg.MaxBatchSize,
ReportProgressInterval: cfg.ReportProgressInterval,
PollInterval: cfg.PollInterval,
PollDLQInterval: cfg.PollDLQInterval,
Expand Down
1 change: 1 addition & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ type Config struct {
DLQBlockConcurrency int `long:"dlq-concurrency" env:"DLQ_CONCURRENCY" description:"Number of concurrent block requests to the RPC node for DLQ processing" default:"2"` // nolint:lll
BlockSubmitInterval time.Duration `long:"block-submit-interval" env:"BLOCK_SUBMIT_INTERVAL" description:"Interval at which to submit batched blocks to Dune" default:"500ms"` // nolint:lll
LogLevel string `long:"log" env:"LOG" description:"Log level" choice:"info" choice:"debug" choice:"warn" choice:"error" default:"info"` // nolint:lll
MaxBatchSize int `long:"max-batch-size" env:"MAX_BATCH_SIZE" description:"Max number of blocks to send in a single batch" default:"128"` // nolint:lll
msf marked this conversation as resolved.
Show resolved Hide resolved
}

func (c Config) HasError() error {
Expand Down
6 changes: 6 additions & 0 deletions ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ type Config struct {
BlockSubmitInterval time.Duration
SkipFailedBlocks bool
DLQOnly bool
MaxBatchSize int
}

type ingester struct {
Expand Down Expand Up @@ -105,5 +106,10 @@ func New(
if ing.cfg.ReportProgressInterval == 0 {
ing.cfg.ReportProgressInterval = defaultReportProgressInterval
}
if ing.cfg.MaxBatchSize == 0 {
msf marked this conversation as resolved.
Show resolved Hide resolved
ing.cfg.MaxBatchSize = maxBatchSize
} else if ing.cfg.MaxBatchSize > maxBatchSize {
ing.cfg.MaxBatchSize = maxBatchSize
}
return ing
}
4 changes: 2 additions & 2 deletions ingester/send.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ const maxBatchSize = 256
// We buffer the blocks in a map until we have no gaps, so that we can send them in order to Dune.
func (i *ingester) SendBlocks(ctx context.Context, blocks <-chan models.RPCBlock, startBlockNumber int64) error {
// Buffer for temporarily storing blocks that have arrived out of order
collectedBlocks := make(map[int64]models.RPCBlock, maxBatchSize)
collectedBlocks := make(map[int64]models.RPCBlock, i.cfg.MaxBatchSize)
nextNumberToSend := startBlockNumber
batchTimer := time.NewTicker(i.cfg.BlockSubmitInterval)
defer batchTimer.Stop()
Expand Down Expand Up @@ -69,7 +69,7 @@ func (i *ingester) trySendCompletedBlocks(
nextBlockToSend int64,
) (int64, error) {
for {
nextBlock, err := i.trySendBlockBatch(ctx, collectedBlocks, nextBlockToSend, maxBatchSize)
nextBlock, err := i.trySendBlockBatch(ctx, collectedBlocks, nextBlockToSend, i.cfg.MaxBatchSize)
if err != nil || nextBlock == nextBlockToSend {
return nextBlock, err
}
Expand Down
Loading