Skip to content

Commit

Permalink
Fix sync committee when syncing (Consensys#8192)
Browse files Browse the repository at this point in the history
  • Loading branch information
tbenr authored Apr 15, 2024
1 parent 84b6ac6 commit e9ef718
Show file tree
Hide file tree
Showing 7 changed files with 88 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -490,6 +490,9 @@ public SafeFuture<Optional<Attestation>> createAggregate(
@Override
public SafeFuture<Optional<SyncCommitteeContribution>> createSyncCommitteeContribution(
final UInt64 slot, final int subcommitteeIndex, final Bytes32 beaconBlockRoot) {
if (isSyncActive()) {
return NodeSyncingException.failedFuture();
}
return SafeFuture.completedFuture(
syncCommitteeMessagePool.createContribution(slot, beaconBlockRoot, subcommitteeIndex));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@
import tech.pegasys.teku.spec.datastructures.operations.AttestationData;
import tech.pegasys.teku.spec.datastructures.operations.SignedAggregateAndProof;
import tech.pegasys.teku.spec.datastructures.operations.versions.altair.SignedContributionAndProof;
import tech.pegasys.teku.spec.datastructures.operations.versions.altair.SyncCommitteeContribution;
import tech.pegasys.teku.spec.datastructures.operations.versions.altair.SyncCommitteeMessage;
import tech.pegasys.teku.spec.datastructures.state.Checkpoint;
import tech.pegasys.teku.spec.datastructures.state.CheckpointState;
Expand Down Expand Up @@ -696,6 +697,17 @@ public void createAggregate_shouldFailWhenNodeIsSyncing() {
assertThatThrownBy(result::get).hasRootCauseInstanceOf(NodeSyncingException.class);
}

@Test
public void createSyncCommitteeContribution() {
nodeIsSyncing();
final SafeFuture<Optional<SyncCommitteeContribution>> result =
validatorApiHandler.createSyncCommitteeContribution(
ONE, 0, dataStructureUtil.randomBytes32());

assertThat(result).isCompletedExceptionally();
assertThatThrownBy(result::get).hasRootCauseInstanceOf(NodeSyncingException.class);
}

@Test
public void createAggregate_shouldReturnAggregateFromAttestationPool() {
final AttestationData attestationData = dataStructureUtil.randomAttestationData();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,8 @@ public class ExecutionLayerChannelStub implements ExecutionLayerChannel {
lastBuilderBlobsBundle = Optional.empty();
private Optional<PowBlock> lastValidBlock = Optional.empty();

private boolean online = true;

public ExecutionLayerChannelStub(
final Spec spec,
final TimeProvider timeProvider,
Expand Down Expand Up @@ -147,6 +149,8 @@ public void addPosBlock(final Bytes32 blockHash, final PayloadStatus payloadStat

@Override
public SafeFuture<Optional<PowBlock>> eth1GetPowBlock(final Bytes32 blockHash) {
offlineCheck();

if (!transitionEmulationEnabled) {
requestedPowBlocks.add(blockHash);
return SafeFuture.completedFuture(Optional.ofNullable(knownBlocks.get(blockHash)));
Expand All @@ -172,6 +176,8 @@ public SafeFuture<Optional<PowBlock>> eth1GetPowBlock(final Bytes32 blockHash) {

@Override
public SafeFuture<PowBlock> eth1GetPowChainHead() {
offlineCheck();

if (!transitionEmulationEnabled) {
return SafeFuture.failedFuture(
new UnsupportedOperationException("getPowChainHead not supported"));
Expand All @@ -195,6 +201,8 @@ public SafeFuture<PowBlock> eth1GetPowChainHead() {
public SafeFuture<ForkChoiceUpdatedResult> engineForkChoiceUpdated(
final ForkChoiceState forkChoiceState,
final Optional<PayloadBuildingAttributes> payloadBuildingAttributes) {
offlineCheck();

if (!bellatrixActivationDetected) {
LOG.info(
"forkChoiceUpdated received before terminalBlock has been sent. Assuming transition already happened");
Expand Down Expand Up @@ -229,6 +237,8 @@ public SafeFuture<ForkChoiceUpdatedResult> engineForkChoiceUpdated(
@Override
public SafeFuture<GetPayloadResponse> engineGetPayload(
final ExecutionPayloadContext executionPayloadContext, final BeaconState state) {
offlineCheck();

if (!bellatrixActivationDetected) {
LOG.info(
"getPayload received before terminalBlock has been sent. Assuming transition already happened");
Expand Down Expand Up @@ -312,6 +322,8 @@ public SafeFuture<GetPayloadResponse> engineGetPayload(

@Override
public SafeFuture<PayloadStatus> engineNewPayload(final NewPayloadRequest newPayloadRequest) {
offlineCheck();

final ExecutionPayload executionPayload = newPayloadRequest.getExecutionPayload();
final PayloadStatus returnedStatus =
Optional.ofNullable(knownPosBlocks.get(executionPayload.getBlockHash()))
Expand All @@ -327,12 +339,15 @@ public SafeFuture<PayloadStatus> engineNewPayload(final NewPayloadRequest newPay

@Override
public SafeFuture<List<ClientVersion>> engineGetClientVersion(final ClientVersion clientVersion) {
offlineCheck();

return SafeFuture.completedFuture(List.of(STUB_CLIENT_VERSION));
}

@Override
public SafeFuture<Void> builderRegisterValidators(
final SszList<SignedValidatorRegistration> signedValidatorRegistrations, final UInt64 slot) {
offlineCheck();
return SafeFuture.COMPLETE;
}

Expand All @@ -343,6 +358,8 @@ public SafeFuture<HeaderWithFallbackData> builderGetHeader(
final SafeFuture<UInt256> payloadValueResult,
final Optional<UInt64> requestedBuilderBoostFactor,
final BlockProductionPerformance blockProductionPerformance) {
offlineCheck();

final UInt64 slot = state.getSlot();
LOG.info(
"getPayloadHeader: payloadId: {} slot: {} ... delegating to getPayload ...",
Expand Down Expand Up @@ -392,6 +409,8 @@ public SafeFuture<HeaderWithFallbackData> builderGetHeader(
public SafeFuture<BuilderPayload> builderGetPayload(
final SignedBeaconBlock signedBeaconBlock,
final Function<UInt64, Optional<ExecutionPayloadResult>> getCachedPayloadResultFunction) {
offlineCheck();

final UInt64 slot = signedBeaconBlock.getSlot();
final SchemaDefinitions schemaDefinitions = spec.atSlot(slot).getSchemaDefinitions();
final Optional<SchemaDefinitionsBellatrix> schemaDefinitionsBellatrix =
Expand Down Expand Up @@ -463,6 +482,10 @@ public Set<Bytes32> getRequestedPowBlocks() {
return requestedPowBlocks;
}

public void setOnline(final boolean online) {
this.online = online;
}

@SuppressWarnings("unused")
private static class HeadAndAttributes {
private final Bytes32 head;
Expand All @@ -476,6 +499,12 @@ private HeadAndAttributes(final Bytes32 head, final PayloadBuildingAttributes at
}
}

private void offlineCheck() {
if (!online) {
throw new RuntimeException("stub is offline");
}
}

private void checkBellatrixActivation() {
if (!bellatrixActivationDetected) {
LOG.info("Bellatrix activation detected");
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
/*
* Copyright Consensys Software Inc., 2022
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/

package tech.pegasys.teku.validator.client.duties.synccommittee;

import tech.pegasys.teku.infrastructure.unsigned.UInt64;

public class ChainHeadTooOldException extends RuntimeException {

public ChainHeadTooOldException(final UInt64 headSlot, final UInt64 slot) {
super("Chain head too old. Head slot: " + headSlot + ", requested slot: " + slot);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import tech.pegasys.teku.validator.api.ValidatorTimingChannel;

public class ChainHeadTracker implements ValidatorTimingChannel {
public static final int HEAD_TOO_OLD_THRESHOLD = 32;

private UInt64 headBlockSlot = UInt64.ZERO;
private Optional<Bytes32> headBlockRoot = Optional.empty();
Expand All @@ -33,6 +34,10 @@ public synchronized Optional<Bytes32> getCurrentChainHead(final UInt64 atSlot) {
// We've moved on and no longer have a reference to what the head block was at that slot
throw new ChainHeadBeyondSlotException(atSlot);
}
if (headBlockRoot.isPresent()
&& atSlot.minusMinZero(headBlockSlot).isGreaterThan(HEAD_TOO_OLD_THRESHOLD)) {
throw new ChainHeadTooOldException(headBlockSlot, atSlot);
}
return headBlockRoot;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,16 +88,16 @@ public SafeFuture<DutyResult> performProductionDuty(final UInt64 slot) {
}
try {
lastSignatureBlockRoot = chainHeadTracker.getCurrentChainHead(slot);
} catch (ChainHeadBeyondSlotException ex) {
return chainHeadBeyondSlotFailure(ex);
} catch (final ChainHeadBeyondSlotException | ChainHeadTooOldException ex) {
return chainHeadSlotCheckFailure(ex);
}
lastSignatureSlot = Optional.of(slot);
return lastSignatureBlockRoot
.map(blockRoot -> productionDuty.produceMessages(slot, blockRoot))
.orElse(SafeFuture.completedFuture(DutyResult.NODE_SYNCING));
}

private SafeFuture<DutyResult> chainHeadBeyondSlotFailure(final ChainHeadBeyondSlotException ex) {
private SafeFuture<DutyResult> chainHeadSlotCheckFailure(final RuntimeException ex) {
return SafeFuture.completedFuture(DutyResult.forError(getAllValidatorKeys(), ex));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static tech.pegasys.teku.validator.client.duties.synccommittee.ChainHeadTracker.HEAD_TOO_OLD_THRESHOLD;

import org.apache.tuweni.bytes.Bytes32;
import org.junit.jupiter.api.Test;
import tech.pegasys.teku.infrastructure.unsigned.UInt64;
import tech.pegasys.teku.spec.TestSpecFactory;
import tech.pegasys.teku.spec.util.DataStructureUtil;
import tech.pegasys.teku.validator.client.duties.synccommittee.ChainHeadBeyondSlotException;
import tech.pegasys.teku.validator.client.duties.synccommittee.ChainHeadTooOldException;
import tech.pegasys.teku.validator.client.duties.synccommittee.ChainHeadTracker;

class ChainHeadTrackerTest {
Expand Down Expand Up @@ -60,6 +62,17 @@ void shouldThrowCustomExceptionWhenHeadIsAfterRequestedSlot() {
ChainHeadBeyondSlotException.class, () -> tracker.getCurrentChainHead(slot.minus(1)));
}

@Test
void shouldThrowCustomExceptionWhenHeadSlotIsTooOld() {
final UInt64 slot = dataStructureUtil.randomUInt64();
final Bytes32 headBlockRoot = dataStructureUtil.randomBytes32();
updateHead(slot, headBlockRoot);

assertThrows(
ChainHeadTooOldException.class,
() -> tracker.getCurrentChainHead(slot.plus(HEAD_TOO_OLD_THRESHOLD + 1)));
}

private void updateHead(final UInt64 slot, final Bytes32 headBlockRoot) {
tracker.onHeadUpdate(
slot, dataStructureUtil.randomBytes32(), dataStructureUtil.randomBytes32(), headBlockRoot);
Expand Down

0 comments on commit e9ef718

Please sign in to comment.