Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: check blob data availability #879

Merged
merged 8 commits into from
Mar 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions lib/kzg.ex
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,20 @@ defmodule Kzg do
def verify_blob_kzg_proof_batch(_blobs, _kzg_commitments, _kzg_proofs) do
:erlang.nif_error(:nif_not_loaded)
end

################
### Wrappers ###
################

@spec blob_kzg_proof_batch_valid?(
list(Types.blob()),
list(Types.kzg_commitment()),
list(Types.kzg_proof())
) :: boolean()
def blob_kzg_proof_batch_valid?(blobs, kzg_commitments, kzg_proofs) do
case verify_blob_kzg_proof_batch(blobs, kzg_commitments, kzg_proofs) do
{:ok, result} -> result
{:error, _} -> false
end
end
end
43 changes: 29 additions & 14 deletions lib/lambda_ethereum_consensus/beacon/pending_blocks.ex
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,18 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
def handle_cast({:add_block, %SignedBeaconBlock{message: block} = signed_block}, state) do
block_root = Ssz.hash_tree_root!(block)

# If already processing or processed, ignore it
if Map.has_key?(state, block_root) or Blocks.has_block?(block_root) do
{:noreply, state}
else
{:noreply, state |> Map.put(block_root, {signed_block, :pending})}
cond do
# If already processing or processed, ignore it
Map.has_key?(state, block_root) or Blocks.has_block?(block_root) ->
state

blocks_to_missing_blobs([{block_root, signed_block}]) |> Enum.empty?() ->
state |> Map.put(block_root, {signed_block, :pending})

true ->
state |> Map.put(block_root, {signed_block, :download_blobs})
end
|> then(&{:noreply, &1})
end

@impl true
Expand Down Expand Up @@ -153,13 +159,12 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
@impl true
def handle_info(:download_blobs, state) do
blocks_with_blobs =
Enum.filter(state, fn {_, {_, s}} -> s == :download_blobs end)
|> Enum.map(fn {root, {block, _}} -> {root, block} end)

blobs_to_download =
blocks_with_blobs
Stream.filter(state, fn {_, {_, s}} -> s == :download_blobs end)
|> Enum.sort_by(fn {_, {signed_block, _}} -> signed_block.message.slot end)
|> Stream.map(fn {root, {block, _}} -> {root, block} end)
|> Enum.take(16)
|> blocks_to_missing_blobs()

blobs_to_download = blocks_to_missing_blobs(blocks_with_blobs)

downloaded_blobs =
blobs_to_download
Expand Down Expand Up @@ -201,12 +206,22 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
defp blocks_to_missing_blobs(blocks) do
Enum.flat_map(blocks, fn {block_root,
%{message: %{body: %{blob_kzg_commitments: commitments}}}} ->
0..(length(commitments) - 1)
|> Enum.reject(&match?({:ok, _}, BlobDb.get_blob(block_root, &1)))
|> Enum.map(&%Types.BlobIdentifier{block_root: block_root, index: &1})
Stream.with_index(commitments)
|> Enum.filter(&blob_needs_download?(&1, block_root))
|> Enum.map(&%Types.BlobIdentifier{block_root: block_root, index: elem(&1, 1)})
end)
end

defp blob_needs_download?({commitment, index}, block_root) do
case BlobDb.get_blob_sidecar(block_root, index) do
{:ok, %{kzg_commitment: ^commitment}} ->
false

_ ->
true
end
end

def schedule_blocks_processing do
Process.send_after(__MODULE__, :process_blocks, 3000)
end
Expand Down
43 changes: 30 additions & 13 deletions lib/lambda_ethereum_consensus/fork_choice/handlers.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
alias LambdaEthereumConsensus.Execution.ExecutionClient
alias LambdaEthereumConsensus.StateTransition
alias LambdaEthereumConsensus.StateTransition.{Accessors, EpochProcessing, Misc, Predicates}
alias LambdaEthereumConsensus.Store.BlobDb
alias LambdaEthereumConsensus.Store.Blocks
alias LambdaEthereumConsensus.Store.BlockStates

