Skip to content

Commit

Permalink
Merge pull request #38 from probe-lab/feat/add-block-reqresp
Browse files Browse the repository at this point in the history
Add BlockByRangeV2 request
  • Loading branch information
cortze authored Oct 4, 2024
2 parents f3c2886 + 6da80a5 commit 3623e70
Show file tree
Hide file tree
Showing 7 changed files with 410 additions and 19 deletions.
30 changes: 28 additions & 2 deletions eth/genesis.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"fmt"
"time"

"github.com/prysmaticlabs/prysm/v5/beacon-chain/core/signing"
"github.com/prysmaticlabs/prysm/v5/config/params"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
)
Expand All @@ -24,7 +25,7 @@ var (
CapellaForkVersion ForkVersion
DenebForkVersion ForkVersion

currentBeaconConfig = params.MainnetConfig() // init with Mainnet (we would override if needed)
globalBeaconConfig = params.MainnetConfig() // init with Mainnet (we would override if needed)
)

// configure global ForkVersion variables
Expand All @@ -34,7 +35,7 @@ func initNetworkForkVersions(beaconConfig *params.BeaconChainConfig) {
BellatrixForkVersion = ForkVersion(beaconConfig.BellatrixForkVersion)
CapellaForkVersion = ForkVersion(beaconConfig.CapellaForkVersion)
DenebForkVersion = ForkVersion(beaconConfig.DenebForkVersion)
currentBeaconConfig = beaconConfig
globalBeaconConfig = beaconConfig
}

// GenesisConfig represents the Genesis configuration with the Merkle Root
Expand Down Expand Up @@ -92,3 +93,28 @@ func GetCurrentForkVersion(epoch primitives.Epoch, beaconConfg *params.BeaconCha
return [4]byte{}, fmt.Errorf("not recognized case for epoch %d", epoch)
}
}

