Support remote db #494

Merged: 8 commits, Sep 3, 2024
Changes from 7 commits
23 changes: 22 additions & 1 deletion cmd/rpcdaemon/cli/config.go
@@ -21,11 +21,15 @@ import (
"crypto/rand"
"errors"
"fmt"
"github.com/erigontech/erigon/core/blob_storage"
"github.com/spf13/afero"
"math"
"math/big"
"net"
"net/http"
"net/url"
"os"
"path"
"path/filepath"
"strings"
"time"
@@ -342,6 +346,7 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger
// Configure DB first
var allSnapshots *freezeblocks.RoSnapshots
var allBorSnapshots *freezeblocks.BorRoSnapshots
var allBscSnapshots *freezeblocks.BscRoSnapshots
onNewSnapshot := func() {}

var cc *chain.Config
@@ -388,12 +393,15 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger
// Configure sapshots
allSnapshots = freezeblocks.NewRoSnapshots(cfg.Snap, cfg.Dirs.Snap, 0, logger)
allBorSnapshots = freezeblocks.NewBorRoSnapshots(cfg.Snap, cfg.Dirs.Snap, 0, logger)
allBscSnapshots = freezeblocks.NewBscRoSnapshots(cfg.Snap, cfg.Dirs.Snap, 0, logger)
// To povide good UX - immediatly can read snapshots after RPCDaemon start, even if Erigon is down
// Erigon does store list of snapshots in db: means RPCDaemon can read this list now, but read by `remoteKvClient.Snapshots` after establish grpc connection
allSnapshots.OptimisticReopenWithDB(db)
allBorSnapshots.OptimisticalyReopenWithDB(db)
allBscSnapshots.OptimisticalyReopenWithDB(db)
allSnapshots.LogStat("remote")
allBorSnapshots.LogStat("bor:remote")
allBscSnapshots.LogStat("bsc:remote")

cr := rawdb.NewCanonicalReader()
agg, err := libstate.NewAggregator(ctx, cfg.Dirs, config3.HistoryV3AggregationStep, db, cr, logger)
@@ -428,6 +436,11 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger
} else {
allBorSnapshots.LogStat("bor:reopen")
}
if err := allBscSnapshots.ReopenList(reply.BlocksFiles, true); err != nil {
logger.Error("[bsc snapshots] reopen", "err", err)
} else {
allBscSnapshots.LogStat("bsc:reopen")
}

//if err = agg.openList(reply.HistoryFiles, true); err != nil {
if err = agg.OpenFolder(); err != nil {
@@ -446,7 +459,15 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger
}()
}
onNewSnapshot()
blockReader = freezeblocks.NewBlockReader(allSnapshots, allBorSnapshots, nil)
blockReader = freezeblocks.NewBlockReader(allSnapshots, allBorSnapshots, allBscSnapshots)
// open blob db
if cc.Parlia != nil {
logger.Warn("Opening blob store", "path", cfg.Dirs.Blobs)
blobDbPath := path.Join(cfg.Dirs.Blobs, "blob")
blobDb := kv2.NewMDBX(log.New()).Path(blobDbPath).MustOpen()
blobStore := blob_storage.NewBlobStore(blobDb, afero.NewBasePathFs(afero.NewOsFs(), cfg.Dirs.Blobs), math.MaxUint64, cc)
blockReader.WithSidecars(blobStore)
}

db, err = temporal.New(rwKv, agg)
if err != nil {
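Note on this file's diff: the functional core is the last hunk. On Parlia chains (BSC and its testnets) the RPC daemon now opens its own read-only blob store and attaches it to the block reader, so blob sidecars can be served even when the daemon runs against a remote DB. A condensed, annotated restatement of that wiring follows; it reuses the calls shown in the hunk above and should be read as a sketch, not a verified API surface.

```go
// Same wiring as the hunk above, with the intent spelled out.
blockReader = freezeblocks.NewBlockReader(allSnapshots, allBorSnapshots, allBscSnapshots)
if cc.Parlia != nil { // Parlia consensus => a BSC chain => blob sidecars exist
	// The blob store is split in two: an MDBX database under <blobs>/blob
	// for indexing, plus plain sidecar files rooted at cfg.Dirs.Blobs.
	blobDbPath := path.Join(cfg.Dirs.Blobs, "blob")
	blobDb := kv2.NewMDBX(log.New()).Path(blobDbPath).MustOpen()
	blobStore := blob_storage.NewBlobStore(
		blobDb,
		afero.NewBasePathFs(afero.NewOsFs(), cfg.Dirs.Blobs),
		math.MaxUint64, // read side: no retention cap (assumption based on the constant)
		cc,
	)
	blockReader.WithSidecars(blobStore) // blob lookups now flow through the block reader
}
```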
1 change: 0 additions & 1 deletion cmd/rpcdaemon/main.go
@@ -50,7 +50,6 @@ func main() {
defer db.Close()
defer engine.Close()

// ToDo @blxdyx support query blob data in Rpcdaemon
apiList := jsonrpc.APIList(db, backend, txPool, mining, ff, stateCache, blockReader, cfg, engine, logger, nil)
rpc.PreAllocateRPCMetricLabels(apiList)
if err := cli.StartRpcServer(ctx, cfg, apiList, logger); err != nil {
6 changes: 3 additions & 3 deletions erigon-lib/kv/mdbx/kv_abstract_test.go
@@ -170,7 +170,7 @@ func TestRemoteKvVersion(t *testing.T) {
conn := bufconn.Listen(1024 * 1024)
grpcServer := grpc.NewServer()
go func() {
remote.RegisterKVServer(grpcServer, remotedbserver.NewKvServer(ctx, writeDB, nil, nil, nil, logger))
remote.RegisterKVServer(grpcServer, remotedbserver.NewKvServer(ctx, writeDB, nil, nil, nil, nil, logger))
if err := grpcServer.Serve(conn); err != nil {
log.Error("private RPC server fail", "err", err)
}
@@ -211,7 +211,7 @@ func TestRemoteKvRange(t *testing.T) {
ctx, writeDB := context.Background(), memdb.NewTestDB(t)
grpcServer, conn := grpc.NewServer(), bufconn.Listen(1024*1024)
go func() {
kvServer := remotedbserver.NewKvServer(ctx, writeDB, nil, nil, nil, logger)
kvServer := remotedbserver.NewKvServer(ctx, writeDB, nil, nil, nil, nil, logger)
remote.RegisterKVServer(grpcServer, kvServer)
if err := grpcServer.Serve(conn); err != nil {
log.Error("private RPC server fail", "err", err)
@@ -345,7 +345,7 @@ func setupDatabases(t *testing.T, logger log.Logger, f mdbx.TableCfgFunc) (write

grpcServer := grpc.NewServer()
f2 := func() {
remote.RegisterKVServer(grpcServer, remotedbserver.NewKvServer(ctx, writeDBs[1], nil, nil, nil, logger))
remote.RegisterKVServer(grpcServer, remotedbserver.NewKvServer(ctx, writeDBs[1], nil, nil, nil, nil, logger))
if err := grpcServer.Serve(conn); err != nil {
logger.Error("private RPC server fail", "err", err)
}
8 changes: 7 additions & 1 deletion erigon-lib/kv/remotedbserver/remotedbserver.go
@@ -72,6 +72,7 @@ type KvServer struct {
stateChangeStreams *StateChangePubSub
blockSnapshots Snapshots
borSnapshots Snapshots
bscSnapshots Snapshots
historySnapshots Snapshots
ctx context.Context

@@ -95,7 +96,7 @@ type Snapshots interface {
Files() []string
}

func NewKvServer(ctx context.Context, db kv.RoDB, snapshots Snapshots, borSnapshots Snapshots, historySnapshots Snapshots, logger log.Logger) *KvServer {
func NewKvServer(ctx context.Context, db kv.RoDB, snapshots Snapshots, borSnapshots Snapshots, bscSnapshots Snapshots, historySnapshots Snapshots, logger log.Logger) *KvServer {
return &KvServer{
trace: false,
rangeStep: 1024,
@@ -104,6 +105,7 @@ func NewKvServer(ctx context.Context, db kv.RoDB, snapshots Snapshots, borSnapsh
ctx: ctx,
blockSnapshots: snapshots,
borSnapshots: borSnapshots,
bscSnapshots: bscSnapshots,
historySnapshots: historySnapshots,
txs: map[uint64]*threadSafeTx{},
txsMapLock: &sync.RWMutex{},
@@ -466,6 +468,10 @@ func (s *KvServer) Snapshots(_ context.Context, _ *remote.SnapshotsRequest) (rep
blockFiles = append(blockFiles, s.borSnapshots.Files()...)
}

if s.bscSnapshots != nil && !reflect.ValueOf(s.bscSnapshots).IsNil() { // nolint
blockFiles = append(blockFiles, s.bscSnapshots.Files()...)
}

reply = &remote.SnapshotsReply{BlocksFiles: blockFiles}
if s.historySnapshots != nil && !reflect.ValueOf(s.historySnapshots).IsNil() { // nolint
reply.HistoryFiles = s.historySnapshots.Files()
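Note: the guard `s.bscSnapshots != nil && !reflect.ValueOf(s.bscSnapshots).IsNil()` looks redundant but is not. Callers pass a concrete `*freezeblocks.BscRoSnapshots`, and in Go an interface holding a typed nil pointer compares non-nil. A self-contained demonstration with stand-in types (not erigon's):

```go
package main

import (
	"fmt"
	"reflect"
)

// Snapshots mirrors the server's interface; bscSnapshots stands in for
// *freezeblocks.BscRoSnapshots.
type Snapshots interface{ Files() []string }

type bscSnapshots struct{}

func (*bscSnapshots) Files() []string { return []string{"bscblobsidecars.seg"} }

func main() {
	var typedNil *bscSnapshots // nil pointer of a concrete type
	var s Snapshots = typedNil // interface now holds (type, nil): not a nil interface

	fmt.Println(s != nil)                   // true: the plain nil check passes
	fmt.Println(reflect.ValueOf(s).IsNil()) // true: the pointer inside is nil

	// The order of the two checks matters: on a genuinely nil interface,
	// reflect.ValueOf(s).IsNil() would panic on a zero Value, so `s != nil`
	// must short-circuit first; IsNil() then catches the typed-nil case
	// before the server calls s.Files() on a nil receiver.
}
```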
25 changes: 21 additions & 4 deletions erigon-lib/kv/remotedbserver/remotedbserver_test.go
@@ -47,7 +47,7 @@ func TestKvServer_renew(t *testing.T) {
return nil
}))

s := NewKvServer(ctx, db, nil, nil, nil, log.New())
s := NewKvServer(ctx, db, nil, nil, nil, nil, log.New())
g, ctx := errgroup.WithContext(ctx)
testCase := func() error {
id, err := s.begin(ctx)
@@ -107,7 +107,7 @@ func TestKVServerSnapshotsReturnsSnapshots(t *testing.T) {
historySnapshots := NewMockSnapshots(ctrl)
historySnapshots.EXPECT().Files().Return([]string{"history"}).Times(1)

s := NewKvServer(ctx, nil, blockSnapshots, nil, historySnapshots, log.New())
s := NewKvServer(ctx, nil, blockSnapshots, nil, nil, historySnapshots, log.New())
reply, err := s.Snapshots(ctx, nil)
require.NoError(t, err)
require.Equal(t, []string{"headers.seg", "bodies.seg"}, reply.BlocksFiles)
@@ -124,7 +124,7 @@ func TestKVServerSnapshotsReturnsBorSnapshots(t *testing.T) {
historySnapshots := NewMockSnapshots(ctrl)
historySnapshots.EXPECT().Files().Return([]string{"history"}).Times(1)

s := NewKvServer(ctx, nil, blockSnapshots, borSnapshots, historySnapshots, log.New())
s := NewKvServer(ctx, nil, blockSnapshots, borSnapshots, nil, historySnapshots, log.New())
reply, err := s.Snapshots(ctx, nil)
require.NoError(t, err)
require.Equal(t, []string{"headers.seg", "bodies.seg", "borevents.seg", "borspans.seg"}, reply.BlocksFiles)
@@ -133,9 +133,26 @@ func TestKVServerSnapshotsReturnsBorSnapshots(t *testing.T) {

func TestKVServerSnapshotsReturnsEmptyIfNoBlockSnapshots(t *testing.T) {
ctx := context.Background()
s := NewKvServer(ctx, nil, nil, nil, nil, log.New())
s := NewKvServer(ctx, nil, nil, nil, nil, nil, log.New())
reply, err := s.Snapshots(ctx, nil)
require.NoError(t, err)
require.Empty(t, reply.BlocksFiles)
require.Empty(t, reply.HistoryFiles)
}

func TestKVServerSnapshotsReturnsBscSnapshots(t *testing.T) {
ctx := context.Background()
ctrl := gomock.NewController(t)
blockSnapshots := NewMockSnapshots(ctrl)
blockSnapshots.EXPECT().Files().Return([]string{"headers.seg", "bodies.seg"}).Times(1)
bscSnapshots := NewMockSnapshots(ctrl)
bscSnapshots.EXPECT().Files().Return([]string{"bscblobsidecars.seg"}).Times(1)
historySnapshots := NewMockSnapshots(ctrl)
historySnapshots.EXPECT().Files().Return([]string{"history"}).Times(1)

s := NewKvServer(ctx, nil, blockSnapshots, nil, bscSnapshots, historySnapshots, log.New())
reply, err := s.Snapshots(ctx, nil)
require.NoError(t, err)
require.Equal(t, []string{"headers.seg", "bodies.seg", "bscblobsidecars.seg"}, reply.BlocksFiles)
require.Equal(t, []string{"history"}, reply.HistoryFiles)
}
4 changes: 2 additions & 2 deletions eth/backend.go
@@ -342,7 +342,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger

// Check if we have an already initialized chain and fall back to
// that if so. Otherwise we need to generate a new genesis spec.
blockReader, blockWriter, allSnapshots, allBorSnapshots, _, agg, err := setUpBlockReader(ctx, chainKv, config.Dirs, config, chainConfig.Bor != nil, chainConfig.Parlia != nil, logger)
blockReader, blockWriter, allSnapshots, allBorSnapshots, allBscSnapshots, agg, err := setUpBlockReader(ctx, chainKv, config.Dirs, config, chainConfig.Bor != nil, chainConfig.Parlia != nil, logger)
if err != nil {
return nil, err
}
@@ -362,7 +362,7 @@ }
}
}

kvRPC := remotedbserver.NewKvServer(ctx, backend.chainDB, allSnapshots, allBorSnapshots, agg, logger)
kvRPC := remotedbserver.NewKvServer(ctx, backend.chainDB, allSnapshots, allBorSnapshots, allBscSnapshots, agg, logger)
backend.notifications.StateChangesConsumer = kvRPC
backend.kvRPC = kvRPC

7 changes: 2 additions & 5 deletions eth/consensuschain/consensus_chain_reader.go
@@ -74,11 +74,8 @@ func (cr Reader) GetHeaderByNumber(number uint64) *types.Header {
}
func (cr Reader) GetHeaderByHash(hash common.Hash) *types.Header {
if cr.blockReader != nil {
number := rawdb.ReadHeaderNumber(cr.tx, hash)
if number == nil {
return nil
}
return cr.GetHeader(hash, *number)
h, _ := cr.blockReader.HeaderByHash(context.Background(), cr.tx, hash)
return h
}
h, _ := rawdb.ReadHeaderByHash(cr.tx, hash)
return h
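Note: this rewrite matters beyond style. The old path resolved the hash through `rawdb.ReadHeaderNumber`, which requires the hash-to-number index in the local DB, so headers that exist only in frozen snapshot files came back nil. Delegating to `cr.blockReader.HeaderByHash` lets one call consult both the DB and the snapshots, which appears to be the motivation for routing the lookup through the block reader here.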
2 changes: 1 addition & 1 deletion turbo/jsonrpc/bsc_api.go
@@ -217,7 +217,7 @@ func (api *BscImpl) GetVerifyResult(ctx context.Context, blockNr rpc.BlockNumber

// PendingTransactions returns the transactions that are in the transaction pool
// and have a from address that is one of the accounts this node manages.
func (s *BscImpl) PendingTransactions() ([]*RPCTransaction, error) {
func (api *BscImpl) PendingTransactions() ([]*RPCTransaction, error) {
return nil, fmt.Errorf(NotImplemented, "eth_pendingTransactions")
}

6 changes: 1 addition & 5 deletions turbo/jsonrpc/daemon.go
@@ -17,8 +17,6 @@
package jsonrpc

import (
"github.com/erigontech/erigon/consensus/parlia"

txpool "github.com/erigontech/erigon-lib/gointerfaces/txpoolproto"
"github.com/erigontech/erigon-lib/kv"
"github.com/erigontech/erigon-lib/kv/kvcache"
@@ -49,8 +47,8 @@ func APIList(db kv.RoDB, eth rpchelper.ApiBackend, txPool txpool.TxpoolClient, m
dbImpl := NewDBAPIImpl() /* deprecated */
adminImpl := NewAdminAPI(eth)
parityImpl := NewParityAPIImpl(base, db)
bscImpl := NewBscAPI(ethImpl)

var bscImpl *BscImpl
var borImpl *BorImpl

type lazy interface {
@@ -59,8 +57,6 @@
}

switch engine := engine.(type) {
case *parlia.Parlia:
bscImpl = NewBscAPI(ethImpl)
case *bor.Bor:
borImpl = NewBorAPI(base, db)
case lazy:
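Note: `NewBscAPI(ethImpl)` is now constructed unconditionally rather than inside a `case *parlia.Parlia:` branch (which is why the `parlia` import disappears). A daemon running against a remote DB does not necessarily hold a concrete consensus engine to type-switch on, so gating the `bsc` namespace on the engine type would leave it unregistered in exactly the remote setup this PR adds.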
2 changes: 1 addition & 1 deletion turbo/snapshotsync/freezeblocks/bsc_snapshots.go
@@ -54,7 +54,7 @@ func (br *BlockRetire) retireBscBlocks(ctx context.Context, minBlockNum uint64,
} else {
minimumBlob = chapelMinSegFrom
}
blockFrom := max(blockReader.FrozenBscBlobs(), minimumBlob)
blockFrom := max(blockReader.FrozenBscBlobs()+1, minimumBlob)
blocksRetired := false
for _, snap := range blockReader.BscSnapshots().Types() {
if maxBlockNum <= blockFrom || maxBlockNum-blockFrom < snaptype.Erigon2MergeLimit {
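Note: the `+1` fixes a boundary overlap. If `FrozenBscBlobs()` reports the last blob block already frozen, say 999_999, the old `max(blockReader.FrozenBscBlobs(), minimumBlob)` would start the next retirement range at 999_999 and re-freeze that block; starting from 1_000_000 makes consecutive segments abut instead of overlap. (This reading assumes the counter is inclusive of the last frozen block, which is what the fix implies.)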
18 changes: 17 additions & 1 deletion turbo/snapshotsync/snapshotsync.go
@@ -270,6 +270,7 @@ func computeBlocksToPrune(blockReader services.FullBlockReader, p prune.Mode) (b
// for MVP we sync with Downloader only once, in future will send new snapshots also
func WaitForDownloader(ctx context.Context, logPrefix string, dirs datadir.Dirs, headerchain, blobs bool, prune prune.Mode, caplin CaplinMode, agg *state.Aggregator, tx kv.RwTx, blockReader services.FullBlockReader, cc *chain.Config, snapshotDownloader proto_downloader.DownloaderClient, stagesIdsList []string) error {
snapshots := blockReader.Snapshots()
bscSnapshots := blockReader.BscSnapshots()
borSnapshots := blockReader.BorSnapshots()

// Find minimum block to download.
@@ -282,6 +283,11 @@ func WaitForDownloader(ctx context.Context, logPrefix string, dirs datadir.Dirs,
return err
}
}
if cc.Parlia != nil {
if err := bscSnapshots.ReopenFolder(); err != nil {
return err
}
}
return nil
}

@@ -314,9 +320,13 @@
}
}

if caplin == NoCaplin {
blobs = !blobs
}

// build all download requests
for _, p := range preverifiedBlockSnapshots {
if caplin == NoCaplin && (strings.Contains(p.Name, "beaconblocks") || strings.Contains(p.Name, "blobsidecars")) {
if caplin == NoCaplin && (strings.Contains(p.Name, "beaconblocks") || strings.Contains(p.Name, "blobsidecars")) && !strings.Contains(p.Name, "bscblobsiders") {
continue
}
if caplin == OnlyCaplin && !strings.Contains(p.Name, "beaconblocks") && !strings.Contains(p.Name, "blobsidecars") {
@@ -416,6 +426,12 @@ func WaitForDownloader(ctx context.Context, logPrefix string, dirs datadir.Dirs,
}
}

if cc.Parlia != nil {
if err := bscSnapshots.ReopenFolder(); err != nil {
return err
}
}

if err := agg.OpenFolder(); err != nil {
return err
}
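Note: the new clause in the download filter exempts BSC blob-sidecar segments from the "skip consensus-layer files when Caplin is off" rule, since on BSC those segments are execution-layer data. A self-contained restatement of the predicate follows; the exemption substring is written out in full as `bscblobsidecars`, matching the segment names used in the tests (the diff's literal is the shorter `bscblobsiders`), and the file names are illustrative.

```go
package main

import (
	"fmt"
	"strings"
)

type caplinMode int

const (
	noCaplin caplinMode = iota
	onlyCaplin
)

// skip restates the per-file filter: without Caplin, drop consensus-layer
// segments but keep BSC blob sidecars; with Caplin only, keep nothing else.
func skip(name string, mode caplinMode) bool {
	isCL := strings.Contains(name, "beaconblocks") || strings.Contains(name, "blobsidecars")
	switch mode {
	case noCaplin:
		return isCL && !strings.Contains(name, "bscblobsidecars")
	case onlyCaplin:
		return !isCL
	}
	return false
}

func main() {
	for _, name := range []string{
		"v1-004000-004100-beaconblocks.seg",    // CL blocks: skipped
		"v1-004000-004100-blobsidecars.seg",    // CL blobs: skipped
		"v1-004000-004100-bscblobsidecars.seg", // BSC blobs: kept (EL data)
		"v1-004000-004100-headers.seg",         // EL headers: kept
	} {
		fmt.Printf("%-42s skip=%v\n", name, skip(name, noCaplin))
	}
}
```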
2 changes: 1 addition & 1 deletion turbo/stages/mock/mock_sentry.go
@@ -288,7 +288,7 @@ func MockWithEverything(tb testing.TB, gspec *types.Genesis, key *ecdsa.PrivateK
ctx, ctxCancel := context.WithCancel(context.Background())
db, agg := temporaltest.NewTestDB(tb, dirs)

erigonGrpcServeer := remotedbserver.NewKvServer(ctx, db, nil, nil, nil, logger)
erigonGrpcServeer := remotedbserver.NewKvServer(ctx, db, nil, nil, nil, nil, logger)
allSnapshots := freezeblocks.NewRoSnapshots(ethconfig.Defaults.Snapshot, dirs.Snap, 0, logger)
allBorSnapshots := freezeblocks.NewBorRoSnapshots(ethconfig.Defaults.Snapshot, dirs.Snap, 0, logger)
allBscSnapshots := freezeblocks.NewBscRoSnapshots(ethconfig.Defaults.Snapshot, dirs.Snap, 0, logger)
8 changes: 4 additions & 4 deletions turbo/stages/stageloop.go
@@ -617,7 +617,7 @@ func NewDefaultStages(ctx context.Context,
runInTestMode := cfg.ImportMode

return stagedsync.DefaultStages(ctx,
stagedsync.StageSnapshotsCfg(db, *controlServer.ChainConfig, cfg.Sync, dirs, blockRetire, snapDownloader, blockReader, notifications, engine, agg, cfg.InternalCL && cfg.CaplinConfig.Backfilling, cfg.CaplinConfig.BlobBackfilling, silkworm, cfg.Prune),
stagedsync.StageSnapshotsCfg(db, *controlServer.ChainConfig, cfg.Sync, dirs, blockRetire, snapDownloader, blockReader, notifications, engine, agg, cfg.InternalCL && cfg.CaplinConfig.Backfilling, cfg.CaplinConfig.BlobBackfilling || cfg.BlobPrune, silkworm, cfg.Prune),
stagedsync.StageHeadersCfg(db, controlServer.Hd, controlServer.Bd, *controlServer.ChainConfig, cfg.Sync, controlServer.SendHeaderRequest, controlServer.PropagateNewBlockHashes, controlServer.Penalize, cfg.BatchSize, p2pCfg.NoDiscovery, blockReader, blockWriter, dirs.Tmp, notifications),
stagedsync.StageBorHeimdallCfg(db, snapDb, stagedsync.MiningState{}, *controlServer.ChainConfig, heimdallClient, blockReader, controlServer.Hd, controlServer.Penalize, recents, signatures, cfg.WithHeimdallWaypointRecording, nil),
stagedsync.StageBlockHashesCfg(db, dirs.Tmp, controlServer.ChainConfig, blockWriter),
@@ -660,7 +660,7 @@ func NewPipelineStages(ctx context.Context,

if len(cfg.Sync.UploadLocation) == 0 {
return stagedsync.PipelineStages(ctx,
stagedsync.StageSnapshotsCfg(db, *controlServer.ChainConfig, cfg.Sync, dirs, blockRetire, snapDownloader, blockReader, notifications, engine, agg, cfg.InternalCL && cfg.CaplinConfig.Backfilling, cfg.CaplinConfig.BlobBackfilling, silkworm, cfg.Prune),
stagedsync.StageSnapshotsCfg(db, *controlServer.ChainConfig, cfg.Sync, dirs, blockRetire, snapDownloader, blockReader, notifications, engine, agg, cfg.InternalCL && cfg.CaplinConfig.Backfilling, cfg.CaplinConfig.BlobBackfilling || cfg.BlobPrune, silkworm, cfg.Prune),
stagedsync.StageBlockHashesCfg(db, dirs.Tmp, controlServer.ChainConfig, blockWriter),
stagedsync.StageSendersCfg(db, controlServer.ChainConfig, cfg.Sync, false, dirs.Tmp, cfg.Prune, blockReader, controlServer.Hd),
stagedsync.StageExecuteBlocksCfg(db, cfg.Prune, cfg.BatchSize, controlServer.ChainConfig, controlServer.Engine, &vm.Config{}, notifications.Accumulator, cfg.StateStream, false, dirs, blockReader, controlServer.Hd, cfg.Genesis, cfg.Sync, SilkwormForExecutionStage(silkworm, cfg)),
@@ -669,7 +669,7 @@ }
}

return stagedsync.UploaderPipelineStages(ctx,
stagedsync.StageSnapshotsCfg(db, *controlServer.ChainConfig, cfg.Sync, dirs, blockRetire, snapDownloader, blockReader, notifications, engine, agg, cfg.InternalCL && cfg.CaplinConfig.Backfilling, cfg.CaplinConfig.BlobBackfilling, silkworm, cfg.Prune),
stagedsync.StageSnapshotsCfg(db, *controlServer.ChainConfig, cfg.Sync, dirs, blockRetire, snapDownloader, blockReader, notifications, engine, agg, cfg.InternalCL && cfg.CaplinConfig.Backfilling, cfg.CaplinConfig.BlobBackfilling || cfg.BlobPrune, silkworm, cfg.Prune),
stagedsync.StageHeadersCfg(db, controlServer.Hd, controlServer.Bd, *controlServer.ChainConfig, cfg.Sync, controlServer.SendHeaderRequest, controlServer.PropagateNewBlockHashes, controlServer.Penalize, cfg.BatchSize, p2pCfg.NoDiscovery, blockReader, blockWriter, dirs.Tmp, notifications),
stagedsync.StageBlockHashesCfg(db, dirs.Tmp, controlServer.ChainConfig, blockWriter),
stagedsync.StageSendersCfg(db, controlServer.ChainConfig, cfg.Sync, false, dirs.Tmp, cfg.Prune, blockReader, controlServer.Hd),
@@ -723,7 +723,7 @@ func NewPolygonSyncStages(
consensusEngine,
agg,
config.InternalCL && config.CaplinConfig.Backfilling,
config.CaplinConfig.BlobBackfilling,
config.CaplinConfig.BlobBackfilling || config.BlobPrune,
silkworm,
config.Prune,
),