Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE-1089] Collect CCTP - Metrics #1165

Merged
merged 1 commit into from
Mar 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,5 @@ wormholeTxs.json
serviceAccountKey.json
bigtableAccountKey.json
tsconfig.tsbuildinfo
serviceAccount.json
serviceAccount.json
.run
56 changes: 56 additions & 0 deletions analytics/scripts/protocols_stats_1d.flux
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import "date"


calculateProtocolStats = (protocol,protocolVersion,taskCfg) => {

totalValueTransferred = from(bucket: taskCfg.sourceBucket)
|> range(start: taskCfg.since, stop:taskCfg.ts)
|> filter(fn: (r) => r._measurement == "vaa_volume_v2" and r.app_id == protocol)
|> filter(fn: (r) => r._field == "volume" and r._value > 0)
|> drop(columns:["destination_chain","emitter_chain","token_address","token_chain","version"])
|> group()
|> sum()
|> map(fn: (r) => ({r with _time: time(v:taskCfg.since)}))
|> set(key: "_field", value: "total_value_transferred")

totalMessages = from(bucket: taskCfg.sourceBucket)
|> range(start: taskCfg.since, stop:taskCfg.ts)
|> filter(fn: (r) => r._measurement == "vaa_volume_v2" and r.app_id == protocol)
|> filter(fn: (r) => r._field == "volume")
|> group()
|> count()
|> map(fn: (r) => ({r with _time: time(v:taskCfg.since)}))
|> set(key: "_field", value: "total_messages")

return union(tables:[totalMessages,totalValueTransferred])
|> set(key: "app_id", value: protocol)
|> set(key: "version", value: protocolVersion)
|> set(key: "_measurement", value: taskCfg.destMeasurement)
|> map(fn: (r) => ({r with _time: time(v:taskCfg.since)}))
|>to(bucket: taskCfg.destBucket)
}



ts = date.truncate(t: now(), unit: 1d)
bucketInfinite = "wormscan"
bucket30d = "wormscan-30days"

cfg = {
sourceBucket:bucketInfinite,
destBucket:bucketInfinite,
destMeasurement:"core_protocols_stats_1d",
since: date.sub(d: 1d, from: ts),
ts:ts,
}

// Set this variable with the cfg of the desired task

option task = {
name: "cctp and portal_token_bridge metrics every day",
every: 1d,
}

calculateProtocolStats(protocol:"CCTP_WORMHOLE_INTEGRATION",protocolVersion:"v1",taskCfg:cfg)

calculateProtocolStats(protocol:"PORTAL_TOKEN_BRIDGE",protocolVersion:"v1",taskCfg:cfg)
50 changes: 50 additions & 0 deletions analytics/scripts/protocols_stats_1h.flux
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import "date"

option task = {
name: "cctp and portal_token_bridge metrics every hour",
every: 1h,
}


calculateLastHourMetrics = (protocol,protocolVersion,ts) => {

since = date.sub(d: 1h, from: ts)
sourceBucket = "wormscan"
destMeasurement = "core_protocols_stats_1h"
bucket30d = "wormscan-30days"

totalValueTransferred = from(bucket: sourceBucket)
|> range(start: since, stop:ts)
|> filter(fn: (r) => r._measurement == "vaa_volume_v2" and r.app_id == protocol)
|> filter(fn: (r) => r._field == "volume" and r._value > 0)
|> drop(columns:["destination_chain","emitter_chain","token_address","token_chain","version"])
|> group()
|> sum()
|> map(fn: (r) => ({r with _time: since}))
|> set(key: "_field", value: "total_value_transferred")

totalMessages = from(bucket: sourceBucket)
|> range(start: since, stop:ts)
|> filter(fn: (r) => r._measurement == "vaa_volume_v2" and r.app_id == protocol)
|> filter(fn: (r) => r._field == "volume")
|> group()
|> count()
|> map(fn: (r) => ({r with _time: since}))
|> set(key: "_field", value: "total_messages")

return union(tables:[totalMessages,totalValueTransferred]) // if nothing happened during the last hour then union will result in empty and no point will be added.
|> set(key: "app_id", value: protocol)
|> set(key: "version", value: protocolVersion)
|> set(key: "_measurement", value: destMeasurement)
|> map(fn: (r) => ({r with _time: since}))
|>to(bucket: bucket30d)
}

