Skip to content

Commit

Permalink
feat: check blob data availability (#879)
Browse files Browse the repository at this point in the history
  • Loading branch information
MegaRedHand authored Mar 15, 2024
1 parent f09ebd4 commit 1eb37a3
Show file tree
Hide file tree
Showing 8 changed files with 171 additions and 54 deletions.
16 changes: 16 additions & 0 deletions lib/kzg.ex
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,20 @@ defmodule Kzg do
def verify_blob_kzg_proof_batch(_blobs, _kzg_commitments, _kzg_proofs) do
:erlang.nif_error(:nif_not_loaded)
end

################
### Wrappers ###
################

@spec blob_kzg_proof_batch_valid?(
list(Types.blob()),
list(Types.kzg_commitment()),
list(Types.kzg_proof())
) :: boolean()
def blob_kzg_proof_batch_valid?(blobs, kzg_commitments, kzg_proofs) do
case verify_blob_kzg_proof_batch(blobs, kzg_commitments, kzg_proofs) do
{:ok, result} -> result
{:error, _} -> false
end
end
end
43 changes: 29 additions & 14 deletions lib/lambda_ethereum_consensus/beacon/pending_blocks.ex
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,18 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
def handle_cast({:add_block, %SignedBeaconBlock{message: block} = signed_block}, state) do
block_root = Ssz.hash_tree_root!(block)

# If already processing or processed, ignore it
if Map.has_key?(state, block_root) or Blocks.has_block?(block_root) do
{:noreply, state}
else
{:noreply, state |> Map.put(block_root, {signed_block, :pending})}
cond do
# If already processing or processed, ignore it
Map.has_key?(state, block_root) or Blocks.has_block?(block_root) ->
state

blocks_to_missing_blobs([{block_root, signed_block}]) |> Enum.empty?() ->
state |> Map.put(block_root, {signed_block, :pending})

true ->
state |> Map.put(block_root, {signed_block, :download_blobs})
end
|> then(&{:noreply, &1})
end

@impl true
Expand Down Expand Up @@ -153,13 +159,12 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
@impl true
def handle_info(:download_blobs, state) do
blocks_with_blobs =
Enum.filter(state, fn {_, {_, s}} -> s == :download_blobs end)
|> Enum.map(fn {root, {block, _}} -> {root, block} end)

blobs_to_download =
blocks_with_blobs
Stream.filter(state, fn {_, {_, s}} -> s == :download_blobs end)
|> Enum.sort_by(fn {_, {signed_block, _}} -> signed_block.message.slot end)
|> Stream.map(fn {root, {block, _}} -> {root, block} end)
|> Enum.take(16)
|> blocks_to_missing_blobs()

blobs_to_download = blocks_to_missing_blobs(blocks_with_blobs)

downloaded_blobs =
blobs_to_download
Expand Down Expand Up @@ -201,12 +206,22 @@ defmodule LambdaEthereumConsensus.Beacon.PendingBlocks do
defp blocks_to_missing_blobs(blocks) do
Enum.flat_map(blocks, fn {block_root,
%{message: %{body: %{blob_kzg_commitments: commitments}}}} ->
0..(length(commitments) - 1)
|> Enum.reject(&match?({:ok, _}, BlobDb.get_blob(block_root, &1)))
|> Enum.map(&%Types.BlobIdentifier{block_root: block_root, index: &1})
Stream.with_index(commitments)
|> Enum.filter(&blob_needs_download?(&1, block_root))
|> Enum.map(&%Types.BlobIdentifier{block_root: block_root, index: elem(&1, 1)})
end)
end

defp blob_needs_download?({commitment, index}, block_root) do
case BlobDb.get_blob_sidecar(block_root, index) do
{:ok, %{kzg_commitment: ^commitment}} ->
false

_ ->
true
end
end

def schedule_blocks_processing do
Process.send_after(__MODULE__, :process_blocks, 3000)
end
Expand Down
43 changes: 30 additions & 13 deletions lib/lambda_ethereum_consensus/fork_choice/handlers.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ defmodule LambdaEthereumConsensus.ForkChoice.Handlers do
alias LambdaEthereumConsensus.Execution.ExecutionClient
alias LambdaEthereumConsensus.StateTransition
alias LambdaEthereumConsensus.StateTransition.{Accessors, EpochProcessing, Misc, Predicates}
alias LambdaEthereumConsensus.Store.BlobDb
alias LambdaEthereumConsensus.Store.Blocks
alias LambdaEthereumConsensus.Store.BlockStates

Expand Down Expand Up @@ -79,31 +80,47 @@ defmodule LambdaEthereumConsensus.ForkChoice.Handlers do
finalized_root != Store.get_checkpoint_block(store, block.parent_root, finalized_epoch) ->
{:error, "block isn't descendant of latest finalized block"}

# Check blob data is available
HardForkAliasInjection.deneb?() and
not (Ssz.hash_tree_root!(block) |> data_available?(block.body.blob_kzg_commitments)) ->
{:error, "blob data not available"}

true ->
compute_post_state(store, signed_block, base_state)
HardForkAliasInjection.on_deneb do
is_data_available =
Ssz.hash_tree_root!(block) |> data_available?(block.body.blob_kzg_commitments)

# Check blob data is available
if is_data_available do
compute_post_state(store, signed_block, base_state)
else
{:error, "blob data not available"}
end
else
compute_post_state(store, signed_block, base_state)
end
end
end

@doc """
Equivalent to `is_data_available` from the spec.
Returns true if the blob's data is available from the network.
"""
# TODO: remove when implemented
@dialyzer {:no_match, on_block: 2}
@spec data_available?(Types.root(), [Types.kzg_commitment()]) :: boolean()
def data_available?(_beacon_block_root, _blob_kzg_commitments) do
def data_available?(_beacon_block_root, []), do: true

def data_available?(beacon_block_root, blob_kzg_commitments) do
# TODO: the p2p network does not guarantee sidecar retrieval
# outside of `MIN_EPOCHS_FOR_BLOB_SIDECARS_REQUESTS`. Should we
# handle that case somehow here?
# TODO: fetch blobs and proofs from the DB, and verify them
# blobs, proofs = retrieve_blobs_and_proofs(beacon_block_root)
# return verify_blob_kzg_proof_batch(blobs, blob_kzg_commitments, proofs)
true
blobs =
0..(length(blob_kzg_commitments) - 1)//1
|> Enum.map(&BlobDb.get_blob_with_proof(beacon_block_root, &1))

if Enum.all?(blobs, &match?({:ok, _}, &1)) do
{blobs, proofs} =
Stream.map(blobs, fn {:ok, {blob, proof}} -> {blob, proof} end)
|> Enum.unzip()

Kzg.blob_kzg_proof_batch_valid?(blobs, blob_kzg_commitments, proofs)
else
false
end
end

@doc """
Expand Down
45 changes: 39 additions & 6 deletions lib/lambda_ethereum_consensus/store/blob_db.ex
Original file line number Diff line number Diff line change
Expand Up @@ -2,29 +2,62 @@ defmodule LambdaEthereumConsensus.Store.BlobDb do
@moduledoc """
Storage and retrieval of blobs.
"""
alias LambdaEthereumConsensus.SszEx
alias LambdaEthereumConsensus.Store.Db
alias Types.Blobdata
alias Types.BlobSidecar

@blob_prefix "blob"
@blob_sidecar_prefix "blob_sidecar"
@blobdata_prefix "blobdata"

@spec store_blob(BlobSidecar.t()) :: :ok
def store_blob(%BlobSidecar{signed_block_header: %{message: block}} = blob) do
block_root = Ssz.hash_tree_root!(block)
{:ok, encoded_blob} = Ssz.to_ssz(blob)

key = blob_key(block_root, blob.index)
key = blob_sidecar_key(block_root, blob.index)
Db.put(key, encoded_blob)

{:ok, encoded_blobdata} =
SszEx.encode(%Blobdata{blob: blob.blob, proof: blob.kzg_proof}, Blobdata)

key = blobdata_key(block_root, blob.index)
Db.put(key, encoded_blobdata)
end

@spec get_blob(Types.root(), Types.blob_index()) ::
# TODO: this is only used for tests
@spec store_blob_with_proof(Types.root(), Types.uint64(), Types.blob(), Types.kzg_proof()) ::
:ok
def store_blob_with_proof(block_root, index, blob, proof) do
{:ok, encoded_blobdata} = SszEx.encode(%Blobdata{blob: blob, proof: proof}, Blobdata)
key = blobdata_key(block_root, index)
Db.put(key, encoded_blobdata)
end

@spec get_blob_sidecar(Types.root(), Types.blob_index()) ::
{:ok, BlobSidecar.t()} | {:error, String.t()} | :not_found
def get_blob(block_root, blob_index) do
key = blob_key(block_root, blob_index)
def get_blob_sidecar(block_root, blob_index) do
key = blob_sidecar_key(block_root, blob_index)

with {:ok, signed_block} <- Db.get(key) do
Ssz.from_ssz(signed_block, BlobSidecar)
end
end

defp blob_key(block_root, blob_index), do: @blob_prefix <> block_root <> <<blob_index>>
@spec get_blob_with_proof(Types.root(), Types.blob_index()) ::
{:ok, {Types.blob(), Types.kzg_proof()}} | {:error, String.t()} | :not_found
def get_blob_with_proof(block_root, blob_index) do
key = blobdata_key(block_root, blob_index)

with {:ok, encoded_blobdata} <- Db.get(key),
{:ok, blobdata} <- SszEx.decode(encoded_blobdata, Blobdata) do
%{blob: blob, proof: proof} = blobdata
{:ok, {blob, proof}}
end
end

defp blob_sidecar_key(block_root, blob_index),
do: @blob_sidecar_prefix <> block_root <> <<blob_index>>

defp blobdata_key(block_root, blob_index), do: @blobdata_prefix <> block_root <> <<blob_index>>
end
22 changes: 22 additions & 0 deletions lib/types/blobdata.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
# TODO: maybe allow doing this without declaring a new struct?
defmodule Types.Blobdata do
@moduledoc """
BlobSidecar data optimized for usage in `on_block`.
This is needed to run the spectests.
"""
@behaviour LambdaEthereumConsensus.Container

fields = [:blob, :proof]
@enforce_keys fields
defstruct fields

@type t :: %__MODULE__{blob: Types.blob(), proof: Types.kzg_proof()}

@impl LambdaEthereumConsensus.Container
def schema do
[
blob: TypeAliases.blob(),
proof: TypeAliases.kzg_proof()
]
end
end
41 changes: 27 additions & 14 deletions test/spec/runners/fork_choice.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ defmodule ForkChoiceTestRunner do

alias LambdaEthereumConsensus.ForkChoice.Handlers
alias LambdaEthereumConsensus.ForkChoice.Helpers
alias LambdaEthereumConsensus.Store.BlobDb
alias LambdaEthereumConsensus.Store.Blocks
alias Types.BeaconBlock
alias Types.BeaconState
Expand All @@ -16,21 +17,9 @@ defmodule ForkChoiceTestRunner do

use HardForkAliasInjection

# TODO: implement blob checks
@disabled_deneb [
"invalid_data_unavailable",
"invalid_wrong_proofs_length",
"invalid_incorrect_proof",
"invalid_wrong_blobs_length"
]

@impl TestRunner
def skip?(%SpecTestCase{fork: "capella"}), do: false

def skip?(%SpecTestCase{fork: "deneb", case: testcase}) do
Enum.member?(@disabled_deneb, testcase)
end

def skip?(%SpecTestCase{fork: "deneb"}), do: false
def skip?(_testcase), do: true

@impl TestRunner
Expand Down Expand Up @@ -84,12 +73,16 @@ defmodule ForkChoiceTestRunner do
{:ok, new_store}
end

defp apply_step(case_dir, store, %{block: "block_0x" <> hash = file}) do
defp apply_step(case_dir, store, %{block: "block_0x" <> hash = file} = step) do

Check warning on line 76 in test/spec/runners/fork_choice.ex

View workflow job for this annotation

GitHub Actions / Run spec-tests (capella, general)

variable "step" is unused (if the variable is not meant to be used, prefix it with an underscore)

Check warning on line 76 in test/spec/runners/fork_choice.ex

View workflow job for this annotation

GitHub Actions / Run spec-tests (capella, general)

variable "step" is unused (if the variable is not meant to be used, prefix it with an underscore)

Check warning on line 76 in test/spec/runners/fork_choice.ex

View workflow job for this annotation

GitHub Actions / Run spec-tests (capella, mainnet)

variable "step" is unused (if the variable is not meant to be used, prefix it with an underscore)

Check warning on line 76 in test/spec/runners/fork_choice.ex

View workflow job for this annotation

GitHub Actions / Run spec-tests (capella, mainnet)

variable "step" is unused (if the variable is not meant to be used, prefix it with an underscore)

Check warning on line 76 in test/spec/runners/fork_choice.ex

View workflow job for this annotation

GitHub Actions / Run spec-tests (capella, minimal)

variable "step" is unused (if the variable is not meant to be used, prefix it with an underscore)

Check warning on line 76 in test/spec/runners/fork_choice.ex

View workflow job for this annotation

GitHub Actions / Run spec-tests (capella, minimal)

variable "step" is unused (if the variable is not meant to be used, prefix it with an underscore)
block =
SpecTestUtils.read_ssz_from_file!(case_dir <> "/#{file}.ssz_snappy", SignedBeaconBlock)

assert Ssz.hash_tree_root!(block) == Base.decode16!(hash, case: :mixed)

HardForkAliasInjection.on_deneb do
load_blob_data(case_dir, block, step)
end

with {:ok, new_store} <- Handlers.on_block(store, block),
{:ok, new_store} <-
block.message.body.attestations
Expand Down Expand Up @@ -176,4 +169,24 @@ defmodule ForkChoiceTestRunner do

{:ok, store}
end

# TODO: validate the filename's hash
defp load_blob_data(case_dir, block, %{blobs: "blobs_0x" <> _hash = blobs_file, proofs: proofs}) do

Check warning on line 174 in test/spec/runners/fork_choice.ex

View workflow job for this annotation

GitHub Actions / Run spec-tests (capella, general)

function load_blob_data/3 is unused

Check warning on line 174 in test/spec/runners/fork_choice.ex

View workflow job for this annotation

GitHub Actions / Run spec-tests (capella, general)

function load_blob_data/3 is unused

Check warning on line 174 in test/spec/runners/fork_choice.ex

View workflow job for this annotation

GitHub Actions / Run spec-tests (capella, mainnet)

function load_blob_data/3 is unused

Check warning on line 174 in test/spec/runners/fork_choice.ex

View workflow job for this annotation

GitHub Actions / Run spec-tests (capella, mainnet)

function load_blob_data/3 is unused

Check warning on line 174 in test/spec/runners/fork_choice.ex

View workflow job for this annotation

GitHub Actions / Run spec-tests (capella, minimal)

function load_blob_data/3 is unused

Check warning on line 174 in test/spec/runners/fork_choice.ex

View workflow job for this annotation

GitHub Actions / Run spec-tests (capella, minimal)

function load_blob_data/3 is unused
schema = {:list, TypeAliases.blob(), ChainSpec.get("MAX_BLOBS_PER_BLOCK")}

blobs =
SpecTestUtils.read_ssz_ex_from_file!(case_dir <> "/#{blobs_file}.ssz_snappy", schema)

block_root = Ssz.hash_tree_root!(block.message)

Stream.zip([proofs, blobs])
|> Stream.with_index()
|> Enum.each(fn {{proof, blob}, i} ->
BlobDb.store_blob_with_proof(block_root, i, blob, proof)
end)
end

defp load_blob_data(_case_dir, block, %{}) do
assert Enum.empty?(block.message.body.blob_kzg_commitments)
end
end
11 changes: 6 additions & 5 deletions test/spec/tasks/generate_spec_tests.ex
Original file line number Diff line number Diff line change
Expand Up @@ -62,22 +62,22 @@ defmodule Mix.Tasks.GenerateSpecTests do
module_name = MetaUtils.test_module(config, fork, runner)
runner_module = MetaUtils.runner_module(runner)

# TODO: we can isolate tests that use the DB from each other by using ExUnit's tmp_dir context option.
header = """
defmodule #{module_name} do
use ExUnit.Case, async: false
setup_all do
start_link_supervised!({LambdaEthereumConsensus.Store.Db, dir: "tmp/#{config}_#{fork}_#{runner}_test_db"})
start_link_supervised!(LambdaEthereumConsensus.Store.Blocks)
start_link_supervised!(LambdaEthereumConsensus.Store.BlockStates)
Application.fetch_env!(:lambda_ethereum_consensus, ChainSpec)
|> Keyword.put(:config, #{chain_spec_config(config)})
|> then(&Application.put_env(:lambda_ethereum_consensus, ChainSpec, &1))
end
setup do
setup %{tmp_dir: tmp_dir} do
on_exit(fn -> LambdaEthereumConsensus.StateTransition.Cache.clear_cache() end)
start_link_supervised!({LambdaEthereumConsensus.Store.Db, dir: tmp_dir})
start_link_supervised!(LambdaEthereumConsensus.Store.Blocks)
start_link_supervised!(LambdaEthereumConsensus.Store.BlockStates)
:ok
end
"""

Expand All @@ -92,6 +92,7 @@ defmodule Mix.Tasks.GenerateSpecTests do

defp generate_case(runner_module, testcase) do
"""
@tag :tmp_dir
#{if runner_module.skip?(testcase), do: "\n@tag :skip", else: ""}
test "#{SpecTestCase.name(testcase)}" do
testcase = #{inspect(testcase)}
Expand Down
4 changes: 2 additions & 2 deletions test/spec/utils.ex
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ defmodule SpecTestUtils do
end
end

@spec read_ssz_ex_from_optional_file!(binary, module) :: any() | nil
@spec read_ssz_ex_from_optional_file!(binary, SszEx.schema()) :: any() | nil
def read_ssz_ex_from_optional_file!(file_path, ssz_type) do
if File.exists?(file_path) do
compressed = File.read!(file_path)
Expand All @@ -85,7 +85,7 @@ defmodule SpecTestUtils do
end
end

@spec read_ssz_ex_from_file!(binary, module) :: any()
@spec read_ssz_ex_from_file!(binary, SszEx.schema()) :: any()
def read_ssz_ex_from_file!(file_path, ssz_type) do
case read_ssz_ex_from_optional_file!(file_path, ssz_type) do
nil -> raise "File not found: #{file_path}"
Expand Down

0 comments on commit 1eb37a3

Please sign in to comment.