Skip to content

Commit

Permalink
Change wh_operation_transactions_processed table definition
Browse files Browse the repository at this point in the history
Co-authored-by: walker-16 <agpazos85@gmail.com>
  • Loading branch information
ftocal and walker-16 committed Sep 23, 2024
1 parent f41467a commit ef7f22e
Show file tree
Hide file tree
Showing 6 changed files with 29 additions and 19 deletions.
10 changes: 6 additions & 4 deletions fly/migration/migration.sql
Original file line number Diff line number Diff line change
Expand Up @@ -85,15 +85,17 @@ CREATE INDEX "wh_operation_transactions_timestamp_idx"

-- create table wormholescan.wh_operation_transactions_processed
CREATE TABLE wormholescan.wh_operation_transactions_processed (
"id" varchar not null,
"message_id" varchar not null,
"tx_hash" varchar not null,
"attestation_vaas_id" varchar not null,
"type" varchar not null,
"processed" bool not null,
"created_at" timestamptz not null,
"updated_at" timestamptz not null,
PRIMARY KEY (id)
PRIMARY KEY ("message_id", "tx_hash")
);
CREATE INDEX "wh_operation_transactions_processed_message_id_idx"
ON wormholescan.wh_operation_transactions_processed ("message_id");
CREATE INDEX "wh_operation_transactions_processed_attestation_vaas_id_idx"
ON wormholescan.wh_operation_transactions_processed ("attestation_vaas_id");

-- create table wormholescan.wh_governor_status
CREATE TABLE wormholescan.wh_governor_status (
Expand Down
4 changes: 2 additions & 2 deletions tx-tracker/builder/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func NewStorageLayer(ctx context.Context, metrics metrics.Metrics,
}
vaaRepository := vaa.NewVaaRepositoryPostreSQL(postgresDb, logger)
storageLayer.postgresDB = postgresDb
storageLayer.repository = consumer.NewPostgreSQLRepository(postgresDb, vaaRepository, metrics)
storageLayer.repository = consumer.NewPostgreSQLRepository(postgresDb, vaaRepository, metrics, logger)
storageLayer.vaaRepository = vaaRepository
case config.DbLayerDual:
mongoDb, err = dbutil.Connect(ctx, logger, params.MongodbUri, params.MongodbDatabase, false)
Expand All @@ -69,7 +69,7 @@ func NewStorageLayer(ctx context.Context, metrics metrics.Metrics,
}
storageLayer.postgresDB = postgresDb
postgresVaaRepository := vaa.NewVaaRepositoryPostreSQL(postgresDb, logger)
postgresRepository := consumer.NewPostgreSQLRepository(postgresDb, postgresVaaRepository, metrics)
postgresRepository := consumer.NewPostgreSQLRepository(postgresDb, postgresVaaRepository, metrics, logger)
// create dual vaa repository
storageLayer.vaaRepository = vaa.NewDualVaaRepository(mongoVaaRepository, postgresVaaRepository)
// create dual repository
Expand Down
6 changes: 3 additions & 3 deletions tx-tracker/consumer/dual_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,15 @@ func NewDualRepository(mongoRepository *MongoRepository,
}
}

