Skip to content

Commit

Permalink
refactor scorecards
Browse files Browse the repository at this point in the history
  • Loading branch information
marianososto committed Sep 18, 2024
1 parent 9ce04c7 commit 6918544
Show file tree
Hide file tree
Showing 2 changed files with 295 additions and 50 deletions.
153 changes: 103 additions & 50 deletions api/handlers/transactions/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,17 +2,20 @@ package transactions

import (
"context"
errors2 "errors"
"fmt"
"github.com/influxdata/influxdb-client-go/v2/api"
"github.com/influxdata/influxdb-client-go/v2/api/query"
"github.com/pkg/errors"
"github.com/valyala/fasthttp"
"strconv"
"strings"
"sync"
"time"

influxdb2 "github.com/influxdata/influxdb-client-go/v2"
"github.com/influxdata/influxdb-client-go/v2/api"
//"github.com/influxdata/influxdb-client-go/v2/api"
"github.com/mitchellh/mapstructure"
"github.com/pkg/errors"
"github.com/wormhole-foundation/wormhole-explorer/api/handlers/common"
"github.com/wormhole-foundation/wormhole-explorer/api/internal/config"
errs "github.com/wormhole-foundation/wormhole-explorer/api/internal/errors"
Expand Down Expand Up @@ -127,10 +130,10 @@ type repositoryCollections struct {
}

type Repository struct {
tvl *tvl.Tvl
tvl getTvl
p2pNetwork string
influxCli influxdb2.Client
queryAPI api.QueryAPI
queryAPI influxQueryAPI
bucketInfiniteRetention string
bucket30DaysRetention string
bucket24HoursRetention string
Expand All @@ -140,6 +143,45 @@ type Repository struct {
logger *zap.Logger
}

type influxQueryAPI interface {
Query(ctx context.Context, query string) (influxQueryResult, error)
}

type influxQueryResult interface {
Err() error
Next() bool
Record() *query.FluxRecord
}

type influxAdapter struct {
influxAPI api.QueryAPI
}

func (i *influxAdapter) Query(ctx context.Context, query string) (influxQueryResult, error) {
result, err := i.influxAPI.Query(ctx, query)
return &influxResult{result}, err
}

type influxResult struct {
result *api.QueryTableResult
}

func (i *influxResult) Err() error {
return i.result.Err()
}

func (i *influxResult) Next() bool {
return i.result.Next()
}

func (i *influxResult) Record() *query.FluxRecord {
return i.result.Record()
}

type getTvl interface {
Get(ctx context.Context) (string, error)
}

type offset string

const _24h offset = "24h"
Expand All @@ -160,7 +202,7 @@ func NewRepository(
tvl: tvl,
p2pNetwork: p2pNetwork,
influxCli: client,
queryAPI: client.QueryAPI(org),
queryAPI: &influxAdapter{client.QueryAPI(org)},
bucket24HoursRetention: bucket24HoursRetention,
bucket30DaysRetention: bucket30DaysRetention,
bucketInfiniteRetention: bucketInfiniteRetention,
Expand Down Expand Up @@ -442,87 +484,89 @@ func (r *Repository) GetScorecards(ctx context.Context) (*Scorecards, error) {
// We use a `sync.WaitGroup` to block until all goroutines are done.
var wg sync.WaitGroup

var messages24h, tvl, totalTxCount, totalTxVolume, volume24h, volume7d, volume30d, totalPythMessage string
ctxWithCancel, cancel := context.WithCancel(ctx)
defer cancel()

var resultErr error
mutex := &sync.Mutex{}
collectError := func(err error) {
mutex.Lock()
resultErr = errors2.Join(resultErr, err)
mutex.Unlock()
}

handleErr := func(errMsgLog string, err error) {
if err != nil {
r.logger.Error(errMsgLog, zap.Error(err))
collectError(err)
cancel() // this will signal the rest of goroutines to exit also.
}
}

var messages24h, totalValueLocked, totalTxCount, totalTxVolume, volume24h, volume7d, volume30d, totalPythMessage string

wg.Add(1)
go func() {
defer wg.Done()
var err error
messages24h, err = r.getMessages24h(ctx)
if err != nil {
r.logger.Error("failed to query 24h messages", zap.Error(err))
}
messages24h, err = r.getMessages24h(ctxWithCancel)
handleErr("failed to get 24h messages", err)
}()

wg.Add(1)
go func() {
defer wg.Done()
var err error
tvl, err = r.tvl.Get(ctx)
if err != nil {
r.logger.Error("failed to get tvl", zap.Error(err))
}
totalValueLocked, err = r.tvl.Get(ctxWithCancel)
handleErr("failed to get tvl", err)
}()

wg.Add(1)
go func() {
defer wg.Done()
var err error
totalTxCount, err = r.getTotalTxCount(ctx)
if err != nil {
r.logger.Error("failed to tx count", zap.Error(err))
}

totalTxCount, err = r.getTotalTxCount(ctxWithCancel)
handleErr("failed to get total tx count", err)
}()

wg.Add(1)
go func() {
defer wg.Done()
var err error
totalPythMessage, err = r.getTotalPythMessage(ctx)
if err != nil {
r.logger.Error("failed to get total pyth message", zap.Error(err))
}
totalPythMessage, err = r.getTotalPythMessage(ctxWithCancel)
handleErr("failed to get total pyth message", err)
}()

wg.Add(1)
go func() {
defer wg.Done()
var err error
totalTxVolume, err = r.getTotalTxVolume(ctx)
if err != nil {
r.logger.Error("failed to get total tx volume", zap.Error(err))
}
totalTxVolume, err = r.getTotalTxVolume(ctxWithCancel)
handleErr("failed to get total tx volume", err)
}()

wg.Add(1)
go func() {
defer wg.Done()
var err error
volume24h, err = r.getVolume(ctx, _24h)
if err != nil {
r.logger.Error("failed to get 24h volume", zap.Error(err))
}
volume24h, err = r.getVolume(ctxWithCancel, _24h)
handleErr("failed to get 24h volume", err)
}()

wg.Add(1)
go func() {
defer wg.Done()
var err error
volume7d, err = r.getVolume(ctx, _7d)
if err != nil {
r.logger.Error("failed to get 7d volume", zap.Error(err))
}
volume7d, err = r.getVolume(ctxWithCancel, _7d)
handleErr("failed to get 7d volume", err)
}()

wg.Add(1)
go func() {
defer wg.Done()
var err error
volume30d, err = r.getVolume(ctx, _30d)
if err != nil {
r.logger.Error("failed to get 30d volume", zap.Error(err))
}
volume30d, err = r.getVolume(ctxWithCancel, _30d)
handleErr("failed to get 30d volume", err)
}()

// Each of the queries synchronized by this wait group has a context timeout.
Expand All @@ -538,12 +582,12 @@ func (r *Repository) GetScorecards(ctx context.Context) (*Scorecards, error) {
TotalMessages: totalMessage,
TotalTxCount: totalTxCount,
TotalTxVolume: totalTxVolume,
Tvl: tvl,
Tvl: totalValueLocked,
Volume24h: volume24h,
Volume7d: volume7d,
Volume30d: volume30d,
}
return &scorecards, nil
return &scorecards, resultErr
}

// calculateTotalMessage calculate the total message from the total tx count and the total pyth message
Expand Down Expand Up @@ -572,8 +616,8 @@ func calculateTotalMessage(p2pNetwork string, totalTxCount, totalPythMessage str

func (r *Repository) getTotalTxCount(ctx context.Context) (string, error) {

query := buildTotalTrxCountQuery(r.bucketInfiniteRetention, r.bucket30DaysRetention, time.Now())
result, err := r.queryAPI.Query(ctx, query)
trxCountQuery := buildTotalTrxCountQuery(r.bucketInfiniteRetention, r.bucket30DaysRetention, time.Now())
result, err := r.queryAPI.Query(ctx, trxCountQuery)
if err != nil {
r.logger.Error("failed to query total tx count by portal bridge", zap.Error(err))
return "", err
Expand All @@ -596,8 +640,8 @@ func (r *Repository) getTotalTxCount(ctx context.Context) (string, error) {

func (r *Repository) getTotalTxVolume(ctx context.Context) (string, error) {

query := buildTotalTrxVolumeQuery(r.bucketInfiniteRetention, r.bucket30DaysRetention, time.Now())
result, err := r.queryAPI.Query(ctx, query)
trxVolumeQuery := buildTotalTrxVolumeQuery(r.bucketInfiniteRetention, r.bucket30DaysRetention, time.Now())
result, err := r.queryAPI.Query(ctx, trxVolumeQuery)
if err != nil {
r.logger.Error("failed to query total tx volume by portal bridge", zap.Error(err))
return "", err
Expand All @@ -621,8 +665,8 @@ func (r *Repository) getTotalTxVolume(ctx context.Context) (string, error) {
func (r *Repository) getMessages24h(ctx context.Context) (string, error) {

// query 24h transactions
query := fmt.Sprintf(queryTemplateMessages24h, r.bucket24HoursRetention, r.bucket24HoursRetention)
result, err := r.queryAPI.Query(ctx, query)
msg24hrVolumeQuery := buildMessages24HrQuery(r.bucket24HoursRetention)
result, err := r.queryAPI.Query(ctx, msg24hrVolumeQuery)
if err != nil {
r.logger.Error("failed to query 24h messages", zap.Error(err))
return "", err
Expand All @@ -646,11 +690,15 @@ func (r *Repository) getMessages24h(ctx context.Context) (string, error) {
return fmt.Sprint(row.Value), nil
}

func buildMessages24HrQuery(bucket24Hr string) string {
return fmt.Sprintf(queryTemplateMessages24h, bucket24Hr, bucket24Hr)
}

func (r *Repository) getVolume(ctx context.Context, from offset) (string, error) {

// query volume
query := fmt.Sprintf(queryTemplateVolume, r.bucketInfiniteRetention, from)
result, err := r.queryAPI.Query(ctx, query)
queryVolume := buildVolumeQuery(r.bucketInfiniteRetention, from)
result, err := r.queryAPI.Query(ctx, queryVolume)
if err != nil {
r.logger.Error("failed to query volume", zap.Any("from", from), zap.Error(err))
return "", err
Expand All @@ -676,6 +724,10 @@ func (r *Repository) getVolume(ctx context.Context, from offset) (string, error)
return volume, nil
}

func buildVolumeQuery(bucketInfinite string, from offset) string {
return fmt.Sprintf(queryTemplateVolume, bucketInfinite, from)
}

// GetTransactionCount get the last transactions.
func (r *Repository) GetTransactionCount(ctx context.Context, q *TransactionCountQuery) ([]TransactionCountResult, error) {
query := buildLastTrxQuery(r.bucket30DaysRetention, time.Now(), q)
Expand Down Expand Up @@ -724,7 +776,8 @@ func (r *Repository) getTotalPythMessage(ctx context.Context) (string, error) {

filter := bson.M{"emitterAddr": pythEmitterAddr}
options := options.FindOne().SetSort(bson.D{{Key: "timestamp", Value: -1}})
err := r.collections.vaasPythnet.FindOne(ctx, filter, options).Decode(&vaaPyth)
singleResult := r.collections.vaasPythnet.FindOne(ctx, filter, options)
err := singleResult.Decode(&vaaPyth)
if err != nil {
if err == mongo.ErrNoDocuments {
r.logger.Warn("no pyth message found")
Expand Down
Loading

0 comments on commit 6918544

Please sign in to comment.