diff --git a/beacon-chain/blockchain/process_block.go b/beacon-chain/blockchain/process_block.go index 8ed7d329531e..06a609e91389 100644 --- a/beacon-chain/blockchain/process_block.go +++ b/beacon-chain/blockchain/process_block.go @@ -233,7 +233,9 @@ func (s *Service) onBlockBatch(ctx context.Context, blks []consensusblocks.ROBlo return err } } - if err := avs.IsDataAvailable(ctx, s.CurrentSlot(), b); err != nil { + + nodeID := s.cfg.P2P.NodeID() + if err := avs.IsDataAvailable(ctx, nodeID, s.CurrentSlot(), b); err != nil { return errors.Wrapf(err, "could not validate blob data availability at slot %d", b.Block().Slot()) } args := &forkchoicetypes.BlockAndCheckpoints{Block: b.Block(), diff --git a/beacon-chain/blockchain/receive_block.go b/beacon-chain/blockchain/receive_block.go index 0e64e320287a..ac86a427c38e 100644 --- a/beacon-chain/blockchain/receive_block.go +++ b/beacon-chain/blockchain/receive_block.go @@ -238,7 +238,9 @@ func (s *Service) handleDA( if err != nil { return 0, err } - if err := avs.IsDataAvailable(ctx, s.CurrentSlot(), rob); err != nil { + + nodeID := s.cfg.P2P.NodeID() + if err := avs.IsDataAvailable(ctx, nodeID, s.CurrentSlot(), rob); err != nil { return 0, errors.Wrap(err, "could not validate blob data availability (AvailabilityStore.IsDataAvailable)") } } else { diff --git a/beacon-chain/blockchain/service_test.go b/beacon-chain/blockchain/service_test.go index a39cd265b7d4..69f00c3b8e75 100644 --- a/beacon-chain/blockchain/service_test.go +++ b/beacon-chain/blockchain/service_test.go @@ -104,6 +104,7 @@ func setupBeaconChain(t *testing.T, beaconDB db.Database) *Service { WithStateGen(stateGen), WithPayloadIDCache(cache.NewPayloadIDCache()), WithClockSynchronizer(startup.NewClockSynchronizer()), + WithP2PBroadcaster(&mockAccesser{}), } chainService, err := NewService(ctx, opts...) diff --git a/beacon-chain/blockchain/setup_test.go b/beacon-chain/blockchain/setup_test.go index 65af044040ba..d9d70a0bda85 100644 --- a/beacon-chain/blockchain/setup_test.go +++ b/beacon-chain/blockchain/setup_test.go @@ -134,6 +134,7 @@ func minimalTestService(t *testing.T, opts ...Option) (*Service, *testServiceReq WithBlobStorage(filesystem.NewEphemeralBlobStorage(t)), WithSyncChecker(mock.MockChecker{}), WithExecutionEngineCaller(&mockExecution.EngineClient{}), + WithP2PBroadcaster(&mockAccesser{}), } // append the variadic opts so they override the defaults by being processed afterwards opts = append(defOpts, opts...) diff --git a/beacon-chain/core/peerdas/helpers.go b/beacon-chain/core/peerdas/helpers.go index 87ed83fb77ff..3dddd20b92fb 100644 --- a/beacon-chain/core/peerdas/helpers.go +++ b/beacon-chain/core/peerdas/helpers.go @@ -94,7 +94,7 @@ func CustodyColumnSubnets(nodeId enode.ID, custodySubnetCount uint64) (map[uint6 func CustodyColumns(nodeId enode.ID, custodySubnetCount uint64) (map[uint64]bool, error) { dataColumnSidecarSubnetCount := params.BeaconConfig().DataColumnSidecarSubnetCount - // Compute the custodied subnets. + // Compute the custody subnets. subnetIds, err := CustodyColumnSubnets(nodeId, custodySubnetCount) if err != nil { return nil, errors.Wrap(err, "custody subnets") @@ -408,17 +408,23 @@ func DataColumnSidecarsForReconstruct( // VerifyDataColumnSidecarKZGProofs verifies the provided KZG Proofs for the particular // data column. func VerifyDataColumnSidecarKZGProofs(sc blocks.RODataColumn) (bool, error) { - if sc.ColumnIndex >= params.BeaconConfig().NumberOfColumns { + numberOfColumns := params.BeaconConfig().NumberOfColumns + + if sc.ColumnIndex >= numberOfColumns { return false, errIndexTooLarge } + if len(sc.DataColumn) != len(sc.KzgCommitments) || len(sc.KzgCommitments) != len(sc.KzgProof) { return false, errMismatchLength } - var commitments []kzg.Bytes48 - var indices []uint64 - var cells []kzg.Cell - var proofs []kzg.Bytes48 + count := len(sc.DataColumn) + + commitments := make([]kzg.Bytes48, 0, count) + indices := make([]uint64, 0, count) + cells := make([]kzg.Cell, 0, count) + proofs := make([]kzg.Bytes48, 0, count) + for i := range sc.DataColumn { commitments = append(commitments, kzg.Bytes48(sc.KzgCommitments[i])) indices = append(indices, sc.ColumnIndex) diff --git a/beacon-chain/das/BUILD.bazel b/beacon-chain/das/BUILD.bazel index 50385ea7341c..8c6797177618 100644 --- a/beacon-chain/das/BUILD.bazel +++ b/beacon-chain/das/BUILD.bazel @@ -12,6 +12,7 @@ go_library( importpath = "github.com/prysmaticlabs/prysm/v5/beacon-chain/das", visibility = ["//visibility:public"], deps = [ + "//beacon-chain/core/peerdas:go_default_library", "//beacon-chain/db/filesystem:go_default_library", "//beacon-chain/verification:go_default_library", "//config/fieldparams:go_default_library", @@ -30,6 +31,7 @@ go_library( go_test( name = "go_default_test", srcs = [ + "availability_columns_test.go", "availability_test.go", "cache_test.go", ], @@ -37,6 +39,7 @@ go_test( deps = [ "//beacon-chain/db/filesystem:go_default_library", "//beacon-chain/verification:go_default_library", + "//cmd/beacon-chain/flags:go_default_library", "//config/fieldparams:go_default_library", "//config/params:go_default_library", "//consensus-types/blocks:go_default_library", @@ -45,6 +48,7 @@ go_test( "//testing/require:go_default_library", "//testing/util:go_default_library", "//time/slots:go_default_library", + "@com_github_ethereum_go_ethereum//p2p/enode:go_default_library", "@com_github_pkg_errors//:go_default_library", ], ) diff --git a/beacon-chain/das/availability.go b/beacon-chain/das/availability.go index 7a8a2105838a..bef2d9d3d560 100644 --- a/beacon-chain/das/availability.go +++ b/beacon-chain/das/availability.go @@ -4,6 +4,7 @@ import ( "context" "fmt" + "github.com/ethereum/go-ethereum/p2p/enode" errors "github.com/pkg/errors" "github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filesystem" "github.com/prysmaticlabs/prysm/v5/beacon-chain/verification" @@ -80,7 +81,7 @@ func (s *LazilyPersistentStore) Persist(current primitives.Slot, sc ...blocks.RO // IsDataAvailable returns nil if all the commitments in the given block are persisted to the db and have been verified. // BlobSidecars already in the db are assumed to have been previously verified against the block. -func (s *LazilyPersistentStore) IsDataAvailable(ctx context.Context, current primitives.Slot, b blocks.ROBlock) error { +func (s *LazilyPersistentStore) IsDataAvailable(ctx context.Context, _ enode.ID, current primitives.Slot, b blocks.ROBlock) error { blockCommitments, err := commitmentsToCheck(b, current) if err != nil { return errors.Wrapf(err, "could check data availability for block %#x", b.Root()) diff --git a/beacon-chain/das/availability_columns.go b/beacon-chain/das/availability_columns.go index 277650d96811..8383873d4f36 100644 --- a/beacon-chain/das/availability_columns.go +++ b/beacon-chain/das/availability_columns.go @@ -6,6 +6,7 @@ import ( "github.com/ethereum/go-ethereum/p2p/enode" errors "github.com/pkg/errors" + "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas" "github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filesystem" "github.com/prysmaticlabs/prysm/v5/beacon-chain/verification" "github.com/prysmaticlabs/prysm/v5/config/params" @@ -75,39 +76,58 @@ func (s *LazilyPersistentStoreColumn) PersistColumns(current primitives.Slot, sc // IsDataAvailable returns nil if all the commitments in the given block are persisted to the db and have been verified. // BlobSidecars already in the db are assumed to have been previously verified against the block. -func (s *LazilyPersistentStoreColumn) IsDataAvailable(ctx context.Context, current primitives.Slot, b blocks.ROBlock) error { - blockCommitments, err := fullCommitmentsToCheck(b, current) +func (s *LazilyPersistentStoreColumn) IsDataAvailable( + ctx context.Context, + nodeID enode.ID, + currentSlot primitives.Slot, + block blocks.ROBlock, +) error { + blockCommitments, err := fullCommitmentsToCheck(nodeID, block, currentSlot) if err != nil { - return errors.Wrapf(err, "could check data availability for block %#x", b.Root()) + return errors.Wrapf(err, "full commitments to check with block root `%#x` and current slot `%d`", block.Root(), currentSlot) } - // Return early for blocks that are pre-deneb or which do not have any commitments. + + // Return early for blocks that do not have any commitments. if blockCommitments.count() == 0 { return nil } - key := keyFromBlock(b) + // Build the cache key for the block. + key := keyFromBlock(block) + + // Retrieve the cache entry for the block, or create an empty one if it doesn't exist. entry := s.cache.ensure(key) + + // Delete the cache entry for the block at the end. defer s.cache.delete(key) - root := b.Root() - sumz, err := s.store.WaitForSummarizer(ctx) + + // Get the root of the block. + blockRoot := block.Root() + + // Wait for the summarizer to be ready before proceeding. + summarizer, err := s.store.WaitForSummarizer(ctx) if err != nil { - log.WithField("root", fmt.Sprintf("%#x", b.Root())). + log. + WithField("root", fmt.Sprintf("%#x", blockRoot)). WithError(err). Debug("Failed to receive BlobStorageSummarizer within IsDataAvailable") } else { - entry.setDiskSummary(sumz.Summary(root)) + // Get the summary for the block, and set it in the cache entry. + summary := summarizer.Summary(blockRoot) + entry.setDiskSummary(summary) } // Verify we have all the expected sidecars, and fail fast if any are missing or inconsistent. // We don't try to salvage problematic batches because this indicates a misbehaving peer and we'd rather // ignore their response and decrease their peer score. - sidecars, err := entry.filterColumns(root, &blockCommitments) + sidecars, err := entry.filterColumns(blockRoot, blockCommitments) if err != nil { return errors.Wrap(err, "incomplete BlobSidecar batch") } - // Do thorough verifications of each BlobSidecar for the block. - // Same as above, we don't save BlobSidecars if there are any problems with the batch. - vscs, err := s.verifier.VerifiedRODataColumns(ctx, b, sidecars) + + // Do thorough verifications of each RODataColumns for the block. + // Same as above, we don't save DataColumnsSidecars if there are any problems with the batch. + vscs, err := s.verifier.VerifiedRODataColumns(ctx, block, sidecars) if err != nil { var me verification.VerificationMultiError ok := errors.As(err, &me) @@ -120,33 +140,62 @@ func (s *LazilyPersistentStoreColumn) IsDataAvailable(ctx context.Context, curre log.WithFields(lf). Debug("invalid ColumnSidecars received") } - return errors.Wrapf(err, "invalid ColumnSidecars received for block %#x", root) + return errors.Wrapf(err, "invalid ColumnSidecars received for block %#x", blockRoot) } + // Ensure that each column sidecar is written to disk. for i := range vscs { if err := s.store.SaveDataColumn(vscs[i]); err != nil { - return errors.Wrapf(err, "failed to save ColumnSidecar index %d for block %#x", vscs[i].ColumnIndex, root) + return errors.Wrapf(err, "save data columns for index `%d` for block `%#x`", vscs[i].ColumnIndex, blockRoot) } } - // All ColumnSidecars are persisted - da check succeeds. + + // All ColumnSidecars are persisted - data availability check succeeds. return nil } -func fullCommitmentsToCheck(b blocks.ROBlock, current primitives.Slot) (safeCommitmentsArray, error) { - var ar safeCommitmentsArray - if b.Version() < version.Deneb { - return ar, nil +// fullCommitmentsToCheck returns the commitments to check for a given block. +func fullCommitmentsToCheck(nodeID enode.ID, block blocks.ROBlock, currentSlot primitives.Slot) (*safeCommitmentsArray, error) { + // Return early for blocks that are pre-deneb. + if block.Version() < version.Deneb { + return &safeCommitmentsArray{}, nil } - // We are only required to check within MIN_EPOCHS_FOR_DATA_COLUMN_SIDECARS_REQUESTS - if !params.WithinDAPeriod(slots.ToEpoch(b.Block().Slot()), slots.ToEpoch(current)) { - return ar, nil + + // Compute the block epoch. + blockSlot := block.Block().Slot() + blockEpoch := slots.ToEpoch(blockSlot) + + // Compute the current spoch. + currentEpoch := slots.ToEpoch(currentSlot) + + // Return early if the request is out of the MIN_EPOCHS_FOR_DATA_COLUMN_SIDECARS_REQUESTS window. + if !params.WithinDAPeriod(blockEpoch, currentEpoch) { + return &safeCommitmentsArray{}, nil } - kc, err := b.Block().Body().BlobKzgCommitments() + + // Retrieve the KZG commitments for the block. + kzgCommitments, err := block.Block().Body().BlobKzgCommitments() if err != nil { - return ar, err + return nil, errors.Wrap(err, "blob KZG commitments") } - for i := range ar { - copy(ar[i], kc) + + // Return early if there are no commitments in the block. + if len(kzgCommitments) == 0 { + return &safeCommitmentsArray{}, nil } - return ar, nil + + // Retrieve the custody columns. + custodySubnetCount := peerdas.CustodySubnetCount() + custodyColumns, err := peerdas.CustodyColumns(nodeID, custodySubnetCount) + if err != nil { + return nil, errors.Wrap(err, "custody columns") + } + + // Create a safe commitments array for the custody columns. + commitmentsArray := &safeCommitmentsArray{} + for column := range custodyColumns { + commitmentsArray[column] = kzgCommitments + } + + return commitmentsArray, nil } diff --git a/beacon-chain/das/availability_columns_test.go b/beacon-chain/das/availability_columns_test.go new file mode 100644 index 000000000000..0405756d96ae --- /dev/null +++ b/beacon-chain/das/availability_columns_test.go @@ -0,0 +1,94 @@ +package das + +import ( + "testing" + + "github.com/ethereum/go-ethereum/p2p/enode" + "github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/flags" + "github.com/prysmaticlabs/prysm/v5/config/params" + "github.com/prysmaticlabs/prysm/v5/consensus-types/blocks" + "github.com/prysmaticlabs/prysm/v5/consensus-types/primitives" + "github.com/prysmaticlabs/prysm/v5/encoding/bytesutil" + "github.com/prysmaticlabs/prysm/v5/testing/require" + "github.com/prysmaticlabs/prysm/v5/testing/util" + "github.com/prysmaticlabs/prysm/v5/time/slots" +) + +func TestFullCommitmentsToCheck(t *testing.T) { + windowSlots, err := slots.EpochEnd(params.BeaconConfig().MinEpochsForDataColumnSidecarsRequest) + require.NoError(t, err) + commits := [][]byte{ + bytesutil.PadTo([]byte("a"), 48), + bytesutil.PadTo([]byte("b"), 48), + bytesutil.PadTo([]byte("c"), 48), + bytesutil.PadTo([]byte("d"), 48), + } + cases := []struct { + name string + commits [][]byte + block func(*testing.T) blocks.ROBlock + slot primitives.Slot + err error + }{ + { + name: "pre deneb", + block: func(t *testing.T) blocks.ROBlock { + bb := util.NewBeaconBlockBellatrix() + sb, err := blocks.NewSignedBeaconBlock(bb) + require.NoError(t, err) + rb, err := blocks.NewROBlock(sb) + require.NoError(t, err) + return rb + }, + }, + { + name: "commitments within da", + block: func(t *testing.T) blocks.ROBlock { + d := util.NewBeaconBlockDeneb() + d.Block.Body.BlobKzgCommitments = commits + d.Block.Slot = 100 + sb, err := blocks.NewSignedBeaconBlock(d) + require.NoError(t, err) + rb, err := blocks.NewROBlock(sb) + require.NoError(t, err) + return rb + }, + commits: commits, + slot: 100, + }, + { + name: "commitments outside da", + block: func(t *testing.T) blocks.ROBlock { + d := util.NewBeaconBlockDeneb() + // block is from slot 0, "current slot" is window size +1 (so outside the window) + d.Block.Body.BlobKzgCommitments = commits + sb, err := blocks.NewSignedBeaconBlock(d) + require.NoError(t, err) + rb, err := blocks.NewROBlock(sb) + require.NoError(t, err) + return rb + }, + slot: windowSlots + 1, + }, + } + for _, c := range cases { + t.Run(c.name, func(t *testing.T) { + resetFlags := flags.Get() + gFlags := new(flags.GlobalFlags) + gFlags.SubscribeToAllSubnets = true + flags.Init(gFlags) + defer flags.Init(resetFlags) + + b := c.block(t) + co, err := fullCommitmentsToCheck(enode.ID{}, b, c.slot) + if c.err != nil { + require.ErrorIs(t, err, c.err) + } else { + require.NoError(t, err) + } + for i := 0; i < len(co); i++ { + require.DeepEqual(t, c.commits, co[i]) + } + }) + } +} diff --git a/beacon-chain/das/availability_test.go b/beacon-chain/das/availability_test.go index e59830feb0ce..770409c84b7e 100644 --- a/beacon-chain/das/availability_test.go +++ b/beacon-chain/das/availability_test.go @@ -5,6 +5,7 @@ import ( "context" "testing" + "github.com/ethereum/go-ethereum/p2p/enode" errors "github.com/pkg/errors" "github.com/prysmaticlabs/prysm/v5/beacon-chain/db/filesystem" "github.com/prysmaticlabs/prysm/v5/beacon-chain/verification" @@ -124,18 +125,18 @@ func TestLazilyPersistent_Missing(t *testing.T) { // Only one commitment persisted, should return error with other indices require.NoError(t, as.Persist(1, scs[2])) - err := as.IsDataAvailable(ctx, 1, blk) + err := as.IsDataAvailable(ctx, enode.ID{}, 1, blk) require.ErrorIs(t, err, errMissingSidecar) // All but one persisted, return missing idx require.NoError(t, as.Persist(1, scs[0])) - err = as.IsDataAvailable(ctx, 1, blk) + err = as.IsDataAvailable(ctx, enode.ID{}, 1, blk) require.ErrorIs(t, err, errMissingSidecar) // All persisted, return nil require.NoError(t, as.Persist(1, scs...)) - require.NoError(t, as.IsDataAvailable(ctx, 1, blk)) + require.NoError(t, as.IsDataAvailable(ctx, enode.ID{}, 1, blk)) } func TestLazilyPersistent_Mismatch(t *testing.T) { @@ -150,7 +151,7 @@ func TestLazilyPersistent_Mismatch(t *testing.T) { // Only one commitment persisted, should return error with other indices require.NoError(t, as.Persist(1, scs[0])) - err := as.IsDataAvailable(ctx, 1, blk) + err := as.IsDataAvailable(ctx, enode.ID{}, 1, blk) require.NotNil(t, err) require.ErrorIs(t, err, errCommitmentMismatch) } diff --git a/beacon-chain/das/cache.go b/beacon-chain/das/cache.go index dc683b6fc0ec..e18f540d7c0d 100644 --- a/beacon-chain/das/cache.go +++ b/beacon-chain/das/cache.go @@ -134,33 +134,37 @@ func (e *cacheEntry) filter(root [32]byte, kc safeCommitmentArray) ([]blocks.ROB return scs, nil } -func (e *cacheEntry) filterColumns(root [32]byte, kc *safeCommitmentsArray) ([]blocks.RODataColumn, error) { - if e.diskSummary.AllAvailable(kc.count()) { +func (e *cacheEntry) filterColumns(root [32]byte, commitmentsArray *safeCommitmentsArray) ([]blocks.RODataColumn, error) { + nonEmptyIndices := commitmentsArray.nonEmptyIndices() + if e.diskSummary.AllDataColumnsAvailable(nonEmptyIndices) { return nil, nil } - scs := make([]blocks.RODataColumn, 0, kc.count()) + + commitmentsCount := commitmentsArray.count() + sidecars := make([]blocks.RODataColumn, 0, commitmentsCount) + for i := uint64(0); i < fieldparams.NumberOfColumns; i++ { - // We already have this blob, we don't need to write it or validate it. + // Skip if we arleady store this data column. if e.diskSummary.HasIndex(i) { continue } - if kc[i] == nil { - if e.colScs[i] != nil { - return nil, errors.Wrapf(errCommitmentMismatch, "root=%#x, index=%#x, commitment=%#x, no block commitment", root, i, e.scs[i].KzgCommitment) - } + + if commitmentsArray[i] == nil { continue } if e.colScs[i] == nil { return nil, errors.Wrapf(errMissingSidecar, "root=%#x, index=%#x", root, i) } - if !reflect.DeepEqual(kc[i], e.colScs[i].KzgCommitments) { - return nil, errors.Wrapf(errCommitmentMismatch, "root=%#x, index=%#x, commitment=%#x, block commitment=%#x", root, i, e.colScs[i].KzgCommitments, kc[i]) + + if !reflect.DeepEqual(commitmentsArray[i], e.colScs[i].KzgCommitments) { + return nil, errors.Wrapf(errCommitmentMismatch, "root=%#x, index=%#x, commitment=%#x, block commitment=%#x", root, i, e.colScs[i].KzgCommitments, commitmentsArray[i]) } - scs = append(scs, *e.colScs[i]) + + sidecars = append(sidecars, *e.colScs[i]) } - return scs, nil + return sidecars, nil } // safeCommitmentArray is a fixed size array of commitment byte slices. This is helpful for avoiding @@ -176,13 +180,32 @@ func (s safeCommitmentArray) count() int { return fieldparams.MaxBlobsPerBlock } +// safeCommitmentsArray is a fixed size array of commitments. +// This is helpful for avoiding gratuitous bounds checks. type safeCommitmentsArray [fieldparams.NumberOfColumns][][]byte +// count returns the number of commitments in the array. func (s *safeCommitmentsArray) count() int { + count := 0 + for i := range s { - if s[i] == nil { - return i + if s[i] != nil { + count++ } } - return fieldparams.NumberOfColumns + + return count +} + +// nonEmptyIndices returns a map of indices that are non-nil in the array. +func (s *safeCommitmentsArray) nonEmptyIndices() map[uint64]bool { + columns := make(map[uint64]bool) + + for i := range s { + if s[i] != nil { + columns[uint64(i)] = true + } + } + + return columns } diff --git a/beacon-chain/das/iface.go b/beacon-chain/das/iface.go index 6a0b024a8f96..2e9ed2716845 100644 --- a/beacon-chain/das/iface.go +++ b/beacon-chain/das/iface.go @@ -3,6 +3,7 @@ package das import ( "context" + "github.com/ethereum/go-ethereum/p2p/enode" "github.com/prysmaticlabs/prysm/v5/consensus-types/blocks" "github.com/prysmaticlabs/prysm/v5/consensus-types/primitives" ) @@ -14,6 +15,6 @@ import ( // IsDataAvailable guarantees that all blobs committed to in the block have been // durably persisted before returning a non-error value. type AvailabilityStore interface { - IsDataAvailable(ctx context.Context, current primitives.Slot, b blocks.ROBlock) error + IsDataAvailable(ctx context.Context, nodeID enode.ID, current primitives.Slot, b blocks.ROBlock) error Persist(current primitives.Slot, sc ...blocks.ROBlob) error } diff --git a/beacon-chain/das/mock.go b/beacon-chain/das/mock.go index a329570523aa..d930beb1b48a 100644 --- a/beacon-chain/das/mock.go +++ b/beacon-chain/das/mock.go @@ -3,6 +3,7 @@ package das import ( "context" + "github.com/ethereum/go-ethereum/p2p/enode" "github.com/prysmaticlabs/prysm/v5/consensus-types/blocks" "github.com/prysmaticlabs/prysm/v5/consensus-types/primitives" ) @@ -16,7 +17,7 @@ type MockAvailabilityStore struct { var _ AvailabilityStore = &MockAvailabilityStore{} // IsDataAvailable satisfies the corresponding method of the AvailabilityStore interface in a way that is useful for tests. -func (m *MockAvailabilityStore) IsDataAvailable(ctx context.Context, current primitives.Slot, b blocks.ROBlock) error { +func (m *MockAvailabilityStore) IsDataAvailable(ctx context.Context, _ enode.ID, current primitives.Slot, b blocks.ROBlock) error { if m.VerifyAvailabilityCallback != nil { return m.VerifyAvailabilityCallback(ctx, current, b) } diff --git a/beacon-chain/forkchoice/doubly-linked-tree/reorg_late_blocks.go b/beacon-chain/forkchoice/doubly-linked-tree/reorg_late_blocks.go index 8bc717802fae..1d7691ac4923 100644 --- a/beacon-chain/forkchoice/doubly-linked-tree/reorg_late_blocks.go +++ b/beacon-chain/forkchoice/doubly-linked-tree/reorg_late_blocks.go @@ -53,7 +53,7 @@ func (f *ForkChoice) ShouldOverrideFCU() (override bool) { // Only reorg blocks that arrive late early, err := head.arrivedEarly(f.store.genesisTime) if err != nil { - log.WithError(err).Error("could not check if block arrived early") + log.WithError(err).Error("Could not check if block arrived early") return } if early { diff --git a/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_eth1data.go b/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_eth1data.go index 92eb2960dae9..ee4d0de60851 100644 --- a/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_eth1data.go +++ b/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_eth1data.go @@ -54,7 +54,7 @@ func (vs *Server) eth1DataMajorityVote(ctx context.Context, beaconState state.Be // by ETH1_FOLLOW_DISTANCE. The head state should maintain the same ETH1Data until this condition has passed, so // trust the existing head for the right eth1 vote until we can get a meaningful value from the deposit contract. if latestValidTime < genesisTime+followDistanceSeconds { - log.WithField("genesisTime", genesisTime).WithField("latestValidTime", latestValidTime).Warn("voting period before genesis + follow distance, using eth1data from head") + log.WithField("genesisTime", genesisTime).WithField("latestValidTime", latestValidTime).Warn("Voting period before genesis + follow distance, using eth1data from head") return vs.HeadFetcher.HeadETH1Data(), nil } diff --git a/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_execution_payload.go b/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_execution_payload.go index c928a31e955c..00d98853fd2d 100644 --- a/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_execution_payload.go +++ b/beacon-chain/rpc/prysm/v1alpha1/validator/proposer_execution_payload.go @@ -101,7 +101,7 @@ func (vs *Server) getLocalPayloadFromEngine( return nil, errors.Wrap(err, "could not get cached payload from execution client") } } - log.WithFields(logFields).Debug("payload ID cache miss") + log.WithFields(logFields).Debug("Payload ID cache miss") parentHash, err := vs.getParentBlockHash(ctx, st, slot) switch { case errors.Is(err, errActivationNotReached) || errors.Is(err, errNoTerminalBlockHash): @@ -190,7 +190,7 @@ func (vs *Server) getLocalPayloadFromEngine( } warnIfFeeRecipientDiffers(val.FeeRecipient[:], res.ExecutionData.FeeRecipient()) - log.WithField("value", res.Bid).Debug("received execution payload from local engine") + log.WithField("value", res.Bid).Debug("Received execution payload from local engine") return res, nil } diff --git a/beacon-chain/sync/backfill/BUILD.bazel b/beacon-chain/sync/backfill/BUILD.bazel index 90e7dbbd44a3..048eb470b946 100644 --- a/beacon-chain/sync/backfill/BUILD.bazel +++ b/beacon-chain/sync/backfill/BUILD.bazel @@ -41,6 +41,7 @@ go_library( "//runtime:go_default_library", "//runtime/version:go_default_library", "//time/slots:go_default_library", + "@com_github_ethereum_go_ethereum//p2p/enode:go_default_library", "@com_github_libp2p_go_libp2p//core/peer:go_default_library", "@com_github_pkg_errors//:go_default_library", "@com_github_prometheus_client_golang//prometheus:go_default_library", diff --git a/beacon-chain/sync/backfill/status.go b/beacon-chain/sync/backfill/status.go index 99de1a06b8b9..10178b38a79f 100644 --- a/beacon-chain/sync/backfill/status.go +++ b/beacon-chain/sync/backfill/status.go @@ -4,6 +4,7 @@ import ( "context" "sync" + "github.com/ethereum/go-ethereum/p2p/enode" "github.com/pkg/errors" "github.com/prysmaticlabs/prysm/v5/beacon-chain/das" "github.com/prysmaticlabs/prysm/v5/beacon-chain/db" @@ -88,8 +89,11 @@ func (s *Store) fillBack(ctx context.Context, current primitives.Slot, blocks [] status.LowParentRoot, highest.Root(), status.LowSlot, highest.Block().Slot()) } + // TODO: Use the real node ID when backfill is implemented for data columns. + emptyNodeID := enode.ID{} + for i := range blocks { - if err := store.IsDataAvailable(ctx, current, blocks[i]); err != nil { + if err := store.IsDataAvailable(ctx, emptyNodeID, current, blocks[i]); err != nil { return nil, err } } diff --git a/beacon-chain/sync/initial-sync/BUILD.bazel b/beacon-chain/sync/initial-sync/BUILD.bazel index 6e53a634f043..da6ec0c57ae3 100644 --- a/beacon-chain/sync/initial-sync/BUILD.bazel +++ b/beacon-chain/sync/initial-sync/BUILD.bazel @@ -48,6 +48,7 @@ go_library( "//runtime/version:go_default_library", "//time:go_default_library", "//time/slots:go_default_library", + "@com_github_ethereum_go_ethereum//p2p/enode:go_default_library", "@com_github_libp2p_go_libp2p//core/peer:go_default_library", "@com_github_paulbellamy_ratecounter//:go_default_library", "@com_github_pkg_errors//:go_default_library", diff --git a/beacon-chain/sync/initial-sync/blocks_fetcher.go b/beacon-chain/sync/initial-sync/blocks_fetcher.go index 66bfda83e429..98aec160a177 100644 --- a/beacon-chain/sync/initial-sync/blocks_fetcher.go +++ b/beacon-chain/sync/initial-sync/blocks_fetcher.go @@ -705,11 +705,11 @@ func (f *blocksFetcher) blocksWithMissingDataColumnsBoundaries( } // custodyAllNeededColumns filter `inputPeers` that custody all columns in `columns`. -func (f *blocksFetcher) custodyAllNeededColumns(inputPeers []peer.ID, columns map[uint64]bool) ([]peer.ID, error) { - outputPeers := make([]peer.ID, 0, len(inputPeers)) +func (f *blocksFetcher) custodyAllNeededColumns(inputPeers map[peer.ID]bool, columns map[uint64]bool) (map[peer.ID]bool, error) { + outputPeers := make(map[peer.ID]bool, len(inputPeers)) loop: - for _, peer := range inputPeers { + for peer := range inputPeers { // Get the node ID from the peer ID. nodeID, err := p2p.ConvertPeerIDToNodeID(peer) if err != nil { @@ -731,7 +731,7 @@ loop: } } - outputPeers = append(outputPeers, peer) + outputPeers[peer] = true } return outputPeers, nil @@ -842,11 +842,16 @@ func maxInt(slice []int) int { func (f *blocksFetcher) requestDataColumnsFromPeers( ctx context.Context, request *p2ppb.DataColumnSidecarsByRangeRequest, - peers []peer.ID, + peers map[peer.ID]bool, ) ([]blocks.RODataColumn, peer.ID, error) { + peersSlice := make([]peer.ID, 0, len(peers)) + for peer := range peers { + peersSlice = append(peersSlice, peer) + } + // Shuffle peers to avoid always querying the same peers - f.rand.Shuffle(len(peers), func(i, j int) { - peers[i], peers[j] = peers[j], peers[i] + f.rand.Shuffle(len(peersSlice), func(i, j int) { + peersSlice[i], peersSlice[j] = peersSlice[j], peersSlice[i] }) var columnsLog interface{} = "all" @@ -863,7 +868,7 @@ func (f *blocksFetcher) requestDataColumnsFromPeers( "items": request.Count * columnsCount, }) - for _, peer := range peers { + for _, peer := range peersSlice { log := log.WithField("peer", peer) if ctx.Err() != nil { @@ -1071,7 +1076,7 @@ func (f *blocksFetcher) retrieveMissingDataColumnsFromPeers( } // Filter peers. - filteredPeers, err := f.peersWithSlotAndDataColumns(peersToFilter, lastSlot, missingDataColumns) + filteredPeers, descriptions, err := f.peersWithSlotAndDataColumns(peersToFilter, lastSlot, missingDataColumns) if err != nil { return errors.Wrap(err, "peers with slot and data columns") } @@ -1081,11 +1086,16 @@ func (f *blocksFetcher) retrieveMissingDataColumnsFromPeers( WithFields(logrus.Fields{ "peers": peersToFilter, "filteredPeers": filteredPeers, - "delay": delay, + "waitDuration": delay, "targetSlot": lastSlot, }). Warning("No peers available to retrieve missing data columns, retrying later") + // If no peers are available, log the descriptions to help debugging. + for _, description := range descriptions { + log.Debug(description) + } + time.Sleep(delay) continue } @@ -1112,7 +1122,7 @@ func (f *blocksFetcher) retrieveMissingDataColumnsFromPeers( if len(roDataColumns) == 0 { log. WithFields(logrus.Fields{ - "peers": peers, + "peers": peersToFilter, "filteredPeers": filteredPeers, "delay": delay, "start": startSlot, diff --git a/beacon-chain/sync/initial-sync/blocks_fetcher_test.go b/beacon-chain/sync/initial-sync/blocks_fetcher_test.go index f76cfd0e2a51..7e0622755f85 100644 --- a/beacon-chain/sync/initial-sync/blocks_fetcher_test.go +++ b/beacon-chain/sync/initial-sync/blocks_fetcher_test.go @@ -1370,25 +1370,36 @@ func TestCustodyAllNeededColumns(t *testing.T) { 4 * params.BeaconConfig().CustodyRequirement, 32 * params.BeaconConfig().CustodyRequirement, 4 * params.BeaconConfig().CustodyRequirement, - 32 * params.BeaconConfig().CustodyRequirement} + 32 * params.BeaconConfig().CustodyRequirement, + } - peersID := make([]peer.ID, 0, len(custodyCounts)) + expected := make(map[peer.ID]bool) + + peersID := make(map[peer.ID]bool, len(custodyCounts)) for _, custodyCount := range custodyCounts { peerRecord, peerID := createPeer(t, len(peersID), custodyCount) - peersID = append(peersID, peerID) + peersID[peerID] = true p2p.Peers().Add(peerRecord, peerID, nil, network.DirOutbound) + if custodyCount == 32*params.BeaconConfig().CustodyRequirement { + expected[peerID] = true + } } - expected := []peer.ID{peersID[1], peersID[3]} - - blocksFetcher := newBlocksFetcher(context.Background(), &blocksFetcherConfig{ - p2p: p2p, - }) + blocksFetcher := newBlocksFetcher( + context.Background(), + &blocksFetcherConfig{ + p2p: p2p, + }, + ) actual, err := blocksFetcher.custodyAllNeededColumns(peersID, dataColumns) require.NoError(t, err) - require.DeepSSZEqual(t, expected, actual) + require.Equal(t, len(expected), len(actual)) + for peerID := range expected { + _, ok := actual[peerID] + require.Equal(t, true, ok) + } } func TestCustodyColumns(t *testing.T) { diff --git a/beacon-chain/sync/initial-sync/blocks_fetcher_utils.go b/beacon-chain/sync/initial-sync/blocks_fetcher_utils.go index 212aacd02303..88689398be89 100644 --- a/beacon-chain/sync/initial-sync/blocks_fetcher_utils.go +++ b/beacon-chain/sync/initial-sync/blocks_fetcher_utils.go @@ -374,7 +374,7 @@ func (f *blocksFetcher) peersWithSlotAndDataColumns( peers []peer.ID, targetSlot primitives.Slot, dataColumns map[uint64]bool, -) ([]peer.ID, error) { +) (map[peer.ID]bool, []string, error) { peersCount := len(peers) // TODO: Uncomment when we are not in devnet any more. @@ -390,23 +390,58 @@ func (f *blocksFetcher) peersWithSlotAndDataColumns( // TODO: Modify to retrieve data columns from all possible peers. // TODO: If a peer does respond some of the request columns, do not re-request responded columns. - peersWithAdmissibleHeadSlot := make([]peer.ID, 0, peersCount) + // Compute the target epoch from the target slot. + targetEpoch := slots.ToEpoch(targetSlot) - // Filter out peers with head slot lower than the target slot. + peersWithAdmissibleHeadEpoch := make(map[peer.ID]bool, peersCount) + descriptions := make([]string, 0, peersCount) + + // Filter out peers with head epoch lower than our target epoch. + // Technically, we should be able to use the head slot from the peer. + // However, our vision of the head slot of the peer is updated twice per epoch + // via P2P messages. So it is likely that we think the peer is lagging behind + // while it is actually not. + // ==> We use the head epoch as a proxy instead. + // However, if the peer is actually lagging for a few slots, + // we may requests some data columns it doesn't have yet. for _, peer := range peers { peerChainState, err := f.p2p.Peers().ChainState(peer) - if err != nil || peerChainState == nil || peerChainState.HeadSlot < targetSlot { + + if err != nil { + description := fmt.Sprintf("peer %s: error: %s", peer, err) + descriptions = append(descriptions, description) continue } - peersWithAdmissibleHeadSlot = append(peersWithAdmissibleHeadSlot, peer) + if peerChainState == nil { + description := fmt.Sprintf("peer %s: chain state is nil", peer) + descriptions = append(descriptions, description) + continue + } + + peerHeadEpoch := slots.ToEpoch(peerChainState.HeadSlot) + + if peerHeadEpoch < targetEpoch { + description := fmt.Sprintf("peer %s: head epoch %d < target epoch %d", peer, peerHeadEpoch, targetEpoch) + descriptions = append(descriptions, description) + continue + } + + peersWithAdmissibleHeadEpoch[peer] = true } // Filter out peers that do not have all the data columns needed. - finalPeers, err := f.custodyAllNeededColumns(peersWithAdmissibleHeadSlot, dataColumns) + finalPeers, err := f.custodyAllNeededColumns(peersWithAdmissibleHeadEpoch, dataColumns) if err != nil { - return nil, errors.Wrap(err, "custody all needed columns") + return nil, nil, errors.Wrap(err, "custody all needed columns") + } + + for peer := range peersWithAdmissibleHeadEpoch { + if _, ok := finalPeers[peer]; !ok { + description := fmt.Sprintf("peer %s: does not custody all needed columns", peer) + descriptions = append(descriptions, description) + } } - return finalPeers, nil + return finalPeers, descriptions, nil } diff --git a/beacon-chain/sync/initial-sync/round_robin.go b/beacon-chain/sync/initial-sync/round_robin.go index 438cfce31de9..1b81db9a3e12 100644 --- a/beacon-chain/sync/initial-sync/round_robin.go +++ b/beacon-chain/sync/initial-sync/round_robin.go @@ -234,12 +234,18 @@ func syncFields(b blocks.ROBlock) logrus.Fields { } // highestFinalizedEpoch returns the absolute highest finalized epoch of all connected peers. -// Note this can be lower than our finalized epoch if we have no peers or peers that are all behind us. +// It returns `0` if no peers are connected. +// Note this can be lower than our finalized epoch if our connected peers are all behind us. func (s *Service) highestFinalizedEpoch() primitives.Epoch { highest := primitives.Epoch(0) for _, pid := range s.cfg.P2P.Peers().Connected() { peerChainState, err := s.cfg.P2P.Peers().ChainState(pid) - if err == nil && peerChainState != nil && peerChainState.FinalizedEpoch > highest { + + if err != nil || peerChainState == nil { + continue + } + + if peerChainState.FinalizedEpoch > highest { highest = peerChainState.FinalizedEpoch } } diff --git a/beacon-chain/sync/initial-sync/service.go b/beacon-chain/sync/initial-sync/service.go index 343215032292..37a2caccca70 100644 --- a/beacon-chain/sync/initial-sync/service.go +++ b/beacon-chain/sync/initial-sync/service.go @@ -8,6 +8,7 @@ import ( "fmt" "time" + "github.com/ethereum/go-ethereum/p2p/enode" "github.com/libp2p/go-libp2p/core/peer" "github.com/paulbellamy/ratecounter" "github.com/pkg/errors" @@ -408,7 +409,10 @@ func (s *Service) fetchOriginBlobs(pids []peer.ID) error { if err := avs.Persist(current, sidecars...); err != nil { return err } - if err := avs.IsDataAvailable(s.ctx, current, rob); err != nil { + + // node ID is not used for checking blobs data availability. + emptyNodeID := enode.ID{} + if err := avs.IsDataAvailable(s.ctx, emptyNodeID, current, rob); err != nil { log.WithField("root", fmt.Sprintf("%#x", r)).WithField("peerID", pids[i]).Warn("Blobs from peer for origin block were unusable") continue } @@ -462,7 +466,9 @@ func (s *Service) fetchOriginColumns(pids []peer.ID) error { if err := avs.PersistColumns(current, sidecars...); err != nil { return err } - if err := avs.IsDataAvailable(s.ctx, current, rob); err != nil { + + nodeID := s.cfg.P2P.NodeID() + if err := avs.IsDataAvailable(s.ctx, nodeID, current, rob); err != nil { log.WithField("root", fmt.Sprintf("%#x", r)).WithField("peerID", pids[i]).Warn("Columns from peer for origin block were unusable") continue } diff --git a/beacon-chain/sync/rpc_data_column_sidecars_by_range.go b/beacon-chain/sync/rpc_data_column_sidecars_by_range.go index 8cf28bcf2eb9..d677b6dba6c1 100644 --- a/beacon-chain/sync/rpc_data_column_sidecars_by_range.go +++ b/beacon-chain/sync/rpc_data_column_sidecars_by_range.go @@ -6,7 +6,7 @@ import ( libp2pcore "github.com/libp2p/go-libp2p/core" "github.com/pkg/errors" - "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p" + "github.com/prysmaticlabs/prysm/v5/beacon-chain/core/peerdas" p2ptypes "github.com/prysmaticlabs/prysm/v5/beacon-chain/p2p/types" "github.com/prysmaticlabs/prysm/v5/cmd/beacon-chain/flags" fieldparams "github.com/prysmaticlabs/prysm/v5/config/fieldparams" @@ -16,6 +16,7 @@ import ( "github.com/prysmaticlabs/prysm/v5/monitoring/tracing/trace" pb "github.com/prysmaticlabs/prysm/v5/proto/prysm/v1alpha1" "github.com/prysmaticlabs/prysm/v5/time/slots" + "github.com/sirupsen/logrus" ) func (s *Service) streamDataColumnBatch(ctx context.Context, batch blockBatch, wQuota uint64, wantedIndexes map[uint64]bool, stream libp2pcore.Stream) (uint64, error) { @@ -69,12 +70,54 @@ func (s *Service) dataColumnSidecarsByRangeRPCHandler(ctx context.Context, msg i ctx, cancel := context.WithTimeout(ctx, respTimeout) defer cancel() SetRPCStreamDeadlines(stream) - log := log.WithField("handler", p2p.DataColumnSidecarsByRangeName[1:]) // slice the leading slash off the name var r, ok := msg.(*pb.DataColumnSidecarsByRangeRequest) if !ok { return errors.New("message is not type *pb.DataColumnSidecarsByRangeRequest") } + + // Compute custody columns. + nodeID := s.cfg.p2p.NodeID() + numberOfColumns := params.BeaconConfig().NumberOfColumns + custodySubnetCount := peerdas.CustodySubnetCount() + custodyColumns, err := peerdas.CustodyColumns(nodeID, custodySubnetCount) + if err != nil { + s.writeErrorResponseToStream(responseCodeServerError, err.Error(), stream) + return err + } + + custodyColumnsCount := uint64(len(custodyColumns)) + + // Compute requested columns. + requestedColumns := r.Columns + requestedColumnsCount := uint64(len(requestedColumns)) + + // Format log fields. + + var ( + custodyColumnsLog interface{} = "all" + requestedColumnsLog interface{} = "all" + ) + + if custodyColumnsCount != numberOfColumns { + custodyColumnsLog = uint64MapToSortedSlice(custodyColumns) + } + + if requestedColumnsCount != numberOfColumns { + requestedColumnsLog = requestedColumns + } + + // Get the remote peer. + remotePeer := stream.Conn().RemotePeer() + + log.WithFields(logrus.Fields{ + "remotePeer": remotePeer, + "custodyColumns": custodyColumnsLog, + "requestedColumns": requestedColumnsLog, + "startSlot": r.StartSlot, + "count": r.Count, + }).Debug("Serving data columns by range request") + if err := s.rateLimiter.validateRequest(stream, 1); err != nil { return err }