Skip to content

Commit

Permalink
refactor: move the main loops to the ingester, which simplifies the
Browse files Browse the repository at this point in the history
clients.

TODO: add tests; this logic is nuanced
  • Loading branch information
msf committed Jun 5, 2024
1 parent 07ea7cc commit c14985e
Show file tree
Hide file tree
Showing 2 changed files with 100 additions and 44 deletions.
129 changes: 85 additions & 44 deletions ingester/mainloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package ingester

import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"

"github.com/duneanalytics/blockchain-ingester/models"
Expand Down Expand Up @@ -39,42 +41,7 @@ func (i *ingester) Run(ctx context.Context, wg *sync.WaitGroup) error {
return i.SendBlocks(ctx, inFlightChan)
})
errGroup.Go(func() error {
timer := time.NewTicker(20 * time.Second)
defer timer.Stop()

previousTime := time.Now()
previousDistance := int64(0)
previoussIngested := sync.LoadInt64(&i.info.IngestedBlockNumber)

for {
select {
case tNow := <-timer.C:
latest, err := i.node.LatestBlockNumber()
if err != nil {
i.log.Error("Failed to get latest block number, continuing..", "error", err)
continue
}
sync.StoreInt64(&i.info.LatestBlockNumber, latest)
lastIngested := sync.LoadInt64(&i.info.IngestedBlockNumber)
lastConsumed := sync.LoadInt64(&i.info.ConsumedBlockNumber)

blocksPerSec := float64(lastIngested-previoussIngested) / tNow.Sub(previousTime).Seconds()
newDistance := latest - lastIngested
fallingBehind := newDistance > (previousDistance + 1) // TODO: make is more stable

i.log.Info("Info",
"latestBlockNumber", latest,
"ingestedBlockNumber", lastIngested,
"consumedBlockNumber", lastConsumed,
"distanceFromLatest", latest-lastIngested,
"FallingBehind", falingBehind,
"blocksPerSec", fmt.Sprintf("%.2f", blocksPerSec),
)
previoussIngested = lastIngested
previousDistance = newDistance
previousTime = tNow
}
}
return i.ReportProgress(ctx)
})

return errGroup.Wait()
Expand All @@ -85,33 +52,61 @@ func (i *ingester) ConsumeBlocks(
ctx context.Context, outChan chan models.RPCBlock, startBlockNumber, endBlockNumber int64,
) error {
dontStop := endBlockNumber <= startBlockNumber
latestBlockNumber := i.tryUpdateLatestBlockNumber()

waitForBlock := func(blockNumber, latestBlockNumber int64) int64 {
for blockNumber > latestBlockNumber {
i.log.Info(fmt.Sprintf("Waiting %v for block to be available..", i.cfg.PollInterval),
"blockNumber", blockNumber,
"latestBlockNumber", latestBlockNumber,
)
time.Sleep(i.cfg.PollInterval)
latestBlockNumber = i.tryUpdateLatestBlockNumber()
}
return latestBlockNumber
}

for blockNumber := startBlockNumber; dontStop || startBlockNumber <= endBlockNumber; blockNumber++ {

latestBlockNumber = waitForBlock(blockNumber, latestBlockNumber)
startTime := time.Now()

block, err := i.node.BlockByNumber(ctx, blockNumber)
if err != nil {
i.log.Error("Failed to get block by number, continuing..",
"blockNumber", blockNumber,
"latestBlockNumber", latestBlockNumber,
"error", err,
)
i.info.RPCErrors = append(i.info.RPCErrors, ErrorInfo{
Timestamp: time.Now(),
BlockNumber: blockNumber,
Error: err,
})
// TODO: should I sleep (backoff) here?
continue
} else {
sync.StoreInt64(&i.info.ConsumedBlockNumber, block.BlockNumber)
}
// TODO:
// - track if we're getting blocked on sending to outChan
// - track blocks per second and our distance from LatestBlockNumber

atomic.StoreInt64(&i.info.ConsumedBlockNumber, block.BlockNumber)
getBlockElapsed := time.Since(startTime)

select {
case <-ctx.Done():
return nil
case outChan <- block:
}
sleepTime := time.Until(startTime.Add(i.cfg.PollInterval))
time.Sleep(sleepTime)

distanceFromLatest := latestBlockNumber - block.BlockNumber
if distanceFromLatest > 0 {
// TODO: improve logs of processing speed and catchup estimated ETA
i.log.Info("We're behind, trying to catch up..",
"blockNumber", block.BlockNumber,
"latestBlockNumber", latestBlockNumber,
"distanceFromLatest", distanceFromLatest,
"getBlockElapsedMillis", getBlockElapsed.Milliseconds(),
"elapsedMillis", time.Since(startTime).Milliseconds(),
)
}
}
return nil
}
Expand All @@ -136,12 +131,58 @@ func (i *ingester) SendBlocks(ctx context.Context, blocksCh <-chan models.RPCBlo
Error: err,
})
} else {
sync.StoreInt64(&i.info.IngestedBlockNumber, payload.BlockNumber)
atomic.StoreInt64(&i.info.IngestedBlockNumber, payload.BlockNumber)
}
}
}
}

