diff --git a/beacon-chain/sync/data_columns_sampling.go b/beacon-chain/sync/data_columns_sampling.go
index cded76c5c512..1bb0aaf58e6e 100644
--- a/beacon-chain/sync/data_columns_sampling.go
+++ b/beacon-chain/sync/data_columns_sampling.go
@@ -170,8 +170,6 @@ func (d *dataColumnSampler1D) refreshPeerInfo() {
 		}
 	}
 
-	log.WithField("columnFromPeer", d.columnFromPeer).Debug("Peer info refreshed")
-
 	columnWithNoPeers := make([]uint64, 0)
 	for column, peers := range d.peerFromColumn {
 		if len(peers) == 0 {
@@ -228,7 +226,7 @@ func (d *dataColumnSampler1D) handleStateNotification(ctx context.Context, event
 		return
 	}
 
-	if coreTime.PeerDASIsActive(data.Slot) {
+	if !coreTime.PeerDASIsActive(data.Slot) {
 		// We do not trigger sampling if peerDAS is not active yet.
 		return
 	}
@@ -249,22 +247,12 @@ func (d *dataColumnSampler1D) handleStateNotification(ctx context.Context, event
 	// Randomize columns for sample selection.
 	randomizedColumns := randomizeColumns(d.nonCustodyColumns)
 	samplesCount := min(params.BeaconConfig().SamplesPerSlot, uint64(len(d.nonCustodyColumns))-params.BeaconConfig().NumberOfColumns/2)
-	ok, _, err = d.incrementalDAS(ctx, data.BlockRoot, randomizedColumns, samplesCount)
+
+	// TODO: Use the first output of `incrementalDAS` as input of the fork choice rule.
+	_, _, err = d.incrementalDAS(ctx, data.BlockRoot, randomizedColumns, samplesCount)
 	if err != nil {
 		log.WithError(err).Error("Failed to run incremental DAS")
 	}
-
-	if ok {
-		log.WithFields(logrus.Fields{
-			"root":    fmt.Sprintf("%#x", data.BlockRoot),
-			"columns": randomizedColumns,
-		}).Debug("Data column sampling successful")
-	} else {
-		log.WithFields(logrus.Fields{
-			"root":    fmt.Sprintf("%#x", data.BlockRoot),
-			"columns": randomizedColumns,
-		}).Warning("Data column sampling failed")
-	}
 }
 
 // incrementalDAS samples data columns from active peers using incremental DAS.
@@ -280,10 +268,15 @@ func (d *dataColumnSampler1D) incrementalDAS(
 	firstColumnToSample, extendedSampleCount := uint64(0), peerdas.ExtendedSampleCount(sampleCount, allowedFailures)
 	roundSummaries := make([]roundSummary, 0, 1) // We optimistically allocate only one round summary.
 
+	start := time.Now()
+
 	for round := 1; ; /*No exit condition */ round++ {
 		if extendedSampleCount > uint64(len(columns)) {
 			// We already tried to sample all possible columns, this is the unhappy path.
-			log.WithField("root", fmt.Sprintf("%#x", root)).Warning("Some columns are still missing after sampling all possible columns")
+			log.WithFields(logrus.Fields{
+				"root":  fmt.Sprintf("%#x", root),
+				"round": round - 1,
+			}).Warning("Some columns are still missing after trying to sample all possible columns")
 			return false, roundSummaries, nil
 		}
 
@@ -291,6 +284,12 @@ func (d *dataColumnSampler1D) incrementalDAS(
 		columnsToSample := columns[firstColumnToSample:extendedSampleCount]
 		columnsToSampleCount := extendedSampleCount - firstColumnToSample
 
+		log.WithFields(logrus.Fields{
+			"root":    fmt.Sprintf("%#x", root),
+			"columns": columnsToSample,
+			"round":   round,
+		}).Debug("Start data columns sampling")
+
 		// Sample data columns from peers in parallel.
 		retrievedSamples := d.sampleDataColumns(ctx, root, columnsToSample)
 
@@ -311,7 +310,8 @@
 			// All columns were correctly sampled, this is the happy path.
 			log.WithFields(logrus.Fields{
 				"root":         fmt.Sprintf("%#x", root),
-				"roundsNeeded": round,
+				"neededRounds": round,
+				"duration":     time.Since(start),
 			}).Debug("All columns were successfully sampled")
 			return true, roundSummaries, nil
 		}
@@ -429,14 +429,14 @@ func (d *dataColumnSampler1D) sampleDataColumnsFromPeer(
 			"peerID":           pid,
 			"root":             fmt.Sprintf("%#x", root),
 			"requestedColumns": sortedSliceFromMap(requestedColumns),
-		}).Debug("All requested columns were successfully sampled from peer")
+		}).Debug("Sampled columns from peer successfully")
 	} else {
 		log.WithFields(logrus.Fields{
 			"peerID":           pid,
 			"root":             fmt.Sprintf("%#x", root),
 			"requestedColumns": sortedSliceFromMap(requestedColumns),
 			"retrievedColumns": sortedSliceFromMap(retrievedColumns),
-		}).Debug("Some requested columns were not sampled from peer")
+		}).Debug("Sampled columns from peer with some errors")
 	}
 
 	return retrievedColumns