Expand Down Expand Up @@ -79,31 +80,47 @@
finalized_root != Store.get_checkpoint_block(store, block.parent_root, finalized_epoch) ->
{:error, "block isn't descendant of latest finalized block"}

# Check blob data is available
HardForkAliasInjection.deneb?() and
not (Ssz.hash_tree_root!(block) |> data_available?(block.body.blob_kzg_commitments)) ->
{:error, "blob data not available"}

true ->
compute_post_state(store, signed_block, base_state)
HardForkAliasInjection.on_deneb do
is_data_available =
Ssz.hash_tree_root!(block) |> data_available?(block.body.blob_kzg_commitments)

# Check blob data is available
if is_data_available do
compute_post_state(store, signed_block, base_state)
else
{:error, "blob data not available"}
end
else
compute_post_state(store, signed_block, base_state)
end
end
end

@doc """
Equivalent to `is_data_available` from the spec.
Returns true if the blob's data is available from the network.
"""
# TODO: remove when implemented
@dialyzer {:no_match, on_block: 2}
@spec data_available?(Types.root(), [Types.kzg_commitment()]) :: boolean()
def data_available?(_beacon_block_root, _blob_kzg_commitments) do
def data_available?(_beacon_block_root, []), do: true

def data_available?(beacon_block_root, blob_kzg_commitments) do
# TODO: the p2p network does not guarantee sidecar retrieval
# outside of `MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS`. Should we
# handle that case somehow here?
# TODO: fetch blobs and proofs from the DB, and verify them
# blobs, proofs = retrieve_blobs_and_proofs(beacon_block_root)
# return verify_blob_kzg_proof_batch(blobs, blob_kzg_commitments, proofs)
true
blobs =
0..(length(blob_kzg_commitments) - 1)//1
|> Enum.map(&BlobDb.get_blob_with_proof(beacon_block_root, &1))

if Enum.all?(blobs, &match?({:ok, _}, &1)) do
{blobs, proofs} =
Stream.map(blobs, fn {:ok, {blob, proof}} -> {blob, proof} end)
|> Enum.unzip()

Kzg.blob_kzg_proof_batch_valid?(blobs, blob_kzg_commitments, proofs)
else
false
end
end

