diff --git a/.github/workflows/build.yaml b/.github/workflows/build.yaml index 522590b..bb85cfd 100644 --- a/.github/workflows/build.yaml +++ b/.github/workflows/build.yaml @@ -16,20 +16,20 @@ jobs: steps: - uses: actions/checkout@v3 - # - name: Configure AWS Credentials - # uses: aws-actions/configure-aws-credentials@v1.6.1 - # with: - # role-to-assume: arn:aws:iam::118330671040:role/duneapi-ci - # aws-region: ${{ env.AWS_REGION }} + - name: Configure AWS Credentials + uses: aws-actions/configure-aws-credentials@v1.6.1 + with: + role-to-assume: arn:aws:iam::118330671040:role/node-indexer + aws-region: ${{ env.AWS_REGION }} - # - name: Login to Amazon ECR - # id: login-ecr - # uses: aws-actions/amazon-ecr-login@v1 + - name: Login to Amazon ECR + id: login-ecr + uses: aws-actions/amazon-ecr-login@v1 - env: GITHUB_TOKEN: ${{ secrets.DUNE_ENG_ACCESS_TOKEN }} ECR_REGISTRY: public.ecr.aws/duneanalytics - ECR_REPOSITORY: blockchain-ingester + ECR_REPOSITORY: node-indexer run: | make image-build make image-push diff --git a/.github/workflows/pr.yaml b/.github/workflows/pr.yaml index 38fe7ff..5f48d77 100644 --- a/.github/workflows/pr.yaml +++ b/.github/workflows/pr.yaml @@ -1,10 +1,7 @@ --- name: "Pull Request" -on: - pull_request: - branches: - - main +on: pull_request jobs: test: diff --git a/Makefile b/Makefile index aba82aa..213f564 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,6 @@ .PHONY: all setup lint build test image-build image-push -APPLICATION := ingester +APPLICATION := indexer GITHUB_SHA ?= HEAD REF_TAG := $(shell echo ${GITHUB_REF_NAME} | tr -cd '[:alnum:]') IMAGE_TAG := ${ECR_REGISTRY}/${ECR_REPOSITORY}:${REF_TAG}-$(shell git rev-parse --short "${GITHUB_SHA}")-${GITHUB_RUN_NUMBER} @@ -19,7 +19,7 @@ bin/gofumpt: bin GOBIN=$(PWD)/bin go install mvdan.cc/gofumpt@v0.6.0 build: lint cmd/main.go - go build -o ingester cmd/main.go + go build -o indexer cmd/main.go lint: bin/golangci-lint bin/gofumpt go fmt ./... @@ -38,11 +38,11 @@ gen-mocks: bin/moq ./client/jsonrpc/ ./client/duneapi/ image-build: - @echo "# Building ingester docker image..." + @echo "# Building indexer docker image..." docker build -t $(APPLICATION) -f Dockerfile --build-arg GITHUB_TOKEN=${GITHUB_TOKEN} . image-push: image-build - @echo "# Pushing ingester docker image..." + @echo "# Pushing indexer docker image..." docker tag $(APPLICATION) ${IMAGE_TAG} # docker push ${IMAGE_TAG} docker rmi ${IMAGE_TAG} diff --git a/client/duneapi/client.go b/client/duneapi/client.go index 8ac1a15..5d88a89 100644 --- a/client/duneapi/client.go +++ b/client/duneapi/client.go @@ -5,6 +5,7 @@ import ( "context" "encoding/json" "fmt" + "io" "log/slog" "net/http" "sync" @@ -23,6 +24,12 @@ type BlockchainIngester interface { // SendBlock sends a block to DuneAPI SendBlock(ctx context.Context, payload models.RPCBlock) error + // GetProgressReport gets a progress report from DuneAPI + GetProgressReport(ctx context.Context) (*models.BlockchainIndexProgress, error) + + // PostProgressReport sends a progress report to DuneAPI + PostProgressReport(ctx context.Context, progress models.BlockchainIndexProgress) error + // - API to discover the latest block number ingested // this can also provide "next block ranges" to push to DuneAPI // - log/metrics on catching up/falling behind, distance from tip of chain @@ -159,3 +166,140 @@ func (c *client) idempotencyKey(rpcBlock models.RPCBlock) string { func (c *client) Close() error { return c.compressor.Close() } + +func (c *client) PostProgressReport(ctx context.Context, progress models.BlockchainIndexProgress) error { + var request BlockchainProgress + var err error + var responseStatus string + var responseBody string + start := time.Now() + + // Log response + defer func() { + if err != nil { + c.log.Error("Sending progress report failed", + "lastIngestedBlockNumer", request.LastIngestedBlockNumber, + "error", err, + "statusCode", responseStatus, + "duration", time.Since(start), + "responseBody", responseBody, + ) + } else { + c.log.Info("Sent progress report", + "lastIngestedBlockNumer", request.LastIngestedBlockNumber, + "latestBlockNumber", request.LatestBlockNumber, + "duration", time.Since(start), + ) + } + }() + + request = BlockchainProgress{ + LastIngestedBlockNumber: progress.LastIngestedBlockNumber, + LatestBlockNumber: progress.LatestBlockNumber, + } + url := fmt.Sprintf("%s/api/beta/blockchain/%s/ingest/progress", c.cfg.URL, c.cfg.BlockchainName) + payload, err := json.Marshal(request) + if err != nil { + return err + } + c.log.Info("Sending request", "url", url, "payload", string(payload)) + req, err := retryablehttp.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(payload)) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("x-dune-api-key", c.cfg.APIKey) + req = req.WithContext(ctx) + resp, err := c.httpClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + responseStatus = resp.Status + + if resp.StatusCode != http.StatusOK { + bs, err := io.ReadAll(resp.Body) + responseBody = string(bs) + if err != nil { + return err + } + err = fmt.Errorf("got non-OK response, status code: %s body: %s", responseStatus, responseBody) + return err + } + + return nil +} + +func (c *client) GetProgressReport(ctx context.Context) (*models.BlockchainIndexProgress, error) { + var response BlockchainProgress + var err error + var responseStatus string + start := time.Now() + + // Log response + defer func() { + if err != nil { + c.log.Error("Getting progress report failed", + "error", err, + "statusCode", responseStatus, + "duration", time.Since(start), + ) + } else { + c.log.Info("Got progress report", + "progress", response.String(), + "duration", time.Since(start), + ) + } + }() + + url := fmt.Sprintf("%s/api/beta/blockchain/%s/ingest/progress", c.cfg.URL, c.cfg.BlockchainName) + c.log.Debug("Sending request", "url", url) + req, err := retryablehttp.NewRequestWithContext(ctx, "GET", url, nil) // empty body + if err != nil { + return nil, err + } + req.Header.Set("x-dune-api-key", c.cfg.APIKey) + req = req.WithContext(ctx) + resp, err := c.httpClient.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + responseBody, err := io.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + if resp.StatusCode != http.StatusOK { + var errorResp errorResponse + err = json.Unmarshal(responseBody, &errorResp) + if err != nil { + return nil, err + } + err = fmt.Errorf("got non-OK response, status code: %d, body: '%s'", resp.StatusCode, errorResp.Error) + // No progress yet + if resp.StatusCode == http.StatusNotFound { + return &models.BlockchainIndexProgress{ + BlockchainName: c.cfg.BlockchainName, + EVMStack: c.cfg.Stack.String(), + LastIngestedBlockNumber: 0, + LatestBlockNumber: 0, + }, nil + } + return nil, err + } + + err = json.Unmarshal(responseBody, &response) + if err != nil { + return nil, err + } + + progress := &models.BlockchainIndexProgress{ + BlockchainName: c.cfg.BlockchainName, + EVMStack: c.cfg.Stack.String(), + LastIngestedBlockNumber: response.LastIngestedBlockNumber, + LatestBlockNumber: response.LatestBlockNumber, + } + return progress, nil +} diff --git a/client/duneapi/models.go b/client/duneapi/models.go index 31cc90a..6ee8229 100644 --- a/client/duneapi/models.go +++ b/client/duneapi/models.go @@ -46,3 +46,16 @@ type BlockchainIngestRequest struct { IdempotencyKey string Payload []byte } + +type BlockchainProgress struct { + LastIngestedBlockNumber int64 `json:"last_ingested_block_number"` + LatestBlockNumber int64 `json:"latest_block_number"` +} + +func (p *BlockchainProgress) String() string { + return fmt.Sprintf("%+v", *p) +} + +type errorResponse struct { + Error string `json:"error"` +} diff --git a/cmd/main.go b/cmd/main.go index d5c2e33..dd471d8 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -60,6 +60,22 @@ func main() { stdlog.Fatal(err) } + ctx, cancelFn := context.WithCancel(context.Background()) + + // Get stored progress unless config indicates we should start from 0 + var startBlockNumber int64 + // Default to -1 to start where the ingester left off + if cfg.BlockHeight == -1 { + progress, err := duneClient.GetProgressReport(ctx) + if err != nil { + stdlog.Fatal(err) + } + startBlockNumber = progress.LastIngestedBlockNumber + 1 + } else { + startBlockNumber = cfg.BlockHeight + } + + maxCount := int64(0) // 0 means ingest until cancelled ingester := ingester.New( logger, rpcClient, @@ -68,18 +84,20 @@ func main() { MaxBatchSize: 1, ReportProgressInterval: cfg.ReportProgressInterval, PollInterval: cfg.PollInterval, + Stack: cfg.RPCStack, + BlockchainName: cfg.BlockchainName, }, ) - ctx, cancelFn := context.WithCancel(context.Background()) - wg.Add(1) go func() { defer wg.Done() - err := ingester.Run(ctx, cfg.BlockHeight, 0 /* maxCount */) + err := ingester.Run(ctx, startBlockNumber, maxCount) logger.Info("Ingester finished", "err", err) }() + defer ingester.Close() + // TODO: add a metrics exporter or healthcheck http endpoint ? quit := make(chan os.Signal, 1) diff --git a/ingester/ingester.go b/ingester/ingester.go index 32de024..a5f2743 100644 --- a/ingester/ingester.go +++ b/ingester/ingester.go @@ -12,12 +12,12 @@ import ( type Ingester interface { // Run starts the ingester and blocks until the context is cancelled or maxCount blocks are ingested - Run(ctx context.Context, startBlockNumber, maxCount int64) error + Run(ctx context.Context, startBlockNumber int64, maxCount int64) error // ConsumeBlocks sends blocks from startBlockNumber to endBlockNumber to outChan, inclusive. // If endBlockNumber is -1, it sends blocks from startBlockNumber to the tip of the chain // it will run continuously until the context is cancelled - ConsumeBlocks(ctx context.Context, outChan chan models.RPCBlock, startBlockNumber, endBlockNumber int64) error + ConsumeBlocks(ctx context.Context, outChan chan models.RPCBlock, startBlockNumber int64, endBlockNumber int64) error // SendBlocks pushes to DuneAPI the RPCBlock Payloads as they are received in an endless loop // it will block until: @@ -28,6 +28,8 @@ type Ingester interface { // This is just a placeholder for now Info() Info + + Close() error } const ( @@ -40,6 +42,8 @@ type Config struct { MaxBatchSize int PollInterval time.Duration ReportProgressInterval time.Duration + Stack models.EVMStack + BlockchainName string } type Info struct { diff --git a/ingester/mainloop.go b/ingester/mainloop.go index 79d0b2d..0db27a7 100644 --- a/ingester/mainloop.go +++ b/ingester/mainloop.go @@ -11,29 +11,22 @@ import ( "golang.org/x/sync/errgroup" ) -func (i *ingester) Run(ctx context.Context, startBlockNumber, maxCount int64) error { - inFlightChan := make(chan models.RPCBlock, i.cfg.MaxBatchSize) - defer close(inFlightChan) +func (i *ingester) Run(ctx context.Context, startBlockNumber int64, maxCount int64) error { + ctx, cancel := context.WithCancel(ctx) - var err error + inFlightChan := make(chan models.RPCBlock, i.cfg.MaxBatchSize) // we close this after ConsumeBlocks has returned - if startBlockNumber < 0 { - startBlockNumber, err = i.node.LatestBlockNumber() - if err != nil { - return errors.Errorf("failed to get latest block number: %w", err) - } - } + // Ingest until endBlockNumber, inclusive. If maxCount is <= 0, we ingest forever + endBlockNumber := startBlockNumber - 1 + maxCount i.log.Info("Starting ingester", "maxBatchSize", i.cfg.MaxBatchSize, "startBlockNumber", startBlockNumber, + "endBlockNumber", endBlockNumber, "maxCount", maxCount, ) errGroup, ctx := errgroup.WithContext(ctx) - errGroup.Go(func() error { - return i.ConsumeBlocks(ctx, inFlightChan, startBlockNumber, startBlockNumber+maxCount) - }) errGroup.Go(func() error { return i.SendBlocks(ctx, inFlightChan) }) @@ -41,9 +34,20 @@ func (i *ingester) Run(ctx context.Context, startBlockNumber, maxCount int64) er return i.ReportProgress(ctx) }) + err := i.ConsumeBlocks(ctx, inFlightChan, startBlockNumber, endBlockNumber) + close(inFlightChan) + cancel() + if err != nil { + if err := errGroup.Wait(); err != nil { + i.log.Error("errgroup wait", "error", err) + } + return errors.Errorf("consume blocks: %w", err) + } + if err := errGroup.Wait(); err != nil && err != ErrFinishedConsumeBlocks { return err } + return nil } @@ -53,10 +57,9 @@ var ErrFinishedConsumeBlocks = errors.New("finished ConsumeBlocks") func (i *ingester) ConsumeBlocks( ctx context.Context, outChan chan models.RPCBlock, startBlockNumber, endBlockNumber int64, ) error { - dontStop := endBlockNumber <= startBlockNumber latestBlockNumber := i.tryUpdateLatestBlockNumber() - waitForBlock := func(ctx context.Context, blockNumber, latestBlockNumber int64) int64 { + waitForBlock := func(ctx context.Context, blockNumber int64, latestBlockNumber int64) int64 { for blockNumber > latestBlockNumber { select { case <-ctx.Done(): @@ -72,15 +75,19 @@ func (i *ingester) ConsumeBlocks( return latestBlockNumber } + // Consume blocks forever if end is before start. This happens if Run is called with a maxCount of <= 0 + dontStop := endBlockNumber < startBlockNumber + for blockNumber := startBlockNumber; dontStop || blockNumber <= endBlockNumber; blockNumber++ { latestBlockNumber = waitForBlock(ctx, blockNumber, latestBlockNumber) startTime := time.Now() + i.log.Info("Getting block by number", "blockNumber", blockNumber, "latestBlockNumber", latestBlockNumber) block, err := i.node.BlockByNumber(ctx, blockNumber) if err != nil { if errors.Is(err, context.Canceled) { i.log.Info("Context canceled, stopping..") - return err + return ctx.Err() } i.log.Error("Failed to get block by number, continuing..", @@ -119,33 +126,33 @@ func (i *ingester) ConsumeBlocks( ) } } - return ErrFinishedConsumeBlocks // FIXME: this is wrong + // Done consuming blocks, either because we reached the endBlockNumber or the context was canceled + i.log.Info("Finished consuming blocks", "latestBlockNumber", latestBlockNumber, "endBlockNumber", endBlockNumber) + return ErrFinishedConsumeBlocks } func (i *ingester) SendBlocks(ctx context.Context, blocksCh <-chan models.RPCBlock) error { - for { - select { - case <-ctx.Done(): - return nil // context canceled - case payload, ok := <-blocksCh: - // TODO: we should batch RCP blocks here before sending to Dune. - if !ok { - return nil // channel closed - } - if err := i.dune.SendBlock(ctx, payload); err != nil { - // TODO: implement DeadLetterQueue - // this will leave a "block gap" in DuneAPI, TODO: implement a way to fill this gap - i.log.Error("SendBlock failed, continuing..", "blockNumber", payload.BlockNumber, "error", err) - i.info.DuneErrors = append(i.info.DuneErrors, ErrorInfo{ - Timestamp: time.Now(), - BlockNumber: payload.BlockNumber, - Error: err, - }) - } else { - atomic.StoreInt64(&i.info.IngestedBlockNumber, payload.BlockNumber) + for payload := range blocksCh { + // TODO: we should batch RCP blocks here before sending to Dune. + if err := i.dune.SendBlock(ctx, payload); err != nil { + if errors.Is(err, context.Canceled) { + i.log.Info("Context canceled, stopping..") + return ctx.Err() } + // TODO: implement DeadLetterQueue + // this will leave a "block gap" in DuneAPI, TODO: implement a way to fill this gap + i.log.Error("SendBlock failed, continuing..", "blockNumber", payload.BlockNumber, "error", err) + i.info.DuneErrors = append(i.info.DuneErrors, ErrorInfo{ + Timestamp: time.Now(), + BlockNumber: payload.BlockNumber, + Error: err, + }) + } else { + i.log.Info("Updating latest ingested block number", "blockNumber", payload.BlockNumber) + atomic.StoreInt64(&i.info.IngestedBlockNumber, payload.BlockNumber) } } + return ctx.Err() // channel closed } func (i *ingester) tryUpdateLatestBlockNumber() int64 { @@ -169,7 +176,7 @@ func (i *ingester) ReportProgress(ctx context.Context) error { for { select { case <-ctx.Done(): - return nil + return ctx.Err() case tNow := <-timer.C: latest := atomic.LoadInt64(&i.info.LatestBlockNumber) lastIngested := atomic.LoadInt64(&i.info.IngestedBlockNumber) @@ -202,10 +209,39 @@ func (i *ingester) ReportProgress(ctx context.Context) error { previousIngested = lastIngested previousDistance = newDistance previousTime = tNow + + // TODO: include errors in the report, reset the error list + err := i.dune.PostProgressReport(ctx, models.BlockchainIndexProgress{ + BlockchainName: i.cfg.BlockchainName, + EVMStack: i.cfg.Stack.String(), + LastIngestedBlockNumber: lastIngested, + LatestBlockNumber: latest, + }) + if err != nil { + i.log.Error("Failed to post progress report", "error", err) + } } } } func (i *ingester) Close() error { + // Send a final progress report to flush progress + ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second) + defer cancel() + i.log.Info("Sending final progress report") + err := i.dune.PostProgressReport( + ctx, + models.BlockchainIndexProgress{ + BlockchainName: i.cfg.BlockchainName, + EVMStack: i.cfg.Stack.String(), + LastIngestedBlockNumber: i.info.IngestedBlockNumber, + LatestBlockNumber: i.info.LatestBlockNumber, + }) + i.log.Info("Closing node") + if err != nil { + _ = i.node.Close() + return err + } + return i.node.Close() } diff --git a/ingester/mainloop_test.go b/ingester/mainloop_test.go index 3b03d94..e2a969d 100644 --- a/ingester/mainloop_test.go +++ b/ingester/mainloop_test.go @@ -68,16 +68,21 @@ func TestBlockConsumptionLoopErrors(t *testing.T) { Payload: []byte(`block`), }, nil }, + CloseFunc: func() error { + return nil + }, } - ing := ingester.New(slog.New(slog.NewTextHandler(io.Discard, nil)), rpcClient, nil, ingester.Config{ + // Swap these to see logs + // logOutput := os.Stderr + logOutput := io.Discard + ing := ingester.New(slog.New(slog.NewTextHandler(logOutput, nil)), rpcClient, nil, ingester.Config{ MaxBatchSize: 1, PollInterval: 1000 * time.Millisecond, }) outCh := make(chan models.RPCBlock, maxBlockNumber+1) - defer close(outCh) err := ing.ConsumeBlocks(ctx, outCh, 0, maxBlockNumber) - require.Error(t, err) // this is expected + require.ErrorIs(t, err, ingester.ErrFinishedConsumeBlocks) if tc.BlockByNumberIsBroken { require.Equal(t, producedBlockNumber, int64(0)) } @@ -100,50 +105,75 @@ func TestBlockSendingLoop(t *testing.T) { func TestRunLoopBaseCase(t *testing.T) { testCases := []struct { - name string - i int64 + name string + maxCount int64 + lastIngested int64 + expectedEndBlock int64 }{ - {name: "1 block", i: 1}, - {name: "100 blocks", i: 100}, - } - sentBlockNumber := int64(0) - producedBlockNumber := int64(0) - duneapi := &duneapi_mock.BlockchainIngesterMock{ - SendBlockFunc: func(_ context.Context, block models.RPCBlock) error { - atomic.StoreInt64(&sentBlockNumber, block.BlockNumber) - return nil - }, - } - rpcClient := &jsonrpc_mock.BlockchainClientMock{ - LatestBlockNumberFunc: func() (int64, error) { - return 1000, nil - }, - BlockByNumberFunc: func(_ context.Context, blockNumber int64) (models.RPCBlock, error) { - atomic.StoreInt64(&producedBlockNumber, blockNumber) - return models.RPCBlock{ - BlockNumber: blockNumber, - Payload: []byte(`block`), - }, nil - }, + {name: "1 block", maxCount: 1, lastIngested: 0, expectedEndBlock: 1}, + {name: "2 blocks", maxCount: 2, lastIngested: 0, expectedEndBlock: 2}, + {name: "100 blocks", maxCount: 100, lastIngested: 0, expectedEndBlock: 100}, + {name: "100 blocks, starting from 50", maxCount: 100, lastIngested: 50, expectedEndBlock: 150}, } for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { - ing := ingester.New(slog.New(slog.NewTextHandler(io.Discard, nil)), rpcClient, duneapi, ingester.Config{ - MaxBatchSize: 1, - PollInterval: 1000 * time.Millisecond, - }) + latestBlockNumber := int64(1000) + ingestedBlockNumber := int64(0) - err := ing.Run(context.Background(), 0, tc.i) - require.NoError(t, err) - require.Equal(t, producedBlockNumber, tc.i) - require.Equal(t, sentBlockNumber, tc.i) + rpcClient := &jsonrpc_mock.BlockchainClientMock{ + LatestBlockNumberFunc: func() (int64, error) { + return latestBlockNumber, nil + }, + BlockByNumberFunc: func(_ context.Context, blockNumber int64) (models.RPCBlock, error) { + atomic.StoreInt64(&latestBlockNumber, blockNumber) + return models.RPCBlock{ + BlockNumber: blockNumber, + Payload: []byte(`block`), + }, nil + }, + CloseFunc: func() error { + return nil + }, + } + + duneapi := &duneapi_mock.BlockchainIngesterMock{ + SendBlockFunc: func(_ context.Context, block models.RPCBlock) error { + atomic.StoreInt64(&ingestedBlockNumber, block.BlockNumber) + return nil + }, + PostProgressReportFunc: func(_ context.Context, _ models.BlockchainIndexProgress) error { + return nil + }, + } + + // Swap these to see logs + // logOutput := os.Stderr + logOutput := io.Discard + ing := ingester.New( + slog.New(slog.NewTextHandler(logOutput, nil)), + rpcClient, + duneapi, + ingester.Config{ + MaxBatchSize: 1, + PollInterval: 1000 * time.Millisecond, + }, + ) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + startBlockNumber := tc.lastIngested + 1 + err := ing.Run(ctx, startBlockNumber, tc.maxCount) + require.ErrorIs(t, err, ingester.ErrFinishedConsumeBlocks) + // require.Equal(t, tc.lastIngested+tc.maxCount, producedBlockNumber) + require.Equal(t, tc.expectedEndBlock, atomic.LoadInt64(&ingestedBlockNumber)) }) } } func TestRunLoopUntilCancel(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) - maxBlockNumber := int64(1000) + maxBlockNumber := int64(10) sentBlockNumber := int64(0) producedBlockNumber := int64(0) duneapi := &duneapi_mock.BlockchainIngesterMock{ @@ -152,9 +182,13 @@ func TestRunLoopUntilCancel(t *testing.T) { if block.BlockNumber == maxBlockNumber { // cancel execution when we send the last block cancel() + return context.Canceled } return nil }, + PostProgressReportFunc: func(_ context.Context, _ models.BlockchainIndexProgress) error { + return nil + }, } rpcClient := &jsonrpc_mock.BlockchainClientMock{ LatestBlockNumberFunc: func() (int64, error) { @@ -167,14 +201,19 @@ func TestRunLoopUntilCancel(t *testing.T) { Payload: []byte(`block`), }, nil }, + CloseFunc: func() error { + return nil + }, } - ing := ingester.New(slog.New(slog.NewTextHandler(io.Discard, nil)), rpcClient, duneapi, ingester.Config{ + // Swap these to see logs + // logOutput := os.Stderr + logOutput := io.Discard + ing := ingester.New(slog.New(slog.NewTextHandler(logOutput, nil)), rpcClient, duneapi, ingester.Config{ MaxBatchSize: 1, PollInterval: 1000 * time.Millisecond, }) - err := ing.Run(ctx, 0, maxBlockNumber) - require.NoError(t, err) - require.Equal(t, producedBlockNumber, maxBlockNumber) + err := ing.Run(ctx, 1, -1) // run until canceled + require.ErrorIs(t, err, context.Canceled) // this is expected require.Equal(t, sentBlockNumber, maxBlockNumber) } diff --git a/mocks/duneapi/client.go b/mocks/duneapi/client.go index ccea12d..f918ed3 100644 --- a/mocks/duneapi/client.go +++ b/mocks/duneapi/client.go @@ -20,6 +20,12 @@ var _ duneapi.BlockchainIngester = &BlockchainIngesterMock{} // // // make and configure a mocked duneapi.BlockchainIngester // mockedBlockchainIngester := &BlockchainIngesterMock{ +// GetProgressReportFunc: func(ctx context.Context) (*models.BlockchainIndexProgress, error) { +// panic("mock out the GetProgressReport method") +// }, +// PostProgressReportFunc: func(ctx context.Context, progress models.BlockchainIndexProgress) error { +// panic("mock out the PostProgressReport method") +// }, // SendBlockFunc: func(ctx context.Context, payload models.RPCBlock) error { // panic("mock out the SendBlock method") // }, @@ -30,11 +36,29 @@ var _ duneapi.BlockchainIngester = &BlockchainIngesterMock{} // // } type BlockchainIngesterMock struct { + // GetProgressReportFunc mocks the GetProgressReport method. + GetProgressReportFunc func(ctx context.Context) (*models.BlockchainIndexProgress, error) + + // PostProgressReportFunc mocks the PostProgressReport method. + PostProgressReportFunc func(ctx context.Context, progress models.BlockchainIndexProgress) error + // SendBlockFunc mocks the SendBlock method. SendBlockFunc func(ctx context.Context, payload models.RPCBlock) error // calls tracks calls to the methods. calls struct { + // GetProgressReport holds details about calls to the GetProgressReport method. + GetProgressReport []struct { + // Ctx is the ctx argument value. + Ctx context.Context + } + // PostProgressReport holds details about calls to the PostProgressReport method. + PostProgressReport []struct { + // Ctx is the ctx argument value. + Ctx context.Context + // Progress is the progress argument value. + Progress models.BlockchainIndexProgress + } // SendBlock holds details about calls to the SendBlock method. SendBlock []struct { // Ctx is the ctx argument value. @@ -43,7 +67,77 @@ type BlockchainIngesterMock struct { Payload models.RPCBlock } } - lockSendBlock sync.RWMutex + lockGetProgressReport sync.RWMutex + lockPostProgressReport sync.RWMutex + lockSendBlock sync.RWMutex +} + +// GetProgressReport calls GetProgressReportFunc. +func (mock *BlockchainIngesterMock) GetProgressReport(ctx context.Context) (*models.BlockchainIndexProgress, error) { + if mock.GetProgressReportFunc == nil { + panic("BlockchainIngesterMock.GetProgressReportFunc: method is nil but BlockchainIngester.GetProgressReport was just called") + } + callInfo := struct { + Ctx context.Context + }{ + Ctx: ctx, + } + mock.lockGetProgressReport.Lock() + mock.calls.GetProgressReport = append(mock.calls.GetProgressReport, callInfo) + mock.lockGetProgressReport.Unlock() + return mock.GetProgressReportFunc(ctx) +} + +// GetProgressReportCalls gets all the calls that were made to GetProgressReport. +// Check the length with: +// +// len(mockedBlockchainIngester.GetProgressReportCalls()) +func (mock *BlockchainIngesterMock) GetProgressReportCalls() []struct { + Ctx context.Context +} { + var calls []struct { + Ctx context.Context + } + mock.lockGetProgressReport.RLock() + calls = mock.calls.GetProgressReport + mock.lockGetProgressReport.RUnlock() + return calls +} + +// PostProgressReport calls PostProgressReportFunc. +func (mock *BlockchainIngesterMock) PostProgressReport(ctx context.Context, progress models.BlockchainIndexProgress) error { + if mock.PostProgressReportFunc == nil { + panic("BlockchainIngesterMock.PostProgressReportFunc: method is nil but BlockchainIngester.PostProgressReport was just called") + } + callInfo := struct { + Ctx context.Context + Progress models.BlockchainIndexProgress + }{ + Ctx: ctx, + Progress: progress, + } + mock.lockPostProgressReport.Lock() + mock.calls.PostProgressReport = append(mock.calls.PostProgressReport, callInfo) + mock.lockPostProgressReport.Unlock() + return mock.PostProgressReportFunc(ctx, progress) +} + +// PostProgressReportCalls gets all the calls that were made to PostProgressReport. +// Check the length with: +// +// len(mockedBlockchainIngester.PostProgressReportCalls()) +func (mock *BlockchainIngesterMock) PostProgressReportCalls() []struct { + Ctx context.Context + Progress models.BlockchainIndexProgress +} { + var calls []struct { + Ctx context.Context + Progress models.BlockchainIndexProgress + } + mock.lockPostProgressReport.RLock() + calls = mock.calls.PostProgressReport + mock.lockPostProgressReport.RUnlock() + return calls } // SendBlock calls SendBlockFunc. diff --git a/models/progress.go b/models/progress.go new file mode 100644 index 0000000..9727aa7 --- /dev/null +++ b/models/progress.go @@ -0,0 +1,8 @@ +package models + +type BlockchainIndexProgress struct { + BlockchainName string + EVMStack string + LastIngestedBlockNumber int64 + LatestBlockNumber int64 +}