Skip to content

Commit

Permalink
Support remote db (#494)
Browse files Browse the repository at this point in the history
* support downloader

* blockFrom should use blocksInSnapshots + 1

* support remote rpc server

* support remote rpc server

* fix the get header error

* another way

* support rpcdaemon

* add comment and fix ci
  • Loading branch information
blxdyx authored Sep 3, 2024
1 parent db69e86 commit 4518ec1
Show file tree
Hide file tree
Showing 15 changed files with 88 additions and 33 deletions.
23 changes: 22 additions & 1 deletion cmd/rpcdaemon/cli/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,15 @@ import (
"crypto/rand"
"errors"
"fmt"
"github.com/erigontech/erigon/core/blob_storage"
"github.com/spf13/afero"
"math"
"math/big"
"net"
"net/http"
"net/url"
"os"
"path"
"path/filepath"
"strings"
"time"
Expand Down Expand Up @@ -342,6 +346,7 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger
// Configure DB first
var allSnapshots *freezeblocks.RoSnapshots
var allBorSnapshots *freezeblocks.BorRoSnapshots
var allBscSnapshots *freezeblocks.BscRoSnapshots
onNewSnapshot := func() {}

var cc *chain.Config
Expand Down Expand Up @@ -388,12 +393,15 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger
// Configure sapshots
allSnapshots = freezeblocks.NewRoSnapshots(cfg.Snap, cfg.Dirs.Snap, 0, logger)
allBorSnapshots = freezeblocks.NewBorRoSnapshots(cfg.Snap, cfg.Dirs.Snap, 0, logger)
allBscSnapshots = freezeblocks.NewBscRoSnapshots(cfg.Snap, cfg.Dirs.Snap, 0, logger)
// To povide good UX - immediatly can read snapshots after RPCDaemon start, even if Erigon is down
// Erigon does store list of snapshots in db: means RPCDaemon can read this list now, but read by `remoteKvClient.Snapshots` after establish grpc connection
allSnapshots.OptimisticReopenWithDB(db)
allBorSnapshots.OptimisticalyReopenWithDB(db)
allBscSnapshots.OptimisticalyReopenWithDB(db)
allSnapshots.LogStat("remote")
allBorSnapshots.LogStat("bor:remote")
allBscSnapshots.LogStat("bsc:remote")

cr := rawdb.NewCanonicalReader()
agg, err := libstate.NewAggregator(ctx, cfg.Dirs, config3.HistoryV3AggregationStep, db, cr, logger)
Expand Down Expand Up @@ -428,6 +436,11 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger
} else {
allBorSnapshots.LogStat("bor:reopen")
}
if err := allBscSnapshots.ReopenList(reply.BlocksFiles, true); err != nil {
logger.Error("[bsc snapshots] reopen", "err", err)
} else {
allBscSnapshots.LogStat("bsc:reopen")
}

//if err = agg.openList(reply.HistoryFiles, true); err != nil {
if err = agg.OpenFolder(); err != nil {
Expand All @@ -446,7 +459,15 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger
}()
}
onNewSnapshot()
blockReader = freezeblocks.NewBlockReader(allSnapshots, allBorSnapshots, nil)
blockReader = freezeblocks.NewBlockReader(allSnapshots, allBorSnapshots, allBscSnapshots)
// open blob db
if cc.Parlia != nil {
logger.Warn("Opening blob store", "path", cfg.Dirs.Blobs)
blobDbPath := path.Join(cfg.Dirs.Blobs, "blob")
blobDb := kv2.NewMDBX(log.New()).Path(blobDbPath).MustOpen()
blobStore := blob_storage.NewBlobStore(blobDb, afero.NewBasePathFs(afero.NewOsFs(), cfg.Dirs.Blobs), math.MaxUint64, cc)
blockReader.WithSidecars(blobStore)
}

