From 1850ea166b5682e9697b828303daa5543681b957 Mon Sep 17 00:00:00 2001 From: Bruce Riley Date: Thu, 29 Aug 2024 12:49:58 -0500 Subject: [PATCH] Node/Solana: Allow reobservation by transaction ID --- node/pkg/watchers/solana/client.go | 199 ++++++++++++++++-------- node/pkg/watchers/solana/client_test.go | 15 ++ 2 files changed, 150 insertions(+), 64 deletions(-) create mode 100644 node/pkg/watchers/solana/client_test.go diff --git a/node/pkg/watchers/solana/client.go b/node/pkg/watchers/solana/client.go index 0befb34b7c..be4ed4ff98 100644 --- a/node/pkg/watchers/solana/client.go +++ b/node/pkg/watchers/solana/client.go @@ -10,6 +10,7 @@ import ( "time" "encoding/base64" + "encoding/hex" "encoding/json" "github.com/certusone/wormhole/node/pkg/common" @@ -130,6 +131,16 @@ var ( emptyGapBytes = []byte{0, 0, 0} ) +const ( + // NOTE: We have a test to make sure these constants don't change in solana-go. + + // SolanaAccountLen is the expected length of an account identifier, which is a public key. Using the number here because that's what the admin client will populate. + SolanaAccountLen = 32 + + // SolanaSignatureLen is the expected length of a signature. As of v1.11.0, solana-go does not have a const for this. + SolanaSignatureLen = 64 +) + var ( solanaConnectionErrors = promauto.NewCounterVec( prometheus.CounterOpts{ @@ -354,6 +365,9 @@ func (s *SolanaWatcher) Run(ctx context.Context) error { timer := time.NewTicker(time.Second * 1) defer timer.Stop() + // TODO: Get rid of this before putting this PR out for review. The Ankr endpoint doesn't have history of our test transaction. The public devnet endpoint + // knows about it but gives us rate limit errors when we poll. This hack disables polling so we can issue a reobservation request. + // TESTNET_REOBSV_HACK := false for { select { case <-ctx.Done(): @@ -371,13 +385,33 @@ func (s *SolanaWatcher) Run(ctx context.Context) error { panic("unexpected chain id") } - acc := solana.PublicKeyFromBytes(m.TxHash) - logger.Info("received observation request", zap.String("account", acc.String())) - - rCtx, cancel := context.WithTimeout(ctx, rpcTimeout) - s.fetchMessageAccount(rCtx, logger, acc, 0, true) - cancel() + if len(m.TxHash) == solana.PublicKeyLength { // Request by account ID + acc := solana.PublicKeyFromBytes(m.TxHash) + logger.Info("received observation request with account id", zap.String("account", acc.String())) + rCtx, cancel := context.WithTimeout(ctx, rpcTimeout) + s.fetchMessageAccount(rCtx, logger, acc, 0, true) + cancel() + } else if len(m.TxHash) == SolanaSignatureLen { // Request by transaction ID + signature := solana.SignatureFromBytes(m.TxHash) + logger.Info("received observation request with transaction id", zap.Stringer("signature", signature)) + maxSupportedTransactionVersion := uint64(0) + rCtx, cancel := context.WithTimeout(ctx, rpcTimeout) + parsedTxResult, err := s.rpcClient.GetParsedTransaction(rCtx, signature, &rpc.GetParsedTransactionOpts{Commitment: s.commitment, MaxSupportedTransactionVersion: &maxSupportedTransactionVersion}) + cancel() + if err != nil { + logger.Error("failed to get parsed transaction for observation request", zap.String("bytes", hex.EncodeToString(m.TxHash)), zap.Stringer("signature", signature), zap.Error(err)) + } else { + s.processParsedTransaction(ctx, logger, parsedTxResult, signature, 0, true) + } + } else { + logger.Error("ignoring an observation request of unexpected length", zap.Int("len", len(m.TxHash)), zap.String("bytes", hex.EncodeToString(m.TxHash))) + } case <-timer.C: + // if TESTNET_REOBSV_HACK { + // continue + // } else { + // TESTNET_REOBSV_HACK = true + // } // Get current slot height rCtx, cancel := context.WithTimeout(ctx, rpcTimeout) start := time.Now() @@ -569,7 +603,7 @@ func (s *SolanaWatcher) fetchBlock(ctx context.Context, logger *zap.Logger, slot continue } - tx, err := txRpc.GetTransaction() + twm, err := txRpc.GetTransaction() if err != nil { logger.Error("failed to unmarshal transaction", zap.Uint64("slot", slot), @@ -580,7 +614,20 @@ func (s *SolanaWatcher) fetchBlock(ctx context.Context, logger *zap.Logger, slot continue } - err = s.populateLookupTableAccounts(ctx, tx) + parsedTxResult, err := s.rpcClient.GetParsedTransaction(rCtx, twm.Signatures[0], &rpc.GetParsedTransactionOpts{Commitment: s.commitment, MaxSupportedTransactionVersion: &maxSupportedTransactionVersion}) + if err != nil { + logger.Error("failed to get parsed transaction", + zap.Uint64("slot", slot), + zap.Int("txNum", txNum), + zap.Int("dataLen", len(txRpc.Transaction.GetBinary())), + zap.Error(err), + ) + continue + } + tx := parsedTxResult.Transaction + + // TODO: Get rid of all this after we run in mainnet for a while and verify we don't see any errors. ///////////////////////////////////////////////////////////// + err = s.populateLookupTableAccounts(ctx, twm) if err != nil { logger.Error("failed to fetch lookup table accounts", zap.Uint64("slot", slot), @@ -590,38 +637,90 @@ func (s *SolanaWatcher) fetchBlock(ctx context.Context, logger *zap.Logger, slot continue } - signature := tx.Signatures[0] - var programIndex uint16 - for n, key := range tx.Message.AccountKeys { - if key.Equals(s.contract) { - programIndex = uint16(n) + if len(tx.Message.AccountKeys) != len(twm.Message.AccountKeys) { + logger.Error("parsed tx has different number of account keys than original tx", zap.Any("origAccountKeys", twm.Message.AccountKeys), zap.Any("parsedAccountKeys", tx.Message.AccountKeys)) + continue + } else { + errorFound := false + for idx, orig := range twm.Message.AccountKeys { + if orig != tx.Message.AccountKeys[idx].PublicKey { + logger.Error("account keys differ", zap.Int("idx", idx), zap.Any("origAccountKeys", twm.Message.AccountKeys), zap.Any("parsedAccountKeys", tx.Message.AccountKeys)) + errorFound = true + break + } + } + if errorFound { + continue } } - if programIndex == 0 { - continue + // TODO: End of stuff to get rid of. ///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////// + + s.processParsedTransaction(ctx, logger, parsedTxResult, tx.Signatures[0], slot, false) + } + + if emptyRetry > 0 && logger.Level().Enabled(zapcore.DebugLevel) { + logger.Debug("skipped or unavailable block retrieved on retry attempt", + zap.Uint("empty_retry", emptyRetry), + zap.Uint64("slot", slot), + zap.String("commitment", string(s.commitment))) + } + + return true +} + +func (s *SolanaWatcher) processParsedTransaction(ctx context.Context, logger *zap.Logger, parsedTxResult *rpc.GetParsedTransactionResult, signature solana.Signature, slot uint64, isReobservation bool) { + foundContract := false + for _, key := range parsedTxResult.Transaction.Message.AccountKeys { + if key.PublicKey.Equals(s.contract) { + foundContract = true } + } + if !foundContract { + return + } + if logger.Level().Enabled(zapcore.DebugLevel) { + logger.Debug("found Wormhole transaction", + zap.Stringer("signature", signature), + zap.Uint64("slot", slot), + zap.String("commitment", string(s.commitment))) + } - if logger.Level().Enabled(zapcore.DebugLevel) { - logger.Debug("found Wormhole transaction", + // Find top-level instructions. + for i, inst := range parsedTxResult.Transaction.Message.Instructions { + found, err := s.processInstruction(ctx, logger, slot, inst, signature, i, isReobservation) + if err != nil { + logger.Error("malformed Wormhole instruction", + zap.Error(err), + zap.Int("idx", i), zap.Stringer("signature", signature), zap.Uint64("slot", slot), - zap.String("commitment", string(s.commitment))) + zap.String("commitment", string(s.commitment)), + zap.Binary("data", inst.Data)) + } else if found { + if logger.Level().Enabled(zapcore.DebugLevel) { + logger.Debug("found a top-level Wormhole instruction", + zap.Int("idx", i), + zap.Stringer("signature", signature), + zap.Uint64("slot", slot), + zap.String("commitment", string(s.commitment))) + } } + } - // Find top-level instructions - for i, inst := range tx.Message.Instructions { - found, err := s.processInstruction(ctx, logger, slot, inst, programIndex, tx, signature, i, false) + // Find inner instructions. + for _, inner := range parsedTxResult.Meta.InnerInstructions { + for i, inst := range inner.Instructions { + found, err := s.processInstruction(ctx, logger, slot, inst, signature, i, isReobservation) if err != nil { logger.Error("malformed Wormhole instruction", zap.Error(err), zap.Int("idx", i), zap.Stringer("signature", signature), zap.Uint64("slot", slot), - zap.String("commitment", string(s.commitment)), - zap.Binary("data", inst.Data)) + zap.String("commitment", string(s.commitment))) } else if found { if logger.Level().Enabled(zapcore.DebugLevel) { - logger.Debug("found a top-level Wormhole instruction", + logger.Debug("found an inner Wormhole instruction", zap.Int("idx", i), zap.Stringer("signature", signature), zap.Uint64("slot", slot), @@ -629,42 +728,11 @@ func (s *SolanaWatcher) fetchBlock(ctx context.Context, logger *zap.Logger, slot } } } - - for _, inner := range txRpc.Meta.InnerInstructions { - for i, inst := range inner.Instructions { - found, err := s.processInstruction(ctx, logger, slot, inst, programIndex, tx, signature, i, false) - if err != nil { - logger.Error("malformed Wormhole instruction", - zap.Error(err), - zap.Int("idx", i), - zap.Stringer("signature", signature), - zap.Uint64("slot", slot), - zap.String("commitment", string(s.commitment))) - } else if found { - if logger.Level().Enabled(zapcore.DebugLevel) { - logger.Debug("found an inner Wormhole instruction", - zap.Int("idx", i), - zap.Stringer("signature", signature), - zap.Uint64("slot", slot), - zap.String("commitment", string(s.commitment))) - } - } - } - } - } - - if emptyRetry > 0 && logger.Level().Enabled(zapcore.DebugLevel) { - logger.Debug("skipped or unavailable block retrieved on retry attempt", - zap.Uint("empty_retry", emptyRetry), - zap.Uint64("slot", slot), - zap.String("commitment", string(s.commitment))) } - - return true } -func (s *SolanaWatcher) processInstruction(ctx context.Context, logger *zap.Logger, slot uint64, inst solana.CompiledInstruction, programIndex uint16, tx *solana.Transaction, signature solana.Signature, idx int, isReobservation bool) (bool, error) { - if inst.ProgramIDIndex != programIndex { +func (s *SolanaWatcher) processInstruction(ctx context.Context, logger *zap.Logger, slot uint64, inst *rpc.ParsedInstruction, signature solana.Signature, idx int, isReobservation bool) (bool, error) { + if inst.ProgramId != s.contract { return false, nil } @@ -697,20 +765,20 @@ func (s *SolanaWatcher) processInstruction(ctx context.Context, logger *zap.Logg return false, fmt.Errorf("failed to determine commitment: %w", err) } - if level != s.commitment { + if level != s.commitment && !isReobservation { return true, nil } // The second account in a well-formed Wormhole instruction is the VAA program account. - acc := tx.Message.AccountKeys[inst.Accounts[1]] + accPublicKey := inst.Accounts[1] if logger.Level().Enabled(zapcore.DebugLevel) { - logger.Debug("fetching VAA account", zap.Stringer("acc", acc), + logger.Debug("fetching VAA account", zap.Stringer("acc", accPublicKey), zap.Stringer("signature", signature), zap.Uint64("slot", slot), zap.Int("idx", idx)) } common.RunWithScissors(ctx, s.errC, "retryFetchMessageAccount", func(ctx context.Context) error { - s.retryFetchMessageAccount(ctx, logger, acc, slot, 0, isReobservation) + s.retryFetchMessageAccount(ctx, logger, accPublicKey, slot, 0, isReobservation) return nil }) @@ -883,9 +951,11 @@ func (s *SolanaWatcher) processMessageAccount(logger *zap.Logger, data []byte, a if isReobservation && s.commitment == rpc.CommitmentFinalized { // There is only a single reobservation request channel for each chain, which is assigned to the finalized watcher. // If someone requests reobservation of a confirmed message, we should allow the observation to go through. - logger.Info("allowing reobservation although the commitment level does not match the watcher", - zap.Stringer("account", acc), zap.String("message commitment", string(commitment)), zap.String("watcher commitment", string(s.commitment)), - ) + if logger.Level().Enabled(zapcore.DebugLevel) { + logger.Debug("allowing reobservation although the commitment level does not match the watcher", + zap.Stringer("account", acc), zap.String("message commitment", string(commitment)), zap.String("watcher commitment", string(s.commitment)), + ) + } } else { if logger.Level().Enabled(zapcore.DebugLevel) { logger.Debug("skipping message which does not match the watcher commitment", zap.Stringer("account", acc), zap.String("message commitment", string(commitment)), zap.String("watcher commitment", string(s.commitment))) @@ -937,6 +1007,7 @@ func (s *SolanaWatcher) processMessageAccount(logger *zap.Logger, data []byte, a if logger.Level().Enabled(s.msgObservedLogLevel) { logger.Log(s.msgObservedLogLevel, "message observed", + zap.Stringer("txHash", txHash), zap.Stringer("account", acc), zap.Time("timestamp", observation.Timestamp), zap.Uint32("nonce", observation.Nonce), diff --git a/node/pkg/watchers/solana/client_test.go b/node/pkg/watchers/solana/client_test.go new file mode 100644 index 0000000000..15d41cae05 --- /dev/null +++ b/node/pkg/watchers/solana/client_test.go @@ -0,0 +1,15 @@ +package solana + +import ( + "testing" + + "github.com/gagliardetto/solana-go" + + "github.com/stretchr/testify/assert" +) + +func TestVerifyConstants(t *testing.T) { + // If either of these ever change, message publication and reobservation may break. + assert.Equal(t, SolanaAccountLen, solana.PublicKeyLength) + assert.Equal(t, SolanaSignatureLen, len(solana.Signature{})) +}