Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test main loop #6

Merged
merged 5 commits into from
Jun 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,11 @@ TEST_TIMEOUT := 10s

all: lint test build

setup: bin/golangci-lint bin/gofumpt
setup: bin/golangci-lint bin/gofumpt bin/moq
go mod download

bin/moq:
GOBIN=$(PWD)/bin go install github.com/matryer/moq@v0.3.4
bin/golangci-lint:
GOBIN=$(PWD)/bin go install github.com/golangci/golangci-lint/cmd/golangci-lint@v1.59.0
bin/gofumpt: bin
Expand All @@ -30,6 +32,11 @@ test:
go mod tidy
go test -timeout=$(TEST_TIMEOUT) -race -bench=. -benchmem -cover ./...

gen-mocks: bin/moq ./client/jsonrpc/ ./client/duneapi/
./bin/moq -pkg jsonrpc_mock -out ./mocks/jsonrpc/rpcnode.go ./client/jsonrpc BlockchainClient
./bin/moq -pkg duneapi_mock -out ./mocks/duneapi/client.go ./client/duneapi BlockchainIngester


image-build:
@echo "# Building ingester docker image..."
docker build -t $(APPLICATION) -f Dockerfile --build-arg GITHUB_TOKEN=${GITHUB_TOKEN} .
Expand Down
10 changes: 7 additions & 3 deletions cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,13 +64,17 @@ func main() {
rpcClient,
duneClient,
ingester.Config{
PollInterval: cfg.PollInterval,
StartBlockHeight: cfg.BlockHeight,
PollInterval: cfg.PollInterval,
MaxBatchSize: 1,
},
)

wg.Add(1)
ingester.Run(context.Background(), &wg)
go func() {
defer wg.Done()
err := ingester.Run(context.Background(), cfg.BlockHeight, 0 /* maxCount */)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't realize /* */ was valid in Go!

logger.Info("Ingester finished", "err", err)
}()

// TODO: add a metrics exporter or healthcheck http endpoint ?

