Commit

Add cctp and portal_token_bridge stats to protocols-stats endpoint
changes

indent

more changes on the script

new working approach on influx task

indent script

tested insert

add 2 versions of the script

multiple changes

add 1-day task

add logic to retrieve internal protocols

remove unnecessary code

re-add empty script

fix unit tests and measurement naming

fix queries

fix alignment

rename function task

fix names

improvements on influx task

add .run config to gitignore

add .run to gitignore

fix task and rename

working api

multiple things

Delete .run/wormscan api.run.xml
marianososto committed Mar 7, 2024
1 parent 2bec07b commit 6a88f46
Showing 11 changed files with 469 additions and 126 deletions.
3 changes: 2 additions & 1 deletion .gitignore
@@ -16,4 +16,5 @@ wormholeTxs.json
serviceAccountKey.json
bigtableAccountKey.json
tsconfig.tsbuildinfo
serviceAccount.json
serviceAccount.json
.run
53 changes: 53 additions & 0 deletions analytics/scripts/protocols_stats_1d.flux
@@ -0,0 +1,53 @@
import "date"

option task = {
name: "cctp and portal_token_bridge metrics every day",
every: 1d,
}


calculateLastDayMetrics = (protocol,protocolVersion,ts) => {

bucket30d = "wormscan-30days"
bucketInfinite = "wormscan"
measurement1d = "cctp_tb_stats_1d"
yesterday = date.sub(d: 1d, from: ts)

totalValueTransferred = from(bucket: bucket30d)
|> range(start: yesterday, stop: ts)
|> filter(fn: (r) => r._measurement == "cctp_tb_stats_1h" and r.app_id == protocol)
|> filter(fn: (r) => r._field == "total_value_transferred")
|> keep(columns: ["_value","_field","app_id","_measurement"])
|> group()
|> sum()
|> map(fn: (r) => ({r with _time: yesterday}))
|> map(fn: (r) => ({r with _value: float(v:r._value)}))
|> set(key: "_field", value: "total_value_transferred")

totalMessages = from(bucket: bucket30d)
|> range(start: yesterday, stop: ts)
|> filter(fn: (r) => r._measurement == "cctp_tb_stats_1h" and r.app_id == protocol)
|> filter(fn: (r) => r._field == "total_messages")
|> keep(columns: ["_value","_field","app_id","_measurement"])
|> group()
|> sum()
|> map(fn: (r) => ({r with _time: yesterday}))
|> map(fn: (r) => ({r with _value: float(v:r._value)}))
|> set(key: "_field", value: "total_value_transferred")

return union(tables: [totalMessages, totalValueTransferred])
|> set(key: "app_id", value: protocol)
|> set(key: "version", value: protocolVersion)
|> set(key: "_measurement", value: measurement1d)
|> map(fn: (r) => ({r with _time: yesterday}))
|> to(bucket: bucketInfinite)
}

ts = date.truncate(t: now(), unit: 1d)

// execute function for CCTP_WORMHOLE_INTEGRATION
calculateTotalMetrics(protocol:"CCTP_WORMHOLE_INTEGRATION",protocolVersion:"v1",ts:ts)


// execute function for PORTAL_TOKEN_BRIDGE
calculateTotalMetrics(protocol:"PORTAL_TOKEN_BRIDGE",protocolVersion:"v1",ts:ts)
52 changes: 52 additions & 0 deletions analytics/scripts/protocols_stats_1h.flux
@@ -0,0 +1,52 @@
import "date"

option task = {
name: "cctp and portal_token_bridge metrics every hour",
every: 1h,
}


calculateLastHourMetrics = (protocol,protocolVersion,ts) => {

since = date.sub(d: 1h, from: ts)
sourceBucket = "wormscan"
destMeasurement = "internal_protocols_stats_1h"
bucket30d = "wormscan-30days"

totalValueTransferred = from(bucket: sourceBucket)
|> range(start: since, stop:ts)
|> filter(fn: (r) => r._measurement == "vaa_volume_v2" and r.app_id == protocol)
|> filter(fn: (r) => r._field == "volume" and r._value > 0)
|> drop(columns:["destination_chain","emitter_chain","token_address","token_chain","version"])
|> group()
|> sum()
|> map(fn: (r) => ({r with _time: since}))
|> map(fn: (r) => ({r with _value: float(v:r._value)})) //total_value_transferred:float
|> set(key: "_field", value: "total_value_transferred")

totalMessages = from(bucket: sourceBucket)
|> range(start: since, stop:ts)
|> filter(fn: (r) => r._measurement == "vaa_volume_v2" and r.app_id == protocol)
|> filter(fn: (r) => r._field == "volume")
|> group()
|> count()
|> map(fn: (r) => ({r with _time: since}))
|> map(fn: (r) => ({r with _value: float(v:r._value)})) //total_messages:float
|> set(key: "_field", value: "total_messages")

return union(tables:[totalMessages,totalValueTransferred]) // if nothing happened during the last hour, the union is empty and no point is written.
|> set(key: "app_id", value: protocol)
|> set(key: "version", value: protocolVersion)
|> set(key: "_measurement", value: destMeasurement)
|> map(fn: (r) => ({r with _time: since}))
|> to(bucket: bucket30d)
}

ts = date.truncate(t: now(), unit: 1h)

// execute function for CCTP_WORMHOLE_INTEGRATION
calculateLastHourMetrics(protocol:"CCTP_WORMHOLE_INTEGRATION",protocolVersion:"v1",ts:ts)


// execute function for PORTAL_TOKEN_BRIDGE
calculateLastHourMetrics(protocol:"PORTAL_TOKEN_BRIDGE",protocolVersion:"v1",ts:ts)
2 changes: 2 additions & 0 deletions analytics/scripts/test_query.flux
@@ -0,0 +1,2 @@
from(bucket: "wormscan")
|> range(start: 1970-01-01T00:00:00Z,stop:now())
170 changes: 149 additions & 21 deletions api/handlers/protocols/repository.go
@@ -28,20 +28,89 @@ from(bucket: "%s")

const QueryTemplateActivityLatestPoint = `
from(bucket: "%s")
|> range(start: -1d)
|> range(start: -3d)
|> filter(fn: (r) => r._measurement == "%s" and r.protocol == "%s" and r.version == "%s")
|> keep(columns: ["_time","_field","protocol", "_value", "total_value_secure", "total_value_transferred"])
|> last()
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
`

// QueryIntProtocolsTotalStartOfDay is the query template for internal protocols (cctp and portal_token_bridge) to fetch total values up to the start of the current day
const QueryIntProtocolsTotalStartOfDay = `
import "date"
import "types"
startOfCurrentDay = date.truncate(t: now(), unit: 1d)
from(bucket: "%s")
|> range(start: 1970-01-01T00:00:00Z,stop:startOfCurrentDay)
|> filter(fn: (r) => r._measurement == "%s" and r.app_id == "%s")
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
|> group(columns: ["app_id"])
|> reduce(fn: (r, accumulator) => ({
total_messages: accumulator.total_messages + (if exists r.total_messages then r.total_messages else float(v:0)),
total_value_transferred: accumulator.total_value_transferred + (if exists r.total_value_transferred then r.total_value_transferred else float(v:0))
}), identity: {
total_messages: float(v:0),
total_value_transferred: float(v:0.0)
})
`

// QueryIntProtocolsDeltaSinceStartOfDay calculates the delta since the beginning of the current day
const QueryIntProtocolsDeltaSinceStartOfDay = `
import "date"
import "types"
ts = date.truncate(t: now(), unit: 1h)
startOfDay = date.truncate(t: now(), unit: 1d)
from(bucket: "%s")
|> range(start: startOfDay, stop: ts)
|> filter(fn: (r) => r._measurement == "%s" and r.app_id == "%s")
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
|> group(columns: ["app_id"])
|> reduce(fn: (r, accumulator) => ({
total_messages: accumulator.total_messages + (if exists r.total_messages then r.total_messages else float(v:0)),
total_value_transferred: accumulator.total_value_transferred + (if exists r.total_value_transferred then r.total_value_transferred else float(v:0))
}), identity: {
total_messages: float(v:0),
total_value_transferred: float(v:0.0)
})
`

// QueryIntProtocolsDeltaLastDay calculates the delta for the last day
const QueryIntProtocolsDeltaLastDay = `
import "date"
import "types"
ts = date.truncate(t: now(), unit: 1h)
yesterday = date.sub(d: 1d, from: ts)
from(bucket: "%s")
|> range(start: yesterday, stop: ts)
|> filter(fn: (r) => r._measurement == "%s" and r.app_id == "%s")
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
|> group(columns: ["app_id"])
|> reduce(fn: (r, accumulator) => ({
total_messages: accumulator.total_messages + (if exists r.total_messages then r.total_messages else float(v:0)),
total_value_transferred: accumulator.total_value_transferred + (if exists r.total_value_transferred then r.total_value_transferred else float(v:0))
}), identity: {
total_messages: float(v:0),
total_value_transferred: float(v:0.0)
})
`

type Repository struct {
queryAPI QueryDoer
logger *zap.Logger
statsBucket string
activityBucket string
statsVersion string
activityVersion string
queryAPI QueryDoer
logger *zap.Logger
bucketInfinite string
bucket30d string
statsVersion string
activityVersion string
intProtocolMeasurement map[string]struct {
Daily string
Hourly string
}
}

type rowStat struct {
@@ -50,6 +119,17 @@ type rowStat struct {
TotalValueLocked float64 `mapstructure:"total_value_locked"`
}

type intRowStat struct {
Protocol string `mapstructure:"app_id"`
TotalMessages uint64 `mapstructure:"total_messages"`
TotalValueTransferred float64 `mapstructure:"total_value_transferred"`
}

type intStats struct {
Latest intRowStat
DeltaLast24hr intRowStat
}

type rowActivity struct {
Protocol string `mapstructure:"protocol"`
DestinationChainId string `mapstructure:"destination_chain_id"`
@@ -85,14 +165,21 @@ func WrapQueryAPI(qApi api.QueryAPI) QueryDoer {
return &queryApiWrapper{qApi: qApi}
}

func NewRepository(qApi QueryDoer, statsBucket, activityBucket, statsVersion, activityVersion string, logger *zap.Logger) *Repository {
func NewRepository(qApi QueryDoer, bucketInfinite, bucket30d, statsVersion, activityVersion string, logger *zap.Logger) *Repository {
return &Repository{
queryAPI: qApi,
statsBucket: statsBucket,
activityBucket: activityBucket,
bucketInfinite: bucketInfinite,
bucket30d: bucket30d,
statsVersion: statsVersion,
activityVersion: activityVersion,
logger: logger,
intProtocolMeasurement: map[string]struct {
Daily string
Hourly string
}{
CCTP: {Daily: dbconsts.CctpStatsMeasurementDaily, Hourly: dbconsts.CctpStatsMeasurementHourly},
PortalTokenBridge: {Daily: dbconsts.TokenBridgeStatsMeasurementDaily, Hourly: dbconsts.TokenBridgeStatsMeasurementHourly},
},
}
}

@@ -101,40 +188,81 @@ func (q *queryApiWrapper) Query(ctx context.Context, query string) (QueryResult,
}

// returns latest and last 24 hr stats for a given protocol
func (r *Repository) getProtocolStats(ctx context.Context, contributor string) (stats, error) {
func (r *Repository) getProtocolStats(ctx context.Context, protocol string) (stats, error) {

// fetch latest stat
latest, err := fetchSingleRecordData[rowStat](r.logger, r.queryAPI, ctx, r.statsBucket, QueryTemplateLatestPoint, dbconsts.ProtocolsStatsMeasurement, contributor, r.statsVersion)
q := buildQuery(QueryTemplateLatestPoint, r.bucket30d, dbconsts.ProtocolsStatsMeasurement, protocol, r.statsVersion)
latest, err := fetchSingleRecordData[rowStat](r.logger, r.queryAPI, ctx, q, protocol)
if err != nil {
return stats{}, err
}
// fetch last 24 hr stat
last24hr, err := fetchSingleRecordData[rowStat](r.logger, r.queryAPI, ctx, r.statsBucket, QueryTemplateLast24Point, dbconsts.ProtocolsStatsMeasurement, contributor, r.statsVersion)
q = buildQuery(QueryTemplateLast24Point, r.bucket30d, dbconsts.ProtocolsStatsMeasurement, protocol, r.statsVersion)
last24hr, err := fetchSingleRecordData[rowStat](r.logger, r.queryAPI, ctx, q, protocol)
return stats{
Latest: latest,
Last24: last24hr,
}, err
}

func (r *Repository) getProtocolActivity(ctx context.Context, contributor string) (rowActivity, error) {
return fetchSingleRecordData[rowActivity](r.logger, r.queryAPI, ctx, r.activityBucket, QueryTemplateActivityLatestPoint, dbconsts.ProtocolsActivityMeasurement, contributor, r.activityVersion)
func (r *Repository) getProtocolActivity(ctx context.Context, protocol string) (rowActivity, error) {
q := buildQuery(QueryTemplateActivityLatestPoint, r.bucket30d, dbconsts.ProtocolsActivityMeasurement, protocol, r.activityVersion)
return fetchSingleRecordData[rowActivity](r.logger, r.queryAPI, ctx, q, protocol)
}

// returns the latest and last 24 hr stats for internal protocols (cctp and portal_token_bridge)
func (r *Repository) getInternalProtocolStats(ctx context.Context, protocol string) (intStats, error) {

// calculate total values up to the start of the current day
totalTillCurrentDayQuery := fmt.Sprintf(QueryIntProtocolsTotalStartOfDay, r.bucketInfinite, r.intProtocolMeasurement[protocol].Daily, protocol)
totalsUntilToday, err := fetchSingleRecordData[intRowStat](r.logger, r.queryAPI, ctx, totalTillCurrentDayQuery, protocol)
if err != nil {
return intStats{}, err
}

// calculate delta since the beginning of current day
q2 := fmt.Sprintf(QueryIntProtocolsDeltaSinceStartOfDay, r.bucket30d, r.intProtocolMeasurement[protocol].Hourly, protocol)
currentDayStats, errCD := fetchSingleRecordData[intRowStat](r.logger, r.queryAPI, ctx, q2, protocol)
if errCD != nil {
return intStats{}, errCD
}

latestTotal := intRowStat{
Protocol: protocol,
TotalMessages: totalsUntilToday.TotalMessages + currentDayStats.TotalMessages,
TotalValueTransferred: totalsUntilToday.TotalValueTransferred + currentDayStats.TotalValueTransferred,
}

result := intStats{
Latest: latestTotal,
}

// calculate last day delta
q3 := fmt.Sprintf(QueryIntProtocolsDeltaLastDay, r.bucket30d, r.intProtocolMeasurement[protocol].Hourly, protocol)
deltaYesterdayStats, errQ3 := fetchSingleRecordData[intRowStat](r.logger, r.queryAPI, ctx, q3, protocol)
if errQ3 != nil {
return result, errQ3
}

result.DeltaLast24hr = deltaYesterdayStats
return result, nil
}

func fetchSingleRecordData[T any](logger *zap.Logger, queryAPI QueryDoer, ctx context.Context, bucket, queryTemplate, measurement, contributor, version string) (T, error) {
func fetchSingleRecordData[T any](logger *zap.Logger, queryAPI QueryDoer, ctx context.Context, query, protocol string) (T, error) {
var res T
q := buildQuery(queryTemplate, bucket, measurement, contributor, version)
result, err := queryAPI.Query(ctx, q)
result, err := queryAPI.Query(ctx, query)
if err != nil {
logger.Error("error executing query to fetch data", zap.Error(err), zap.String("protocol", contributor), zap.String("query", q))
logger.Error("error executing query to fetch data", zap.Error(err), zap.String("protocol", protocol), zap.String("query", query))
return res, err
}
defer result.Close()

if !result.Next() {
if result.Err() != nil {
logger.Error("error reading query response", zap.Error(result.Err()), zap.String("protocol", contributor), zap.String("query", q))
logger.Error("error reading query response", zap.Error(result.Err()), zap.String("protocol", protocol), zap.String("query", query))
return res, result.Err()
}
logger.Info("empty query response", zap.String("protocol", contributor), zap.String("query", q))
logger.Info("empty query response", zap.String("protocol", protocol), zap.String("query", query))
return res, err
}

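For context, the sketch below shows how the renamed constructor parameters could be wired to the buckets the new Flux tasks use. It is hypothetical rather than part of this diff: it assumes it sits in the same package as repository.go, the environment variable names and the "v1" version strings are illustrative, and only the bucket names are taken from the scripts above.

package protocols

import (
	"os"

	influxdb2 "github.com/influxdata/influxdb-client-go/v2"
	"go.uber.org/zap"
)

// newRepositoryFromEnv is a hypothetical helper showing how the updated NewRepository
// signature maps onto the buckets read and written by the new Flux tasks.
func newRepositoryFromEnv(logger *zap.Logger) *Repository {
	client := influxdb2.NewClient(os.Getenv("INFLUX_URL"), os.Getenv("INFLUX_TOKEN"))
	queryAPI := client.QueryAPI(os.Getenv("INFLUX_ORG"))

	return NewRepository(
		WrapQueryAPI(queryAPI),
		"wormscan",        // bucketInfinite: vaa_volume_v2 plus the daily roll-ups
		"wormscan-30days", // bucket30d: receives the hourly points written by the tasks
		"v1",              // statsVersion (illustrative)
		"v1",              // activityVersion (illustrative)
		logger,
	)
}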
