Skip to content

Commit

Permalink
Node/Solana: Allow reobservation by transaction ID
Browse files Browse the repository at this point in the history
  • Loading branch information
bruce-riley committed Aug 29, 2024
1 parent 541092c commit 1850ea1
Show file tree
Hide file tree
Showing 2 changed files with 150 additions and 64 deletions.
199 changes: 135 additions & 64 deletions node/pkg/watchers/solana/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"time"

"encoding/base64"
"encoding/hex"
"encoding/json"

"github.com/certusone/wormhole/node/pkg/common"
Expand Down Expand Up @@ -130,6 +131,16 @@ var (
emptyGapBytes = []byte{0, 0, 0}
)

const (
// NOTE: We have a test to make sure these constants don't change in solana-go.

// SolanaAccountLen is the expected length of an account identifier, which is a public key. Using the number here because that's what the admin client will populate.
SolanaAccountLen = 32

// SolanaSignatureLen is the expected length of a signature. As of v1.11.0, solana-go does not have a const for this.
SolanaSignatureLen = 64
)

var (
solanaConnectionErrors = promauto.NewCounterVec(
prometheus.CounterOpts{
Expand Down Expand Up @@ -354,6 +365,9 @@ func (s *SolanaWatcher) Run(ctx context.Context) error {
timer := time.NewTicker(time.Second * 1)
defer timer.Stop()

// TODO: Get rid of this before putting this PR out for review. The Ankr endpoint doesn't have history of our test transaction. The public devnet endpoint
// knows about it but gives us rate limit errors when we poll. This hack disables polling so we can issue a reobservation request.
// TESTNET_REOBSV_HACK := false
for {
select {
case <-ctx.Done():
Expand All @@ -371,13 +385,33 @@ func (s *SolanaWatcher) Run(ctx context.Context) error {
panic("unexpected chain id")
}

acc := solana.PublicKeyFromBytes(m.TxHash)
logger.Info("received observation request", zap.String("account", acc.String()))

rCtx, cancel := context.WithTimeout(ctx, rpcTimeout)
s.fetchMessageAccount(rCtx, logger, acc, 0, true)
cancel()
if len(m.TxHash) == solana.PublicKeyLength { // Request by account ID
acc := solana.PublicKeyFromBytes(m.TxHash)
logger.Info("received observation request with account id", zap.String("account", acc.String()))
rCtx, cancel := context.WithTimeout(ctx, rpcTimeout)
s.fetchMessageAccount(rCtx, logger, acc, 0, true)
cancel()
} else if len(m.TxHash) == SolanaSignatureLen { // Request by transaction ID
signature := solana.SignatureFromBytes(m.TxHash)
logger.Info("received observation request with transaction id", zap.Stringer("signature", signature))
maxSupportedTransactionVersion := uint64(0)
rCtx, cancel := context.WithTimeout(ctx, rpcTimeout)
parsedTxResult, err := s.rpcClient.GetParsedTransaction(rCtx, signature, &rpc.GetParsedTransactionOpts{Commitment: s.commitment, MaxSupportedTransactionVersion: &maxSupportedTransactionVersion})
cancel()
if err != nil {
logger.Error("failed to get parsed transaction for observation request", zap.String("bytes", hex.EncodeToString(m.TxHash)), zap.Stringer("signature", signature), zap.Error(err))
} else {
s.processParsedTransaction(ctx, logger, parsedTxResult, signature, 0, true)
}
} else {
logger.Error("ignoring an observation request of unexpected length", zap.Int("len", len(m.TxHash)), zap.String("bytes", hex.EncodeToString(m.TxHash)))
}
case <-timer.C:
// if TESTNET_REOBSV_HACK {
// continue
// } else {
// TESTNET_REOBSV_HACK = true
// }
// Get current slot height
rCtx, cancel := context.WithTimeout(ctx, rpcTimeout)
start := time.Now()
Expand Down Expand Up @@ -569,7 +603,7 @@ func (s *SolanaWatcher) fetchBlock(ctx context.Context, logger *zap.Logger, slot
continue
}

tx, err := txRpc.GetTransaction()
twm, err := txRpc.GetTransaction()
if err != nil {
logger.Error("failed to unmarshal transaction",
zap.Uint64("slot", slot),
Expand All @@ -580,7 +614,20 @@ func (s *SolanaWatcher) fetchBlock(ctx context.Context, logger *zap.Logger, slot
continue
}

err = s.populateLookupTableAccounts(ctx, tx)
parsedTxResult, err := s.rpcClient.GetParsedTransaction(rCtx, twm.Signatures[0], &rpc.GetParsedTransactionOpts{Commitment: s.commitment, MaxSupportedTransactionVersion: &maxSupportedTransactionVersion})
if err != nil {
logger.Error("failed to get parsed transaction",
zap.Uint64("slot", slot),
zap.Int("txNum", txNum),
zap.Int("dataLen", len(txRpc.Transaction.GetBinary())),
zap.Error(err),
)
continue
}
tx := parsedTxResult.Transaction

// TODO: Get rid of all this after we run in mainnet for a while and verify we don't see any errors. /////////////////////////////////////////////////////////////
err = s.populateLookupTableAccounts(ctx, twm)
if err != nil {
logger.Error("failed to fetch lookup table accounts",
zap.Uint64("slot", slot),
Expand All @@ -590,81 +637,102 @@ func (s *SolanaWatcher) fetchBlock(ctx context.Context, logger *zap.Logger, slot
continue
}

signature := tx.Signatures[0]
var programIndex uint16
for n, key := range tx.Message.AccountKeys {
if key.Equals(s.contract) {
programIndex = uint16(n)
if len(tx.Message.AccountKeys) != len(twm.Message.AccountKeys) {
logger.Error("parsed tx has different number of account keys than original tx", zap.Any("origAccountKeys", twm.Message.AccountKeys), zap.Any("parsedAccountKeys", tx.Message.AccountKeys))
continue
} else {
errorFound := false
for idx, orig := range twm.Message.AccountKeys {
if orig != tx.Message.AccountKeys[idx].PublicKey {
logger.Error("account keys differ", zap.Int("idx", idx), zap.Any("origAccountKeys", twm.Message.AccountKeys), zap.Any("parsedAccountKeys", tx.Message.AccountKeys))
errorFound = true
break
}
}
if errorFound {
continue
}
}
if programIndex == 0 {
continue
// TODO: End of stuff to get rid of. /////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////

s.processParsedTransaction(ctx, logger, parsedTxResult, tx.Signatures[0], slot, false)
}

if emptyRetry > 0 && logger.Level().Enabled(zapcore.DebugLevel) {
logger.Debug("skipped or unavailable block retrieved on retry attempt",
zap.Uint("empty_retry", emptyRetry),
zap.Uint64("slot", slot),
zap.String("commitment", string(s.commitment)))
}

return true
}

func (s *SolanaWatcher) processParsedTransaction(ctx context.Context, logger *zap.Logger, parsedTxResult *rpc.GetParsedTransactionResult, signature solana.Signature, slot uint64, isReobservation bool) {
foundContract := false
for _, key := range parsedTxResult.Transaction.Message.AccountKeys {
if key.PublicKey.Equals(s.contract) {
foundContract = true
}
}
if !foundContract {
return
}
if logger.Level().Enabled(zapcore.DebugLevel) {
logger.Debug("found Wormhole transaction",
zap.Stringer("signature", signature),
zap.Uint64("slot", slot),
zap.String("commitment", string(s.commitment)))
}

if logger.Level().Enabled(zapcore.DebugLevel) {
logger.Debug("found Wormhole transaction",
// Find top-level instructions.
for i, inst := range parsedTxResult.Transaction.Message.Instructions {
found, err := s.processInstruction(ctx, logger, slot, inst, signature, i, isReobservation)
if err != nil {
logger.Error("malformed Wormhole instruction",
zap.Error(err),
zap.Int("idx", i),
zap.Stringer("signature", signature),
zap.Uint64("slot", slot),
zap.String("commitment", string(s.commitment)))
zap.String("commitment", string(s.commitment)),
zap.Binary("data", inst.Data))
} else if found {
if logger.Level().Enabled(zapcore.DebugLevel) {
logger.Debug("found a top-level Wormhole instruction",
zap.Int("idx", i),
zap.Stringer("signature", signature),
zap.Uint64("slot", slot),
zap.String("commitment", string(s.commitment)))
}
}
}

// Find top-level instructions
for i, inst := range tx.Message.Instructions {
found, err := s.processInstruction(ctx, logger, slot, inst, programIndex, tx, signature, i, false)
// Find inner instructions.
for _, inner := range parsedTxResult.Meta.InnerInstructions {
for i, inst := range inner.Instructions {
found, err := s.processInstruction(ctx, logger, slot, inst, signature, i, isReobservation)
if err != nil {
logger.Error("malformed Wormhole instruction",
zap.Error(err),
zap.Int("idx", i),
zap.Stringer("signature", signature),
zap.Uint64("slot", slot),
zap.String("commitment", string(s.commitment)),
zap.Binary("data", inst.Data))
zap.String("commitment", string(s.commitment)))
} else if found {
if logger.Level().Enabled(zapcore.DebugLevel) {
logger.Debug("found a top-level Wormhole instruction",
logger.Debug("found an inner Wormhole instruction",
zap.Int("idx", i),
zap.Stringer("signature", signature),
zap.Uint64("slot", slot),
zap.String("commitment", string(s.commitment)))
}
}
}

for _, inner := range txRpc.Meta.InnerInstructions {
for i, inst := range inner.Instructions {
found, err := s.processInstruction(ctx, logger, slot, inst, programIndex, tx, signature, i, false)
if err != nil {
logger.Error("malformed Wormhole instruction",
zap.Error(err),
zap.Int("idx", i),
zap.Stringer("signature", signature),
zap.Uint64("slot", slot),
zap.String("commitment", string(s.commitment)))
} else if found {
if logger.Level().Enabled(zapcore.DebugLevel) {
logger.Debug("found an inner Wormhole instruction",
zap.Int("idx", i),
zap.Stringer("signature", signature),
zap.Uint64("slot", slot),
zap.String("commitment", string(s.commitment)))
}
}
}
}
}

if emptyRetry > 0 && logger.Level().Enabled(zapcore.DebugLevel) {
logger.Debug("skipped or unavailable block retrieved on retry attempt",
zap.Uint("empty_retry", emptyRetry),
zap.Uint64("slot", slot),
zap.String("commitment", string(s.commitment)))
}

return true
}

func (s *SolanaWatcher) processInstruction(ctx context.Context, logger *zap.Logger, slot uint64, inst solana.CompiledInstruction, programIndex uint16, tx *solana.Transaction, signature solana.Signature, idx int, isReobservation bool) (bool, error) {
if inst.ProgramIDIndex != programIndex {
func (s *SolanaWatcher) processInstruction(ctx context.Context, logger *zap.Logger, slot uint64, inst *rpc.ParsedInstruction, signature solana.Signature, idx int, isReobservation bool) (bool, error) {
if inst.ProgramId != s.contract {
return false, nil
}

Expand Down Expand Up @@ -697,20 +765,20 @@ func (s *SolanaWatcher) processInstruction(ctx context.Context, logger *zap.Logg
return false, fmt.Errorf("failed to determine commitment: %w", err)
}

if level != s.commitment {
if level != s.commitment && !isReobservation {
return true, nil
}

// The second account in a well-formed Wormhole instruction is the VAA program account.
acc := tx.Message.AccountKeys[inst.Accounts[1]]
accPublicKey := inst.Accounts[1]

if logger.Level().Enabled(zapcore.DebugLevel) {
logger.Debug("fetching VAA account", zap.Stringer("acc", acc),
logger.Debug("fetching VAA account", zap.Stringer("acc", accPublicKey),
zap.Stringer("signature", signature), zap.Uint64("slot", slot), zap.Int("idx", idx))
}

common.RunWithScissors(ctx, s.errC, "retryFetchMessageAccount", func(ctx context.Context) error {
s.retryFetchMessageAccount(ctx, logger, acc, slot, 0, isReobservation)
s.retryFetchMessageAccount(ctx, logger, accPublicKey, slot, 0, isReobservation)
return nil
})

Expand Down Expand Up @@ -883,9 +951,11 @@ func (s *SolanaWatcher) processMessageAccount(logger *zap.Logger, data []byte, a
if isReobservation && s.commitment == rpc.CommitmentFinalized {
// There is only a single reobservation request channel for each chain, which is assigned to the finalized watcher.
// If someone requests reobservation of a confirmed message, we should allow the observation to go through.
logger.Info("allowing reobservation although the commitment level does not match the watcher",
zap.Stringer("account", acc), zap.String("message commitment", string(commitment)), zap.String("watcher commitment", string(s.commitment)),
)
if logger.Level().Enabled(zapcore.DebugLevel) {
logger.Debug("allowing reobservation although the commitment level does not match the watcher",
zap.Stringer("account", acc), zap.String("message commitment", string(commitment)), zap.String("watcher commitment", string(s.commitment)),
)
}
} else {
if logger.Level().Enabled(zapcore.DebugLevel) {
logger.Debug("skipping message which does not match the watcher commitment", zap.Stringer("account", acc), zap.String("message commitment", string(commitment)), zap.String("watcher commitment", string(s.commitment)))
Expand Down Expand Up @@ -937,6 +1007,7 @@ func (s *SolanaWatcher) processMessageAccount(logger *zap.Logger, data []byte, a

if logger.Level().Enabled(s.msgObservedLogLevel) {
logger.Log(s.msgObservedLogLevel, "message observed",
zap.Stringer("txHash", txHash),
zap.Stringer("account", acc),
zap.Time("timestamp", observation.Timestamp),
zap.Uint32("nonce", observation.Nonce),
Expand Down
15 changes: 15 additions & 0 deletions node/pkg/watchers/solana/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package solana

import (
"testing"

"github.com/gagliardetto/solana-go"

"github.com/stretchr/testify/assert"
)

func TestVerifyConstants(t *testing.T) {
// If either of these ever change, message publication and reobservation may break.
assert.Equal(t, SolanaAccountLen, solana.PublicKeyLength)
assert.Equal(t, SolanaSignatureLen, len(solana.Signature{}))
}

0 comments on commit 1850ea1

Please sign in to comment.