diff --git a/client/duneapi/client.go b/client/duneapi/client.go index a7dc13f..7a00920 100644 --- a/client/duneapi/client.go +++ b/client/duneapi/client.go @@ -5,7 +5,9 @@ import ( "context" "encoding/json" "fmt" + "io" "log/slog" + "net/http" "sync" "time" @@ -22,6 +24,12 @@ type BlockchainIngester interface { // SendBlock sends a block to DuneAPI SendBlock(ctx context.Context, payload models.RPCBlock) error + // GetProgressReport gets a progress report from DuneAPI + GetProgressReport(ctx context.Context) (*models.BlockchainIndexProgress, error) + + // PostProgressReport sends a progress report to DuneAPI + PostProgressReport(ctx context.Context, progress models.BlockchainIndexProgress) error + // - API to discover the latest block number ingested // this can also provide "next block ranges" to push to DuneAPI // - log/metrics on catching up/falling behind, distance from tip of chain @@ -155,3 +163,135 @@ func (c *client) idempotencyKey(rpcBlock models.RPCBlock) string { func (c *client) Close() error { return c.compressor.Close() } + +func (c *client) PostProgressReport(ctx context.Context, progress models.BlockchainIndexProgress) error { + var request BlockchainProgress + var err error + var responseStatus string + var responseBody string + start := time.Now() + + // Log response + defer func() { + if err != nil { + c.log.Error("Sending progress report failed", + "lastIngestedBlockNumer", request.LastIngestedBlockNumber, + "error", err, + "statusCode", responseStatus, + "duration", time.Since(start), + "responseBody", responseBody, + ) + } else { + c.log.Info("Sent progress report", + "lastIngestedBlockNumer", request.LastIngestedBlockNumber, + "duration", time.Since(start), + ) + } + }() + + url := fmt.Sprintf("%s/api/beta/blockchain/%s/ingest/progress", c.cfg.URL, c.cfg.BlockchainName) + c.log.Debug("Sending request", "url", url) + payload, err := json.Marshal(progress) + if err != nil { + return err + } + req, err := retryablehttp.NewRequestWithContext(ctx, "POST", url, bytes.NewReader(payload)) + if err != nil { + return err + } + req.Header.Set("Content-Type", "application/json") + req.Header.Set("x-dune-api-key", c.cfg.APIKey) + req = req.WithContext(ctx) + resp, err := c.httpClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + responseStatus = resp.Status + + if resp.StatusCode != http.StatusOK { + bs, err := io.ReadAll(resp.Body) + responseBody = string(bs) + if err != nil { + return err + } + err = fmt.Errorf("got non-OK response, status code: %s body: %s", responseStatus, responseBody) + return err + } + + return nil +} + +func (c *client) GetProgressReport(ctx context.Context) (*models.BlockchainIndexProgress, error) { + var response BlockchainProgress + var err error + var responseStatus string + start := time.Now() + + // Log response + defer func() { + if err != nil { + c.log.Error("Getting progress report failed", + "error", err, + "statusCode", responseStatus, + "duration", time.Since(start), + ) + } else { + c.log.Info("Got progress report", + "progress", response.String(), + "duration", time.Since(start), + ) + } + }() + + url := fmt.Sprintf("%s/api/beta/blockchain/%s/ingest/progress", c.cfg.URL, c.cfg.BlockchainName) + c.log.Debug("Sending request", "url", url) + req, err := retryablehttp.NewRequestWithContext(ctx, "GET", url, nil) // empty body + if err != nil { + return nil, err + } + req.Header.Set("x-dune-api-key", c.cfg.APIKey) + req = req.WithContext(ctx) + resp, err := c.httpClient.Do(req) + if err != nil { + return nil, err + } + defer resp.Body.Close() + + responseBody, err := io.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + if resp.StatusCode != http.StatusOK { + var errorResp errorResponse + err = json.Unmarshal(responseBody, &errorResp) + if err != nil { + return nil, err + } + err = fmt.Errorf("got non-OK response, status code: %d, body: '%s'", resp.StatusCode, errorResp.Error) + // No progress yet + if resp.StatusCode == http.StatusNotFound { + return &models.BlockchainIndexProgress{ + BlockchainName: c.cfg.BlockchainName, + EVMStack: c.cfg.Stack.String(), + LastIngestedBlockNumber: 0, + LatestBlockNumber: 0, + }, nil + } + return nil, err + } + + err = json.Unmarshal(responseBody, &response) + if err != nil { + return nil, err + } + + progress := &models.BlockchainIndexProgress{ + BlockchainName: c.cfg.BlockchainName, + EVMStack: c.cfg.Stack.String(), + LastIngestedBlockNumber: response.LastIngestedBlockNumber, + LatestBlockNumber: response.LatestBlockNumber, + } + return progress, nil +} diff --git a/client/duneapi/models.go b/client/duneapi/models.go index 31cc90a..6ee8229 100644 --- a/client/duneapi/models.go +++ b/client/duneapi/models.go @@ -46,3 +46,16 @@ type BlockchainIngestRequest struct { IdempotencyKey string Payload []byte } + +type BlockchainProgress struct { + LastIngestedBlockNumber int64 `json:"last_ingested_block_number"` + LatestBlockNumber int64 `json:"latest_block_number"` +} + +func (p *BlockchainProgress) String() string { + return fmt.Sprintf("%+v", *p) +} + +type errorResponse struct { + Error string `json:"error"` +} diff --git a/cmd/main.go b/cmd/main.go index d5c2e33..c93c203 100644 --- a/cmd/main.go +++ b/cmd/main.go @@ -68,6 +68,8 @@ func main() { MaxBatchSize: 1, ReportProgressInterval: cfg.ReportProgressInterval, PollInterval: cfg.PollInterval, + Stack: cfg.RPCStack, + BlockchainName: cfg.BlockchainName, }, ) diff --git a/ingester/ingester.go b/ingester/ingester.go index 32de024..e3f7bc7 100644 --- a/ingester/ingester.go +++ b/ingester/ingester.go @@ -40,6 +40,8 @@ type Config struct { MaxBatchSize int PollInterval time.Duration ReportProgressInterval time.Duration + Stack models.EVMStack + BlockchainName string } type Info struct { diff --git a/mocks/duneapi/client.go b/mocks/duneapi/client.go index ccea12d..f918ed3 100644 --- a/mocks/duneapi/client.go +++ b/mocks/duneapi/client.go @@ -20,6 +20,12 @@ var _ duneapi.BlockchainIngester = &BlockchainIngesterMock{} // // // make and configure a mocked duneapi.BlockchainIngester // mockedBlockchainIngester := &BlockchainIngesterMock{ +// GetProgressReportFunc: func(ctx context.Context) (*models.BlockchainIndexProgress, error) { +// panic("mock out the GetProgressReport method") +// }, +// PostProgressReportFunc: func(ctx context.Context, progress models.BlockchainIndexProgress) error { +// panic("mock out the PostProgressReport method") +// }, // SendBlockFunc: func(ctx context.Context, payload models.RPCBlock) error { // panic("mock out the SendBlock method") // }, @@ -30,11 +36,29 @@ var _ duneapi.BlockchainIngester = &BlockchainIngesterMock{} // // } type BlockchainIngesterMock struct { + // GetProgressReportFunc mocks the GetProgressReport method. + GetProgressReportFunc func(ctx context.Context) (*models.BlockchainIndexProgress, error) + + // PostProgressReportFunc mocks the PostProgressReport method. + PostProgressReportFunc func(ctx context.Context, progress models.BlockchainIndexProgress) error + // SendBlockFunc mocks the SendBlock method. SendBlockFunc func(ctx context.Context, payload models.RPCBlock) error // calls tracks calls to the methods. calls struct { + // GetProgressReport holds details about calls to the GetProgressReport method. + GetProgressReport []struct { + // Ctx is the ctx argument value. + Ctx context.Context + } + // PostProgressReport holds details about calls to the PostProgressReport method. + PostProgressReport []struct { + // Ctx is the ctx argument value. + Ctx context.Context + // Progress is the progress argument value. + Progress models.BlockchainIndexProgress + } // SendBlock holds details about calls to the SendBlock method. SendBlock []struct { // Ctx is the ctx argument value. @@ -43,7 +67,77 @@ type BlockchainIngesterMock struct { Payload models.RPCBlock } } - lockSendBlock sync.RWMutex + lockGetProgressReport sync.RWMutex + lockPostProgressReport sync.RWMutex + lockSendBlock sync.RWMutex +} + +// GetProgressReport calls GetProgressReportFunc. +func (mock *BlockchainIngesterMock) GetProgressReport(ctx context.Context) (*models.BlockchainIndexProgress, error) { + if mock.GetProgressReportFunc == nil { + panic("BlockchainIngesterMock.GetProgressReportFunc: method is nil but BlockchainIngester.GetProgressReport was just called") + } + callInfo := struct { + Ctx context.Context + }{ + Ctx: ctx, + } + mock.lockGetProgressReport.Lock() + mock.calls.GetProgressReport = append(mock.calls.GetProgressReport, callInfo) + mock.lockGetProgressReport.Unlock() + return mock.GetProgressReportFunc(ctx) +} + +// GetProgressReportCalls gets all the calls that were made to GetProgressReport. +// Check the length with: +// +// len(mockedBlockchainIngester.GetProgressReportCalls()) +func (mock *BlockchainIngesterMock) GetProgressReportCalls() []struct { + Ctx context.Context +} { + var calls []struct { + Ctx context.Context + } + mock.lockGetProgressReport.RLock() + calls = mock.calls.GetProgressReport + mock.lockGetProgressReport.RUnlock() + return calls +} + +// PostProgressReport calls PostProgressReportFunc. +func (mock *BlockchainIngesterMock) PostProgressReport(ctx context.Context, progress models.BlockchainIndexProgress) error { + if mock.PostProgressReportFunc == nil { + panic("BlockchainIngesterMock.PostProgressReportFunc: method is nil but BlockchainIngester.PostProgressReport was just called") + } + callInfo := struct { + Ctx context.Context + Progress models.BlockchainIndexProgress + }{ + Ctx: ctx, + Progress: progress, + } + mock.lockPostProgressReport.Lock() + mock.calls.PostProgressReport = append(mock.calls.PostProgressReport, callInfo) + mock.lockPostProgressReport.Unlock() + return mock.PostProgressReportFunc(ctx, progress) +} + +// PostProgressReportCalls gets all the calls that were made to PostProgressReport. +// Check the length with: +// +// len(mockedBlockchainIngester.PostProgressReportCalls()) +func (mock *BlockchainIngesterMock) PostProgressReportCalls() []struct { + Ctx context.Context + Progress models.BlockchainIndexProgress +} { + var calls []struct { + Ctx context.Context + Progress models.BlockchainIndexProgress + } + mock.lockPostProgressReport.RLock() + calls = mock.calls.PostProgressReport + mock.lockPostProgressReport.RUnlock() + return calls } // SendBlock calls SendBlockFunc. diff --git a/models/progress.go b/models/progress.go new file mode 100644 index 0000000..9727aa7 --- /dev/null +++ b/models/progress.go @@ -0,0 +1,8 @@ +package models + +type BlockchainIndexProgress struct { + BlockchainName string + EVMStack string + LastIngestedBlockNumber int64 + LatestBlockNumber int64 +}