node/node_test: Test governance vaa injection
tbjump authored and tbjump committed Aug 14, 2023
1 parent 1cabbe8 commit 2a06fd4
Showing 1 changed file with 72 additions and 16 deletions.
node/pkg/node/node_test.go
@@ -22,6 +22,7 @@ import (

"sync/atomic"

"github.com/certusone/wormhole/node/pkg/adminrpc"
"github.com/certusone/wormhole/node/pkg/common"
"github.com/certusone/wormhole/node/pkg/db"
"github.com/certusone/wormhole/node/pkg/devnet"
@@ -64,6 +65,8 @@ const WAIT_FOR_METRICS = false
// The level at which logs will be written to console. During testing, logs are produced and buffered at Info level because some tests need to look for certain entries.
var CONSOLE_LOG_LEVEL = zap.InfoLevel

const guardianSetIndex = 5 // index of the active guardian set (can be anything, just needs to be set to something)

var TEST_ID_CTR atomic.Uint32

func getTestId() uint {
@@ -341,9 +344,12 @@ func waitForVaa(t testing.TB, ctx context.Context, c publicrpcv1.PublicRPCServic
}

type testCase struct {
msg *common.MessagePublication // a Wormhole message
govMsg *nodev1.GovernanceMessage // protobuf representation of msg as governance message, if applicable.
// number of Guardians who will initially observe this message through the mock watcher
numGuardiansObserve int
// number of Guardians that will have the governance message injected through the adminrpc
numGuardiansInjectGov int
// if true, Guardians will not observe this message in the mock watcher, if they receive a reobservation request for it
unavailableInReobservation bool
// if true, the test environment will inject a reobservation request signed by Guardian 1,
@@ -509,6 +515,34 @@ func TestMain(m *testing.M) {
os.Exit(m.Run())
}

func createGovernanceMsgAndVaa(t testing.TB) (*common.MessagePublication, *nodev1.GovernanceMessage) {
t.Helper()
msgGov := someMessage()
msgGov.EmitterAddress = vaa.GovernanceEmitter
msgGov.EmitterChain = vaa.GovernanceChain

govMsg := &nodev1.GovernanceMessage{
Sequence: msgGov.Sequence,
Nonce: msgGov.Nonce,
Payload: &nodev1.GovernanceMessage_GuardianSet{
GuardianSet: &nodev1.GuardianSetUpdate{
Guardians: []*nodev1.GuardianSetUpdate_Guardian{
{
Pubkey: "0x187727CdD17C8142FE9b29A066F577548423aF0e",
Name: "P2P Validator",
},
},
},
},
}
govVaa, err := adminrpc.GovMsgToVaa(govMsg, guardianSetIndex, msgGov.Timestamp)
require.NoError(t, err)
msgGov.Payload = govVaa.Payload
msgGov.ConsistencyLevel = govVaa.ConsistencyLevel

return msgGov, govMsg
}

// TestConsensus tests that a set of guardians can form consensus on certain messages and reject certain other messages
func TestConsensus(t *testing.T) {
// adjust processor time intervals to make tests pass faster
@@ -523,6 +557,8 @@ func TestConsensus(t *testing.T) {
msgGovEmitter := someMessage()
msgGovEmitter.EmitterAddress = vaa.GovernanceEmitter

msgGov, msgGovProto := createGovernanceMsgAndVaa(t)

msgWrongEmitterChain := someMessage()
msgWrongEmitterChain.EmitterChain = vaa.ChainIDEthereum

@@ -585,6 +621,13 @@ func TestConsensus(t *testing.T) {
numGuardiansObserve: numGuardians,
mustReachQuorum: true,
},
{ // Injected governance message
msg: msgGov,
govMsg: msgGovProto,
numGuardiansObserve: 0,
numGuardiansInjectGov: numGuardians,
mustReachQuorum: true,
},
// TODO add a testcase to test the automatic re-observation requests.
// Need to refactor various usage of wall time to a mockable time first. E.g. using https://github.com/benbjohnson/clock
}
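// A minimal, hypothetical sketch (not part of this commit) of how the TODO above
// could be approached with github.com/benbjohnson/clock: the processor would take
// a clock.Clock instead of calling time.NewTicker directly, so a test can inject a
// *clock.Mock and fast-forward past the re-observation interval without waiting on
// wall time. The names reobservationInterval and newReobservationTicker are
// illustrative assumptions, not existing identifiers in this codebase.

package clocksketch

import (
	"time"

	"github.com/benbjohnson/clock"
)

const reobservationInterval = 5 * time.Minute

// newReobservationTicker is the kind of seam the processor would expose:
// production code passes clock.New(), tests pass clock.NewMock().
func newReobservationTicker(c clock.Clock) *clock.Ticker {
	return c.Ticker(reobservationInterval)
}

// In a test, instead of sleeping wall time:
//
//	mock := clock.NewMock()
//	ticker := newReobservationTicker(mock)
//	mock.Add(reobservationInterval + time.Second) // ticker.C fires deterministically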
@@ -594,7 +637,6 @@
// runConsensusTests spins up `numGuardians` guardians and runs & verifies the testCases
func runConsensusTests(t *testing.T, testCases []testCase, numGuardians int) {
const testTimeout = time.Second * 30
-const guardianSetIndex = 5 // index of the active guardian set (can be anything, just needs to be set to something)
const vaaCheckGuardianIndex uint = 0 // we will query this guardian's publicrpc for VAAs
const adminRpcGuardianIndex uint = 0 // we will query this guardian's adminRpc
testId := getTestId()
@@ -681,27 +723,28 @@ func runConsensusTests(t *testing.T, testCases []testCase, numGuardians int) {
}
}

-// Wait for adminrpc to come online
-for zapObserver.FilterMessage("admin server listening on").FilterField(zap.String("path", gs[adminRpcGuardianIndex].config.adminSocket)).Len() == 0 {
-logger.Info("admin server seems to be offline (according to logs). Waiting 100ms...")
-time.Sleep(time.Microsecond * 100)
-}
-
-// Send manual re-observation requests
+// Do adminrpc stuff: Send manual re-observation requests and perform governance msg injections
func() { // put this in own function to use defer
-s := fmt.Sprintf("unix:///%s", gs[adminRpcGuardianIndex].config.adminSocket)
-conn, err := grpc.DialContext(ctx, s, grpc.WithTransportCredentials(insecure.NewCredentials()))
-require.NoError(t, err)
-defer conn.Close()
+// Wait for adminrpc to come online
+adminCs := make([]nodev1.NodePrivilegedServiceClient, numGuardians)
+for i := 0; i < numGuardians; i++ {
+for zapObserver.FilterMessage("admin server listening on").FilterField(zap.String("path", gs[i].config.adminSocket)).Len() == 0 {
+logger.Info("admin server seems to be offline (according to logs). Waiting 100ms...")
+time.Sleep(time.Microsecond * 100)
+}
+
+s := fmt.Sprintf("unix:///%s", gs[i].config.adminSocket)
+conn, err := grpc.DialContext(ctx, s, grpc.WithTransportCredentials(insecure.NewCredentials()))
+require.NoError(t, err)
+defer conn.Close()
+adminCs[i] = nodev1.NewNodePrivilegedServiceClient(conn)
+}

-c := nodev1.NewNodePrivilegedServiceClient(conn)
for i, testCase := range testCases {
if testCase.performManualReobservationRequest {
// timeout for grpc query
logger.Info("injecting observation request through admin rpc", zap.Int("test_case", i))
queryCtx, queryCancel := context.WithTimeout(ctx, time.Second)
-_, err = c.SendObservationRequest(queryCtx, &nodev1.SendObservationRequestRequest{
+_, err := adminCs[adminRpcGuardianIndex].SendObservationRequest(queryCtx, &nodev1.SendObservationRequestRequest{
ObservationRequest: &gossipv1.ObservationRequest{
ChainId: uint32(testCase.msg.EmitterChain),
TxHash: testCase.msg.TxHash[:],
@@ -710,6 +753,19 @@ func runConsensusTests(t *testing.T, testCases []testCase, numGuardians int) {
queryCancel()
assert.NoError(t, err)
}

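// Inject the governance message into the first numGuardiansInjectGov guardians
// through their admin RPC endpoints. Each guardian that receives the injection
// signs the resulting VAA and gossips its signature, so injecting into enough
// guardians lets the test case reach quorum (mustReachQuorum).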
for j := 0; j < testCase.numGuardiansInjectGov; j++ {
require.NotNil(t, testCase.govMsg)
logger.Info("injecting message through admin rpc", zap.Int("test_case", i), zap.Int("guardian", j))
queryCtx, queryCancel := context.WithTimeout(ctx, time.Second)
_, err := adminCs[j].InjectGovernanceVAA(queryCtx, &nodev1.InjectGovernanceVAARequest{
CurrentSetIndex: guardianSetIndex,
Messages: []*nodev1.GovernanceMessage{testCase.govMsg},
Timestamp: uint32(testCase.msg.Timestamp.Unix()),
})
queryCancel()
assert.NoError(t, err)
}
}
}()
