diff --git a/deploy/analytics/env/production-mainnet.env b/deploy/analytics/env/production-mainnet.env index fa806f833..63a10aba5 100644 --- a/deploy/analytics/env/production-mainnet.env +++ b/deploy/analytics/env/production-mainnet.env @@ -20,4 +20,4 @@ AWS_IAM_ROLE= CACHE_CHANNEL=WORMSCAN:NOTIONAL VAA_PAYLOAD_PARSER_URL=http://wormscan-vaa-payload-parser.wormscan VAA_PAYLOAD_PARSER_TIMEOUT=10 -DB_LAYER=mongo +DB_LAYER=dual diff --git a/deploy/analytics/env/production-testnet.env b/deploy/analytics/env/production-testnet.env index 9f2f4344b..a07aa5026 100644 --- a/deploy/analytics/env/production-testnet.env +++ b/deploy/analytics/env/production-testnet.env @@ -20,4 +20,4 @@ AWS_IAM_ROLE= CACHE_CHANNEL=WORMSCAN:NOTIONAL VAA_PAYLOAD_PARSER_URL=http://wormscan-vaa-payload-parser.wormscan-testnet VAA_PAYLOAD_PARSER_TIMEOUT=10 -DB_LAYER=mongo +DB_LAYER=dual diff --git a/deploy/analytics/env/staging-testnet.env b/deploy/analytics/env/staging-testnet.env index 2927c49e2..420b3d195 100644 --- a/deploy/analytics/env/staging-testnet.env +++ b/deploy/analytics/env/staging-testnet.env @@ -20,4 +20,4 @@ AWS_IAM_ROLE= CACHE_CHANNEL=WORMSCAN:NOTIONAL VAA_PAYLOAD_PARSER_URL=http://wormscan-vaa-payload-parser.wormscan-testnet VAA_PAYLOAD_PARSER_TIMEOUT=10 -DB_LAYER=mongo \ No newline at end of file +DB_LAYER=dual diff --git a/deploy/parser/env/production-mainnet.env b/deploy/parser/env/production-mainnet.env index c1cd6d26d..d9157dd27 100644 --- a/deploy/parser/env/production-mainnet.env +++ b/deploy/parser/env/production-mainnet.env @@ -21,4 +21,4 @@ PPROF_ENABLED=false AWS_IAM_ROLE= ALERT_ENABLED=false METRICS_ENABLED=true -DB_LAYER=mongo +DB_LAYER=dual diff --git a/deploy/parser/env/production-testnet.env b/deploy/parser/env/production-testnet.env index 1c67c8722..7017d7b3a 100644 --- a/deploy/parser/env/production-testnet.env +++ b/deploy/parser/env/production-testnet.env @@ -21,4 +21,4 @@ PPROF_ENABLED=false AWS_IAM_ROLE= ALERT_ENABLED=false METRICS_ENABLED=true -DB_LAYER=mongo +DB_LAYER=dual diff --git a/deploy/parser/env/staging-testnet.env b/deploy/parser/env/staging-testnet.env index 06503a0aa..2ead0aef9 100644 --- a/deploy/parser/env/staging-testnet.env +++ b/deploy/parser/env/staging-testnet.env @@ -21,4 +21,4 @@ PPROF_ENABLED=false AWS_IAM_ROLE= ALERT_ENABLED=false METRICS_ENABLED=true -DB_LAYER=mongo +DB_LAYER=dual diff --git a/deploy/tx-tracker/env/production-mainnet.env b/deploy/tx-tracker/env/production-mainnet.env index 46fa91b0f..e84c8cd6d 100644 --- a/deploy/tx-tracker/env/production-mainnet.env +++ b/deploy/tx-tracker/env/production-mainnet.env @@ -18,7 +18,7 @@ P2P_NETWORK=mainnet AWS_IAM_ROLE= METRICS_ENABLED=true NOTIONAL_CACHE_CHANNEL=WORMSCAN:NOTIONAL -DB_LAYER=mongo +DB_LAYER=dual ACALA_BASE_URL=https://eth-rpc-acala.aca-api.network ACALA_REQUESTS_PER_MINUTE=12 diff --git a/deploy/tx-tracker/env/production-testnet.env b/deploy/tx-tracker/env/production-testnet.env index 35055be47..de499c6a7 100644 --- a/deploy/tx-tracker/env/production-testnet.env +++ b/deploy/tx-tracker/env/production-testnet.env @@ -18,7 +18,7 @@ P2P_NETWORK=testnet AWS_IAM_ROLE= METRICS_ENABLED=true NOTIONAL_CACHE_CHANNEL=WORMSCAN:NOTIONAL -DB_LAYER=mongo +DB_LAYER=dual ACALA_BASE_URL=https://acala-dev.aca-dev.network/eth/http ACALA_REQUESTS_PER_MINUTE=12 diff --git a/deploy/tx-tracker/env/staging-testnet.env b/deploy/tx-tracker/env/staging-testnet.env index c7ba396d1..bd88e877c 100644 --- a/deploy/tx-tracker/env/staging-testnet.env +++ b/deploy/tx-tracker/env/staging-testnet.env @@ -18,7 +18,7 @@ P2P_NETWORK=testnet AWS_IAM_ROLE= METRICS_ENABLED=true NOTIONAL_CACHE_CHANNEL=WORMSCAN:NOTIONAL -DB_LAYER=mongo +DB_LAYER=dual ACALA_BASE_URL=https://acala-dev.aca-dev.network/eth/http ACALA_REQUESTS_PER_MINUTE=12 diff --git a/fly/migration/migration.sql b/fly/migration/migration.sql index 6eb40f2f1..45b212cce 100644 --- a/fly/migration/migration.sql +++ b/fly/migration/migration.sql @@ -85,15 +85,17 @@ CREATE INDEX "wh_operation_transactions_timestamp_idx" -- create table wormholescan.wh_operation_transactions_processed CREATE TABLE wormholescan.wh_operation_transactions_processed ( - "id" varchar not null, "message_id" varchar not null, + "tx_hash" varchar not null, + "attestation_vaas_id" varchar not null, + "type" varchar not null, "processed" bool not null, "created_at" timestamptz not null, "updated_at" timestamptz not null, - PRIMARY KEY (id) + PRIMARY KEY ("message_id", "tx_hash") ); -CREATE INDEX "wh_operation_transactions_processed_message_id_idx" - ON wormholescan.wh_operation_transactions_processed ("message_id"); +CREATE INDEX "wh_operation_transactions_processed_attestation_vaas_id_idx" + ON wormholescan.wh_operation_transactions_processed ("attestation_vaas_id"); -- create table wormholescan.wh_governor_status CREATE TABLE wormholescan.wh_governor_status ( @@ -222,7 +224,7 @@ CREATE TABLE wormholescan.wh_operation_prices ( "coingecko_id" varchar not null, "symbol" varchar not null, "token_usd_price" decimal(20,8) not null, - "total_token" decimal(20,8) not null, + "total_token" decimal(30,8) not null, "total_usd" decimal(20,8) not null, "timestamp" timestamptz not null, "created_at" timestamptz not null, diff --git a/tx-tracker/builder/storage.go b/tx-tracker/builder/storage.go index 332e93791..b9adcae8d 100644 --- a/tx-tracker/builder/storage.go +++ b/tx-tracker/builder/storage.go @@ -53,7 +53,7 @@ func NewStorageLayer(ctx context.Context, metrics metrics.Metrics, } vaaRepository := vaa.NewVaaRepositoryPostreSQL(postgresDb, logger) storageLayer.postgresDB = postgresDb - storageLayer.repository = consumer.NewPostgreSQLRepository(postgresDb, vaaRepository, metrics) + storageLayer.repository = consumer.NewPostgreSQLRepository(postgresDb, vaaRepository, metrics, logger) storageLayer.vaaRepository = vaaRepository case config.DbLayerDual: mongoDb, err = dbutil.Connect(ctx, logger, params.MongodbUri, params.MongodbDatabase, false) @@ -69,7 +69,7 @@ func NewStorageLayer(ctx context.Context, metrics metrics.Metrics, } storageLayer.postgresDB = postgresDb postgresVaaRepository := vaa.NewVaaRepositoryPostreSQL(postgresDb, logger) - postgresRepository := consumer.NewPostgreSQLRepository(postgresDb, postgresVaaRepository, metrics) + postgresRepository := consumer.NewPostgreSQLRepository(postgresDb, postgresVaaRepository, metrics, logger) // create dual vaa repository storageLayer.vaaRepository = vaa.NewDualVaaRepository(mongoVaaRepository, postgresVaaRepository) // create dual repository diff --git a/tx-tracker/consumer/dual_repository.go b/tx-tracker/consumer/dual_repository.go index 01824f608..11593f6a2 100644 --- a/tx-tracker/consumer/dual_repository.go +++ b/tx-tracker/consumer/dual_repository.go @@ -17,15 +17,15 @@ func NewDualRepository(mongoRepository *MongoRepository, } } -func (r *DualRepository) AlreadyProcessed(ctx context.Context, vaaId string, digest string) (bool, error) { - processed, err := r.mongoRepository.AlreadyProcessed(ctx, vaaId, digest) +func (r *DualRepository) AlreadyProcessed(ctx context.Context, vaaId string, txHash string) (bool, error) { + processed, err := r.mongoRepository.AlreadyProcessed(ctx, vaaId, txHash) if err != nil { return false, err } if !processed { return false, nil } - processed, err = r.postgresRepository.AlreadyProcessed(ctx, vaaId, digest) + processed, err = r.postgresRepository.AlreadyProcessed(ctx, vaaId, txHash) if err != nil { return false, err } diff --git a/tx-tracker/consumer/mongo_repository.go b/tx-tracker/consumer/mongo_repository.go index 9767d32f5..ac3cd8db9 100644 --- a/tx-tracker/consumer/mongo_repository.go +++ b/tx-tracker/consumer/mongo_repository.go @@ -150,7 +150,7 @@ func (r *MongoRepository) UpsertOriginTx(ctx context.Context, originTx, _ *Upser } // AlreadyProcessed returns true if the given VAA ID has already been processed. -func (r *MongoRepository) AlreadyProcessed(ctx context.Context, vaaId string, digest string) (bool, error) { +func (r *MongoRepository) AlreadyProcessed(ctx context.Context, vaaId string, _ string) (bool, error) { result := r. globalTransactions. FindOne(ctx, bson.D{ diff --git a/tx-tracker/consumer/postresql_repository.go b/tx-tracker/consumer/postresql_repository.go index 95875e983..218baa478 100644 --- a/tx-tracker/consumer/postresql_repository.go +++ b/tx-tracker/consumer/postresql_repository.go @@ -11,20 +11,23 @@ import ( "github.com/wormhole-foundation/wormhole-explorer/txtracker/internal/metrics" "github.com/wormhole-foundation/wormhole-explorer/txtracker/internal/repository/vaa" sdk "github.com/wormhole-foundation/wormhole/sdk/vaa" + "go.uber.org/zap" ) type PostgreSQLRepository struct { dbClient *db.DB vaaRepository *vaa.RepositoryPostreSQL metrics metrics.Metrics + logger *zap.Logger } func NewPostgreSQLRepository(postreSQLClient *db.DB, vaaRepository *vaa.RepositoryPostreSQL, - metrics metrics.Metrics) *PostgreSQLRepository { + metrics metrics.Metrics, logger *zap.Logger) *PostgreSQLRepository { return &PostgreSQLRepository{ metrics: metrics, dbClient: postreSQLClient, vaaRepository: vaaRepository, + logger: logger, } } @@ -115,7 +118,12 @@ func (p *PostgreSQLRepository) upsertOriginTx(ctx context.Context, params *Upser return err } - return p.registerProcessedVaa(ctx, params.Id, params.VaaId) + if nativeTxHash == nil { + p.logger.Warn("nativeTxHash is nil", zap.String("id", params.Id), zap.String("vaaId", params.VaaId)) + return nil + } + + return p.registerProcessedVaa(ctx, params.Id, params.VaaId, *nativeTxHash, "source-tx") } func (p *PostgreSQLRepository) UpsertTargetTx(ctx context.Context, params *TargetTxUpdate) error { @@ -201,20 +209,20 @@ func (p *PostgreSQLRepository) GetTxStatus(ctx context.Context, targetTxUpdate * return status, err } -func (p *PostgreSQLRepository) AlreadyProcessed(ctx context.Context, vaaId string, digest string) (bool, error) { +func (p *PostgreSQLRepository) AlreadyProcessed(ctx context.Context, vaaId string, txhash string) (bool, error) { var count int - err := p.dbClient.SelectOne(ctx, &count, `SELECT COUNT(*) FROM wormholescan.wh_operation_transactions_processed WHERE id = $1`, digest) + err := p.dbClient.SelectOne(ctx, &count, `SELECT COUNT(*) FROM wormholescan.wh_operation_transactions_processed WHERE message_id = $1 and tx_hash = $2`, vaaId, txhash) return count > 0, err } -func (p *PostgreSQLRepository) registerProcessedVaa(ctx context.Context, vaaDigest, vaaId string) error { +func (p *PostgreSQLRepository) registerProcessedVaa(ctx context.Context, vaaDigest, vaaId, txHash, txType string) error { now := time.Now() _, err := p.dbClient.Exec(ctx, - `INSERT INTO wormholescan.wh_operation_transactions_processed (id,message_id,processed,created_at,updated_at) - VALUES ($1,$2,true,$3,$4) - ON CONFLICT (id) DO UPDATE + `INSERT INTO wormholescan.wh_operation_transactions_processed (message_id,tx_hash,attestation_vaas_id,"type", processed,created_at,updated_at) + VALUES ($1,$2,$3,$4,true,$5,$6) + ON CONFLICT (message_id, tx_hash) DO UPDATE SET updated_at = EXCLUDED.updated_at`, - vaaDigest, vaaId, now, now) + vaaId, txHash, vaaDigest, txType, now, now) return err } diff --git a/tx-tracker/consumer/source_processor.go b/tx-tracker/consumer/source_processor.go index 843add43f..487bd5ba8 100644 --- a/tx-tracker/consumer/source_processor.go +++ b/tx-tracker/consumer/source_processor.go @@ -65,7 +65,7 @@ func ProcessSourceTx( // In those cases, when we fetch the message for the second time, // we don't want to hit the RPC nodes again for performance reasons. - processed, err := repository.AlreadyProcessed(ctx, params.VaaId, params.ID) + processed, err := repository.AlreadyProcessed(ctx, params.VaaId, params.TxHash) if err != nil { return nil, err }