Skip to content

Commit

Permalink
support remote rpc server
Browse files Browse the repository at this point in the history
  • Loading branch information
blxdyx committed Sep 2, 2024
1 parent 82bbe34 commit daccea7
Show file tree
Hide file tree
Showing 6 changed files with 55 additions and 12 deletions.
23 changes: 22 additions & 1 deletion cmd/rpcdaemon/cli/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,15 @@ import (
"crypto/rand"
"errors"
"fmt"
"github.com/erigontech/erigon/core/blob_storage"
"github.com/spf13/afero"
"math"
"math/big"
"net"
"net/http"
"net/url"
"os"
"path"
"path/filepath"
"strings"
"time"
Expand Down Expand Up @@ -342,6 +346,7 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger
// Configure DB first
var allSnapshots *freezeblocks.RoSnapshots
var allBorSnapshots *freezeblocks.BorRoSnapshots
var allBscSnapshots *freezeblocks.BscRoSnapshots
onNewSnapshot := func() {}

var cc *chain.Config
Expand Down Expand Up @@ -388,12 +393,15 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger
// Configure sapshots
allSnapshots = freezeblocks.NewRoSnapshots(cfg.Snap, cfg.Dirs.Snap, 0, logger)
allBorSnapshots = freezeblocks.NewBorRoSnapshots(cfg.Snap, cfg.Dirs.Snap, 0, logger)
allBscSnapshots = freezeblocks.NewBscRoSnapshots(cfg.Snap, cfg.Dirs.Snap, 0, logger)
// To povide good UX - immediatly can read snapshots after RPCDaemon start, even if Erigon is down
// Erigon does store list of snapshots in db: means RPCDaemon can read this list now, but read by `remoteKvClient.Snapshots` after establish grpc connection
allSnapshots.OptimisticReopenWithDB(db)
allBorSnapshots.OptimisticalyReopenWithDB(db)
allBscSnapshots.OptimisticalyReopenWithDB(db)
allSnapshots.LogStat("remote")
allBorSnapshots.LogStat("bor:remote")
allBscSnapshots.LogStat("bsc:remote")

cr := rawdb.NewCanonicalReader()
agg, err := libstate.NewAggregator(ctx, cfg.Dirs, config3.HistoryV3AggregationStep, db, cr, logger)
Expand Down Expand Up @@ -428,6 +436,11 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger
} else {
allBorSnapshots.LogStat("bor:reopen")
}
if err := allBscSnapshots.ReopenList(reply.BlocksFiles, true); err != nil {
logger.Error("[bsc snapshots] reopen", "err", err)
} else {
allBscSnapshots.LogStat("bsc:reopen")
}

//if err = agg.openList(reply.HistoryFiles, true); err != nil {
if err = agg.OpenFolder(); err != nil {
Expand All @@ -446,7 +459,15 @@ func RemoteServices(ctx context.Context, cfg *httpcfg.HttpCfg, logger log.Logger
}()
}
onNewSnapshot()
blockReader = freezeblocks.NewBlockReader(allSnapshots, allBorSnapshots, nil)
blockReader = freezeblocks.NewBlockReader(allSnapshots, allBorSnapshots, allBscSnapshots)
// open blob db
if cc.Parlia != nil {
logger.Warn("Opening blob store", "path", cfg.Dirs.Blobs)
blobDbPath := path.Join(cfg.Dirs.Blobs, "blob")
blobDb := kv2.NewMDBX(log.New()).Path(blobDbPath).MustOpen()
blobStore := blob_storage.NewBlobStore(blobDb, afero.NewBasePathFs(afero.NewOsFs(), cfg.Dirs.Blobs), math.MaxUint64, cc)
blockReader.WithSidecars(blobStore)
}