ts = date.truncate(t: now(), unit: 1h)

// execute function for CCTP_WORMHOLE_INTEGRATION
calculateLastHourMetrics(protocol:"CCTP_WORMHOLE_INTEGRATION",protocolVersion:"v1",ts:ts)


// execute function for PORTAL_TOKEN_BRIDGE
calculateLastHourMetrics(protocol:"PORTAL_TOKEN_BRIDGE",protocolVersion:"v1",ts:ts)
198 changes: 177 additions & 21 deletions api/handlers/protocols/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,117 @@ from(bucket: "%s")

const QueryTemplateActivityLatestPoint = `
from(bucket: "%s")
|> range(start: -1d)
|> range(start: 1970-01-01T00:00:00Z)
|> filter(fn: (r) => r._measurement == "%s" and r.protocol == "%s" and r.version == "%s")
|> keep(columns: ["_time","_field","protocol", "_value", "total_value_secure", "total_value_transferred"])
|> last()
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
`

// QueryIntProtocolsTotalStartOfDay Query template for internal protocols (cctp and portal_token_bridge) to fetch total values till the start of current day
const QueryIntProtocolsTotalStartOfDay = `
import "date"
import "types"

startOfCurrentDay = date.truncate(t: now(), unit: 1d)

data = from(bucket: "%s")
|> range(start: 1970-01-01T00:00:00Z,stop:startOfCurrentDay)
|> filter(fn: (r) => r._measurement == "%s" and r.app_id == "%s")

tvt = data
|> filter(fn : (r) => r._field == "total_value_transferred")
|> group()
|> sum()
|> set(key:"_field",value:"total_value_transferred")
|> map(fn: (r) => ({r with _value: int(v: r._value)}))

totalMsgs = data
|> filter(fn : (r) => r._field == "total_messages")
|> group()
|> sum()
|> set(key:"_field",value:"total_messages")

union(tables:[tvt,totalMsgs])
|> set(key:"_time",value:string(v:startOfCurrentDay))
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
|> set(key:"app_id",value:"%s")
`

// QueryIntProtocolsDeltaSinceStartOfDay calculate delta since the beginning of current day
const QueryIntProtocolsDeltaSinceStartOfDay = `
import "date"
import "types"

ts = date.truncate(t: now(), unit: 1h)
startOfDay = date.truncate(t: now(), unit: 1d)

data = from(bucket: "%s")
|> range(start: startOfDay,stop:ts)
|> filter(fn: (r) => r._measurement == "%s" and r.app_id == "%s")

tvt = data
|> filter(fn : (r) => r._field == "total_value_transferred")
|> group()
|> sum()
|> set(key:"_field",value:"total_value_transferred")
|> map(fn: (r) => ({r with _value: int(v: r._value)}))

totalMsgs = data
|> filter(fn : (r) => r._field == "total_messages")
|> group()
|> sum()
|> set(key:"_field",value:"total_messages")

union(tables:[tvt,totalMsgs])
|> set(key:"_time",value:string(v:startOfDay))
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
|> set(key:"app_id",value:"%s")
`

// QueryIntProtocolsDeltaLastDay calculate last day delta
const QueryIntProtocolsDeltaLastDay = `
import "date"
import "types"


ts = date.truncate(t: now(), unit: 1h)
yesterday = date.sub(d: 1d, from: ts)

data = from(bucket: "%s")
|> range(start: yesterday,stop:ts)
|> filter(fn: (r) => r._measurement == "%s" and r.app_id == "%s")

tvt = data
|> filter(fn : (r) => r._field == "total_value_transferred")
|> group()
|> sum()
|> set(key:"_field",value:"total_value_transferred")
|> map(fn: (r) => ({r with _value: int(v: r._value)}))

totalMsgs = data
|> filter(fn : (r) => r._field == "total_messages")
|> group()
|> sum()
|> set(key:"_field",value:"total_messages")

union(tables:[tvt,totalMsgs])
|> set(key:"_time",value:string(v:yesterday))
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
|> set(key:"app_id",value:"%s")
`

