diff --git a/cmd/rpcdaemon/cli/config.go b/cmd/rpcdaemon/cli/config.go index aaee2195011..8f7971b60ec 100644 --- a/cmd/rpcdaemon/cli/config.go +++ b/cmd/rpcdaemon/cli/config.go @@ -21,11 +21,15 @@ import ( "crypto/rand" "errors" "fmt" + "github.com/erigontech/erigon/core/blob_storage" + "github.com/spf13/afero" + "math" "math/big" "net" "net/http" "net/url" "os" + "path" "path/filepath" "strings" "time" @@ -342,6 +346,7 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger // Configure DB first var allSnapshots *freezeblocks.RoSnapshots var allBorSnapshots *freezeblocks.BorRoSnapshots + var allBscSnapshots *freezeblocks.BscRoSnapshots onNewSnapshot := func() {} var cc *chain.Config @@ -388,12 +393,15 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger // Configure sapshots allSnapshots = freezeblocks.NewRoSnapshots(cfg.Snap, cfg.Dirs.Snap, 0, logger) allBorSnapshots = freezeblocks.NewBorRoSnapshots(cfg.Snap, cfg.Dirs.Snap, 0, logger) + allBscSnapshots = freezeblocks.NewBscRoSnapshots(cfg.Snap, cfg.Dirs.Snap, 0, logger) // To povide good UX - immediatly can read snapshots after RPCDaemon start, even if Erigon is down // Erigon does store list of snapshots in db: means RPCDaemon can read this list now, but read by `remoteKvClient.Snapshots` after establish grpc connection allSnapshots.OptimisticReopenWithDB(db) allBorSnapshots.OptimisticalyReopenWithDB(db) + allBscSnapshots.OptimisticalyReopenWithDB(db) allSnapshots.LogStat("remote") allBorSnapshots.LogStat("bor:remote") + allBscSnapshots.LogStat("bsc:remote") cr := rawdb.NewCanonicalReader() agg, err := libstate.NewAggregator(ctx, cfg.Dirs, config3.HistoryV3AggregationStep, db, cr, logger) @@ -428,6 +436,11 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger } else { allBorSnapshots.LogStat("bor:reopen") } + if err := allBscSnapshots.ReopenList(reply.BlocksFiles, true); err != nil { + logger.Error("[bsc snapshots] reopen", "err", err) + } else { + allBscSnapshots.LogStat("bsc:reopen") + } //if err = agg.openList(reply.HistoryFiles, true); err != nil { if err = agg.OpenFolder(); err != nil { @@ -446,7 +459,15 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger }() } onNewSnapshot() - blockReader = freezeblocks.NewBlockReader(allSnapshots, allBorSnapshots, nil) + blockReader = freezeblocks.NewBlockReader(allSnapshots, allBorSnapshots, allBscSnapshots) + // open blob db + if cc.Parlia != nil { + logger.Warn("Opening blob store", "path", cfg.Dirs.Blobs) + blobDbPath := path.Join(cfg.Dirs.Blobs, "blob") + blobDb := kv2.NewMDBX(log.New()).Path(blobDbPath).MustOpen() + blobStore := blob_storage.NewBlobStore(blobDb, afero.NewBasePathFs(afero.NewOsFs(), cfg.Dirs.Blobs), math.MaxUint64, cc) + blockReader.WithSidecars(blobStore) + } db, err = temporal.New(rwKv, agg) if err != nil { diff --git a/cmd/rpcdaemon/main.go b/cmd/rpcdaemon/main.go index 2f8453b1926..665327679c9 100644 --- a/cmd/rpcdaemon/main.go +++ b/cmd/rpcdaemon/main.go @@ -50,7 +50,6 @@ func main() { defer db.Close() defer engine.Close() - // ToDo @blxdyx support query blob data in Rpcdaemon apiList := jsonrpc.APIList(db, backend, txPool, mining, ff, stateCache, blockReader, cfg, engine, logger, nil) rpc.PreAllocateRPCMetricLabels(apiList) if err := cli.StartRpcServer(ctx, cfg, apiList, logger); err != nil { diff --git a/erigon-lib/kv/mdbx/kv_abstract_test.go b/erigon-lib/kv/mdbx/kv_abstract_test.go index a76eb6e29a1..606470595af 100644 --- a/erigon-lib/kv/mdbx/kv_abstract_test.go +++ b/erigon-lib/kv/mdbx/kv_abstract_test.go @@ -170,7 +170,7 @@ func TestRemoteKvVersion(t *testing.T) { conn := bufconn.Listen(1024 * 1024) grpcServer := grpc.NewServer() go func() { - remote.RegisterKVServer(grpcServer, remotedbserver.NewKvServer(ctx, writeDB, nil, nil, nil, logger)) + remote.RegisterKVServer(grpcServer, remotedbserver.NewKvServer(ctx, writeDB, nil, nil, nil, nil, logger)) if err := grpcServer.Serve(conn); err != nil { log.Error("private RPC server fail", "err", err) } @@ -211,7 +211,7 @@ func TestRemoteKvRange(t *testing.T) { ctx, writeDB := context.Background(), memdb.NewTestDB(t) grpcServer, conn := grpc.NewServer(), bufconn.Listen(1024*1024) go func() { - kvServer := remotedbserver.NewKvServer(ctx, writeDB, nil, nil, nil, logger) + kvServer := remotedbserver.NewKvServer(ctx, writeDB, nil, nil, nil, nil, logger) remote.RegisterKVServer(grpcServer, kvServer) if err := grpcServer.Serve(conn); err != nil { log.Error("private RPC server fail", "err", err) @@ -345,7 +345,7 @@ func setupDatabases(t *testing.T, logger log.Logger, f mdbx.TableCfgFunc) (write grpcServer := grpc.NewServer() f2 := func() { - remote.RegisterKVServer(grpcServer, remotedbserver.NewKvServer(ctx, writeDBs[1], nil, nil, nil, logger)) + remote.RegisterKVServer(grpcServer, remotedbserver.NewKvServer(ctx, writeDBs[1], nil, nil, nil, nil, logger)) if err := grpcServer.Serve(conn); err != nil { logger.Error("private RPC server fail", "err", err) } diff --git a/erigon-lib/kv/remotedbserver/remotedbserver.go b/erigon-lib/kv/remotedbserver/remotedbserver.go index b6ac49bc28b..ac43794a2af 100644 --- a/erigon-lib/kv/remotedbserver/remotedbserver.go +++ b/erigon-lib/kv/remotedbserver/remotedbserver.go @@ -72,6 +72,7 @@ type KvServer struct { stateChangeStreams *StateChangePubSub blockSnapshots Snapshots borSnapshots Snapshots + bscSnapshots Snapshots historySnapshots Snapshots ctx context.Context @@ -95,7 +96,7 @@ type Snapshots interface { Files() []string } -func NewKvServer(ctx context.Context, db kv.RoDB, snapshots Snapshots, borSnapshots Snapshots, historySnapshots Snapshots, logger log.Logger) *KvServer { +func NewKvServer(ctx context.Context, db kv.RoDB, snapshots Snapshots, borSnapshots Snapshots, bscSnapshots Snapshots, historySnapshots Snapshots, logger log.Logger) *KvServer { return &KvServer{ trace: false, rangeStep: 1024, @@ -104,6 +105,7 @@ func NewKvServer(ctx context.Context, db kv.RoDB, snapshots Snapshots, borSnapsh ctx: ctx, blockSnapshots: snapshots, borSnapshots: borSnapshots, + bscSnapshots: bscSnapshots, historySnapshots: historySnapshots, txs: map[uint64]*threadSafeTx{}, txsMapLock: &sync.RWMutex{}, @@ -466,6 +468,10 @@ func (s *KvServer) Snapshots(_ context.Context, _ *remote.SnapshotsRequest) (rep blockFiles = append(blockFiles, s.borSnapshots.Files()...) } + if s.bscSnapshots != nil && !reflect.ValueOf(s.bscSnapshots).IsNil() { // nolint + blockFiles = append(blockFiles, s.bscSnapshots.Files()...) + } + reply = &remote.SnapshotsReply{BlocksFiles: blockFiles} if s.historySnapshots != nil && !reflect.ValueOf(s.historySnapshots).IsNil() { // nolint reply.HistoryFiles = s.historySnapshots.Files() diff --git a/erigon-lib/kv/remotedbserver/remotedbserver_test.go b/erigon-lib/kv/remotedbserver/remotedbserver_test.go index e448363d37d..720fbe18972 100644 --- a/erigon-lib/kv/remotedbserver/remotedbserver_test.go +++ b/erigon-lib/kv/remotedbserver/remotedbserver_test.go @@ -47,7 +47,7 @@ func TestKvServer_renew(t *testing.T) { return nil })) - s := NewKvServer(ctx, db, nil, nil, nil, log.New()) + s := NewKvServer(ctx, db, nil, nil, nil, nil, log.New()) g, ctx := errgroup.WithContext(ctx) testCase := func() error { id, err := s.begin(ctx) @@ -107,7 +107,7 @@ func TestKVServerSnapshotsReturnsSnapshots(t *testing.T) { historySnapshots := NewMockSnapshots(ctrl) historySnapshots.EXPECT().Files().Return([]string{"history"}).Times(1) - s := NewKvServer(ctx, nil, blockSnapshots, nil, historySnapshots, log.New()) + s := NewKvServer(ctx, nil, blockSnapshots, nil, nil, historySnapshots, log.New()) reply, err := s.Snapshots(ctx, nil) require.NoError(t, err) require.Equal(t, []string{"headers.seg", "bodies.seg"}, reply.BlocksFiles) @@ -124,7 +124,7 @@ func TestKVServerSnapshotsReturnsBorSnapshots(t *testing.T) { historySnapshots := NewMockSnapshots(ctrl) historySnapshots.EXPECT().Files().Return([]string{"history"}).Times(1) - s := NewKvServer(ctx, nil, blockSnapshots, borSnapshots, historySnapshots, log.New()) + s := NewKvServer(ctx, nil, blockSnapshots, borSnapshots, nil, historySnapshots, log.New()) reply, err := s.Snapshots(ctx, nil) require.NoError(t, err) require.Equal(t, []string{"headers.seg", "bodies.seg", "borevents.seg", "borspans.seg"}, reply.BlocksFiles) @@ -133,9 +133,26 @@ func TestKVServerSnapshotsReturnsBorSnapshots(t *testing.T) { func TestKVServerSnapshotsReturnsEmptyIfNoBlockSnapshots(t *testing.T) { ctx := context.Background() - s := NewKvServer(ctx, nil, nil, nil, nil, log.New()) + s := NewKvServer(ctx, nil, nil, nil, nil, nil, log.New()) reply, err := s.Snapshots(ctx, nil) require.NoError(t, err) require.Empty(t, reply.BlocksFiles) require.Empty(t, reply.HistoryFiles) } + +func TestKVServerSnapshotsReturnsBscSnapshots(t *testing.T) { + ctx := context.Background() + ctrl := gomock.NewController(t) + blockSnapshots := NewMockSnapshots(ctrl) + blockSnapshots.EXPECT().Files().Return([]string{"headers.seg", "bodies.seg"}).Times(1) + bscSnapshots := NewMockSnapshots(ctrl) + bscSnapshots.EXPECT().Files().Return([]string{"bscblobsidecars.seg"}).Times(1) + historySnapshots := NewMockSnapshots(ctrl) + historySnapshots.EXPECT().Files().Return([]string{"history"}).Times(1) + + s := NewKvServer(ctx, nil, blockSnapshots, nil, bscSnapshots, historySnapshots, log.New()) + reply, err := s.Snapshots(ctx, nil) + require.NoError(t, err) + require.Equal(t, []string{"headers.seg", "bodies.seg", "bscblobsidecars.seg"}, reply.BlocksFiles) + require.Equal(t, []string{"history"}, reply.HistoryFiles) +} diff --git a/eth/backend.go b/eth/backend.go index e3cfe87fafc..e0e92f01a84 100644 --- a/eth/backend.go +++ b/eth/backend.go @@ -342,7 +342,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger // Check if we have an already initialized chain and fall back to // that if so. Otherwise we need to generate a new genesis spec. - blockReader, blockWriter, allSnapshots, allBorSnapshots, _, agg, err := setUpBlockReader(ctx, chainKv, config.Dirs, config, chainConfig.Bor != nil, chainConfig.Parlia != nil, logger) + blockReader, blockWriter, allSnapshots, allBorSnapshots, allBscSnapshots, agg, err := setUpBlockReader(ctx, chainKv, config.Dirs, config, chainConfig.Bor != nil, chainConfig.Parlia != nil, logger) if err != nil { return nil, err } @@ -362,7 +362,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger } } - kvRPC := remotedbserver.NewKvServer(ctx, backend.chainDB, allSnapshots, allBorSnapshots, agg, logger) + kvRPC := remotedbserver.NewKvServer(ctx, backend.chainDB, allSnapshots, allBorSnapshots, allBscSnapshots, agg, logger) backend.notifications.StateChangesConsumer = kvRPC backend.kvRPC = kvRPC