diff --git a/beacon-chain/sync/rpc_data_column_sidecars_by_root.go b/beacon-chain/sync/rpc_data_column_sidecars_by_root.go
index af5f934ff8f8..fb1f8db054e5 100644
--- a/beacon-chain/sync/rpc_data_column_sidecars_by_root.go
+++ b/beacon-chain/sync/rpc_data_column_sidecars_by_root.go
@@ -94,7 +94,7 @@ func (s *Service) dataColumnSidecarByRootRPCHandler(ctx context.Context, msg int
 		"requested":      requestedColumnsList,
 		"custodiedCount": len(custodiedColumnsList),
 		"requestedCount": len(requestedColumnsList),
-	}).Debug("Received data column sidecar by root request")
+	}).Debug("Data column sidecar by root request received")
 
 	// Subscribe to the data column feed.
 	rootIndexChan := make(chan filesystem.RootIndexPair)
diff --git a/beacon-chain/sync/rpc_send_request.go b/beacon-chain/sync/rpc_send_request.go
index 5ebfc5bf695e..e50db250cb62 100644
--- a/beacon-chain/sync/rpc_send_request.go
+++ b/beacon-chain/sync/rpc_send_request.go
@@ -232,7 +232,6 @@ func SendDataColumnSidecarByRoot(
 	}
 
 	// Send the request to the peer.
-	log.WithField("topic", topic).Debug("Sending data column sidecar request")
 	stream, err := p2pApi.Send(ctx, req, topic, pid)
 	if err != nil {
 		return nil, errors.Wrap(err, "send")
diff --git a/beacon-chain/sync/service.go b/beacon-chain/sync/service.go
index 52ec56a830dd..38a608621808 100644
--- a/beacon-chain/sync/service.go
+++ b/beacon-chain/sync/service.go
@@ -239,7 +239,7 @@ func (s *Service) Start() {
 	s.newColumnProposerVerifier = v.VerifyProposer
 
 	go s.verifierRoutine()
-	go s.registerHandlers()
+	go s.startTasksPostInitialSync()
 
 	s.cfg.p2p.AddConnectionHandler(s.reValidatePeer, s.sendGoodbye)
 	s.cfg.p2p.AddDisconnectionHandler(func(_ context.Context, _ peer.ID) error {
@@ -254,12 +254,6 @@ func (s *Service) Start() {
 
 	// Update sync metrics.
 	async.RunEvery(s.ctx, syncMetricsInterval, s.updateMetrics)
-
-	// Run data column sampling
-	if params.PeerDASEnabled() {
-		s.sampler = newDataColumnSampler1D(s.cfg.p2p, s.cfg.clock, s.ctxMap, s.cfg.stateNotifier)
-		go s.sampler.Run(s.ctx)
-	}
 }
 
 // Stop the regular sync service.
@@ -337,23 +331,37 @@ func (s *Service) waitForChainStart() {
 	s.markForChainStart()
 }
 
-func (s *Service) registerHandlers() {
+func (s *Service) startTasksPostInitialSync() {
+	// Wait for the chain to start.
 	s.waitForChainStart()
+
 	select {
 	case <-s.initialSyncComplete:
-		// Register respective pubsub handlers at state synced event.
-		digest, err := s.currentForkDigest()
+		// Compute the current epoch.
+		currentSlot := slots.CurrentSlot(uint64(s.cfg.clock.GenesisTime().Unix()))
+		currentEpoch := slots.ToEpoch(currentSlot)
+
+		// Compute the current fork digest.
+		forkDigest, err := s.currentForkDigest()
 		if err != nil {
 			log.WithError(err).Error("Could not retrieve current fork digest")
 			return
 		}
-		currentEpoch := slots.ToEpoch(slots.CurrentSlot(uint64(s.cfg.clock.GenesisTime().Unix())))
-		s.registerSubscribers(currentEpoch, digest)
+
+		// Register respective pubsub handlers at state synced event.
+		s.registerSubscribers(currentEpoch, forkDigest)
+
+		// Start the fork watcher.
 		go s.forkWatcher()
-		return
+
+		// Start data columns sampling if peerDAS is enabled.
+		if params.PeerDASEnabled() {
+			s.sampler = newDataColumnSampler1D(s.cfg.p2p, s.cfg.clock, s.ctxMap, s.cfg.stateNotifier)
+			go s.sampler.Run(s.ctx)
+		}
+
 	case <-s.ctx.Done():
 		log.Debug("Context closed, exiting goroutine")
-		return
 	}
 }
 
diff --git a/beacon-chain/sync/service_test.go b/beacon-chain/sync/service_test.go
index 637e3cea0f46..78f42589ae6e 100644
--- a/beacon-chain/sync/service_test.go
+++ b/beacon-chain/sync/service_test.go
@@ -62,7 +62,7 @@ func TestSyncHandlers_WaitToSync(t *testing.T) {
 	}
 
 	topic := "/eth2/%x/beacon_block"
-	go r.registerHandlers()
+	go r.startTasksPostInitialSync()
 	time.Sleep(100 * time.Millisecond)
 
 	var vr [32]byte
@@ -143,7 +143,7 @@ func TestSyncHandlers_WaitTillSynced(t *testing.T) {
 
 	syncCompleteCh := make(chan bool)
 	go func() {
-		r.registerHandlers()
+		r.startTasksPostInitialSync()
 		syncCompleteCh <- true
 	}()
 
@@ -200,7 +200,7 @@ func TestSyncService_StopCleanly(t *testing.T) {
 		initialSyncComplete: make(chan struct{}),
 	}
 
-	go r.registerHandlers()
+	go r.startTasksPostInitialSync()
 	var vr [32]byte
 	require.NoError(t, gs.SetClock(startup.NewClock(time.Now(), vr)))
 	r.waitForChainStart()