db, err = temporal.New(rwKv, agg)
if err != nil {
Expand Down
1 change: 0 additions & 1 deletion cmd/rpcdaemon/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ func main() {
defer db.Close()
defer engine.Close()

// ToDo @blxdyx support query blob data in Rpcdaemon
apiList := jsonrpc.APIList(db, backend, txPool, mining, ff, stateCache, blockReader, cfg, engine, logger, nil)
rpc.PreAllocateRPCMetricLabels(apiList)
if err := cli.StartRpcServer(ctx, cfg, apiList, logger); err != nil {
Expand Down
3 changes: 2 additions & 1 deletion core/blob_storage/snappy.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package blob_storage
import (
"bufio"
"encoding/binary"
"errors"
"fmt"
"github.com/c2h5oh/datasize"
"github.com/erigontech/erigon/cl/sentinel/communication/ssz_snappy"
Expand Down Expand Up @@ -49,7 +50,7 @@ func snappyReader(r io.Reader, val *types.BlobSidecar) error {
return fmt.Errorf("unable to read varint from message prefix: %v", err)
}
if encodedLn > uint64(16*datasize.MB) {
return fmt.Errorf("payload too big")
return errors.New("payload too big")
}
sr := snappy.NewReader(r)
raw := make([]byte, encodedLn)
Expand Down
5 changes: 3 additions & 2 deletions core/state_transition.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package core

import (
"bytes"
"errors"
"fmt"
"github.com/erigontech/erigon/consensus"
"slices"
Expand Down Expand Up @@ -418,10 +419,10 @@ func (st *StateTransition) TransitionDb(refunds bool, gasBailout bool) (*evmtype
if rules.IsNano {
for _, blackListAddr := range types.NanoBlackList {
if blackListAddr == sender.Address() {
return nil, fmt.Errorf("block blacklist account")
return nil, errors.New("block blacklist account")
}
if msg.To() != nil && *msg.To() == blackListAddr {
return nil, fmt.Errorf("block blacklist account")
return nil, errors.New("block blacklist account")
}
}
}
Expand Down
6 changes: 3 additions & 3 deletions erigon-lib/kv/mdbx/kv_abstract_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func TestRemoteKvVersion(t *testing.T) {
conn := bufconn.Listen(1024 * 1024)
grpcServer := grpc.NewServer()
go func() {
remote.RegisterKVServer(grpcServer, remotedbserver.NewKvServer(ctx, writeDB, nil, nil, nil, logger))
remote.RegisterKVServer(grpcServer, remotedbserver.NewKvServer(ctx, writeDB, nil, nil, nil, nil, logger))
if err := grpcServer.Serve(conn); err != nil {
log.Error("private RPC server fail", "err", err)
}
Expand Down Expand Up @@ -211,7 +211,7 @@ func TestRemoteKvRange(t *testing.T) {
ctx, writeDB := context.Background(), memdb.NewTestDB(t)
grpcServer, conn := grpc.NewServer(), bufconn.Listen(1024*1024)
go func() {
kvServer := remotedbserver.NewKvServer(ctx, writeDB, nil, nil, nil, logger)
kvServer := remotedbserver.NewKvServer(ctx, writeDB, nil, nil, nil, nil, logger)
remote.RegisterKVServer(grpcServer, kvServer)
if err := grpcServer.Serve(conn); err != nil {
log.Error("private RPC server fail", "err", err)
Expand Down Expand Up @@ -345,7 +345,7 @@ func setupDatabases(t *testing.T, logger log.Logger, f mdbx.TableCfgFunc) (write

grpcServer := grpc.NewServer()
f2 := func() {
remote.RegisterKVServer(grpcServer, remotedbserver.NewKvServer(ctx, writeDBs[1], nil, nil, nil, logger))
remote.RegisterKVServer(grpcServer, remotedbserver.NewKvServer(ctx, writeDBs[1], nil, nil, nil, nil, logger))
if err := grpcServer.Serve(conn); err != nil {
logger.Error("private RPC server fail", "err", err)
}
Expand Down
8 changes: 7 additions & 1 deletion erigon-lib/kv/remotedbserver/remotedbserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ type KvServer struct {
stateChangeStreams *StateChangePubSub
blockSnapshots Snapshots
borSnapshots Snapshots
bscSnapshots Snapshots
historySnapshots Snapshots
ctx context.Context

Expand All @@ -95,7 +96,7 @@ type Snapshots interface {
Files() []string
}

func NewKvServer(ctx context.Context, db kv.RoDB, snapshots Snapshots, borSnapshots Snapshots, historySnapshots Snapshots, logger log.Logger) *KvServer {
func NewKvServer(ctx context.Context, db kv.RoDB, snapshots Snapshots, borSnapshots Snapshots, bscSnapshots Snapshots, historySnapshots Snapshots, logger log.Logger) *KvServer {
return &KvServer{
trace: false,
rangeStep: 1024,
Expand All @@ -104,6 +105,7 @@ func NewKvServer(ctx context.Context, db kv.RoDB, snapshots Snapshots, borSnapsh
ctx: ctx,
blockSnapshots: snapshots,
borSnapshots: borSnapshots,
bscSnapshots: bscSnapshots,
historySnapshots: historySnapshots,
txs: map[uint64]*threadSafeTx{},
txsMapLock: &sync.RWMutex{},
Expand Down Expand Up @@ -466,6 +468,10 @@ func (s *KvServer) Snapshots(_ context.Context, _ *remote.SnapshotsRequest) (rep
blockFiles = append(blockFiles, s.borSnapshots.Files()...)
}

if s.bscSnapshots != nil && !reflect.ValueOf(s.bscSnapshots).IsNil() { // nolint
blockFiles = append(blockFiles, s.bscSnapshots.Files()...)
}

reply = &remote.SnapshotsReply{BlocksFiles: blockFiles}
if s.historySnapshots != nil && !reflect.ValueOf(s.historySnapshots).IsNil() { // nolint
reply.HistoryFiles = s.historySnapshots.Files()
Expand Down
25 changes: 21 additions & 4 deletions erigon-lib/kv/remotedbserver/remotedbserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func TestKvServer_renew(t *testing.T) {
return nil
}))

s := NewKvServer(ctx, db, nil, nil, nil, log.New())
s := NewKvServer(ctx, db, nil, nil, nil, nil, log.New())
g, ctx := errgroup.WithContext(ctx)
testCase := func() error {
id, err := s.begin(ctx)
Expand Down Expand Up @@ -107,7 +107,7 @@ func TestKVServerSnapshotsReturnsSnapshots(t *testing.T) {
historySnapshots := NewMockSnapshots(ctrl)
historySnapshots.EXPECT().Files().Return([]string{"history"}).Times(1)

s := NewKvServer(ctx, nil, blockSnapshots, nil, historySnapshots, log.New())
s := NewKvServer(ctx, nil, blockSnapshots, nil, nil, historySnapshots, log.New())
reply, err := s.Snapshots(ctx, nil)
require.NoError(t, err)
require.Equal(t, []string{"headers.seg", "bodies.seg"}, reply.BlocksFiles)
Expand All @@ -124,7 +124,7 @@ func TestKVServerSnapshotsReturnsBorSnapshots(t *testing.T) {
historySnapshots := NewMockSnapshots(ctrl)
historySnapshots.EXPECT().Files().Return([]string{"history"}).Times(1)

s := NewKvServer(ctx, nil, blockSnapshots, borSnapshots, historySnapshots, log.New())
s := NewKvServer(ctx, nil, blockSnapshots, borSnapshots, nil, historySnapshots, log.New())
reply, err := s.Snapshots(ctx, nil)
require.NoError(t, err)
require.Equal(t, []string{"headers.seg", "bodies.seg", "borevents.seg", "borspans.seg"}, reply.BlocksFiles)
Expand All @@ -133,9 +133,26 @@ func TestKVServerSnapshotsReturnsBorSnapshots(t *testing.T) {

func TestKVServerSnapshotsReturnsEmptyIfNoBlockSnapshots(t *testing.T) {
ctx := context.Background()
s := NewKvServer(ctx, nil, nil, nil, nil, log.New())
s := NewKvServer(ctx, nil, nil, nil, nil, nil, log.New())
reply, err := s.Snapshots(ctx, nil)
require.NoError(t, err)
require.Empty(t, reply.BlocksFiles)
require.Empty(t, reply.HistoryFiles)
}

func TestKVServerSnapshotsReturnsBscSnapshots(t *testing.T) {
ctx := context.Background()
ctrl := gomock.NewController(t)
blockSnapshots := NewMockSnapshots(ctrl)
blockSnapshots.EXPECT().Files().Return([]string{"headers.seg", "bodies.seg"}).Times(1)
bscSnapshots := NewMockSnapshots(ctrl)
bscSnapshots.EXPECT().Files().Return([]string{"bscblobsidecars.seg"}).Times(1)
historySnapshots := NewMockSnapshots(ctrl)
historySnapshots.EXPECT().Files().Return([]string{"history"}).Times(1)

s := NewKvServer(ctx, nil, blockSnapshots, nil, bscSnapshots, historySnapshots, log.New())
reply, err := s.Snapshots(ctx, nil)
require.NoError(t, err)
require.Equal(t, []string{"headers.seg", "bodies.seg", "bscblobsidecars.seg"}, reply.BlocksFiles)
require.Equal(t, []string{"history"}, reply.HistoryFiles)
}
4 changes: 2 additions & 2 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger

// Check if we have an already initialized chain and fall back to
// that if so. Otherwise we need to generate a new genesis spec.
blockReader, blockWriter, allSnapshots, allBorSnapshots, _, agg, err := setUpBlockReader(ctx, chainKv, config.Dirs, config, chainConfig.Bor != nil, chainConfig.Parlia != nil, logger)
blockReader, blockWriter, allSnapshots, allBorSnapshots, allBscSnapshots, agg, err := setUpBlockReader(ctx, chainKv, config.Dirs, config, chainConfig.Bor != nil, chainConfig.Parlia != nil, logger)
if err != nil {
return nil, err
}
Expand All @@ -362,7 +362,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger
}
}

kvRPC := remotedbserver.NewKvServer(ctx, backend.chainDB, allSnapshots, allBorSnapshots, agg, logger)
kvRPC := remotedbserver.NewKvServer(ctx, backend.chainDB, allSnapshots, allBorSnapshots, allBscSnapshots, agg, logger)
backend.notifications.StateChangesConsumer = kvRPC
backend.kvRPC = kvRPC

Expand Down
7 changes: 2 additions & 5 deletions eth/consensuschain/consensus_chain_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,11 +74,8 @@ func (cr Reader) GetHeaderByNumber(number uint64) *types.Header {
}
func (cr Reader) GetHeaderByHash(hash common.Hash) *types.Header {
if cr.blockReader != nil {
number := rawdb.ReadHeaderNumber(cr.tx, hash)
if number == nil {
return nil
}
return cr.GetHeader(hash, *number)
h, _ := cr.blockReader.HeaderByHash(context.Background(), cr.tx, hash)
return h
}
h, _ := rawdb.ReadHeaderByHash(cr.tx, hash)
return h
Expand Down
2 changes: 1 addition & 1 deletion turbo/jsonrpc/bsc_api.go
Original file line number Diff line number Diff line change
Expand Up @@ -217,7 +217,7 @@ func (api *BscImpl) GetVerifyResult(ctx context.Context, blockNr rpc.BlockNumber

// PendingTransactions returns the transactions that are in the transaction pool
// and have a from address that is one of the accounts this node manages.
func (s *BscImpl) PendingTransactions() ([]*RPCTransaction, error) {
func (api *BscImpl) PendingTransactions() ([]*RPCTransaction, error) {
return nil, fmt.Errorf(NotImplemented, "eth_pendingTransactions")
}

Expand Down
6 changes: 1 addition & 5 deletions turbo/jsonrpc/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,6 @@
package jsonrpc

import (
"github.com/erigontech/erigon/consensus/parlia"

txpool "github.com/erigontech/erigon-lib/gointerfaces/txpoolproto"
"github.com/erigontech/erigon-lib/kv"
"github.com/erigontech/erigon-lib/kv/kvcache"
Expand Down Expand Up @@ -49,8 +47,8 @@ func APIList(db kv.RoDB, eth rpchelper.ApiBackend, txPool txpool.TxpoolClient, m
dbImpl := NewDBAPIImpl() /* deprecated */
adminImpl := NewAdminAPI(eth)
parityImpl := NewParityAPIImpl(base, db)
bscImpl := NewBscAPI(ethImpl)

var bscImpl *BscImpl
var borImpl *BorImpl

type lazy interface {
Expand All @@ -59,8 +57,6 @@ func APIList(db kv.RoDB, eth rpchelper.ApiBackend, txPool txpool.TxpoolClient, m
}

switch engine := engine.(type) {
case *parlia.Parlia:
bscImpl = NewBscAPI(ethImpl)
case *bor.Bor:
borImpl = NewBorAPI(base, db)
case lazy:
Expand Down
2 changes: 1 addition & 1 deletion turbo/snapshotsync/freezeblocks/bsc_snapshots.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func (br *BlockRetire) retireBscBlocks(ctx context.Context, minBlockNum uint64,
} else {
minimumBlob = chapelMinSegFrom
}
blockFrom := max(blockReader.FrozenBscBlobs(), minimumBlob)
blockFrom := max(blockReader.FrozenBscBlobs()+1, minimumBlob)
blocksRetired := false
for _, snap := range blockReader.BscSnapshots().Types() {
if maxBlockNum <= blockFrom || maxBlockNum-blockFrom < snaptype.Erigon2MergeLimit {
Expand Down
19 changes: 18 additions & 1 deletion turbo/snapshotsync/snapshotsync.go
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,7 @@ func computeBlocksToPrune(blockReader services.FullBlockReader, p prune.Mode) (b
// for MVP we sync with Downloader only once, in future will send new snapshots also
func WaitForDownloader(ctx context.Context, logPrefix string, dirs datadir.Dirs, headerchain, blobs bool, prune prune.Mode, caplin CaplinMode, agg *state.Aggregator, tx kv.RwTx, blockReader services.FullBlockReader, cc *chain.Config, snapshotDownloader proto_downloader.DownloaderClient, stagesIdsList []string) error {
snapshots := blockReader.Snapshots()
bscSnapshots := blockReader.BscSnapshots()
borSnapshots := blockReader.BorSnapshots()

// Find minimum block to download.
Expand All @@ -282,6 +283,11 @@ func WaitForDownloader(ctx context.Context, logPrefix string, dirs datadir.Dirs,
return err
}
}
if cc.Parlia != nil {
if err := bscSnapshots.ReopenFolder(); err != nil {
return err
}
}
return nil
}

Expand Down Expand Up @@ -314,9 +320,14 @@ func WaitForDownloader(ctx context.Context, logPrefix string, dirs datadir.Dirs,
}
}

// Bsc keep all the blob snapshot but Caplin is on the contrary
if caplin == NoCaplin {
blobs = !blobs
}

// build all download requests
for _, p := range preverifiedBlockSnapshots {
if caplin == NoCaplin && (strings.Contains(p.Name, "beaconblocks") || strings.Contains(p.Name, "blobsidecars")) {
if caplin == NoCaplin && (strings.Contains(p.Name, "beaconblocks") || strings.Contains(p.Name, "blobsidecars")) && !strings.Contains(p.Name, "bscblobsiders") {
continue
}
if caplin == OnlyCaplin && !strings.Contains(p.Name, "beaconblocks") && !strings.Contains(p.Name, "blobsidecars") {
Expand Down Expand Up @@ -416,6 +427,12 @@ func WaitForDownloader(ctx context.Context, logPrefix string, dirs datadir.Dirs,
}
}

if cc.Parlia != nil {
if err := bscSnapshots.ReopenFolder(); err != nil {
return err
}
}

if err := agg.OpenFolder(); err != nil {
return err
}
Expand Down
2 changes: 1 addition & 1 deletion turbo/stages/mock/mock_sentry.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ func MockWithEverything(tb testing.TB, gspec *types.Genesis, key *ecdsa.PrivateK
ctx, ctxCancel := context.WithCancel(context.Background())
db, agg := temporaltest.NewTestDB(tb, dirs)

erigonGrpcServeer := remotedbserver.NewKvServer(ctx, db, nil, nil, nil, logger)
erigonGrpcServeer := remotedbserver.NewKvServer(ctx, db, nil, nil, nil, nil, logger)
allSnapshots := freezeblocks.NewRoSnapshots(ethconfig.Defaults.Snapshot, dirs.Snap, 0, logger)
allBorSnapshots := freezeblocks.NewBorRoSnapshots(ethconfig.Defaults.Snapshot, dirs.Snap, 0, logger)
allBscSnapshots := freezeblocks.NewBscRoSnapshots(ethconfig.Defaults.Snapshot, dirs.Snap, 0, logger)
Expand Down
Loading

0 comments on commit 4518ec1

Please sign in to comment.