Skip to content

Commit

Permalink
add standalone dbs and configs (#1354)
Browse files Browse the repository at this point in the history
* add standalone dbs and configs

* show value in config

* fix tests

* move to vm

* remove the noSkip flag

* reviews

* check isMemDB earlier

* rename var

* return nit

---------

Co-authored-by: Darioush Jalali <darioush.jalali@avalabs.org>
  • Loading branch information
ceyonur and darioush authored Oct 21, 2024
1 parent de75e82 commit 44e1411
Show file tree
Hide file tree
Showing 6 changed files with 209 additions and 36 deletions.
27 changes: 27 additions & 0 deletions plugin/evm/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"time"

"github.com/ava-labs/avalanchego/database/pebbledb"
"github.com/ava-labs/subnet-evm/core/txpool/legacypool"
"github.com/ava-labs/subnet-evm/eth"
"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -60,8 +61,11 @@ const (
// - state sync time: ~6 hrs.
defaultStateSyncMinBlocks = 300_000
defaultStateSyncRequestSize = 1024 // the number of key/values to ask peers for per request
defaultDBType = pebbledb.Name
)

type PBool bool

var (
defaultEnabledAPIs = []string{
"eth",
Expand Down Expand Up @@ -225,6 +229,14 @@ type Config struct {

// RPC settings
HttpBodyLimit uint64 `json:"http-body-limit"`

// Database settings
UseStandaloneDatabase *PBool `json:"use-standalone-database"`
DatabaseConfigContent string `json:"database-config"`
DatabaseConfigFile string `json:"database-config-file"`
DatabaseType string `json:"database-type"`
DatabasePath string `json:"database-path"`
DatabaseReadOnly bool `json:"database-read-only"`
}

// EthAPIs returns an array of strings representing the Eth APIs that should be enabled
Expand Down Expand Up @@ -284,6 +296,7 @@ func (c *Config) SetDefaults() {
c.StateSyncRequestSize = defaultStateSyncRequestSize
c.AllowUnprotectedTxHashes = defaultAllowUnprotectedTxHashes
c.AcceptedCacheSize = defaultAcceptedCacheSize
c.DatabaseType = defaultDBType
}

func (d *Duration) UnmarshalJSON(data []byte) (err error) {
Expand Down Expand Up @@ -338,3 +351,17 @@ func (c *Config) Deprecate() string {

return msg
}

func (p *PBool) String() string {
if p == nil {
return "nil"
}
return fmt.Sprintf("%t", *p)
}

func (p *PBool) Bool() bool {
if p == nil {
return false
}
return bool(*p)
}
4 changes: 1 addition & 3 deletions plugin/evm/syncervm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -273,9 +273,7 @@ func TestVMShutdownWhileSyncing(t *testing.T) {
}

func createSyncServerAndClientVMs(t *testing.T, test syncTest, numBlocks int) *syncVMSetup {
var (
require = require.New(t)
)
require := require.New(t)
// configure [serverVM]
_, serverVM, _, serverAppSender := GenesisVM(t, true, genesisJSONLatest, "", "")
t.Cleanup(func() {
Expand Down
200 changes: 176 additions & 24 deletions plugin/evm/vm.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package evm

import (
"context"
"encoding/base64"
"encoding/json"
"errors"
"fmt"
Expand All @@ -20,6 +21,7 @@ import (
"github.com/ava-labs/avalanchego/network/p2p/gossip"
"github.com/prometheus/client_golang/prometheus"

avalancheNode "github.com/ava-labs/avalanchego/node"
"github.com/ava-labs/subnet-evm/commontype"
"github.com/ava-labs/subnet-evm/consensus/dummy"
"github.com/ava-labs/subnet-evm/constants"
Expand Down Expand Up @@ -66,7 +68,10 @@ import (
avalancheRPC "github.com/gorilla/rpc/v2"

"github.com/ava-labs/avalanchego/codec"
"github.com/ava-labs/avalanchego/database"
"github.com/ava-labs/avalanchego/database/leveldb"
"github.com/ava-labs/avalanchego/database/memdb"
"github.com/ava-labs/avalanchego/database/meterdb"
"github.com/ava-labs/avalanchego/database/pebbledb"
"github.com/ava-labs/avalanchego/database/prefixdb"
"github.com/ava-labs/avalanchego/database/versiondb"
"github.com/ava-labs/avalanchego/ids"
Expand All @@ -81,7 +86,10 @@ import (

commonEng "github.com/ava-labs/avalanchego/snow/engine/common"

avalanchemetrics "github.com/ava-labs/avalanchego/api/metrics"
"github.com/ava-labs/avalanchego/database"
avalancheUtils "github.com/ava-labs/avalanchego/utils"
avalancheconstants "github.com/ava-labs/avalanchego/utils/constants"
avalancheJSON "github.com/ava-labs/avalanchego/utils/json"
)

Expand All @@ -106,6 +114,7 @@ const (
ethMetricsPrefix = "eth"
sdkMetricsPrefix = "sdk"
chainStateMetricsPrefix = "chain_state"
dbMetricsPrefix = "db"

// gossip constants
pushGossipDiscardedElements = 16_384
Expand Down Expand Up @@ -201,7 +210,6 @@ type VM struct {
// [acceptedBlockDB] is the database to store the last accepted
// block.
acceptedBlockDB database.Database

// [warpDB] is used to store warp message signatures
// set to a prefixDB with the prefix [warpPrefix]
warpDB database.Database
Expand Down Expand Up @@ -246,6 +254,7 @@ type VM struct {
ethTxPushGossiper avalancheUtils.Atomic[*gossip.PushGossiper[*GossipEthTx]]
ethTxPullGossiper gossip.Gossiper

chainAlias string
// RPC handlers (should be stopped before closing chaindb)
rpcHandlers []interface{ Stop() }
}
Expand Down Expand Up @@ -284,8 +293,9 @@ func (vm *VM) Initialize(
// fallback to ChainID string instead of erroring
alias = vm.ctx.ChainID.String()
}
vm.chainAlias = alias

subnetEVMLogger, err := InitLogger(alias, vm.config.LogLevel, vm.config.LogJSONFormat, vm.ctx.Log)
subnetEVMLogger, err := InitLogger(vm.chainAlias, vm.config.LogLevel, vm.config.LogJSONFormat, vm.ctx.Log)
if err != nil {
return fmt.Errorf("failed to initialize logger due to: %w ", err)
}
Expand All @@ -306,16 +316,15 @@ func (vm *VM) Initialize(

vm.toEngine = toEngine
vm.shutdownChan = make(chan struct{}, 1)
// Use NewNested rather than New so that the structure of the database
// remains the same regardless of the provided baseDB type.
vm.chaindb = rawdb.NewDatabase(Database{prefixdb.NewNested(ethDBPrefix, db)})
vm.db = versiondb.New(db)
vm.acceptedBlockDB = prefixdb.New(acceptedPrefix, vm.db)
vm.metadataDB = prefixdb.New(metadataPrefix, vm.db)
// Note warpDB is not part of versiondb because it is not necessary
// that warp signatures are committed to the database atomically with
// the last accepted block.
vm.warpDB = prefixdb.New(warpPrefix, db)

if err := vm.initializeMetrics(); err != nil {
return fmt.Errorf("failed to initialize metrics: %w", err)
}

// Initialize the database
if err := vm.initializeDBs(db); err != nil {
return fmt.Errorf("failed to initialize databases: %w", err)
}

if vm.config.InspectDatabase {
start := time.Now()
Expand Down Expand Up @@ -466,10 +475,6 @@ func (vm *VM) Initialize(
}
log.Info(fmt.Sprintf("lastAccepted = %s", lastAcceptedHash))

if err := vm.initializeMetrics(); err != nil {
return err
}

// initialize peer network
if vm.p2pSender == nil {
vm.p2pSender = appSender
Expand Down Expand Up @@ -800,8 +805,8 @@ func (vm *VM) initBlockBuilding() error {
// setAppRequestHandlers sets the request handlers for the VM to serve state sync
// requests.
func (vm *VM) setAppRequestHandlers() {
// Create separate EVM TrieDB (read only) for serving leafs requests.
// We create a separate TrieDB here, so that it has a separate cache from the one
// Create standalone EVM TrieDB (read only) for serving leafs requests.
// We create a standalone TrieDB here, so that it has a standalone cache from the one
// used by the node when processing blocks.
evmTrieDB := triedb.NewDatabase(
vm.chaindb,
Expand Down Expand Up @@ -1010,13 +1015,9 @@ func (vm *VM) CreateHandlers(context.Context) (map[string]http.Handler, error) {
return nil, err
}

primaryAlias, err := vm.ctx.BCLookup.PrimaryAlias(vm.ctx.ChainID)
if err != nil {
return nil, fmt.Errorf("failed to get primary alias for chain due to %w", err)
}
apis := make(map[string]http.Handler)
if vm.config.AdminAPIEnabled {
adminAPI, err := newHandler("admin", NewAdminService(vm, os.ExpandEnv(fmt.Sprintf("%s_subnet_evm_performance_%s", vm.config.AdminAPIDir, primaryAlias))))
adminAPI, err := newHandler("admin", NewAdminService(vm, os.ExpandEnv(fmt.Sprintf("%s_subnet_evm_performance_%s", vm.config.AdminAPIDir, vm.chainAlias))))
if err != nil {
return nil, fmt.Errorf("failed to register service for admin API due to %w", err)
}
Expand Down Expand Up @@ -1190,3 +1191,154 @@ func attachEthService(handler *rpc.Server, apis []rpc.API, names []string) error

return nil
}

// useStandaloneDatabase returns true if the chain can and should use a standalone database
// other than given by [db] in Initialize()
func (vm *VM) useStandaloneDatabase(acceptedDB database.Database) (bool, error) {
// no config provided, use default
standaloneDBFlag := vm.config.UseStandaloneDatabase
if standaloneDBFlag != nil {
return standaloneDBFlag.Bool(), nil
}

// check if the chain can use a standalone database
_, err := acceptedDB.Get(lastAcceptedKey)
if err == database.ErrNotFound {
// If there is nothing in the database, we can use the standalone database
return true, nil
}
return false, err
}

// getDatabaseConfig returns the database configuration for the chain
// to be used by separate, standalone database.
func getDatabaseConfig(config Config, chainDataDir string) (avalancheNode.DatabaseConfig, error) {
var (
configBytes []byte
err error
)
if len(config.DatabaseConfigContent) != 0 {
dbConfigContent := config.DatabaseConfigContent
configBytes, err = base64.StdEncoding.DecodeString(dbConfigContent)
if err != nil {
return avalancheNode.DatabaseConfig{}, fmt.Errorf("unable to decode base64 content: %w", err)
}
} else if len(config.DatabaseConfigFile) != 0 {
configPath := config.DatabaseConfigFile
configBytes, err = os.ReadFile(configPath)
if err != nil {
return avalancheNode.DatabaseConfig{}, err
}
}

dbPath := filepath.Join(chainDataDir, "db")
if len(config.DatabasePath) != 0 {
dbPath = config.DatabasePath
}

return avalancheNode.DatabaseConfig{
Name: config.DatabaseType,
ReadOnly: config.DatabaseReadOnly,
Path: dbPath,
Config: configBytes,
}, nil
}

// initializeDBs initializes the databases used by the VM.
// If [useStandaloneDB] is true, the chain will use a standalone database for its state.
// Otherwise, the chain will use the provided [avaDB] for its state.
func (vm *VM) initializeDBs(avaDB database.Database) error {
db := avaDB
// skip standalone database initialization if we are running in unit tests
if vm.ctx.NetworkID != avalancheconstants.UnitTestID {
// first initialize the accepted block database to check if we need to use a standalone database
verDB := versiondb.New(avaDB)
acceptedDB := prefixdb.New(acceptedPrefix, verDB)
useStandAloneDB, err := vm.useStandaloneDatabase(acceptedDB)
if err != nil {
return err
}
if useStandAloneDB {
// If we are using a standalone database, we need to create a new database
// for the chain state.
dbConfig, err := getDatabaseConfig(vm.config, vm.ctx.ChainDataDir)
if err != nil {
return err
}
log.Info("Using standalone database for the chain state", "DatabaseConfig", dbConfig)
db, err = vm.createDatabase(dbConfig)
if err != nil {
return err
}
}
}
// Use NewNested rather than New so that the structure of the database
// remains the same regardless of the provided baseDB type.
vm.chaindb = rawdb.NewDatabase(Database{prefixdb.NewNested(ethDBPrefix, db)})
vm.db = versiondb.New(db)
vm.acceptedBlockDB = prefixdb.New(acceptedPrefix, db)
vm.metadataDB = prefixdb.New(metadataPrefix, db)
// Note warpDB is not part of versiondb because it is not necessary
// that warp signatures are committed to the database atomically with
// the last accepted block.
// [warpDB] is used to store warp message signatures
// set to a prefixDB with the prefix [warpPrefix]
vm.warpDB = prefixdb.New(warpPrefix, db)
return nil
}

// createDatabase returns a new database instance with the provided configuration
func (vm *VM) createDatabase(dbConfig avalancheNode.DatabaseConfig) (database.Database, error) {
dbRegisterer, err := avalanchemetrics.MakeAndRegister(
vm.ctx.Metrics,
dbMetricsPrefix,
)
if err != nil {
return nil, err
}
var db database.Database
// start the db
switch dbConfig.Name {
case leveldb.Name:
dbPath := filepath.Join(dbConfig.Path, leveldb.Name)
db, err = leveldb.New(dbPath, dbConfig.Config, vm.ctx.Log, dbRegisterer)
if err != nil {
return nil, fmt.Errorf("couldn't create %s at %s: %w", leveldb.Name, dbPath, err)
}
case memdb.Name:
db = memdb.New()
case pebbledb.Name:
dbPath := filepath.Join(dbConfig.Path, pebbledb.Name)
db, err = pebbledb.New(dbPath, dbConfig.Config, vm.ctx.Log, dbRegisterer)
if err != nil {
return nil, fmt.Errorf("couldn't create %s at %s: %w", pebbledb.Name, dbPath, err)
}
default:
return nil, fmt.Errorf(
"db-type was %q but should have been one of {%s, %s, %s}",
dbConfig.Name,
leveldb.Name,
memdb.Name,
pebbledb.Name,
)
}

if dbConfig.ReadOnly && dbConfig.Name != memdb.Name {
db = versiondb.New(db)
}

meterDBReg, err := avalanchemetrics.MakeAndRegister(
vm.ctx.Metrics,
"meterdb",
)
if err != nil {
return nil, err
}

db, err = meterdb.New(meterDBReg, db)
if err != nil {
return nil, fmt.Errorf("failed to create meterdb: %w", err)
}

return db, nil
}
7 changes: 2 additions & 5 deletions plugin/evm/vm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -189,10 +189,8 @@ func setupGenesis(
genesisBytes := buildGenesisTest(t, genesisJSON)
ctx := NewContext()

baseDB := memdb.New()

// initialize the atomic memory
atomicMemory := atomic.NewMemory(prefixdb.New([]byte{0}, baseDB))
atomicMemory := atomic.NewMemory(prefixdb.New([]byte{0}, memdb.New()))
ctx.SharedMemory = atomicMemory.NewSharedMemory(ctx.ChainID)

// NB: this lock is intentionally left locked when this function returns.
Expand All @@ -206,8 +204,7 @@ func setupGenesis(
ctx.Keystore = userKeystore.NewBlockchainKeyStore(ctx.ChainID)

issuer := make(chan commonEng.Message, 1)
prefixedDB := prefixdb.New([]byte{1}, baseDB)
return ctx, prefixedDB, genesisBytes, issuer, atomicMemory
return ctx, memdb.New(), genesisBytes, issuer, atomicMemory
}

// GenesisVM creates a VM instance with the genesis test bytes and returns
Expand Down
4 changes: 1 addition & 3 deletions plugin/evm/vm_upgrade_bytes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,7 @@ import (
"github.com/stretchr/testify/require"
)

var (
DefaultEtnaTime = uint64(upgrade.GetConfig(testNetworkID).EtnaTime.Unix())
)
var DefaultEtnaTime = uint64(upgrade.GetConfig(testNetworkID).EtnaTime.Unix())

func TestVMUpgradeBytesPrecompile(t *testing.T) {
// Make a TxAllowListConfig upgrade at genesis and convert it to JSON to apply as upgradeBytes.
Expand Down
Loading

0 comments on commit 44e1411

Please sign in to comment.