From 92b54bb749c454a6140665de557f008fcdbbe89d Mon Sep 17 00:00:00 2001 From: bruce-riley <96066700+bruce-riley@users.noreply.github.com> Date: Mon, 26 Aug 2024 08:41:29 -0600 Subject: [PATCH] Node: DB not logging to Grafana (#4093) * Node: DB not logging to Grafana * Code review rework * Remove unnecessary variable --- node/cmd/guardiand/node.go | 205 ++++++++++++++++----------------- node/pkg/db/accountant_test.go | 30 ++--- node/pkg/db/db.go | 12 -- node/pkg/db/db_test.go | 26 +---- node/pkg/db/governor_test.go | 40 ++----- node/pkg/db/purge_vaas_test.go | 19 ++- 6 files changed, 130 insertions(+), 202 deletions(-) diff --git a/node/cmd/guardiand/node.go b/node/cmd/guardiand/node.go index fffe74c890..3900b4a71a 100644 --- a/node/cmd/guardiand/node.go +++ b/node/cmd/guardiand/node.go @@ -619,6 +619,9 @@ func runNode(cmd *cobra.Command, args []string) { // Verify flags + if *nodeName == "" { + logger.Fatal("Please specify --nodeName") + } if *nodeKeyPath == "" && env != common.UnsafeDevNet { // In devnet mode, keys are deterministically generated. logger.Fatal("Please specify --nodeKey") } @@ -643,6 +646,100 @@ func runNode(cmd *cobra.Command, args []string) { logger.Fatal("Please specify --ethRPC") } + // In devnet mode, we generate a deterministic guardian key and write it to disk. + if env == common.UnsafeDevNet { + err := devnet.GenerateAndStoreDevnetGuardianKey(*guardianKeyPath) + if err != nil { + logger.Fatal("failed to generate devnet guardian key", zap.Error(err)) + } + } + + // Load guardian key + gk, err := common.LoadGuardianKey(*guardianKeyPath, env == common.UnsafeDevNet) + if err != nil { + logger.Fatal("failed to load guardian key", zap.Error(err)) + } + + logger.Info("Loaded guardian key", zap.String( + "address", ethcrypto.PubkeyToAddress(gk.PublicKey).String())) + + // Load p2p private key + var p2pKey libp2p_crypto.PrivKey + if env == common.UnsafeDevNet { + idx, err := devnet.GetDevnetIndex() + if err != nil { + logger.Fatal("Failed to parse hostname - are we running in devnet?") + } + p2pKey = devnet.DeterministicP2PPrivKeyByIndex(int64(idx)) + + if idx != 0 { + firstGuardianName, err := devnet.GetFirstGuardianNameFromBootstrapPeers(*p2pBootstrap) + if err != nil { + logger.Fatal("failed to get first guardian name from bootstrap peers", zap.String("bootstrapPeers", *p2pBootstrap), zap.Error(err)) + } + // try to connect to guardian-0 + for { + _, err := net.LookupIP(firstGuardianName) + if err == nil { + break + } + logger.Info(fmt.Sprintf("Error resolving %s. Trying again...", firstGuardianName)) + time.Sleep(time.Second) + } + // TODO this is a hack. If this is not the bootstrap Guardian, we wait 10s such that the bootstrap Guardian has enough time to start. + // This may no longer be necessary because now the p2p.go ensures that it can connect to at least one bootstrap peer and will + // exit the whole guardian if it is unable to. Sleeping here for a bit may reduce overall startup time by preventing unnecessary restarts, though. + logger.Info("This is not a bootstrap Guardian. Waiting another 10 seconds for the bootstrap guardian to come online.") + time.Sleep(time.Second * 10) + } + } else { + p2pKey, err = common.GetOrCreateNodeKey(logger, *nodeKeyPath) + if err != nil { + logger.Fatal("Failed to load node key", zap.Error(err)) + } + } + + // Set up telemetry if it is enabled. We can't do this until we have the p2p key and the guardian key. + // Telemetry is enabled by default in mainnet/testnet. In devnet it is disabled by default. + usingLoki := *telemetryLokiURL != "" + if !*disableTelemetry && (env != common.UnsafeDevNet || (env == common.UnsafeDevNet && usingLoki)) { + if !usingLoki { + logger.Fatal("Please specify --telemetryLokiURL or set --disableTelemetry=false") + } + + // Get libp2p peer ID from private key + pk := p2pKey.GetPublic() + peerID, err := peer.IDFromPublicKey(pk) + if err != nil { + logger.Fatal("Failed to get peer ID from private key", zap.Error(err)) + } + + labels := map[string]string{ + "node_name": *nodeName, + "node_key": peerID.String(), + "guardian_addr": ethcrypto.PubkeyToAddress(gk.PublicKey).String(), + "network": *p2pNetworkID, + "version": version.Version(), + } + + skipPrivateLogs := !*publicRpcLogToTelemetry + + var tm *telemetry.Telemetry + if usingLoki { + logger.Info("Using Loki telemetry logger", + zap.String("publicRpcLogDetail", *publicRpcLogDetailStr), + zap.Bool("logPublicRpcToTelemetry", *publicRpcLogToTelemetry)) + + tm, err = telemetry.NewLokiCloudLogger(context.Background(), logger, *telemetryLokiURL, "wormhole", skipPrivateLogs, labels) + if err != nil { + logger.Fatal("Failed to initialize telemetry", zap.Error(err)) + } + } + + defer tm.Close() + logger = tm.WrapLogger(logger) // Wrap logger with telemetry logger + } + // Validate the args for all the EVM chains. The last flag indicates if the chain is allowed in mainnet. *ethContract = checkEvmArgs(logger, *ethRPC, *ethContract, "eth", true) *bscContract = checkEvmArgs(logger, *bscRPC, *bscContract, "bsc", true) @@ -743,10 +840,6 @@ func runNode(cmd *cobra.Command, args []string) { logger.Fatal("--publicRpcLogDetail should be one of (none, minimal, full)") } - if *nodeName == "" { - logger.Fatal("Please specify --nodeName") - } - // Complain about Infura on mainnet. // // As it turns out, Infura has a bug where it would sometimes incorrectly round @@ -770,63 +863,6 @@ func runNode(cmd *cobra.Command, args []string) { logger.Fatal("Infura is known to send incorrect blocks - please use your own nodes") } - // In devnet mode, we generate a deterministic guardian key and write it to disk. - if env == common.UnsafeDevNet { - err := devnet.GenerateAndStoreDevnetGuardianKey(*guardianKeyPath) - if err != nil { - logger.Fatal("failed to generate devnet guardian key", zap.Error(err)) - } - } - - // Database - db := db.OpenDb(logger, dataDir) - defer db.Close() - - // Guardian key - gk, err := common.LoadGuardianKey(*guardianKeyPath, env == common.UnsafeDevNet) - if err != nil { - logger.Fatal("failed to load guardian key", zap.Error(err)) - } - - logger.Info("Loaded guardian key", zap.String( - "address", ethcrypto.PubkeyToAddress(gk.PublicKey).String())) - - // Load p2p private key - var p2pKey libp2p_crypto.PrivKey - if env == common.UnsafeDevNet { - idx, err := devnet.GetDevnetIndex() - if err != nil { - logger.Fatal("Failed to parse hostname - are we running in devnet?") - } - p2pKey = devnet.DeterministicP2PPrivKeyByIndex(int64(idx)) - - if idx != 0 { - firstGuardianName, err := devnet.GetFirstGuardianNameFromBootstrapPeers(*p2pBootstrap) - if err != nil { - logger.Fatal("failed to get first guardian name from bootstrap peers", zap.String("bootstrapPeers", *p2pBootstrap), zap.Error(err)) - } - // try to connect to guardian-0 - for { - _, err := net.LookupIP(firstGuardianName) - if err == nil { - break - } - logger.Info(fmt.Sprintf("Error resolving %s. Trying again...", firstGuardianName)) - time.Sleep(time.Second) - } - // TODO this is a hack. If this is not the bootstrap Guardian, we wait 10s such that the bootstrap Guardian has enough time to start. - // This may no longer be necessary because now the p2p.go ensures that it can connect to at least one bootstrap peer and will - // exit the whole guardian if it is unable to. Sleeping here for a bit may reduce overall startup time by preventing unnecessary restarts, though. - logger.Info("This is not a bootstrap Guardian. Waiting another 10 seconds for the bootstrap guardian to come online.") - time.Sleep(time.Second * 10) - } - } else { - p2pKey, err = common.GetOrCreateNodeKey(logger, *nodeKeyPath) - if err != nil { - logger.Fatal("Failed to load node key", zap.Error(err)) - } - } - rpcMap := make(map[string]string) rpcMap["acalaRPC"] = *acalaRPC rpcMap["accountantWS"] = *accountantWS @@ -898,55 +934,16 @@ func runNode(cmd *cobra.Command, args []string) { rootCtxCancel() }() - usingLoki := *telemetryLokiURL != "" - - var hasTelemetryCredential bool = usingLoki - - // Telemetry is enabled by default in mainnet/testnet. In devnet it is disabled by default - if !*disableTelemetry && (env != common.UnsafeDevNet || (env == common.UnsafeDevNet && hasTelemetryCredential)) { - if !hasTelemetryCredential { - logger.Fatal("Please specify --telemetryLokiURL or set --disableTelemetry=false") - } - - // Get libp2p peer ID from private key - pk := p2pKey.GetPublic() - peerID, err := peer.IDFromPublicKey(pk) - if err != nil { - logger.Fatal("Failed to get peer ID from private key", zap.Error(err)) - } - - labels := map[string]string{ - "node_name": *nodeName, - "node_key": peerID.String(), - "guardian_addr": ethcrypto.PubkeyToAddress(gk.PublicKey).String(), - "network": *p2pNetworkID, - "version": version.Version(), - } - - skipPrivateLogs := !*publicRpcLogToTelemetry - - var tm *telemetry.Telemetry - if usingLoki { - logger.Info("Using Loki telemetry logger", - zap.String("publicRpcLogDetail", *publicRpcLogDetailStr), - zap.Bool("logPublicRpcToTelemetry", *publicRpcLogToTelemetry)) - - tm, err = telemetry.NewLokiCloudLogger(context.Background(), logger, *telemetryLokiURL, "wormhole", skipPrivateLogs, labels) - if err != nil { - logger.Fatal("Failed to initialize telemetry", zap.Error(err)) - } - } - - defer tm.Close() - logger = tm.WrapLogger(logger) // Wrap logger with telemetry logger - } - // log golang version logger.Info("golang version", zap.String("golang_version", runtime.Version())) // Redirect ipfs logs to plain zap ipfslog.SetPrimaryCore(logger.Core()) + // Database + db := db.OpenDb(logger.With(zap.String("component", "badgerDb")), dataDir) + defer db.Close() + wormchainId := "wormchain" if env == common.TestNet { wormchainId = "wormchain-testnet-0" diff --git a/node/pkg/db/accountant_test.go b/node/pkg/db/accountant_test.go index 93f5e29ea4..c79728bc1b 100644 --- a/node/pkg/db/accountant_test.go +++ b/node/pkg/db/accountant_test.go @@ -53,13 +53,10 @@ func TestAcctIsPendingTransfer(t *testing.T) { func TestAcctStoreAndDeletePendingTransfers(t *testing.T) { dbPath := t.TempDir() - db, err := Open(dbPath) - if err != nil { - t.Error("failed to open database") - } + db := OpenDb(zap.NewNop(), &dbPath) defer db.Close() - tokenBridgeAddr, _ := vaa.StringToAddress("0x0290fb167208af455bb137780163b7b7a9a10c16") + tokenBridgeAddr, err := vaa.StringToAddress("0x0290fb167208af455bb137780163b7b7a9a10c16") require.NoError(t, err) msg1 := &common.MessagePublication{ @@ -118,33 +115,25 @@ func TestAcctStoreAndDeletePendingTransfers(t *testing.T) { } func TestAcctGetEmptyData(t *testing.T) { + logger := zap.NewNop() dbPath := t.TempDir() - db, err := Open(dbPath) - if err != nil { - t.Error("failed to open database") - } + db := OpenDb(logger, &dbPath) defer db.Close() - logger, _ := zap.NewDevelopment() - pendings, err := db.AcctGetData(logger) require.NoError(t, err) assert.Equal(t, 0, len(pendings)) } func TestAcctGetData(t *testing.T) { + logger := zap.NewNop() dbPath := t.TempDir() - db, err := Open(dbPath) - if err != nil { - t.Error("failed to open database") - } + db := OpenDb(logger, &dbPath) defer db.Close() - logger, _ := zap.NewDevelopment() - // Store some unrelated junk in the db to make sure it gets skipped. junk := []byte("ABC123") - err = db.db.Update(func(txn *badger.Txn) error { + err := db.db.Update(func(txn *badger.Txn) error { if err := txn.Set(junk, junk); err != nil { return err } @@ -202,10 +191,7 @@ func TestAcctGetData(t *testing.T) { func TestAcctLoadingWhereOldPendingsGetDropped(t *testing.T) { dbPath := t.TempDir() - db, err := Open(dbPath) - if err != nil { - t.Error("failed to open database") - } + db := OpenDb(zap.NewNop(), &dbPath) defer db.Close() defer os.Remove(dbPath) diff --git a/node/pkg/db/db.go b/node/pkg/db/db.go index 04cc806fc9..6bdfb70115 100644 --- a/node/pkg/db/db.go +++ b/node/pkg/db/db.go @@ -83,17 +83,6 @@ func (i *VAAID) EmitterPrefixBytes() []byte { return []byte(fmt.Sprintf("signed/%d/%s", i.EmitterChain, i.EmitterAddress)) } -// TODO: Deprecate in favor of OpenDb -func Open(path string) (*Database, error) { - db, err := badger.Open(badger.DefaultOptions(path)) - if err != nil { - return nil, fmt.Errorf("failed to open database: %w", err) - } - return &Database{ - db: db, - }, nil -} - func (d *Database) Close() error { return d.db.Close() } @@ -237,7 +226,6 @@ func (d *Database) FindEmitterSequenceGap(prefix VAAID) (resp []uint64, firstSeq // Figure out gaps. for i := firstSeq; i <= lastSeq; i++ { if !seqs[i] { - fmt.Printf("missing: %d\n", i) resp = append(resp, i) } } diff --git a/node/pkg/db/db_test.go b/node/pkg/db/db_test.go index 5fde4c07d4..543d1b6cd6 100644 --- a/node/pkg/db/db_test.go +++ b/node/pkg/db/db_test.go @@ -14,6 +14,7 @@ import ( "github.com/dgraph-io/badger/v3" "github.com/ethereum/go-ethereum/crypto" "github.com/wormhole-foundation/wormhole/sdk/vaa" + "go.uber.org/zap" "testing" "time" @@ -88,10 +89,7 @@ func TestEmitterPrefixBytesWithOnlyChainID(t *testing.T) { func TestStoreSignedVAAUnsigned(t *testing.T) { dbPath := t.TempDir() - db, err := Open(dbPath) - if err != nil { - t.Error("failed to open database") - } + db := OpenDb(zap.NewNop(), &dbPath) defer db.Close() defer os.Remove(dbPath) @@ -103,10 +101,7 @@ func TestStoreSignedVAAUnsigned(t *testing.T) { func TestStoreSignedVAASigned(t *testing.T) { dbPath := t.TempDir() - db, err := Open(dbPath) - if err != nil { - t.Error("failed to open database") - } + db := OpenDb(zap.NewNop(), &dbPath) defer db.Close() defer os.Remove(dbPath) @@ -121,10 +116,7 @@ func TestStoreSignedVAASigned(t *testing.T) { func TestStoreSignedVAABatch(t *testing.T) { dbPath := t.TempDir() - db, err := Open(dbPath) - if err != nil { - t.Error("failed to open database") - } + db := OpenDb(zap.NewNop(), &dbPath) defer db.Close() defer os.Remove(dbPath) @@ -183,10 +175,7 @@ func TestStoreSignedVAABatch(t *testing.T) { func TestGetSignedVAABytes(t *testing.T) { dbPath := t.TempDir() - db, err := Open(dbPath) - if err != nil { - t.Error("failed to open database") - } + db := OpenDb(zap.NewNop(), &dbPath) defer db.Close() defer os.Remove(dbPath) @@ -213,10 +202,7 @@ func TestGetSignedVAABytes(t *testing.T) { func TestFindEmitterSequenceGap(t *testing.T) { dbPath := t.TempDir() - db, err := Open(dbPath) - if err != nil { - t.Error("failed to open database") - } + db := OpenDb(zap.NewNop(), &dbPath) defer db.Close() defer os.Remove(dbPath) diff --git a/node/pkg/db/governor_test.go b/node/pkg/db/governor_test.go index 87b564c467..0b0b2d17b0 100644 --- a/node/pkg/db/governor_test.go +++ b/node/pkg/db/governor_test.go @@ -136,10 +136,7 @@ func TestIsPendingMsg(t *testing.T) { func TestGetChainGovernorData(t *testing.T) { dbPath := t.TempDir() - db, err := Open(dbPath) - if err != nil { - t.Error("failed to open database") - } + db := OpenDb(zap.NewNop(), &dbPath) defer db.Close() logger := zap.NewNop() @@ -153,10 +150,7 @@ func TestGetChainGovernorData(t *testing.T) { func TestStoreTransfer(t *testing.T) { dbPath := t.TempDir() - db, err := Open(dbPath) - if err != nil { - t.Error("failed to open database") - } + db := OpenDb(zap.NewNop(), &dbPath) defer db.Close() tokenAddr, err := vaa.StringToAddress("0x707f9118e33a9b8998bea41dd0d46f38bb963fc8") @@ -187,10 +181,7 @@ func TestStoreTransfer(t *testing.T) { func TestDeleteTransfer(t *testing.T) { dbPath := t.TempDir() - db, err := Open(dbPath) - if err != nil { - t.Error("failed to open database") - } + db := OpenDb(zap.NewNop(), &dbPath) defer db.Close() tokenAddr, err := vaa.StringToAddress("0x707f9118e33a9b8998bea41dd0d46f38bb963fc8") @@ -230,10 +221,7 @@ func TestDeleteTransfer(t *testing.T) { func TestStorePendingMsg(t *testing.T) { dbPath := t.TempDir() - db, err := Open(dbPath) - if err != nil { - t.Error("failed to open database") - } + db := OpenDb(zap.NewNop(), &dbPath) defer db.Close() tokenBridgeAddr, err2 := vaa.StringToAddress("0x0290fb167208af455bb137780163b7b7a9a10c16") @@ -258,10 +246,7 @@ func TestStorePendingMsg(t *testing.T) { func TestDeletePendingMsg(t *testing.T) { dbPath := t.TempDir() - db, err := Open(dbPath) - if err != nil { - t.Error("failed to open database") - } + db := OpenDb(zap.NewNop(), &dbPath) defer db.Close() tokenBridgeAddr, err2 := vaa.StringToAddress("0x0290fb167208af455bb137780163b7b7a9a10c16") @@ -328,10 +313,7 @@ func TestSerializeAndDeserializeOfPendingTransfer(t *testing.T) { func TestStoreAndReloadTransfers(t *testing.T) { dbPath := t.TempDir() - db, err := Open(dbPath) - if err != nil { - t.Error("failed to open database") - } + db := OpenDb(zap.NewNop(), &dbPath) defer db.Close() defer os.Remove(dbPath) @@ -617,10 +599,7 @@ func marshalOldMessagePublication(msg *common.MessagePublication) []byte { func TestLoadingOldPendingTransfers(t *testing.T) { dbPath := t.TempDir() - db, err := Open(dbPath) - if err != nil { - t.Error("failed to open database") - } + db := OpenDb(zap.NewNop(), &dbPath) defer db.Close() defer os.Remove(dbPath) @@ -846,10 +825,7 @@ func TestDeserializeOfOldTransfer(t *testing.T) { func TestOldTransfersUpdatedWhenReloading(t *testing.T) { dbPath := t.TempDir() - db, err := Open(dbPath) - if err != nil { - t.Error("failed to open database") - } + db := OpenDb(zap.NewNop(), &dbPath) defer db.Close() defer os.Remove(dbPath) diff --git a/node/pkg/db/purge_vaas_test.go b/node/pkg/db/purge_vaas_test.go index 83f70ec561..ef572434cb 100644 --- a/node/pkg/db/purge_vaas_test.go +++ b/node/pkg/db/purge_vaas_test.go @@ -8,6 +8,7 @@ import ( "github.com/ethereum/go-ethereum/crypto" "github.com/wormhole-foundation/wormhole/sdk/vaa" + "go.uber.org/zap" "testing" "time" @@ -63,10 +64,7 @@ func TestPurgingPythnetVAAs(t *testing.T) { var emitterAddress = vaa.Address{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 4} dbPath := t.TempDir() - db, err := Open(dbPath) - if err != nil { - t.Error("failed to open database") - } + db := OpenDb(zap.NewNop(), &dbPath) defer db.Close() defer os.Remove(dbPath) @@ -77,7 +75,7 @@ func TestPurgingPythnetVAAs(t *testing.T) { pythnetSeqNum := uint64(10000) solanaSeqNum := uint64(20000) for count := 0; count < 50; count++ { - err = storeVAA(db, &vaa.VAA{ + err := storeVAA(db, &vaa.VAA{ Version: uint8(1), GuardianSetIndex: uint32(1), Signatures: nil, @@ -111,7 +109,7 @@ func TestPurgingPythnetVAAs(t *testing.T) { // Create 75 VAAs each for Pythnet and Solana that are less than three days old. timeStamp = now.Add(-time.Hour * time.Duration(3*24-1)) for count := 0; count < 75; count++ { - err = storeVAA(db, &vaa.VAA{ + err := storeVAA(db, &vaa.VAA{ Version: uint8(1), GuardianSetIndex: uint32(1), Signatures: nil, @@ -168,10 +166,7 @@ func TestPurgingVAAsForOneEmitterAddress(t *testing.T) { var solanaEmitterAddress1 = vaa.Address{0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1} dbPath := t.TempDir() - db, err := Open(dbPath) - if err != nil { - t.Error("failed to open database") - } + db := OpenDb(zap.NewNop(), &dbPath) defer db.Close() defer os.Remove(dbPath) @@ -182,7 +177,7 @@ func TestPurgingVAAsForOneEmitterAddress(t *testing.T) { pythnetSeqNum := uint64(10000) solanaSeqNum := uint64(20000) for count := 0; count < 50; count++ { - err = storeVAA(db, &vaa.VAA{ + err := storeVAA(db, &vaa.VAA{ Version: uint8(1), GuardianSetIndex: uint32(1), Signatures: nil, @@ -231,7 +226,7 @@ func TestPurgingVAAsForOneEmitterAddress(t *testing.T) { // Create 75 VAAs each for each emitter that are less than three days old. timeStamp = now.Add(-time.Hour * time.Duration(3*24-1)) for count := 0; count < 75; count++ { - err = storeVAA(db, &vaa.VAA{ + err := storeVAA(db, &vaa.VAA{ Version: uint8(1), GuardianSetIndex: uint32(1), Signatures: nil,