func (r *DualRepository) AlreadyProcessed(ctx context.Context, vaaId string, digest string) (bool, error) {
processed, err := r.mongoRepository.AlreadyProcessed(ctx, vaaId, digest)
func (r *DualRepository) AlreadyProcessed(ctx context.Context, vaaId string, txHash string) (bool, error) {
processed, err := r.mongoRepository.AlreadyProcessed(ctx, vaaId, txHash)
if err != nil {
return false, err
}
if !processed {
return false, nil
}
processed, err = r.postgresRepository.AlreadyProcessed(ctx, vaaId, digest)
processed, err = r.postgresRepository.AlreadyProcessed(ctx, vaaId, txHash)
if err != nil {
return false, err
}
Expand Down
2 changes: 1 addition & 1 deletion tx-tracker/consumer/mongo_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ func (r *MongoRepository) UpsertOriginTx(ctx context.Context, originTx, _ *Upser
}

// AlreadyProcessed returns true if the given VAA ID has already been processed.
func (r *MongoRepository) AlreadyProcessed(ctx context.Context, vaaId string, digest string) (bool, error) {
func (r *MongoRepository) AlreadyProcessed(ctx context.Context, vaaId string, _ string) (bool, error) {
result := r.
globalTransactions.
FindOne(ctx, bson.D{
Expand Down
24 changes: 16 additions & 8 deletions tx-tracker/consumer/postresql_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,20 +11,23 @@ import (
"github.com/wormhole-foundation/wormhole-explorer/txtracker/internal/metrics"
"github.com/wormhole-foundation/wormhole-explorer/txtracker/internal/repository/vaa"
sdk "github.com/wormhole-foundation/wormhole/sdk/vaa"
"go.uber.org/zap"
)

type PostgreSQLRepository struct {
dbClient *db.DB
vaaRepository *vaa.RepositoryPostreSQL
metrics metrics.Metrics
logger *zap.Logger
}

func NewPostgreSQLRepository(postreSQLClient *db.DB, vaaRepository *vaa.RepositoryPostreSQL,
metrics metrics.Metrics) *PostgreSQLRepository {
metrics metrics.Metrics, logger *zap.Logger) *PostgreSQLRepository {
return &PostgreSQLRepository{
metrics: metrics,
dbClient: postreSQLClient,
vaaRepository: vaaRepository,
logger: logger,
}
}

Expand Down Expand Up @@ -115,7 +118,12 @@ func (p *PostgreSQLRepository) upsertOriginTx(ctx context.Context, params *Upser
return err
}

return p.registerProcessedVaa(ctx, params.Id, params.VaaId)
if nativeTxHash == nil {
p.logger.Warn("nativeTxHash is nil", zap.String("id", params.Id), zap.String("vaaId", params.VaaId))
return nil
}

return p.registerProcessedVaa(ctx, params.Id, params.VaaId, *nativeTxHash, "source-tx")
}

func (p *PostgreSQLRepository) UpsertTargetTx(ctx context.Context, params *TargetTxUpdate) error {
Expand Down Expand Up @@ -201,20 +209,20 @@ func (p *PostgreSQLRepository) GetTxStatus(ctx context.Context, targetTxUpdate *
return status, err
}

func (p *PostgreSQLRepository) AlreadyProcessed(ctx context.Context, vaaId string, digest string) (bool, error) {
func (p *PostgreSQLRepository) AlreadyProcessed(ctx context.Context, vaaId string, txhash string) (bool, error) {
var count int
err := p.dbClient.SelectOne(ctx, &count, `SELECT COUNT(*) FROM wormholescan.wh_operation_transactions_processed WHERE id = $1`, digest)
err := p.dbClient.SelectOne(ctx, &count, `SELECT COUNT(*) FROM wormholescan.wh_operation_transactions_processed WHERE message_id = $1 and tx_hash`, vaaId, txhash)
return count > 0, err
}

func (p *PostgreSQLRepository) registerProcessedVaa(ctx context.Context, vaaDigest, vaaId string) error {
func (p *PostgreSQLRepository) registerProcessedVaa(ctx context.Context, vaaDigest, vaaId, txHash, txType string) error {
now := time.Now()
_, err := p.dbClient.Exec(ctx,
`INSERT INTO wormholescan.wh_operation_transactions_processed (id,message_id,processed,created_at,updated_at)
VALUES ($1,$2,true,$3,$4)
`INSERT INTO wormholescan.wh_operation_transactions_processed (message_id,tx_hash,attestation_vaas_id,"type", processed,created_at,updated_at)
VALUES ($1,$2,$3,$4,true,$5,$6)
ON CONFLICT (id) DO UPDATE
SET updated_at = EXCLUDED.updated_at`,
vaaDigest, vaaId, now, now)
vaaId, txHash, vaaDigest, txType, now, now)
return err
}

Expand Down
2 changes: 1 addition & 1 deletion tx-tracker/consumer/source_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ func ProcessSourceTx(
// In those cases, when we fetch the message for the second time,
// we don't want to hit the RPC nodes again for performance reasons.

processed, err := repository.AlreadyProcessed(ctx, params.VaaId, params.ID)
processed, err := repository.AlreadyProcessed(ctx, params.VaaId, params.TxHash)
if err != nil {
return nil, err
}
Expand Down

0 comments on commit ef7f22e

Please sign in to comment.