Expand Down
4 changes: 4 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,14 @@ require (
github.com/hashicorp/go-retryablehttp v0.7.7
github.com/jessevdk/go-flags v1.5.0
github.com/klauspost/compress v1.17.8
github.com/stretchr/testify v1.9.0
golang.org/x/sync v0.7.0
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
golang.org/x/sys v0.20.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
10 changes: 10 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/fatih/color v1.16.0 h1:zmkK9Ngbjj+K0yRhTVONQh1p/HknKYSlNT+vZCzyokM=
github.com/fatih/color v1.16.0/go.mod h1:fL2Sau1YI5c0pdGEVCbKQbLXB6edEj1ZgiY4NijnWvE=
github.com/go-errors/errors v1.5.1 h1:ZwEMSLRCapFLflTpT7NKaAc7ukJ8ZPEjzlxt8rPN8bk=
Expand All @@ -16,8 +18,16 @@ github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxec
github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
golang.org/x/sync v0.7.0 h1:YsImfSBoP9QPYL0xyKJPq0gcaJdG3rInoqxTWbfQu9M=
golang.org/x/sync v0.7.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20210320140829-1e4c9ba3b0c4/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.20.0 h1:Od9JTbYCk261bKm4M/mw7AklTlFYIa0bIp9BgSm1S8Y=
golang.org/x/sys v0.20.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
9 changes: 4 additions & 5 deletions ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package ingester
import (
"context"
"log/slog"
"sync"
"time"

"github.com/duneanalytics/blockchain-ingester/client/duneapi"
Expand All @@ -12,7 +11,8 @@ import (
)

type Ingester interface {
Run(ctx context.Context, wg *sync.WaitGroup) error
// Run starts the ingester and blocks until the context is cancelled or maxCount blocks are ingested
Run(ctx context.Context, startBlockNumber, maxCount int64) error
Copy link
Member

@vegarsti vegarsti Jun 5, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maxCount 🤔 EDIT: Nevermind, tests made it clear why we want this


// ConsumeBlocks sends blocks from startBlockNumber to endBlockNumber to outChan, inclusive.
// If endBlockNumber is -1, it sends blocks from startBlockNumber to the tip of the chain
Expand All @@ -33,9 +33,8 @@ type Ingester interface {
const defaultMaxBatchSize = 1

type Config struct {
MaxBatchSize int
StartBlockHeight int64
PollInterval time.Duration
MaxBatchSize int
PollInterval time.Duration
}

type Info struct {
Expand Down
27 changes: 15 additions & 12 deletions ingester/mainloop.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package ingester
import (
"context"
"fmt"
"sync"
"sync/atomic"
"time"

Expand All @@ -12,30 +11,28 @@ import (
"golang.org/x/sync/errgroup"
)

func (i *ingester) Run(ctx context.Context, wg *sync.WaitGroup) error {
defer wg.Done()

func (i *ingester) Run(ctx context.Context, startBlockNumber, maxCount int64) error {
inFlightChan := make(chan models.RPCBlock, i.cfg.MaxBatchSize)
defer close(inFlightChan)

var err error

startBlockNumber := i.cfg.StartBlockHeight
if startBlockNumber <= 0 {
if startBlockNumber < 0 {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good! We do indeed want block 0

startBlockNumber, err = i.node.LatestBlockNumber()
if err != nil {
return errors.Errorf("failed to get latest block number: %w", err)
}
}

i.log.Info("Starting ingester",
"maxBatchSize", i.cfg.MaxBatchSize,
"startBlockHeight", i.cfg.StartBlockHeight,
"startBlockNumber", startBlockNumber,
"maxCount", maxCount,
)

errGroup, ctx := errgroup.WithContext(ctx)
errGroup.Go(func() error {
return i.ConsumeBlocks(ctx, inFlightChan, startBlockNumber, -1)
return i.ConsumeBlocks(ctx, inFlightChan, startBlockNumber, startBlockNumber+maxCount)
})
errGroup.Go(func() error {
return i.SendBlocks(ctx, inFlightChan)
Expand All @@ -44,9 +41,14 @@ func (i *ingester) Run(ctx context.Context, wg *sync.WaitGroup) error {
return i.ReportProgress(ctx)
})

return errGroup.Wait()
if err := errGroup.Wait(); err != nil && err != ErrFinishedConsumeBlocks {
return err
}
return nil
}

var ErrFinishedConsumeBlocks = errors.New("finished ConsumeBlocks")

// ConsumeBlocks from the NPC Node
func (i *ingester) ConsumeBlocks(
ctx context.Context, outChan chan models.RPCBlock, startBlockNumber, endBlockNumber int64,
Expand All @@ -55,6 +57,7 @@ func (i *ingester) ConsumeBlocks(
latestBlockNumber := i.tryUpdateLatestBlockNumber()

waitForBlock := func(blockNumber, latestBlockNumber int64) int64 {
// TODO: handle cancellation here
for blockNumber > latestBlockNumber {
i.log.Info(fmt.Sprintf("Waiting %v for block to be available..", i.cfg.PollInterval),
"blockNumber", blockNumber,
Expand All @@ -66,7 +69,7 @@ func (i *ingester) ConsumeBlocks(
return latestBlockNumber
}

for blockNumber := startBlockNumber; dontStop || startBlockNumber <= endBlockNumber; blockNumber++ {
for blockNumber := startBlockNumber; dontStop || blockNumber <= endBlockNumber; blockNumber++ {

latestBlockNumber = waitForBlock(blockNumber, latestBlockNumber)
startTime := time.Now()
Expand All @@ -92,7 +95,7 @@ func (i *ingester) ConsumeBlocks(

select {
case <-ctx.Done():
return nil
return ctx.Err()
case outChan <- block:
}

Expand All @@ -108,7 +111,7 @@ func (i *ingester) ConsumeBlocks(
)
}
}
return nil
return ErrFinishedConsumeBlocks // FIXME: this is wrong
}

func (i *ingester) SendBlocks(ctx context.Context, blocksCh <-chan models.RPCBlock) error {
Expand Down
177 changes: 171 additions & 6 deletions ingester/mainloop_test.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,180 @@
package ingester_test

import "testing"
import (
"context"
"io"
"log/slog"
"sync/atomic"
"testing"
"time"

func TestBlockConsumptionLoop(t *testing.T) {
t.Skip("not implemented")
"github.com/duneanalytics/blockchain-ingester/ingester"
duneapi_mock "github.com/duneanalytics/blockchain-ingester/mocks/duneapi"
jsonrpc_mock "github.com/duneanalytics/blockchain-ingester/mocks/jsonrpc"
"github.com/duneanalytics/blockchain-ingester/models"
"github.com/go-errors/errors"
"github.com/stretchr/testify/require"
)

func TestBlockConsumptionLoopErrors(t *testing.T) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's not immediately clear to me what this test is testing. Is it testing that we exit the function cleanly under these scenarios + what blocks have been ingested? Or am I misunderstanding?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this test is bogus right now, because I realized (while trying to get this test any decent) that the logic I created is just bad...

The problem is:

  • we should be able to cancel execution (using the context) and have a good understanding if the execution was smooth, flaky, or we were under a busted scenario.
  • for example, with everything broken, I think there should be an early return and just abort execution ? or should we just stay alive and keep trying?
  • if execution is flaky, we should still consume blocks, but skip some, right ?

in short, this test just shows we're a bit confused on what to expect from the test.

testcases := []struct {
name string
LatestIsBroken bool
BlockByNumberIsBroken bool
}{
{
name: "we're up to date, following the head",
LatestIsBroken: false,
BlockByNumberIsBroken: false,
},
{
name: "the RPC node is broken, all API calls are failing",
LatestIsBroken: true,
BlockByNumberIsBroken: true,
},
{
name: "BlockByNumber, a specific jsonRPC on the RPC node is broken",
LatestIsBroken: false,
BlockByNumberIsBroken: true,
},
}

for _, tc := range testcases {
t.Run(tc.name, func(t *testing.T) {
if tc.LatestIsBroken {
t.Skip("latest block number is broken, we don't behave correctly yet")
}
ctx, cancel := context.WithCancel(context.Background())
maxBlockNumber := int64(100)
producedBlockNumber := int64(0)
rpcClient := &jsonrpc_mock.BlockchainClientMock{
LatestBlockNumberFunc: func() (int64, error) {
if tc.LatestIsBroken {
return 0, errors.New("latest block number is broken")
}
return maxBlockNumber, nil
},
BlockByNumberFunc: func(_ context.Context, blockNumber int64) (models.RPCBlock, error) {
if tc.BlockByNumberIsBroken {
return models.RPCBlock{}, errors.New("block by number is broken")
}
if blockNumber > maxBlockNumber {
// end tests
cancel()
}
atomic.StoreInt64(&producedBlockNumber, blockNumber)
return models.RPCBlock{
BlockNumber: blockNumber,
Payload: []byte(`block`),
}, nil
},
}
ing := ingester.New(slog.New(slog.NewTextHandler(io.Discard, nil)), rpcClient, nil, ingester.Config{
MaxBatchSize: 1,
PollInterval: 1000 * time.Millisecond,
})

outCh := make(chan models.RPCBlock, maxBlockNumber+1)
defer close(outCh)
err := ing.ConsumeBlocks(ctx, outCh, 0, maxBlockNumber)
require.Error(t, err) // this is expected
if tc.BlockByNumberIsBroken {
require.Equal(t, producedBlockNumber, int64(0))
}
})
}
}

func TestBlockSendingLoop(t *testing.T) {
t.Skip("not implemented")
testcases := []string{
"we're up to date, following the head",
"we're failing intermittently, the Dune API is broken",
"we're erroring systematically, the Dune API is down",
}
for _, testcase := range testcases {
t.Run(testcase, func(t *testing.T) {
t.Skip("not implemented")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add a TODO?

})
}
}

func TestRunLoop(t *testing.T) {
t.Skip("not implemented")
func TestRunLoopBaseCase(t *testing.T) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nice

testCases := []struct {
name string
i int64
}{
{name: "1 block", i: 1},
{name: "100 blocks", i: 100},
}
sentBlockNumber := int64(0)
producedBlockNumber := int64(0)
duneapi := &duneapi_mock.BlockchainIngesterMock{
SendBlockFunc: func(block models.RPCBlock) error {
atomic.StoreInt64(&sentBlockNumber, block.BlockNumber)
return nil
},
}
rpcClient := &jsonrpc_mock.BlockchainClientMock{
LatestBlockNumberFunc: func() (int64, error) {
return 1000, nil
},
BlockByNumberFunc: func(_ context.Context, blockNumber int64) (models.RPCBlock, error) {
atomic.StoreInt64(&producedBlockNumber, blockNumber)
return models.RPCBlock{
BlockNumber: blockNumber,
Payload: []byte(`block`),
}, nil
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
ing := ingester.New(slog.New(slog.NewTextHandler(io.Discard, nil)), rpcClient, duneapi, ingester.Config{
MaxBatchSize: 1,
PollInterval: 1000 * time.Millisecond,
})

err := ing.Run(context.Background(), 0, tc.i)
require.NoError(t, err)
require.Equal(t, producedBlockNumber, tc.i)
require.Equal(t, sentBlockNumber, tc.i)
})
}
}

func TestRunLoopUntilCancel(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
maxBlockNumber := int64(1000)
sentBlockNumber := int64(0)
producedBlockNumber := int64(0)
duneapi := &duneapi_mock.BlockchainIngesterMock{
SendBlockFunc: func(block models.RPCBlock) error {
atomic.StoreInt64(&sentBlockNumber, block.BlockNumber)
if block.BlockNumber == maxBlockNumber {
// cancel execution when we send the last block
cancel()
}
return nil
},
}
rpcClient := &jsonrpc_mock.BlockchainClientMock{
LatestBlockNumberFunc: func() (int64, error) {
return maxBlockNumber + 1, nil
},
BlockByNumberFunc: func(_ context.Context, blockNumber int64) (models.RPCBlock, error) {
atomic.StoreInt64(&producedBlockNumber, blockNumber)
return models.RPCBlock{
BlockNumber: blockNumber,
Payload: []byte(`block`),
}, nil
},
}
ing := ingester.New(slog.New(slog.NewTextHandler(io.Discard, nil)), rpcClient, duneapi, ingester.Config{
MaxBatchSize: 1,
PollInterval: 1000 * time.Millisecond,
})

err := ing.Run(ctx, 0, maxBlockNumber)
require.ErrorIs(t, err, context.Canceled)
require.Equal(t, producedBlockNumber, maxBlockNumber)
require.Equal(t, sentBlockNumber, maxBlockNumber)
}
Loading
Loading