Commit

changes

 indent

more changes on the script

 new working approach on influx task

 indent script

 tested insert

 add 2 versions of the script

multiple changes

 add 1day task

 add logic to retrieve internal protocols
marianososto committed Mar 4, 2024
1 parent 54cf6c7 commit 803a5bb
Showing 7 changed files with 280 additions and 45 deletions.
Empty file.
Empty file.
162 changes: 144 additions & 18 deletions api/handlers/protocols/repository.go
@@ -35,13 +35,81 @@ from(bucket: "%s")
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
`

// QueryIntProtocolsTotalStartOfDay is the query template for internal protocols (cctp and portal_token_bridge) to fetch total values until the start of the current day
const QueryIntProtocolsTotalStartOfDay = `
import "date"
import "types"
startOfCurrentDay = date.truncate(t: now(), unit: 1d)
from(bucket: "%s")
|> range(start: 1970-01-01T00:00:00Z,stop:startOfCurrentDay)
|> filter(fn: (r) => r._measurement == "%s" and r.app_id == "%s")
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
|> group(columns: ["app_id"])
|> reduce(fn: (r, accumulator) => ({
total_messages: accumulator.total_messages + (if exists r.total_messages then r.total_messages else float(v:0)),
total_value_transferred: accumulator.total_value_transferred + (if exists r.total_value_transferred then r.total_value_transferred else float(v:0))
}), identity: {
total_messages: float(v:0),
total_value_transferred: float(v:0.0)
})
`

// QueryIntProtocolsDeltaSinceStartOfDay calculates the delta since the beginning of the current day
const QueryIntProtocolsDeltaSinceStartOfDay = `
import "date"
import "types"
ts = date.truncate(t: now(), unit: 1h)
startOfDay = date.sub(d: 1d, from: ts)
from(bucket: "%s")
|> range(start: startOfDay, stop: ts)
|> filter(fn: (r) => r._measurement == "%s" and r.app_id == "%s")
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
|> group(columns: ["app_id"])
|> reduce(fn: (r, accumulator) => ({
total_messages: accumulator.total_messages + (if exists r.total_messages then r.total_messages else float(v:0)),
total_value_transferred: accumulator.total_value_transferred + (if exists r.total_value_transferred then r.total_value_transferred else float(v:0))
}), identity: {
total_messages: float(v:0),
total_value_transferred: float(v:0.0)
})
`

// QueryIntProtocolsDeltaLastDay calculates the last-day delta
const QueryIntProtocolsDeltaLastDay = `
import "date"
import "types"
ts = date.truncate(t: now(), unit: 1h)
yesterday = date.sub(d: 1d, from: ts)
from(bucket: "%s")
|> range(start: yesterday, stop: ts)
|> filter(fn: (r) => r._measurement == "%s" and r.app_id == "%s")
|> pivot(rowKey:["_time"], columnKey: ["_field"], valueColumn: "_value")
|> group(columns: ["app_id"])
|> reduce(fn: (r, accumulator) => ({
total_messages: accumulator.total_messages + (if exists r.total_messages then r.total_messages else float(v:0)),
total_value_transferred: accumulator.total_value_transferred + (if exists r.total_value_transferred then r.total_value_transferred else float(v:0))
}), identity: {
total_messages: float(v:0),
total_value_transferred: float(v:0.0)
})
`
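Each of the three Flux templates above is filled with the same three fmt.Sprintf arguments, in order: bucket, measurement, and app_id. A minimal sketch of how the first template would be filled, matching the call made in getInternalProtocolStats below; the bucket name here is a placeholder, not a value taken from this commit:

	// bucket, measurement and app_id fill the three %s placeholders in order.
	query := fmt.Sprintf(QueryIntProtocolsTotalStartOfDay,
		"wormscan-infinite",           // placeholder name for the infinite-retention bucket
		dbconsts.CCTPStatsMeasurement, // measurement constant referenced by this commit
		CCTP,                          // app_id tag value, "CCTP_WORMHOLE_INTEGRATION"
	)
	_ = query // passed to fetchSingleRecordData in the repository code below
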

type Repository struct {
queryAPI QueryDoer
logger *zap.Logger
statsBucket string
activityBucket string
statsVersion string
activityVersion string
queryAPI QueryDoer
logger *zap.Logger
statsBucket string
activityBucket string
bucketInfinite string
bucket30d string
statsVersion string
activityVersion string
internalProtocolToMeasurement map[string]string
}

type rowStat struct {
@@ -50,6 +118,17 @@ type rowStat struct {
TotalValueLocked float64 `mapstructure:"total_value_locked"`
}

type intRowStat struct {
Protocol string `mapstructure:"app_id"`
TotalMessages uint64 `mapstructure:"total_messages"`
TotalValueTransferred float64 `mapstructure:"total_value_transferred"`
}

type intStats struct {
Latest intRowStat
DeltaLast24hr intRowStat
}

type rowActivity struct {
Protocol string `mapstructure:"protocol"`
DestinationChainId string `mapstructure:"destination_chain_id"`
@@ -85,14 +164,20 @@ func WrapQueryAPI(qApi api.QueryAPI) QueryDoer {
return &queryApiWrapper{qApi: qApi}
}

func NewRepository(qApi QueryDoer, statsBucket, activityBucket, statsVersion, activityVersion string, logger *zap.Logger) *Repository {
func NewRepository(qApi QueryDoer, statsBucket, activityBucket, bucketInfinite, bucket30d, statsVersion, activityVersion string, logger *zap.Logger) *Repository {
return &Repository{
queryAPI: qApi,
statsBucket: statsBucket,
activityBucket: activityBucket,
bucketInfinite: bucketInfinite,
bucket30d: bucket30d,
statsVersion: statsVersion,
activityVersion: activityVersion,
logger: logger,
internalProtocolToMeasurement: map[string]string{
CCTP: dbconsts.CCTPStatsMeasurement,
PortalTokenBridge: dbconsts.TokenBridgeStatsMeasurement,
},
}
}
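The constructor now takes two extra bucket names (bucketInfinite and bucket30d) and wires the internal-protocol-to-measurement map. A hedged sketch of an updated call site, assuming a queryAPI and logger are already constructed; the bucket and version values are placeholders, not taken from this commit:

	repo := NewRepository(
		queryAPI,            // QueryDoer wrapping the InfluxDB query API
		"wormscan",          // statsBucket (placeholder)
		"wormscan",          // activityBucket (placeholder)
		"wormscan-infinite", // bucketInfinite (placeholder), holds the full history
		"wormscan-30days",   // bucket30d (placeholder), holds recent points only
		"v1",                // statsVersion (placeholder)
		"v1",                // activityVersion (placeholder)
		logger,
	)
	_ = repo
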

@@ -101,40 +186,81 @@ func (q *queryApiWrapper) Query(ctx context.Context, query string) (QueryResult,
}

// returns latest and last 24 hr stats for a given protocol
func (r *Repository) getProtocolStats(ctx context.Context, contributor string) (stats, error) {
func (r *Repository) getProtocolStats(ctx context.Context, protocol string) (stats, error) {

// fetch latest stat
latest, err := fetchSingleRecordData[rowStat](r.logger, r.queryAPI, ctx, r.statsBucket, QueryTemplateLatestPoint, dbconsts.ProtocolsStatsMeasurement, contributor, r.statsVersion)
q := buildQuery(QueryTemplateLatestPoint, r.statsBucket, dbconsts.ProtocolsStatsMeasurement, protocol, r.statsVersion)
latest, err := fetchSingleRecordData[rowStat](r.logger, r.queryAPI, ctx, q, protocol)
if err != nil {
return stats{}, err
}
// fetch last 24 hr stat
last24hr, err := fetchSingleRecordData[rowStat](r.logger, r.queryAPI, ctx, r.statsBucket, QueryTemplateLast24Point, dbconsts.ProtocolsStatsMeasurement, contributor, r.statsVersion)
q = buildQuery(QueryTemplateLast24Point, r.statsBucket, dbconsts.ProtocolsStatsMeasurement, protocol, r.statsVersion)
last24hr, err := fetchSingleRecordData[rowStat](r.logger, r.queryAPI, ctx, q, protocol)
return stats{
Latest: latest,
Last24: last24hr,
}, err
}

func (r *Repository) getProtocolActivity(ctx context.Context, contributor string) (rowActivity, error) {
return fetchSingleRecordData[rowActivity](r.logger, r.queryAPI, ctx, r.activityBucket, QueryTemplateActivityLatestPoint, dbconsts.ProtocolsActivityMeasurement, contributor, r.activityVersion)
func (r *Repository) getProtocolActivity(ctx context.Context, protocol string) (rowActivity, error) {
q := buildQuery(QueryTemplateActivityLatestPoint, r.activityBucket, dbconsts.ProtocolsActivityMeasurement, protocol, r.activityVersion)
return fetchSingleRecordData[rowActivity](r.logger, r.queryAPI, ctx, q, protocol)
}

// returns latest and last 24 hr stats for internal protocols (cctp and portal_token_bridge)
func (r *Repository) getInternalProtocolStats(ctx context.Context, protocol string) (intStats, error) {

// calculate total values until the start of the current day
totalTillCurrentDayQuery := fmt.Sprintf(QueryIntProtocolsTotalStartOfDay, r.bucketInfinite, r.internalProtocolToMeasurement[protocol], protocol)
totalsUntilToday, err := fetchSingleRecordData[intRowStat](r.logger, r.queryAPI, ctx, totalTillCurrentDayQuery, protocol)
if err != nil {
return intStats{}, err
}

// calculate delta since the beginning of the current day
q2 := fmt.Sprintf(QueryIntProtocolsDeltaSinceStartOfDay, r.bucket30d, r.internalProtocolToMeasurement[protocol], protocol)
currentDayStats, errCD := fetchSingleRecordData[intRowStat](r.logger, r.queryAPI, ctx, q2, protocol)
if errCD != nil {
return intStats{}, errCD
}

latestTotal := intRowStat{
Protocol: protocol,
TotalMessages: totalsUntilToday.TotalMessages + currentDayStats.TotalMessages,
TotalValueTransferred: totalsUntilToday.TotalValueTransferred + currentDayStats.TotalValueTransferred,
}

result := intStats{
Latest: latestTotal,
}

// calculate last day delta
q3 := fmt.Sprintf(QueryIntProtocolsDeltaLastDay, r.bucket30d, r.internalProtocolToMeasurement[protocol], protocol)
deltaYesterdayStats, errQ3 := fetchSingleRecordData[intRowStat](r.logger, r.queryAPI, ctx, q3, protocol)
if errQ3 != nil {
return result, errQ3
}

result.DeltaLast24hr = deltaYesterdayStats
return result, nil
}
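The all-time figure is stitched together from two buckets: everything up to the start of the current day comes from the infinite-retention bucket, and the remainder comes from the 30-day bucket. A small sketch with hypothetical figures that are not taken from the commit:

	// totals up to the start of the current day, read from bucketInfinite
	totalsUntilToday := intRowStat{TotalMessages: 900_000, TotalValueTransferred: 1_200_000_000}
	// delta since the start of the current day (per QueryIntProtocolsDeltaSinceStartOfDay), read from bucket30d
	currentDayStats := intRowStat{TotalMessages: 4_000, TotalValueTransferred: 6_000_000}

	latest := intRowStat{
		Protocol:              CCTP,
		TotalMessages:         totalsUntilToday.TotalMessages + currentDayStats.TotalMessages,                 // 904_000
		TotalValueTransferred: totalsUntilToday.TotalValueTransferred + currentDayStats.TotalValueTransferred, // 1_206_000_000
	}
	_ = latest
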

func fetchSingleRecordData[T any](logger *zap.Logger, queryAPI QueryDoer, ctx context.Context, bucket, queryTemplate, measurement, contributor, version string) (T, error) {
func fetchSingleRecordData[T any](logger *zap.Logger, queryAPI QueryDoer, ctx context.Context, query, protocol string) (T, error) {
var res T
q := buildQuery(queryTemplate, bucket, measurement, contributor, version)
result, err := queryAPI.Query(ctx, q)
result, err := queryAPI.Query(ctx, query)
if err != nil {
logger.Error("error executing query to fetch data", zap.Error(err), zap.String("protocol", contributor), zap.String("query", q))
logger.Error("error executing query to fetch data", zap.Error(err), zap.String("protocol", protocol), zap.String("query", query))
return res, err
}
defer result.Close()

if !result.Next() {
if result.Err() != nil {
logger.Error("error reading query response", zap.Error(result.Err()), zap.String("protocol", contributor), zap.String("query", q))
logger.Error("error reading query response", zap.Error(result.Err()), zap.String("protocol", protocol), zap.String("query", query))
return res, result.Err()
}
logger.Info("empty query response", zap.String("protocol", contributor), zap.String("query", q))
logger.Info("empty query response", zap.String("protocol", protocol), zap.String("query", query))
return res, err
}

73 changes: 56 additions & 17 deletions api/handlers/protocols/service.go
@@ -7,10 +7,14 @@ import (
"sync"
)

const CCTP = "CCTP_WORMHOLE_INTEGRATION"
const PortalTokenBridge = "PORTAL_TOKEN_BRIDGE"

type Service struct {
Protocols []string
repo *Repository
logger *zap.Logger
Protocols []string
repo *Repository
logger *zap.Logger
intProtocols []string
}

type ProtocolTotalValuesDTO struct {
@@ -24,23 +28,28 @@ type ProtocolTotalValuesDTO struct {
Error string `json:"error,omitempty"`
}

func NewService(protocols []string, repo *Repository, logger *zap.Logger) *Service {
func NewService(extProtocols, intProtocols []string, repo *Repository, logger *zap.Logger) *Service {
return &Service{
Protocols: protocols,
repo: repo,
logger: logger,
Protocols: extProtocols,
repo: repo,
logger: logger,
intProtocols: intProtocols,
}
}

func (s *Service) GetProtocolsTotalValues(ctx context.Context) []ProtocolTotalValuesDTO {

wg := &sync.WaitGroup{}
wg.Add(len(s.Protocols))
results := make(chan ProtocolTotalValuesDTO, len(s.Protocols))
totalProtocols := len(s.Protocols) + len(s.intProtocols)
wg.Add(totalProtocols)
results := make(chan ProtocolTotalValuesDTO, totalProtocols)

for i := range s.Protocols {
go s.getProtocolTotalValues(ctx, wg, s.Protocols[i], results)
}
for i := range s.intProtocols {
go s.getIntProtocolValues(ctx, wg, s.intProtocols[i], results)
}
wg.Wait()
close(results)

@@ -51,7 +60,37 @@ func (s *Service) GetProtocolsTotalValues(ctx context.Context) []ProtocolTotalVa
return resultsSlice
}

func (s *Service) getProtocolTotalValues(ctx context.Context, wg *sync.WaitGroup, contributor string, results chan<- ProtocolTotalValuesDTO) {
func (s *Service) getIntProtocolValues(ctx context.Context, wg *sync.WaitGroup, protocol string, results chan<- ProtocolTotalValuesDTO) {
defer wg.Done()

protocolStats, err := s.repo.getInternalProtocolStats(ctx, protocol)
if err != nil {
results <- ProtocolTotalValuesDTO{
Protocol: protocol,
TotalValueTransferred: protocolStats.Latest.TotalValueTransferred,
TotalMessages: protocolStats.Latest.TotalMessages,
Error: err.Error(),
}
return
}

diffLastDay := protocolStats.DeltaLast24hr.TotalMessages
val := ProtocolTotalValuesDTO{
Protocol: protocol,
TotalValueTransferred: protocolStats.Latest.TotalValueTransferred,
TotalMessages: protocolStats.Latest.TotalMessages,
LastDayMessages: diffLastDay,
}

lastDayTotalMessages := protocolStats.Latest.TotalMessages - diffLastDay
if lastDayTotalMessages != 0 {
percentage := strconv.FormatFloat(float64(diffLastDay)/float64(lastDayTotalMessages)*100, 'f', 2, 64) + "%"
val.LastDayDiffPercentage = percentage
}
results <- val
}
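The day-over-day percentage compares the last-day delta against the total as it stood 24 hours earlier, and is skipped when that earlier total was zero to avoid dividing by zero. A worked sketch with hypothetical numbers that are not taken from the commit:

	diffLastDay := uint64(50_000)                     // DeltaLast24hr.TotalMessages
	latestTotal := uint64(1_050_000)                  // Latest.TotalMessages
	lastDayTotalMessages := latestTotal - diffLastDay // total as of 24h ago: 1_000_000
	// 50_000 / 1_000_000 * 100 = 5.00
	percentage := strconv.FormatFloat(float64(diffLastDay)/float64(lastDayTotalMessages)*100, 'f', 2, 64) + "%"
	_ = percentage // "5.00%"
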

func (s *Service) getProtocolTotalValues(ctx context.Context, wg *sync.WaitGroup, protocol string, results chan<- ProtocolTotalValuesDTO) {
defer wg.Done()

type statsResult struct {
@@ -60,27 +99,27 @@ func (s *Service) getProtocolTotalValues(ctx context.Context, wg *sync.WaitGroup
}
statsRes := make(chan statsResult, 1)
go func() {
rowStats, errStats := s.repo.getProtocolStats(ctx, contributor)
rowStats, errStats := s.repo.getProtocolStats(ctx, protocol)
statsRes <- statsResult{result: rowStats, Err: errStats}
close(statsRes)
}()

activity, err := s.repo.getProtocolActivity(ctx, contributor)
activity, err := s.repo.getProtocolActivity(ctx, protocol)
if err != nil {
s.logger.Error("error fetching protocol activity", zap.Error(err), zap.String("protocol", contributor))
results <- ProtocolTotalValuesDTO{Protocol: contributor, Error: err.Error()}
s.logger.Error("error fetching protocol activity", zap.Error(err), zap.String("protocol", protocol))
results <- ProtocolTotalValuesDTO{Protocol: protocol, Error: err.Error()}
return
}

rStats := <-statsRes
if rStats.Err != nil {
s.logger.Error("error fetching protocol stats", zap.Error(rStats.Err), zap.String("protocol", contributor))
results <- ProtocolTotalValuesDTO{Protocol: contributor, Error: rStats.Err.Error()}
s.logger.Error("error fetching protocol stats", zap.Error(rStats.Err), zap.String("protocol", protocol))
results <- ProtocolTotalValuesDTO{Protocol: protocol, Error: rStats.Err.Error()}
return
}

dto := ProtocolTotalValuesDTO{
Protocol: contributor,
Protocol: protocol,
TotalValueLocked: rStats.result.Latest.TotalValueLocked,
TotalMessages: rStats.result.Latest.TotalMessages,
TotalValueTransferred: activity.TotalValueTransferred,