Skip to content

Commit

Permalink
Add backfiller for analytics from mongodb (#562)
Browse files Browse the repository at this point in the history
  • Loading branch information
ftocal committed Jul 25, 2023
1 parent 8fc89e3 commit 13819e2
Show file tree
Hide file tree
Showing 9 changed files with 400 additions and 164 deletions.
58 changes: 49 additions & 9 deletions analytics/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,24 +71,64 @@ func addVaaCountCommand(parent *cobra.Command) {
parent.AddCommand(vaaCountCmd)
}

func addVaaVolumeCommand(parent *cobra.Command) {
func addVaaVolumeFromFileCommand(parent *cobra.Command) {
var input, output, prices string
vaaVolumeCmd := &cobra.Command{
Use: "vaa-volume",

//vaa-volume from csv file
vaaVolumeFileCmd := &cobra.Command{
Use: "file",
Short: "Generate volume metrics from a VAA csv file",
Run: func(_ *cobra.Command, _ []string) {
metrics.RunVaaVolume(input, output, prices)
metrics.RunVaaVolumeFromFile(input, output, prices)
},
}

// input flag
vaaVolumeCmd.Flags().StringVar(&input, "input", "", "path to input vaa file")
vaaVolumeCmd.MarkFlagRequired("input")
vaaVolumeFileCmd.Flags().StringVar(&input, "input", "", "path to input vaa file")
vaaVolumeFileCmd.MarkFlagRequired("input")
// output flag
vaaVolumeFileCmd.Flags().StringVar(&output, "output", "", "path to output file")
vaaVolumeFileCmd.MarkFlagRequired("output")
// prices flag
vaaVolumeFileCmd.Flags().StringVar(&prices, "prices", "prices.csv", "path to prices file")

parent.AddCommand(vaaVolumeFileCmd)
}

func addVaaVolumeFromMongoCommand(parent *cobra.Command) {
var mongoUri, mongoDb, output, prices string
//vaa-volume from MongoDB
vaaVolumeMongoCmd := &cobra.Command{
Use: "mongo",
Short: "Generate volume metrics from MongoDB",
Run: func(_ *cobra.Command, _ []string) {
metrics.RunVaaVolumeFromMongo(mongoUri, mongoDb, output, prices)
},
}

//mongo flags
vaaVolumeMongoCmd.Flags().StringVar(&mongoUri, "mongo-uri", "", "Mongo connection")
vaaVolumeMongoCmd.Flags().StringVar(&mongoDb, "mongo-database", "", "Mongo database")

// output flag
vaaVolumeCmd.Flags().StringVar(&output, "output", "", "path to output file")
vaaVolumeCmd.MarkFlagRequired("output")
vaaVolumeMongoCmd.Flags().StringVar(&output, "output", "", "path to output file")
vaaVolumeMongoCmd.MarkFlagRequired("output")
// prices flag
vaaVolumeCmd.Flags().StringVar(&prices, "prices", "prices.csv", "path to prices file")
vaaVolumeMongoCmd.Flags().StringVar(&prices, "prices", "prices.csv", "path to prices file")

parent.AddCommand(vaaVolumeMongoCmd)

}

func addVaaVolumeCommand(parent *cobra.Command) {

vaaVolumeCmd := &cobra.Command{
Use: "vaa-volume",
Short: "Generate volume metric",
}

addVaaVolumeFromFileCommand(vaaVolumeCmd)
addVaaVolumeFromMongoCommand(vaaVolumeCmd)
parent.AddCommand(vaaVolumeCmd)
}

Expand Down
134 changes: 15 additions & 119 deletions analytics/cmd/metrics/volume.go
100755 → 100644
Original file line number Diff line number Diff line change
@@ -1,13 +1,8 @@
package metrics

import (
"bufio"
"encoding/hex"
"errors"
"fmt"
"io"
"os"
"strings"
"time"

"github.com/influxdata/influxdb-client-go/v2/api/write"
Expand All @@ -16,143 +11,44 @@ import (
"github.com/wormhole-foundation/wormhole-explorer/analytics/metric"
"github.com/wormhole-foundation/wormhole-explorer/analytics/prices"
"github.com/wormhole-foundation/wormhole-explorer/common/domain"
"github.com/wormhole-foundation/wormhole-explorer/common/logger"
sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.uber.org/zap"
)

type LineParser struct {
type VaaConverter struct {
MissingTokens map[sdk.Address]sdk.ChainID
MissingTokensCounter map[sdk.Address]int
PriceCache *prices.CoinPricesCache
Metrics metrics.Metrics
}

// read a csv file with VAAs and convert into a decoded csv file
// ready to upload to the database
func RunVaaVolume(inputFile, outputFile, pricesFile string) {

// build logger
logger := logger.New("wormhole-explorer-analytics")

logger.Info("starting wormhole-explorer-analytics ...")

// open input file
f, err := os.Open(inputFile)
if err != nil {
logger.Fatal("opening input file", zap.Error(err))
}
defer f.Close()

// create missing tokens file
missingTokensFile := "missing_tokens.csv"
fmissingTokens, err := os.Create(missingTokensFile)
if err != nil {
logger.Fatal("creating missing tokens file", zap.Error(err))
}
defer fmissingTokens.Close()

//open output file for writing
fout, err := os.Create(outputFile)
if err != nil {
logger.Fatal("creating output file", zap.Error(err))
}
defer fout.Close()

// init price cache!
logger.Info("loading historical prices...")
lp := NewLineParser(pricesFile)
lp.PriceCache.InitCache()
logger.Info("loaded historical prices")

r := bufio.NewReader(f)

c := 0
i := 0
// read file line by line and send to workpool
for {
line, _, err := r.ReadLine() //loading chunk into buffer
if err != nil {
if err == io.EOF {
break
}
logger.Fatal("a real error happened here", zap.Error(err))
}
nl, err := lp.ParseLine(line)
if err != nil {
//fmt.Printf(",")
} else {
c++
fout.Write([]byte(nl))

if c == 10000 {
fmt.Printf(".")
c = 0
i := i + 1
if i == 10 {
fmt.Printf("\n")
i = 0
}
}

}

}

for k := range lp.MissingTokensCounter {
fmissingTokens.WriteString(fmt.Sprintf("%s,%s,%d\n", k.String(), lp.MissingTokens[k], lp.MissingTokensCounter[k]))
}

logger.Info("missing tokens", zap.Int("count", len(lp.MissingTokens)))

logger.Info("finished wormhole-explorer-analytics")

}

func NewLineParser(filename string) *LineParser {
priceCache := prices.NewCoinPricesCache(filename)
return &LineParser{
func NewVaaConverter(priceCache *prices.CoinPricesCache) *VaaConverter {
return &VaaConverter{
MissingTokens: make(map[sdk.Address]sdk.ChainID),
MissingTokensCounter: make(map[sdk.Address]int),
PriceCache: priceCache,
Metrics: metrics.NewNoopMetrics(),
}
}

// ParseLine takes a CSV line as input, and generates a line protocol entry as output.
//
// The format for InfluxDB line protocol is: vaa,tags fields timestamp
func (lp *LineParser) ParseLine(line []byte) (string, error) {
func (c *VaaConverter) Convert(vaaBytes []byte) (string, error) {

// Parse the VAA and payload
var vaa *sdk.VAA
var payload *sdk.TransferPayloadHdr
{
tt := strings.Split(string(line), ",")
if len(tt) != 2 {
return "", fmt.Errorf("expected line to have two tokens, but has %d: %s", len(tt), line)
}
vaaBytes, err := hex.DecodeString(tt[1])
if err != nil {
return "", fmt.Errorf("error decoding: %v", err)
}
vaa, err = sdk.Unmarshal(vaaBytes)
if err != nil {
return "", fmt.Errorf("error unmarshaling vaa: %v", err)
}
payload, err = sdk.DecodeTransferPayloadHdr(vaa.Payload)
if err != nil {
return "", fmt.Errorf("error decoding payload: %v", err)
}
vaa, err := sdk.Unmarshal(vaaBytes)
if err != nil {
return "", fmt.Errorf("error unmarshaling vaa: %v", err)
}
payload, err := sdk.DecodeTransferPayloadHdr(vaa.Payload)
if err != nil {
return "", fmt.Errorf("error decoding payload: %v", err)
}

// Look up token metadata
tokenMetadata, ok := domain.GetTokenByAddress(vaa.EmitterChain, payload.OriginAddress.String())
if !ok {

// if not found, add to missing tokens
lp.MissingTokens[payload.OriginAddress] = vaa.EmitterChain
lp.MissingTokensCounter[payload.OriginAddress] = lp.MissingTokensCounter[payload.OriginAddress] + 1
c.MissingTokens[payload.OriginAddress] = vaa.EmitterChain
c.MissingTokensCounter[payload.OriginAddress] = c.MissingTokensCounter[payload.OriginAddress] + 1

return "", fmt.Errorf("unknown token: %s %s", payload.OriginChain.String(), payload.OriginAddress.String())
}
Expand All @@ -165,14 +61,14 @@ func (lp *LineParser) ParseLine(line []byte) (string, error) {
TokenPriceFunc: func(_ domain.Symbol, timestamp time.Time) (decimal.Decimal, error) {

// fetch the historic price from cache
price, err := lp.PriceCache.GetPriceByTime(tokenMetadata.CoingeckoID, timestamp)
price, err := c.PriceCache.GetPriceByTime(tokenMetadata.CoingeckoID, timestamp)
if err != nil {
return decimal.NewFromInt(0), err
}

return price, nil
},
Metrics: lp.Metrics,
Metrics: c.Metrics,
}

var err error
Expand Down
124 changes: 124 additions & 0 deletions analytics/cmd/metrics/volume_csv.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
package metrics

import (
"bufio"
"encoding/hex"
"fmt"
"io"
"os"
"strings"

"github.com/wormhole-foundation/wormhole-explorer/analytics/prices"
"github.com/wormhole-foundation/wormhole-explorer/common/logger"
"go.uber.org/zap"
)

type LineParser struct {
converter *VaaConverter
}

// read a csv file with VAAs and convert into a decoded csv file
// ready to upload to the database
func RunVaaVolumeFromFile(inputFile, outputFile, pricesFile string) {

// build logger
logger := logger.New("wormhole-explorer-analytics")

logger.Info("starting wormhole-explorer-analytics ...")

// open input file
f, err := os.Open(inputFile)
if err != nil {
logger.Fatal("opening input file", zap.Error(err))
}
defer f.Close()

// create missing tokens file
missingTokensFile := "missing_tokens.csv"
fmissingTokens, err := os.Create(missingTokensFile)
if err != nil {
logger.Fatal("creating missing tokens file", zap.Error(err))
}
defer fmissingTokens.Close()

//open output file for writing
fout, err := os.Create(outputFile)
if err != nil {
logger.Fatal("creating output file", zap.Error(err))
}
defer fout.Close()

// init price cache!
logger.Info("loading historical prices...")
priceCache := prices.NewCoinPricesCache(pricesFile)
priceCache.InitCache()
converter := NewVaaConverter(priceCache)
lp := NewLineParser(converter)
logger.Info("loaded historical prices")

r := bufio.NewReader(f)

c := 0
i := 0
// read file line by line and send to workpool
for {
line, _, err := r.ReadLine() //loading chunk into buffer
if err != nil {
if err == io.EOF {
break
}
logger.Fatal("a real error happened here", zap.Error(err))
}
nl, err := lp.ParseLine(line)
if err != nil {
//fmt.Printf(",")
} else {
c++
fout.Write([]byte(nl))

if c == 10000 {
fmt.Printf(".")
c = 0
i := i + 1
if i == 10 {
fmt.Printf("\n")
i = 0
}
}
}

}

for k := range converter.MissingTokensCounter {
fmissingTokens.WriteString(fmt.Sprintf("%s,%s,%d\n", k.String(), converter.MissingTokens[k], converter.MissingTokensCounter[k]))
}

logger.Info("missing tokens", zap.Int("count", len(converter.MissingTokens)))

logger.Info("finished wormhole-explorer-analytics")

}

func NewLineParser(converter *VaaConverter) *LineParser {
return &LineParser{
converter: converter,
}
}

// ParseLine takes a CSV line as input, and generates a line protocol entry as output.
//
// The format for InfluxDB line protocol is: vaa,tags fields timestamp
func (lp *LineParser) ParseLine(line []byte) (string, error) {

// Parse the VAA and payload
tt := strings.Split(string(line), ",")
if len(tt) != 2 {
return "", fmt.Errorf("expected line to have two tokens, but has %d: %s", len(tt), line)
}
vaaBytes, err := hex.DecodeString(tt[1])
if err != nil {
return "", fmt.Errorf("error decoding: %v", err)
}

return lp.converter.Convert(vaaBytes)
}
Loading

0 comments on commit 13819e2

Please sign in to comment.