Skip to content

Commit

Permalink
fix e2e errors
Browse files Browse the repository at this point in the history
  • Loading branch information
g11tech committed Jul 23, 2023
1 parent 880522a commit 89ff53b
Showing 1 changed file with 117 additions and 108 deletions.
225 changes: 117 additions & 108 deletions packages/beacon-node/src/api/impl/beacon/blocks/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,118 @@ export function getBeaconBlockApi({
network,
db,
}: Pick<ApiModules, "chain" | "config" | "metrics" | "network" | "db">): ServerApi<routes.beacon.block.Api> {
const publishBlock: ServerApi<routes.beacon.block.Api>["publishBlock"] = async (
signedBlockOrContents,
opts: PublishBlockOpts = defaultPublishOpts
) => {
const seenTimestampSec = Date.now() / 1000;
let blockForImport: BlockInput, signedBlock: allForks.SignedBeaconBlock, signedBlobs: deneb.SignedBlobSidecars;

if (isSignedBlockContents(signedBlockOrContents)) {
({signedBlock, signedBlobSidecars: signedBlobs} = signedBlockOrContents);
blockForImport = getBlockInput.postDeneb(
config,
signedBlock,
BlockSource.api,
// The blobsSidecar will be replaced in the followup PRs with just blobs
blobSidecarsToBlobsSidecar(
config,
signedBlock,
signedBlobs.map((sblob) => sblob.message)
),
null
);
} else {
signedBlock = signedBlockOrContents;
signedBlobs = [];
// TODO: Once API supports submitting data as SSZ, replace null with blockBytes
blockForImport = getBlockInput.preDeneb(config, signedBlock, BlockSource.api, null);
}

// check what validations have been requested before broadcasting and publishing the block
// TODO: add validation time to metrics
const broadcastValidation = opts.broadcastValidation ?? defaultPublishOpts.broadcastValidation;
switch (broadcastValidation) {
case routes.beacon.BroadcastValidation.none:
break;

case routes.beacon.BroadcastValidation.consensus: {
// check if this beacon node produced the block else run validations
const blockRoot = toHex(
chain.config.getForkTypes(signedBlock.message.slot).BeaconBlock.hashTreeRoot(signedBlock.message)
);

if (!chain.producedBlockRoot.has(blockRoot) && !chain.producedBlindedBlockRoot.has(blockRoot)) {
// error or log warning that we support consensus val on blocks produced via this beacon node
const message = "Consensus validation not implemented yet for block not produced by this beacon node";
if (chain.opts.broadcastValidationStrictness === "error") {
throw Error(message);
} else {
chain.logger.warn(message);
}
}
break;
}

default: {
// error or log warning we do not support this validation
const message = `Broadcast validation of ${broadcastValidation} type not implemented yet`;
if (chain.opts.broadcastValidationStrictness === "error") {
throw Error(message);
} else {
chain.logger.warn(message);
}
}
}

// Simple implementation of a pending block queue. Keeping the block here recycles the API logic, and keeps the
// REST request promise without any extra infrastructure.
const msToBlockSlot =
computeTimeAtSlot(config, blockForImport.block.message.slot, chain.genesisTime) * 1000 - Date.now();
if (msToBlockSlot <= MAX_API_CLOCK_DISPARITY_MS && msToBlockSlot > 0) {
// If block is a bit early, hold it in a promise. Equivalent to a pending queue.
await sleep(msToBlockSlot);
}

// TODO: Validate block
metrics?.registerBeaconBlock(OpSource.api, seenTimestampSec, blockForImport.block.message);
const publishPromises = [
// Send the block, regardless of whether or not it is valid. The API
// specification is very clear that this is the desired behaviour.
() => network.publishBeaconBlock(signedBlock) as Promise<unknown>,
() =>
// there is no rush to persist block since we published it to gossip anyway
chain.processBlock(blockForImport, {...opts, eagerPersistBlock: false}).catch((e) => {
if (e instanceof BlockError && e.type.code === BlockErrorCode.PARENT_UNKNOWN) {
network.events.emit(NetworkEvent.unknownBlockParent, {
blockInput: blockForImport,
peer: IDENTITY_PEER_ID,
});
}
throw e;
}),
...signedBlobs.map((signedBlob) => () => network.publishBlobSidecar(signedBlob)),
];
await promiseAllMaybeAsync(publishPromises);
};

const publishBlindedBlock: ServerApi<routes.beacon.block.Api>["publishBlindedBlock"] = async (
signedBlindedBlockOrContents,
opts: PublishBlockOpts = defaultPublishOpts
) => {
const executionBuilder = chain.executionBuilder;
if (!executionBuilder) throw Error("exeutionBuilder required to publish SignedBlindedBeaconBlock");
// Mechanism for blobs & blocks on builder is not yet finalized
if (isSignedBlindedBlockContents(signedBlindedBlockOrContents)) {
throw Error("exeutionBuilder not yet implemented for deneb+ forks");
} else {
const signedBlockOrContents = await executionBuilder.submitBlindedBlock(signedBlindedBlockOrContents);
// the full block is published by relay and it's possible that the block is already known to us by gossip
// see https://github.com/ChainSafe/lodestar/issues/5404
return publishBlock(signedBlockOrContents, {...opts, ignoreIfKnown: true});
}
};

return {
async getBlockHeaders(filters) {
// TODO - SLOW CODE: This code seems like it could be improved
Expand Down Expand Up @@ -192,118 +304,15 @@ export function getBeaconBlockApi({
};
},

async publishBlindedBlockV2(signedBlindedBlockOrContents, opts) {
await this.publishBlindedBlock(signedBlindedBlockOrContents, opts);
},
publishBlock,
publishBlindedBlock,

async publishBlindedBlock(signedBlindedBlockOrContents, opts: PublishBlockOpts = defaultPublishOpts) {
const executionBuilder = chain.executionBuilder;
if (!executionBuilder) throw Error("exeutionBuilder required to publish SignedBlindedBeaconBlock");
// Mechanism for blobs & blocks on builder is not yet finalized
if (isSignedBlindedBlockContents(signedBlindedBlockOrContents)) {
throw Error("exeutionBuilder not yet implemented for deneb+ forks");
} else {
const signedBlockOrContents = await executionBuilder.submitBlindedBlock(signedBlindedBlockOrContents);
// the full block is published by relay and it's possible that the block is already known to us by gossip
// see https://github.com/ChainSafe/lodestar/issues/5404
return this.publishBlock(signedBlockOrContents, {...opts, ignoreIfKnown: true});
}
async publishBlindedBlockV2(signedBlindedBlockOrContents, opts) {
await publishBlindedBlock(signedBlindedBlockOrContents, opts);
},

async publishBlockV2(signedBlockOrContents, opts: PublishBlockOpts = defaultPublishOpts) {
await this.publishBlock(signedBlockOrContents, opts);
},

async publishBlock(signedBlockOrContents, opts: PublishBlockOpts = defaultPublishOpts) {
const seenTimestampSec = Date.now() / 1000;
let blockForImport: BlockInput, signedBlock: allForks.SignedBeaconBlock, signedBlobs: deneb.SignedBlobSidecars;

if (isSignedBlockContents(signedBlockOrContents)) {
({signedBlock, signedBlobSidecars: signedBlobs} = signedBlockOrContents);
blockForImport = getBlockInput.postDeneb(
config,
signedBlock,
BlockSource.api,
// The blobsSidecar will be replaced in the followup PRs with just blobs
blobSidecarsToBlobsSidecar(
config,
signedBlock,
signedBlobs.map((sblob) => sblob.message)
),
null
);
} else {
signedBlock = signedBlockOrContents;
signedBlobs = [];
// TODO: Once API supports submitting data as SSZ, replace null with blockBytes
blockForImport = getBlockInput.preDeneb(config, signedBlock, BlockSource.api, null);
}

// check what validations have been requested before broadcasting and publishing the block
// TODO: add validation time to metrics
const broadcastValidation = opts.broadcastValidation ?? defaultPublishOpts.broadcastValidation;
switch (broadcastValidation) {
case routes.beacon.BroadcastValidation.none:
break;

case routes.beacon.BroadcastValidation.consensus: {
// check if this beacon node produced the block else run validations
const blockRoot = toHex(
chain.config.getForkTypes(signedBlock.message.slot).BeaconBlock.hashTreeRoot(signedBlock.message)
);

if (!chain.producedBlockRoot.has(blockRoot) && !chain.producedBlindedBlockRoot.has(blockRoot)) {
// error or log warning that we support consensus val on blocks produced via this beacon node
const message = "Consensus validation not implemented yet for block not produced by this beacon node";
if (chain.opts.broadcastValidationStrictness === "error") {
throw Error(message);
} else {
chain.logger.warn(message);
}
}
break;
}

default: {
// error or log warning we do not support this validation
const message = `Broadcast validation of ${broadcastValidation} type not implemented yet`;
if (chain.opts.broadcastValidationStrictness === "error") {
throw Error(message);
} else {
chain.logger.warn(message);
}
}
}

// Simple implementation of a pending block queue. Keeping the block here recycles the API logic, and keeps the
// REST request promise without any extra infrastructure.
const msToBlockSlot =
computeTimeAtSlot(config, blockForImport.block.message.slot, chain.genesisTime) * 1000 - Date.now();
if (msToBlockSlot <= MAX_API_CLOCK_DISPARITY_MS && msToBlockSlot > 0) {
// If block is a bit early, hold it in a promise. Equivalent to a pending queue.
await sleep(msToBlockSlot);
}

// TODO: Validate block
metrics?.registerBeaconBlock(OpSource.api, seenTimestampSec, blockForImport.block.message);
const publishPromises = [
// Send the block, regardless of whether or not it is valid. The API
// specification is very clear that this is the desired behaviour.
() => network.publishBeaconBlock(signedBlock) as Promise<unknown>,
() =>
// there is no rush to persist block since we published it to gossip anyway
chain.processBlock(blockForImport, {...opts, eagerPersistBlock: false}).catch((e) => {
if (e instanceof BlockError && e.type.code === BlockErrorCode.PARENT_UNKNOWN) {
network.events.emit(NetworkEvent.unknownBlockParent, {
blockInput: blockForImport,
peer: IDENTITY_PEER_ID,
});
}
throw e;
}),
...signedBlobs.map((signedBlob) => () => network.publishBlobSidecar(signedBlob)),
];
await promiseAllMaybeAsync(publishPromises);
await publishBlock(signedBlockOrContents, opts);
},

async getBlobSidecars(blockId) {
Expand Down

0 comments on commit 89ff53b

Please sign in to comment.