Skip to content

Commit

Permalink
Use blockchain progress in main
Browse files Browse the repository at this point in the history
  • Loading branch information
vegarsti committed Jun 12, 2024
1 parent c48f8c2 commit d16a777
Show file tree
Hide file tree
Showing 5 changed files with 120 additions and 44 deletions.
9 changes: 7 additions & 2 deletions client/duneapi/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,17 +184,22 @@ func (c *client) PostProgressReport(ctx context.Context, progress models.Blockch
} else {
c.log.Info("Sent progress report",
"lastIngestedBlockNumer", request.LastIngestedBlockNumber,
"latestBlockNumber", request.LatestBlockNumber,
"duration", time.Since(start),
)
}
}()

request = BlockchainProgress{
LastIngestedBlockNumber: progress.LastIngestedBlockNumber,
LatestBlockNumber: progress.LatestBlockNumber,
}
url := fmt.Sprintf("%s/api/beta/blockchain/%s/ingest/progress", c.cfg.URL, c.cfg.BlockchainName)
c.log.Debug("Sending request", "url", url)
payload, err := json.Marshal(progress)
payload, err := json.Marshal(request)
if err != nil {
return err
}
c.log.Info("Sending request", "url", url, "payload", string(payload))
req, err := retryablehttp.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(payload))
if err != nil {
return err
Expand Down
12 changes: 11 additions & 1 deletion cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,10 +75,20 @@ func main() {

ctx, cancelFn := context.WithCancel(context.Background())

// Get stored progress
progress, err := duneClient.GetProgressReport(ctx)
if err != nil {
stdlog.Fatal(err)
}

// Start ingesting from where we left off
startBlockNumber := progress.LastIngestedBlockNumber + 1
maxCount := int64(0) // 0 means ingest until cancelled

wg.Add(1)
go func() {
defer wg.Done()
err := ingester.Run(ctx, cfg.BlockHeight, 0 /* maxCount */)
err := ingester.Run(ctx, startBlockNumber, maxCount)
logger.Info("Ingester finished", "err", err)
}()

Expand Down
4 changes: 2 additions & 2 deletions ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,12 @@ import (

type Ingester interface {
// Run starts the ingester and blocks until the context is cancelled or maxCount blocks are ingested
Run(ctx context.Context, startBlockNumber, maxCount int64) error
Run(ctx context.Context, startBlockNumber int64, maxCount int64) error

// ConsumeBlocks sends blocks from startBlockNumber to endBlockNumber to outChan, inclusive.
// If endBlockNumber is -1, it sends blocks from startBlockNumber to the tip of the chain
// it will run continuously until the context is cancelled
ConsumeBlocks(ctx context.Context, outChan chan models.RPCBlock, startBlockNumber, endBlockNumber int64) error
ConsumeBlocks(ctx context.Context, outChan chan models.RPCBlock, startBlockNumber int64, endBlockNumber int64) error

// SendBlocks pushes to DuneAPI the RPCBlock Payloads as they are received in an endless loop
// it will block until:
Expand Down
65 changes: 47 additions & 18 deletions ingester/mainloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,28 +11,23 @@ import (
"golang.org/x/sync/errgroup"
)

func (i *ingester) Run(ctx context.Context, startBlockNumber, maxCount int64) error {
func (i *ingester) Run(ctx context.Context, startBlockNumber int64, maxCount int64) error {
inFlightChan := make(chan models.RPCBlock, i.cfg.MaxBatchSize)
defer close(inFlightChan)

var err error

if startBlockNumber < 0 {
startBlockNumber, err = i.node.LatestBlockNumber()
if err != nil {
return errors.Errorf("failed to get latest block number: %w", err)
}
}
// Ingest until endBlockNumber, inclusive. If maxCount is 0, we ingest forever
endBlockNumber := startBlockNumber + maxCount - 1

i.log.Info("Starting ingester",
"maxBatchSize", i.cfg.MaxBatchSize,
"startBlockNumber", startBlockNumber,
"endBlockNumber", endBlockNumber,
"maxCount", maxCount,
)

errGroup, ctx := errgroup.WithContext(ctx)
errGroup.Go(func() error {
return i.ConsumeBlocks(ctx, inFlightChan, startBlockNumber, startBlockNumber+maxCount)
return i.ConsumeBlocks(ctx, inFlightChan, startBlockNumber, endBlockNumber)
})
errGroup.Go(func() error {
return i.SendBlocks(ctx, inFlightChan)
Expand All @@ -44,7 +39,8 @@ func (i *ingester) Run(ctx context.Context, startBlockNumber, maxCount int64) er
if err := errGroup.Wait(); err != nil && err != ErrFinishedConsumeBlocks {
return err
}
return nil

return i.Close()
}

var ErrFinishedConsumeBlocks = errors.New("finished ConsumeBlocks")
Expand All @@ -53,10 +49,9 @@ var ErrFinishedConsumeBlocks = errors.New("finished ConsumeBlocks")
func (i *ingester) ConsumeBlocks(
ctx context.Context, outChan chan models.RPCBlock, startBlockNumber, endBlockNumber int64,
) error {
dontStop := endBlockNumber <= startBlockNumber
latestBlockNumber := i.tryUpdateLatestBlockNumber()

waitForBlock := func(ctx context.Context, blockNumber, latestBlockNumber int64) int64 {
waitForBlock := func(ctx context.Context, blockNumber int64, latestBlockNumber int64) int64 {
for blockNumber > latestBlockNumber {
select {
case <-ctx.Done():
Expand All @@ -72,6 +67,9 @@ func (i *ingester) ConsumeBlocks(
return latestBlockNumber
}

// Consume blocks forever if end is before start. This happens if Run is called with a maxCount of <= 0
dontStop := endBlockNumber < startBlockNumber

for blockNumber := startBlockNumber; dontStop || blockNumber <= endBlockNumber; blockNumber++ {
latestBlockNumber = waitForBlock(ctx, blockNumber, latestBlockNumber)
startTime := time.Now()
Expand All @@ -80,7 +78,7 @@ func (i *ingester) ConsumeBlocks(
if err != nil {
if errors.Is(err, context.Canceled) {
i.log.Info("Context canceled, stopping..")
return err
return ctx.Err()
}

i.log.Error("Failed to get block by number, continuing..",
Expand Down Expand Up @@ -119,18 +117,20 @@ func (i *ingester) ConsumeBlocks(
)
}
}
return ErrFinishedConsumeBlocks // FIXME: this is wrong
// Done consuming blocks, either because we reached the endBlockNumber or the context was canceled
i.log.Info("Finished consuming blocks", "latestBlockNumber", latestBlockNumber, "endBlockNumber", endBlockNumber)
return ErrFinishedConsumeBlocks
}

func (i *ingester) SendBlocks(ctx context.Context, blocksCh <-chan models.RPCBlock) error {
for {
select {
case <-ctx.Done():
return nil // context canceled
return ctx.Err()
case payload, ok := <-blocksCh:
// TODO: we should batch RCP blocks here before sending to Dune.
if !ok {
return nil // channel closed
return ctx.Err() // channel closed
}
if err := i.dune.SendBlock(ctx, payload); err != nil {
// TODO: implement DeadLetterQueue
Expand Down Expand Up @@ -169,7 +169,7 @@ func (i *ingester) ReportProgress(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return nil
return ctx.Err()
case tNow := <-timer.C:
latest := atomic.LoadInt64(&i.info.LatestBlockNumber)
lastIngested := atomic.LoadInt64(&i.info.IngestedBlockNumber)
Expand Down Expand Up @@ -202,10 +202,39 @@ func (i *ingester) ReportProgress(ctx context.Context) error {
previousIngested = lastIngested
previousDistance = newDistance
previousTime = tNow

// TODO: include errors in the report, reset the error list
err := i.dune.PostProgressReport(ctx, models.BlockchainIndexProgress{
BlockchainName: i.cfg.BlockchainName,
EVMStack: i.cfg.Stack.String(),
LastIngestedBlockNumber: lastIngested,
LatestBlockNumber: latest,
})
if err != nil {
i.log.Error("Failed to post progress report", "error", err)
}
}
}
}

func (i *ingester) Close() error {
// Send a final progress report to flush progress
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
i.log.Info("Sending final progress report")
err := i.dune.PostProgressReport(
ctx,
models.BlockchainIndexProgress{
BlockchainName: i.cfg.BlockchainName,
EVMStack: i.cfg.Stack.String(),
LastIngestedBlockNumber: i.info.IngestedBlockNumber,
LatestBlockNumber: i.info.LatestBlockNumber,
})
i.log.Info("Closing node")
if err != nil {
_ = i.node.Close()
return err
}

return i.node.Close()
}
74 changes: 53 additions & 21 deletions ingester/mainloop_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,16 +68,21 @@ func TestBlockConsumptionLoopErrors(t *testing.T) {
Payload: []byte(`block`),
}, nil
},
CloseFunc: func() error {
return nil
},
}
ing := ingester.New(slog.New(slog.NewTextHandler(io.Discard, nil)), rpcClient, nil, ingester.Config{
// Swap these to see logs
// logOutput := os.Stderr
logOutput := io.Discard
ing := ingester.New(slog.New(slog.NewTextHandler(logOutput, nil)), rpcClient, nil, ingester.Config{
MaxBatchSize: 1,
PollInterval: 1000 * time.Millisecond,
})

outCh := make(chan models.RPCBlock, maxBlockNumber+1)
defer close(outCh)
err := ing.ConsumeBlocks(ctx, outCh, 0, maxBlockNumber)
require.Error(t, err) // this is expected
require.ErrorIs(t, err, ingester.ErrFinishedConsumeBlocks)
if tc.BlockByNumberIsBroken {
require.Equal(t, producedBlockNumber, int64(0))
}
Expand All @@ -100,23 +105,21 @@ func TestBlockSendingLoop(t *testing.T) {

func TestRunLoopBaseCase(t *testing.T) {
testCases := []struct {
name string
i int64
name string
maxCount int64
lastIngested int64
}{
{name: "1 block", i: 1},
{name: "100 blocks", i: 100},
{name: "1 block", maxCount: 1, lastIngested: 0},
{name: "2 blocks", maxCount: 2, lastIngested: 0},
{name: "100 blocks", maxCount: 100, lastIngested: 0},
{name: "100 blocks, starting from 50", maxCount: 100, lastIngested: 50},
}
sentBlockNumber := int64(0)
producedBlockNumber := int64(0)
duneapi := &duneapi_mock.BlockchainIngesterMock{
SendBlockFunc: func(_ context.Context, block models.RPCBlock) error {
atomic.StoreInt64(&sentBlockNumber, block.BlockNumber)
return nil
},
}
latestBlockNumber := int64(1000)
rpcClient := &jsonrpc_mock.BlockchainClientMock{
LatestBlockNumberFunc: func() (int64, error) {
return 1000, nil
return latestBlockNumber, nil
},
BlockByNumberFunc: func(_ context.Context, blockNumber int64) (models.RPCBlock, error) {
atomic.StoreInt64(&producedBlockNumber, blockNumber)
Expand All @@ -125,25 +128,44 @@ func TestRunLoopBaseCase(t *testing.T) {
Payload: []byte(`block`),
}, nil
},
CloseFunc: func() error {
return nil
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ing := ingester.New(slog.New(slog.NewTextHandler(io.Discard, nil)), rpcClient, duneapi, ingester.Config{
duneapi := &duneapi_mock.BlockchainIngesterMock{
SendBlockFunc: func(_ context.Context, block models.RPCBlock) error {
atomic.StoreInt64(&sentBlockNumber, block.BlockNumber)
return nil
},
PostProgressReportFunc: func(_ context.Context, _ models.BlockchainIndexProgress) error {
return nil
},
}

// Swap these to see logs
// logOutput := os.Stderr
logOutput := io.Discard
ing := ingester.New(slog.New(slog.NewTextHandler(logOutput, nil)), rpcClient, duneapi, ingester.Config{
MaxBatchSize: 1,
PollInterval: 1000 * time.Millisecond,
})

err := ing.Run(context.Background(), 0, tc.i)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

err := ing.Run(ctx, tc.lastIngested, tc.maxCount)
require.NoError(t, err)
require.Equal(t, producedBlockNumber, tc.i)
require.Equal(t, sentBlockNumber, tc.i)
require.Equal(t, tc.lastIngested+tc.maxCount, producedBlockNumber)
require.Equal(t, tc.lastIngested+tc.maxCount, sentBlockNumber)
})
}
}

func TestRunLoopUntilCancel(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
maxBlockNumber := int64(1000)
maxBlockNumber := int64(10)
sentBlockNumber := int64(0)
producedBlockNumber := int64(0)
duneapi := &duneapi_mock.BlockchainIngesterMock{
Expand All @@ -155,6 +177,9 @@ func TestRunLoopUntilCancel(t *testing.T) {
}
return nil
},
PostProgressReportFunc: func(_ context.Context, _ models.BlockchainIndexProgress) error {
return nil
},
}
rpcClient := &jsonrpc_mock.BlockchainClientMock{
LatestBlockNumberFunc: func() (int64, error) {
Expand All @@ -167,14 +192,21 @@ func TestRunLoopUntilCancel(t *testing.T) {
Payload: []byte(`block`),
}, nil
},
CloseFunc: func() error {
return nil
},
}
ing := ingester.New(slog.New(slog.NewTextHandler(io.Discard, nil)), rpcClient, duneapi, ingester.Config{
// Swap these to see logs
// logOutput := os.Stderr
logOutput := io.Discard
ing := ingester.New(slog.New(slog.NewTextHandler(logOutput, nil)), rpcClient, duneapi, ingester.Config{
MaxBatchSize: 1,
PollInterval: 1000 * time.Millisecond,
})

err := ing.Run(ctx, 0, maxBlockNumber)
require.NoError(t, err)
require.Error(t, err) // this is expected
require.ErrorIs(t, context.Canceled, err)
require.Equal(t, producedBlockNumber, maxBlockNumber)
require.Equal(t, sentBlockNumber, maxBlockNumber)
}

0 comments on commit d16a777

Please sign in to comment.