-
Notifications
You must be signed in to change notification settings - Fork 691
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Node: Processor benchmarking (#4075)
- Loading branch information
1 parent
92b54bb
commit 43c96f9
Showing
1 changed file
with
243 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,243 @@ | ||
package processor | ||
|
||
import ( | ||
"context" | ||
"crypto/ecdsa" | ||
"crypto/rand" | ||
"fmt" | ||
"os" | ||
"runtime/pprof" | ||
"testing" | ||
"time" | ||
|
||
"github.com/certusone/wormhole/node/pkg/common" | ||
"github.com/certusone/wormhole/node/pkg/db" | ||
"github.com/certusone/wormhole/node/pkg/gwrelayer" | ||
gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1" | ||
|
||
// gossipv1 "github.com/certusone/wormhole/node/pkg/proto/gossip/v1" | ||
ethCommon "github.com/ethereum/go-ethereum/common" | ||
"github.com/ethereum/go-ethereum/crypto" | ||
"github.com/stretchr/testify/require" | ||
"github.com/wormhole-foundation/wormhole/sdk/vaa" | ||
"go.uber.org/zap" | ||
) | ||
|
||
/* | ||
No gwrelayer: | ||
average time to do 1800000 observations: 51.246µs | ||
there were 1100000 under quorum, taking an average time of 62.888µs | ||
there were 100000 quorum reached, taking an average time of 68.049µs | ||
there were 600000 over quorum, taking an average time of 27.101µs | ||
there were 100000 handle message calls, taking an average time of 28.723µs | ||
With gwrelayer (where observations are not relayed): | ||
average time to do 1800000 observations: 51.18µs | ||
there were 1100000 under quorum, taking an average time of 62.713µs | ||
there were 100000 quorum reached, taking an average time of 68.316µs | ||
there were 600000 over quorum, taking an average time of 27.182µs | ||
there were 100000 handle message calls, taking an average time of 28.704µs | ||
*/ | ||
|
||
// go test -bench ^BenchmarkHandleObservation -benchtime=1x | ||
func BenchmarkHandleObservation(b *testing.B) { | ||
const NumObservations = 100000 | ||
ctx := context.Background() | ||
db := db.OpenDb(nil, nil) | ||
defer db.Close() | ||
p, pd := createProcessorForTest(b, NumObservations, ctx, db, false) | ||
require.NotNil(b, p) | ||
require.NotNil(b, pd) | ||
|
||
var totalTime, underQuorumTime, quorumReachedTime, overQuorumTime, handleMsgTime time.Duration | ||
var totalCount, underQuorumCount, quorumReachedCount, overQuorumCount int | ||
for count := 0; count < NumObservations; count++ { | ||
k := pd.createMessagePublication(b, uint64(count)) | ||
start := time.Now() | ||
p.handleMessage(k) | ||
handleMsgTime += time.Since(start) | ||
|
||
for guardianIdx := 1; guardianIdx < 19; guardianIdx++ { | ||
start := time.Now() | ||
p.handleSingleObservation(pd.guardianAddrs[guardianIdx], pd.createObservation(b, guardianIdx, k)) | ||
duration := time.Since(start) | ||
totalCount++ | ||
totalTime += duration | ||
if guardianIdx < 12 { | ||
underQuorumCount++ | ||
underQuorumTime += duration | ||
} else if guardianIdx == 12 { | ||
quorumReachedCount++ | ||
quorumReachedTime += duration | ||
} else { | ||
overQuorumCount++ | ||
overQuorumTime += duration | ||
} | ||
} | ||
} | ||
require.Equal(b, NumObservations, len(pd.gossipVaaSendC)) | ||
// This won't work once batching is enabled. | ||
// require.Equal(b, NumObservations, len(pd.gossipAttestationSendC)) | ||
fmt.Println("average time to do ", totalCount, " observations: ", totalTime/time.Duration(totalCount)) | ||
fmt.Println("there were ", underQuorumCount, " under quorum, taking an average time of ", underQuorumTime/time.Duration(underQuorumCount)) | ||
fmt.Println("there were ", quorumReachedCount, " quorum reached, taking an average time of ", quorumReachedTime/time.Duration(quorumReachedCount)) | ||
fmt.Println("there were ", overQuorumCount, " over quorum, taking an average time of ", overQuorumTime/time.Duration(overQuorumCount)) | ||
fmt.Println("there were ", NumObservations, " handle message calls, taking an average time of ", handleMsgTime/time.Duration(NumObservations)) | ||
} | ||
|
||
// go test -bench ^BenchmarkProfileHandleObservation -benchtime=1x | ||
// To view profiling results: | ||
// go install github.com/google/pprof@latest | ||
// sudo apt install graphviz | ||
// pprof -http=:8080 handleObs.prof | ||
|
||
func BenchmarkProfileHandleObservation(b *testing.B) { | ||
// return | ||
const NumObservations = 100000 | ||
f, err := os.Create("handleObs.prof") | ||
require.NoError(b, err) | ||
err = pprof.StartCPUProfile(f) | ||
require.NoError(b, err) | ||
defer pprof.StopCPUProfile() | ||
|
||
ctx := context.Background() | ||
db := db.OpenDb(nil, nil) | ||
defer db.Close() | ||
p, pd := createProcessorForTest(b, NumObservations, ctx, db, false) | ||
require.NotNil(b, p) | ||
require.NotNil(b, pd) | ||
|
||
for count := 0; count < NumObservations; count++ { | ||
k := pd.createMessagePublication(b, uint64(count)) | ||
p.handleMessage(k) | ||
|
||
for guardianIdx := 1; guardianIdx < 19; guardianIdx++ { | ||
p.handleSingleObservation(pd.guardianAddrs[guardianIdx], pd.createObservation(b, guardianIdx, k)) | ||
} | ||
} | ||
require.Equal(b, NumObservations, len(pd.gossipVaaSendC)) | ||
} | ||
|
||
type ProcessorData struct { | ||
gossipAttestationSendC chan []byte | ||
gossipVaaSendC chan []byte | ||
emitterChain vaa.ChainID | ||
emitterAddress vaa.Address | ||
guardianKeys []*ecdsa.PrivateKey | ||
guardianAddrs [][]byte | ||
} | ||
|
||
func (pd *ProcessorData) messageID(seqNum uint64) string { | ||
return fmt.Sprintf("%d/%s/%d", pd.emitterChain, pd.emitterAddress, seqNum) | ||
} | ||
|
||
// createProcessorForTest creates a processor for benchmarking. It assumes we are index zero in the guardian set. | ||
func createProcessorForTest(b *testing.B, numVAAs int, ctx context.Context, db *db.Database, useBatching bool) (*Processor, *ProcessorData) { | ||
b.Helper() | ||
logger := zap.NewNop() | ||
|
||
var ourKey *ecdsa.PrivateKey | ||
keys := []ethCommon.Address{} | ||
guardianKeys := []*ecdsa.PrivateKey{} | ||
guardianAddrs := [][]byte{} | ||
|
||
for count := 0; count < 19; count++ { | ||
gk, err := ecdsa.GenerateKey(crypto.S256(), rand.Reader) | ||
require.NoError(b, err) | ||
keys = append(keys, crypto.PubkeyToAddress(gk.PublicKey)) | ||
guardianKeys = append(guardianKeys, gk) | ||
guardianAddrs = append(guardianAddrs, crypto.PubkeyToAddress(gk.PublicKey).Bytes()) | ||
if ourKey == nil { | ||
ourKey = gk | ||
} | ||
} | ||
|
||
gs := common.NewGuardianSet(keys, 0) | ||
gst := common.NewGuardianSetState(nil) | ||
gst.Set(gs) | ||
|
||
emitterAddress, err := vaa.StringToAddress("0x3ee18B2214AFF97000D974cf647E7C347E8fa585") | ||
require.NoError(b, err) | ||
|
||
gwRelayer := gwrelayer.NewGatewayRelayer(ctx, logger, "wormhole14ejqjyq8um4p3xfqj74yld5waqljf88fz25yxnma0cngspxe3les00fpj", nil, common.MainNet) | ||
require.NoError(b, gwRelayer.Start(ctx)) | ||
|
||
pd := &ProcessorData{ | ||
gossipAttestationSendC: make(chan []byte, numVAAs+100), | ||
gossipVaaSendC: make(chan []byte, numVAAs+100), | ||
emitterChain: vaa.ChainIDEthereum, | ||
emitterAddress: emitterAddress, | ||
guardianKeys: guardianKeys, | ||
guardianAddrs: guardianAddrs, | ||
} | ||
|
||
p := &Processor{ | ||
gossipAttestationSendC: pd.gossipAttestationSendC, | ||
gossipVaaSendC: pd.gossipVaaSendC, | ||
gk: ourKey, | ||
gs: gs, | ||
gst: gst, | ||
db: db, | ||
logger: logger, | ||
state: &aggregationState{observationMap{}}, | ||
ourAddr: crypto.PubkeyToAddress(ourKey.PublicKey), | ||
pythnetVaas: make(map[string]PythNetVaaEntry), | ||
updatedVAAs: make(map[string]*updateVaaEntry), | ||
gatewayRelayer: gwRelayer, | ||
} | ||
|
||
batchCutoverCompleteFlag.Store(useBatching) | ||
|
||
go func() { _ = p.vaaWriter(ctx) }() | ||
go func() { _ = p.batchProcessor(ctx) }() | ||
|
||
return p, pd | ||
} | ||
|
||
func (pd *ProcessorData) createMessagePublication(b *testing.B, sequence uint64) *common.MessagePublication { | ||
b.Helper() | ||
return &common.MessagePublication{ | ||
TxHash: ethCommon.HexToHash(fmt.Sprintf("%064x", sequence)), | ||
Timestamp: time.Now(), | ||
Nonce: 42, | ||
Sequence: sequence, | ||
EmitterChain: pd.emitterChain, | ||
EmitterAddress: pd.emitterAddress, | ||
Payload: []byte{0x01, 0x02, 0x03, 0x04}, | ||
ConsistencyLevel: 32, | ||
} | ||
} | ||
|
||
func (pd *ProcessorData) createObservation(b *testing.B, guardianIdx int, k *common.MessagePublication) *gossipv1.Observation { | ||
b.Helper() | ||
v := &VAA{ | ||
VAA: vaa.VAA{ | ||
Version: vaa.SupportedVAAVersion, | ||
GuardianSetIndex: uint32(guardianIdx), | ||
Signatures: nil, | ||
Timestamp: k.Timestamp, | ||
Nonce: k.Nonce, | ||
EmitterChain: k.EmitterChain, | ||
EmitterAddress: k.EmitterAddress, | ||
Payload: k.Payload, | ||
Sequence: k.Sequence, | ||
ConsistencyLevel: k.ConsistencyLevel, | ||
}, | ||
Unreliable: k.Unreliable, | ||
Reobservation: k.IsReobservation, | ||
} | ||
|
||
// Generate digest of the unsigned VAA. | ||
digest := v.SigningDigest() | ||
|
||
// Sign the digest using our node's guardian key. | ||
signature, err := crypto.Sign(digest.Bytes(), pd.guardianKeys[guardianIdx]) | ||
require.NoError(b, err) | ||
|
||
return &gossipv1.Observation{ | ||
Hash: digest.Bytes(), | ||
Signature: signature, | ||
TxHash: k.TxHash.Bytes(), | ||
MessageId: pd.messageID(k.Sequence), | ||
} | ||
} |