Fix data columns sampling (#14263)
* Fix the obvious...

* Data columns sampling: Modify logging.

* `waitForChainStart`: Make it thread-safe - only wait once.

* Sampling: Wait for chain start before running the sampling.

Reason: `newDataColumnSampler1D` needs `s.ctxMap`, and `s.ctxMap` is only set once the chain has started.

Previously `waitForChainStart` was only called in `s.registerHandlers`, which was itself called in a goroutine.

==> We had a race condition here: sometimes `newDataColumnSampler1D` was called after `s.ctxMap` had been set, sometimes not. (A minimal sketch of the corrected ordering follows this list.)

* Address Nishant's comments.

* Sampling: Improve logging.

* `waitForChainStart`: Remove `chainIsStarted` check.
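
Below is a minimal, self-contained sketch (not the actual Prysm code) of the ordering this commit enforces: a `sync.Once`-guarded wait for chain start, run before anything that reads `ctxMap`. All names here (`service`, `chainStartedCh`, the map contents) are hypothetical stand-ins.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// service is a hypothetical stand-in for the sync Service. ctxMap is only
// populated once the chain has started, mirroring s.ctxMap in the commit message.
type service struct {
	chainStartOnce sync.Once
	chainStartedCh chan struct{}
	ctxMap         map[[4]byte]string
}

// waitForChainStart blocks until the chain has started. Wrapping the body in
// sync.Once makes the method safe to call from several goroutines while the
// wait (and the ctxMap initialization) happens exactly once.
func (s *service) waitForChainStart() {
	s.chainStartOnce.Do(func() {
		<-s.chainStartedCh
		s.ctxMap = map[[4]byte]string{{0x01, 0x02, 0x03, 0x04}: "fork-digest"}
	})
}

// startTasksPostInitialSync mirrors the fixed ordering: wait for chain start
// first, so anything that needs ctxMap (such as the data column sampler) only
// runs once the map is guaranteed to be populated.
func (s *service) startTasksPostInitialSync() {
	s.waitForChainStart()
	fmt.Println("sampler can now be built; ctxMap entries:", len(s.ctxMap))
}

func main() {
	s := &service{chainStartedCh: make(chan struct{})}

	go func() {
		time.Sleep(10 * time.Millisecond)
		close(s.chainStartedCh) // the chain "starts"
	}()

	s.startTasksPostInitialSync()
}
```
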
nalepae committed Jul 29, 2024
1 parent 940782f commit c9fa938
Showing 5 changed files with 46 additions and 39 deletions.
40 changes: 20 additions & 20 deletions beacon-chain/sync/data_columns_sampling.go
@@ -170,8 +170,6 @@ func (d *dataColumnSampler1D) refreshPeerInfo() {
}
}

log.WithField("columnFromPeer", d.columnFromPeer).Debug("Peer info refreshed")

columnWithNoPeers := make([]uint64, 0)
for column, peers := range d.peerFromColumn {
if len(peers) == 0 {
@@ -228,7 +226,7 @@ func (d *dataColumnSampler1D) handleStateNotification(ctx context.Context, event
return
}

if coreTime.PeerDASIsActive(data.Slot) {
if !coreTime.PeerDASIsActive(data.Slot) {
// We do not trigger sampling if peerDAS is not active yet.
return
}
@@ -249,22 +247,12 @@ func (d *dataColumnSampler1D) handleStateNotification(ctx context.Context, event
// Randomize columns for sample selection.
randomizedColumns := randomizeColumns(d.nonCustodyColumns)
samplesCount := min(params.BeaconConfig().SamplesPerSlot, uint64(len(d.nonCustodyColumns))-params.BeaconConfig().NumberOfColumns/2)
ok, _, err = d.incrementalDAS(ctx, data.BlockRoot, randomizedColumns, samplesCount)

// TODO: Use the first output of `incrementalDAS` as input of the fork choice rule.
_, _, err = d.incrementalDAS(ctx, data.BlockRoot, randomizedColumns, samplesCount)
if err != nil {
log.WithError(err).Error("Failed to run incremental DAS")
}

if ok {
log.WithFields(logrus.Fields{
"root": fmt.Sprintf("%#x", data.BlockRoot),
"columns": randomizedColumns,
}).Debug("Data column sampling successful")
} else {
log.WithFields(logrus.Fields{
"root": fmt.Sprintf("%#x", data.BlockRoot),
"columns": randomizedColumns,
}).Warning("Data column sampling failed")
}
}

// incrementalDAS samples data columns from active peers using incremental DAS.
@@ -280,17 +268,28 @@ func (d *dataColumnSampler1D) incrementalDAS(
firstColumnToSample, extendedSampleCount := uint64(0), peerdas.ExtendedSampleCount(sampleCount, allowedFailures)
roundSummaries := make([]roundSummary, 0, 1) // We optimistically allocate only one round summary.

start := time.Now()

for round := 1; ; /*No exit condition */ round++ {
if extendedSampleCount > uint64(len(columns)) {
// We already tried to sample all possible columns, this is the unhappy path.
log.WithField("root", fmt.Sprintf("%#x", root)).Warning("Some columns are still missing after sampling all possible columns")
log.WithFields(logrus.Fields{
"root": fmt.Sprintf("%#x", root),
"round": round - 1,
}).Warning("Some columns are still missing after trying to sample all possible columns")
return false, roundSummaries, nil
}

// Get the columns to sample for this round.
columnsToSample := columns[firstColumnToSample:extendedSampleCount]
columnsToSampleCount := extendedSampleCount - firstColumnToSample

log.WithFields(logrus.Fields{
"root": fmt.Sprintf("%#x", root),
"columns": columnsToSample,
"round": round,
}).Debug("Start data columns sampling")

// Sample data columns from peers in parallel.
retrievedSamples := d.sampleDataColumns(ctx, root, columnsToSample)

@@ -311,7 +310,8 @@ func (d *dataColumnSampler1D) incrementalDAS(
// All columns were correctly sampled, this is the happy path.
log.WithFields(logrus.Fields{
"root": fmt.Sprintf("%#x", root),
"roundsNeeded": round,
"neededRounds": round,
"duration": time.Since(start),
}).Debug("All columns were successfully sampled")
return true, roundSummaries, nil
}
@@ -429,14 +429,14 @@ func (d *dataColumnSampler1D) sampleDataColumnsFromPeer(
"peerID": pid,
"root": fmt.Sprintf("%#x", root),
"requestedColumns": sortedSliceFromMap(requestedColumns),
}).Debug("All requested columns were successfully sampled from peer")
}).Debug("Sampled columns from peer successfully")
} else {
log.WithFields(logrus.Fields{
"peerID": pid,
"root": fmt.Sprintf("%#x", root),
"requestedColumns": sortedSliceFromMap(requestedColumns),
"retrievedColumns": sortedSliceFromMap(retrievedColumns),
}).Debug("Some requested columns were not sampled from peer")
}).Debug("Sampled columns from peer with some errors")
}

return retrievedColumns
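
For orientation, here is a condensed, runnable sketch of the round-based loop this file implements: each round samples the next slice of the randomized column list, and `extendedSampleCount` grows with the failures seen so far, which is where the new round and duration logging slots in. The helpers (`extendedSampleCount`, `sampleColumns`) are invented placeholders, not the real `peerdas` functions, and the failure rate is made up for illustration.

```go
package main

import (
	"fmt"
	"math/rand"
	"time"
)

// extendedSampleCount stands in for peerdas.ExtendedSampleCount: given a target
// sample count and a number of tolerated failures, it returns how many columns
// must be queried in total. The real formula differs; this is only a placeholder.
func extendedSampleCount(sampleCount, allowedFailures uint64) uint64 {
	return sampleCount + allowedFailures
}

// sampleColumns stands in for querying peers; here a column randomly "fails".
func sampleColumns(columns []uint64) map[uint64]bool {
	retrieved := make(map[uint64]bool, len(columns))
	for _, c := range columns {
		if rand.Intn(4) != 0 { // ~75% success per column, purely illustrative
			retrieved[c] = true
		}
	}
	return retrieved
}

// incrementalDAS sketches the loop structure from the diff: sample a window of
// columns, and on failures widen the window and retry until either every
// sampled column is retrieved or the column list is exhausted.
func incrementalDAS(columns []uint64, sampleCount uint64) bool {
	firstColumnToSample, allowedFailures := uint64(0), uint64(0)
	extended := extendedSampleCount(sampleCount, allowedFailures)
	start := time.Now()

	for round := 1; ; round++ {
		if extended > uint64(len(columns)) {
			// Unhappy path: all possible columns were tried and some are still missing.
			fmt.Printf("round %d: columns still missing after trying them all\n", round-1)
			return false
		}

		columnsToSample := columns[firstColumnToSample:extended]
		fmt.Printf("round %d: sampling columns %v\n", round, columnsToSample)

		retrieved := sampleColumns(columnsToSample)
		missingCount := uint64(len(columnsToSample)) - uint64(len(retrieved))
		if missingCount == 0 {
			// Happy path: everything requested this round came back.
			fmt.Printf("all columns sampled in %d round(s), took %s\n", round, time.Since(start))
			return true
		}

		// Widen the window: tolerate the observed failures and sample the next slice.
		firstColumnToSample = extended
		allowedFailures += missingCount
		extended = extendedSampleCount(sampleCount, allowedFailures)
	}
}

func main() {
	columns := []uint64{3, 17, 42, 7, 19, 88, 5, 61, 23, 9, 54, 70}
	incrementalDAS(columns, 4)
}
```
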
2 changes: 1 addition & 1 deletion beacon-chain/sync/rpc_data_column_sidecars_by_root.go
@@ -94,7 +94,7 @@ func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg int
"requested": requestedColumnsList,
"custodiedCount": len(custodiedColumnsList),
"requestedCount": len(requestedColumnsList),
}).Debug("Received data column sidecar by root request")
}).Debug("Data column sidecar by root request received")

// Subscribe to the data column feed.
rootIndexChan := make(chan filesystem.RootIndexPair)
1 change: 0 additions & 1 deletion beacon-chain/sync/rpc_send_request.go
@@ -232,7 +232,6 @@ func SendDataColumnSidecarByRoot(
}

// Send the request to the peer.
log.WithField("topic", topic).Debug("Sending data column sidecar request")
stream, err := p2pApi.Send(ctx, req, topic, pid)
if err != nil {
return nil, errors.Wrap(err, "send")
36 changes: 22 additions & 14 deletions beacon-chain/sync/service.go
@@ -239,7 +239,7 @@ func (s *Service) Start() {
s.newColumnProposerVerifier = v.VerifyProposer

go s.verifierRoutine()
go s.registerHandlers()
go s.startTasksPostInitialSync()

s.cfg.p2p.AddConnectionHandler(s.reValidatePeer, s.sendGoodbye)
s.cfg.p2p.AddDisconnectionHandler(func(_ context.Context, _ peer.ID) error {
@@ -254,12 +254,6 @@ func (s *Service) Start() {

// Update sync metrics.
async.RunEvery(s.ctx, syncMetricsInterval, s.updateMetrics)

// Run data column sampling
if params.PeerDASEnabled() {
s.sampler = newDataColumnSampler1D(s.cfg.p2p, s.cfg.clock, s.ctxMap, s.cfg.stateNotifier)
go s.sampler.Run(s.ctx)
}
}

// Stop the regular sync service.
@@ -337,23 +331,37 @@ func (s *Service) waitForChainStart() {
s.markForChainStart()
}

func (s *Service) registerHandlers() {
func (s *Service) startTasksPostInitialSync() {
// Wait for the chain to start.
s.waitForChainStart()

select {
case <-s.initialSyncComplete:
// Register respective pubsub handlers at state synced event.
digest, err := s.currentForkDigest()
// Compute the current epoch.
currentSlot := slots.CurrentSlot(uint64(s.cfg.clock.GenesisTime().Unix()))
currentEpoch := slots.ToEpoch(currentSlot)

// Compute the current fork forkDigest.
forkDigest, err := s.currentForkDigest()
if err != nil {
log.WithError(err).Error("Could not retrieve current fork digest")
return
}
currentEpoch := slots.ToEpoch(slots.CurrentSlot(uint64(s.cfg.clock.GenesisTime().Unix())))
s.registerSubscribers(currentEpoch, digest)

// Register respective pubsub handlers at state synced event.
s.registerSubscribers(currentEpoch, forkDigest)

// Start the fork watcher.
go s.forkWatcher()
return

// Start data columns sampling if peerDAS is enabled.
if params.PeerDASEnabled() {
s.sampler = newDataColumnSampler1D(s.cfg.p2p, s.cfg.clock, s.ctxMap, s.cfg.stateNotifier)
go s.sampler.Run(s.ctx)
}

case <-s.ctx.Done():
log.Debug("Context closed, exiting goroutine")
return
}
}

6 changes: 3 additions & 3 deletions beacon-chain/sync/service_test.go
@@ -62,7 +62,7 @@ func TestSyncHandlers_WaitToSync(t *testing.T) {
}

topic := "/eth2/%x/beacon_block"
go r.registerHandlers()
go r.startTasksPostInitialSync()
time.Sleep(100 * time.Millisecond)

var vr [32]byte
@@ -143,7 +143,7 @@ func TestSyncHandlers_WaitTillSynced(t *testing.T) {

syncCompleteCh := make(chan bool)
go func() {
r.registerHandlers()
r.startTasksPostInitialSync()
syncCompleteCh <- true
}()

@@ -200,7 +200,7 @@ func TestSyncService_StopCleanly(t *testing.T) {
initialSyncComplete: make(chan struct{}),
}

go r.registerHandlers()
go r.startTasksPostInitialSync()
var vr [32]byte
require.NoError(t, gs.SetClock(startup.NewClock(time.Now(), vr)))
r.waitForChainStart()
