diff --git a/analytics/cmd/main.go b/analytics/cmd/main.go index d9a12a8b8..88372ce86 100644 --- a/analytics/cmd/main.go +++ b/analytics/cmd/main.go @@ -71,24 +71,64 @@ func addVaaCountCommand(parent *cobra.Command) { parent.AddCommand(vaaCountCmd) } -func addVaaVolumeCommand(parent *cobra.Command) { +func addVaaVolumeFromFileCommand(parent *cobra.Command) { var input, output, prices string - vaaVolumeCmd := &cobra.Command{ - Use: "vaa-volume", + + //vaa-volume from csv file + vaaVolumeFileCmd := &cobra.Command{ + Use: "file", Short: "Generate volume metrics from a VAA csv file", Run: func(_ *cobra.Command, _ []string) { - metrics.RunVaaVolume(input, output, prices) + metrics.RunVaaVolumeFromFile(input, output, prices) }, } + // input flag - vaaVolumeCmd.Flags().StringVar(&input, "input", "", "path to input vaa file") - vaaVolumeCmd.MarkFlagRequired("input") + vaaVolumeFileCmd.Flags().StringVar(&input, "input", "", "path to input vaa file") + vaaVolumeFileCmd.MarkFlagRequired("input") + // output flag + vaaVolumeFileCmd.Flags().StringVar(&output, "output", "", "path to output file") + vaaVolumeFileCmd.MarkFlagRequired("output") + // prices flag + vaaVolumeFileCmd.Flags().StringVar(&prices, "prices", "prices.csv", "path to prices file") + + parent.AddCommand(vaaVolumeFileCmd) +} + +func addVaaVolumeFromMongoCommand(parent *cobra.Command) { + var mongoUri, mongoDb, output, prices string + //vaa-volume from MongoDB + vaaVolumeMongoCmd := &cobra.Command{ + Use: "mongo", + Short: "Generate volume metrics from MongoDB", + Run: func(_ *cobra.Command, _ []string) { + metrics.RunVaaVolumeFromMongo(mongoUri, mongoDb, output, prices) + }, + } + + //mongo flags + vaaVolumeMongoCmd.Flags().StringVar(&mongoUri, "mongo-uri", "", "Mongo connection") + vaaVolumeMongoCmd.Flags().StringVar(&mongoDb, "mongo-database", "", "Mongo database") + // output flag - vaaVolumeCmd.Flags().StringVar(&output, "output", "", "path to output file") - vaaVolumeCmd.MarkFlagRequired("output") + vaaVolumeMongoCmd.Flags().StringVar(&output, "output", "", "path to output file") + vaaVolumeMongoCmd.MarkFlagRequired("output") // prices flag - vaaVolumeCmd.Flags().StringVar(&prices, "prices", "prices.csv", "path to prices file") + vaaVolumeMongoCmd.Flags().StringVar(&prices, "prices", "prices.csv", "path to prices file") + + parent.AddCommand(vaaVolumeMongoCmd) + +} + +func addVaaVolumeCommand(parent *cobra.Command) { + + vaaVolumeCmd := &cobra.Command{ + Use: "vaa-volume", + Short: "Generate volume metric", + } + addVaaVolumeFromFileCommand(vaaVolumeCmd) + addVaaVolumeFromMongoCommand(vaaVolumeCmd) parent.AddCommand(vaaVolumeCmd) } diff --git a/analytics/cmd/metrics/volume.go b/analytics/cmd/metrics/volume.go old mode 100755 new mode 100644 index 17a7c1719..a206d6fb3 --- a/analytics/cmd/metrics/volume.go +++ b/analytics/cmd/metrics/volume.go @@ -1,13 +1,8 @@ package metrics import ( - "bufio" - "encoding/hex" "errors" "fmt" - "io" - "os" - "strings" "time" "github.com/influxdata/influxdb-client-go/v2/api/write" @@ -16,102 +11,18 @@ import ( "github.com/wormhole-foundation/wormhole-explorer/analytics/metric" "github.com/wormhole-foundation/wormhole-explorer/analytics/prices" "github.com/wormhole-foundation/wormhole-explorer/common/domain" - "github.com/wormhole-foundation/wormhole-explorer/common/logger" sdk "github.com/wormhole-foundation/wormhole/sdk/vaa" - "go.uber.org/zap" ) -type LineParser struct { +type VaaConverter struct { MissingTokens map[sdk.Address]sdk.ChainID MissingTokensCounter map[sdk.Address]int PriceCache 
*prices.CoinPricesCache Metrics metrics.Metrics } -// read a csv file with VAAs and convert into a decoded csv file -// ready to upload to the database -func RunVaaVolume(inputFile, outputFile, pricesFile string) { - - // build logger - logger := logger.New("wormhole-explorer-analytics") - - logger.Info("starting wormhole-explorer-analytics ...") - - // open input file - f, err := os.Open(inputFile) - if err != nil { - logger.Fatal("opening input file", zap.Error(err)) - } - defer f.Close() - - // create missing tokens file - missingTokensFile := "missing_tokens.csv" - fmissingTokens, err := os.Create(missingTokensFile) - if err != nil { - logger.Fatal("creating missing tokens file", zap.Error(err)) - } - defer fmissingTokens.Close() - - //open output file for writing - fout, err := os.Create(outputFile) - if err != nil { - logger.Fatal("creating output file", zap.Error(err)) - } - defer fout.Close() - - // init price cache! - logger.Info("loading historical prices...") - lp := NewLineParser(pricesFile) - lp.PriceCache.InitCache() - logger.Info("loaded historical prices") - - r := bufio.NewReader(f) - - c := 0 - i := 0 - // read file line by line and send to workpool - for { - line, _, err := r.ReadLine() //loading chunk into buffer - if err != nil { - if err == io.EOF { - break - } - logger.Fatal("a real error happened here", zap.Error(err)) - } - nl, err := lp.ParseLine(line) - if err != nil { - //fmt.Printf(",") - } else { - c++ - fout.Write([]byte(nl)) - - if c == 10000 { - fmt.Printf(".") - c = 0 - i := i + 1 - if i == 10 { - fmt.Printf("\n") - i = 0 - } - } - - } - - } - - for k := range lp.MissingTokensCounter { - fmissingTokens.WriteString(fmt.Sprintf("%s,%s,%d\n", k.String(), lp.MissingTokens[k], lp.MissingTokensCounter[k])) - } - - logger.Info("missing tokens", zap.Int("count", len(lp.MissingTokens))) - - logger.Info("finished wormhole-explorer-analytics") - -} - -func NewLineParser(filename string) *LineParser { - priceCache := prices.NewCoinPricesCache(filename) - return &LineParser{ +func NewVaaConverter(priceCache *prices.CoinPricesCache) *VaaConverter { + return &VaaConverter{ MissingTokens: make(map[sdk.Address]sdk.ChainID), MissingTokensCounter: make(map[sdk.Address]int), PriceCache: priceCache, @@ -119,31 +30,16 @@ func NewLineParser(filename string) *LineParser { } } -// ParseLine takes a CSV line as input, and generates a line protocol entry as output. 
-// -// The format for InfluxDB line protocol is: vaa,tags fields timestamp -func (lp *LineParser) ParseLine(line []byte) (string, error) { +func (c *VaaConverter) Convert(vaaBytes []byte) (string, error) { // Parse the VAA and payload - var vaa *sdk.VAA - var payload *sdk.TransferPayloadHdr - { - tt := strings.Split(string(line), ",") - if len(tt) != 2 { - return "", fmt.Errorf("expected line to have two tokens, but has %d: %s", len(tt), line) - } - vaaBytes, err := hex.DecodeString(tt[1]) - if err != nil { - return "", fmt.Errorf("error decoding: %v", err) - } - vaa, err = sdk.Unmarshal(vaaBytes) - if err != nil { - return "", fmt.Errorf("error unmarshaling vaa: %v", err) - } - payload, err = sdk.DecodeTransferPayloadHdr(vaa.Payload) - if err != nil { - return "", fmt.Errorf("error decoding payload: %v", err) - } + vaa, err := sdk.Unmarshal(vaaBytes) + if err != nil { + return "", fmt.Errorf("error unmarshaling vaa: %v", err) + } + payload, err := sdk.DecodeTransferPayloadHdr(vaa.Payload) + if err != nil { + return "", fmt.Errorf("error decoding payload: %v", err) } // Look up token metadata @@ -151,8 +47,8 @@ func (lp *LineParser) ParseLine(line []byte) (string, error) { if !ok { // if not found, add to missing tokens - lp.MissingTokens[payload.OriginAddress] = vaa.EmitterChain - lp.MissingTokensCounter[payload.OriginAddress] = lp.MissingTokensCounter[payload.OriginAddress] + 1 + c.MissingTokens[payload.OriginAddress] = vaa.EmitterChain + c.MissingTokensCounter[payload.OriginAddress] = c.MissingTokensCounter[payload.OriginAddress] + 1 return "", fmt.Errorf("unknown token: %s %s", payload.OriginChain.String(), payload.OriginAddress.String()) } @@ -165,14 +61,14 @@ func (lp *LineParser) ParseLine(line []byte) (string, error) { TokenPriceFunc: func(_ domain.Symbol, timestamp time.Time) (decimal.Decimal, error) { // fetch the historic price from cache - price, err := lp.PriceCache.GetPriceByTime(tokenMetadata.CoingeckoID, timestamp) + price, err := c.PriceCache.GetPriceByTime(tokenMetadata.CoingeckoID, timestamp) if err != nil { return decimal.NewFromInt(0), err } return price, nil }, - Metrics: lp.Metrics, + Metrics: c.Metrics, } var err error diff --git a/analytics/cmd/metrics/volume_csv.go b/analytics/cmd/metrics/volume_csv.go new file mode 100755 index 000000000..eb6e0ae6c --- /dev/null +++ b/analytics/cmd/metrics/volume_csv.go @@ -0,0 +1,124 @@ +package metrics + +import ( + "bufio" + "encoding/hex" + "fmt" + "io" + "os" + "strings" + + "github.com/wormhole-foundation/wormhole-explorer/analytics/prices" + "github.com/wormhole-foundation/wormhole-explorer/common/logger" + "go.uber.org/zap" +) + +type LineParser struct { + converter *VaaConverter +} + +// read a csv file with VAAs and convert into a decoded csv file +// ready to upload to the database +func RunVaaVolumeFromFile(inputFile, outputFile, pricesFile string) { + + // build logger + logger := logger.New("wormhole-explorer-analytics") + + logger.Info("starting wormhole-explorer-analytics ...") + + // open input file + f, err := os.Open(inputFile) + if err != nil { + logger.Fatal("opening input file", zap.Error(err)) + } + defer f.Close() + + // create missing tokens file + missingTokensFile := "missing_tokens.csv" + fmissingTokens, err := os.Create(missingTokensFile) + if err != nil { + logger.Fatal("creating missing tokens file", zap.Error(err)) + } + defer fmissingTokens.Close() + + //open output file for writing + fout, err := os.Create(outputFile) + if err != nil { + logger.Fatal("creating output file", zap.Error(err)) + } + 
defer fout.Close() + + // init price cache! + logger.Info("loading historical prices...") + priceCache := prices.NewCoinPricesCache(pricesFile) + priceCache.InitCache() + converter := NewVaaConverter(priceCache) + lp := NewLineParser(converter) + logger.Info("loaded historical prices") + + r := bufio.NewReader(f) + + c := 0 + i := 0 + // read file line by line and send to workpool + for { + line, _, err := r.ReadLine() //loading chunk into buffer + if err != nil { + if err == io.EOF { + break + } + logger.Fatal("a real error happened here", zap.Error(err)) + } + nl, err := lp.ParseLine(line) + if err != nil { + //fmt.Printf(",") + } else { + c++ + fout.Write([]byte(nl)) + + if c == 10000 { + fmt.Printf(".") + c = 0 + i := i + 1 + if i == 10 { + fmt.Printf("\n") + i = 0 + } + } + } + + } + + for k := range converter.MissingTokensCounter { + fmissingTokens.WriteString(fmt.Sprintf("%s,%s,%d\n", k.String(), converter.MissingTokens[k], converter.MissingTokensCounter[k])) + } + + logger.Info("missing tokens", zap.Int("count", len(converter.MissingTokens))) + + logger.Info("finished wormhole-explorer-analytics") + +} + +func NewLineParser(converter *VaaConverter) *LineParser { + return &LineParser{ + converter: converter, + } +} + +// ParseLine takes a CSV line as input, and generates a line protocol entry as output. +// +// The format for InfluxDB line protocol is: vaa,tags fields timestamp +func (lp *LineParser) ParseLine(line []byte) (string, error) { + + // Parse the VAA and payload + tt := strings.Split(string(line), ",") + if len(tt) != 2 { + return "", fmt.Errorf("expected line to have two tokens, but has %d: %s", len(tt), line) + } + vaaBytes, err := hex.DecodeString(tt[1]) + if err != nil { + return "", fmt.Errorf("error decoding: %v", err) + } + + return lp.converter.Convert(vaaBytes) +} diff --git a/analytics/cmd/metrics/volume_mongo.go b/analytics/cmd/metrics/volume_mongo.go new file mode 100755 index 000000000..f40676dc8 --- /dev/null +++ b/analytics/cmd/metrics/volume_mongo.go @@ -0,0 +1,112 @@ +package metrics + +import ( + "context" + "fmt" + "os" + "time" + + "github.com/wormhole-foundation/wormhole-explorer/analytics/prices" + "github.com/wormhole-foundation/wormhole-explorer/common/db" + "github.com/wormhole-foundation/wormhole-explorer/common/logger" + "github.com/wormhole-foundation/wormhole-explorer/common/repository" + "go.uber.org/zap" +) + +// read a csv file with VAAs and convert into a decoded csv file +// ready to upload to the database +func RunVaaVolumeFromMongo(mongoUri, mongoDb, outputFile, pricesFile string) { + + rootCtx := context.Background() + + // build logger + logger := logger.New("wormhole-explorer-analytics") + + logger.Info("starting wormhole-explorer-analytics ...") + + //setup DB connection + db, err := db.New(rootCtx, logger, mongoUri, mongoDb) + if err != nil { + logger.Fatal("Failed to connect MongoDB", zap.Error(err)) + } + + // create a new VAA repository + vaaRepository := repository.NewVaaRepository(db.Database, logger) + + // create missing tokens file + missingTokensFile := "missing_tokens.csv" + fmissingTokens, err := os.Create(missingTokensFile) + if err != nil { + logger.Fatal("creating missing tokens file", zap.Error(err)) + } + defer fmissingTokens.Close() + + //open output file for writing + fout, err := os.Create(outputFile) + if err != nil { + logger.Fatal("creating output file", zap.Error(err)) + } + defer fout.Close() + + // init price cache! 
+ logger.Info("loading historical prices...") + priceCache := prices.NewCoinPricesCache(pricesFile) + priceCache.InitCache() + converter := NewVaaConverter(priceCache) + logger.Info("loaded historical prices") + + endTime := time.Now() + startTime := time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC) + + // start backfilling + page := int64(0) + c := 0 + for { + logger.Info("Processing page", zap.Int64("page", page), + zap.String("start_time", startTime.Format(time.RFC3339)), + zap.String("end_time", endTime.Format(time.RFC3339))) + + vaas, err := vaaRepository.FindPageByTimeRange(rootCtx, startTime, endTime, page, 1000, true) + if err != nil { + logger.Error("Failed to get vaas", zap.Error(err)) + break + } + + if len(vaas) == 0 { + logger.Info("Empty page", zap.Int64("page", page)) + break + } + for i, v := range vaas { + logger.Debug("Processing vaa", zap.String("id", v.ID)) + nl, err := converter.Convert(v.Vaa) + if err != nil { + //fmt.Printf(",") + } else { + c++ + fout.Write([]byte(nl)) + + if c == 10000 { + fmt.Printf(".") + c = 0 + i := i + 1 + if i == 10 { + fmt.Printf("\n") + i = 0 + } + } + } + } + page++ + } + logger.Info("Closing database connections ...") + db.Close() + + for k := range converter.MissingTokensCounter { + fmissingTokens.WriteString(fmt.Sprintf("%s,%s,%d\n", k.String(), converter.MissingTokens[k], converter.MissingTokensCounter[k])) + } + + logger.Info("missing tokens", zap.Int("count", len(converter.MissingTokens))) + + logger.Info("finished wormhole-explorer-analytics") + +} diff --git a/analytics/cmd/service/run.go b/analytics/cmd/service/run.go index a4bed4722..f1b00fd4b 100644 --- a/analytics/cmd/service/run.go +++ b/analytics/cmd/service/run.go @@ -7,7 +7,6 @@ import ( "os" "os/signal" "syscall" - "time" "github.com/aws/aws-sdk-go-v2/aws" awsconfig "github.com/aws/aws-sdk-go-v2/config" @@ -24,10 +23,10 @@ import ( "github.com/wormhole-foundation/wormhole-explorer/analytics/queue" wormscanNotionalCache "github.com/wormhole-foundation/wormhole-explorer/common/client/cache/notional" sqs_client "github.com/wormhole-foundation/wormhole-explorer/common/client/sqs" + "github.com/wormhole-foundation/wormhole-explorer/common/db" health "github.com/wormhole-foundation/wormhole-explorer/common/health" "github.com/wormhole-foundation/wormhole-explorer/common/logger" "go.mongodb.org/mongo-driver/mongo" - "go.mongodb.org/mongo-driver/mongo/options" "go.uber.org/zap" ) @@ -58,7 +57,7 @@ func Run() { // setup DB connection logger.Info("connecting to MongoDB...") - db, err := NewDatabase(rootCtx, logger, config.MongodbURI, config.MongodbDatabase) + db, err := db.New(rootCtx, logger, config.MongodbURI, config.MongodbDatabase) if err != nil { logger.Fatal("failed to connect MongoDB", zap.Error(err)) } @@ -220,36 +219,3 @@ func newNotionalCache( return notionalCache, nil } - -// Database contains handles to MongoDB. -type Database struct { - Database *mongo.Database - client *mongo.Client -} - -// NewDatabase connects to DB and returns a client that will disconnect when the passed in context is cancelled. -func NewDatabase(appCtx context.Context, log *zap.Logger, uri, databaseName string) (*Database, error) { - - cli, err := mongo.Connect(appCtx, options.Client().ApplyURI(uri)) - if err != nil { - return nil, err - } - - return &Database{client: cli, Database: cli.Database(databaseName)}, err -} - -const databaseCloseDeadline = 30 * time.Second - -// Close attempts to gracefully Close the database connection. 
-func (d *Database) Close() error { - - ctx, cancelFunc := context.WithDeadline( - context.Background(), - time.Now().Add(databaseCloseDeadline), - ) - - err := d.client.Disconnect(ctx) - - cancelFunc() - return err -} diff --git a/analytics/go.mod b/analytics/go.mod index 7c79274cf..923d7f44d 100644 --- a/analytics/go.mod +++ b/analytics/go.mod @@ -20,6 +20,7 @@ require ( github.com/wormhole-foundation/wormhole/sdk v0.0.0-20230426150516-e695fad0bed8 go.mongodb.org/mongo-driver v1.11.2 go.uber.org/zap v1.24.0 + gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22 ) require ( diff --git a/analytics/go.sum b/analytics/go.sum index 3274ac6e2..6af5a4758 100644 --- a/analytics/go.sum +++ b/analytics/go.sum @@ -706,7 +706,10 @@ gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLks gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= +gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22 h1:VpOs+IwYnYBaFnrNAeB8UUWtL3vEUnzSCL1nVjPhqrw= +gopkg.in/mgo.v2 v2.0.0-20190816093944-a6b53ec6cb22/go.mod h1:yeKp02qBN3iKW1OzL3MGk2IdtZzaj7SFntXj72NppTA= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= diff --git a/common/db/db.go b/common/db/db.go new file mode 100644 index 000000000..7335bb836 --- /dev/null +++ b/common/db/db.go @@ -0,0 +1,31 @@ +package db + +import ( + "context" + "time" + + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + "go.uber.org/zap" +) + +// Database definition. +type Database struct { + Database *mongo.Database + client *mongo.Client +} + +// New connects to DB and returns a client that will disconnect when the passed in context is cancelled. +func New(appCtx context.Context, log *zap.Logger, uri, databaseName string) (*Database, error) { + cli, err := mongo.Connect(appCtx, options.Client().ApplyURI(uri)) + if err != nil { + return nil, err + } + return &Database{client: cli, Database: cli.Database(databaseName)}, err +} + +// Close closes the database connections. +func (d *Database) Close() error { + ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(30*time.Second)) + return d.client.Disconnect(ctx) +} diff --git a/common/repository/vaa.go b/common/repository/vaa.go new file mode 100644 index 000000000..abee90bff --- /dev/null +++ b/common/repository/vaa.go @@ -0,0 +1,63 @@ +package repository + +import ( + "context" + "time" + + "go.mongodb.org/mongo-driver/bson" + "go.mongodb.org/mongo-driver/mongo" + "go.mongodb.org/mongo-driver/mongo/options" + "go.uber.org/zap" +) + +// VaaRepository is a repository for VAA. +type VaaRepository struct { + db *mongo.Database + logger *zap.Logger + vaas *mongo.Collection +} + +// VaaDoc is a document for VAA. +type VaaDoc struct { + ID string `bson:"_id" json:"id"` + Vaa []byte `bson:"vaas" json:"vaa"` +} + +// NewVaaRepository create a new Vaa repository. 
+func NewVaaRepository(db *mongo.Database, logger *zap.Logger) *VaaRepository { + return &VaaRepository{db: db, + logger: logger.With(zap.String("module", "VaaRepository")), + vaas: db.Collection("vaas"), + } +} + +// FindById finds VAA by id. +func (r *VaaRepository) FindById(ctx context.Context, id string) (*VaaDoc, error) { + var vaaDoc VaaDoc + err := r.vaas.FindOne(ctx, bson.M{"_id": id}).Decode(&vaaDoc) + return &vaaDoc, err +} + +// FindPageByTimeRange finds VAA by time range. +func (r *VaaRepository) FindPageByTimeRange(ctx context.Context, startTime time.Time, endTime time.Time, page, pageSize int64, sortAsc bool) ([]*VaaDoc, error) { + filter := bson.M{ + "timestamp": bson.M{ + "$gte": startTime, + "$lt": endTime, + }, + } + sort := -1 + if sortAsc { + sort = 1 + } + + skip := page * pageSize + opts := &options.FindOptions{Skip: &skip, Limit: &pageSize, Sort: bson.M{"timestamp": sort}} + cur, err := r.vaas.Find(ctx, filter, opts) + if err != nil { + return nil, err + } + var vaas []*VaaDoc + err = cur.All(ctx, &vaas) + return vaas, err +}
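
Note (illustrative sketch, not part of the diff): the snippet below shows how the pieces introduced here are expected to compose, mirroring RunVaaVolumeFromMongo in miniature: the shared common/db and common/repository packages page raw VAAs out of MongoDB and feed them to the new VaaConverter, which replaces the old LineParser as the single decode-and-price path for both the file and the mongo flows. The analytics/cmd/metrics import path, the Mongo URI and database name, and the page size are assumptions used only for illustration.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/wormhole-foundation/wormhole-explorer/analytics/cmd/metrics" // assumed import path for the metrics package touched above
	"github.com/wormhole-foundation/wormhole-explorer/analytics/prices"
	"github.com/wormhole-foundation/wormhole-explorer/common/db"
	"github.com/wormhole-foundation/wormhole-explorer/common/logger"
	"github.com/wormhole-foundation/wormhole-explorer/common/repository"
	"go.uber.org/zap"
)

func main() {
	ctx := context.Background()
	log := logger.New("vaa-volume-sketch")

	// Connect through the new shared common/db package instead of the
	// service-local NewDatabase helper that this diff removes.
	database, err := db.New(ctx, log, "mongodb://localhost:27017", "wormhole") // placeholder URI and database name
	if err != nil {
		log.Fatal("failed to connect MongoDB", zap.Error(err))
	}
	defer database.Close()

	// Load historical prices once and hand the cache to the converter,
	// as both RunVaaVolumeFromFile and RunVaaVolumeFromMongo now do.
	priceCache := prices.NewCoinPricesCache("prices.csv")
	priceCache.InitCache()
	converter := metrics.NewVaaConverter(priceCache)

	// Page through VAAs by timestamp, oldest first, and convert each raw
	// VAA into an InfluxDB line protocol entry.
	repo := repository.NewVaaRepository(database.Database, log)
	startTime := time.Date(2021, 1, 1, 0, 0, 0, 0, time.UTC)
	endTime := time.Now()

	for page := int64(0); ; page++ {
		vaas, err := repo.FindPageByTimeRange(ctx, startTime, endTime, page, 1000, true)
		if err != nil {
			log.Fatal("failed to get vaas", zap.Error(err))
		}
		if len(vaas) == 0 {
			break
		}
		for _, v := range vaas {
			line, err := converter.Convert(v.Vaa)
			if err != nil {
				continue // unknown token or undecodable payload, skipped like in the diff
			}
			fmt.Print(line)
		}
	}
}

The file-based path reuses the same converter: RunVaaVolumeFromFile wraps it in LineParser, which only splits the CSV line and hex-decodes the VAA bytes before delegating to Convert.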