Skip to content

Commit

Permalink
[ISSUE-1089] Collect CCTP - Metrics (#1165)
Browse files Browse the repository at this point in the history
Add cctp and portal_token_bridge stats to protocols-stats endpoint

changes

 indent

more changes on the script

 new working approach on influx task

 indent script

 tested insert

 add 2 versions of the script

multiple changes

 add 1day task

 add logic to retrieve internal protocols

 remove unecessary code

 readd empty script

fix  unit-tests and measurement namings

 fix queries

 fix alignment

rename function task

fix names

improvements on influx task

 add .run config to gitignore

add .run to gitignore

fix task and rename

working api

 multiple things

Delete .run/wormscan api.run.xml

Delete analytics/scripts/test_query.flux

wip

 multiple fixes

 fix test

wip

 fix queries

fix unit-test due to query changes
  • Loading branch information
marianososto committed Mar 11, 2024
1 parent 2d61225 commit 4009482
Show file tree
Hide file tree
Showing 10 changed files with 496 additions and 126 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -16,4 +16,5 @@ wormholeTxs.json
serviceAccountKey.json
bigtableAccountKey.json
tsconfig.tsbuildinfo
serviceAccount.json
serviceAccount.json
.run
56 changes: 56 additions & 0 deletions analytics/scripts/protocols_stats_1d.flux
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
import "date"


calculateProtocolStats = (protocol,protocolVersion,taskCfg) => {

totalValueTransferred = from(bucket: taskCfg.sourceBucket)
|> range(start: taskCfg.since, stop:taskCfg.ts)
|> filter(fn: (r) => r._measurement == "vaa_volume_v2" and r.app_id == protocol)
|> filter(fn: (r) => r._field == "volume" and r._value > 0)
|> drop(columns:["destination_chain","emitter_chain","token_address","token_chain","version"])
|> group()
|> sum()
|> map(fn: (r) => ({r with _time: time(v:taskCfg.since)}))
|> set(key: "_field", value: "total_value_transferred")

totalMessages = from(bucket: taskCfg.sourceBucket)
|> range(start: taskCfg.since, stop:taskCfg.ts)
|> filter(fn: (r) => r._measurement == "vaa_volume_v2" and r.app_id == protocol)
|> filter(fn: (r) => r._field == "volume")
|> group()
|> count()
|> map(fn: (r) => ({r with _time: time(v:taskCfg.since)}))
|> set(key: "_field", value: "total_messages")

return union(tables:[totalMessages,totalValueTransferred])
|> set(key: "app_id", value: protocol)
|> set(key: "version", value: protocolVersion)
|> set(key: "_measurement", value: taskCfg.destMeasurement)
|> map(fn: (r) => ({r with _time: time(v:taskCfg.since)}))
|>to(bucket: taskCfg.destBucket)
}



ts = date.truncate(t: now(), unit: 1d)
bucketInfinite = "wormscan"
bucket30d = "wormscan-30days"

cfg = {
sourceBucket:bucketInfinite,
destBucket:bucketInfinite,
destMeasurement:"core_protocols_stats_1d",
since: date.sub(d: 1d, from: ts),
ts:ts,
}

// Set this variable with the cfg of the desired task

option task = {
name: "cctp and portal_token_bridge metrics every day",
every: 1d,
}

calculateProtocolStats(protocol:"CCTP_WORMHOLE_INTEGRATION",protocolVersion:"v1",taskCfg:cfg)

calculateProtocolStats(protocol:"PORTAL_TOKEN_BRIDGE",protocolVersion:"v1",taskCfg:cfg)
50 changes: 50 additions & 0 deletions analytics/scripts/protocols_stats_1h.flux
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
import "date"

option task = {
name: "cctp and portal_token_bridge metrics every hour",
every: 1h,
}


calculateLastHourMetrics = (protocol,protocolVersion,ts) => {

since = date.sub(d: 1h, from: ts)
sourceBucket = "wormscan"
destMeasurement = "core_protocols_stats_1h"
bucket30d = "wormscan-30days"

totalValueTransferred = from(bucket: sourceBucket)
|> range(start: since, stop:ts)
|> filter(fn: (r) => r._measurement == "vaa_volume_v2" and r.app_id == protocol)
|> filter(fn: (r) => r._field == "volume" and r._value > 0)
|> drop(columns:["destination_chain","emitter_chain","token_address","token_chain","version"])
|> group()
|> sum()
|> map(fn: (r) => ({r with _time: since}))
|> set(key: "_field", value: "total_value_transferred")

totalMessages = from(bucket: sourceBucket)
|> range(start: since, stop:ts)
|> filter(fn: (r) => r._measurement == "vaa_volume_v2" and r.app_id == protocol)
|> filter(fn: (r) => r._field == "volume")
|> group()
|> count()
|> map(fn: (r) => ({r with _time: since}))
|> set(key: "_field", value: "total_messages")

return union(tables:[totalMessages,totalValueTransferred]) // if nothing happened during the last hour then union will result in empty and no point will be added.
|> set(key: "app_id", value: protocol)
|> set(key: "version", value: protocolVersion)
|> set(key: "_measurement", value: destMeasurement)
|> map(fn: (r) => ({r with _time: since}))
|>to(bucket: bucket30d)
}

ts = date.truncate(t: now(), unit: 1h)

// execute function for CCTP_WORMHOLE_INTEGRATION
calculateLastHourMetrics(protocol:"CCTP_WORMHOLE_INTEGRATION",protocolVersion:"v1",ts:ts)


// execute function for PORTAL_TOKEN_BRIDGE
calculateLastHourMetrics(protocol:"PORTAL_TOKEN_BRIDGE",protocolVersion:"v1",ts:ts)
198 changes: 177 additions & 21 deletions api/handlers/protocols/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,20 +28,117 @@ from(bucket: "%s")

const QueryTemplateActivityLatestPoint = `
from(bucket: "%s")
|> range(start: -1d)
|> range(start: 1970-01-01T00:00:00Z)
|> filter(fn: (r) => r._measurement == "%s" and r.protocol == "%s" and r.version == "%s")
|> keep(columns: ["_time","_field","protocol", "_value", "total_value_secure", "total_value_transferred"])
|> last()
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
`

// QueryIntProtocolsTotalStartOfDay Query template for internal protocols (cctp and portal_token_bridge) to fetch total values till the start of current day
const QueryIntProtocolsTotalStartOfDay = `
import "date"
import "types"
startOfCurrentDay = date.truncate(t: now(), unit: 1d)
data = from(bucket: "%s")
|> range(start: 1970-01-01T00:00:00Z,stop:startOfCurrentDay)
|> filter(fn: (r) => r._measurement == "%s" and r.app_id == "%s")
tvt = data
|> filter(fn : (r) => r._field == "total_value_transferred")
|> group()
|> sum()
|> set(key:"_field",value:"total_value_transferred")
|> map(fn: (r) => ({r with _value: int(v: r._value)}))
totalMsgs = data
|> filter(fn : (r) => r._field == "total_messages")
|> group()
|> sum()
|> set(key:"_field",value:"total_messages")
union(tables:[tvt,totalMsgs])
|> set(key:"_time",value:string(v:startOfCurrentDay))
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
|> set(key:"app_id",value:"%s")
`

// QueryIntProtocolsDeltaSinceStartOfDay calculate delta since the beginning of current day
const QueryIntProtocolsDeltaSinceStartOfDay = `
import "date"
import "types"
ts = date.truncate(t: now(), unit: 1h)
startOfDay = date.truncate(t: now(), unit: 1d)
data = from(bucket: "%s")
|> range(start: startOfDay,stop:ts)
|> filter(fn: (r) => r._measurement == "%s" and r.app_id == "%s")
tvt = data
|> filter(fn : (r) => r._field == "total_value_transferred")
|> group()
|> sum()
|> set(key:"_field",value:"total_value_transferred")
|> map(fn: (r) => ({r with _value: int(v: r._value)}))
totalMsgs = data
|> filter(fn : (r) => r._field == "total_messages")
|> group()
|> sum()
|> set(key:"_field",value:"total_messages")
union(tables:[tvt,totalMsgs])
|> set(key:"_time",value:string(v:startOfDay))
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
|> set(key:"app_id",value:"%s")
`

// QueryIntProtocolsDeltaLastDay calculate last day delta
const QueryIntProtocolsDeltaLastDay = `
import "date"
import "types"
ts = date.truncate(t: now(), unit: 1h)
yesterday = date.sub(d: 1d, from: ts)
data = from(bucket: "%s")
|> range(start: yesterday,stop:ts)
|> filter(fn: (r) => r._measurement == "%s" and r.app_id == "%s")
tvt = data
|> filter(fn : (r) => r._field == "total_value_transferred")
|> group()
|> sum()
|> set(key:"_field",value:"total_value_transferred")
|> map(fn: (r) => ({r with _value: int(v: r._value)}))
totalMsgs = data
|> filter(fn : (r) => r._field == "total_messages")
|> group()
|> sum()
|> set(key:"_field",value:"total_messages")
union(tables:[tvt,totalMsgs])
|> set(key:"_time",value:string(v:yesterday))
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
|> set(key:"app_id",value:"%s")
`

type Repository struct {
queryAPI QueryDoer
logger *zap.Logger
statsBucket string
activityBucket string
statsVersion string
activityVersion string
queryAPI QueryDoer
logger *zap.Logger
bucketInfinite string
bucket30d string
statsVersion string
activityVersion string
intProtocolMeasurement map[string]struct {
Daily string
Hourly string
}
}

type rowStat struct {
Expand All @@ -50,6 +147,17 @@ type rowStat struct {
TotalValueLocked float64 `mapstructure:"total_value_locked"`
}

type intRowStat struct {
Protocol string `mapstructure:"app_id"`
TotalMessages uint64 `mapstructure:"total_messages"`
TotalValueTransferred uint64 `mapstructure:"total_value_transferred"`
}

type intStats struct {
Latest intRowStat
DeltaLast24hr intRowStat
}

type rowActivity struct {
Protocol string `mapstructure:"protocol"`
DestinationChainId string `mapstructure:"destination_chain_id"`
Expand Down Expand Up @@ -85,14 +193,21 @@ func WrapQueryAPI(qApi api.QueryAPI) QueryDoer {
return &queryApiWrapper{qApi: qApi}
}

func NewRepository(qApi QueryDoer, statsBucket, activityBucket, statsVersion, activityVersion string, logger *zap.Logger) *Repository {
func NewRepository(qApi QueryDoer, bucketInfinite, bucket30d, statsVersion, activityVersion string, logger *zap.Logger) *Repository {
return &Repository{
queryAPI: qApi,
statsBucket: statsBucket,
activityBucket: activityBucket,
bucketInfinite: bucketInfinite,
bucket30d: bucket30d,
statsVersion: statsVersion,
activityVersion: activityVersion,
logger: logger,
intProtocolMeasurement: map[string]struct {
Daily string
Hourly string
}{
CCTP: {Daily: dbconsts.CctpStatsMeasurementDaily, Hourly: dbconsts.CctpStatsMeasurementHourly},
PortalTokenBridge: {Daily: dbconsts.TokenBridgeStatsMeasurementDaily, Hourly: dbconsts.TokenBridgeStatsMeasurementHourly},
},
}
}

Expand All @@ -101,40 +216,81 @@ func (q *queryApiWrapper) Query(ctx context.Context, query string) (QueryResult,
}

// returns latest and last 24 hr stats for a given protocol
func (r *Repository) getProtocolStats(ctx context.Context, contributor string) (stats, error) {
func (r *Repository) getProtocolStats(ctx context.Context, protocol string) (stats, error) {

// fetch latest stat
latest, err := fetchSingleRecordData[rowStat](r.logger, r.queryAPI, ctx, r.statsBucket, QueryTemplateLatestPoint, dbconsts.ProtocolsStatsMeasurement, contributor, r.statsVersion)
q := buildQuery(QueryTemplateLatestPoint, r.bucket30d, dbconsts.ProtocolsStatsMeasurement, protocol, r.statsVersion)
latest, err := fetchSingleRecordData[rowStat](r.logger, r.queryAPI, ctx, q, protocol)
if err != nil {
return stats{}, err
}
// fetch last 24 hr stat
last24hr, err := fetchSingleRecordData[rowStat](r.logger, r.queryAPI, ctx, r.statsBucket, QueryTemplateLast24Point, dbconsts.ProtocolsStatsMeasurement, contributor, r.statsVersion)
q = buildQuery(QueryTemplateLast24Point, r.bucket30d, dbconsts.ProtocolsStatsMeasurement, protocol, r.statsVersion)
last24hr, err := fetchSingleRecordData[rowStat](r.logger, r.queryAPI, ctx, q, protocol)
return stats{
Latest: latest,
Last24: last24hr,
}, err
}

func (r *Repository) getProtocolActivity(ctx context.Context, contributor string) (rowActivity, error) {
return fetchSingleRecordData[rowActivity](r.logger, r.queryAPI, ctx, r.activityBucket, QueryTemplateActivityLatestPoint, dbconsts.ProtocolsActivityMeasurement, contributor, r.activityVersion)
func (r *Repository) getProtocolActivity(ctx context.Context, protocol string) (rowActivity, error) {
q := buildQuery(QueryTemplateActivityLatestPoint, r.bucket30d, dbconsts.ProtocolsActivityMeasurement, protocol, r.activityVersion)
return fetchSingleRecordData[rowActivity](r.logger, r.queryAPI, ctx, q, protocol)
}

// returns latest and last 24 hr for internal protocols (cctp and portal_token_bridge)
func (r *Repository) getInternalProtocolStats(ctx context.Context, protocol string) (intStats, error) {

// calculate total values till the start of current day
totalTillCurrentDayQuery := fmt.Sprintf(QueryIntProtocolsTotalStartOfDay, r.bucketInfinite, r.intProtocolMeasurement[protocol].Daily, protocol, protocol)
totalsUntilToday, err := fetchSingleRecordData[intRowStat](r.logger, r.queryAPI, ctx, totalTillCurrentDayQuery, protocol)
if err != nil {
return intStats{}, err
}

// calculate delta since the beginning of current day
q2 := fmt.Sprintf(QueryIntProtocolsDeltaSinceStartOfDay, r.bucket30d, r.intProtocolMeasurement[protocol].Hourly, protocol, protocol)
currentDayStats, errCD := fetchSingleRecordData[intRowStat](r.logger, r.queryAPI, ctx, q2, protocol)
if errCD != nil {
return intStats{}, errCD
}

latestTotal := intRowStat{
Protocol: protocol,
TotalMessages: totalsUntilToday.TotalMessages + currentDayStats.TotalMessages,
TotalValueTransferred: totalsUntilToday.TotalValueTransferred + currentDayStats.TotalValueTransferred,
}

result := intStats{
Latest: latestTotal,
}

// calculate last day delta
q3 := fmt.Sprintf(QueryIntProtocolsDeltaLastDay, r.bucket30d, r.intProtocolMeasurement[protocol].Hourly, protocol, protocol)
deltaYesterdayStats, errQ3 := fetchSingleRecordData[intRowStat](r.logger, r.queryAPI, ctx, q3, protocol)
if errQ3 != nil {
return result, errQ3
}

result.DeltaLast24hr = deltaYesterdayStats
return result, nil
}

func fetchSingleRecordData[T any](logger *zap.Logger, queryAPI QueryDoer, ctx context.Context, bucket, queryTemplate, measurement, contributor, version string) (T, error) {
func fetchSingleRecordData[T any](logger *zap.Logger, queryAPI QueryDoer, ctx context.Context, query, protocol string) (T, error) {
var res T
q := buildQuery(queryTemplate, bucket, measurement, contributor, version)
result, err := queryAPI.Query(ctx, q)
result, err := queryAPI.Query(ctx, query)
if err != nil {
logger.Error("error executing query to fetch data", zap.Error(err), zap.String("protocol", contributor), zap.String("query", q))
logger.Error("error executing query to fetch data", zap.Error(err), zap.String("protocol", protocol), zap.String("query", query))
return res, err
}
defer result.Close()

if !result.Next() {
if result.Err() != nil {
logger.Error("error reading query response", zap.Error(result.Err()), zap.String("protocol", contributor), zap.String("query", q))
logger.Error("error reading query response", zap.Error(result.Err()), zap.String("protocol", protocol), zap.String("query", query))
return res, result.Err()
}
logger.Info("empty query response", zap.String("protocol", contributor), zap.String("query", q))
logger.Info("empty query response", zap.String("protocol", protocol), zap.String("query", query))
return res, err
}

Expand Down
Loading

0 comments on commit 4009482

Please sign in to comment.