From 4009482dcc3e24190170d381726c9153a626e924 Mon Sep 17 00:00:00 2001 From: Mariano <9205080+marianososto@users.noreply.github.com> Date: Mon, 11 Mar 2024 17:33:36 -0300 Subject: [PATCH] [ISSUE-1089] Collect CCTP - Metrics (#1165) Add cctp and portal_token_bridge stats to protocols-stats endpoint changes indent more changes on the script new working approach on influx task indent script tested insert add 2 versions of the script multiple changes add 1day task add logic to retrieve internal protocols remove unecessary code readd empty script fix unit-tests and measurement namings fix queries fix alignment rename function task fix names improvements on influx task add .run config to gitignore add .run to gitignore fix task and rename working api multiple things Delete .run/wormscan api.run.xml Delete analytics/scripts/test_query.flux wip multiple fixes fix test wip fix queries fix unit-test due to query changes --- .gitignore | 3 +- analytics/scripts/protocols_stats_1d.flux | 56 +++++ analytics/scripts/protocols_stats_1h.flux | 50 +++++ api/handlers/protocols/repository.go | 198 ++++++++++++++++-- api/handlers/protocols/service.go | 140 ++++++++++--- api/handlers/protocols/service_test.go | 141 ++++++------- api/internal/metrics/prometheus.go | 9 + api/main.go | 11 +- .../wormscan/protocols/controller_test.go | 6 +- common/dbconsts/consts.go | 8 + 10 files changed, 496 insertions(+), 126 deletions(-) create mode 100644 analytics/scripts/protocols_stats_1d.flux create mode 100644 analytics/scripts/protocols_stats_1h.flux diff --git a/.gitignore b/.gitignore index 8194300a6..8ac2603c9 100644 --- a/.gitignore +++ b/.gitignore @@ -16,4 +16,5 @@ wormholeTxs.json serviceAccountKey.json bigtableAccountKey.json tsconfig.tsbuildinfo -serviceAccount.json \ No newline at end of file +serviceAccount.json +.run \ No newline at end of file diff --git a/analytics/scripts/protocols_stats_1d.flux b/analytics/scripts/protocols_stats_1d.flux new file mode 100644 index 000000000..8314a7ec2 --- /dev/null +++ b/analytics/scripts/protocols_stats_1d.flux @@ -0,0 +1,56 @@ +import "date" + + +calculateProtocolStats = (protocol,protocolVersion,taskCfg) => { + +totalValueTransferred = from(bucket: taskCfg.sourceBucket) + |> range(start: taskCfg.since, stop:taskCfg.ts) + |> filter(fn: (r) => r._measurement == "vaa_volume_v2" and r.app_id == protocol) + |> filter(fn: (r) => r._field == "volume" and r._value > 0) + |> drop(columns:["destination_chain","emitter_chain","token_address","token_chain","version"]) + |> group() + |> sum() + |> map(fn: (r) => ({r with _time: time(v:taskCfg.since)})) + |> set(key: "_field", value: "total_value_transferred") + +totalMessages = from(bucket: taskCfg.sourceBucket) + |> range(start: taskCfg.since, stop:taskCfg.ts) + |> filter(fn: (r) => r._measurement == "vaa_volume_v2" and r.app_id == protocol) + |> filter(fn: (r) => r._field == "volume") + |> group() + |> count() + |> map(fn: (r) => ({r with _time: time(v:taskCfg.since)})) + |> set(key: "_field", value: "total_messages") + +return union(tables:[totalMessages,totalValueTransferred]) + |> set(key: "app_id", value: protocol) + |> set(key: "version", value: protocolVersion) + |> set(key: "_measurement", value: taskCfg.destMeasurement) + |> map(fn: (r) => ({r with _time: time(v:taskCfg.since)})) + |>to(bucket: taskCfg.destBucket) +} + + + +ts = date.truncate(t: now(), unit: 1d) +bucketInfinite = "wormscan" +bucket30d = "wormscan-30days" + +cfg = { + sourceBucket:bucketInfinite, + destBucket:bucketInfinite, + destMeasurement:"core_protocols_stats_1d", + since: date.sub(d: 1d, from: ts), + ts:ts, +} + +// Set this variable with the cfg of the desired task + +option task = { + name: "cctp and portal_token_bridge metrics every day", + every: 1d, +} + +calculateProtocolStats(protocol:"CCTP_WORMHOLE_INTEGRATION",protocolVersion:"v1",taskCfg:cfg) + +calculateProtocolStats(protocol:"PORTAL_TOKEN_BRIDGE",protocolVersion:"v1",taskCfg:cfg) \ No newline at end of file diff --git a/analytics/scripts/protocols_stats_1h.flux b/analytics/scripts/protocols_stats_1h.flux new file mode 100644 index 000000000..0295143eb --- /dev/null +++ b/analytics/scripts/protocols_stats_1h.flux @@ -0,0 +1,50 @@ +import "date" + +option task = { + name: "cctp and portal_token_bridge metrics every hour", + every: 1h, +} + + +calculateLastHourMetrics = (protocol,protocolVersion,ts) => { + +since = date.sub(d: 1h, from: ts) +sourceBucket = "wormscan" +destMeasurement = "core_protocols_stats_1h" +bucket30d = "wormscan-30days" + +totalValueTransferred = from(bucket: sourceBucket) + |> range(start: since, stop:ts) + |> filter(fn: (r) => r._measurement == "vaa_volume_v2" and r.app_id == protocol) + |> filter(fn: (r) => r._field == "volume" and r._value > 0) + |> drop(columns:["destination_chain","emitter_chain","token_address","token_chain","version"]) + |> group() + |> sum() + |> map(fn: (r) => ({r with _time: since})) + |> set(key: "_field", value: "total_value_transferred") + +totalMessages = from(bucket: sourceBucket) + |> range(start: since, stop:ts) + |> filter(fn: (r) => r._measurement == "vaa_volume_v2" and r.app_id == protocol) + |> filter(fn: (r) => r._field == "volume") + |> group() + |> count() + |> map(fn: (r) => ({r with _time: since})) + |> set(key: "_field", value: "total_messages") + +return union(tables:[totalMessages,totalValueTransferred]) // if nothing happened during the last hour then union will result in empty and no point will be added. + |> set(key: "app_id", value: protocol) + |> set(key: "version", value: protocolVersion) + |> set(key: "_measurement", value: destMeasurement) + |> map(fn: (r) => ({r with _time: since})) + |>to(bucket: bucket30d) +} + +ts = date.truncate(t: now(), unit: 1h) + +// execute function for CCTP_WORMHOLE_INTEGRATION +calculateLastHourMetrics(protocol:"CCTP_WORMHOLE_INTEGRATION",protocolVersion:"v1",ts:ts) + + +// execute function for PORTAL_TOKEN_BRIDGE +calculateLastHourMetrics(protocol:"PORTAL_TOKEN_BRIDGE",protocolVersion:"v1",ts:ts) \ No newline at end of file diff --git a/api/handlers/protocols/repository.go b/api/handlers/protocols/repository.go index 235558c1a..1fb724cc9 100644 --- a/api/handlers/protocols/repository.go +++ b/api/handlers/protocols/repository.go @@ -28,20 +28,117 @@ from(bucket: "%s") const QueryTemplateActivityLatestPoint = ` from(bucket: "%s") - |> range(start: -1d) + |> range(start: 1970-01-01T00:00:00Z) |> filter(fn: (r) => r._measurement == "%s" and r.protocol == "%s" and r.version == "%s") |> keep(columns: ["_time","_field","protocol", "_value", "total_value_secure", "total_value_transferred"]) |> last() |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") ` +// QueryIntProtocolsTotalStartOfDay Query template for internal protocols (cctp and portal_token_bridge) to fetch total values till the start of current day +const QueryIntProtocolsTotalStartOfDay = ` + import "date" + import "types" + + startOfCurrentDay = date.truncate(t: now(), unit: 1d) + + data = from(bucket: "%s") + |> range(start: 1970-01-01T00:00:00Z,stop:startOfCurrentDay) + |> filter(fn: (r) => r._measurement == "%s" and r.app_id == "%s") + +tvt = data + |> filter(fn : (r) => r._field == "total_value_transferred") + |> group() + |> sum() + |> set(key:"_field",value:"total_value_transferred") + |> map(fn: (r) => ({r with _value: int(v: r._value)})) + +totalMsgs = data + |> filter(fn : (r) => r._field == "total_messages") + |> group() + |> sum() + |> set(key:"_field",value:"total_messages") + +union(tables:[tvt,totalMsgs]) + |> set(key:"_time",value:string(v:startOfCurrentDay)) + |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") + |> set(key:"app_id",value:"%s") +` + +// QueryIntProtocolsDeltaSinceStartOfDay calculate delta since the beginning of current day +const QueryIntProtocolsDeltaSinceStartOfDay = ` + import "date" + import "types" + + ts = date.truncate(t: now(), unit: 1h) + startOfDay = date.truncate(t: now(), unit: 1d) + + data = from(bucket: "%s") + |> range(start: startOfDay,stop:ts) + |> filter(fn: (r) => r._measurement == "%s" and r.app_id == "%s") + +tvt = data + |> filter(fn : (r) => r._field == "total_value_transferred") + |> group() + |> sum() + |> set(key:"_field",value:"total_value_transferred") + |> map(fn: (r) => ({r with _value: int(v: r._value)})) + +totalMsgs = data + |> filter(fn : (r) => r._field == "total_messages") + |> group() + |> sum() + |> set(key:"_field",value:"total_messages") + +union(tables:[tvt,totalMsgs]) + |> set(key:"_time",value:string(v:startOfDay)) + |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") + |> set(key:"app_id",value:"%s") +` + +// QueryIntProtocolsDeltaLastDay calculate last day delta +const QueryIntProtocolsDeltaLastDay = ` + import "date" + import "types" + + + ts = date.truncate(t: now(), unit: 1h) + yesterday = date.sub(d: 1d, from: ts) + + data = from(bucket: "%s") + |> range(start: yesterday,stop:ts) + |> filter(fn: (r) => r._measurement == "%s" and r.app_id == "%s") + +tvt = data + |> filter(fn : (r) => r._field == "total_value_transferred") + |> group() + |> sum() + |> set(key:"_field",value:"total_value_transferred") + |> map(fn: (r) => ({r with _value: int(v: r._value)})) + +totalMsgs = data + |> filter(fn : (r) => r._field == "total_messages") + |> group() + |> sum() + |> set(key:"_field",value:"total_messages") + +union(tables:[tvt,totalMsgs]) + |> set(key:"_time",value:string(v:yesterday)) + |> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value") + |> set(key:"app_id",value:"%s") +` + type Repository struct { - queryAPI QueryDoer - logger *zap.Logger - statsBucket string - activityBucket string - statsVersion string - activityVersion string + queryAPI QueryDoer + logger *zap.Logger + bucketInfinite string + bucket30d string + statsVersion string + activityVersion string + intProtocolMeasurement map[string]struct { + Daily string + Hourly string + } } type rowStat struct { @@ -50,6 +147,17 @@ type rowStat struct { TotalValueLocked float64 `mapstructure:"total_value_locked"` } +type intRowStat struct { + Protocol string `mapstructure:"app_id"` + TotalMessages uint64 `mapstructure:"total_messages"` + TotalValueTransferred uint64 `mapstructure:"total_value_transferred"` +} + +type intStats struct { + Latest intRowStat + DeltaLast24hr intRowStat +} + type rowActivity struct { Protocol string `mapstructure:"protocol"` DestinationChainId string `mapstructure:"destination_chain_id"` @@ -85,14 +193,21 @@ func WrapQueryAPI(qApi api.QueryAPI) QueryDoer { return &queryApiWrapper{qApi: qApi} } -func NewRepository(qApi QueryDoer, statsBucket, activityBucket, statsVersion, activityVersion string, logger *zap.Logger) *Repository { +func NewRepository(qApi QueryDoer, bucketInfinite, bucket30d, statsVersion, activityVersion string, logger *zap.Logger) *Repository { return &Repository{ queryAPI: qApi, - statsBucket: statsBucket, - activityBucket: activityBucket, + bucketInfinite: bucketInfinite, + bucket30d: bucket30d, statsVersion: statsVersion, activityVersion: activityVersion, logger: logger, + intProtocolMeasurement: map[string]struct { + Daily string + Hourly string + }{ + CCTP: {Daily: dbconsts.CctpStatsMeasurementDaily, Hourly: dbconsts.CctpStatsMeasurementHourly}, + PortalTokenBridge: {Daily: dbconsts.TokenBridgeStatsMeasurementDaily, Hourly: dbconsts.TokenBridgeStatsMeasurementHourly}, + }, } } @@ -101,40 +216,81 @@ func (q *queryApiWrapper) Query(ctx context.Context, query string) (QueryResult, } // returns latest and last 24 hr stats for a given protocol -func (r *Repository) getProtocolStats(ctx context.Context, contributor string) (stats, error) { +func (r *Repository) getProtocolStats(ctx context.Context, protocol string) (stats, error) { + // fetch latest stat - latest, err := fetchSingleRecordData[rowStat](r.logger, r.queryAPI, ctx, r.statsBucket, QueryTemplateLatestPoint, dbconsts.ProtocolsStatsMeasurement, contributor, r.statsVersion) + q := buildQuery(QueryTemplateLatestPoint, r.bucket30d, dbconsts.ProtocolsStatsMeasurement, protocol, r.statsVersion) + latest, err := fetchSingleRecordData[rowStat](r.logger, r.queryAPI, ctx, q, protocol) if err != nil { return stats{}, err } // fetch last 24 hr stat - last24hr, err := fetchSingleRecordData[rowStat](r.logger, r.queryAPI, ctx, r.statsBucket, QueryTemplateLast24Point, dbconsts.ProtocolsStatsMeasurement, contributor, r.statsVersion) + q = buildQuery(QueryTemplateLast24Point, r.bucket30d, dbconsts.ProtocolsStatsMeasurement, protocol, r.statsVersion) + last24hr, err := fetchSingleRecordData[rowStat](r.logger, r.queryAPI, ctx, q, protocol) return stats{ Latest: latest, Last24: last24hr, }, err } -func (r *Repository) getProtocolActivity(ctx context.Context, contributor string) (rowActivity, error) { - return fetchSingleRecordData[rowActivity](r.logger, r.queryAPI, ctx, r.activityBucket, QueryTemplateActivityLatestPoint, dbconsts.ProtocolsActivityMeasurement, contributor, r.activityVersion) +func (r *Repository) getProtocolActivity(ctx context.Context, protocol string) (rowActivity, error) { + q := buildQuery(QueryTemplateActivityLatestPoint, r.bucket30d, dbconsts.ProtocolsActivityMeasurement, protocol, r.activityVersion) + return fetchSingleRecordData[rowActivity](r.logger, r.queryAPI, ctx, q, protocol) +} + +// returns latest and last 24 hr for internal protocols (cctp and portal_token_bridge) +func (r *Repository) getInternalProtocolStats(ctx context.Context, protocol string) (intStats, error) { + + // calculate total values till the start of current day + totalTillCurrentDayQuery := fmt.Sprintf(QueryIntProtocolsTotalStartOfDay, r.bucketInfinite, r.intProtocolMeasurement[protocol].Daily, protocol, protocol) + totalsUntilToday, err := fetchSingleRecordData[intRowStat](r.logger, r.queryAPI, ctx, totalTillCurrentDayQuery, protocol) + if err != nil { + return intStats{}, err + } + + // calculate delta since the beginning of current day + q2 := fmt.Sprintf(QueryIntProtocolsDeltaSinceStartOfDay, r.bucket30d, r.intProtocolMeasurement[protocol].Hourly, protocol, protocol) + currentDayStats, errCD := fetchSingleRecordData[intRowStat](r.logger, r.queryAPI, ctx, q2, protocol) + if errCD != nil { + return intStats{}, errCD + } + + latestTotal := intRowStat{ + Protocol: protocol, + TotalMessages: totalsUntilToday.TotalMessages + currentDayStats.TotalMessages, + TotalValueTransferred: totalsUntilToday.TotalValueTransferred + currentDayStats.TotalValueTransferred, + } + + result := intStats{ + Latest: latestTotal, + } + + // calculate last day delta + q3 := fmt.Sprintf(QueryIntProtocolsDeltaLastDay, r.bucket30d, r.intProtocolMeasurement[protocol].Hourly, protocol, protocol) + deltaYesterdayStats, errQ3 := fetchSingleRecordData[intRowStat](r.logger, r.queryAPI, ctx, q3, protocol) + if errQ3 != nil { + return result, errQ3 + } + + result.DeltaLast24hr = deltaYesterdayStats + return result, nil } -func fetchSingleRecordData[T any](logger *zap.Logger, queryAPI QueryDoer, ctx context.Context, bucket, queryTemplate, measurement, contributor, version string) (T, error) { +func fetchSingleRecordData[T any](logger *zap.Logger, queryAPI QueryDoer, ctx context.Context, query, protocol string) (T, error) { var res T - q := buildQuery(queryTemplate, bucket, measurement, contributor, version) - result, err := queryAPI.Query(ctx, q) + result, err := queryAPI.Query(ctx, query) if err != nil { - logger.Error("error executing query to fetch data", zap.Error(err), zap.String("protocol", contributor), zap.String("query", q)) + logger.Error("error executing query to fetch data", zap.Error(err), zap.String("protocol", protocol), zap.String("query", query)) return res, err } defer result.Close() if !result.Next() { if result.Err() != nil { - logger.Error("error reading query response", zap.Error(result.Err()), zap.String("protocol", contributor), zap.String("query", q)) + logger.Error("error reading query response", zap.Error(result.Err()), zap.String("protocol", protocol), zap.String("query", query)) return res, result.Err() } - logger.Info("empty query response", zap.String("protocol", contributor), zap.String("query", q)) + logger.Info("empty query response", zap.String("protocol", protocol), zap.String("query", query)) return res, err } diff --git a/api/handlers/protocols/service.go b/api/handlers/protocols/service.go index 7c28cf4ea..bf62698fb 100644 --- a/api/handlers/protocols/service.go +++ b/api/handlers/protocols/service.go @@ -2,7 +2,8 @@ package protocols import ( "context" - "encoding/json" + "github.com/wormhole-foundation/wormhole-explorer/api/cacheable" + "github.com/wormhole-foundation/wormhole-explorer/api/internal/metrics" "github.com/wormhole-foundation/wormhole-explorer/common/client/cache" "go.uber.org/zap" "strconv" @@ -11,16 +12,27 @@ import ( "time" ) +const CCTP = "CCTP_WORMHOLE_INTEGRATION" +const PortalTokenBridge = "PORTAL_TOKEN_BRIDGE" + type Service struct { Protocols []string repo *Repository logger *zap.Logger + intProtocols []string cache cache.Cache cacheKeyPrefix string cacheTTL int + metrics metrics.Metrics + tvl tvlProvider } type ProtocolTotalValuesDTO struct { + ProtocolStats + Error string `json:"error,omitempty"` +} + +type ProtocolStats struct { Protocol string `json:"protocol"` TotalMessages uint64 `json:"total_messages"` TotalValueLocked float64 `json:"total_value_locked,omitempty"` @@ -28,54 +40,129 @@ type ProtocolTotalValuesDTO struct { TotalValueTransferred float64 `json:"total_value_transferred,omitempty"` LastDayMessages uint64 `json:"last_day_messages,omitempty"` LastDayDiffPercentage string `json:"last_day_diff_percentage,omitempty"` - Error string `json:"error,omitempty"` } -func NewService(protocols []string, repo *Repository, logger *zap.Logger, cache cache.Cache, cacheKeyPrefix string, cacheTTL int) *Service { +type tvlProvider interface { + Get(ctx context.Context) (string, error) +} + +func NewService(extProtocols, intProtocols []string, repo *Repository, logger *zap.Logger, cache cache.Cache, cacheKeyPrefix string, cacheTTL int, metrics metrics.Metrics, tvlProvider tvlProvider) *Service { return &Service{ - Protocols: protocols, + Protocols: extProtocols, repo: repo, logger: logger, + intProtocols: intProtocols, cache: cache, cacheKeyPrefix: cacheKeyPrefix, cacheTTL: cacheTTL, + metrics: metrics, + tvl: tvlProvider, } } func (s *Service) GetProtocolsTotalValues(ctx context.Context) []ProtocolTotalValuesDTO { wg := &sync.WaitGroup{} - wg.Add(len(s.Protocols)) - results := make(chan ProtocolTotalValuesDTO, len(s.Protocols)) + totalProtocols := len(s.Protocols) + len(s.intProtocols) + wg.Add(totalProtocols) + results := make(chan ProtocolTotalValuesDTO, totalProtocols) - for i := range s.Protocols { - go s.getProtocolTotalValues(ctx, wg, s.Protocols[i], results) + for _, p := range s.Protocols { + go s.fetchProtocolValues(ctx, wg, p, results, s.getProtocolStats) + } + for _, p := range s.intProtocols { + go s.fetchProtocolValues(ctx, wg, p, results, s.getIntProtocolStats) } wg.Wait() close(results) resultsSlice := make([]ProtocolTotalValuesDTO, 0, len(s.Protocols)) for r := range results { + r.Protocol = getProtocolNameDto(r.Protocol) resultsSlice = append(resultsSlice, r) } return resultsSlice } -func (s *Service) getProtocolTotalValues(ctx context.Context, wg *sync.WaitGroup, protocol string, results chan<- ProtocolTotalValuesDTO) { +func getProtocolNameDto(protocol string) string { + switch protocol { + case CCTP: + return "cctp" + case PortalTokenBridge: + return "portal_token_bridge" + default: + return protocol + } +} + +func (s *Service) fetchProtocolValues(ctx context.Context, wg *sync.WaitGroup, protocol string, results chan<- ProtocolTotalValuesDTO, fetch func(context.Context, string) (ProtocolStats, error)) { defer wg.Done() - cacheKey := s.cacheKeyPrefix + ":" + strings.ToUpper(protocol) - cachedValue, errCache := s.cache.Get(ctx, cacheKey) - if errCache == nil { - var val ProtocolTotalValuesDTO - errCacheUnmarshall := json.Unmarshal([]byte(cachedValue), &val) - if errCacheUnmarshall == nil { - results <- val - return + val, err := cacheable.GetOrLoad[ProtocolStats](ctx, + s.logger, + s.cache, + time.Duration(s.cacheTTL)*time.Minute, + s.cacheKeyPrefix+":"+strings.ToUpper(protocol), + s.metrics, + func() (ProtocolStats, error) { + return fetch(ctx, protocol) + }, + ) + + res := ProtocolTotalValuesDTO{ + ProtocolStats: val, + } + if err != nil { + res.Error = err.Error() + } + results <- res +} + +// getProtocolStats fetches stats for CCTP and PortalTokenBridge +func (s *Service) getIntProtocolStats(ctx context.Context, protocol string) (ProtocolStats, error) { + + protocolStats, err := s.repo.getInternalProtocolStats(ctx, protocol) + if err != nil { + return ProtocolStats{ + Protocol: protocol, + TotalValueTransferred: float64(protocolStats.Latest.TotalValueTransferred) / 1e8, + TotalMessages: protocolStats.Latest.TotalMessages, + }, err + } + + diffLastDay := protocolStats.DeltaLast24hr.TotalMessages + val := ProtocolStats{ + Protocol: protocol, + TotalValueTransferred: float64(protocolStats.Latest.TotalValueTransferred) / 1e8, + TotalMessages: protocolStats.Latest.TotalMessages, + LastDayMessages: diffLastDay, + } + + lastDayTotalMessages := protocolStats.Latest.TotalMessages - diffLastDay + if lastDayTotalMessages != 0 { + percentage := strconv.FormatFloat(float64(diffLastDay)/float64(lastDayTotalMessages)*100, 'f', 2, 64) + "%" + val.LastDayDiffPercentage = percentage + } + + if CCTP == protocol { + tvl, errTvl := s.tvl.Get(ctx) + if errTvl != nil { + s.logger.Error("error fetching tvl", zap.Error(errTvl), zap.String("protocol", protocol)) + return val, errTvl } - s.logger.Error("error unmarshalling cache value", zap.Error(errCacheUnmarshall), zap.String("cache_key", cacheKey)) + tvlFloat, errTvl := strconv.ParseFloat(tvl, 64) + if errTvl != nil { + s.logger.Error("error parsing tvl value", zap.Error(errTvl), zap.String("protocol", protocol), zap.String("tvl_str", tvl)) + return val, errTvl + } + val.TotalValueLocked = tvlFloat } + return val, nil +} + +func (s *Service) getProtocolStats(ctx context.Context, protocol string) (ProtocolStats, error) { + type statsResult struct { result stats Err error @@ -90,18 +177,17 @@ func (s *Service) getProtocolTotalValues(ctx context.Context, wg *sync.WaitGroup activity, err := s.repo.getProtocolActivity(ctx, protocol) if err != nil { s.logger.Error("error fetching protocol activity", zap.Error(err), zap.String("protocol", protocol)) - results <- ProtocolTotalValuesDTO{Protocol: protocol, Error: err.Error()} - return + return ProtocolStats{Protocol: protocol}, err + } rStats := <-statsRes if rStats.Err != nil { s.logger.Error("error fetching protocol stats", zap.Error(rStats.Err), zap.String("protocol", protocol)) - results <- ProtocolTotalValuesDTO{Protocol: protocol, Error: rStats.Err.Error()} - return + return ProtocolStats{Protocol: protocol}, rStats.Err } - dto := ProtocolTotalValuesDTO{ + dto := ProtocolStats{ Protocol: protocol, TotalValueLocked: rStats.result.Latest.TotalValueLocked, TotalMessages: rStats.result.Latest.TotalMessages, @@ -117,11 +203,5 @@ func (s *Service) getProtocolTotalValues(ctx context.Context, wg *sync.WaitGroup dto.LastDayDiffPercentage = strconv.FormatFloat(float64(last24HrMessages)/float64(totalMessagesAsFromLast24hr)*100, 'f', 2, 64) + "%" } - dtoJson, _ := json.Marshal(dto) // don't handle error since the full lifecycle of the dto is under this scope - errCache = s.cache.Set(ctx, cacheKey, string(dtoJson), time.Duration(s.cacheTTL)*time.Minute) - if errCache != nil { - s.logger.Error("error setting cache", zap.Error(errCache), zap.String("cache_key", cacheKey)) - } - - results <- dto + return dto, nil } diff --git a/api/handlers/protocols/service_test.go b/api/handlers/protocols/service_test.go index 0c6f6e098..d759d639e 100644 --- a/api/handlers/protocols/service_test.go +++ b/api/handlers/protocols/service_test.go @@ -9,6 +9,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/test-go/testify/mock" "github.com/wormhole-foundation/wormhole-explorer/api/handlers/protocols" + "github.com/wormhole-foundation/wormhole-explorer/api/internal/metrics" "github.com/wormhole-foundation/wormhole-explorer/common/client/cache" cacheMock "github.com/wormhole-foundation/wormhole-explorer/common/client/cache/mock" "github.com/wormhole-foundation/wormhole-explorer/common/dbconsts" @@ -52,14 +53,14 @@ func TestService_GetProtocolsTotalValues(t *testing.T) { ctx := context.Background() queryAPI := &mockQueryAPI{} - queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateLatestPoint, "protocols_bucket", dbconsts.ProtocolsStatsMeasurement, "protocol1", "v1")).Return(respStatsLatest, nil) - queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateLast24Point, "protocols_bucket", dbconsts.ProtocolsStatsMeasurement, "protocol1", "v1")).Return(respStatsLastDay, nil) + queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateLatestPoint, "bucket30d", dbconsts.ProtocolsStatsMeasurement, "protocol1", "v1")).Return(respStatsLatest, nil) + queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateLast24Point, "bucket30d", dbconsts.ProtocolsStatsMeasurement, "protocol1", "v1")).Return(respStatsLastDay, nil) - activityQuery := fmt.Sprintf(protocols.QueryTemplateActivityLatestPoint, "protocols_bucket", dbconsts.ProtocolsActivityMeasurement, "protocol1", "v1") + activityQuery := fmt.Sprintf(protocols.QueryTemplateActivityLatestPoint, "bucket30d", dbconsts.ProtocolsActivityMeasurement, "protocol1", "v1") queryAPI.On("Query", ctx, activityQuery).Return(respActivityLast, nil) - repository := protocols.NewRepository(queryAPI, "protocols_bucket", "protocols_bucket", "v1", "v1", zap.NewNop()) - service := protocols.NewService([]string{"protocol1"}, repository, zap.NewNop(), cache.NewDummyCacheClient(), "WORMSCAN:PROTOCOLS", 0) + repository := protocols.NewRepository(queryAPI, "bucketInfinite", "bucket30d", "v1", "v1", zap.NewNop()) + service := protocols.NewService([]string{"protocol1"}, nil, repository, zap.NewNop(), cache.NewDummyCacheClient(), "WORMSCAN:PROTOCOLS", 0, metrics.NewNoOpMetrics(), &mockTvl{}) values := service.GetProtocolsTotalValues(ctx) assert.Equal(t, 1, len(values)) @@ -97,14 +98,14 @@ func TestService_GetProtocolsTotalValues_FailedFetchingActivity(t *testing.T) { ctx := context.Background() queryAPI := &mockQueryAPI{} - queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateLatestPoint, "protocols_bucket", dbconsts.ProtocolsStatsMeasurement, "protocol1", "v1")).Return(respStatsLatest, nil) - queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateLast24Point, "protocols_bucket", dbconsts.ProtocolsStatsMeasurement, "protocol1", "v1")).Return(respStatsLastDay, nil) + queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateLatestPoint, "bucket30d", dbconsts.ProtocolsStatsMeasurement, "protocol1", "v1")).Return(respStatsLatest, nil) + queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateLast24Point, "bucket30d", dbconsts.ProtocolsStatsMeasurement, "protocol1", "v1")).Return(respStatsLastDay, nil) - activityQuery := fmt.Sprintf(protocols.QueryTemplateActivityLatestPoint, "protocols_bucket", dbconsts.ProtocolsActivityMeasurement, "protocol1", "v1") + activityQuery := fmt.Sprintf(protocols.QueryTemplateActivityLatestPoint, "bucket30d", dbconsts.ProtocolsActivityMeasurement, "protocol1", "v1") queryAPI.On("Query", ctx, activityQuery).Return(&api.QueryTableResult{}, errors.New("mocked_fetching_activity_error")) - repository := protocols.NewRepository(queryAPI, "protocols_bucket", "protocols_bucket", "v1", "v1", zap.NewNop()) - service := protocols.NewService([]string{"protocol1"}, repository, zap.NewNop(), cache.NewDummyCacheClient(), "WORMSCAN:PROTOCOLS", 0) + repository := protocols.NewRepository(queryAPI, "bucketInfinite", "bucket30d", "v1", "v1", zap.NewNop()) + service := protocols.NewService([]string{"protocol1"}, nil, repository, zap.NewNop(), cache.NewDummyCacheClient(), "WORMSCAN:PROTOCOLS", 0, metrics.NewNoOpMetrics(), &mockTvl{}) values := service.GetProtocolsTotalValues(ctx) assert.Equal(t, 1, len(values)) @@ -139,14 +140,14 @@ func TestService_GetProtocolsTotalValues_FailedFetchingStats(t *testing.T) { ctx := context.Background() queryAPI := &mockQueryAPI{} - queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateLatestPoint, "protocols_bucket", dbconsts.ProtocolsStatsMeasurement, "protocol1", "v1")).Return(&api.QueryTableResult{}, errors.New("mocked_fetching_stats_error")) - queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateLast24Point, "protocols_bucket", dbconsts.ProtocolsStatsMeasurement, "protocol1", "v1")).Return(respStatsLastDay, nil) + queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateLatestPoint, "bucket30d", dbconsts.ProtocolsStatsMeasurement, "protocol1", "v1")).Return(&api.QueryTableResult{}, errors.New("mocked_fetching_stats_error")) + queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateLast24Point, "bucket30d", dbconsts.ProtocolsStatsMeasurement, "protocol1", "v1")).Return(respStatsLastDay, nil) - activityQuery := fmt.Sprintf(protocols.QueryTemplateActivityLatestPoint, "protocols_bucket", dbconsts.ProtocolsActivityMeasurement, "protocol1", "v1") + activityQuery := fmt.Sprintf(protocols.QueryTemplateActivityLatestPoint, "bucket30d", dbconsts.ProtocolsActivityMeasurement, "protocol1", "v1") queryAPI.On("Query", ctx, activityQuery).Return(respActivityLast, errNil) - repository := protocols.NewRepository(queryAPI, "protocols_bucket", "protocols_bucket", "v1", "v1", zap.NewNop()) - service := protocols.NewService([]string{"protocol1"}, repository, zap.NewNop(), cache.NewDummyCacheClient(), "WORMSCAN:PROTOCOLS", 0) + repository := protocols.NewRepository(queryAPI, "bucketInfinite", "bucket30d", "v1", "v1", zap.NewNop()) + service := protocols.NewService([]string{"protocol1"}, nil, repository, zap.NewNop(), cache.NewDummyCacheClient(), "WORMSCAN:PROTOCOLS", 0, metrics.NewNoOpMetrics(), &mockTvl{}) values := service.GetProtocolsTotalValues(ctx) assert.Equal(t, 1, len(values)) @@ -160,8 +161,9 @@ func TestService_GetProtocolsTotalValues_CacheHit(t *testing.T) { mockCache := &cacheMock.CacheMock{} var cacheErr error cacheErr = nil - mockCache.On("Get", ctx, "WORMSCAN:PROTOCOLS:PROTOCOL1").Return(`{"protocol":"protocol1","total_messages":7,"total_value_locked":5,"total_value_secured":9,"total_value_transferred":7,"last_day_messages":4,"last_day_diff_percentage":"75.00%"}`, cacheErr) - service := protocols.NewService([]string{"protocol1"}, nil, zap.NewNop(), mockCache, "WORMSCAN:PROTOCOLS", 0) + cachedValue := fmt.Sprintf(`{"result": {"protocol":"protocol1","total_messages":7,"total_value_locked":5,"total_value_secured":9,"total_value_transferred":7,"last_day_messages":4,"last_day_diff_percentage":"75.00%%"},"timestamp":"%s"}`, time.Now().Format(time.RFC3339)) + mockCache.On("Get", ctx, "WORMSCAN:PROTOCOLS:PROTOCOL1").Return(cachedValue, cacheErr) + service := protocols.NewService([]string{"protocol1"}, nil, nil, zap.NewNop(), mockCache, "WORMSCAN:PROTOCOLS", 60, metrics.NewNoOpMetrics(), &mockTvl{}) values := service.GetProtocolsTotalValues(ctx) assert.Equal(t, 1, len(values)) assert.Equal(t, "protocol1", values[0].Protocol) @@ -174,71 +176,63 @@ func TestService_GetProtocolsTotalValues_CacheHit(t *testing.T) { } -func TestService_GetProtocolsTotalValues_CacheMiss_FetchAndUpdate(t *testing.T) { - - ctx := context.Background() - mockCache := &cacheMock.CacheMock{} - mockCache.On("Get", ctx, "WORMSCAN:PROTOCOLS:PROTOCOL1").Return("", cache.ErrNotFound) // mock cache miss - - // mock cache update, validate it's called once. - mockCache.On("Set", - ctx, - "WORMSCAN:PROTOCOLS:PROTOCOL1", - `{"protocol":"protocol1","total_messages":7,"total_value_locked":5,"total_value_secured":9,"total_value_transferred":7,"last_day_messages":3,"last_day_diff_percentage":"75.00%"}`, - time.Duration(60)*time.Minute). - Return(nil). - Times(1) - +func TestService_GetCCTP_Stats(t *testing.T) { var errNil error - respStatsLatest := &mockQueryTableResult{} - respStatsLatest.On("Next").Return(true) - respStatsLatest.On("Err").Return(errNil) - respStatsLatest.On("Close").Return(errNil) - respStatsLatest.On("Record").Return(query.NewFluxRecord(1, map[string]interface{}{ - "protocol": "protocol1", - "total_messages": uint64(7), - "total_value_locked": float64(5), + + totalStartOfCurrentDay := &mockQueryTableResult{} + totalStartOfCurrentDay.On("Next").Return(true) + totalStartOfCurrentDay.On("Err").Return(errNil) + totalStartOfCurrentDay.On("Close").Return(errNil) + totalStartOfCurrentDay.On("Record").Return(query.NewFluxRecord(1, map[string]interface{}{ + "app_id": protocols.CCTP, + "total_messages": uint64(50), + "total_value_transferred": 4e8, })) - respStatsLastDay := &mockQueryTableResult{} - respStatsLastDay.On("Next").Return(true) - respStatsLastDay.On("Err").Return(errNil) - respStatsLastDay.On("Close").Return(errNil) - respStatsLastDay.On("Record").Return(query.NewFluxRecord(1, map[string]interface{}{ - "protocol": "protocol1", - "total_messages": uint64(4), - "total_value_locked": float64(5), + deltaSinceStartOfDay := &mockQueryTableResult{} + deltaSinceStartOfDay.On("Next").Return(true) + deltaSinceStartOfDay.On("Err").Return(errNil) + deltaSinceStartOfDay.On("Close").Return(errNil) + deltaSinceStartOfDay.On("Record").Return(query.NewFluxRecord(1, map[string]interface{}{ + "app_id": protocols.CCTP, + "total_messages": uint64(6), + "total_value_transferred": 2e8, })) - respActivityLast := &mockQueryTableResult{} - respActivityLast.On("Next").Return(true) - respActivityLast.On("Err").Return(errNil) - respActivityLast.On("Close").Return(errNil) - respActivityLast.On("Record").Return(query.NewFluxRecord(1, map[string]interface{}{ - "protocol": "protocol1", - "total_messages": uint64(15), - "total_value_transferred": float64(7), - "total_value_secure": float64(9), + deltaLastDay := &mockQueryTableResult{} + deltaLastDay.On("Next").Return(true) + deltaLastDay.On("Err").Return(errNil) + deltaLastDay.On("Close").Return(errNil) + deltaLastDay.On("Record").Return(query.NewFluxRecord(1, map[string]interface{}{ + "app_id": protocols.CCTP, + "total_messages": uint64(7), + "total_value_transferred": 132, })) + ctx := context.Background() queryAPI := &mockQueryAPI{} - queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateLatestPoint, "protocols_bucket", dbconsts.ProtocolsStatsMeasurement, "protocol1", "v1")).Return(respStatsLatest, nil) - queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryTemplateLast24Point, "protocols_bucket", dbconsts.ProtocolsStatsMeasurement, "protocol1", "v1")).Return(respStatsLastDay, nil) - activityQuery := fmt.Sprintf(protocols.QueryTemplateActivityLatestPoint, "protocols_bucket", dbconsts.ProtocolsActivityMeasurement, "protocol1", "v1") - queryAPI.On("Query", ctx, activityQuery).Return(respActivityLast, nil) - repository := protocols.NewRepository(queryAPI, "protocols_bucket", "protocols_bucket", "v1", "v1", zap.NewNop()) - service := protocols.NewService([]string{"protocol1"}, repository, zap.NewNop(), mockCache, "WORMSCAN:PROTOCOLS", 60) + queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryIntProtocolsTotalStartOfDay, "bucketInfinite", dbconsts.CctpStatsMeasurementDaily, protocols.CCTP, protocols.CCTP)).Return(totalStartOfCurrentDay, errNil) + queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryIntProtocolsDeltaSinceStartOfDay, "bucket30d", dbconsts.CctpStatsMeasurementHourly, protocols.CCTP, protocols.CCTP)).Return(deltaSinceStartOfDay, errNil) + queryAPI.On("Query", ctx, fmt.Sprintf(protocols.QueryIntProtocolsDeltaLastDay, "bucket30d", dbconsts.CctpStatsMeasurementHourly, protocols.CCTP, protocols.CCTP)).Return(deltaLastDay, errNil) + repository := protocols.NewRepository(queryAPI, "bucketInfinite", "bucket30d", "v1", "v1", zap.NewNop()) + service := protocols.NewService([]string{}, []string{protocols.CCTP}, repository, zap.NewNop(), cache.NewDummyCacheClient(), "WORMSCAN:PROTOCOLS", 0, metrics.NewNoOpMetrics(), &mockTvl{}) values := service.GetProtocolsTotalValues(ctx) + assert.NotNil(t, values) assert.Equal(t, 1, len(values)) - assert.Equal(t, "protocol1", values[0].Protocol) - assert.Equal(t, 5.00, values[0].TotalValueLocked) - assert.Equal(t, uint64(7), values[0].TotalMessages) - assert.Equal(t, 9.00, values[0].TotalValueSecured) - assert.Equal(t, 7.00, values[0].TotalValueTransferred) - assert.Equal(t, uint64(3), values[0].LastDayMessages) - assert.Equal(t, "75.00%", values[0].LastDayDiffPercentage) + for i := range values { + switch values[i].Protocol { + case "cctp": + assert.Equal(t, uint64(56), values[i].TotalMessages) + assert.Equal(t, 6.0, values[i].TotalValueTransferred) + assert.Equal(t, uint64(7), values[i].LastDayMessages) + assert.Equal(t, "14.29%", values[i].LastDayDiffPercentage) + assert.Equal(t, 1235.523, values[i].TotalValueLocked) + default: + t.Errorf("unexpected protocol %s", values[i].Protocol) + } + } } type mockQueryAPI struct { @@ -273,3 +267,10 @@ func (m *mockQueryTableResult) Close() error { args := m.Called() return args.Error(0) } + +type mockTvl struct { +} + +func (t *mockTvl) Get(ctx context.Context) (string, error) { + return "1235.523", nil +} diff --git a/api/internal/metrics/prometheus.go b/api/internal/metrics/prometheus.go index 747e5c475..0146a4f59 100644 --- a/api/internal/metrics/prometheus.go +++ b/api/internal/metrics/prometheus.go @@ -30,3 +30,12 @@ func NewPrometheusMetrics(environment string) *PrometheusMetrics { func (m *PrometheusMetrics) IncExpiredCacheResponse(key string) { m.expiredCacheResponseCount.WithLabelValues(key).Inc() } + +type noOpMetrics struct{} + +func (s *noOpMetrics) IncExpiredCacheResponse(_ string) { +} + +func NewNoOpMetrics() Metrics { + return &noOpMetrics{} +} diff --git a/api/main.go b/api/main.go index f66ea69cf..0f14683a8 100644 --- a/api/main.go +++ b/api/main.go @@ -159,7 +159,14 @@ func main() { relaysRepo := relays.NewRepository(db.Database, rootLogger) operationsRepo := operations.NewRepository(db.Database, rootLogger) statsRepo := stats.NewRepository(influxCli, cfg.Influx.Organization, cfg.Influx.Bucket24Hours, rootLogger) - protocolsRepo := protocols.NewRepository(protocols.WrapQueryAPI(influxCli.QueryAPI(cfg.Influx.Organization)), cfg.Influx.Bucket30Days, cfg.Influx.Bucket30Days, cfg.ProtocolsStatsVersion, cfg.ProtocolsActivityVersion, rootLogger) + protocolsRepo := protocols.NewRepository( + protocols.WrapQueryAPI(influxCli.QueryAPI(cfg.Influx.Organization)), + cfg.Influx.BucketInfinite, + cfg.Influx.Bucket30Days, + cfg.ProtocolsStatsVersion, + cfg.ProtocolsActivityVersion, + rootLogger, + ) // create token provider tokenProvider := domain.NewTokenProvider(cfg.P2pNetwork) @@ -179,7 +186,7 @@ func main() { relaysService := relays.NewService(relaysRepo, rootLogger) operationsService := operations.NewService(operationsRepo, rootLogger) statsService := stats.NewService(statsRepo, cache, expirationTime, metrics, rootLogger) - protocolsService := protocols.NewService(cfg.Protocols, protocolsRepo, rootLogger, cache, cfg.Cache.ProtocolsStatsKey, cfg.Cache.ProtocolsStatsExpiration) + protocolsService := protocols.NewService(cfg.Protocols, []string{protocols.CCTP, protocols.PortalTokenBridge}, protocolsRepo, rootLogger, cache, cfg.Cache.ProtocolsStatsKey, cfg.Cache.ProtocolsStatsExpiration, metrics, tvl) // Set up a custom error handler response.SetEnableStackTrace(*cfg) diff --git a/api/routes/wormscan/protocols/controller_test.go b/api/routes/wormscan/protocols/controller_test.go index f7976dba8..f21060418 100644 --- a/api/routes/wormscan/protocols/controller_test.go +++ b/api/routes/wormscan/protocols/controller_test.go @@ -45,8 +45,10 @@ func TestGetContributorsTotalValues(t *testing.T) { service := mockService(func(ctx context.Context) []contributorsHandlerPkg.ProtocolTotalValuesDTO { return []contributorsHandlerPkg.ProtocolTotalValuesDTO{ { - Protocol: "protocol1", - Error: inputArgs.mockError, + ProtocolStats: contributorsHandlerPkg.ProtocolStats{ + Protocol: "protocol1", + }, + Error: inputArgs.mockError, }, } }) diff --git a/common/dbconsts/consts.go b/common/dbconsts/consts.go index 5f687d0fa..12e7fbd16 100644 --- a/common/dbconsts/consts.go +++ b/common/dbconsts/consts.go @@ -4,4 +4,12 @@ package dbconsts const ( ProtocolsActivityMeasurement = "protocols_activity" ProtocolsStatsMeasurement = "protocols_stats_v1" + + CctpStatsMeasurementHourly = intProtocolStatsMeasurement1h + TokenBridgeStatsMeasurementHourly = intProtocolStatsMeasurement1h + intProtocolStatsMeasurement1h = "core_protocols_stats_1h" + + CctpStatsMeasurementDaily = intProtocolStatsMeasurement1d + TokenBridgeStatsMeasurementDaily = intProtocolStatsMeasurement1d + intProtocolStatsMeasurement1d = "core_protocols_stats_1d" )