@doc """
Expand Down Expand Up @@ -184,7 +201,7 @@
%{message: block} = signed_block

payload = block.body.execution_payload
parent_beacon_block_root = state.latest_block_header.parent_root

Check warning on line 204 in lib/lambda_ethereum_consensus/fork_choice/handlers.ex

View workflow job for this annotation

GitHub Actions / Run spec-tests (capella, general)

variable "parent_beacon_block_root" is unused (if the variable is not meant to be used, prefix it with an underscore)

Check warning on line 204 in lib/lambda_ethereum_consensus/fork_choice/handlers.ex

View workflow job for this annotation

GitHub Actions / Run spec-tests (capella, general)

variable "parent_beacon_block_root" is unused (if the variable is not meant to be used, prefix it with an underscore)

Check warning on line 204 in lib/lambda_ethereum_consensus/fork_choice/handlers.ex

View workflow job for this annotation

GitHub Actions / Run spec-tests (capella, mainnet)

variable "parent_beacon_block_root" is unused (if the variable is not meant to be used, prefix it with an underscore)

Check warning on line 204 in lib/lambda_ethereum_consensus/fork_choice/handlers.ex

View workflow job for this annotation

GitHub Actions / Run spec-tests (capella, mainnet)

variable "parent_beacon_block_root" is unused (if the variable is not meant to be used, prefix it with an underscore)

Check warning on line 204 in lib/lambda_ethereum_consensus/fork_choice/handlers.ex

View workflow job for this annotation

GitHub Actions / Run spec-tests (capella, minimal)

variable "parent_beacon_block_root" is unused (if the variable is not meant to be used, prefix it with an underscore)

Check warning on line 204 in lib/lambda_ethereum_consensus/fork_choice/handlers.ex

View workflow job for this annotation

GitHub Actions / Run spec-tests (capella, minimal)

variable "parent_beacon_block_root" is unused (if the variable is not meant to be used, prefix it with an underscore)

# Make it a task so it runs concurrently with the state transition
payload_verification_task =
Expand Down
45 changes: 39 additions & 6 deletions lib/lambda_ethereum_consensus/store/blob_db.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,62 @@ defmodule LambdaEthereumConsensus.Store.BlobDb do
@moduledoc """
Storage and retrieval of blobs.
"""
alias LambdaEthereumConsensus.SszEx
alias LambdaEthereumConsensus.Store.Db
alias Types.Blobdata
alias Types.BlobSidecar

@blob_prefix "blob"
@blob_sidecar_prefix "blob_sidecar"
@blobdata_prefix "blobdata"

@spec store_blob(BlobSidecar.t()) :: :ok
def store_blob(%BlobSidecar{signed_block_header: %{message: block}} = blob) do
block_root = Ssz.hash_tree_root!(block)
{:ok, encoded_blob} = Ssz.to_ssz(blob)

key = blob_key(block_root, blob.index)
key = blob_sidecar_key(block_root, blob.index)
Db.put(key, encoded_blob)

{:ok, encoded_blobdata} =
SszEx.encode(%Blobdata{blob: blob.blob, proof: blob.kzg_proof}, Blobdata)

key = blobdata_key(block_root, blob.index)
Db.put(key, encoded_blobdata)
end

@spec get_blob(Types.root(), Types.blob_index()) ::
# TODO: this is only used for tests
@spec store_blob_with_proof(Types.root(), Types.uint64(), Types.blob(), Types.kzg_proof()) ::
:ok
def store_blob_with_proof(block_root, index, blob, proof) do
{:ok, encoded_blobdata} = SszEx.encode(%Blobdata{blob: blob, proof: proof}, Blobdata)
key = blobdata_key(block_root, index)
Db.put(key, encoded_blobdata)
end

@spec get_blob_sidecar(Types.root(), Types.blob_index()) ::
{:ok, BlobSidecar.t()} | {:error, String.t()} | :not_found
def get_blob(block_root, blob_index) do
key = blob_key(block_root, blob_index)
def get_blob_sidecar(block_root, blob_index) do
key = blob_sidecar_key(block_root, blob_index)

with {:ok, signed_block} <- Db.get(key) do
Ssz.from_ssz(signed_block, BlobSidecar)
end
end

defp blob_key(block_root, blob_index), do: @blob_prefix <> block_root <> <<blob_index>>
@spec get_blob_with_proof(Types.root(), Types.blob_index()) ::
{:ok, {Types.blob(), Types.kzg_proof()}} | {:error, String.t()} | :not_found
def get_blob_with_proof(block_root, blob_index) do
key = blobdata_key(block_root, blob_index)

with {:ok, encoded_blobdata} <- Db.get(key),
{:ok, blobdata} <- SszEx.decode(encoded_blobdata, Blobdata) do
%{blob: blob, proof: proof} = blobdata
{:ok, {blob, proof}}
end
end

defp blob_sidecar_key(block_root, blob_index),
do: @blob_sidecar_prefix <> block_root <> <<blob_index>>

defp blobdata_key(block_root, blob_index), do: @blobdata_prefix <> block_root <> <<blob_index>>
end
22 changes: 22 additions & 0 deletions lib/types/blobdata.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# TODO: maybe allow doing this without declaring a new struct?
defmodule Types.Blobdata do
@moduledoc """
BlobSidecar data optimized for usage in `on_block`.
This is needed to run the spectests.
"""
@behaviour LambdaEthereumConsensus.Container

fields = [:blob, :proof]
@enforce_keys fields
defstruct fields

@type t :: %__MODULE__{blob: Types.blob(), proof: Types.kzg_proof()}

@impl LambdaEthereumConsensus.Container
def schema do
[
blob: TypeAliases.blob(),
proof: TypeAliases.kzg_proof()
]
end
end
41 changes: 27 additions & 14 deletions test/spec/runners/fork_choice.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@

alias LambdaEthereumConsensus.ForkChoice.Handlers
alias LambdaEthereumConsensus.ForkChoice.Helpers
alias LambdaEthereumConsensus.Store.BlobDb
alias LambdaEthereumConsensus.Store.Blocks
alias Types.BeaconBlock
alias Types.BeaconState
Expand All @@ -16,21 +17,9 @@

use HardForkAliasInjection

# TODO: implement blob checks
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🎉

@disabled_deneb [
"invalid_data_unavailable",
"invalid_wrong_proofs_length",
"invalid_incorrect_proof",
"invalid_wrong_blobs_length"
]

@impl TestRunner
def skip?(%SpecTestCase{fork: "capella"}), do: false

def skip?(%SpecTestCase{fork: "deneb", case: testcase}) do
Enum.member?(@disabled_deneb, testcase)
end

def skip?(%SpecTestCase{fork: "deneb"}), do: false
def skip?(_testcase), do: true

@impl TestRunner
Expand Down Expand Up @@ -84,12 +73,16 @@
{:ok, new_store}
end

defp apply_step(case_dir, store, %{block: "block_0x" <> hash = file}) do
defp apply_step(case_dir, store, %{block: "block_0x" <> hash = file} = step) do

Check warning on line 76 in test/spec/runners/fork_choice.ex

View workflow job for this annotation

GitHub Actions / Run spec-tests (capella, general)

variable "step" is unused (if the variable is not meant to be used, prefix it with an underscore)

Check warning on line 76 in test/spec/runners/fork_choice.ex

View workflow job for this annotation

GitHub Actions / Run spec-tests (capella, general)

variable "step" is unused (if the variable is not meant to be used, prefix it with an underscore)

Check warning on line 76 in test/spec/runners/fork_choice.ex

View workflow job for this annotation

GitHub Actions / Run spec-tests (capella, mainnet)

variable "step" is unused (if the variable is not meant to be used, prefix it with an underscore)

Check warning on line 76 in test/spec/runners/fork_choice.ex

View workflow job for this annotation

GitHub Actions / Run spec-tests (capella, mainnet)

variable "step" is unused (if the variable is not meant to be used, prefix it with an underscore)

Check warning on line 76 in test/spec/runners/fork_choice.ex

View workflow job for this annotation

GitHub Actions / Run spec-tests (capella, minimal)

variable "step" is unused (if the variable is not meant to be used, prefix it with an underscore)

Check warning on line 76 in test/spec/runners/fork_choice.ex

View workflow job for this annotation

GitHub Actions / Run spec-tests (capella, minimal)

variable "step" is unused (if the variable is not meant to be used, prefix it with an underscore)
block =
SpecTestUtils.read_ssz_from_file!(case_dir <> "/#{file}.ssz_snappy", SignedBeaconBlock)

assert Ssz.hash_tree_root!(block) == Base.decode16!(hash, case: :mixed)

HardForkAliasInjection.on_deneb do
load_blob_data(case_dir, block, step)
end

with {:ok, new_store} <- Handlers.on_block(store, block),
{:ok, new_store} <-
block.message.body.attestations
Expand Down Expand Up @@ -176,4 +169,24 @@

{:ok, store}
end

# TODO: validate the filename's hash
defp load_blob_data(case_dir, block, %{blobs: "blobs_0x" <> _hash = blobs_file, proofs: proofs}) do

Check warning on line 174 in test/spec/runners/fork_choice.ex

View workflow job for this annotation

GitHub Actions / Run spec-tests (capella, general)

function load_blob_data/3 is unused

Check warning on line 174 in test/spec/runners/fork_choice.ex

View workflow job for this annotation

GitHub Actions / Run spec-tests (capella, general)

function load_blob_data/3 is unused

Check warning on line 174 in test/spec/runners/fork_choice.ex

View workflow job for this annotation

GitHub Actions / Run spec-tests (capella, mainnet)

function load_blob_data/3 is unused

Check warning on line 174 in test/spec/runners/fork_choice.ex

View workflow job for this annotation

GitHub Actions / Run spec-tests (capella, mainnet)

function load_blob_data/3 is unused

Check warning on line 174 in test/spec/runners/fork_choice.ex

View workflow job for this annotation

GitHub Actions / Run spec-tests (capella, minimal)

function load_blob_data/3 is unused

Check warning on line 174 in test/spec/runners/fork_choice.ex

View workflow job for this annotation

GitHub Actions / Run spec-tests (capella, minimal)

function load_blob_data/3 is unused
schema = {:list, TypeAliases.blob(), ChainSpec.get("MAX_BLOBS_PER_BLOCK")}

blobs =
SpecTestUtils.read_ssz_ex_from_file!(case_dir <> "/#{blobs_file}.ssz_snappy", schema)

block_root = Ssz.hash_tree_root!(block.message)

Stream.zip([proofs, blobs])
|> Stream.with_index()
|> Enum.each(fn {{proof, blob}, i} ->
BlobDb.store_blob_with_proof(block_root, i, blob, proof)
end)
end

defp load_blob_data(_case_dir, block, %{}) do
assert Enum.empty?(block.message.body.blob_kzg_commitments)
end
end
11 changes: 6 additions & 5 deletions test/spec/tasks/generate_spec_tests.ex
Original file line number Diff line number Diff line change
Expand Up @@ -62,22 +62,22 @@ defmodule Mix.Tasks.GenerateSpecTests do
module_name = MetaUtils.test_module(config, fork, runner)
runner_module = MetaUtils.runner_module(runner)

# TODO: we can isolate tests that use the DB from each other by using ExUnit's tmp_dir context option.
header = """
defmodule #{module_name} do
use ExUnit.Case, async: false

