diff --git a/api/handlers/transactions/repository.go b/api/handlers/transactions/repository.go index a46961cd6..2f74804fc 100644 --- a/api/handlers/transactions/repository.go +++ b/api/handlers/transactions/repository.go @@ -2,7 +2,11 @@ package transactions import ( "context" + errors2 "errors" "fmt" + "github.com/influxdata/influxdb-client-go/v2/api" + "github.com/influxdata/influxdb-client-go/v2/api/query" + "github.com/pkg/errors" "github.com/valyala/fasthttp" "strconv" "strings" @@ -10,9 +14,8 @@ import ( "time" influxdb2 "github.com/influxdata/influxdb-client-go/v2" - "github.com/influxdata/influxdb-client-go/v2/api" + //"github.com/influxdata/influxdb-client-go/v2/api" "github.com/mitchellh/mapstructure" - "github.com/pkg/errors" "github.com/wormhole-foundation/wormhole-explorer/api/handlers/common" "github.com/wormhole-foundation/wormhole-explorer/api/internal/config" errs "github.com/wormhole-foundation/wormhole-explorer/api/internal/errors" @@ -127,10 +130,10 @@ type repositoryCollections struct { } type Repository struct { - tvl *tvl.Tvl + tvl getTvl p2pNetwork string influxCli influxdb2.Client - queryAPI api.QueryAPI + queryAPI influxQueryAPI bucketInfiniteRetention string bucket30DaysRetention string bucket24HoursRetention string @@ -140,6 +143,45 @@ type Repository struct { logger *zap.Logger } +type influxQueryAPI interface { + Query(ctx context.Context, query string) (influxQueryResult, error) +} + +type influxQueryResult interface { + Err() error + Next() bool + Record() *query.FluxRecord +} + +type influxAdapter struct { + influxAPI api.QueryAPI +} + +func (i *influxAdapter) Query(ctx context.Context, query string) (influxQueryResult, error) { + result, err := i.influxAPI.Query(ctx, query) + return &influxResult{result}, err +} + +type influxResult struct { + result *api.QueryTableResult +} + +func (i *influxResult) Err() error { + return i.result.Err() +} + +func (i *influxResult) Next() bool { + return i.result.Next() +} + +func (i *influxResult) Record() *query.FluxRecord { + return i.result.Record() +} + +type getTvl interface { + Get(ctx context.Context) (string, error) +} + type offset string const _24h offset = "24h" @@ -160,7 +202,7 @@ func NewRepository( tvl: tvl, p2pNetwork: p2pNetwork, influxCli: client, - queryAPI: client.QueryAPI(org), + queryAPI: &influxAdapter{client.QueryAPI(org)}, bucket24HoursRetention: bucket24HoursRetention, bucket30DaysRetention: bucket30DaysRetention, bucketInfiniteRetention: bucketInfiniteRetention, @@ -442,87 +484,89 @@ func (r *Repository) GetScorecards(ctx context.Context) (*Scorecards, error) { // We use a `sync.WaitGroup` to block until all goroutines are done. var wg sync.WaitGroup - var messages24h, tvl, totalTxCount, totalTxVolume, volume24h, volume7d, volume30d, totalPythMessage string + ctxWithCancel, cancel := context.WithCancel(ctx) + defer cancel() + + var resultErr error + mutex := &sync.Mutex{} + collectError := func(err error) { + mutex.Lock() + resultErr = errors2.Join(resultErr, err) + mutex.Unlock() + } + + handleErr := func(errMsgLog string, err error) { + if err != nil { + r.logger.Error(errMsgLog, zap.Error(err)) + collectError(err) + cancel() // this will signal the rest of goroutines to exit also. + } + } + + var messages24h, totalValueLocked, totalTxCount, totalTxVolume, volume24h, volume7d, volume30d, totalPythMessage string wg.Add(1) go func() { defer wg.Done() var err error - messages24h, err = r.getMessages24h(ctx) - if err != nil { - r.logger.Error("failed to query 24h messages", zap.Error(err)) - } + messages24h, err = r.getMessages24h(ctxWithCancel) + handleErr("failed to get 24h messages", err) }() wg.Add(1) go func() { defer wg.Done() var err error - tvl, err = r.tvl.Get(ctx) - if err != nil { - r.logger.Error("failed to get tvl", zap.Error(err)) - } + totalValueLocked, err = r.tvl.Get(ctxWithCancel) + handleErr("failed to get tvl", err) }() wg.Add(1) go func() { defer wg.Done() var err error - totalTxCount, err = r.getTotalTxCount(ctx) - if err != nil { - r.logger.Error("failed to tx count", zap.Error(err)) - } - + totalTxCount, err = r.getTotalTxCount(ctxWithCancel) + handleErr("failed to get total tx count", err) }() wg.Add(1) go func() { defer wg.Done() var err error - totalPythMessage, err = r.getTotalPythMessage(ctx) - if err != nil { - r.logger.Error("failed to get total pyth message", zap.Error(err)) - } + totalPythMessage, err = r.getTotalPythMessage(ctxWithCancel) + handleErr("failed to get total pyth message", err) }() wg.Add(1) go func() { defer wg.Done() var err error - totalTxVolume, err = r.getTotalTxVolume(ctx) - if err != nil { - r.logger.Error("failed to get total tx volume", zap.Error(err)) - } + totalTxVolume, err = r.getTotalTxVolume(ctxWithCancel) + handleErr("failed to get total tx volume", err) }() wg.Add(1) go func() { defer wg.Done() var err error - volume24h, err = r.getVolume(ctx, _24h) - if err != nil { - r.logger.Error("failed to get 24h volume", zap.Error(err)) - } + volume24h, err = r.getVolume(ctxWithCancel, _24h) + handleErr("failed to get 24h volume", err) }() wg.Add(1) go func() { defer wg.Done() var err error - volume7d, err = r.getVolume(ctx, _7d) - if err != nil { - r.logger.Error("failed to get 7d volume", zap.Error(err)) - } + volume7d, err = r.getVolume(ctxWithCancel, _7d) + handleErr("failed to get 7d volume", err) }() wg.Add(1) go func() { defer wg.Done() var err error - volume30d, err = r.getVolume(ctx, _30d) - if err != nil { - r.logger.Error("failed to get 30d volume", zap.Error(err)) - } + volume30d, err = r.getVolume(ctxWithCancel, _30d) + handleErr("failed to get 30d volume", err) }() // Each of the queries synchronized by this wait group has a context timeout. @@ -538,12 +582,12 @@ func (r *Repository) GetScorecards(ctx context.Context) (*Scorecards, error) { TotalMessages: totalMessage, TotalTxCount: totalTxCount, TotalTxVolume: totalTxVolume, - Tvl: tvl, + Tvl: totalValueLocked, Volume24h: volume24h, Volume7d: volume7d, Volume30d: volume30d, } - return &scorecards, nil + return &scorecards, resultErr } // calculateTotalMessage calculate the total message from the total tx count and the total pyth message @@ -572,8 +616,8 @@ func calculateTotalMessage(p2pNetwork string, totalTxCount, totalPythMessage str func (r *Repository) getTotalTxCount(ctx context.Context) (string, error) { - query := buildTotalTrxCountQuery(r.bucketInfiniteRetention, r.bucket30DaysRetention, time.Now()) - result, err := r.queryAPI.Query(ctx, query) + trxCountQuery := buildTotalTrxCountQuery(r.bucketInfiniteRetention, r.bucket30DaysRetention, time.Now()) + result, err := r.queryAPI.Query(ctx, trxCountQuery) if err != nil { r.logger.Error("failed to query total tx count by portal bridge", zap.Error(err)) return "", err @@ -596,8 +640,8 @@ func (r *Repository) getTotalTxCount(ctx context.Context) (string, error) { func (r *Repository) getTotalTxVolume(ctx context.Context) (string, error) { - query := buildTotalTrxVolumeQuery(r.bucketInfiniteRetention, r.bucket30DaysRetention, time.Now()) - result, err := r.queryAPI.Query(ctx, query) + trxVolumeQuery := buildTotalTrxVolumeQuery(r.bucketInfiniteRetention, r.bucket30DaysRetention, time.Now()) + result, err := r.queryAPI.Query(ctx, trxVolumeQuery) if err != nil { r.logger.Error("failed to query total tx volume by portal bridge", zap.Error(err)) return "", err @@ -621,8 +665,8 @@ func (r *Repository) getTotalTxVolume(ctx context.Context) (string, error) { func (r *Repository) getMessages24h(ctx context.Context) (string, error) { // query 24h transactions - query := fmt.Sprintf(queryTemplateMessages24h, r.bucket24HoursRetention, r.bucket24HoursRetention) - result, err := r.queryAPI.Query(ctx, query) + msg24hrVolumeQuery := buildMessages24HrQuery(r.bucket24HoursRetention) + result, err := r.queryAPI.Query(ctx, msg24hrVolumeQuery) if err != nil { r.logger.Error("failed to query 24h messages", zap.Error(err)) return "", err @@ -646,11 +690,15 @@ func (r *Repository) getMessages24h(ctx context.Context) (string, error) { return fmt.Sprint(row.Value), nil } +func buildMessages24HrQuery(bucket24Hr string) string { + return fmt.Sprintf(queryTemplateMessages24h, bucket24Hr, bucket24Hr) +} + func (r *Repository) getVolume(ctx context.Context, from offset) (string, error) { // query volume - query := fmt.Sprintf(queryTemplateVolume, r.bucketInfiniteRetention, from) - result, err := r.queryAPI.Query(ctx, query) + queryVolume := buildVolumeQuery(r.bucketInfiniteRetention, from) + result, err := r.queryAPI.Query(ctx, queryVolume) if err != nil { r.logger.Error("failed to query volume", zap.Any("from", from), zap.Error(err)) return "", err @@ -676,6 +724,10 @@ func (r *Repository) getVolume(ctx context.Context, from offset) (string, error) return volume, nil } +func buildVolumeQuery(bucketInfinite string, from offset) string { + return fmt.Sprintf(queryTemplateVolume, bucketInfinite, from) +} + // GetTransactionCount get the last transactions. func (r *Repository) GetTransactionCount(ctx context.Context, q *TransactionCountQuery) ([]TransactionCountResult, error) { query := buildLastTrxQuery(r.bucket30DaysRetention, time.Now(), q) @@ -724,7 +776,8 @@ func (r *Repository) getTotalPythMessage(ctx context.Context) (string, error) { filter := bson.M{"emitterAddr": pythEmitterAddr} options := options.FindOne().SetSort(bson.D{{Key: "timestamp", Value: -1}}) - err := r.collections.vaasPythnet.FindOne(ctx, filter, options).Decode(&vaaPyth) + singleResult := r.collections.vaasPythnet.FindOne(ctx, filter, options) + err := singleResult.Decode(&vaaPyth) if err != nil { if err == mongo.ErrNoDocuments { r.logger.Warn("no pyth message found") diff --git a/api/handlers/transactions/repository_test.go b/api/handlers/transactions/repository_test.go index 52d0d20bb..f3746750b 100644 --- a/api/handlers/transactions/repository_test.go +++ b/api/handlers/transactions/repository_test.go @@ -1,8 +1,16 @@ package transactions import ( + "context" + "errors" + "github.com/influxdata/influxdb-client-go/v2/api/query" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/wormhole-foundation/wormhole-explorer/api/internal/config" sdk "github.com/wormhole-foundation/wormhole/sdk/vaa" + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo/integration/mtest" + "go.uber.org/zap" "strings" "testing" "time" @@ -721,3 +729,187 @@ func Test_buildTokenSymbolActivityQuery(t *testing.T) { }) } } + +func TestGetScorecards(t *testing.T) { + + m := mtest.New(t, mtest.NewOptions().ClientType(mtest.Mock)) + defer m.Close() + + logger, _ := zap.NewDevelopment() + + tests := []struct { + name string + mockTvlReturn string + mockTvlErr error + mockQueryResults map[string]struct { + res *mockInfluxQueryResult + expectedQuery string + } + mockPythResponse bson.D + expectedErr bool + expectedScorecards *Scorecards + }{ + { + name: "All queries succeed", + mockTvlReturn: "1000", + mockTvlErr: nil, + mockQueryResults: map[string]struct { + res *mockInfluxQueryResult + expectedQuery string + }{ + "messages24h": {mockInfluxResult(100), buildMessages24HrQuery("wormscan-24hours")}, + "totalTxCount": {mockInfluxResult(100), buildTotalTrxCountQuery("wormscan", "wormscan-30days", time.Now())}, + "totalTxVolume": {mockInfluxResult(1000e8), buildTotalTrxVolumeQuery("wormscan", "wormscan-30days", time.Now())}, + "volume24h": {mockInfluxResult(200e8), buildVolumeQuery("wormscan", _24h)}, + "volume7d": {mockInfluxResult(1500e8), buildVolumeQuery("wormscan", _7d)}, + "volume30d": {mockInfluxResult(5000e8), buildVolumeQuery("wormscan", _30d)}, + }, + mockPythResponse: bson.D{{"_id", "some-id"}, {"sequence", "123456"}}, + expectedErr: false, + expectedScorecards: &Scorecards{ + Messages24h: "100", + TotalMessages: "965587054", // 965463498 + 100 + 123456 + TotalTxCount: "100", + TotalTxVolume: "1000.00000000", + Tvl: "1000", + Volume24h: "200.00000000", + Volume7d: "1500.00000000", + Volume30d: "5000.00000000", + }, + }, + { + name: "Tvl query fails", + mockTvlReturn: "", + mockTvlErr: errors.New("mock_tvl_error"), + mockQueryResults: map[string]struct { + res *mockInfluxQueryResult + expectedQuery string + }{ + "messages24h": {mockInfluxResult(100), buildMessages24HrQuery("wormscan-24hours")}, + "totalTxCount": {mockInfluxResult(100), buildTotalTrxCountQuery("wormscan", "wormscan-30days", time.Now())}, + "totalTxVolume": {mockInfluxResult(1000e8), buildTotalTrxVolumeQuery("wormscan", "wormscan-30days", time.Now())}, + "volume24h": {mockInfluxResult(200e8), buildVolumeQuery("wormscan", _24h)}, + "volume7d": {mockInfluxResult(1500e8), buildVolumeQuery("wormscan", _7d)}, + "volume30d": {mockInfluxResult(5000e8), buildVolumeQuery("wormscan", _30d)}, + }, + mockPythResponse: bson.D{{"_id", "some-id"}, {"sequence", "123456"}}, + expectedErr: true, + expectedScorecards: nil, + }, + + { + name: "Multiple queries fail", + mockTvlReturn: "1000", + mockTvlErr: nil, + mockQueryResults: map[string]struct { + res *mockInfluxQueryResult + expectedQuery string + }{ + "messages24h": {mockInfluxError("failed_query"), buildMessages24HrQuery("wormscan-24hours")}, + "totalTxCount": {mockInfluxResult(100), buildTotalTrxCountQuery("wormscan", "wormscan-30days", time.Now())}, + "totalTxVolume": {mockInfluxError("failed_query"), buildTotalTrxVolumeQuery("wormscan", "wormscan-30days", time.Now())}, + "volume24h": {mockInfluxResult(200e8), buildVolumeQuery("wormscan", _24h)}, + "volume7d": {mockInfluxResult(1500e8), buildVolumeQuery("wormscan", _7d)}, + "volume30d": {mockInfluxResult(5000e8), buildVolumeQuery("wormscan", _30d)}, + }, + mockPythResponse: nil, // Simulating no documents found + expectedErr: true, + expectedScorecards: nil, + }, + } + + for _, tt := range tests { + m.Run(tt.name, func(mt *mtest.T) { + tvlMock := new(mockTvl) + queryAPIMock := new(mockQueryAPI) + mt.AddMockResponses(mtest.CreateCursorResponse(1, "wormhole.vaasPythnet", "firstBatch", tt.mockPythResponse)) + + repo := &Repository{ + tvl: tvlMock, + queryAPI: queryAPIMock, + logger: logger, + p2pNetwork: config.P2pMainNet, + collections: repositoryCollections{ + vaasPythnet: mt.Coll, + }, + bucket24HoursRetention: "wormscan-24hours", + bucket30DaysRetention: "wormscan-30days", + bucketInfiniteRetention: "wormscan", + } + + tvlMock.On("Get", mock.Anything).Return(tt.mockTvlReturn, tt.mockTvlErr) + + for _, result := range tt.mockQueryResults { + queryAPIMock.On("Query", mock.Anything, result.expectedQuery).Return(result.res, nil) + } + + scorecards, err := repo.GetScorecards(context.Background()) + + if tt.expectedErr { + assert.Error(t, err) + } else { + assert.NoError(t, err) + assert.NotNil(t, scorecards) + assert.Equal(t, tt.expectedScorecards, scorecards) + } + + tvlMock.AssertExpectations(t) + queryAPIMock.AssertExpectations(t) + }) + } +} + +// Helper function to create a mock influx result +func mockInfluxResult(value uint64) *mockInfluxQueryResult { + result := new(mockInfluxQueryResult) + result.On("Next").Return(true) + result.On("Record").Return(query.NewFluxRecord(0, map[string]interface{}{"_value": value})) + result.On("Err").Return(nil) + return result +} + +// Helper function to create a mock influx result with an error +func mockInfluxError(errMsg string) *mockInfluxQueryResult { + result := new(mockInfluxQueryResult) + result.On("Next").Return(false) + result.On("Err").Return(errors.New(errMsg)) + return result +} + +// Mocking the Tvl interface +type mockTvl struct { + mock.Mock +} + +func (m *mockTvl) Get(ctx context.Context) (string, error) { + args := m.Called(ctx) + return args.String(0), args.Error(1) +} + +// Mocking the influxQueryAPI interface +type mockQueryAPI struct { + mock.Mock +} + +func (m *mockQueryAPI) Query(ctx context.Context, query string) (influxQueryResult, error) { + args := m.Called(ctx, query) + return args.Get(0).(influxQueryResult), args.Error(1) +} + +// Mocking the influxQueryResult interface +type mockInfluxQueryResult struct { + mock.Mock +} + +func (m *mockInfluxQueryResult) Err() error { + return m.Called().Error(0) +} + +func (m *mockInfluxQueryResult) Next() bool { + return m.Called().Bool(0) +} + +func (m *mockInfluxQueryResult) Record() *query.FluxRecord { + args := m.Called() + return args.Get(0).(*query.FluxRecord) +}