// tryUpdateLatestBlockNumber asks the node for its latest block number.
// On success the value is stored atomically in i.info.LatestBlockNumber and
// returned. On failure the error is logged and the previously stored value
// is returned instead.
func (i *ingester) tryUpdateLatestBlockNumber() int64 {
	head, err := i.node.LatestBlockNumber()
	if err == nil {
		atomic.StoreInt64(&i.info.LatestBlockNumber, head)
		return head
	}

	i.log.Error("Failed to get latest block number, continuing..", "error", err)
	return atomic.LoadInt64(&i.info.LatestBlockNumber)
}

// ReportProgress logs an ingestion summary every 20 seconds until ctx is
// cancelled, at which point it returns nil.
//
// Each report reads the latest, ingested and consumed block numbers with
// atomic loads and derives from them the distance to the latest block, the
// ingestion rate since the previous report, and whether that distance grew
// by more than one block since the previous report.
func (i *ingester) ReportProgress(ctx context.Context) error {
	ticker := time.NewTicker(20 * time.Second)
	defer ticker.Stop()

	// State carried over from the previous report.
	var (
		prevTick     = time.Now()
		prevDistance int64
		prevIngested = atomic.LoadInt64(&i.info.IngestedBlockNumber)
	)

	for {
		var now time.Time
		select {
		case <-ctx.Done():
			return nil
		case now = <-ticker.C:
		}

		head := atomic.LoadInt64(&i.info.LatestBlockNumber)
		ingested := atomic.LoadInt64(&i.info.IngestedBlockNumber)
		consumed := atomic.LoadInt64(&i.info.ConsumedBlockNumber)

		elapsedSeconds := now.Sub(prevTick).Seconds()
		rate := float64(ingested-prevIngested) / elapsedSeconds
		distance := head - ingested
		// TODO: make this more stable
		behind := distance > prevDistance+1

		i.log.Info("Info",
			"latestBlockNumber", head,
			"ingestedBlockNumber", ingested,
			"consumedBlockNumber", consumed,
			"distanceFromLatest", distance,
			"FallingBehind", behind,
			"blocksPerSec", fmt.Sprintf("%.2f", rate),
		)

		prevIngested, prevDistance, prevTick = ingested, distance, now
	}
}

// Close closes the underlying node client and returns whatever error that
// call reports.
func (i *ingester) Close() error {
	err := i.node.Close()
	return err
}
15 changes: 15 additions & 0 deletions ingester/mainloop_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package ingester_test

import "testing"

// TestBlockConsumptionLoop is a placeholder for the block consumption loop
// tests; it is skipped until those tests are written.
func TestBlockConsumptionLoop(t *testing.T) {
	t.Log("not implemented")
	t.SkipNow()
}

// TestBlockSendingLoop is a placeholder for the block sending loop tests;
// it is skipped until those tests are written.
func TestBlockSendingLoop(t *testing.T) {
	t.Log("not implemented")
	t.SkipNow()
}

// TestRunLoop is a placeholder for the Run loop tests; it is skipped until
// those tests are written.
func TestRunLoop(t *testing.T) {
	t.Log("not implemented")
	t.SkipNow()
}

0 comments on commit c14985e

Please sign in to comment.