diff --git a/lib/kzg.ex b/lib/kzg.ex
index 7688f464f..178e6f2a2 100644
--- a/lib/kzg.ex
+++ b/lib/kzg.ex
@@ -47,4 +47,20 @@ defmodule Kzg do
   def verify_blob_kzg_proof_batch(_blobs, _kzg_commitments, _kzg_proofs) do
     :erlang.nif_error(:nif_not_loaded)
   end
+
+  ################
+  ### Wrappers ###
+  ################
+
+  @spec blob_kzg_proof_batch_valid?(
+          list(Types.blob()),
+          list(Types.kzg_commitment()),
+          list(Types.kzg_proof())
+        ) :: boolean()
+  def blob_kzg_proof_batch_valid?(blobs, kzg_commitments, kzg_proofs) do
+    case verify_blob_kzg_proof_batch(blobs, kzg_commitments, kzg_proofs) do
+      {:ok, result} -> result
+      {:error, _} -> false
+    end
+  end
 end
diff --git a/lib/lambda_ethereum_consensus/beacon/pending_blocks.ex b/lib/lambda_ethereum_consensus/beacon/pending_blocks.ex
index c7f4ef554..128a7edda 100644
--- a/lib/lambda_ethereum_consensus/beacon/pending_blocks.ex
+++ b/lib/lambda_ethereum_consensus/beacon/pending_blocks.ex
@@ -59,12 +59,18 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
   def handle_cast({:add_block, %SignedBeaconBlock{message: block} = signed_block}, state) do
     block_root = Ssz.hash_tree_root!(block)

-    # If already processing or processed, ignore it
-    if Map.has_key?(state, block_root) or Blocks.has_block?(block_root) do
-      {:noreply, state}
-    else
-      {:noreply, state |> Map.put(block_root, {signed_block, :pending})}
+    cond do
+      # If already processing or processed, ignore it
+      Map.has_key?(state, block_root) or Blocks.has_block?(block_root) ->
+        state
+
+      blocks_to_missing_blobs([{block_root, signed_block}]) |> Enum.empty?() ->
+        state |> Map.put(block_root, {signed_block, :pending})
+
+      true ->
+        state |> Map.put(block_root, {signed_block, :download_blobs})
     end
+    |> then(&{:noreply, &1})
   end

   @impl true
@@ -153,13 +159,12 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
   @impl true
   def handle_info(:download_blobs, state) do
     blocks_with_blobs =
-      Enum.filter(state, fn {_, {_, s}} -> s == :download_blobs end)
-      |> Enum.map(fn {root, {block, _}} -> {root, block} end)
-
-    blobs_to_download =
-      blocks_with_blobs
+      Stream.filter(state, fn {_, {_, s}} -> s == :download_blobs end)
+      |> Enum.sort_by(fn {_, {signed_block, _}} -> signed_block.message.slot end)
+      |> Stream.map(fn {root, {block, _}} -> {root, block} end)
       |> Enum.take(16)
-      |> blocks_to_missing_blobs()
+
+    blobs_to_download = blocks_to_missing_blobs(blocks_with_blobs)

     downloaded_blobs =
       blobs_to_download
@@ -201,12 +206,22 @@
   defp blocks_to_missing_blobs(blocks) do
     Enum.flat_map(blocks, fn {block_root, %{message: %{body: %{blob_kzg_commitments: commitments}}}} ->
-      0..(length(commitments) - 1)
-      |> Enum.reject(&match?({:ok, _}, BlobDb.get_blob(block_root, &1)))
-      |> Enum.map(&%Types.BlobIdentifier{block_root: block_root, index: &1})
+      Stream.with_index(commitments)
+      |> Enum.filter(&blob_needs_download?(&1, block_root))
+      |> Enum.map(&%Types.BlobIdentifier{block_root: block_root, index: elem(&1, 1)})
     end)
   end

+  defp blob_needs_download?({commitment, index}, block_root) do
+    case BlobDb.get_blob_sidecar(block_root, index) do
+      {:ok, %{kzg_commitment: ^commitment}} ->
+        false
+
+      _ ->
+        true
+    end
+  end
+
   def schedule_blocks_processing do
     Process.send_after(__MODULE__, :process_blocks, 3000)
   end
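With the two changes above, an incoming block whose KZG commitments are not all backed by a stored sidecar is parked as `:download_blobs` instead of `:pending`, and the periodic `:download_blobs` pass requests at most 16 blocks' worth of missing `BlobIdentifier`s per tick, oldest slots first. For reference, a minimal sketch of how the new `Kzg.blob_kzg_proof_batch_valid?/3` wrapper is meant to be called once those blobs arrive; the `blobs`, `commitments`, and `proofs` bindings are hypothetical stand-ins for data read back from the store:

    # Illustrative call site (not part of the diff): any {:error, _} from the
    # underlying NIF is treated the same as a failed verification.
    if Kzg.blob_kzg_proof_batch_valid?(blobs, commitments, proofs) do
      {:ok, :data_available}
    else
      {:error, "blob data not available"}
    end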
diff --git a/lib/lambda_ethereum_consensus/fork_choice/handlers.ex b/lib/lambda_ethereum_consensus/fork_choice/handlers.ex
index bcaec1b33..07baa2496 100644
--- a/lib/lambda_ethereum_consensus/fork_choice/handlers.ex
+++ b/lib/lambda_ethereum_consensus/fork_choice/handlers.ex
@@ -7,6 +7,7 @@ defmodule LambdaEthereumConsensus.ForkChoice.Handlers do
   alias LambdaEthereumConsensus.Execution.ExecutionClient
   alias LambdaEthereumConsensus.StateTransition
   alias LambdaEthereumConsensus.StateTransition.{Accessors, EpochProcessing, Misc, Predicates}
+  alias LambdaEthereumConsensus.Store.BlobDb
   alias LambdaEthereumConsensus.Store.Blocks
   alias LambdaEthereumConsensus.Store.BlockStates

@@ -79,13 +80,20 @@ defmodule LambdaEthereumConsensus.ForkChoice.Handlers do
       finalized_root != Store.get_checkpoint_block(store, block.parent_root, finalized_epoch) ->
         {:error, "block isn't descendant of latest finalized block"}

-      # Check blob data is available
-      HardForkAliasInjection.deneb?() and
-          not (Ssz.hash_tree_root!(block) |> data_available?(block.body.blob_kzg_commitments)) ->
-        {:error, "blob data not available"}
-
       true ->
-        compute_post_state(store, signed_block, base_state)
+        HardForkAliasInjection.on_deneb do
+          is_data_available =
+            Ssz.hash_tree_root!(block) |> data_available?(block.body.blob_kzg_commitments)
+
+          # Check blob data is available
+          if is_data_available do
+            compute_post_state(store, signed_block, base_state)
+          else
+            {:error, "blob data not available"}
+          end
+        else
+          compute_post_state(store, signed_block, base_state)
+        end
     end
   end

@@ -93,17 +101,26 @@
   Equivalent to `is_data_available` from the spec.
   Returns true if the blob's data is available from the network.
   """
-  # TODO: remove when implemented
-  @dialyzer {:no_match, on_block: 2}
   @spec data_available?(Types.root(), [Types.kzg_commitment()]) :: boolean()
-  def data_available?(_beacon_block_root, _blob_kzg_commitments) do
+  def data_available?(_beacon_block_root, []), do: true
+
+  def data_available?(beacon_block_root, blob_kzg_commitments) do
     # TODO: the p2p network does not guarantee sidecar retrieval
     # outside of `MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS`. Should we
     # handle that case somehow here?
-    # TODO: fetch blobs and proofs from the DB, and verify them
-    # blobs, proofs = retrieve_blobs_and_proofs(beacon_block_root)
-    # return verify_blob_kzg_proof_batch(blobs, blob_kzg_commitments, proofs)
-    true
+    blobs =
+      0..(length(blob_kzg_commitments) - 1)//1
+      |> Enum.map(&BlobDb.get_blob_with_proof(beacon_block_root, &1))
+
+    if Enum.all?(blobs, &match?({:ok, _}, &1)) do
+      {blobs, proofs} =
+        Stream.map(blobs, fn {:ok, {blob, proof}} -> {blob, proof} end)
+        |> Enum.unzip()
+
+      Kzg.blob_kzg_proof_batch_valid?(blobs, blob_kzg_commitments, proofs)
+    else
+      false
+    end
   end

   @doc """
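One detail worth noting in the new `data_available?/2`: the index range uses an explicit step, `0..(length(blob_kzg_commitments) - 1)//1`. In Elixir, a plain `0..-1` is a descending range, so without the `//1` step an empty list would enumerate two bogus indices; with it the range is simply empty (the separate `[]` clause already short-circuits that case, so the step is belt and braces). A quick illustration:

    Enum.to_list(0..-1)     #=> [0, -1]  descending range
    Enum.to_list(0..-1//1)  #=> []       empty, safe for "length - 1" style indexing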
""" + alias LambdaEthereumConsensus.SszEx alias LambdaEthereumConsensus.Store.Db + alias Types.Blobdata alias Types.BlobSidecar - @blob_prefix "blob" + @blob_sidecar_prefix "blob_sidecar" + @blobdata_prefix "blobdata" @spec store_blob(BlobSidecar.t()) :: :ok def store_blob(%BlobSidecar{signed_block_header: %{message: block}} = blob) do block_root = Ssz.hash_tree_root!(block) {:ok, encoded_blob} = Ssz.to_ssz(blob) - key = blob_key(block_root, blob.index) + key = blob_sidecar_key(block_root, blob.index) Db.put(key, encoded_blob) + + {:ok, encoded_blobdata} = + SszEx.encode(%Blobdata{blob: blob.blob, proof: blob.kzg_proof}, Blobdata) + + key = blobdata_key(block_root, blob.index) + Db.put(key, encoded_blobdata) end - @spec get_blob(Types.root(), Types.blob_index()) :: + # TODO: this is only used for tests + @spec store_blob_with_proof(Types.root(), Types.uint64(), Types.blob(), Types.kzg_proof()) :: + :ok + def store_blob_with_proof(block_root, index, blob, proof) do + {:ok, encoded_blobdata} = SszEx.encode(%Blobdata{blob: blob, proof: proof}, Blobdata) + key = blobdata_key(block_root, index) + Db.put(key, encoded_blobdata) + end + + @spec get_blob_sidecar(Types.root(), Types.blob_index()) :: {:ok, BlobSidecar.t()} | {:error, String.t()} | :not_found - def get_blob(block_root, blob_index) do - key = blob_key(block_root, blob_index) + def get_blob_sidecar(block_root, blob_index) do + key = blob_sidecar_key(block_root, blob_index) with {:ok, signed_block} <- Db.get(key) do Ssz.from_ssz(signed_block, BlobSidecar) end end - defp blob_key(block_root, blob_index), do: @blob_prefix <> block_root <> <> + @spec get_blob_with_proof(Types.root(), Types.blob_index()) :: + {:ok, {Types.blob(), Types.kzg_proof()}} | {:error, String.t()} | :not_found + def get_blob_with_proof(block_root, blob_index) do + key = blobdata_key(block_root, blob_index) + + with {:ok, encoded_blobdata} <- Db.get(key), + {:ok, blobdata} <- SszEx.decode(encoded_blobdata, Blobdata) do + %{blob: blob, proof: proof} = blobdata + {:ok, {blob, proof}} + end + end + + defp blob_sidecar_key(block_root, blob_index), + do: @blob_sidecar_prefix <> block_root <> <> + + defp blobdata_key(block_root, blob_index), do: @blobdata_prefix <> block_root <> <> end diff --git a/lib/types/blobdata.ex b/lib/types/blobdata.ex new file mode 100644 index 000000000..f20e60a0c --- /dev/null +++ b/lib/types/blobdata.ex @@ -0,0 +1,22 @@ +# TODO: maybe allow doing this without declaring a new struct? +defmodule Types.Blobdata do + @moduledoc """ + BlobSidecar data optimized for usage in `on_block`. + This is needed to run the spectests. 
+ """ + @behaviour LambdaEthereumConsensus.Container + + fields = [:blob, :proof] + @enforce_keys fields + defstruct fields + + @type t :: %__MODULE__{blob: Types.blob(), proof: Types.kzg_proof()} + + @impl LambdaEthereumConsensus.Container + def schema do + [ + blob: TypeAliases.blob(), + proof: TypeAliases.kzg_proof() + ] + end +end diff --git a/test/spec/runners/fork_choice.ex b/test/spec/runners/fork_choice.ex index 4b3925b01..faae408fd 100644 --- a/test/spec/runners/fork_choice.ex +++ b/test/spec/runners/fork_choice.ex @@ -8,6 +8,7 @@ defmodule ForkChoiceTestRunner do alias LambdaEthereumConsensus.ForkChoice.Handlers alias LambdaEthereumConsensus.ForkChoice.Helpers + alias LambdaEthereumConsensus.Store.BlobDb alias LambdaEthereumConsensus.Store.Blocks alias Types.BeaconBlock alias Types.BeaconState @@ -16,21 +17,9 @@ defmodule ForkChoiceTestRunner do use HardForkAliasInjection - # TODO: implement blob checks - @disabled_deneb [ - "invalid_data_unavailable", - "invalid_wrong_proofs_length", - "invalid_incorrect_proof", - "invalid_wrong_blobs_length" - ] - @impl TestRunner def skip?(%SpecTestCase{fork: "capella"}), do: false - - def skip?(%SpecTestCase{fork: "deneb", case: testcase}) do - Enum.member?(@disabled_deneb, testcase) - end - + def skip?(%SpecTestCase{fork: "deneb"}), do: false def skip?(_testcase), do: true @impl TestRunner @@ -84,12 +73,16 @@ defmodule ForkChoiceTestRunner do {:ok, new_store} end - defp apply_step(case_dir, store, %{block: "block_0x" <> hash = file}) do + defp apply_step(case_dir, store, %{block: "block_0x" <> hash = file} = step) do block = SpecTestUtils.read_ssz_from_file!(case_dir <> "/#{file}.ssz_snappy", SignedBeaconBlock) assert Ssz.hash_tree_root!(block) == Base.decode16!(hash, case: :mixed) + HardForkAliasInjection.on_deneb do + load_blob_data(case_dir, block, step) + end + with {:ok, new_store} <- Handlers.on_block(store, block), {:ok, new_store} <- block.message.body.attestations @@ -176,4 +169,24 @@ defmodule ForkChoiceTestRunner do {:ok, store} end + + # TODO: validate the filename's hash + defp load_blob_data(case_dir, block, %{blobs: "blobs_0x" <> _hash = blobs_file, proofs: proofs}) do + schema = {:list, TypeAliases.blob(), ChainSpec.get("MAX_BLOBS_PER_BLOCK")} + + blobs = + SpecTestUtils.read_ssz_ex_from_file!(case_dir <> "/#{blobs_file}.ssz_snappy", schema) + + block_root = Ssz.hash_tree_root!(block.message) + + Stream.zip([proofs, blobs]) + |> Stream.with_index() + |> Enum.each(fn {{proof, blob}, i} -> + BlobDb.store_blob_with_proof(block_root, i, blob, proof) + end) + end + + defp load_blob_data(_case_dir, block, %{}) do + assert Enum.empty?(block.message.body.blob_kzg_commitments) + end end diff --git a/test/spec/tasks/generate_spec_tests.ex b/test/spec/tasks/generate_spec_tests.ex index c05c3d590..84b93054f 100644 --- a/test/spec/tasks/generate_spec_tests.ex +++ b/test/spec/tasks/generate_spec_tests.ex @@ -62,22 +62,22 @@ defmodule Mix.Tasks.GenerateSpecTests do module_name = MetaUtils.test_module(config, fork, runner) runner_module = MetaUtils.runner_module(runner) - # TODO: we can isolate tests that use the DB from each other by using ExUnit's tmp_dir context option. 
header = """ defmodule #{module_name} do use ExUnit.Case, async: false setup_all do - start_link_supervised!({LambdaEthereumConsensus.Store.Db, dir: "tmp/#{config}_#{fork}_#{runner}_test_db"}) - start_link_supervised!(LambdaEthereumConsensus.Store.Blocks) - start_link_supervised!(LambdaEthereumConsensus.Store.BlockStates) Application.fetch_env!(:lambda_ethereum_consensus, ChainSpec) |> Keyword.put(:config, #{chain_spec_config(config)}) |> then(&Application.put_env(:lambda_ethereum_consensus, ChainSpec, &1)) end - setup do + setup %{tmp_dir: tmp_dir} do on_exit(fn -> LambdaEthereumConsensus.StateTransition.Cache.clear_cache() end) + start_link_supervised!({LambdaEthereumConsensus.Store.Db, dir: tmp_dir}) + start_link_supervised!(LambdaEthereumConsensus.Store.Blocks) + start_link_supervised!(LambdaEthereumConsensus.Store.BlockStates) + :ok end """ @@ -92,6 +92,7 @@ defmodule Mix.Tasks.GenerateSpecTests do defp generate_case(runner_module, testcase) do """ + @tag :tmp_dir #{if runner_module.skip?(testcase), do: "\n@tag :skip", else: ""} test "#{SpecTestCase.name(testcase)}" do testcase = #{inspect(testcase)} diff --git a/test/spec/utils.ex b/test/spec/utils.ex index 3565701c1..fbc14c8b0 100644 --- a/test/spec/utils.ex +++ b/test/spec/utils.ex @@ -65,7 +65,7 @@ defmodule SpecTestUtils do end end - @spec read_ssz_ex_from_optional_file!(binary, module) :: any() | nil + @spec read_ssz_ex_from_optional_file!(binary, SszEx.schema()) :: any() | nil def read_ssz_ex_from_optional_file!(file_path, ssz_type) do if File.exists?(file_path) do compressed = File.read!(file_path) @@ -85,7 +85,7 @@ defmodule SpecTestUtils do end end - @spec read_ssz_ex_from_file!(binary, module) :: any() + @spec read_ssz_ex_from_file!(binary, SszEx.schema()) :: any() def read_ssz_ex_from_file!(file_path, ssz_type) do case read_ssz_ex_from_optional_file!(file_path, ssz_type) do nil -> raise "File not found: #{file_path}"