func GetForkVersionFromForkDigest(forkD [4]byte) (forkV ForkVersion, err error) {
genesisRoot := GenesisConfigs[globalBeaconConfig.ConfigName].GenesisValidatorRoot
phase0D, _ := signing.ComputeForkDigest(Phase0ForkVersion[:], genesisRoot)
altairD, _ := signing.ComputeForkDigest(AltairForkVersion[:], genesisRoot)
bellatrixD, _ := signing.ComputeForkDigest(BellatrixForkVersion[:], genesisRoot)
capellaD, _ := signing.ComputeForkDigest(CapellaForkVersion[:], genesisRoot)
denebD, _ := signing.ComputeForkDigest(DenebForkVersion[:], genesisRoot)
switch forkD {
case phase0D:
forkV = Phase0ForkVersion
case altairD:
forkV = AltairForkVersion
case bellatrixD:
forkV = BellatrixForkVersion
case capellaD:
forkV = CapellaForkVersion
case denebD:
forkV = DenebForkVersion
default:
forkV = ForkVersion{}
err = fmt.Errorf("not recognized fork_version for (%s)", hex.EncodeToString([]byte(forkD[:])))
}
return forkV, err
}
6 changes: 3 additions & 3 deletions eth/node_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -435,13 +435,13 @@ func topicFormatFromBase(topicBase string) (string, error) {
func hasSubnets(topic string) (subnets uint64, hasSubnets bool) {
switch topic {
case p2p.GossipAttestationMessage:
return currentBeaconConfig.AttestationSubnetCount, true
return globalBeaconConfig.AttestationSubnetCount, true

case p2p.GossipSyncCommitteeMessage:
return currentBeaconConfig.SyncCommitteeSubnetCount, true
return globalBeaconConfig.SyncCommitteeSubnetCount, true

case p2p.GossipBlobSidecarMessage:
return currentBeaconConfig.BlobsidecarSubnetCount, true
return globalBeaconConfig.BlobsidecarSubnetCount, true

default:
return uint64(0), false
Expand Down
220 changes: 219 additions & 1 deletion eth/reqresp.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"bytes"
"context"
"encoding/hex"
"errors"
"fmt"
"io"
"log/slog"
Expand All @@ -14,6 +15,7 @@ import (
"time"

ssz "github.com/ferranbt/fastssz"
"github.com/libp2p/go-libp2p/core"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
Expand All @@ -22,6 +24,9 @@ import (
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/encoder"
"github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/types"
psync "github.com/prysmaticlabs/prysm/v5/beacon-chain/sync"
"github.com/prysmaticlabs/prysm/v5/consensus-types/blocks"
"github.com/prysmaticlabs/prysm/v5/consensus-types/interfaces"
"github.com/prysmaticlabs/prysm/v5/consensus-types/primitives"
pb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1"
"go.opentelemetry.io/otel/attribute"
Expand Down Expand Up @@ -552,8 +557,14 @@ func (r *ReqResp) delegateStream(ctx context.Context, upstream network.Stream) e

func (r *ReqResp) Status(ctx context.Context, pid peer.ID) (status *pb.Status, err error) {
defer func() {
av, err := r.host.Peerstore().Get(pid, "AgentVersion")
if err != nil {
av = "unknown"
}

reqData := map[string]any{
"PeerID": pid.String(),
"AgentVersion": av,
"PeerID": pid.String(),
}
if status != nil {
reqData["ForkDigest"] = hex.EncodeToString(status.ForkDigest)
Expand Down Expand Up @@ -723,6 +734,90 @@ func (r *ReqResp) MetaData(ctx context.Context, pid peer.ID) (resp *pb.MetaDataV
return resp, nil
}

func (r *ReqResp) BlocksByRangeV2(ctx context.Context, pid peer.ID, firstSlot, lastSlot uint64) ([]interfaces.ReadOnlySignedBeaconBlock, error) {
var err error
blocks := make([]interfaces.ReadOnlySignedBeaconBlock, 0, (lastSlot - firstSlot))

startT := time.Now()

defer func() {
reqData := map[string]any{
"PeerID": pid.String(),
}

if blocks != nil {
reqData["RequestedBlocks"] = lastSlot - firstSlot
reqData["ReceivedBlocks"] = len(blocks)
reqData["Duration"] = time.Since(startT)
}

if err != nil {
reqData["Error"] = err.Error()
}

traceEvt := &hermeshost.TraceEvent{
Type: "REQUEST_BLOCKS_BY_RANGE",
PeerID: r.host.ID(),
Timestamp: time.Now(),
Payload: reqData,
}
traceCtx := context.Background()
if err := r.cfg.DataStream.PutRecord(traceCtx, traceEvt); err != nil {
slog.Warn("failed to put record", tele.LogAttrError(err))
}

attrs := []attribute.KeyValue{
attribute.String("rpc", "block_by_range"),
attribute.Bool("success", err == nil),
}
r.meterRequestCounter.Add(traceCtx, 1, metric.WithAttributes(attrs...))
}()

slog.Debug("Perform blocks_by_range request", tele.LogAttrPeerID(pid))
stream, err := r.host.NewStream(ctx, pid, r.protocolID(p2p.RPCBlocksByRangeTopicV2))
if err != nil {
return blocks, fmt.Errorf("new %s stream to peer %s: %w", p2p.RPCMetaDataTopicV2, pid, err)
}
defer stream.Close()
defer logDeferErr(stream.Reset, "failed closing stream") // no-op if closed

req := &pb.BeaconBlocksByRangeRequest{
StartSlot: primitives.Slot(firstSlot),
Count: (lastSlot - firstSlot),
Step: 1,
}
if err := r.writeRequest(ctx, stream, req); err != nil {
return blocks, fmt.Errorf("write block_by_range request: %w", err)
}

// read and decode status response
process := func(blk interfaces.ReadOnlySignedBeaconBlock) error {
blocks = append(blocks, blk)
slog.Info(
"got signed_beacon_block",
slog.Attr{Key: "block_number", Value: slog.AnyValue(blk.Block().Slot())},
slog.Attr{Key: "from", Value: slog.AnyValue(pid.String())},
)
return nil
}

for i := uint64(0); ; i++ {
isFirstChunk := i == 0
blk, err := r.readChunkedBlock(stream, &encoder.SszNetworkEncoder{}, isFirstChunk)
if errors.Is(err, io.EOF) {
break
}
if err != nil {
return nil, fmt.Errorf("reading block_by_range request: %w", err)
}
if err := process(blk); err != nil {
return nil, fmt.Errorf("processing block_by_range chunk: %w", err)
}
}

return blocks, nil
}

// readRequest reads a request from the given network stream and populates the
// data parameter with the decoded request. It also sets a read deadline on the
// stream and returns an error if it fails to do so. After reading the request,
Expand Down Expand Up @@ -855,3 +950,126 @@ func (r *ReqResp) writeResponse(ctx context.Context, stream network.Stream, data

return nil
}

// ReadChunkedBlock handles each response chunk that is sent by the
// peer and converts it into a beacon block.
// Adaptation from Prysm's -> https://github.com/prysmaticlabs/prysm/blob/2e29164582c3665cdf5a472cd4ec9838655c9754/beacon-chain/sync/rpc_chunked_response.go#L85
func (r *ReqResp) readChunkedBlock(stream core.Stream, encoding encoder.NetworkEncoding, isFirstChunk bool) (interfaces.ReadOnlySignedBeaconBlock, error) {
// Handle deadlines differently for first chunk
if isFirstChunk {
return r.readFirstChunkedBlock(stream, encoding)
}
return r.readResponseChunk(stream, encoding)
}

// readFirstChunkedBlock reads the first chunked block and applies the appropriate deadlines to it.
func (r *ReqResp) readFirstChunkedBlock(stream core.Stream, encoding encoder.NetworkEncoding) (interfaces.ReadOnlySignedBeaconBlock, error) {
// read status
code, errMsg, err := psync.ReadStatusCode(stream, encoding)
if err != nil {
return nil, err
}
if code != 0 {
return nil, fmt.Errorf(errMsg)
}
// set deadline for reading from stream
if err = stream.SetWriteDeadline(time.Now().Add(r.cfg.WriteTimeout)); err != nil {
return nil, fmt.Errorf("failed setting write deadline on stream: %w", err)
}
// get fork version and block type
forkD, err := r.readForkDigestFromStream(stream)
if err != nil {
return nil, err
}
forkV, err := GetForkVersionFromForkDigest(forkD)
if err != nil {
return nil, err
}
return r.getBlockForForkVersion(forkV, encoding, stream)
}

// readResponseChunk reads the response from the stream and decodes it into the
// provided message type.
func (r *ReqResp) readResponseChunk(stream core.Stream, encoding encoder.NetworkEncoding) (interfaces.ReadOnlySignedBeaconBlock, error) {
if err := stream.SetWriteDeadline(time.Now().Add(r.cfg.WriteTimeout)); err != nil {
return nil, fmt.Errorf("failed setting write deadline on stream: %w", err)
}
code, errMsg, err := psync.ReadStatusCode(stream, encoding)
if err != nil {
return nil, err
}
if code != 0 {
return nil, fmt.Errorf(errMsg)
}
// No-op for now with the rpc context.
forkD, err := r.readForkDigestFromStream(stream)
if err != nil {
return nil, err
}
forkV, err := GetForkVersionFromForkDigest(forkD)
if err != nil {
return nil, err
}

return r.getBlockForForkVersion(forkV, encoding, stream)
}

// readForkDigestFromStream reads any attached context-bytes to the payload.
func (r *ReqResp) readForkDigestFromStream(stream network.Stream) (forkD [4]byte, err error) {
// Read context (fork-digest) from stream (assumes it has it)
b := make([]byte, 4)
if _, err = stream.Read(b); err != nil {
return ForkVersion{}, err
}
copy(forkD[:], b)
return forkD, nil
}

// getBlockForForkVersion returns an ReadOnlySignedBeaconBlock interface from the block type of each ForkVersion
func (r *ReqResp) getBlockForForkVersion(forkV ForkVersion, encoding encoder.NetworkEncoding, stream network.Stream) (sblk interfaces.ReadOnlySignedBeaconBlock, err error) {
switch forkV {
case Phase0ForkVersion:
blk := &pb.SignedBeaconBlock{}
err = encoding.DecodeWithMaxLength(stream, blk)
if err != nil {
return sblk, err
}
return blocks.NewSignedBeaconBlock(blk)

case AltairForkVersion:
blk := &pb.SignedBeaconBlockAltair{}
err = encoding.DecodeWithMaxLength(stream, blk)
if err != nil {
return sblk, err
}
return blocks.NewSignedBeaconBlock(blk)

case BellatrixForkVersion:
blk := &pb.SignedBeaconBlockBellatrix{}
err = encoding.DecodeWithMaxLength(stream, blk)
if err != nil {
return sblk, err
}
return blocks.NewSignedBeaconBlock(blk)

case CapellaForkVersion:
blk := &pb.SignedBeaconBlockCapella{}
err = encoding.DecodeWithMaxLength(stream, blk)
if err != nil {
return sblk, err
}
return blocks.NewSignedBeaconBlock(blk)

case DenebForkVersion:
blk := &pb.SignedBeaconBlockDeneb{}
err = encoding.DecodeWithMaxLength(stream, blk)
if err != nil {
return sblk, err
}
return blocks.NewSignedBeaconBlock(blk)

default:
sblk, _ := blocks.NewSignedBeaconBlock(&pb.SignedBeaconBlock{})
return sblk, fmt.Errorf("unrecognized fork_version (received:%s) (ours: %s) (global: %s)", forkV, r.cfg.ForkDigest, DenebForkVersion)
}
}
Loading

0 comments on commit 3623e70

Please sign in to comment.