diff --git a/.gitignore b/.gitignore
index 8194300a6..8ac2603c9 100644
--- a/.gitignore
+++ b/.gitignore
@@ -16,4 +16,5 @@ wormholeTxs.json
 serviceAccountKey.json
 bigtableAccountKey.json
 tsconfig.tsbuildinfo
-serviceAccount.json
\ No newline at end of file
+serviceAccount.json
+.run
\ No newline at end of file
diff --git a/analytics/scripts/protocols_stats_1d.flux b/analytics/scripts/protocols_stats_1d.flux
new file mode 100644
index 000000000..8314a7ec2
--- /dev/null
+++ b/analytics/scripts/protocols_stats_1d.flux
@@ -0,0 +1,56 @@
+import "date"
+
+
+calculateProtocolStats = (protocol,protocolVersion,taskCfg) => {
+
+totalValueTransferred = from(bucket: taskCfg.sourceBucket)
+    |> range(start: taskCfg.since, stop:taskCfg.ts)
+    |> filter(fn: (r) => r._measurement == "vaa_volume_v2" and r.app_id == protocol)
+    |> filter(fn: (r) => r._field == "volume" and r._value > 0)
+    |> drop(columns:["destination_chain","emitter_chain","token_address","token_chain","version"])
+    |> group()
+    |> sum()
+    |> map(fn: (r) => ({r with _time: time(v:taskCfg.since)}))
+    |> set(key: "_field", value: "total_value_transferred")
+
+totalMessages = from(bucket: taskCfg.sourceBucket)
+    |> range(start: taskCfg.since, stop:taskCfg.ts)
+    |> filter(fn: (r) => r._measurement == "vaa_volume_v2" and r.app_id == protocol)
+    |> filter(fn: (r) => r._field == "volume")
+    |> group()
+    |> count()
+    |> map(fn: (r) => ({r with _time: time(v:taskCfg.since)}))
+    |> set(key: "_field", value: "total_messages")
+
+return union(tables:[totalMessages,totalValueTransferred])
+    |> set(key: "app_id", value: protocol)
+    |> set(key: "version", value: protocolVersion)
+    |> set(key: "_measurement", value: taskCfg.destMeasurement)
+    |> map(fn: (r) => ({r with _time: time(v:taskCfg.since)}))
+    |> to(bucket: taskCfg.destBucket)
+}
+
+
+
+ts = date.truncate(t: now(), unit: 1d)
+bucketInfinite = "wormscan"
+bucket30d = "wormscan-30days"
+
+cfg = {
+    sourceBucket:bucketInfinite,
+    destBucket:bucketInfinite,
+    destMeasurement:"core_protocols_stats_1d",
+    since: date.sub(d: 1d, from: ts),
+    ts:ts,
+}
+
+// Set this variable with the cfg of the desired task
+
+option task = {
+    name: "cctp and portal_token_bridge metrics every day",
+    every: 1d,
+}
+
+calculateProtocolStats(protocol:"CCTP_WORMHOLE_INTEGRATION",protocolVersion:"v1",taskCfg:cfg)
+
+calculateProtocolStats(protocol:"PORTAL_TOKEN_BRIDGE",protocolVersion:"v1",taskCfg:cfg)
\ No newline at end of file
diff --git a/analytics/scripts/protocols_stats_1h.flux b/analytics/scripts/protocols_stats_1h.flux
new file mode 100644
index 000000000..0295143eb
--- /dev/null
+++ b/analytics/scripts/protocols_stats_1h.flux
@@ -0,0 +1,50 @@
+import "date"
+
+option task = {
+    name: "cctp and portal_token_bridge metrics every hour",
+    every: 1h,
+}
+
+
+calculateLastHourMetrics = (protocol,protocolVersion,ts) => {
+
+since = date.sub(d: 1h, from: ts)
+sourceBucket = "wormscan"
+destMeasurement = "core_protocols_stats_1h"
+bucket30d = "wormscan-30days"
+
+totalValueTransferred = from(bucket: sourceBucket)
+    |> range(start: since, stop:ts)
+    |> filter(fn: (r) => r._measurement == "vaa_volume_v2" and r.app_id == protocol)
+    |> filter(fn: (r) => r._field == "volume" and r._value > 0)
+    |> drop(columns:["destination_chain","emitter_chain","token_address","token_chain","version"])
+    |> group()
+    |> sum()
+    |> map(fn: (r) => ({r with _time: since}))
+    |> set(key: "_field", value: "total_value_transferred")
+
+totalMessages = from(bucket: sourceBucket)
+    |> range(start: since, stop:ts)
== "vaa_volume_v2" and r.app_id == protocol) + |> filter(fn: (r) => r._field == "volume") + |> group() + |> count() + |> map(fn: (r) => ({r with _time: since})) + |> set(key: "_field", value: "total_messages") + +return union(tables:[totalMessages,totalValueTransferred]) // if nothing happened during the last hour then union will result in empty and no point will be added. + |> set(key: "app_id", value: protocol) + |> set(key: "version", value: protocolVersion) + |> set(key: "_measurement", value: destMeasurement) + |> map(fn: (r) => ({r with _time: since})) + |>to(bucket: bucket30d) +} + +ts = date.truncate(t: now(), unit: 1h) + +// execute function for CCTP_WORMHOLE_INTEGRATION +calculateLastHourMetrics(protocol:"CCTP_WORMHOLE_INTEGRATION",protocolVersion:"v1",ts:ts) + + +// execute function for PORTAL_TOKEN_BRIDGE +calculateLastHourMetrics(protocol:"PORTAL_TOKEN_BRIDGE",protocolVersion:"v1",ts:ts) \ No newline at end of file diff --git a/api/handlers/protocols/repository.go b/api/handlers/protocols/repository.go index 235558c1a..1fb724cc9 100644 --- a/api/handlers/protocols/repository.go +++ b/api/handlers/protocols/repository.go @@ -28,20 +28,117 @@ from(bucket: "%s") const QueryTemplateActivityLatestPoint = ` from(bucket: "%s") - |> range(start: -1d) + |> range(start: 1970-01-01T00:00:00Z) |> filter(fn: (r) => r._measurement == "%s" and r.protocol == "%s" and r.version == "%s") |> keep(columns: ["_time","_field","protocol", "_value", "total_value_secure", "total_value_transferred"]) |> last() |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") ` +// QueryIntProtocolsTotalStartOfDay Query template for internal protocols (cctp and portal_token_bridge) to fetch total values till the start of current day +const QueryIntProtocolsTotalStartOfDay = ` + import "date" + import "types" + + startOfCurrentDay = date.truncate(t: now(), unit: 1d) + + data = from(bucket: "%s") + |> range(start: 1970-01-01T00:00:00Z,stop:startOfCurrentDay) + |> filter(fn: (r) => r._measurement == "%s" and r.app_id == "%s") + +tvt = data + |> filter(fn : (r) => r._field == "total_value_transferred") + |> group() + |> sum() + |> set(key:"_field",value:"total_value_transferred") + |> map(fn: (r) => ({r with _value: int(v: r._value)})) + +totalMsgs = data + |> filter(fn : (r) => r._field == "total_messages") + |> group() + |> sum() + |> set(key:"_field",value:"total_messages") + +union(tables:[tvt,totalMsgs]) + |> set(key:"_time",value:string(v:startOfCurrentDay)) + |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") + |> set(key:"app_id",value:"%s") +` + +// QueryIntProtocolsDeltaSinceStartOfDay calculate delta since the beginning of current day +const QueryIntProtocolsDeltaSinceStartOfDay = ` + import "date" + import "types" + + ts = date.truncate(t: now(), unit: 1h) + startOfDay = date.truncate(t: now(), unit: 1d) + + data = from(bucket: "%s") + |> range(start: startOfDay,stop:ts) + |> filter(fn: (r) => r._measurement == "%s" and r.app_id == "%s") + +tvt = data + |> filter(fn : (r) => r._field == "total_value_transferred") + |> group() + |> sum() + |> set(key:"_field",value:"total_value_transferred") + |> map(fn: (r) => ({r with _value: int(v: r._value)})) + +totalMsgs = data + |> filter(fn : (r) => r._field == "total_messages") + |> group() + |> sum() + |> set(key:"_field",value:"total_messages") + +union(tables:[tvt,totalMsgs]) + |> set(key:"_time",value:string(v:startOfDay)) + |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") + |> 
set(key:"app_id",value:"%s") +` + +// QueryIntProtocolsDeltaLastDay calculate last day delta +const QueryIntProtocolsDeltaLastDay = ` + import "date" + import "types" + + + ts = date.truncate(t: now(), unit: 1h) + yesterday = date.sub(d: 1d, from: ts) + + data = from(bucket: "%s") + |> range(start: yesterday,stop:ts) + |> filter(fn: (r) => r._measurement == "%s" and r.app_id == "%s") + +tvt = data + |> filter(fn : (r) => r._field == "total_value_transferred") + |> group() + |> sum() + |> set(key:"_field",value:"total_value_transferred") + |> map(fn: (r) => ({r with _value: int(v: r._value)})) + +totalMsgs = data + |> filter(fn : (r) => r._field == "total_messages") + |> group() + |> sum() + |> set(key:"_field",value:"total_messages") + +union(tables:[tvt,totalMsgs]) + |> set(key:"_time",value:string(v:yesterday)) + |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") + |> set(key:"app_id",value:"%s") +` + type Repository struct { - queryAPI QueryDoer - logger *zap.Logger - statsBucket string - activityBucket string - statsVersion string - activityVersion string + queryAPI QueryDoer + logger *zap.Logger + bucketInfinite string + bucket30d string + statsVersion string + activityVersion string + intProtocolMeasurement map[string]struct { + Daily string + Hourly string + } } type rowStat struct { @@ -50,6 +147,17 @@ type rowStat struct { TotalValueLocked float64 `mapstructure:"total_value_locked"` } +type intRowStat struct { + Protocol string `mapstructure:"app_id"` + TotalMessages uint64 `mapstructure:"total_messages"` + TotalValueTransferred uint64 `mapstructure:"total_value_transferred"` +} + +type intStats struct { + Latest intRowStat + DeltaLast24hr intRowStat +} + type rowActivity struct { Protocol string `mapstructure:"protocol"` DestinationChainId string `mapstructure:"destination_chain_id"` @@ -85,14 +193,21 @@ func WrapQueryAPI(qApi api.QueryAPI) QueryDoer { return &queryApiWrapper{qApi: qApi} } -func NewRepository(qApi QueryDoer, statsBucket, activityBucket, statsVersion, activityVersion string, logger *zap.Logger) *Repository { +func NewRepository(qApi QueryDoer, bucketInfinite, bucket30d, statsVersion, activityVersion string, logger *zap.Logger) *Repository { return &Repository{ queryAPI: qApi, - statsBucket: statsBucket, - activityBucket: activityBucket, + bucketInfinite: bucketInfinite, + bucket30d: bucket30d, statsVersion: statsVersion, activityVersion: activityVersion, logger: logger, + intProtocolMeasurement: map[string]struct { + Daily string + Hourly string + }{ + CCTP: {Daily: dbconsts.CctpStatsMeasurementDaily, Hourly: dbconsts.CctpStatsMeasurementHourly}, + PortalTokenBridge: {Daily: dbconsts.TokenBridgeStatsMeasurementDaily, Hourly: dbconsts.TokenBridgeStatsMeasurementHourly}, + }, } } @@ -101,40 +216,81 @@ func (q *queryApiWrapper) Query(ctx context.Context, query string) (QueryResult, } // returns latest and last 24 hr stats for a given protocol -func (r *Repository) getProtocolStats(ctx context.Context, contributor string) (stats, error) { +func (r *Repository) getProtocolStats(ctx context.Context, protocol string) (stats, error) { + // fetch latest stat - latest, err := fetchSingleRecordData[rowStat](r.logger, r.queryAPI, ctx, r.statsBucket, QueryTemplateLatestPoint, dbconsts.ProtocolsStatsMeasurement, contributor, r.statsVersion) + q := buildQuery(QueryTemplateLatestPoint, r.bucket30d, dbconsts.ProtocolsStatsMeasurement, protocol, r.statsVersion) + latest, err := fetchSingleRecordData[rowStat](r.logger, r.queryAPI, ctx, q, protocol) if err != 
 	if err != nil {
 		return stats{}, err
 	}
 
 	// fetch last 24 hr stat
-	last24hr, err := fetchSingleRecordData[rowStat](r.logger, r.queryAPI, ctx, r.statsBucket, QueryTemplateLast24Point, dbconsts.ProtocolsStatsMeasurement, contributor, r.statsVersion)
+	q = buildQuery(QueryTemplateLast24Point, r.bucket30d, dbconsts.ProtocolsStatsMeasurement, protocol, r.statsVersion)
+	last24hr, err := fetchSingleRecordData[rowStat](r.logger, r.queryAPI, ctx, q, protocol)
 	return stats{
 		Latest: latest,
 		Last24: last24hr,
 	}, err
 }
 
-func (r *Repository) getProtocolActivity(ctx context.Context, contributor string) (rowActivity, error) {
-	return fetchSingleRecordData[rowActivity](r.logger, r.queryAPI, ctx, r.activityBucket, QueryTemplateActivityLatestPoint, dbconsts.ProtocolsActivityMeasurement, contributor, r.activityVersion)
+func (r *Repository) getProtocolActivity(ctx context.Context, protocol string) (rowActivity, error) {
+	q := buildQuery(QueryTemplateActivityLatestPoint, r.bucket30d, dbconsts.ProtocolsActivityMeasurement, protocol, r.activityVersion)
+	return fetchSingleRecordData[rowActivity](r.logger, r.queryAPI, ctx, q, protocol)
+}
+
+// returns latest and last 24 hr for internal protocols (cctp and portal_token_bridge)
+func (r *Repository) getInternalProtocolStats(ctx context.Context, protocol string) (intStats, error) {
+
+	// calculate total values till the start of current day
+	totalTillCurrentDayQuery := fmt.Sprintf(QueryIntProtocolsTotalStartOfDay, r.bucketInfinite, r.intProtocolMeasurement[protocol].Daily, protocol, protocol)
+	totalsUntilToday, err := fetchSingleRecordData[intRowStat](r.logger, r.queryAPI, ctx, totalTillCurrentDayQuery, protocol)
+	if err != nil {
+		return intStats{}, err
+	}
+
+	// calculate delta since the beginning of current day
+	q2 := fmt.Sprintf(QueryIntProtocolsDeltaSinceStartOfDay, r.bucket30d, r.intProtocolMeasurement[protocol].Hourly, protocol, protocol)
+	currentDayStats, errCD := fetchSingleRecordData[intRowStat](r.logger, r.queryAPI, ctx, q2, protocol)
+	if errCD != nil {
+		return intStats{}, errCD
+	}
+
+	latestTotal := intRowStat{
+		Protocol:              protocol,
+		TotalMessages:         totalsUntilToday.TotalMessages + currentDayStats.TotalMessages,
+		TotalValueTransferred: totalsUntilToday.TotalValueTransferred + currentDayStats.TotalValueTransferred,
+	}
+
+	result := intStats{
+		Latest: latestTotal,
+	}
+
+	// calculate last day delta
+	q3 := fmt.Sprintf(QueryIntProtocolsDeltaLastDay, r.bucket30d, r.intProtocolMeasurement[protocol].Hourly, protocol, protocol)
+	deltaYesterdayStats, errQ3 := fetchSingleRecordData[intRowStat](r.logger, r.queryAPI, ctx, q3, protocol)
+	if errQ3 != nil {
+		return result, errQ3
+	}
+
+	result.DeltaLast24hr = deltaYesterdayStats
+	return result, nil
 }
 
-func fetchSingleRecordData[T any](logger *zap.Logger, queryAPI QueryDoer, ctx context.Context, bucket, queryTemplate, measurement, contributor, version string) (T, error) {
+func fetchSingleRecordData[T any](logger *zap.Logger, queryAPI QueryDoer, ctx context.Context, query, protocol string) (T, error) {
 	var res T
-	q := buildQuery(queryTemplate, bucket, measurement, contributor, version)
-	result, err := queryAPI.Query(ctx, q)
+	result, err := queryAPI.Query(ctx, query)
 	if err != nil {
-		logger.Error("error executing query to fetch data", zap.Error(err), zap.String("protocol", contributor), zap.String("query", q))
+		logger.Error("error executing query to fetch data", zap.Error(err), zap.String("protocol", protocol), zap.String("query", query))
 		return res, err
 	}
 	defer result.Close()
 
 	if !result.Next() {
 		if result.Err() != nil {
-			logger.Error("error reading query response", zap.Error(result.Err()), zap.String("protocol", contributor), zap.String("query", q))
+			logger.Error("error reading query response", zap.Error(result.Err()), zap.String("protocol", protocol), zap.String("query", query))
 			return res, result.Err()
 		}
-		logger.Info("empty query response", zap.String("protocol", contributor), zap.String("query", q))
+		logger.Info("empty query response", zap.String("protocol", protocol), zap.String("query", query))
 		return res, err
 	}
diff --git a/api/handlers/protocols/service.go b/api/handlers/protocols/service.go
index 7c28cf4ea..bf62698fb 100644
--- a/api/handlers/protocols/service.go
+++ b/api/handlers/protocols/service.go
@@ -2,7 +2,8 @@ package protocols
 
 import (
 	"context"
-	"encoding/json"
+	"github.com/wormhole-foundation/wormhole-explorer/api/cacheable"
+	"github.com/wormhole-foundation/wormhole-explorer/api/internal/metrics"
 	"github.com/wormhole-foundation/wormhole-explorer/common/client/cache"
 	"go.uber.org/zap"
 	"strconv"
@@ -11,16 +12,27 @@ import (
 	"time"
 )
 
+const CCTP = "CCTP_WORMHOLE_INTEGRATION"
+const PortalTokenBridge = "PORTAL_TOKEN_BRIDGE"
+
 type Service struct {
 	Protocols      []string
 	repo           *Repository
 	logger         *zap.Logger
+	intProtocols   []string
 	cache          cache.Cache
 	cacheKeyPrefix string
 	cacheTTL       int
+	metrics        metrics.Metrics
+	tvl            tvlProvider
 }
 
 type ProtocolTotalValuesDTO struct {
+	ProtocolStats
+	Error string `json:"error,omitempty"`
+}
+
+type ProtocolStats struct {
 	Protocol              string  `json:"protocol"`
 	TotalMessages         uint64  `json:"total_messages"`
 	TotalValueLocked      float64 `json:"total_value_locked,omitempty"`
@@ -28,54 +40,129 @@ type ProtocolTotalValuesDTO struct {
 	TotalValueTransferred float64 `json:"total_value_transferred,omitempty"`
 	LastDayMessages       uint64  `json:"last_day_messages,omitempty"`
 	LastDayDiffPercentage string  `json:"last_day_diff_percentage,omitempty"`
-	Error                 string  `json:"error,omitempty"`
 }
 
-func NewService(protocols []string, repo *Repository, logger *zap.Logger, cache cache.Cache, cacheKeyPrefix string, cacheTTL int) *Service {
+type tvlProvider interface {
+	Get(ctx context.Context) (string, error)
+}
+
+func NewService(extProtocols, intProtocols []string, repo *Repository, logger *zap.Logger, cache cache.Cache, cacheKeyPrefix string, cacheTTL int, metrics metrics.Metrics, tvlProvider tvlProvider) *Service {
 	return &Service{
-		Protocols:      protocols,
+		Protocols:      extProtocols,
 		repo:           repo,
 		logger:         logger,
+		intProtocols:   intProtocols,
 		cache:          cache,
 		cacheKeyPrefix: cacheKeyPrefix,
 		cacheTTL:       cacheTTL,
+		metrics:        metrics,
+		tvl:            tvlProvider,
 	}
 }
 
 func (s *Service) GetProtocolsTotalValues(ctx context.Context) []ProtocolTotalValuesDTO {
 	wg := &sync.WaitGroup{}
-	wg.Add(len(s.Protocols))
-	results := make(chan ProtocolTotalValuesDTO, len(s.Protocols))
+	totalProtocols := len(s.Protocols) + len(s.intProtocols)
+	wg.Add(totalProtocols)
+	results := make(chan ProtocolTotalValuesDTO, totalProtocols)
 
-	for i := range s.Protocols {
-		go s.getProtocolTotalValues(ctx, wg, s.Protocols[i], results)
+	for _, p := range s.Protocols {
+		go s.fetchProtocolValues(ctx, wg, p, results, s.getProtocolStats)
+	}
+	for _, p := range s.intProtocols {
+		go s.fetchProtocolValues(ctx, wg, p, results, s.getIntProtocolStats)
 	}
 	wg.Wait()
 	close(results)
 	resultsSlice := make([]ProtocolTotalValuesDTO, 0, len(s.Protocols))
 	for r := range results {
+		r.Protocol = getProtocolNameDto(r.Protocol)
 		resultsSlice = append(resultsSlice, r)
 	}
 	return resultsSlice
 }
 
-func (s *Service) getProtocolTotalValues(ctx context.Context, wg *sync.WaitGroup, protocol string, results chan<- ProtocolTotalValuesDTO) {
+func getProtocolNameDto(protocol string) string {
+	switch protocol {
+	case CCTP:
+		return "cctp"
+	case PortalTokenBridge:
+		return "portal_token_bridge"
+	default:
+		return protocol
+	}
+}
+
+func (s *Service) fetchProtocolValues(ctx context.Context, wg *sync.WaitGroup, protocol string, results chan<- ProtocolTotalValuesDTO, fetch func(context.Context, string) (ProtocolStats, error)) {
 	defer wg.Done()
 
-	cacheKey := s.cacheKeyPrefix + ":" + strings.ToUpper(protocol)
-	cachedValue, errCache := s.cache.Get(ctx, cacheKey)
-	if errCache == nil {
-		var val ProtocolTotalValuesDTO
-		errCacheUnmarshall := json.Unmarshal([]byte(cachedValue), &val)
-		if errCacheUnmarshall == nil {
-			results <- val
-			return
+	val, err := cacheable.GetOrLoad[ProtocolStats](ctx,
+		s.logger,
+		s.cache,
+		time.Duration(s.cacheTTL)*time.Minute,
+		s.cacheKeyPrefix+":"+strings.ToUpper(protocol),
+		s.metrics,
+		func() (ProtocolStats, error) {
+			return fetch(ctx, protocol)
+		},
+	)
+
+	res := ProtocolTotalValuesDTO{
+		ProtocolStats: val,
+	}
+	if err != nil {
+		res.Error = err.Error()
+	}
+	results <- res
+}
+
+// getIntProtocolStats fetches stats for CCTP and PortalTokenBridge
+func (s *Service) getIntProtocolStats(ctx context.Context, protocol string) (ProtocolStats, error) {
+
+	protocolStats, err := s.repo.getInternalProtocolStats(ctx, protocol)
+	if err != nil {
+		return ProtocolStats{
+			Protocol:              protocol,
+			TotalValueTransferred: float64(protocolStats.Latest.TotalValueTransferred) / 1e8,
+			TotalMessages:         protocolStats.Latest.TotalMessages,
+		}, err
+	}
+
+	diffLastDay := protocolStats.DeltaLast24hr.TotalMessages
+	val := ProtocolStats{
+		Protocol:              protocol,
+		TotalValueTransferred: float64(protocolStats.Latest.TotalValueTransferred) / 1e8,
+		TotalMessages:         protocolStats.Latest.TotalMessages,
+		LastDayMessages:       diffLastDay,
+	}
+
+	lastDayTotalMessages := protocolStats.Latest.TotalMessages - diffLastDay
+	if lastDayTotalMessages != 0 {
+		percentage := strconv.FormatFloat(float64(diffLastDay)/float64(lastDayTotalMessages)*100, 'f', 2, 64) + "%"
+		val.LastDayDiffPercentage = percentage
+	}
+
+	if CCTP == protocol {
+		tvl, errTvl := s.tvl.Get(ctx)
+		if errTvl != nil {
+			s.logger.Error("error fetching tvl", zap.Error(errTvl), zap.String("protocol", protocol))
+			return val, errTvl
 		}
-		s.logger.Error("error unmarshalling cache value", zap.Error(errCacheUnmarshall), zap.String("cache_key", cacheKey))
+		tvlFloat, errTvl := strconv.ParseFloat(tvl, 64)
+		if errTvl != nil {
+			s.logger.Error("error parsing tvl value", zap.Error(errTvl), zap.String("protocol", protocol), zap.String("tvl_str", tvl))
+			return val, errTvl
+		}
+		val.TotalValueLocked = tvlFloat
 	}
+	return val, nil
+}
+
+func (s *Service) getProtocolStats(ctx context.Context, protocol string) (ProtocolStats, error) {
+
 	type statsResult struct {
 		result stats
 		Err    error
@@ -90,18 +177,17 @@ func (s *Service) getProtocolTotalValues(ctx context.Context, wg *sync.WaitGroup
 	activity, err := s.repo.getProtocolActivity(ctx, protocol)
 	if err != nil {
 		s.logger.Error("error fetching protocol activity", zap.Error(err), zap.String("protocol", protocol))
-		results <- ProtocolTotalValuesDTO{Protocol: protocol, Error: err.Error()}
-		return
+		return ProtocolStats{Protocol: protocol}, err
+
 	}
 	rStats := <-statsRes
 	if rStats.Err != nil {
 		s.logger.Error("error fetching protocol stats", zap.Error(rStats.Err), zap.String("protocol", protocol))
-		results <- ProtocolTotalValuesDTO{Protocol: protocol, Error: rStats.Err.Error()}
-		return
+		return ProtocolStats{Protocol: protocol}, rStats.Err
 	}
 
-	dto := ProtocolTotalValuesDTO{
+	dto := ProtocolStats{
 		Protocol:              protocol,
 		TotalValueLocked:      rStats.result.Latest.TotalValueLocked,
 		TotalMessages:         rStats.result.Latest.TotalMessages,
@@ -117,11 +203,5 @@ func (s *Service) getProtocolTotalValues(ctx context.Context, wg *sync.WaitGroup
 		dto.LastDayDiffPercentage = strconv.FormatFloat(float64(last24HrMessages)/float64(totalMessagesAsFromLast24hr)*100, 'f', 2, 64) + "%"
 	}
 
-	dtoJson, _ := json.Marshal(dto) // don't handle error since the full lifecycle of the dto is under this scope
-	errCache = s.cache.Set(ctx, cacheKey, string(dtoJson), time.Duration(s.cacheTTL)*time.Minute)
-	if errCache != nil {
-		s.logger.Error("error setting cache", zap.Error(errCache), zap.String("cache_key", cacheKey))
-	}
-
-	results <- dto
+	return dto, nil
 }
diff --git a/api/handlers/protocols/service_test.go b/api/handlers/protocols/service_test.go
index 0c6f6e098..d759d639e 100644
--- a/api/handlers/protocols/service_test.go
+++ b/api/handlers/protocols/service_test.go
@@ -9,6 +9,7 @@ import (
 	"github.com/stretchr/testify/assert"
 	"github.com/test-go/testify/mock"
 	"github.com/wormhole-foundation/wormhole-explorer/api/handlers/protocols"
+	"github.com/wormhole-foundation/wormhole-explorer/api/internal/metrics"
 	"github.com/wormhole-foundation/wormhole-explorer/common/client/cache"
 	cacheMock "github.com/wormhole-foundation/wormhole-explorer/common/client/cache/mock"
 	"github.com/wormhole-foundation/wormhole-explorer/common/dbconsts"
@@ -52,14 +53,14 @@ func TestService_GetProtocolsTotalValues(t *testing.T) {
 
 	ctx := context.Background()
 	queryAPI := &mockQueryAPI{}
-	queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateLatestPoint, "protocols_bucket", dbconsts.ProtocolsStatsMeasurement, "protocol1", "v1")).Return(respStatsLatest, nil)
-	queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateLast24Point, "protocols_bucket", dbconsts.ProtocolsStatsMeasurement, "protocol1", "v1")).Return(respStatsLastDay, nil)
+	queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateLatestPoint, "bucket30d", dbconsts.ProtocolsStatsMeasurement, "protocol1", "v1")).Return(respStatsLatest, nil)
+	queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateLast24Point, "bucket30d", dbconsts.ProtocolsStatsMeasurement, "protocol1", "v1")).Return(respStatsLastDay, nil)
 
-	activityQuery := fmt.Sprintf(protocols.QueryTemplateActivityLatestPoint, "protocols_bucket", dbconsts.ProtocolsActivityMeasurement, "protocol1", "v1")
+	activityQuery := fmt.Sprintf(protocols.QueryTemplateActivityLatestPoint, "bucket30d", dbconsts.ProtocolsActivityMeasurement, "protocol1", "v1")
 	queryAPI.On("Query", ctx, activityQuery).Return(respActivityLast, nil)
 
-	repository := protocols.NewRepository(queryAPI, "protocols_bucket", "protocols_bucket", "v1", "v1", zap.NewNop())
-	service := protocols.NewService([]string{"protocol1"}, repository, zap.NewNop(), cache.NewDummyCacheClient(), "WORMSCAN:PROTOCOLS", 0)
+	repository := protocols.NewRepository(queryAPI, "bucketInfinite", "bucket30d", "v1", "v1", zap.NewNop())
+	service := protocols.NewService([]string{"protocol1"}, nil, repository, zap.NewNop(), cache.NewDummyCacheClient(), "WORMSCAN:PROTOCOLS", 0, metrics.NewNoOpMetrics(), &mockTvl{})
 
 	values := service.GetProtocolsTotalValues(ctx)
 	assert.Equal(t, 1, len(values))
@@ -97,14 +98,14 @@ func TestService_GetProtocolsTotalValues_FailedFetchingActivity(t *testing.T) {
 
 	ctx := context.Background()
&mockQueryAPI{} - queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateLatestPoint, "protocols_bucket", dbconsts.ProtocolsStatsMeasurement, "protocol1", "v1")).Return(respStatsLatest, nil) - queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateLast24Point, "protocols_bucket", dbconsts.ProtocolsStatsMeasurement, "protocol1", "v1")).Return(respStatsLastDay, nil) + queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateLatestPoint, "bucket30d", dbconsts.ProtocolsStatsMeasurement, "protocol1", "v1")).Return(respStatsLatest, nil) + queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateLast24Point, "bucket30d", dbconsts.ProtocolsStatsMeasurement, "protocol1", "v1")).Return(respStatsLastDay, nil) - activityQuery := fmt.Sprintf(protocols.QueryTemplateActivityLatestPoint, "protocols_bucket", dbconsts.ProtocolsActivityMeasurement, "protocol1", "v1") + activityQuery := fmt.Sprintf(protocols.QueryTemplateActivityLatestPoint, "bucket30d", dbconsts.ProtocolsActivityMeasurement, "protocol1", "v1") queryAPI.On("Query", ctx, activityQuery).Return(&api.QueryTableResult{}, errors.New("mocked_fetching_activity_error")) - repository := protocols.NewRepository(queryAPI, "protocols_bucket", "protocols_bucket", "v1", "v1", zap.NewNop()) - service := protocols.NewService([]string{"protocol1"}, repository, zap.NewNop(), cache.NewDummyCacheClient(), "WORMSCAN:PROTOCOLS", 0) + repository := protocols.NewRepository(queryAPI, "bucketInfinite", "bucket30d", "v1", "v1", zap.NewNop()) + service := protocols.NewService([]string{"protocol1"}, nil, repository, zap.NewNop(), cache.NewDummyCacheClient(), "WORMSCAN:PROTOCOLS", 0, metrics.NewNoOpMetrics(), &mockTvl{}) values := service.GetProtocolsTotalValues(ctx) assert.Equal(t, 1, len(values)) @@ -139,14 +140,14 @@ func TestService_GetProtocolsTotalValues_FailedFetchingStats(t *testing.T) { ctx := context.Background() queryAPI := &mockQueryAPI{} - queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateLatestPoint, "protocols_bucket", dbconsts.ProtocolsStatsMeasurement, "protocol1", "v1")).Return(&api.QueryTableResult{}, errors.New("mocked_fetching_stats_error")) - queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateLast24Point, "protocols_bucket", dbconsts.ProtocolsStatsMeasurement, "protocol1", "v1")).Return(respStatsLastDay, nil) + queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateLatestPoint, "bucket30d", dbconsts.ProtocolsStatsMeasurement, "protocol1", "v1")).Return(&api.QueryTableResult{}, errors.New("mocked_fetching_stats_error")) + queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateLast24Point, "bucket30d", dbconsts.ProtocolsStatsMeasurement, "protocol1", "v1")).Return(respStatsLastDay, nil) - activityQuery := fmt.Sprintf(protocols.QueryTemplateActivityLatestPoint, "protocols_bucket", dbconsts.ProtocolsActivityMeasurement, "protocol1", "v1") + activityQuery := fmt.Sprintf(protocols.QueryTemplateActivityLatestPoint, "bucket30d", dbconsts.ProtocolsActivityMeasurement, "protocol1", "v1") queryAPI.On("Query", ctx, activityQuery).Return(respActivityLast, errNil) - repository := protocols.NewRepository(queryAPI, "protocols_bucket", "protocols_bucket", "v1", "v1", zap.NewNop()) - service := protocols.NewService([]string{"protocol1"}, repository, zap.NewNop(), cache.NewDummyCacheClient(), "WORMSCAN:PROTOCOLS", 0) + repository := protocols.NewRepository(queryAPI, "bucketInfinite", "bucket30d", "v1", "v1", zap.NewNop()) + service := protocols.NewService([]string{"protocol1"}, nil, repository, zap.NewNop(), 
+	service := protocols.NewService([]string{"protocol1"}, nil, repository, zap.NewNop(), cache.NewDummyCacheClient(), "WORMSCAN:PROTOCOLS", 0, metrics.NewNoOpMetrics(), &mockTvl{})
 
 	values := service.GetProtocolsTotalValues(ctx)
 	assert.Equal(t, 1, len(values))
@@ -160,8 +161,9 @@ func TestService_GetProtocolsTotalValues_CacheHit(t *testing.T) {
 	mockCache := &cacheMock.CacheMock{}
 	var cacheErr error
 	cacheErr = nil
-	mockCache.On("Get", ctx, "WORMSCAN:PROTOCOLS:PROTOCOL1").Return(`{"protocol":"protocol1","total_messages":7,"total_value_locked":5,"total_value_secured":9,"total_value_transferred":7,"last_day_messages":4,"last_day_diff_percentage":"75.00%"}`, cacheErr)
-	service := protocols.NewService([]string{"protocol1"}, nil, zap.NewNop(), mockCache, "WORMSCAN:PROTOCOLS", 0)
+	cachedValue := fmt.Sprintf(`{"result": {"protocol":"protocol1","total_messages":7,"total_value_locked":5,"total_value_secured":9,"total_value_transferred":7,"last_day_messages":4,"last_day_diff_percentage":"75.00%%"},"timestamp":"%s"}`, time.Now().Format(time.RFC3339))
+	mockCache.On("Get", ctx, "WORMSCAN:PROTOCOLS:PROTOCOL1").Return(cachedValue, cacheErr)
+	service := protocols.NewService([]string{"protocol1"}, nil, nil, zap.NewNop(), mockCache, "WORMSCAN:PROTOCOLS", 60, metrics.NewNoOpMetrics(), &mockTvl{})
 	values := service.GetProtocolsTotalValues(ctx)
 	assert.Equal(t, 1, len(values))
 	assert.Equal(t, "protocol1", values[0].Protocol)
@@ -174,71 +176,63 @@ func TestService_GetProtocolsTotalValues_CacheHit(t *testing.T) {
 
 }
 
-func TestService_GetProtocolsTotalValues_CacheMiss_FetchAndUpdate(t *testing.T) {
-
-	ctx := context.Background()
-	mockCache := &cacheMock.CacheMock{}
-	mockCache.On("Get", ctx, "WORMSCAN:PROTOCOLS:PROTOCOL1").Return("", cache.ErrNotFound) // mock cache miss
-
-	// mock cache update, validate it's called once.
-	mockCache.On("Set",
-		ctx,
-		"WORMSCAN:PROTOCOLS:PROTOCOL1",
-		`{"protocol":"protocol1","total_messages":7,"total_value_locked":5,"total_value_secured":9,"total_value_transferred":7,"last_day_messages":3,"last_day_diff_percentage":"75.00%"}`,
-		time.Duration(60)*time.Minute).
-		Return(nil).
-		Times(1)
-
+func TestService_GetCCTP_Stats(t *testing.T) {
 	var errNil error
-	respStatsLatest := &mockQueryTableResult{}
-	respStatsLatest.On("Next").Return(true)
-	respStatsLatest.On("Err").Return(errNil)
-	respStatsLatest.On("Close").Return(errNil)
-	respStatsLatest.On("Record").Return(query.NewFluxRecord(1, map[string]interface{}{
-		"protocol":           "protocol1",
-		"total_messages":     uint64(7),
-		"total_value_locked": float64(5),
+
+	totalStartOfCurrentDay := &mockQueryTableResult{}
+	totalStartOfCurrentDay.On("Next").Return(true)
+	totalStartOfCurrentDay.On("Err").Return(errNil)
+	totalStartOfCurrentDay.On("Close").Return(errNil)
+	totalStartOfCurrentDay.On("Record").Return(query.NewFluxRecord(1, map[string]interface{}{
+		"app_id":                  protocols.CCTP,
+		"total_messages":          uint64(50),
+		"total_value_transferred": 4e8,
 	}))
 
-	respStatsLastDay := &mockQueryTableResult{}
-	respStatsLastDay.On("Next").Return(true)
-	respStatsLastDay.On("Err").Return(errNil)
-	respStatsLastDay.On("Close").Return(errNil)
-	respStatsLastDay.On("Record").Return(query.NewFluxRecord(1, map[string]interface{}{
-		"protocol":           "protocol1",
-		"total_messages":     uint64(4),
-		"total_value_locked": float64(5),
+	deltaSinceStartOfDay := &mockQueryTableResult{}
+	deltaSinceStartOfDay.On("Next").Return(true)
+	deltaSinceStartOfDay.On("Err").Return(errNil)
+	deltaSinceStartOfDay.On("Close").Return(errNil)
+	deltaSinceStartOfDay.On("Record").Return(query.NewFluxRecord(1, map[string]interface{}{
+		"app_id":                  protocols.CCTP,
+		"total_messages":          uint64(6),
+		"total_value_transferred": 2e8,
 	}))
 
-	respActivityLast := &mockQueryTableResult{}
-	respActivityLast.On("Next").Return(true)
-	respActivityLast.On("Err").Return(errNil)
-	respActivityLast.On("Close").Return(errNil)
-	respActivityLast.On("Record").Return(query.NewFluxRecord(1, map[string]interface{}{
-		"protocol":                "protocol1",
-		"total_messages":          uint64(15),
-		"total_value_transferred": float64(7),
-		"total_value_secure":      float64(9),
+	deltaLastDay := &mockQueryTableResult{}
+	deltaLastDay.On("Next").Return(true)
+	deltaLastDay.On("Err").Return(errNil)
+	deltaLastDay.On("Close").Return(errNil)
+	deltaLastDay.On("Record").Return(query.NewFluxRecord(1, map[string]interface{}{
+		"app_id":                  protocols.CCTP,
+		"total_messages":          uint64(7),
+		"total_value_transferred": 132,
 	}))
 
+	ctx := context.Background()
 	queryAPI := &mockQueryAPI{}
-	queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateLatestPoint, "protocols_bucket", dbconsts.ProtocolsStatsMeasurement, "protocol1", "v1")).Return(respStatsLatest, nil)
-	queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateLast24Point, "protocols_bucket", dbconsts.ProtocolsStatsMeasurement, "protocol1", "v1")).Return(respStatsLastDay, nil)
-	activityQuery := fmt.Sprintf(protocols.QueryTemplateActivityLatestPoint, "protocols_bucket", dbconsts.ProtocolsActivityMeasurement, "protocol1", "v1")
-	queryAPI.On("Query", ctx, activityQuery).Return(respActivityLast, nil)
-	repository := protocols.NewRepository(queryAPI, "protocols_bucket", "protocols_bucket", "v1", "v1", zap.NewNop())
-	service := protocols.NewService([]string{"protocol1"}, repository, zap.NewNop(), mockCache, "WORMSCAN:PROTOCOLS", 60)
+	queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryIntProtocolsTotalStartOfDay, "bucketInfinite", dbconsts.CctpStatsMeasurementDaily, protocols.CCTP, protocols.CCTP)).Return(totalStartOfCurrentDay, errNil)
+	queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryIntProtocolsDeltaSinceStartOfDay, "bucket30d", dbconsts.CctpStatsMeasurementHourly, protocols.CCTP, protocols.CCTP)).Return(deltaSinceStartOfDay, errNil)
+	queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryIntProtocolsDeltaLastDay, "bucket30d", dbconsts.CctpStatsMeasurementHourly, protocols.CCTP, protocols.CCTP)).Return(deltaLastDay, errNil)
+	repository := protocols.NewRepository(queryAPI, "bucketInfinite", "bucket30d", "v1", "v1", zap.NewNop())
+	service := protocols.NewService([]string{}, []string{protocols.CCTP}, repository, zap.NewNop(), cache.NewDummyCacheClient(), "WORMSCAN:PROTOCOLS", 0, metrics.NewNoOpMetrics(), &mockTvl{})
 
 	values := service.GetProtocolsTotalValues(ctx)
+	assert.NotNil(t, values)
 	assert.Equal(t, 1, len(values))
-	assert.Equal(t, "protocol1", values[0].Protocol)
-	assert.Equal(t, 5.00, values[0].TotalValueLocked)
-	assert.Equal(t, uint64(7), values[0].TotalMessages)
-	assert.Equal(t, 9.00, values[0].TotalValueSecured)
-	assert.Equal(t, 7.00, values[0].TotalValueTransferred)
-	assert.Equal(t, uint64(3), values[0].LastDayMessages)
-	assert.Equal(t, "75.00%", values[0].LastDayDiffPercentage)
+	for i := range values {
+		switch values[i].Protocol {
+		case "cctp":
+			assert.Equal(t, uint64(56), values[i].TotalMessages)
+			assert.Equal(t, 6.0, values[i].TotalValueTransferred)
+			assert.Equal(t, uint64(7), values[i].LastDayMessages)
+			assert.Equal(t, "14.29%", values[i].LastDayDiffPercentage)
+			assert.Equal(t, 1235.523, values[i].TotalValueLocked)
+		default:
+			t.Errorf("unexpected protocol %s", values[i].Protocol)
+		}
+	}
 }
 
 type mockQueryAPI struct {
@@ -273,3 +267,10 @@ func (m *mockQueryTableResult) Close() error {
 	args := m.Called()
 	return args.Error(0)
 }
+
+type mockTvl struct {
+}
+
+func (t *mockTvl) Get(ctx context.Context) (string, error) {
+	return "1235.523", nil
+}
diff --git a/api/internal/metrics/prometheus.go b/api/internal/metrics/prometheus.go
index 747e5c475..0146a4f59 100644
--- a/api/internal/metrics/prometheus.go
+++ b/api/internal/metrics/prometheus.go
@@ -30,3 +30,12 @@ func NewPrometheusMetrics(environment string) *PrometheusMetrics {
 func (m *PrometheusMetrics) IncExpiredCacheResponse(key string) {
 	m.expiredCacheResponseCount.WithLabelValues(key).Inc()
 }
+
+type noOpMetrics struct{}
+
+func (s *noOpMetrics) IncExpiredCacheResponse(_ string) {
+}
+
+func NewNoOpMetrics() Metrics {
+	return &noOpMetrics{}
+}
diff --git a/api/main.go b/api/main.go
index f66ea69cf..0f14683a8 100644
--- a/api/main.go
+++ b/api/main.go
@@ -159,7 +159,14 @@ func main() {
 	relaysRepo := relays.NewRepository(db.Database, rootLogger)
 	operationsRepo := operations.NewRepository(db.Database, rootLogger)
 	statsRepo := stats.NewRepository(influxCli, cfg.Influx.Organization, cfg.Influx.Bucket24Hours, rootLogger)
-	protocolsRepo := protocols.NewRepository(protocols.WrapQueryAPI(influxCli.QueryAPI(cfg.Influx.Organization)), cfg.Influx.Bucket30Days, cfg.Influx.Bucket30Days, cfg.ProtocolsStatsVersion, cfg.ProtocolsActivityVersion, rootLogger)
+	protocolsRepo := protocols.NewRepository(
+		protocols.WrapQueryAPI(influxCli.QueryAPI(cfg.Influx.Organization)),
+		cfg.Influx.BucketInfinite,
+		cfg.Influx.Bucket30Days,
+		cfg.ProtocolsStatsVersion,
+		cfg.ProtocolsActivityVersion,
+		rootLogger,
+	)
 
 	// create token provider
 	tokenProvider := domain.NewTokenProvider(cfg.P2pNetwork)
@@ -179,7 +186,7 @@ func main() {
 	relaysService := relays.NewService(relaysRepo, rootLogger)
 	operationsService := operations.NewService(operationsRepo, rootLogger)
 	statsService := stats.NewService(statsRepo, cache, expirationTime, metrics, rootLogger)
-	protocolsService := protocols.NewService(cfg.Protocols, protocolsRepo, rootLogger, cache, cfg.Cache.ProtocolsStatsKey, cfg.Cache.ProtocolsStatsExpiration)
+	protocolsService := protocols.NewService(cfg.Protocols, []string{protocols.CCTP, protocols.PortalTokenBridge}, protocolsRepo, rootLogger, cache, cfg.Cache.ProtocolsStatsKey, cfg.Cache.ProtocolsStatsExpiration, metrics, tvl)
 
 	// Set up a custom error handler
 	response.SetEnableStackTrace(*cfg)
diff --git a/api/routes/wormscan/protocols/controller_test.go b/api/routes/wormscan/protocols/controller_test.go
index f7976dba8..f21060418 100644
--- a/api/routes/wormscan/protocols/controller_test.go
+++ b/api/routes/wormscan/protocols/controller_test.go
@@ -45,8 +45,10 @@ func TestGetContributorsTotalValues(t *testing.T) {
 		service := mockService(func(ctx context.Context) []contributorsHandlerPkg.ProtocolTotalValuesDTO {
 			return []contributorsHandlerPkg.ProtocolTotalValuesDTO{
 				{
-					Protocol: "protocol1",
-					Error:    inputArgs.mockError,
+					ProtocolStats: contributorsHandlerPkg.ProtocolStats{
+						Protocol: "protocol1",
+					},
+					Error: inputArgs.mockError,
 				},
 			}
 		})
diff --git a/common/dbconsts/consts.go b/common/dbconsts/consts.go
index 5f687d0fa..12e7fbd16 100644
--- a/common/dbconsts/consts.go
+++ b/common/dbconsts/consts.go
@@ -4,4 +4,12 @@ package dbconsts
 const (
 	ProtocolsActivityMeasurement = "protocols_activity"
 	ProtocolsStatsMeasurement    = "protocols_stats_v1"
+
+	CctpStatsMeasurementHourly        = intProtocolStatsMeasurement1h
+	TokenBridgeStatsMeasurementHourly = intProtocolStatsMeasurement1h
+	intProtocolStatsMeasurement1h     = "core_protocols_stats_1h"
+
+	CctpStatsMeasurementDaily        = intProtocolStatsMeasurement1d
+	TokenBridgeStatsMeasurementDaily = intProtocolStatsMeasurement1d
+	intProtocolStatsMeasurement1d    = "core_protocols_stats_1d"
 )
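
Illustrative sketch (not part of the diff): the new internal-protocol stats are produced by combining three Influx reads — daily totals up to the start of the current day from the infinite bucket, the hourly delta since the start of the day, and the hourly delta over the last 24 hours — mirroring getInternalProtocolStats and getIntProtocolStats above. The combine helper below is hypothetical; the figures come from TestService_GetCCTP_Stats.

// combine_sketch.go — standalone example, assumes nothing beyond the standard library.
package main

import (
	"fmt"
	"strconv"
)

type intRowStat struct {
	TotalMessages         uint64
	TotalValueTransferred uint64 // fixed-point value with 8 decimals, as stored in Influx
}

// combine reproduces the arithmetic of the PR: latest totals are the daily totals up to
// the start of today plus the hourly delta since the start of today; the last-day growth
// percentage is measured against the total as it stood 24 hours ago.
func combine(totalsUntilToday, sinceStartOfDay, lastDay intRowStat) (totalMsgs uint64, totalValue float64, lastDayPct string) {
	totalMsgs = totalsUntilToday.TotalMessages + sinceStartOfDay.TotalMessages
	totalValue = float64(totalsUntilToday.TotalValueTransferred+sinceStartOfDay.TotalValueTransferred) / 1e8

	base := totalMsgs - lastDay.TotalMessages
	if base != 0 {
		lastDayPct = strconv.FormatFloat(float64(lastDay.TotalMessages)/float64(base)*100, 'f', 2, 64) + "%"
	}
	return
}

func main() {
	// Values from TestService_GetCCTP_Stats: 50 + 6 messages, 4e8 + 2e8 transferred, 7 messages in the last day.
	msgs, value, pct := combine(intRowStat{50, 4e8}, intRowStat{6, 2e8}, intRowStat{7, 132})
	fmt.Println(msgs, value, pct) // 56 6 14.29%
}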