From 0004de85ab1f6ef3311523b3feda63687c552c8c Mon Sep 17 00:00:00 2001 From: Agustin Godnic Date: Thu, 6 Jul 2023 12:49:31 -0300 Subject: [PATCH 1/3] Add context information to error message --- tx-tracker/queue/vaa_sqs.go | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/tx-tracker/queue/vaa_sqs.go b/tx-tracker/queue/vaa_sqs.go index adff7ec51..d0832fa51 100644 --- a/tx-tracker/queue/vaa_sqs.go +++ b/tx-tracker/queue/vaa_sqs.go @@ -112,7 +112,13 @@ func (m *sqsConsumerMessage) Data() *VaaEvent { func (m *sqsConsumerMessage) Done() { if err := m.consumer.DeleteMessage(m.ctx, m.id); err != nil { - m.logger.Error("Error deleting message from SQS", zap.Error(err)) + m.logger.Error("Error deleting message from SQS", + zap.String("vaaId", m.data.ID), + zap.Bool("isExpired", m.IsExpired()), + zap.Time("expiredAt", m.expiredAt), + zap.Time("now", time.Now()), + zap.Error(err), + ) } m.wg.Done() } From 2124fb495b4944c0cdbd17506d458d0f0297e1eb Mon Sep 17 00:00:00 2001 From: Agustin Godnic Date: Thu, 6 Jul 2023 12:56:11 -0300 Subject: [PATCH 2/3] Increase visibility timeout --- tx-tracker/cmd/service/main.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tx-tracker/cmd/service/main.go b/tx-tracker/cmd/service/main.go index 46e83be79..e2a1e4e5c 100644 --- a/tx-tracker/cmd/service/main.go +++ b/tx-tracker/cmd/service/main.go @@ -121,7 +121,7 @@ func newSqsConsumer(ctx context.Context, cfg *config.ServiceSettings) (*sqs.Cons // of traffic (e.g.: dozens of VAAs being emitted in the same minute), and // also when a we have to retry fetching transaction metadata many times // (due to finality delay, out-of-sync nodes, etc). - sqs.WithVisibilityTimeout(15*60), + sqs.WithVisibilityTimeout(20*60), ) return consumer, err } From 38a2cd98ce27b264763a55ee31bcc433865c04b3 Mon Sep 17 00:00:00 2001 From: Agustin Godnic Date: Thu, 6 Jul 2023 13:04:10 -0300 Subject: [PATCH 3/3] Process PythNet messages in the worker pool --- tx-tracker/consumer/consumer.go | 18 +----------------- tx-tracker/consumer/processor.go | 1 - tx-tracker/consumer/repository.go | 1 - tx-tracker/consumer/workerpool.go | 25 ++++++++++++++++++++++--- tx-tracker/queue/vaa_sqs.go | 1 - 5 files changed, 23 insertions(+), 23 deletions(-) diff --git a/tx-tracker/consumer/consumer.go b/tx-tracker/consumer/consumer.go index 6c894f3ff..30815b66b 100644 --- a/tx-tracker/consumer/consumer.go +++ b/tx-tracker/consumer/consumer.go @@ -6,7 +6,6 @@ import ( "github.com/wormhole-foundation/wormhole-explorer/txtracker/config" "github.com/wormhole-foundation/wormhole-explorer/txtracker/queue" - sdk "github.com/wormhole-foundation/wormhole/sdk/vaa" "go.uber.org/zap" ) @@ -57,28 +56,13 @@ func (c *Consumer) producerLoop(ctx context.Context) { for msg := range ch { - event := msg.Data() - - // Check if message is expired. - if msg.IsExpired() { - c.logger.Warn("Message with VAA expired", zap.String("id", event.ID)) - msg.Failed() - continue - } - - // Do not process messages from PythNet - if event.ChainID == sdk.ChainIDPythNet { - msg.Done() - continue - } - // Send the VAA to the worker pool. // // The worker pool is responsible for calling `msg.Done()` err := c.workerPool.Push(ctx, msg) if err != nil { c.logger.Warn("failed to push message into worker pool", - zap.String("vaaId", event.ID), + zap.String("vaaId", msg.Data().ID), zap.Error(err), ) msg.Failed() diff --git a/tx-tracker/consumer/processor.go b/tx-tracker/consumer/processor.go index 06ee27458..133371076 100644 --- a/tx-tracker/consumer/processor.go +++ b/tx-tracker/consumer/processor.go @@ -80,7 +80,6 @@ func ProcessSourceTx( p := UpsertDocumentParams{ VaaId: params.VaaId, ChainId: params.ChainId, - TxHash: params.TxHash, TxDetail: txDetail, TxStatus: txStatus, } diff --git a/tx-tracker/consumer/repository.go b/tx-tracker/consumer/repository.go index 909bd353c..388beb90a 100644 --- a/tx-tracker/consumer/repository.go +++ b/tx-tracker/consumer/repository.go @@ -40,7 +40,6 @@ func NewRepository(logger *zap.Logger, db *mongo.Database) *Repository { type UpsertDocumentParams struct { VaaId string ChainId sdk.ChainID - TxHash string TxDetail *chains.TxDetail TxStatus domain.SourceTxStatus } diff --git a/tx-tracker/consumer/workerpool.go b/tx-tracker/consumer/workerpool.go index 5487a354d..6e09e695e 100644 --- a/tx-tracker/consumer/workerpool.go +++ b/tx-tracker/consumer/workerpool.go @@ -8,6 +8,7 @@ import ( "github.com/wormhole-foundation/wormhole-explorer/txtracker/chains" "github.com/wormhole-foundation/wormhole-explorer/txtracker/config" "github.com/wormhole-foundation/wormhole-explorer/txtracker/queue" + sdk "github.com/wormhole-foundation/wormhole/sdk/vaa" "go.uber.org/zap" ) @@ -101,6 +102,23 @@ func (w *WorkerPool) process(msg queue.ConsumerMessage) { event := msg.Data() + // Check if the message is expired + if msg.IsExpired() { + w.logger.Warn("Message with VAA expired", + zap.String("vaaId", event.ID), + zap.Bool("isExpired", msg.IsExpired()), + ) + msg.Failed() + return + } + + // Do not process messages from PythNet + if event.ChainID == sdk.ChainIDPythNet { + msg.Done() + return + } + + // Process the VAA p := ProcessSourceTxParams{ VaaId: event.ID, ChainId: event.ChainID, @@ -110,17 +128,18 @@ func (w *WorkerPool) process(msg queue.ConsumerMessage) { } err := ProcessSourceTx(w.ctx, w.logger, w.rpcProviderSettings, w.repository, &p) + // Log a message informing the processing status if err == chains.ErrChainNotSupported { - w.logger.Debug("Skipping VAA - chain not supported", + w.logger.Info("Skipping VAA - chain not supported", zap.String("vaaId", event.ID), ) } else if err != nil { - w.logger.Error("Failed to upsert source transaction details", + w.logger.Error("Failed to process originTx", zap.String("vaaId", event.ID), zap.Error(err), ) } else { - w.logger.Info("Updated source transaction details in the database", + w.logger.Info("Updated originTx in the database", zap.String("id", event.ID), ) } diff --git a/tx-tracker/queue/vaa_sqs.go b/tx-tracker/queue/vaa_sqs.go index d0832fa51..58dfa015c 100644 --- a/tx-tracker/queue/vaa_sqs.go +++ b/tx-tracker/queue/vaa_sqs.go @@ -116,7 +116,6 @@ func (m *sqsConsumerMessage) Done() { zap.String("vaaId", m.data.ID), zap.Bool("isExpired", m.IsExpired()), zap.Time("expiredAt", m.expiredAt), - zap.Time("now", time.Now()), zap.Error(err), ) }