db, err = temporal.New(rwKv, agg)
if err != nil {
Expand Down
1 change: 0 additions & 1 deletion cmd/rpcdaemon/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ func main() {
defer db.Close()
defer engine.Close()

// ToDo @blxdyx support query blob data in Rpcdaemon
apiList := jsonrpc.APIList(db, backend, txPool, mining, ff, stateCache, blockReader, cfg, engine, logger, nil)
rpc.PreAllocateRPCMetricLabels(apiList)
if err := cli.StartRpcServer(ctx, cfg, apiList, logger); err != nil {
Expand Down
6 changes: 3 additions & 3 deletions erigon-lib/kv/mdbx/kv_abstract_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,7 @@ func TestRemoteKvVersion(t *testing.T) {
conn := bufconn.Listen(1024 * 1024)
grpcServer := grpc.NewServer()
go func() {
remote.RegisterKVServer(grpcServer, remotedbserver.NewKvServer(ctx, writeDB, nil, nil, nil, logger))
remote.RegisterKVServer(grpcServer, remotedbserver.NewKvServer(ctx, writeDB, nil, nil, nil, nil, logger))
if err := grpcServer.Serve(conn); err != nil {
log.Error("private RPC server fail", "err", err)
}
Expand Down Expand Up @@ -211,7 +211,7 @@ func TestRemoteKvRange(t *testing.T) {
ctx, writeDB := context.Background(), memdb.NewTestDB(t)
grpcServer, conn := grpc.NewServer(), bufconn.Listen(1024*1024)
go func() {
kvServer := remotedbserver.NewKvServer(ctx, writeDB, nil, nil, nil, logger)
kvServer := remotedbserver.NewKvServer(ctx, writeDB, nil, nil, nil, nil, logger)
remote.RegisterKVServer(grpcServer, kvServer)
if err := grpcServer.Serve(conn); err != nil {
log.Error("private RPC server fail", "err", err)
Expand Down Expand Up @@ -345,7 +345,7 @@ func setupDatabases(t *testing.T, logger log.Logger, f mdbx.TableCfgFunc) (write

grpcServer := grpc.NewServer()
f2 := func() {
remote.RegisterKVServer(grpcServer, remotedbserver.NewKvServer(ctx, writeDBs[1], nil, nil, nil, logger))
remote.RegisterKVServer(grpcServer, remotedbserver.NewKvServer(ctx, writeDBs[1], nil, nil, nil, nil, logger))
if err := grpcServer.Serve(conn); err != nil {
logger.Error("private RPC server fail", "err", err)
}
Expand Down
8 changes: 7 additions & 1 deletion erigon-lib/kv/remotedbserver/remotedbserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@ type KvServer struct {
stateChangeStreams *StateChangePubSub
blockSnapshots Snapshots
borSnapshots Snapshots
bscSnapshots Snapshots
historySnapshots Snapshots
ctx context.Context

Expand All @@ -95,7 +96,7 @@ type Snapshots interface {
Files() []string
}

func NewKvServer(ctx context.Context, db kv.RoDB, snapshots Snapshots, borSnapshots Snapshots, historySnapshots Snapshots, logger log.Logger) *KvServer {
func NewKvServer(ctx context.Context, db kv.RoDB, snapshots Snapshots, borSnapshots Snapshots, bscSnapshots Snapshots, historySnapshots Snapshots, logger log.Logger) *KvServer {
return &KvServer{
trace: false,
rangeStep: 1024,
Expand All @@ -104,6 +105,7 @@ func NewKvServer(ctx context.Context, db kv.RoDB, snapshots Snapshots, borSnapsh
ctx: ctx,
blockSnapshots: snapshots,
borSnapshots: borSnapshots,
bscSnapshots: bscSnapshots,
historySnapshots: historySnapshots,
txs: map[uint64]*threadSafeTx{},
txsMapLock: &sync.RWMutex{},
Expand Down Expand Up @@ -466,6 +468,10 @@ func (s *KvServer) Snapshots(_ context.Context, _ *remote.SnapshotsRequest) (rep
blockFiles = append(blockFiles, s.borSnapshots.Files()...)
}

if s.bscSnapshots != nil && !reflect.ValueOf(s.bscSnapshots).IsNil() { // nolint
blockFiles = append(blockFiles, s.bscSnapshots.Files()...)
}

reply = &remote.SnapshotsReply{BlocksFiles: blockFiles}
if s.historySnapshots != nil && !reflect.ValueOf(s.historySnapshots).IsNil() { // nolint
reply.HistoryFiles = s.historySnapshots.Files()
Expand Down
25 changes: 21 additions & 4 deletions erigon-lib/kv/remotedbserver/remotedbserver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ func TestKvServer_renew(t *testing.T) {
return nil
}))

s := NewKvServer(ctx, db, nil, nil, nil, log.New())
s := NewKvServer(ctx, db, nil, nil, nil, nil, log.New())
g, ctx := errgroup.WithContext(ctx)
testCase := func() error {
id, err := s.begin(ctx)
Expand Down Expand Up @@ -107,7 +107,7 @@ func TestKVServerSnapshotsReturnsSnapshots(t *testing.T) {
historySnapshots := NewMockSnapshots(ctrl)
historySnapshots.EXPECT().Files().Return([]string{"history"}).Times(1)

s := NewKvServer(ctx, nil, blockSnapshots, nil, historySnapshots, log.New())
s := NewKvServer(ctx, nil, blockSnapshots, nil, nil, historySnapshots, log.New())
reply, err := s.Snapshots(ctx, nil)
require.NoError(t, err)
require.Equal(t, []string{"headers.seg", "bodies.seg"}, reply.BlocksFiles)
Expand All @@ -124,7 +124,7 @@ func TestKVServerSnapshotsReturnsBorSnapshots(t *testing.T) {
historySnapshots := NewMockSnapshots(ctrl)
historySnapshots.EXPECT().Files().Return([]string{"history"}).Times(1)

s := NewKvServer(ctx, nil, blockSnapshots, borSnapshots, historySnapshots, log.New())
s := NewKvServer(ctx, nil, blockSnapshots, borSnapshots, nil, historySnapshots, log.New())
reply, err := s.Snapshots(ctx, nil)
require.NoError(t, err)
require.Equal(t, []string{"headers.seg", "bodies.seg", "borevents.seg", "borspans.seg"}, reply.BlocksFiles)
Expand All @@ -133,9 +133,26 @@ func TestKVServerSnapshotsReturnsBorSnapshots(t *testing.T) {

func TestKVServerSnapshotsReturnsEmptyIfNoBlockSnapshots(t *testing.T) {
ctx := context.Background()
s := NewKvServer(ctx, nil, nil, nil, nil, log.New())
s := NewKvServer(ctx, nil, nil, nil, nil, nil, log.New())
reply, err := s.Snapshots(ctx, nil)
require.NoError(t, err)
require.Empty(t, reply.BlocksFiles)
require.Empty(t, reply.HistoryFiles)
}

func TestKVServerSnapshotsReturnsBscSnapshots(t *testing.T) {
ctx := context.Background()
ctrl := gomock.NewController(t)
blockSnapshots := NewMockSnapshots(ctrl)
blockSnapshots.EXPECT().Files().Return([]string{"headers.seg", "bodies.seg"}).Times(1)
bscSnapshots := NewMockSnapshots(ctrl)
bscSnapshots.EXPECT().Files().Return([]string{"bscblobsidecars.seg"}).Times(1)
historySnapshots := NewMockSnapshots(ctrl)
historySnapshots.EXPECT().Files().Return([]string{"history"}).Times(1)

s := NewKvServer(ctx, nil, blockSnapshots, nil, bscSnapshots, historySnapshots, log.New())
reply, err := s.Snapshots(ctx, nil)
require.NoError(t, err)
require.Equal(t, []string{"headers.seg", "bodies.seg", "bscblobsidecars.seg"}, reply.BlocksFiles)
require.Equal(t, []string{"history"}, reply.HistoryFiles)
}
4 changes: 2 additions & 2 deletions eth/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger

// Check if we have an already initialized chain and fall back to
// that if so. Otherwise we need to generate a new genesis spec.
blockReader, blockWriter, allSnapshots, allBorSnapshots, _, agg, err := setUpBlockReader(ctx, chainKv, config.Dirs, config, chainConfig.Bor != nil, chainConfig.Parlia != nil, logger)
blockReader, blockWriter, allSnapshots, allBorSnapshots, allBscSnapshots, agg, err := setUpBlockReader(ctx, chainKv, config.Dirs, config, chainConfig.Bor != nil, chainConfig.Parlia != nil, logger)
if err != nil {
return nil, err
}
Expand All @@ -362,7 +362,7 @@ func New(ctx context.Context, stack *node.Node, config *ethconfig.Config, logger
}
}

kvRPC := remotedbserver.NewKvServer(ctx, backend.chainDB, allSnapshots, allBorSnapshots, agg, logger)
kvRPC := remotedbserver.NewKvServer(ctx, backend.chainDB, allSnapshots, allBorSnapshots, allBscSnapshots, agg, logger)
backend.notifications.StateChangesConsumer = kvRPC
backend.kvRPC = kvRPC

Expand Down

0 comments on commit daccea7

Please sign in to comment.