setup_all do
start_link_supervised!({LambdaEthereumConsensus.Store.Db, dir: "tmp/#{config}_#{fork}_#{runner}_test_db"})
start_link_supervised!(LambdaEthereumConsensus.Store.Blocks)
start_link_supervised!(LambdaEthereumConsensus.Store.BlockStates)
Application.fetch_env!(:lambda_ethereum_consensus, ChainSpec)
|> Keyword.put(:config, #{chain_spec_config(config)})
|> then(&Application.put_env(:lambda_ethereum_consensus, ChainSpec, &1))
end

setup do
setup %{tmp_dir: tmp_dir} do
on_exit(fn -> LambdaEthereumConsensus.StateTransition.Cache.clear_cache() end)
start_link_supervised!({LambdaEthereumConsensus.Store.Db, dir: tmp_dir})
start_link_supervised!(LambdaEthereumConsensus.Store.Blocks)
start_link_supervised!(LambdaEthereumConsensus.Store.BlockStates)
:ok
end
"""

Expand All @@ -92,6 +92,7 @@ defmodule Mix.Tasks.GenerateSpecTests do

defp generate_case(runner_module, testcase) do
"""
@tag :tmp_dir
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

whats this for?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The tmp_dir tag tells ExUnit it should create a temporary directory for the test and adds its path to the context with tmp_dir key. The blob tests reuse the same blocks on different testcases, which causes the tests to fail if some of those testcases aren't supposed to have blobs. That's why we need to nuke the DB after finishing each test.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice!

#{if runner_module.skip?(testcase), do: "\n@tag :skip", else: ""}
test "#{SpecTestCase.name(testcase)}" do
testcase = #{inspect(testcase)}
Expand Down
4 changes: 2 additions & 2 deletions test/spec/utils.ex
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ defmodule SpecTestUtils do
end
end

@spec read_ssz_ex_from_optional_file!(binary, module) :: any() | nil
@spec read_ssz_ex_from_optional_file!(binary, SszEx.schema()) :: any() | nil
def read_ssz_ex_from_optional_file!(file_path, ssz_type) do
if File.exists?(file_path) do
compressed = File.read!(file_path)
Expand All @@ -85,7 +85,7 @@ defmodule SpecTestUtils do
end
end

@spec read_ssz_ex_from_file!(binary, module) :: any()
@spec read_ssz_ex_from_file!(binary, SszEx.schema()) :: any()
def read_ssz_ex_from_file!(file_path, ssz_type) do
case read_ssz_ex_from_optional_file!(file_path, ssz_type) do
nil -> raise "File not found: #{file_path}"
Expand Down
Loading