diff --git a/README.md b/README.md index 69236ccfb2..dcb42a122f 100644 --- a/README.md +++ b/README.md @@ -74,7 +74,7 @@ to that team for all the work they put into researching this approach. Instead of requiring nodes to execute all previous transactions when joining any `hyperchain` (which may not be possible if there is very high throughput on a Subnet), the `hypersdk` just syncs the most recent state from the network. To avoid falling -behind the network while syncing this state, the `hypersdk` acts as an Avalanche Light +behind the network while syncing this state, the `hypersdk` acts as an Avalanche Lite Client and performs consensus on newly processed blocks without verifying them (updating its state sync target whenever a new block is accepted). @@ -82,7 +82,28 @@ The `hypersdk` relies on [`x/sync`](https://github.com/ava-labs/avalanchego/tree a bandwidth-aware dynamic sync implementation provided by `avalanchego`, to sync to the tip of any `hyperchain`. -#### Pebble as Default +#### Block Pruning +By default, the `hypersdk` only stores what is necessary to build/verify the next block +and to help new nodes sync the current state (not execute all historical state transitions). +If the `hypersdk` did not limit block storage growth, the storage requirements for validators +would grow at an alarming rate each day (making running any `hypervm` impractical). +Consider the simple example where we process 25k transactions per second (assume each +transaction is ~400 bytes). This would require the `hypersdk` to store 10MB per +second (not including any overhead in the database for doing so). **This works out to +864GB per day or ~315TB per year.** + +In practice, this means the `hypersdk` only stores the last 768 accepted blocks, the genesis block, +and the last 256 revisions of state (the [ProposerVM](https://github.com/ava-labs/avalanchego/blob/master/vms/proposervm/README.md) +also stores the last 768 blocks). With a 100ms `MinimumBlockGap`, the `hypersdk` must +store at least ~600 blocks to allow for the entire `ValidityWindow` to be backfilled (otherwise +a fully-synced, restarting `hypervm` will not become "ready" until it accepts a block at +least `ValidityWindow` after the last accepted block). + +_The number of blocks and/or state revisions that the `hypersdk` stores, the `AcceptedBlockWindow`, can +be tuned by any `hypervm`. It is not possible, however, to configure the `hypersdk` to store +all historical blocks (the `AcceptedBlockWindow` is pinned to memory)._ + +#### PebbleDB Instead of employing [`goleveldb`](https://github.com/syndtr/goleveldb), the `hypersdk` uses CockroachDB's [`pebble`](https://github.com/cockroachdb/pebble) database for on-disk storage. This database is inspired by LevelDB/RocksDB but offers [a few @@ -129,7 +150,8 @@ All `hypersdk` blocks include a state root to support dynamic state sync. In dyn state sync, the state target is updated to the root of the last accepted block while the sync is ongoing instead of staying pinned to the last accepted root when the sync started. Root block inclusion means consensus can be used to select the next state -target to sync to instead of using some less secure, out-of-consensus mechanism. +target to sync to instead of using some less secure, out-of-consensus mechanism (i.e. Avalanche Lite Client).
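The `AcceptedBlockWindow` mentioned in the Block Pruning section above is surfaced through the new config getters added further down in `config/config.go` and `vm/dependencies.go` (`GetAcceptedBlockWindow`, `GetBlockCompactionFrequency`). A minimal sketch of how a `hypervm` might tune these, assuming it supplies its own `vm.Config` by embedding the hypersdk defaults (the package name and concrete values are illustrative only, not part of this change):

```go
package vmconfig

import "github.com/ava-labs/hypersdk/config"

// Config is a hypothetical hypervm config that embeds the hypersdk defaults
// and overrides only the block-retention knobs introduced in this change.
type Config struct {
	config.Config
}

// Keep roughly twice the default window of accepted blocks on disk.
func (*Config) GetAcceptedBlockWindow() int { return 1536 }

// Compact expired block keys half as often as the default.
func (*Config) GetBlockCompactionFrequency() int { return 64 }
```

Because the embedded defaults already satisfy the rest of the interface, only the getters being changed need to be shadowed.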
Dynamic state sync is required for high-throughput blockchains because it relieves the nodes that serve state sync queries from storing all historical state revisions diff --git a/chain/block.go b/chain/block.go index 701098a31e..29532a5654 100644 --- a/chain/block.go +++ b/chain/block.go @@ -370,12 +370,22 @@ func (b *StatelessBlock) verify(ctx context.Context, stateReady bool) error { // context. Otherwise, the parent block will be used as the execution context. vctx, err := b.vm.GetVerifyContext(ctx, b.Hght, b.Prnt) if err != nil { + b.vm.Logger().Warn("unable to get verify context", + zap.Uint64("height", b.Hght), + zap.Stringer("blkID", b.ID()), + zap.Error(err), + ) return fmt.Errorf("%w: unable to load verify context", err) } // Parent block may not be processed when we verify this block, so [innerVerify] may // recursively verify ancestry. if err := b.innerVerify(ctx, vctx); err != nil { + b.vm.Logger().Warn("verification failed", + zap.Uint64("height", b.Hght), + zap.Stringer("blkID", b.ID()), + zap.Error(err), + ) return err } } @@ -721,25 +731,21 @@ func (b *StatelessBlock) Accept(ctx context.Context) error { return fmt.Errorf("%w: unable to commit block", err) } - // Set last accepted block - return b.SetLastAccepted(ctx) + // Mark block as accepted and update last accepted in storage + b.MarkAccepted(ctx) + return nil } -// SetLastAccepted is called during [Accept] and at the start and end of state -// sync. -func (b *StatelessBlock) SetLastAccepted(ctx context.Context) error { - if err := b.vm.SetLastAccepted(b); err != nil { - return err - } +func (b *StatelessBlock) MarkAccepted(ctx context.Context) { + // Accept block and free unnecessary memory b.st = choices.Accepted b.txsSet = nil // only used for replay protection when processing - // [Accepted] will set in-memory variables needed to ensure we don't resync - // all blocks when state sync finishes + // [Accepted] will persist the block to disk and set in-memory variables + // needed to ensure we don't resync all blocks when state sync finishes. 
// // Note: We will not call [b.vm.Verified] before accepting during state sync b.vm.Accepted(ctx, b) - return nil } // implements "snowman.Block.choices.Decidable" diff --git a/chain/dependencies.go b/chain/dependencies.go index fb41af56d3..ae3703f84a 100644 --- a/chain/dependencies.go +++ b/chain/dependencies.go @@ -45,7 +45,6 @@ type VM interface { IsBootstrapped() bool LastAcceptedBlock() *StatelessBlock - SetLastAccepted(*StatelessBlock) error GetStatelessBlock(context.Context, ids.ID) (*StatelessBlock, error) GetVerifyContext(ctx context.Context, blockHeight uint64, parent ids.ID) (VerifyContext, error) diff --git a/config/config.go b/config/config.go index 341816b31e..eb56fbacbe 100644 --- a/config/config.go +++ b/config/config.go @@ -12,13 +12,10 @@ import ( "github.com/ava-labs/avalanchego/utils/profiler" "github.com/ava-labs/avalanchego/utils/units" "github.com/ava-labs/hypersdk/trace" - "github.com/ava-labs/hypersdk/vm" ) const avalancheGoMinCPU = 4 -var _ vm.Config = (*Config)(nil) - type Config struct{} func (c *Config) GetLogLevel() logging.Level { return logging.Info } @@ -33,21 +30,24 @@ func (c *Config) GetMempoolSize() int { return 2_048 } func (c *Config) GetMempoolPayerSize() int { return 32 } func (c *Config) GetMempoolExemptPayers() [][]byte { return nil } func (c *Config) GetStreamingBacklogSize() int { return 1024 } -func (c *Config) GetStateHistoryLength() int { return 256 } func (c *Config) GetStateEvictionBatchSize() int { return 4 * units.MiB } func (c *Config) GetIntermediateNodeCacheSize() int { return 2 * units.GiB } func (c *Config) GetValueNodeCacheSize() int { return 2 * units.GiB } -func (c *Config) GetAcceptorSize() int { return 1024 } func (c *Config) GetTraceConfig() *trace.Config { return &trace.Config{Enabled: false} } func (c *Config) GetStateSyncParallelism() int { return 4 } -func (c *Config) GetStateSyncMinBlocks() uint64 { return 256 } func (c *Config) GetStateSyncServerDelay() time.Duration { return 0 } // used for testing -func (c *Config) GetParsedBlockCacheSize() int { return 128 } -func (c *Config) GetAcceptedBlockCacheSize() int { return 128 } + +func (c *Config) GetParsedBlockCacheSize() int { return 128 } +func (c *Config) GetStateHistoryLength() int { return 256 } +func (c *Config) GetAcceptedBlockWindow() int { return 768 } +func (c *Config) GetStateSyncMinBlocks() uint64 { return 768 } +func (c *Config) GetAcceptorSize() int { return 1024 } func (c *Config) GetContinuousProfilerConfig() *profiler.Config { return &profiler.Config{Enabled: false} } func (c *Config) GetVerifySignatures() bool { return true } func (c *Config) GetTargetBuildDuration() time.Duration { return 100 * time.Millisecond } +func (c *Config) GetProcessingBuildSkip() int { return 16 } func (c *Config) GetTargetGossipDuration() time.Duration { return 20 * time.Millisecond } +func (c *Config) GetBlockCompactionFrequency() int { return 32 } // 64 MB of deletion if 2 MB blocks diff --git a/examples/morpheusvm/cmd/morpheus-cli/cmd/prometheus.go b/examples/morpheusvm/cmd/morpheus-cli/cmd/prometheus.go index 996ed8d22b..09ea968afb 100644 --- a/examples/morpheusvm/cmd/morpheus-cli/cmd/prometheus.go +++ b/examples/morpheusvm/cmd/morpheus-cli/cmd/prometheus.go @@ -41,6 +41,9 @@ var generatePrometheusCmd = &cobra.Command{ panels = append(panels, fmt.Sprintf("increase(avalanche_%s_blks_rejected_count[5s])/5", chainID)) utils.Outf("{{yellow}}blocks rejected per second:{{/}} %s\n", panels[len(panels)-1]) + panels = append(panels, 
fmt.Sprintf("increase(avalanche_%s_vm_hypersdk_vm_deleted_blocks[5s])/5", chainID)) + utils.Outf("{{yellow}}blocks deleted per second:{{/}} %s\n", panels[len(panels)-1]) + panels = append(panels, fmt.Sprintf("avalanche_%s_vm_hypersdk_chain_bandwidth_price", chainID)) utils.Outf("{{yellow}}bandwidth unit price:{{/}} %s\n", panels[len(panels)-1]) diff --git a/examples/morpheusvm/scripts/run.sh b/examples/morpheusvm/scripts/run.sh index 2cb46791b3..645abb887e 100755 --- a/examples/morpheusvm/scripts/run.sh +++ b/examples/morpheusvm/scripts/run.sh @@ -18,16 +18,27 @@ if ! [[ "$0" =~ scripts/run.sh ]]; then fi VERSION=2eabd228952b6b7c9075bc45653f70643d9a5a7c +MAX_UINT64=18446744073709551615 MODE=${MODE:-run} LOGLEVEL=${LOGLEVEL:-info} STATESYNC_DELAY=${STATESYNC_DELAY:-0} MIN_BLOCK_GAP=${MIN_BLOCK_GAP:-100} -CREATE_TARGET=${CREATE_TARGET:-75000} +STORE_TXS=${STORE_TXS:-false} +UNLIMITED_USAGE=${UNLIMITED_USAGE:-false} if [[ ${MODE} != "run" ]]; then LOGLEVEL=debug STATESYNC_DELAY=100000000 # 100ms MIN_BLOCK_GAP=250 #ms - CREATE_TARGET=100000000 # 4M accounts (we send to random addresses) + STORE_TXS=true + UNLIMITED_USAGE=true +fi + +WINDOW_TARGET_UNITS="40000000,450000,450000,450000,450000" +MAX_BLOCK_UNITS="1800000,15000,15000,2500,15000" +if ${UNLIMITED_USAGE}; then + WINDOW_TARGET_UNITS="${MAX_UINT64},${MAX_UINT64},${MAX_UINT64},${MAX_UINT64},${MAX_UINT64}" + # If we don't limit the block size, AvalancheGo will reject the block. + MAX_BLOCK_UNITS="1800000,${MAX_UINT64},${MAX_UINT64},${MAX_UINT64},${MAX_UINT64}" fi echo "Running with:" @@ -36,6 +47,9 @@ echo MODE: ${MODE} echo LOG LEVEL: ${LOGLEVEL} echo STATESYNC_DELAY \(ns\): ${STATESYNC_DELAY} echo MIN_BLOCK_GAP \(ms\): ${MIN_BLOCK_GAP} +echo STORE_TXS: ${STORE_TXS} +echo WINDOW_TARGET_UNITS: ${WINDOW_TARGET_UNITS} +echo MAX_BLOCK_UNITS: ${MAX_BLOCK_UNITS} ############################ # build avalanchego @@ -100,7 +114,7 @@ find ${TMPDIR}/avalanchego-${VERSION} # Always create allocations (linter doesn't like tab) echo "creating allocations file" cat < ${TMPDIR}/allocations.json -[{"address":"morpheus1rvzhmceq997zntgvravfagsks6w0ryud3rylh4cdvayry0dl97nsp30ucp", "balance":1000000000000}] +[{"address":"morpheus1rvzhmceq997zntgvravfagsks6w0ryud3rylh4cdvayry0dl97nsp30ucp", "balance":10000000000000000000}] EOF GENESIS_PATH=$2 @@ -108,8 +122,8 @@ if [[ -z "${GENESIS_PATH}" ]]; then echo "creating VM genesis file with allocations" rm -f ${TMPDIR}/morpheusvm.genesis ${TMPDIR}/morpheus-cli genesis generate ${TMPDIR}/allocations.json \ - --window-target-units "40000000,450000,450000,${CREATE_TARGET},450000" \ - --max-block-units "1800000,15000,15000,2500,15000" \ + --window-target-units ${WINDOW_TARGET_UNITS} \ + --max-block-units ${MAX_BLOCK_UNITS} \ --min-block-gap ${MIN_BLOCK_GAP} \ --genesis-file ${TMPDIR}/morpheusvm.genesis else @@ -132,7 +146,7 @@ cat < ${TMPDIR}/morpheusvm.config "mempoolExemptPayers":["morpheus1rvzhmceq997zntgvravfagsks6w0ryud3rylh4cdvayry0dl97nsp30ucp"], "parallelism": 5, "verifySignatures":true, - "storeTransactions":true, + "storeTransactions": ${STORE_TXS}, "streamingBacklogSize": 10000000, "logLevel": "${LOGLEVEL}", "stateSyncServerDelay": ${STATESYNC_DELAY} @@ -150,7 +164,8 @@ echo "creating subnet config" rm -f ${TMPDIR}/morpheusvm.subnet cat < ${TMPDIR}/morpheusvm.subnet { - "proposerMinBlockDelay": 0 + "proposerMinBlockDelay": 0, + "proposerNumHistoricalBlocks": 768 } EOF diff --git a/examples/morpheusvm/tests/e2e/e2e_test.go b/examples/morpheusvm/tests/e2e/e2e_test.go index 232a199d21..f5e26d3703 100644 --- 
a/examples/morpheusvm/tests/e2e/e2e_test.go +++ b/examples/morpheusvm/tests/e2e/e2e_test.go @@ -30,10 +30,10 @@ import ( ) const ( - startAmount = uint64(1000000000000) + startAmount = uint64(10000000000000000000) sendAmount = uint64(5000) - healthPollInterval = 10 * time.Second + healthPollInterval = 3 * time.Second ) func TestE2e(t *testing.T) { @@ -397,7 +397,7 @@ var _ = ginkgo.Describe("[Test]", func() { ginkgo.It("transfer in a single node (raw)", func() { nativeBalance, err := instances[0].lcli.Balance(context.TODO(), sender) gomega.Ω(err).Should(gomega.BeNil()) - gomega.Ω(nativeBalance).Should(gomega.Equal(uint64(1000000000000))) + gomega.Ω(nativeBalance).Should(gomega.Equal(startAmount)) other, err := ed25519.GeneratePrivateKey() gomega.Ω(err).Should(gomega.BeNil()) @@ -516,7 +516,37 @@ var _ = ginkgo.Describe("[Test]", func() { acceptTransaction(syncClient, lsyncClient) }) - // Create blocks before state sync starts (state sync requires at least 256 + ginkgo.It("becomes ready quickly after restart", func() { + cluster, err := anrCli.RestartNode(context.Background(), "bootstrap") + gomega.Expect(err).To(gomega.BeNil()) + + // Upon restart, the node should be able to read blocks from disk + // to initialize its [seen] index and become ready in less than + // [ValidityWindow]. + start := time.Now() + awaitHealthy(anrCli) + gomega.Expect(time.Since(start) < 20*time.Second).To(gomega.BeTrue()) + + // Update bootstrap info to latest in case it was assigned a new port + nodeURI := cluster.ClusterInfo.NodeInfos["bootstrap"].Uri + uri := nodeURI + fmt.Sprintf("/ext/bc/%s", blockchainID) + bid, err := ids.FromString(blockchainID) + gomega.Expect(err).To(gomega.BeNil()) + hutils.Outf("{{blue}}bootstrap node uri: %s{{/}}\n", uri) + c := rpc.NewJSONRPCClient(uri) + syncClient = c + networkID, _, _, err := syncClient.Network(context.TODO()) + gomega.Expect(err).Should(gomega.BeNil()) + tc := lrpc.NewJSONRPCClient(uri, networkID, bid) + lsyncClient = tc + instances[len(instances)-1] = instance{ + uri: uri, + cli: c, + lcli: tc, + } + }) + + // Create blocks before state sync starts (state sync requires at least 1024 // blocks) // // We do 1024 so that there are a number of ranges of data to fetch. 
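The `becomes ready quickly after restart` test above expects a restarted node to rebuild its `[seen]` replay-protection index from the blocks it kept on disk instead of waiting a full `ValidityWindow`. The sketch below illustrates how the height-keyed disk helpers added further down in `vm/storage.go` (`GetLastAcceptedHeight`, `GetDiskBlock`) make that walk-back possible; it mirrors the removed `startingSync` loop in `vm/syncervm_client.go` and is only an illustration, not the code this PR adds:

```go
// backfillSeenFromDisk is a sketch (not the PR's implementation) of rebuilding
// the replay-protection index from on-disk blocks after a restart.
func (vm *VM) backfillSeenFromDisk() error {
	lastHeight, err := vm.GetLastAcceptedHeight()
	if err != nil {
		return err
	}
	last, err := vm.GetDiskBlock(lastHeight)
	if err != nil {
		return err
	}
	r := vm.Rules(last.Tmstmp)
	for height := lastHeight; height > 0; height-- {
		blk, err := vm.GetDiskBlock(height)
		if err != nil {
			// Only [AcceptedBlockWindow] blocks are retained on disk; if we
			// run out before covering the window, readiness must wait for
			// newly accepted blocks instead.
			return err
		}
		if last.Tmstmp-blk.Tmstmp > r.GetValidityWindow() {
			break
		}
		// It is ok to add transactions from newest to oldest.
		vm.seen.Add(blk.Txs)
		vm.startSeenTime = blk.Tmstmp
	}
	// The full [ValidityWindow] (or everything back to genesis) is covered,
	// so the node can signal readiness immediately.
	vm.seenValidityWindowOnce.Do(func() {
		close(vm.seenValidityWindow)
	})
	return nil
}
```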
diff --git a/examples/morpheusvm/tests/integration/integration_test.go b/examples/morpheusvm/tests/integration/integration_test.go index 396b905cbf..f592d37a8b 100644 --- a/examples/morpheusvm/tests/integration/integration_test.go +++ b/examples/morpheusvm/tests/integration/integration_test.go @@ -110,6 +110,7 @@ var ( // when used with embedded VMs genesisBytes []byte instances []instance + blocks []snowman.Block networkID uint32 gen *genesis.Genesis @@ -263,6 +264,7 @@ var _ = ginkgo.BeforeSuite(func() { csupply += alloc.Balance } } + blocks = []snowman.Block{} app.instances = instances color.Blue("created %d VMs", vms) @@ -401,6 +403,7 @@ var _ = ginkgo.Describe("[Tx Processing]", func() { gomega.Ω(blk.Accept(ctx)).To(gomega.BeNil()) gomega.Ω(blk.Status()).To(gomega.Equal(choices.Accepted)) + blocks = append(blocks, blk) lastAccepted, err := instances[1].vm.LastAccepted(ctx) gomega.Ω(err).To(gomega.BeNil()) @@ -454,7 +457,7 @@ var _ = ginkgo.Describe("[Tx Processing]", func() { gomega.Ω(err).Should(gomega.BeNil()) gomega.Ω(submit(context.Background())).Should(gomega.BeNil()) accept := expectBlk(instances[1]) - results := accept() + results := accept(true) gomega.Ω(results).Should(gomega.HaveLen(1)) gomega.Ω(results[0].Success).Should(gomega.BeTrue()) @@ -532,7 +535,7 @@ var _ = ginkgo.Describe("[Tx Processing]", func() { gomega.Ω(submit(context.Background())).Should(gomega.BeNil()) accept := expectBlk(instances[1]) - results := accept() + results := accept(true) // Check results gomega.Ω(results).Should(gomega.HaveLen(4)) @@ -608,7 +611,7 @@ var _ = ginkgo.Describe("[Tx Processing]", func() { }) ginkgo.It("Test processing block handling", func() { - var accept, accept2 func() []*chain.Result + var accept, accept2 func(bool) []*chain.Result ginkgo.By("create processing tip", func() { parser, err := instances[1].lcli.Parser(context.Background()) @@ -643,10 +646,10 @@ var _ = ginkgo.Describe("[Tx Processing]", func() { }) ginkgo.By("clear processing tip", func() { - results := accept() + results := accept(true) gomega.Ω(results).Should(gomega.HaveLen(1)) gomega.Ω(results[0].Success).Should(gomega.BeTrue()) - results = accept2() + results = accept2(true) gomega.Ω(results).Should(gomega.HaveLen(1)) gomega.Ω(results[0].Success).Should(gomega.BeTrue()) }) @@ -680,30 +683,19 @@ var _ = ginkgo.Describe("[Tx Processing]", func() { ginkgo.It("ensure unprocessed tip works", func() { ginkgo.By("import accepted blocks to instance 2", func() { ctx := context.TODO() - o := instances[1] - blks := []snowman.Block{} - next, err := o.vm.LastAccepted(ctx) - gomega.Ω(err).Should(gomega.BeNil()) - for { - blk, err := o.vm.GetBlock(ctx, next) - gomega.Ω(err).Should(gomega.BeNil()) - blks = append([]snowman.Block{blk}, blks...) 
- if blk.Height() == 1 { - break - } - next = blk.Parent() - } + + gomega.Ω(blocks[0].Height()).Should(gomega.Equal(uint64(1))) n := instances[2] - blk1, err := n.vm.ParseBlock(ctx, blks[0].Bytes()) + blk1, err := n.vm.ParseBlock(ctx, blocks[0].Bytes()) gomega.Ω(err).Should(gomega.BeNil()) err = blk1.Verify(ctx) gomega.Ω(err).Should(gomega.BeNil()) // Parse tip - blk2, err := n.vm.ParseBlock(ctx, blks[1].Bytes()) + blk2, err := n.vm.ParseBlock(ctx, blocks[1].Bytes()) gomega.Ω(err).Should(gomega.BeNil()) - blk3, err := n.vm.ParseBlock(ctx, blks[2].Bytes()) + blk3, err := n.vm.ParseBlock(ctx, blocks[2].Bytes()) gomega.Ω(err).Should(gomega.BeNil()) // Verify tip @@ -721,7 +713,7 @@ var _ = ginkgo.Describe("[Tx Processing]", func() { gomega.Ω(err).Should(gomega.BeNil()) // Parse another - blk4, err := n.vm.ParseBlock(ctx, blks[3].Bytes()) + blk4, err := n.vm.ParseBlock(ctx, blocks[3].Bytes()) gomega.Ω(err).Should(gomega.BeNil()) err = blk4.Verify(ctx) gomega.Ω(err).Should(gomega.BeNil()) @@ -734,7 +726,7 @@ var _ = ginkgo.Describe("[Tx Processing]", func() { ginkgo.It("processes valid index transactions (w/block listening)", func() { // Clear previous txs on instance 0 accept := expectBlk(instances[0]) - accept() // don't care about results + accept(false) // don't care about results // Subscribe to blocks cli, err := rpc.NewWebSocketClient(instances[0].WebSocketServer.URL, rpc.DefaultHandshakeTimeout, pubsub.MaxPendingMessages, pubsub.MaxReadMessageSize) @@ -770,7 +762,7 @@ var _ = ginkgo.Describe("[Tx Processing]", func() { gomega.Ω(err).Should(gomega.BeNil()) accept = expectBlk(instances[0]) - results := accept() + results := accept(false) gomega.Ω(results).Should(gomega.HaveLen(1)) gomega.Ω(results[0].Success).Should(gomega.BeTrue()) @@ -829,7 +821,7 @@ var _ = ginkgo.Describe("[Tx Processing]", func() { } gomega.Ω(err).Should(gomega.BeNil()) accept := expectBlk(instances[0]) - results := accept() + results := accept(false) gomega.Ω(results).Should(gomega.HaveLen(1)) gomega.Ω(results[0].Success).Should(gomega.BeTrue()) @@ -846,7 +838,7 @@ var _ = ginkgo.Describe("[Tx Processing]", func() { }) }) -func expectBlk(i instance) func() []*chain.Result { +func expectBlk(i instance) func(bool) []*chain.Result { ctx := context.TODO() // manually signal ready @@ -867,10 +859,14 @@ func expectBlk(i instance) func() []*chain.Result { err = i.vm.SetPreference(ctx, blk.ID()) gomega.Ω(err).To(gomega.BeNil()) - return func() []*chain.Result { + return func(add bool) []*chain.Result { gomega.Ω(blk.Accept(ctx)).To(gomega.BeNil()) gomega.Ω(blk.Status()).To(gomega.Equal(choices.Accepted)) + if add { + blocks = append(blocks, blk) + } + lastAccepted, err := i.vm.LastAccepted(ctx) gomega.Ω(err).To(gomega.BeNil()) gomega.Ω(lastAccepted).To(gomega.Equal(blk.ID())) diff --git a/examples/tokenvm/scripts/run.sh b/examples/tokenvm/scripts/run.sh index bf2cbf87d6..5bfb95d5b2 100755 --- a/examples/tokenvm/scripts/run.sh +++ b/examples/tokenvm/scripts/run.sh @@ -18,16 +18,27 @@ if ! 
[[ "$0" =~ scripts/run.sh ]]; then fi VERSION=2eabd228952b6b7c9075bc45653f70643d9a5a7c +MAX_UINT64=18446744073709551615 MODE=${MODE:-run} LOGLEVEL=${LOGLEVEL:-info} STATESYNC_DELAY=${STATESYNC_DELAY:-0} MIN_BLOCK_GAP=${MIN_BLOCK_GAP:-100} -CREATE_TARGET=${CREATE_TARGET:-75000} +STORE_TXS=${STORE_TXS:-false} +UNLIMITED_USAGE=${UNLIMITED_USAGE:-false} if [[ ${MODE} != "run" && ${MODE} != "run-single" ]]; then LOGLEVEL=debug STATESYNC_DELAY=100000000 # 100ms MIN_BLOCK_GAP=250 #ms - CREATE_TARGET=100000000 # 4M accounts (we send to random addresses) + STORE_TXS=true + UNLIMITED_USAGE=true +fi + +WINDOW_TARGET_UNITS="40000000,450000,450000,450000,450000" +MAX_BLOCK_UNITS="1800000,15000,15000,2500,15000" +if ${UNLIMITED_USAGE}; then + WINDOW_TARGET_UNITS="${MAX_UINT64},${MAX_UINT64},${MAX_UINT64},${MAX_UINT64},${MAX_UINT64}" + # If we don't limit the block size, AvalancheGo will reject the block. + MAX_BLOCK_UNITS="1800000,${MAX_UINT64},${MAX_UINT64},${MAX_UINT64},${MAX_UINT64}" fi echo "Running with:" @@ -35,6 +46,9 @@ echo VERSION: ${VERSION} echo MODE: ${MODE} echo STATESYNC_DELAY \(ns\): ${STATESYNC_DELAY} echo MIN_BLOCK_GAP \(ms\): ${MIN_BLOCK_GAP} +echo STORE_TXS: ${STORE_TXS} +echo WINDOW_TARGET_UNITS: ${WINDOW_TARGET_UNITS} +echo MAX_BLOCK_UNITS: ${MAX_BLOCK_UNITS} ############################ # build avalanchego @@ -99,7 +113,7 @@ find ${TMPDIR}/avalanchego-${VERSION} # Always create allocations (linter doesn't like tab) echo "creating allocations file" cat < ${TMPDIR}/allocations.json -[{"address":"token1rvzhmceq997zntgvravfagsks6w0ryud3rylh4cdvayry0dl97nsjzf3yp", "balance":1000000000000}] +[{"address":"token1rvzhmceq997zntgvravfagsks6w0ryud3rylh4cdvayry0dl97nsjzf3yp", "balance":10000000000000000000}] EOF GENESIS_PATH=$2 @@ -107,8 +121,8 @@ if [[ -z "${GENESIS_PATH}" ]]; then echo "creating VM genesis file with allocations" rm -f ${TMPDIR}/tokenvm.genesis ${TMPDIR}/token-cli genesis generate ${TMPDIR}/allocations.json \ - --window-target-units "40000000,450000,450000,${CREATE_TARGET},450000" \ - --max-block-units "1800000,15000,15000,2500,15000" \ + --window-target-units ${WINDOW_TARGET_UNITS} \ + --max-block-units ${MAX_BLOCK_UNITS} \ --min-block-gap ${MIN_BLOCK_GAP} \ --genesis-file ${TMPDIR}/tokenvm.genesis else @@ -134,7 +148,7 @@ cat < ${TMPDIR}/tokenvm.config "mempoolExemptPayers":["token1rvzhmceq997zntgvravfagsks6w0ryud3rylh4cdvayry0dl97nsjzf3yp"], "parallelism": 5, "verifySignatures":true, - "storeTransactions":true, + "storeTransactions": ${STORE_TXS}, "streamingBacklogSize": 10000000, "trackedPairs":["*"], "logLevel": "${LOGLEVEL}", @@ -153,7 +167,8 @@ echo "creating subnet config" rm -f ${TMPDIR}/tokenvm.subnet cat < ${TMPDIR}/tokenvm.subnet { - "proposerMinBlockDelay": 0 + "proposerMinBlockDelay": 0, + "proposerNumHistoricalBlocks": 768 } EOF diff --git a/examples/tokenvm/tests/e2e/e2e_test.go b/examples/tokenvm/tests/e2e/e2e_test.go index 7e94d0d534..be8875c469 100644 --- a/examples/tokenvm/tests/e2e/e2e_test.go +++ b/examples/tokenvm/tests/e2e/e2e_test.go @@ -31,7 +31,7 @@ import ( ) const ( - startAmount = uint64(1000000000000) + startAmount = uint64(10000000000000000000) sendAmount = uint64(5000) healthPollInterval = 10 * time.Second @@ -501,7 +501,7 @@ var _ = ginkgo.Describe("[Test]", func() { ginkgo.It("transfer in a single node (raw)", func() { nativeBalance, err := instancesA[0].tcli.Balance(context.TODO(), sender, ids.Empty) gomega.Ω(err).Should(gomega.BeNil()) - gomega.Ω(nativeBalance).Should(gomega.Equal(uint64(1000000000000))) + 
gomega.Ω(nativeBalance).Should(gomega.Equal(startAmount)) other, err := ed25519.GeneratePrivateKey() gomega.Ω(err).Should(gomega.BeNil()) diff --git a/examples/tokenvm/tests/integration/integration_test.go b/examples/tokenvm/tests/integration/integration_test.go index 509da043d8..29d8e5f616 100644 --- a/examples/tokenvm/tests/integration/integration_test.go +++ b/examples/tokenvm/tests/integration/integration_test.go @@ -113,6 +113,7 @@ var ( // when used with embedded VMs genesisBytes []byte instances []instance + blocks []snowman.Block networkID uint32 gen *genesis.Genesis @@ -265,6 +266,7 @@ var _ = ginkgo.BeforeSuite(func() { gomega.Ω(owner).Should(gomega.Equal(utils.Address(ed25519.EmptyPublicKey))) gomega.Ω(warp).Should(gomega.BeFalse()) } + blocks = []snowman.Block{} app.instances = instances color.Blue("created %d VMs", vms) @@ -403,6 +405,7 @@ var _ = ginkgo.Describe("[Tx Processing]", func() { gomega.Ω(blk.Accept(ctx)).To(gomega.BeNil()) gomega.Ω(blk.Status()).To(gomega.Equal(choices.Accepted)) + blocks = append(blocks, blk) lastAccepted, err := instances[1].vm.LastAccepted(ctx) gomega.Ω(err).To(gomega.BeNil()) @@ -457,7 +460,7 @@ var _ = ginkgo.Describe("[Tx Processing]", func() { gomega.Ω(submit(context.Background())).Should(gomega.BeNil()) time.Sleep(2 * time.Second) // for replay test accept := expectBlk(instances[1]) - results := accept() + results := accept(true) gomega.Ω(results).Should(gomega.HaveLen(1)) gomega.Ω(results[0].Success).Should(gomega.BeTrue()) @@ -468,7 +471,7 @@ var _ = ginkgo.Describe("[Tx Processing]", func() { }) ginkgo.It("Test processing block handling", func() { - var accept, accept2 func() []*chain.Result + var accept, accept2 func(bool) []*chain.Result ginkgo.By("create processing tip", func() { parser, err := instances[1].tcli.Parser(context.Background()) @@ -505,10 +508,10 @@ var _ = ginkgo.Describe("[Tx Processing]", func() { }) ginkgo.By("clear processing tip", func() { - results := accept() + results := accept(true) gomega.Ω(results).Should(gomega.HaveLen(1)) gomega.Ω(results[0].Success).Should(gomega.BeTrue()) - results = accept2() + results = accept2(true) gomega.Ω(results).Should(gomega.HaveLen(1)) gomega.Ω(results[0].Success).Should(gomega.BeTrue()) }) @@ -542,30 +545,19 @@ var _ = ginkgo.Describe("[Tx Processing]", func() { ginkgo.It("ensure unprocessed tip and replay protection works", func() { ginkgo.By("import accepted blocks to instance 2", func() { ctx := context.TODO() - o := instances[1] - blks := []snowman.Block{} - next, err := o.vm.LastAccepted(ctx) - gomega.Ω(err).Should(gomega.BeNil()) - for { - blk, err := o.vm.GetBlock(ctx, next) - gomega.Ω(err).Should(gomega.BeNil()) - blks = append([]snowman.Block{blk}, blks...) 
- if blk.Height() == 1 { - break - } - next = blk.Parent() - } + + gomega.Ω(blocks[0].Height()).Should(gomega.Equal(uint64(1))) n := instances[2] - blk1, err := n.vm.ParseBlock(ctx, blks[0].Bytes()) + blk1, err := n.vm.ParseBlock(ctx, blocks[0].Bytes()) gomega.Ω(err).Should(gomega.BeNil()) err = blk1.Verify(ctx) gomega.Ω(err).Should(gomega.BeNil()) // Parse tip - blk2, err := n.vm.ParseBlock(ctx, blks[1].Bytes()) + blk2, err := n.vm.ParseBlock(ctx, blocks[1].Bytes()) gomega.Ω(err).Should(gomega.BeNil()) - blk3, err := n.vm.ParseBlock(ctx, blks[2].Bytes()) + blk3, err := n.vm.ParseBlock(ctx, blocks[2].Bytes()) gomega.Ω(err).Should(gomega.BeNil()) // Verify tip @@ -591,7 +583,7 @@ var _ = ginkgo.Describe("[Tx Processing]", func() { gomega.Ω(err).Should(gomega.BeNil()) // Parse another - blk4, err := n.vm.ParseBlock(ctx, blks[3].Bytes()) + blk4, err := n.vm.ParseBlock(ctx, blocks[3].Bytes()) gomega.Ω(err).Should(gomega.BeNil()) err = blk4.Verify(ctx) gomega.Ω(err).Should(gomega.BeNil()) @@ -607,7 +599,7 @@ var _ = ginkgo.Describe("[Tx Processing]", func() { ginkgo.It("processes valid index transactions (w/block listening)", func() { // Clear previous txs on instance 0 accept := expectBlk(instances[0]) - accept() // don't care about results + accept(false) // don't care about results // Subscribe to blocks cli, err := rpc.NewWebSocketClient(instances[0].WebSocketServer.URL, rpc.DefaultHandshakeTimeout, pubsub.MaxPendingMessages, pubsub.MaxReadMessageSize) @@ -643,7 +635,7 @@ var _ = ginkgo.Describe("[Tx Processing]", func() { gomega.Ω(err).Should(gomega.BeNil()) accept = expectBlk(instances[0]) - results := accept() + results := accept(false) gomega.Ω(results).Should(gomega.HaveLen(1)) gomega.Ω(results[0].Success).Should(gomega.BeTrue()) @@ -703,7 +695,7 @@ var _ = ginkgo.Describe("[Tx Processing]", func() { } gomega.Ω(err).Should(gomega.BeNil()) accept := expectBlk(instances[0]) - results := accept() + results := accept(false) gomega.Ω(results).Should(gomega.HaveLen(1)) gomega.Ω(results[0].Success).Should(gomega.BeTrue()) @@ -739,7 +731,7 @@ var _ = ginkgo.Describe("[Tx Processing]", func() { gomega.Ω(err).Should(gomega.BeNil()) gomega.Ω(submit(context.Background())).Should(gomega.BeNil()) accept := expectBlk(instances[0]) - results := accept() + results := accept(false) gomega.Ω(results).Should(gomega.HaveLen(1)) result := results[0] gomega.Ω(result.Success).Should(gomega.BeFalse()) @@ -766,7 +758,7 @@ var _ = ginkgo.Describe("[Tx Processing]", func() { gomega.Ω(err).Should(gomega.BeNil()) gomega.Ω(submit(context.Background())).Should(gomega.BeNil()) accept := expectBlk(instances[0]) - results := accept() + results := accept(false) gomega.Ω(results).Should(gomega.HaveLen(1)) gomega.Ω(results[0].Success).Should(gomega.BeTrue()) @@ -830,7 +822,7 @@ var _ = ginkgo.Describe("[Tx Processing]", func() { gomega.Ω(err).Should(gomega.BeNil()) gomega.Ω(submit(context.Background())).Should(gomega.BeNil()) accept := expectBlk(instances[0]) - results := accept() + results := accept(false) gomega.Ω(results).Should(gomega.HaveLen(1)) gomega.Ω(results[0].Success).Should(gomega.BeTrue()) @@ -868,7 +860,7 @@ var _ = ginkgo.Describe("[Tx Processing]", func() { gomega.Ω(err).Should(gomega.BeNil()) gomega.Ω(submit(context.Background())).Should(gomega.BeNil()) accept := expectBlk(instances[0]) - results := accept() + results := accept(false) gomega.Ω(results).Should(gomega.HaveLen(1)) gomega.Ω(results[0].Success).Should(gomega.BeTrue()) @@ -910,7 +902,7 @@ var _ = ginkgo.Describe("[Tx Processing]", func() { 
gomega.Ω(err).Should(gomega.BeNil()) gomega.Ω(submit(context.Background())).Should(gomega.BeNil()) accept := expectBlk(instances[0]) - results := accept() + results := accept(false) gomega.Ω(results).Should(gomega.HaveLen(1)) result := results[0] gomega.Ω(result.Success).Should(gomega.BeFalse()) @@ -945,7 +937,7 @@ var _ = ginkgo.Describe("[Tx Processing]", func() { gomega.Ω(err).Should(gomega.BeNil()) gomega.Ω(submit(context.Background())).Should(gomega.BeNil()) accept := expectBlk(instances[0]) - results := accept() + results := accept(false) gomega.Ω(results).Should(gomega.HaveLen(1)) gomega.Ω(results[0].Success).Should(gomega.BeTrue()) @@ -984,7 +976,7 @@ var _ = ginkgo.Describe("[Tx Processing]", func() { gomega.Ω(err).Should(gomega.BeNil()) gomega.Ω(submit(context.Background())).Should(gomega.BeNil()) accept := expectBlk(instances[0]) - results := accept() + results := accept(false) gomega.Ω(results).Should(gomega.HaveLen(1)) result := results[0] gomega.Ω(result.Success).Should(gomega.BeFalse()) @@ -1052,7 +1044,7 @@ var _ = ginkgo.Describe("[Tx Processing]", func() { gomega.Ω(err).Should(gomega.BeNil()) gomega.Ω(submit(context.Background())).Should(gomega.BeNil()) accept := expectBlk(instances[0]) - results := accept() + results := accept(false) gomega.Ω(results).Should(gomega.HaveLen(1)) result := results[0] gomega.Ω(result.Success).Should(gomega.BeFalse()) @@ -1095,7 +1087,7 @@ var _ = ginkgo.Describe("[Tx Processing]", func() { gomega.Ω(err).Should(gomega.BeNil()) gomega.Ω(submit(context.Background())).Should(gomega.BeNil()) accept := expectBlk(instances[0]) - results := accept() + results := accept(false) gomega.Ω(results).Should(gomega.HaveLen(1)) gomega.Ω(results[0].Success).Should(gomega.BeTrue()) @@ -1136,7 +1128,7 @@ var _ = ginkgo.Describe("[Tx Processing]", func() { gomega.Ω(err).Should(gomega.BeNil()) gomega.Ω(submit(context.Background())).Should(gomega.BeNil()) accept := expectBlk(instances[0]) - results := accept() + results := accept(false) gomega.Ω(results).Should(gomega.HaveLen(1)) result := results[0] gomega.Ω(result.Success).Should(gomega.BeFalse()) @@ -1195,7 +1187,7 @@ var _ = ginkgo.Describe("[Tx Processing]", func() { gomega.Ω(err).Should(gomega.BeNil()) gomega.Ω(submit(context.Background())).Should(gomega.BeNil()) accept := expectBlk(instances[0]) - results := accept() + results := accept(false) gomega.Ω(results).Should(gomega.HaveLen(1)) gomega.Ω(results[0].Success).Should(gomega.BeTrue()) asset2ID = tx.ID() @@ -1214,7 +1206,7 @@ var _ = ginkgo.Describe("[Tx Processing]", func() { gomega.Ω(err).Should(gomega.BeNil()) gomega.Ω(submit(context.Background())).Should(gomega.BeNil()) accept = expectBlk(instances[0]) - results = accept() + results = accept(false) gomega.Ω(results).Should(gomega.HaveLen(1)) gomega.Ω(results[0].Success).Should(gomega.BeTrue()) @@ -1238,7 +1230,7 @@ var _ = ginkgo.Describe("[Tx Processing]", func() { gomega.Ω(err).Should(gomega.BeNil()) gomega.Ω(submit(context.Background())).Should(gomega.BeNil()) accept := expectBlk(instances[0]) - results := accept() + results := accept(false) gomega.Ω(results).Should(gomega.HaveLen(1)) gomega.Ω(results[0].Success).Should(gomega.BeTrue()) asset3ID = tx.ID() @@ -1257,7 +1249,7 @@ var _ = ginkgo.Describe("[Tx Processing]", func() { gomega.Ω(err).Should(gomega.BeNil()) gomega.Ω(submit(context.Background())).Should(gomega.BeNil()) accept = expectBlk(instances[0]) - results = accept() + results = accept(false) gomega.Ω(results).Should(gomega.HaveLen(1)) 
gomega.Ω(results[0].Success).Should(gomega.BeTrue()) @@ -1285,7 +1277,7 @@ var _ = ginkgo.Describe("[Tx Processing]", func() { gomega.Ω(err).Should(gomega.BeNil()) gomega.Ω(submit(context.Background())).Should(gomega.BeNil()) accept := expectBlk(instances[0]) - results := accept() + results := accept(false) gomega.Ω(results).Should(gomega.HaveLen(1)) gomega.Ω(results[0].Success).Should(gomega.BeTrue()) @@ -1323,7 +1315,7 @@ var _ = ginkgo.Describe("[Tx Processing]", func() { gomega.Ω(err).Should(gomega.BeNil()) gomega.Ω(submit(context.Background())).Should(gomega.BeNil()) accept := expectBlk(instances[0]) - results := accept() + results := accept(false) gomega.Ω(results).Should(gomega.HaveLen(1)) result := results[0] gomega.Ω(result.Success).Should(gomega.BeFalse()) @@ -1350,7 +1342,7 @@ var _ = ginkgo.Describe("[Tx Processing]", func() { gomega.Ω(err).Should(gomega.BeNil()) gomega.Ω(submit(context.Background())).Should(gomega.BeNil()) accept := expectBlk(instances[0]) - results := accept() + results := accept(false) gomega.Ω(results).Should(gomega.HaveLen(1)) gomega.Ω(results[0].Success).Should(gomega.BeTrue()) @@ -1388,7 +1380,7 @@ var _ = ginkgo.Describe("[Tx Processing]", func() { gomega.Ω(err).Should(gomega.BeNil()) gomega.Ω(submit(context.Background())).Should(gomega.BeNil()) accept := expectBlk(instances[0]) - results := accept() + results := accept(false) gomega.Ω(results).Should(gomega.HaveLen(1)) result := results[0] gomega.Ω(result.Success).Should(gomega.BeFalse()) @@ -1421,7 +1413,7 @@ var _ = ginkgo.Describe("[Tx Processing]", func() { gomega.Ω(err).Should(gomega.BeNil()) gomega.Ω(submit(context.Background())).Should(gomega.BeNil()) accept := expectBlk(instances[0]) - results := accept() + results := accept(false) gomega.Ω(results).Should(gomega.HaveLen(1)) result := results[0] gomega.Ω(result.Success).Should(gomega.BeFalse()) @@ -1454,7 +1446,7 @@ var _ = ginkgo.Describe("[Tx Processing]", func() { gomega.Ω(err).Should(gomega.BeNil()) gomega.Ω(submit(context.Background())).Should(gomega.BeNil()) accept := expectBlk(instances[0]) - results := accept() + results := accept(false) gomega.Ω(results).Should(gomega.HaveLen(1)) result := results[0] gomega.Ω(result.Success).Should(gomega.BeFalse()) @@ -1487,7 +1479,7 @@ var _ = ginkgo.Describe("[Tx Processing]", func() { gomega.Ω(err).Should(gomega.BeNil()) gomega.Ω(submit(context.Background())).Should(gomega.BeNil()) accept := expectBlk(instances[0]) - results := accept() + results := accept(false) gomega.Ω(results).Should(gomega.HaveLen(1)) result := results[0] gomega.Ω(result.Success).Should(gomega.BeTrue()) @@ -1531,7 +1523,7 @@ var _ = ginkgo.Describe("[Tx Processing]", func() { gomega.Ω(err).Should(gomega.BeNil()) gomega.Ω(submit(context.Background())).Should(gomega.BeNil()) accept := expectBlk(instances[0]) - results := accept() + results := accept(false) gomega.Ω(results).Should(gomega.HaveLen(1)) result := results[0] gomega.Ω(result.Success).Should(gomega.BeFalse()) @@ -1559,7 +1551,7 @@ var _ = ginkgo.Describe("[Tx Processing]", func() { gomega.Ω(err).Should(gomega.BeNil()) gomega.Ω(submit(context.Background())).Should(gomega.BeNil()) accept := expectBlk(instances[0]) - results := accept() + results := accept(false) gomega.Ω(results).Should(gomega.HaveLen(1)) result := results[0] gomega.Ω(result.Success).Should(gomega.BeTrue()) @@ -1601,7 +1593,7 @@ var _ = ginkgo.Describe("[Tx Processing]", func() { gomega.Ω(err).Should(gomega.BeNil()) gomega.Ω(submit(context.Background())).Should(gomega.BeNil()) accept := 
expectBlk(instances[0]) - results := accept() + results := accept(false) gomega.Ω(results).Should(gomega.HaveLen(1)) gomega.Ω(results[0].Success).Should(gomega.BeTrue()) @@ -1645,7 +1637,7 @@ var _ = ginkgo.Describe("[Tx Processing]", func() { gomega.Ω(err).Should(gomega.BeNil()) gomega.Ω(submit(context.Background())).Should(gomega.BeNil()) accept := expectBlk(instances[0]) - results := accept() + results := accept(false) gomega.Ω(results).Should(gomega.HaveLen(1)) result := results[0] gomega.Ω(result.Success).Should(gomega.BeTrue()) @@ -1840,7 +1832,7 @@ var _ = ginkgo.Describe("[Tx Processing]", func() { // Build block with context accept := expectBlkWithContext(instances[0]) - results := accept() + results := accept(false) gomega.Ω(results).Should(gomega.HaveLen(1)) result := results[0] gomega.Ω(result.Success).Should(gomega.BeFalse()) @@ -1872,7 +1864,7 @@ var _ = ginkgo.Describe("[Tx Processing]", func() { gomega.Ω(err).Should(gomega.BeNil()) gomega.Ω(submit(context.Background())).Should(gomega.BeNil()) accept := expectBlk(instances[0]) - results := accept() + results := accept(false) gomega.Ω(results).Should(gomega.HaveLen(1)) result := results[0] gomega.Ω(result.Success).Should(gomega.BeTrue()) @@ -1916,7 +1908,7 @@ var _ = ginkgo.Describe("[Tx Processing]", func() { gomega.Ω(err).Should(gomega.BeNil()) gomega.Ω(submit(context.Background())).Should(gomega.BeNil()) accept := expectBlk(instances[0]) - results := accept() + results := accept(false) gomega.Ω(results).Should(gomega.HaveLen(1)) result := results[0] gomega.Ω(result.Success).Should(gomega.BeFalse()) @@ -1924,7 +1916,7 @@ var _ = ginkgo.Describe("[Tx Processing]", func() { }) }) -func expectBlk(i instance) func() []*chain.Result { +func expectBlk(i instance) func(bool) []*chain.Result { ctx := context.TODO() // manually signal ready @@ -1942,10 +1934,14 @@ func expectBlk(i instance) func() []*chain.Result { err = i.vm.SetPreference(ctx, blk.ID()) gomega.Ω(err).To(gomega.BeNil()) - return func() []*chain.Result { + return func(add bool) []*chain.Result { gomega.Ω(blk.Accept(ctx)).To(gomega.BeNil()) gomega.Ω(blk.Status()).To(gomega.Equal(choices.Accepted)) + if add { + blocks = append(blocks, blk) + } + lastAccepted, err := i.vm.LastAccepted(ctx) gomega.Ω(err).To(gomega.BeNil()) gomega.Ω(lastAccepted).To(gomega.Equal(blk.ID())) @@ -1954,7 +1950,7 @@ func expectBlk(i instance) func() []*chain.Result { } // TODO: unify with expectBlk -func expectBlkWithContext(i instance) func() []*chain.Result { +func expectBlkWithContext(i instance) func(bool) []*chain.Result { ctx := context.TODO() // manually signal ready @@ -1974,10 +1970,14 @@ func expectBlkWithContext(i instance) func() []*chain.Result { err = i.vm.SetPreference(ctx, blk.ID()) gomega.Ω(err).To(gomega.BeNil()) - return func() []*chain.Result { + return func(add bool) []*chain.Result { gomega.Ω(blk.Accept(ctx)).To(gomega.BeNil()) gomega.Ω(blk.Status()).To(gomega.Equal(choices.Accepted)) + if add { + blocks = append(blocks, blk) + } + lastAccepted, err := i.vm.LastAccepted(ctx) gomega.Ω(err).To(gomega.BeNil()) gomega.Ω(lastAccepted).To(gomega.Equal(blk.ID())) diff --git a/pebble/metrics.go b/pebble/metrics.go index 82be352ff8..0e8db36c9e 100644 --- a/pebble/metrics.go +++ b/pebble/metrics.go @@ -12,6 +12,8 @@ import ( "github.com/prometheus/client_golang/prometheus" ) +const metricsInterval = 10 * time.Second + type metrics struct { delayStart time.Time writeStall metric.Averager @@ -21,6 +23,14 @@ type metrics struct { l0Compactions prometheus.Counter otherCompactions 
prometheus.Counter activeCompactions prometheus.Gauge + + tombstoneCount prometheus.Gauge + obsoleteTableSize prometheus.Gauge + obsoleteTableCount prometheus.Gauge + zombieTableSize prometheus.Gauge + zombieTableCount prometheus.Gauge + obsoleteWALSize prometheus.Gauge + obsoleteWALCount prometheus.Gauge } func newMetrics() (*prometheus.Registry, *metrics, error) { @@ -61,12 +71,54 @@ func newMetrics() (*prometheus.Registry, *metrics, error) { Name: "active_compactions", Help: "number of active compactions", }), + tombstoneCount: prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "pebble", + Name: "tombstone_count", + Help: "approximate count of internal tombstones", + }), + obsoleteTableSize: prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "pebble", + Name: "obsolete_table_size", + Help: "number of bytes present in tables no longer referenced by the db", + }), + obsoleteTableCount: prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "pebble", + Name: "obsolete_table_count", + Help: "number of table files no longer referenced by the db", + }), + zombieTableSize: prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "pebble", + Name: "zombie_table_size", + Help: "number of bytes present in tables no longer referenced by the db that are referenced by iterators", + }), + zombieTableCount: prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "pebble", + Name: "zombie_table_count", + Help: "number of table files no longer referenced by the db that are referenced by iterators", + }), + obsoleteWALSize: prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "pebble", + Name: "obsolete_wal_size", + Help: "number of bytes present in WAL no longer needed by the db", + }), + obsoleteWALCount: prometheus.NewGauge(prometheus.GaugeOpts{ + Namespace: "pebble", + Name: "obsolete_wal_count", + Help: "number of WAL files no longer needed by the db", + }), } errs := wrappers.Errs{} errs.Add( r.Register(m.l0Compactions), r.Register(m.otherCompactions), r.Register(m.activeCompactions), + r.Register(m.tombstoneCount), + r.Register(m.obsoleteTableSize), + r.Register(m.obsoleteTableCount), + r.Register(m.zombieTableSize), + r.Register(m.zombieTableCount), + r.Register(m.obsoleteWALSize), + r.Register(m.obsoleteWALCount), ) return r, m, errs.Err } @@ -92,3 +144,24 @@ func (db *Database) onWriteStallBegin(pebble.WriteStallBeginInfo) { func (db *Database) onWriteStallEnd() { db.metrics.writeStall.Observe(float64(time.Since(db.metrics.delayStart))) } + +func (db *Database) collectMetrics() { + t := time.NewTicker(metricsInterval) + defer t.Stop() + + for { + select { + case <-t.C: + metrics := db.db.Metrics() + db.metrics.tombstoneCount.Set(float64(metrics.Keys.TombstoneCount)) + db.metrics.obsoleteTableSize.Set(float64(metrics.Table.ObsoleteSize)) + db.metrics.obsoleteTableCount.Set(float64(metrics.Table.ObsoleteCount)) + db.metrics.zombieTableSize.Set(float64(metrics.Table.ZombieSize)) + db.metrics.zombieTableCount.Set(float64(metrics.Table.ZombieCount)) + db.metrics.obsoleteWALSize.Set(float64(metrics.WAL.ObsoletePhysicalSize)) + db.metrics.obsoleteWALCount.Set(float64(metrics.WAL.ObsoleteFiles)) + case <-db.closing: + return + } + } +} diff --git a/pebble/pebble.go b/pebble/pebble.go index 057af90e80..0ae031d9ab 100644 --- a/pebble/pebble.go +++ b/pebble/pebble.go @@ -30,13 +30,16 @@ type Database struct { db *pebble.DB metrics *metrics - closed utils.Atomic[bool] + // We use an atomic bool for most + // checks because it is much faster + // than checking if a channel is closed. 
+ closing chan struct{} + closed utils.Atomic[bool] } type Config struct { CacheSize int // B BytesPerSync int // B - WALBytesPerSync int // B (0 disables) MemTableStopWritesThreshold int // num tables MemTableSize int // B MaxOpenFiles int @@ -47,7 +50,6 @@ func NewDefaultConfig() Config { return Config{ CacheSize: 1024 * 1024 * 1024, BytesPerSync: 1024 * 1024, - WALBytesPerSync: 1024 * 1024, MemTableStopWritesThreshold: 8, MemTableSize: 16 * 1024 * 1024, MaxOpenFiles: 4_096, @@ -57,18 +59,10 @@ func NewDefaultConfig() Config { func New(file string, cfg Config) (database.Database, *prometheus.Registry, error) { // These default settings are based on https://github.com/ethereum/go-ethereum/blob/master/ethdb/pebble/pebble.go - d := &Database{} + d := &Database{closing: make(chan struct{})} opts := &pebble.Options{ - Cache: pebble.NewCache(int64(cfg.CacheSize)), - BytesPerSync: cfg.BytesPerSync, - // Although we use `pebble.NoSync`, we still keep the WAL enabled. Pebble - // will fsync the WAL during shutdown and should ensure the db is - // recoverable if shutdown correctly. - // - // TODO: consider re-enabling: - // * https://github.com/cockroachdb/pebble/issues/2624 - // * https://github.com/ethereum/go-ethereum/pull/27522 - WALBytesPerSync: cfg.WALBytesPerSync, + Cache: pebble.NewCache(int64(cfg.CacheSize)), + BytesPerSync: cfg.BytesPerSync, MemTableStopWritesThreshold: cfg.MemTableStopWritesThreshold, MemTableSize: cfg.MemTableSize, MaxOpenFiles: cfg.MaxOpenFiles, @@ -106,11 +100,13 @@ func New(file string, cfg Config) (database.Database, *prometheus.Registry, erro return nil, nil, err } d.db = db + go d.collectMetrics() return d, registry, nil } func (db *Database) Close() error { - db.closed.Set(true) + defer db.closed.Set(true) + close(db.closing) return updateError(db.db.Close()) } @@ -147,19 +143,16 @@ func (db *Database) Get(key []byte) ([]byte, error) { // Put sets the value of the provided key to the provided value func (db *Database) Put(key []byte, value []byte) error { - // Use of [pebble.NoSync] here means we don't wait for the [Set] to be - // persisted to the WAL before returning. Basic benchmarking indicates that - // waiting for the WAL to sync reduces performance by 20%. - return updateError(db.db.Set(key, value, pebble.NoSync)) + return updateError(db.db.Set(key, value, pebble.Sync)) } // Delete removes the key from the database func (db *Database) Delete(key []byte) error { - return updateError(db.db.Delete(key, pebble.NoSync)) + return updateError(db.db.Delete(key, pebble.Sync)) } func (db *Database) Compact(start []byte, limit []byte) error { - return updateError(db.db.Compact(start, limit, true)) + return updateError(db.db.Compact(start, limit, false)) } // batch is a wrapper around a pebbleDB batch to contain sizes. @@ -175,13 +168,13 @@ func (db *Database) NewBatch() database.Batch { return &batch{batch: db.db.NewBa // Put the value into the batch for later writing func (b *batch) Put(key, value []byte) error { b.size += len(key) + len(value) + 8 // TODO: find byte overhead - return b.batch.Set(key, value, pebble.NoSync) + return b.batch.Set(key, value, pebble.Sync) } // Delete the key during writing func (b *batch) Delete(key []byte) error { b.size += len(key) + 8 // TODO: find byte overhead - return b.batch.Delete(key, pebble.NoSync) + return b.batch.Delete(key, pebble.Sync) } // Size retrieves the amount of data queued up for writing. @@ -189,7 +182,8 @@ func (b *batch) Size() int { return b.size } // Write flushes any accumulated data to disk. 
func (b *batch) Write() error { - return updateError(b.batch.Commit(pebble.NoSync)) + defer b.batch.Close() + return updateError(b.batch.Commit(pebble.Sync)) } // Reset resets the batch for reuse. diff --git a/vm/dependencies.go b/vm/dependencies.go index 0ccfac942e..b277f5ec6f 100644 --- a/vm/dependencies.go +++ b/vm/dependencies.go @@ -47,10 +47,12 @@ type Config interface { GetStateSyncMinBlocks() uint64 GetStateSyncServerDelay() time.Duration GetParsedBlockCacheSize() int - GetAcceptedBlockCacheSize() int + GetAcceptedBlockWindow() int GetContinuousProfilerConfig() *profiler.Config GetTargetBuildDuration() time.Duration + GetProcessingBuildSkip() int GetTargetGossipDuration() time.Duration + GetBlockCompactionFrequency() int } type Genesis interface { diff --git a/vm/errors.go b/vm/errors.go index be5b6b97c6..141293eebc 100644 --- a/vm/errors.go +++ b/vm/errors.go @@ -14,4 +14,5 @@ var ( ErrStateMissing = errors.New("state missing") ErrStateSyncing = errors.New("state still syncing") ErrUnexpectedStateRoot = errors.New("unexpected state root") + ErrTooManyProcessing = errors.New("too many processing") ) diff --git a/vm/metrics.go b/vm/metrics.go index f5587425fb..9b6133fece 100644 --- a/vm/metrics.go +++ b/vm/metrics.go @@ -21,6 +21,7 @@ type Metrics struct { buildCapped prometheus.Counter emptyBlockBuilt prometheus.Counter clearedMempool prometheus.Counter + deletedBlocks prometheus.Counter mempoolSize prometheus.Gauge bandwidthPrice prometheus.Gauge computePrice prometheus.Gauge @@ -169,6 +170,11 @@ func newMetrics() (*prometheus.Registry, *Metrics, error) { Name: "cleared_mempool", Help: "number of times cleared mempool while building", }), + deletedBlocks: prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "vm", + Name: "deleted_blocks", + Help: "number of blocks deleted", + }), mempoolSize: prometheus.NewGauge(prometheus.GaugeOpts{ Namespace: "chain", Name: "mempool_size", @@ -222,6 +228,7 @@ func newMetrics() (*prometheus.Registry, *Metrics, error) { r.Register(m.buildCapped), r.Register(m.emptyBlockBuilt), r.Register(m.clearedMempool), + r.Register(m.deletedBlocks), r.Register(m.bandwidthPrice), r.Register(m.computePrice), r.Register(m.storageReadPrice), diff --git a/vm/resolutions.go b/vm/resolutions.go index 90d8384100..bc3bc19af0 100644 --- a/vm/resolutions.go +++ b/vm/resolutions.go @@ -251,11 +251,19 @@ func (vm *VM) Accepted(ctx context.Context, b *chain.StatelessBlock) { defer span.End() vm.metrics.txsAccepted.Add(float64(len(b.Txs))) - vm.blocks.Put(b.ID(), b) + + // Update accepted blocks on-disk and caches + if err := vm.UpdateLastAccepted(b); err != nil { + vm.Fatal("unable to update last accepted", zap.Error(err)) + } + + // Remove from verified caches + // + // We do this after setting [lastAccepted] to avoid + // a race where the block isn't accessible. 
vm.verifiedL.Lock() delete(vm.verifiedBlocks, b.ID()) vm.verifiedL.Unlock() - vm.lastAccepted = b // Update replay protection heap // diff --git a/vm/storage.go b/vm/storage.go index 656132bfae..64231a1cff 100644 --- a/vm/storage.go +++ b/vm/storage.go @@ -8,6 +8,8 @@ import ( "context" "encoding/binary" "errors" + "fmt" + "math/rand" "time" "github.com/ava-labs/avalanchego/cache" @@ -15,91 +17,137 @@ import ( "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/utils/crypto/bls" "github.com/ava-labs/avalanchego/vms/platformvm/warp" + "go.uber.org/zap" "github.com/ava-labs/hypersdk/chain" "github.com/ava-labs/hypersdk/consts" "github.com/ava-labs/hypersdk/keys" ) +// compactionOffset is used to randomize the height that we compact +// deleted blocks. This prevents all nodes on the network from deleting +// data from disk at the same time. +var compactionOffset int = -1 + +func init() { + rand.Seed(time.Now().UnixNano()) +} + const ( - idPrefix = 0x0 - heightPrefix = 0x1 - warpSignaturePrefix = 0x2 - warpFetchPrefix = 0x3 + blockPrefix = 0x0 + warpSignaturePrefix = 0x1 + warpFetchPrefix = 0x2 ) var ( - lastAccepted = []byte("last_accepted") isSyncing = []byte("is_syncing") + lastAccepted = []byte("last_accepted") signatureLRU = &cache.LRU[string, *chain.WarpSignature]{Size: 1024} ) -func PrefixBlockIDKey(id ids.ID) []byte { - k := make([]byte, 1+consts.IDLen) - k[0] = idPrefix - copy(k[1:], id[:]) - return k -} - func PrefixBlockHeightKey(height uint64) []byte { k := make([]byte, 1+consts.Uint64Len) - k[0] = heightPrefix + k[0] = blockPrefix binary.BigEndian.PutUint64(k[1:], height) return k } -func (vm *VM) SetLastAccepted(block *chain.StatelessBlock) error { - var ( - bid = block.ID() - vmDB = vm.vmDB - ) - if err := vmDB.Put(lastAccepted, bid[:]); err != nil { - return err - } - if err := vmDB.Put(PrefixBlockIDKey(bid), block.Bytes()); err != nil { - return err - } - // TODO: store block bytes at height to reduce amount of compaction - if err := vmDB.Put(PrefixBlockHeightKey(block.Height()), bid[:]); err != nil { - return err - } - return nil +func (vm *VM) HasGenesis() (bool, error) { + return vm.HasDiskBlock(0) +} + +func (vm *VM) GetGenesis() (*chain.StatefulBlock, error) { + return vm.GetDiskBlock(0) +} + +func (vm *VM) SetLastAcceptedHeight(height uint64) error { + return vm.vmDB.Put(lastAccepted, binary.BigEndian.AppendUint64(nil, height)) } func (vm *VM) HasLastAccepted() (bool, error) { return vm.vmDB.Has(lastAccepted) } -func (vm *VM) GetLastAccepted() (ids.ID, error) { - v, err := vm.vmDB.Get(lastAccepted) - if errors.Is(err, database.ErrNotFound) { - return ids.ID{}, nil - } +func (vm *VM) GetLastAcceptedHeight() (uint64, error) { + b, err := vm.vmDB.Get(lastAccepted) if err != nil { - return ids.ID{}, err + return 0, err + } + return binary.BigEndian.Uint64(b), nil +} + +func (vm *VM) shouldComapct(expiryHeight uint64) bool { + if compactionOffset == -1 { + compactionOffset = rand.Intn(vm.config.GetBlockCompactionFrequency()) //nolint:gosec + vm.Logger().Info("setting compaction offset", zap.Int("n", compactionOffset)) + } + return expiryHeight%uint64(vm.config.GetBlockCompactionFrequency()) == uint64(compactionOffset) +} + +// UpdateLastAccepted updates the [lastAccepted] index, stores [blk] on-disk, +// adds [blk] to the [acceptedCache], and deletes any expired blocks from +// disk. +// +// Blocks written to disk are only used when restarting the node. During normal +// operation, we only fetch blocks from memory. 
+// +// We store blocks by height because it doesn't cause nearly as much +// compaction as storing blocks randomly on-disk (when using [block.ID]). +func (vm *VM) UpdateLastAccepted(blk *chain.StatelessBlock) error { + batch := vm.vmDB.NewBatch() + if err := batch.Put(lastAccepted, binary.BigEndian.AppendUint64(nil, blk.Height())); err != nil { + return err + } + if err := batch.Put(PrefixBlockHeightKey(blk.Height()), blk.Bytes()); err != nil { + return err + } + expiryHeight := blk.Height() - uint64(vm.config.GetAcceptedBlockWindow()) + var expired bool + if expiryHeight > 0 && expiryHeight < blk.Height() { // ensure we don't free genesis + if err := batch.Delete(PrefixBlockHeightKey(expiryHeight)); err != nil { + return err + } + expired = true + vm.metrics.deletedBlocks.Inc() + } + if err := batch.Write(); err != nil { + return fmt.Errorf("%w: unable to update last accepted", err) } - return ids.ToID(v) + vm.lastAccepted = blk + vm.acceptedBlocksByID.Put(blk.ID(), blk) + vm.acceptedBlocksByHeight.Put(blk.Height(), blk.ID()) + if expired && vm.shouldComapct(expiryHeight) { + go func() { + start := time.Now() + if err := vm.CompactDiskBlocks(expiryHeight); err != nil { + vm.Logger().Error("unable to compact blocks", zap.Error(err)) + return + } + vm.Logger().Info("compacted disk blocks", zap.Uint64("end", expiryHeight), zap.Duration("t", time.Since(start))) + }() + } + return nil } -func (vm *VM) GetDiskBlock(bid ids.ID) (*chain.StatefulBlock, error) { - b, err := vm.vmDB.Get(PrefixBlockIDKey(bid)) +func (vm *VM) GetDiskBlock(height uint64) (*chain.StatefulBlock, error) { + b, err := vm.vmDB.Get(PrefixBlockHeightKey(height)) if err != nil { return nil, err } return chain.UnmarshalBlock(b, vm) } -func (vm *VM) DeleteDiskBlock(bid ids.ID) error { - return vm.vmDB.Delete(PrefixBlockIDKey(bid)) +func (vm *VM) HasDiskBlock(height uint64) (bool, error) { + return vm.vmDB.Has(PrefixBlockHeightKey(height)) } -func (vm *VM) GetDiskBlockIDAtHeight(height uint64) (ids.ID, error) { - v, err := vm.vmDB.Get(PrefixBlockHeightKey(height)) - if err != nil { - return ids.Empty, nil - } - return ids.ToID(v) +// CompactDiskBlocks forces compaction on the entire range of blocks up to [lastExpired]. +// +// This can be used to ensure we clean up all large tombstoned keys on a regular basis instead +// of waiting for the database to run a compaction (and potentially delete GBs of data at once). +func (vm *VM) CompactDiskBlocks(lastExpired uint64) error { + return vm.vmDB.Compact([]byte{blockPrefix}, PrefixBlockHeightKey(lastExpired)) } func (vm *VM) GetDiskIsSyncing() (bool, error) { diff --git a/vm/syncervm_client.go b/vm/syncervm_client.go index e17cb6b829..33992e92bf 100644 --- a/vm/syncervm_client.go +++ b/vm/syncervm_client.go @@ -71,6 +71,11 @@ func (s *stateSyncerClient) AcceptedSyncableBlock( sb *chain.SyncableBlock, ) (block.StateSyncMode, error) { s.init = true + s.vm.snowCtx.Log.Info("accepted syncable block", + zap.Uint64("height", sb.Height()), + zap.Stringer("blockID", sb.ID()), + ) + // If we did not finish syncing, we must state sync. syncing, err := s.vm.GetDiskIsSyncing() if err != nil { @@ -83,10 +88,7 @@ func (s *stateSyncerClient) AcceptedSyncableBlock( zap.Uint64("lastAccepted", s.vm.lastAccepted.Hght), zap.Uint64("syncableHeight", sb.Height()), ) - - // We should backfill the emap if we are starting from the last accepted - // block to avoid unnecessarily waiting for txs if we have recent blocks. 
- s.startingSync(false) + s.startedSync = true // We trigger [done] immediately so we let the engine know we are // synced as soon as the [ValidityWindow] worth of txs are verified. @@ -114,10 +116,7 @@ func (s *stateSyncerClient) AcceptedSyncableBlock( zap.Stringer("summary", sb), zap.Bool("already syncing", syncing), ) - - // We don't backfill emap with old data because we are going to skip ahead - // from the last accepted block. - s.startingSync(true) + s.startedSync = true // Initialize metrics for sync client r := prometheus.NewRegistry() @@ -160,9 +159,7 @@ func (s *stateSyncerClient) AcceptedSyncableBlock( // Update the last accepted to the state target block, // since we don't want bootstrapping to fetch all the blocks // from genesis to the sync target. - if err := s.target.SetLastAccepted(context.Background()); err != nil { - return block.StateSyncSkipped, err - } + s.target.MarkAccepted(context.Background()) // Kickoff state syncing from [s.target] if err := s.syncManager.Start(context.Background()); err != nil { @@ -202,9 +199,7 @@ func (s *stateSyncerClient) finishSync() error { // // NOTE: There may be a number of verified but unaccepted blocks above this // block. - if err := s.target.SetLastAccepted(context.Background()); err != nil { - return err - } + s.target.MarkAccepted(context.Background()) } return s.vm.PutDiskIsSyncing(false) } @@ -268,72 +263,3 @@ func (s *stateSyncerClient) UpdateSyncTarget(b *chain.StatelessBlock) (bool, err s.targetUpdated = true // Set [targetUpdated] so we call SetLastAccepted on finish return true, nil // Sync root target updated successfully } - -// startingSync is called before [AcceptedSyncableBlock] returns -func (s *stateSyncerClient) startingSync(state bool) { - s.startedSync = true - vm := s.vm - - // If state sync, we pessimistically assume nothing we have on-disk will - // be useful (as we will jump ahead to some future block). - if state { - return - } - - // Exit early if we don't have any blocks other than genesis (which - // contains no transactions) - blk := vm.lastAccepted - if blk.Hght == 0 { - vm.snowCtx.Log.Info("no seen transactions to backfill") - vm.startSeenTime = 0 - vm.seenValidityWindowOnce.Do(func() { - close(vm.seenValidityWindow) - }) - return - } - - // Backfill [vm.seen] with lifeline worth of transactions - r := vm.Rules(vm.lastAccepted.Tmstmp) - oldest := uint64(0) - var err error - for { - if vm.lastAccepted.Tmstmp-blk.Tmstmp > r.GetValidityWindow() { - // We are assured this function won't be running while we accept - // a block, so we don't need to protect against closing this channel - // twice. - vm.seenValidityWindowOnce.Do(func() { - close(vm.seenValidityWindow) - }) - break - } - - // It is ok to add transactions from newest to oldest - vm.seen.Add(blk.Txs) - vm.startSeenTime = blk.Tmstmp - oldest = blk.Hght - - // Exit early if next block to fetch is genesis (which contains no - // txs) - if blk.Hght <= 1 { - // If we have walked back from the last accepted block to genesis, then - // we can be sure we have all required transactions to start validation. 
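The resync decision above leans on the persisted `is_syncing` marker read through `GetDiskIsSyncing` and cleared through `PutDiskIsSyncing(false)` in `finishSync`. The sketch below only illustrates that flag pattern: the in-memory map standing in for the VM database and the helper names are assumptions, and the exact point at which the flag is first written lies outside this hunk.

```go
package main

import "fmt"

// db is a stand-in for the VM's on-disk key-value store.
type db map[string][]byte

const isSyncingKey = "is_syncing"

// putIsSyncing persists whether a state sync is in progress. If the node
// crashes mid-sync, the stale true value survives the restart.
func putIsSyncing(d db, syncing bool) {
	if syncing {
		d[isSyncingKey] = []byte{1}
		return
	}
	d[isSyncingKey] = []byte{0}
}

// getIsSyncing reports whether a previous sync never finished, in which case
// the node must state sync again even if it looks close to the network tip.
func getIsSyncing(d db) bool {
	v, ok := d[isSyncingKey]
	return ok && len(v) == 1 && v[0] == 1
}

func main() {
	d := db{}
	putIsSyncing(d, true) // assumed to be set when dynamic sync begins
	fmt.Println("must resync after crash:", getIsSyncing(d))

	putIsSyncing(d, false) // cleared in finishSync once the target root is reached
	fmt.Println("must resync after clean finish:", getIsSyncing(d))
}
```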
- vm.startSeenTime = 0 - vm.seenValidityWindowOnce.Do(func() { - close(vm.seenValidityWindow) - }) - break - } - - // Set next blk in lookback - blk, err = vm.GetStatelessBlock(context.Background(), blk.Prnt) - if err != nil { - vm.snowCtx.Log.Error("could not load block, exiting backfill", zap.Error(err)) - break - } - } - vm.snowCtx.Log.Info( - "backfilled seen txs", - zap.Uint64("start", oldest), - zap.Uint64("finish", vm.lastAccepted.Hght), - ) -} diff --git a/vm/syncervm_server.go b/vm/syncervm_server.go index c16bd5628b..9ba1011a7f 100644 --- a/vm/syncervm_server.go +++ b/vm/syncervm_server.go @@ -33,8 +33,10 @@ func (vm *VM) GetStateSummary(ctx context.Context, height uint64) (block.StateSu return nil, err } summary := chain.NewSyncableBlock(block) - vm.Logger(). - Info("Serving syncable block at requested height", zap.Uint64("height", height), zap.Stringer("summary", summary)) + vm.Logger().Info("Serving syncable block at requested height", + zap.Uint64("height", height), + zap.Stringer("summary", summary), + ) return summary, nil } diff --git a/vm/vm.go b/vm/vm.go index e9c9d7d5ce..59d26f0c22 100644 --- a/vm/vm.go +++ b/vm/vm.go @@ -30,12 +30,12 @@ import ( "github.com/ava-labs/avalanchego/version" "github.com/ava-labs/avalanchego/x/merkledb" syncEng "github.com/ava-labs/avalanchego/x/sync" + hcache "github.com/ava-labs/hypersdk/cache" "github.com/prometheus/client_golang/prometheus" "go.uber.org/zap" "github.com/ava-labs/hypersdk/builder" - hcache "github.com/ava-labs/hypersdk/cache" "github.com/ava-labs/hypersdk/chain" "github.com/ava-labs/hypersdk/emap" "github.com/ava-labs/hypersdk/gossiper" @@ -78,10 +78,6 @@ type VM struct { seenValidityWindowOnce sync.Once seenValidityWindow chan struct{} - // cache block objects to optimize "GetBlockStateless" - // only put when a block is accepted - blocks *hcache.FIFO[ids.ID, *chain.StatelessBlock] - // We cannot use a map here because we may parse blocks up in the ancestry parsedBlocks *cache.LRU[ids.ID, *chain.StatelessBlock] @@ -90,6 +86,11 @@ type VM struct { verifiedL sync.RWMutex verifiedBlocks map[ids.ID]*chain.StatelessBlock + // We store the last [AcceptedBlockWindow] blocks in memory + // to avoid reading blocks from disk. 
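The single accepted-block cache removed here gives way, in the fields that follow, to two in-memory indexes sized to the `AcceptedBlockWindow`: block ID to block and height to block ID. The toy `ringCache` below is a stand-in for `hcache.FIFO`, invented purely to show how the paired indexes keep only the most recent window in memory.

```go
package main

import "fmt"

// ringCache is a toy fixed-size FIFO map: once capacity is reached, the
// oldest insertion is evicted. It is not the hcache.FIFO implementation.
type ringCache[K comparable, V any] struct {
	order []K
	items map[K]V
	size  int
}

func newRingCache[K comparable, V any](size int) *ringCache[K, V] {
	return &ringCache[K, V]{items: make(map[K]V, size), size: size}
}

func (c *ringCache[K, V]) Put(k K, v V) {
	if len(c.order) == c.size {
		oldest := c.order[0]
		c.order = c.order[1:]
		delete(c.items, oldest)
	}
	c.order = append(c.order, k)
	c.items[k] = v
}

func (c *ringCache[K, V]) Get(k K) (V, bool) {
	v, ok := c.items[k]
	return v, ok
}

type blockStub struct {
	id     string
	height uint64
}

func main() {
	const window = 3 // stands in for AcceptedBlockWindow
	byID := newRingCache[string, blockStub](window)
	byHeight := newRingCache[uint64, string](window)

	for h := uint64(1); h <= 5; h++ {
		b := blockStub{id: fmt.Sprintf("blk-%d", h), height: h}
		byID.Put(b.id, b)
		byHeight.Put(b.height, b.id)
	}

	// Heights 1 and 2 have fallen out of the window; 3..5 resolve from memory.
	for h := uint64(1); h <= 5; h++ {
		if id, ok := byHeight.Get(h); ok {
			blk, _ := byID.Get(id)
			fmt.Printf("height %d -> %s (cached height %d)\n", h, id, blk.height)
		} else {
			fmt.Printf("height %d -> evicted, would fall back to disk\n", h)
		}
	}
}
```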
+ acceptedBlocksByID *hcache.FIFO[ids.ID, *chain.StatelessBlock] + acceptedBlocksByHeight *hcache.FIFO[uint64, ids.ID] + // Accepted block queue acceptedQueue chan *chain.StatelessBlock acceptorDone chan struct{} @@ -102,6 +103,7 @@ type VM struct { sigWorkers workers.Workers bootstrapped utils.Atomic[bool] + genesisBlk *chain.StatelessBlock preferred ids.ID lastAccepted *chain.StatelessBlock toEngine chan<- common.Message @@ -236,7 +238,11 @@ func (vm *VM) Initialize( vm.parsedBlocks = &cache.LRU[ids.ID, *chain.StatelessBlock]{Size: vm.config.GetParsedBlockCacheSize()} vm.verifiedBlocks = make(map[ids.ID]*chain.StatelessBlock) - vm.blocks, err = hcache.NewFIFO[ids.ID, *chain.StatelessBlock](vm.config.GetAcceptedBlockCacheSize()) + vm.acceptedBlocksByID, err = hcache.NewFIFO[ids.ID, *chain.StatelessBlock](vm.config.GetAcceptedBlockWindow()) + if err != nil { + return err + } + vm.acceptedBlocksByHeight, err = hcache.NewFIFO[uint64, ids.ID](vm.config.GetAcceptedBlockWindow()) if err != nil { return err } @@ -257,20 +263,38 @@ func (vm *VM) Initialize( return err } if has { //nolint:nestif - blkID, err := vm.GetLastAccepted() + statefulGenesis, err := vm.GetGenesis() + if err != nil { + snowCtx.Log.Error("could not get genesis", zap.Error(err)) + return err + } + genesisBlk, err := chain.ParseStatefulBlock(ctx, statefulGenesis, nil, choices.Accepted, vm) + if err != nil { + snowCtx.Log.Error("could not parse genesis", zap.Error(err)) + return err + } + vm.genesisBlk = genesisBlk + lastAcceptedHeight, err := vm.GetLastAcceptedHeight() if err != nil { snowCtx.Log.Error("could not get last accepted", zap.Error(err)) return err } - - blk, err := vm.GetStatelessBlock(ctx, blkID) + statefulBlock, err := vm.GetDiskBlock(lastAcceptedHeight) if err != nil { - snowCtx.Log.Error("could not load last accepted", zap.Error(err)) + snowCtx.Log.Error("could not get last accepted block", zap.Error(err)) return err } - - vm.preferred, vm.lastAccepted = blkID, blk - snowCtx.Log.Info("initialized vm from last accepted", zap.Stringer("block", blkID)) + blk, err := chain.ParseStatefulBlock(ctx, statefulBlock, nil, choices.Accepted, vm) + if err != nil { + snowCtx.Log.Error("could not parse last accepted", zap.Error(err)) + return err + } + vm.preferred, vm.lastAccepted = blk.ID(), blk + if err := vm.loadAcceptedBlocks(ctx); err != nil { + snowCtx.Log.Error("could not load accepted blocks from disk", zap.Error(err)) + return err + } + snowCtx.Log.Info("initialized vm from last accepted", zap.Stringer("block", blk.ID())) } else { // Set balances and compute genesis root sps := state.NewSimpleMutable(vm.stateDB) @@ -330,13 +354,13 @@ func (vm *VM) Initialize( } // Update last accepted and preferred block - if err := vm.SetLastAccepted(genesisBlk); err != nil { - snowCtx.Log.Error("could not set genesis as last accepted", zap.Error(err)) + vm.genesisBlk = genesisBlk + if err := vm.UpdateLastAccepted(genesisBlk); err != nil { + snowCtx.Log.Error("could not set genesis block as last accepted", zap.Error(err)) return err } gBlkID := genesisBlk.ID() vm.preferred, vm.lastAccepted = gBlkID, genesisBlk - vm.blocks.Put(gBlkID, genesisBlk) snowCtx.Log.Info("initialized vm from genesis", zap.Stringer("block", gBlkID)) } go vm.processAcceptedBlocks() @@ -458,6 +482,7 @@ func (vm *VM) SetState(_ context.Context, state snow.State) error { vm.Logger().Info("state sync started") return nil case snow.Bootstrapping: + // Ensure state sync client marks itself as done if it was never started syncStarted := vm.stateSyncClient.Started() 
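On restart, the tip is now recovered entirely through heights: read the persisted `last_accepted` height, load the block stored at that height, and load height 0 for genesis. The sketch below walks that chain over a toy byte-keyed map; `diskStore`, `restoreTip`, and the error wrapping are illustrative assumptions rather than the hypersdk storage layer.

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

const blockPrefix = 0x0

var errNotFound = errors.New("not found")

// diskStore is a toy byte-keyed map standing in for the VM database.
type diskStore map[string][]byte

func heightKey(height uint64) string {
	k := make([]byte, 1+8)
	k[0] = blockPrefix
	binary.BigEndian.PutUint64(k[1:], height)
	return string(k)
}

// restoreTip mirrors the restart path above: genesis always lives at height 0,
// and the last accepted block is found via the persisted height pointer.
func restoreTip(db diskStore) (genesis, lastAccepted []byte, err error) {
	genesis, ok := db[heightKey(0)]
	if !ok {
		return nil, nil, fmt.Errorf("genesis: %w", errNotFound)
	}
	raw, ok := db["last_accepted"]
	if !ok {
		return nil, nil, fmt.Errorf("last_accepted: %w", errNotFound)
	}
	height := binary.BigEndian.Uint64(raw)
	lastAccepted, ok = db[heightKey(height)]
	if !ok {
		return nil, nil, fmt.Errorf("block %d: %w", height, errNotFound)
	}
	return genesis, lastAccepted, nil
}

func main() {
	db := diskStore{
		heightKey(0):    []byte("genesis bytes"),
		heightKey(42):   []byte("block 42 bytes"),
		"last_accepted": binary.BigEndian.AppendUint64(nil, 42),
	}
	g, tip, err := restoreTip(db)
	fmt.Println(string(g), "|", string(tip), "|", err)
}
```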
if !syncStarted { // We must check if we finished syncing before starting bootstrapping. @@ -474,10 +499,22 @@ func (vm *VM) SetState(_ context.Context, state snow.State) error { // node database. return ErrStateSyncing } + // If we weren't previously syncing, we force state syncer completion so // that the node will mark itself as ready. vm.stateSyncClient.ForceDone() + + // TODO: add a config to FATAL here if could not state sync (likely won't be + // able to recover in networks where no one has the full state, bypass + // still starts sync): https://github.com/ava-labs/hypersdk/issues/438 } + + // Backfill seen transactions, if any. This will exit as soon as we reach + // a block we no longer have on disk or if we have walked back the full + // [ValidityWindow]. + vm.backfillSeenTransactions() + + // Trigger that bootstrapping has started vm.Logger().Info("bootstrapping started", zap.Bool("state sync started", syncStarted)) return vm.onBootstrapStarted() case snow.NormalOp: @@ -587,6 +624,8 @@ func (vm *VM) HealthCheck(context.Context) (interface{}, error) { // implements "block.ChainVM.commom.VM.Getter" // replaces "core.SnowmanVM.GetBlock" +// +// This is ONLY called on accepted blocks pre-ProposerVM fork. func (vm *VM) GetBlock(ctx context.Context, id ids.ID) (snowman.Block, error) { ctx, span := vm.tracer.Start(ctx, "VM.GetBlock") defer span.End() @@ -596,16 +635,10 @@ func (vm *VM) GetBlock(ctx context.Context, id ids.ID) (snowman.Block, error) { } func (vm *VM) GetStatelessBlock(ctx context.Context, blkID ids.ID) (*chain.StatelessBlock, error) { - ctx, span := vm.tracer.Start(ctx, "VM.GetStatelessBlock") + _, span := vm.tracer.Start(ctx, "VM.GetStatelessBlock") defer span.End() - // has the block been cached from previous "Accepted" call - blk, exist := vm.blocks.Get(blkID) - if exist { - return blk, nil - } - - // has the block been verified, not yet accepted + // Check if verified block vm.verifiedL.RLock() if blk, exists := vm.verifiedBlocks[blkID]; exists { vm.verifiedL.RUnlock() @@ -613,13 +646,24 @@ func (vm *VM) GetStatelessBlock(ctx context.Context, blkID ids.ID) (*chain.State } vm.verifiedL.RUnlock() - // not found in memory, fetch from disk if accepted - stBlk, err := vm.GetDiskBlock(blkID) - if err != nil { - return nil, err + // Check if last accepted + if vm.lastAccepted.ID() == blkID { + return vm.lastAccepted, nil + } + + // Check if genesis + if vm.genesisBlk.ID() == blkID { + return vm.genesisBlk, nil } - // If block on disk, it must've been accepted - return chain.ParseStatefulBlock(ctx, stBlk, nil, choices.Accepted, vm) + + // Check if recently accepted block + if blk, ok := vm.acceptedBlocksByID.Get(blkID); ok { + return blk, nil + } + + // If we don't know about the block or the block is past the + // [AcceptedBlockWindow], we return a not found error. + return nil, database.ErrNotFound } // implements "block.ChainVM.commom.VM.Parser" @@ -683,6 +727,14 @@ func (vm *VM) buildBlock(ctx context.Context, blockContext *smblock.Context) (sn // of the mempool. 
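The rewritten `GetStatelessBlock` never touches disk: it consults four in-memory sources in order and otherwise reports `database.ErrNotFound`. The compressed sketch below mirrors that precedence with a stub `block` type and plain maps in place of the verified-block map and the accepted-block cache.

```go
package main

import (
	"errors"
	"fmt"
)

var errNotFound = errors.New("not found") // stands in for database.ErrNotFound

type block struct{ id string }

type vm struct {
	verified     map[string]*block // verified but not yet accepted blocks
	lastAccepted *block
	genesis      *block
	recent       map[string]*block // last AcceptedBlockWindow accepted blocks
}

// getBlock mirrors the lookup order above: verified, last accepted, genesis,
// recently accepted; anything older than the window is simply unknown.
func (v *vm) getBlock(id string) (*block, error) {
	if b, ok := v.verified[id]; ok {
		return b, nil
	}
	if v.lastAccepted != nil && v.lastAccepted.id == id {
		return v.lastAccepted, nil
	}
	if v.genesis != nil && v.genesis.id == id {
		return v.genesis, nil
	}
	if b, ok := v.recent[id]; ok {
		return b, nil
	}
	return nil, errNotFound
}

func main() {
	v := &vm{
		verified:     map[string]*block{"p": {id: "p"}},
		lastAccepted: &block{id: "tip"},
		genesis:      &block{id: "gen"},
		recent:       map[string]*block{"old": {id: "old"}},
	}
	for _, id := range []string{"p", "tip", "gen", "old", "ancient"} {
		b, err := v.getBlock(id)
		fmt.Println(id, "->", b, err)
	}
}
```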
defer vm.checkActivity(ctx) + vm.verifiedL.RLock() + processingBlocks := len(vm.verifiedBlocks) + vm.verifiedL.RUnlock() + if processingBlocks > vm.config.GetProcessingBuildSkip() { + vm.snowCtx.Log.Warn("not building block", zap.Error(ErrTooManyProcessing)) + return nil, ErrTooManyProcessing + } + // Build block and store as parsed preferredBlk, err := vm.GetStatelessBlock(ctx, vm.preferred) if err != nil { @@ -952,16 +1004,111 @@ func (*VM) VerifyHeightIndex(context.Context) error { return nil } // GetBlockIDAtHeight implements snowmanblock.HeightIndexedChainVM // Note: must return database.ErrNotFound if the index at height is unknown. // -// This is ONLY called pre-ProposerVM fork. Since we fork immediately after -// genesis, we only need to be return the blockID of genesis. +// This is called by the VM pre-ProposerVM fork and by the sync server +// in [GetStateSummary]. func (vm *VM) GetBlockIDAtHeight(_ context.Context, height uint64) (ids.ID, error) { - if height != 0 { - return ids.ID{}, database.ErrNotFound + if height == vm.lastAccepted.Height() { + return vm.lastAccepted.ID(), nil + } + if height == vm.genesisBlk.Height() { + return vm.genesisBlk.ID(), nil + } + if blkID, ok := vm.acceptedBlocksByHeight.Get(height); ok { + return blkID, nil } + return ids.ID{}, database.ErrNotFound +} + +// backfillSeenTransactions makes a best effort to populate [vm.seen] +// with whatever transactions we already have on-disk. This will lead +// a node to becoming ready faster during a restart. +func (vm *VM) backfillSeenTransactions() { + // Exit early if we don't have any blocks other than genesis (which + // contains no transactions) + blk := vm.lastAccepted + if blk.Hght == 0 { + vm.snowCtx.Log.Info("no seen transactions to backfill") + vm.startSeenTime = 0 + vm.seenValidityWindowOnce.Do(func() { + close(vm.seenValidityWindow) + }) + return + } + + // Backfill [vm.seen] with lifeline worth of transactions + r := vm.Rules(vm.lastAccepted.Tmstmp) + oldest := uint64(0) + for { + if vm.lastAccepted.Tmstmp-blk.Tmstmp > r.GetValidityWindow() { + // We are assured this function won't be running while we accept + // a block, so we don't need to protect against closing this channel + // twice. + vm.seenValidityWindowOnce.Do(func() { + close(vm.seenValidityWindow) + }) + break + } + + // It is ok to add transactions from newest to oldest + vm.seen.Add(blk.Txs) + vm.startSeenTime = blk.Tmstmp + oldest = blk.Hght + + // Exit early if next block to fetch is genesis (which contains no + // txs) + if blk.Hght <= 1 { + // If we have walked back from the last accepted block to genesis, then + // we can be sure we have all required transactions to start validation. + vm.startSeenTime = 0 + vm.seenValidityWindowOnce.Do(func() { + close(vm.seenValidityWindow) + }) + break + } - // TODO: remove support for looking up blockIDs by height - // and store genesis ID in memory. 
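The guard added at the top of `buildBlock` refuses to build while too many verified-but-unaccepted blocks are outstanding, applying backpressure when consensus falls behind. A minimal sketch of that check follows; the threshold plays the role of `GetProcessingBuildSkip`, but every type here is a stand-in.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var errTooManyProcessing = errors.New("too many processing blocks")

type vm struct {
	mu        sync.RWMutex
	verified  map[string]struct{} // verified but not yet accepted blocks
	buildSkip int                 // stands in for config.GetProcessingBuildSkip()
}

// canBuild applies the same backpressure rule: if the number of processing
// blocks exceeds the configured threshold, skip building this round.
func (v *vm) canBuild() error {
	v.mu.RLock()
	processing := len(v.verified)
	v.mu.RUnlock()
	if processing > v.buildSkip {
		return errTooManyProcessing
	}
	return nil
}

func main() {
	v := &vm{verified: map[string]struct{}{}, buildSkip: 2}
	for i := 0; i < 4; i++ {
		v.verified[fmt.Sprintf("blk-%d", i)] = struct{}{}
		fmt.Printf("processing=%d canBuild err=%v\n", len(v.verified), v.canBuild())
	}
}
```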
- return vm.GetDiskBlockIDAtHeight(height) + // Set next blk in lookback + tblk, err := vm.GetStatelessBlock(context.Background(), blk.Prnt) + if err != nil { + vm.snowCtx.Log.Info("could not load block, exiting backfill", + zap.Uint64("height", blk.Height()-1), + zap.Stringer("blockID", blk.Prnt), + zap.Error(err), + ) + return + } + blk = tblk + } + vm.snowCtx.Log.Info( + "backfilled seen txs", + zap.Uint64("start", oldest), + zap.Uint64("finish", vm.lastAccepted.Hght), + ) +} + +func (vm *VM) loadAcceptedBlocks(ctx context.Context) error { + start := uint64(0) + lookback := uint64(vm.config.GetAcceptedBlockWindow()) - 1 // include latest + if vm.lastAccepted.Hght > lookback { + start = vm.lastAccepted.Hght - lookback + } + for i := start; i <= vm.lastAccepted.Hght; i++ { + stBlk, err := vm.GetDiskBlock(i) + if err != nil { + vm.snowCtx.Log.Info("could not find block on-disk", zap.Uint64("height", i)) + continue + } + blk, err := chain.ParseStatefulBlock(ctx, stBlk, nil, choices.Accepted, vm) + if err != nil { + return fmt.Errorf("%w: unable to parse block from disk", err) + } + vm.acceptedBlocksByID.Put(blk.ID(), blk) + vm.acceptedBlocksByHeight.Put(blk.Height(), blk.ID()) + } + vm.snowCtx.Log.Info("loaded blocks from disk", + zap.Uint64("start", start), + zap.Uint64("finish", vm.lastAccepted.Hght), + ) + return nil } // Fatal logs the provided message and then panics to force an exit. diff --git a/vm/vm_test.go b/vm/vm_test.go index 124bdf451d..b6d3713905 100644 --- a/vm/vm_test.go +++ b/vm/vm_test.go @@ -8,14 +8,17 @@ import ( "testing" ametrics "github.com/ava-labs/avalanchego/api/metrics" + "github.com/ava-labs/avalanchego/database/manager" "github.com/ava-labs/avalanchego/ids" "github.com/ava-labs/avalanchego/snow" "github.com/ava-labs/avalanchego/utils/logging" + "github.com/ava-labs/avalanchego/version" "github.com/golang/mock/gomock" "github.com/stretchr/testify/require" hcache "github.com/ava-labs/hypersdk/cache" "github.com/ava-labs/hypersdk/chain" + "github.com/ava-labs/hypersdk/config" "github.com/ava-labs/hypersdk/emap" "github.com/ava-labs/hypersdk/mempool" "github.com/ava-labs/hypersdk/trace" @@ -36,14 +39,19 @@ func TestBlockCache(t *testing.T) { blkID := blk.ID() tracer, _ := trace.New(&trace.Config{Enabled: false}) - bcache, _ := hcache.NewFIFO[ids.ID, *chain.StatelessBlock](3) + bByID, _ := hcache.NewFIFO[ids.ID, *chain.StatelessBlock](3) + bByHeight, _ := hcache.NewFIFO[uint64, ids.ID](3) controller := NewMockController(ctrl) vm := VM{ snowCtx: &snow.Context{Log: logging.NoLog{}, Metrics: ametrics.NewOptionalGatherer()}, + config: &config.Config{}, - tracer: tracer, + vmDB: manager.NewMemDB(version.Semantic1_0_0).Current().Database, + + tracer: tracer, + acceptedBlocksByID: bByID, + acceptedBlocksByHeight: bByHeight, - blocks: bcache, verifiedBlocks: make(map[ids.ID]*chain.StatelessBlock), seen: emap.NewEMap[*chain.Transaction](), mempool: mempool.New[*chain.Transaction](tracer, 100, 32, nil),
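`backfillSeenTransactions` and `loadAcceptedBlocks` together warm the node from disk after a restart: the former walks parent links back from the tip until a full `ValidityWindow` of transactions sits in the replay-protection set (signaling readiness exactly once), the latter refills the accepted-block caches. The sketch below reproduces only the walk-back control flow with stub blocks and millisecond timestamps; the field and channel names are simplified stand-ins for `vm.seen` and `seenValidityWindow`.

```go
package main

import (
	"fmt"
	"sync"
)

type block struct {
	height    uint64
	timestamp int64    // unix ms, like the blocks' Tmstmp field
	txs       []string // transaction IDs
	parent    *block
}

type node struct {
	seen       map[string]struct{}
	ready      chan struct{}
	readyOnce  sync.Once
	validityMS int64
}

func (n *node) signalReady() { n.readyOnce.Do(func() { close(n.ready) }) }

// backfill walks back from [tip] until the validity window is covered or the
// ancestry runs out, mirroring the loop in backfillSeenTransactions.
func (n *node) backfill(tip *block) {
	if tip.height == 0 {
		n.signalReady() // genesis carries no transactions to replay-protect
		return
	}
	for blk := tip; ; blk = blk.parent {
		if tip.timestamp-blk.timestamp > n.validityMS {
			n.signalReady() // the full validity window is covered
			return
		}
		for _, tx := range blk.txs {
			n.seen[tx] = struct{}{}
		}
		if blk.height <= 1 {
			n.signalReady() // walked all the way back to genesis
			return
		}
		if blk.parent == nil {
			// Ancestry no longer available (e.g. pruned); give up without
			// signaling and let freshly accepted blocks fill the window.
			return
		}
	}
}

func main() {
	gen := &block{height: 0, timestamp: 0}
	b1 := &block{height: 1, timestamp: 1_000, txs: []string{"a"}, parent: gen}
	b2 := &block{height: 2, timestamp: 2_000, txs: []string{"b", "c"}, parent: b1}
	tip := &block{height: 3, timestamp: 3_000, txs: []string{"d"}, parent: b2}

	n := &node{seen: map[string]struct{}{}, ready: make(chan struct{}), validityMS: 1_500}
	n.backfill(tip)
	<-n.ready // closed exactly once, whichever exit path fired
	fmt.Println("seen txs:", len(n.seen))
}
```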