type Repository struct {
queryAPI QueryDoer
logger *zap.Logger
statsBucket string
activityBucket string
statsVersion string
activityVersion string
queryAPI QueryDoer
logger *zap.Logger
bucketInfinite string
bucket30d string
statsVersion string
activityVersion string
intProtocolMeasurement map[string]struct {
Daily string
Hourly string
}
}

type rowStat struct {
Expand All @@ -50,6 +147,17 @@ type rowStat struct {
TotalValueLocked float64 `mapstructure:"total_value_locked"`
}

type intRowStat struct {
Protocol string `mapstructure:"app_id"`
TotalMessages uint64 `mapstructure:"total_messages"`
TotalValueTransferred uint64 `mapstructure:"total_value_transferred"`
}

type intStats struct {
Latest intRowStat
DeltaLast24hr intRowStat
}

type rowActivity struct {
Protocol string `mapstructure:"protocol"`
DestinationChainId string `mapstructure:"destination_chain_id"`
Expand Down Expand Up @@ -85,14 +193,21 @@ func WrapQueryAPI(qApi api.QueryAPI) QueryDoer {
return &queryApiWrapper{qApi: qApi}
}

func NewRepository(qApi QueryDoer, statsBucket, activityBucket, statsVersion, activityVersion string, logger *zap.Logger) *Repository {
func NewRepository(qApi QueryDoer, bucketInfinite, bucket30d, statsVersion, activityVersion string, logger *zap.Logger) *Repository {
return &Repository{
queryAPI: qApi,
statsBucket: statsBucket,
activityBucket: activityBucket,
bucketInfinite: bucketInfinite,
bucket30d: bucket30d,
statsVersion: statsVersion,
activityVersion: activityVersion,
logger: logger,
intProtocolMeasurement: map[string]struct {
Daily string
Hourly string
}{
CCTP: {Daily: dbconsts.CctpStatsMeasurementDaily, Hourly: dbconsts.CctpStatsMeasurementHourly},
PortalTokenBridge: {Daily: dbconsts.TokenBridgeStatsMeasurementDaily, Hourly: dbconsts.TokenBridgeStatsMeasurementHourly},
},
}
}

Expand All @@ -101,40 +216,81 @@ func (q *queryApiWrapper) Query(ctx context.Context, query string) (QueryResult,
}

// returns latest and last 24 hr stats for a given protocol
func (r *Repository) getProtocolStats(ctx context.Context, contributor string) (stats, error) {
func (r *Repository) getProtocolStats(ctx context.Context, protocol string) (stats, error) {

// fetch latest stat
latest, err := fetchSingleRecordData[rowStat](r.logger, r.queryAPI, ctx, r.statsBucket, QueryTemplateLatestPoint, dbconsts.ProtocolsStatsMeasurement, contributor, r.statsVersion)
q := buildQuery(QueryTemplateLatestPoint, r.bucket30d, dbconsts.ProtocolsStatsMeasurement, protocol, r.statsVersion)
latest, err := fetchSingleRecordData[rowStat](r.logger, r.queryAPI, ctx, q, protocol)
if err != nil {
return stats{}, err
}
// fetch last 24 hr stat
last24hr, err := fetchSingleRecordData[rowStat](r.logger, r.queryAPI, ctx, r.statsBucket, QueryTemplateLast24Point, dbconsts.ProtocolsStatsMeasurement, contributor, r.statsVersion)
q = buildQuery(QueryTemplateLast24Point, r.bucket30d, dbconsts.ProtocolsStatsMeasurement, protocol, r.statsVersion)
last24hr, err := fetchSingleRecordData[rowStat](r.logger, r.queryAPI, ctx, q, protocol)
return stats{
Latest: latest,
Last24: last24hr,
}, err
}

func (r *Repository) getProtocolActivity(ctx context.Context, contributor string) (rowActivity, error) {
return fetchSingleRecordData[rowActivity](r.logger, r.queryAPI, ctx, r.activityBucket, QueryTemplateActivityLatestPoint, dbconsts.ProtocolsActivityMeasurement, contributor, r.activityVersion)
func (r *Repository) getProtocolActivity(ctx context.Context, protocol string) (rowActivity, error) {
q := buildQuery(QueryTemplateActivityLatestPoint, r.bucket30d, dbconsts.ProtocolsActivityMeasurement, protocol, r.activityVersion)
return fetchSingleRecordData[rowActivity](r.logger, r.queryAPI, ctx, q, protocol)
}

// returns latest and last 24 hr for internal protocols (cctp and portal_token_bridge)
func (r *Repository) getInternalProtocolStats(ctx context.Context, protocol string) (intStats, error) {

// calculate total values till the start of current day
totalTillCurrentDayQuery := fmt.Sprintf(QueryIntProtocolsTotalStartOfDay, r.bucketInfinite, r.intProtocolMeasurement[protocol].Daily, protocol, protocol)
totalsUntilToday, err := fetchSingleRecordData[intRowStat](r.logger, r.queryAPI, ctx, totalTillCurrentDayQuery, protocol)
if err != nil {
return intStats{}, err
}

// calculate delta since the beginning of current day
q2 := fmt.Sprintf(QueryIntProtocolsDeltaSinceStartOfDay, r.bucket30d, r.intProtocolMeasurement[protocol].Hourly, protocol, protocol)
currentDayStats, errCD := fetchSingleRecordData[intRowStat](r.logger, r.queryAPI, ctx, q2, protocol)
if errCD != nil {
return intStats{}, errCD
}

latestTotal := intRowStat{
Protocol: protocol,
TotalMessages: totalsUntilToday.TotalMessages + currentDayStats.TotalMessages,
TotalValueTransferred: totalsUntilToday.TotalValueTransferred + currentDayStats.TotalValueTransferred,
}

result := intStats{
Latest: latestTotal,
}

// calculate last day delta
q3 := fmt.Sprintf(QueryIntProtocolsDeltaLastDay, r.bucket30d, r.intProtocolMeasurement[protocol].Hourly, protocol, protocol)
deltaYesterdayStats, errQ3 := fetchSingleRecordData[intRowStat](r.logger, r.queryAPI, ctx, q3, protocol)
if errQ3 != nil {
return result, errQ3
}

result.DeltaLast24hr = deltaYesterdayStats
return result, nil
}

func fetchSingleRecordData[T any](logger *zap.Logger, queryAPI QueryDoer, ctx context.Context, bucket, queryTemplate, measurement, contributor, version string) (T, error) {
func fetchSingleRecordData[T any](logger *zap.Logger, queryAPI QueryDoer, ctx context.Context, query, protocol string) (T, error) {
var res T
q := buildQuery(queryTemplate, bucket, measurement, contributor, version)
result, err := queryAPI.Query(ctx, q)
result, err := queryAPI.Query(ctx, query)
if err != nil {
logger.Error("error executing query to fetch data", zap.Error(err), zap.String("protocol", contributor), zap.String("query", q))
logger.Error("error executing query to fetch data", zap.Error(err), zap.String("protocol", protocol), zap.String("query", query))
return res, err
}
defer result.Close()

if !result.Next() {
if result.Err() != nil {
logger.Error("error reading query response", zap.Error(result.Err()), zap.String("protocol", contributor), zap.String("query", q))
logger.Error("error reading query response", zap.Error(result.Err()), zap.String("protocol", protocol), zap.String("query", query))
return res, result.Err()
}
logger.Info("empty query response", zap.String("protocol", contributor), zap.String("query", q))
logger.Info("empty query response", zap.String("protocol", protocol), zap.String("query", query))
return res, err
}

Expand Down
Loading
Loading