Skip to content

Commit

Permalink
Merge branch 'main' into main-handler-metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
avilagaston9 authored Jul 23, 2024
2 parents e7a2945 + fbfe49c commit 1809aaa
Show file tree
Hide file tree
Showing 11 changed files with 553 additions and 93 deletions.
2 changes: 1 addition & 1 deletion lib/lambda_ethereum_consensus/p2p/gossip/beacon_block.ex
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ defmodule LambdaEthereumConsensus.P2P.Gossip.BeaconBlock do
### Private functions
##########################

@spec validate(SignedBeaconBlock.t(), Types.slot()) :: :ok | {:ignore, atom()}
@spec validate(SignedBeaconBlock.t(), Types.slot()) :: :ok | {:ignore, String.t()}
defp validate(%SignedBeaconBlock{message: block}, current_slot) do
min_slot = current_slot - ChainSpec.get("SLOTS_PER_EPOCH")

Expand Down
147 changes: 64 additions & 83 deletions lib/lambda_ethereum_consensus/store/state_db.ex
Original file line number Diff line number Diff line change
@@ -1,119 +1,100 @@
defmodule LambdaEthereumConsensus.Store.StateDb do
@moduledoc """
Beacon node state storage.
This module offers an interface to manage beacon node state storage.
The module coordinates the interaction with the following key-value stores:
* `StateInfoByRoot` - Maps state roots to states.
* `StateRootByBlockRoot` - Maps block roots to state roots.
* `BlockRootBySlot` - Maps slots to block roots.
"""
require Logger
alias LambdaEthereumConsensus.Store.Db
alias LambdaEthereumConsensus.Store.Utils
alias LambdaEthereumConsensus.Store.StateDb.BlockRootBySlot
alias LambdaEthereumConsensus.Store.StateDb.StateInfoByRoot
alias LambdaEthereumConsensus.Store.StateDb.StateRootByBlockRoot
alias Types.BeaconState
alias Types.StateInfo

@state_prefix "beacon_state"
@state_block_prefix "beacon_state_by_state"
@stateslot_prefix @state_prefix <> "slot"
##########################
### Public API
##########################

@spec store_state_info(StateInfo.t()) :: :ok
def store_state_info(%StateInfo{} = state_info) do
key_block = state_key(state_info.block_root)
key_state = block_key(state_info.root)
Db.put(key_block, StateInfo.encode(state_info))
Db.put(key_state, state_info.root)

StateInfoByRoot.put(state_info.root, state_info)
StateRootByBlockRoot.put(state_info.block_root, state_info.root)
# WARN: this overrides any previous mapping for the same slot
slothash_key_block = root_by_slot_key(state_info.beacon_state.slot)
Db.put(slothash_key_block, state_info.root)
end

@spec prune_states_older_than(non_neg_integer()) :: :ok | {:error, String.t()} | :not_found
def prune_states_older_than(slot) do
Logger.info("[StateDb] Pruning started.", slot: slot)
last_finalized_key = slot |> root_by_slot_key()

with {:ok, it} <- Db.iterate(),
{:ok, @stateslot_prefix <> _slot, _value} <-
Exleveldb.iterator_move(it, last_finalized_key),
{:ok, slots_to_remove} <- get_slots_to_remove(it),
:ok <- Exleveldb.iterator_close(it) do
slots_to_remove |> Enum.each(&remove_state_by_slot/1)
Logger.info("[StateDb] Pruning finished. #{length(slots_to_remove)} states removed.")
end
end

@spec get_slots_to_remove(list(non_neg_integer()), :eleveldb.itr_ref()) ::
{:ok, list(non_neg_integer())}
defp get_slots_to_remove(slots_to_remove \\ [], iterator) do
case Exleveldb.iterator_move(iterator, :prev) do
{:ok, @stateslot_prefix <> <<slot::unsigned-size(64)>>, _root} ->
[slot | slots_to_remove] |> get_slots_to_remove(iterator)

_ ->
{:ok, slots_to_remove}
end
end

@spec remove_state_by_slot(non_neg_integer()) :: :ok | :not_found
defp remove_state_by_slot(slot) do
key_slot = root_by_slot_key(slot)

with {:ok, block_root} <- Db.get(key_slot),
key_block <- state_key(block_root),
{:ok, encoded_state} <- Db.get(key_block),
{:ok, state_info} <- StateInfo.decode(encoded_state, block_root) do
key_state = block_key(state_info.root)

Db.delete(key_slot)
Db.delete(key_block)
Db.delete(key_state)
end
BlockRootBySlot.put(state_info.beacon_state.slot, state_info.block_root)
end

@spec get_state_by_block_root(Types.root()) ::
{:ok, StateInfo.t()} | {:error, String.t()} | :not_found
def get_state_by_block_root(block_root) do
with {:ok, bin} <- block_root |> state_key() |> Db.get() do
StateInfo.decode(bin, block_root)
with {:ok, state_root} <- StateRootByBlockRoot.get(block_root) do
StateInfoByRoot.get(state_root)
end
end

@spec get_state_by_state_root(Types.root()) ::
{:ok, StateInfo.t()} | {:error, String.t()} | :not_found
def get_state_by_state_root(state_root) do
with {:ok, block_root} <- state_root |> block_key() |> Db.get() do
get_state_by_block_root(block_root)
end
end
def get_state_by_state_root(state_root), do: StateInfoByRoot.get(state_root)

@spec get_latest_state() ::
{:ok, StateInfo.t()} | {:error, String.t()} | :not_found
def get_latest_state() do
last_key = root_by_slot_key(0xFFFFFFFFFFFFFFFF)

with {:ok, it} <- Db.iterate(),
{:ok, _key, _value} <- Exleveldb.iterator_move(it, last_key),
{:ok, @stateslot_prefix <> _slot, root} <- Exleveldb.iterator_move(it, :prev),
:ok <- Exleveldb.iterator_close(it) do
get_state_by_block_root(root)
else
{:ok, _key, _value} -> :not_found
{:error, :invalid_iterator} -> :not_found
with {:ok, last_block_root} <- BlockRootBySlot.get_last_slot_block_root(),
{:ok, last_state_root} <- StateRootByBlockRoot.get(last_block_root) do
StateInfoByRoot.get(last_state_root)
end
end

@spec get_state_root_by_slot(Types.slot()) ::
{:ok, Types.root()} | {:error, String.t()} | :not_found
def get_state_root_by_slot(slot),
do: slot |> root_by_slot_key() |> Db.get()

@spec get_state_by_slot(Types.slot()) ::
{:ok, BeaconState.t()} | {:error, String.t()} | :not_found
def get_state_by_slot(slot) do
# WARN: this will return the latest state received for the given slot
with {:ok, root} <- get_state_root_by_slot(slot) do
get_state_by_block_root(root)
with {:ok, block_root} <- BlockRootBySlot.get(slot) do
get_state_by_block_root(block_root)
end
end

defp state_key(root), do: Utils.get_key(@state_prefix, root)
defp block_key(root), do: Utils.get_key(@state_block_prefix, root)
defp root_by_slot_key(slot), do: Utils.get_key(@stateslot_prefix, slot)
@spec prune_states_older_than(non_neg_integer()) :: :ok | {:error, String.t()} | :not_found
def prune_states_older_than(slot) do
Logger.info("[StateDb] Pruning started.", slot: slot)

result =
BlockRootBySlot.fold_keys(slot, 0, fn slot, acc ->
case BlockRootBySlot.get(slot) do
{:ok, _block_root} ->
remove_state_by_slot(slot)
acc + 1

other ->
Logger.error(
"[Block pruning] Failed to remove block from slot #{inspect(slot)}. Reason: #{inspect(other)}"
)
end
end)

# TODO: the separate get operation is avoided if we implement folding with values in KvSchema.
case result do
{:ok, n_removed} ->
Logger.info("[StateDb] Pruning finished. #{inspect(n_removed)} states removed.")

{:error, reason} ->
Logger.error("[StateDb] Error pruning states: #{inspect(reason)}")
end
end

##########################
### Private Functions
##########################

@spec remove_state_by_slot(non_neg_integer()) :: :ok | :not_found
defp remove_state_by_slot(slot) do
with {:ok, block_root} <- BlockRootBySlot.get(slot),
{:ok, state_root} <- StateRootByBlockRoot.get(block_root) do
BlockRootBySlot.delete(slot)
StateRootByBlockRoot.delete(block_root)
StateInfoByRoot.delete(state_root)
end
end
end
51 changes: 51 additions & 0 deletions lib/lambda_ethereum_consensus/store/state_db/block_root_by_slot.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
defmodule LambdaEthereumConsensus.Store.StateDb.BlockRootBySlot do
@moduledoc """
KvSchema that stores block roots indexed by slots.
"""
alias LambdaEthereumConsensus.Store.KvSchema
require Logger
use KvSchema, prefix: "statedb_block_root_by_slot"

@impl KvSchema
@spec encode_key(Types.slot()) :: {:ok, binary()} | {:error, binary()}
def encode_key(slot), do: {:ok, <<slot::64>>}

@impl KvSchema
@spec decode_key(binary()) :: {:ok, integer()} | {:error, binary()}
def decode_key(<<slot::64>>), do: {:ok, slot}

def decode_key(other) do
{:error, "[Block by slot] Could not decode slot, not 64 bit integer: #{other}"}
end

@impl KvSchema
@spec encode_value(Types.root()) :: {:ok, Types.root()} | {:error, binary()}
def encode_value(<<_::256>> = root), do: {:ok, root}

@impl KvSchema
@spec decode_value(Types.root()) :: {:ok, Types.root()} | {:error, binary()}
def decode_value(<<_::256>> = root), do: {:ok, root}

@spec get_last_slot_block_root() :: {:ok, Types.root()} | :not_found
def get_last_slot_block_root() do
with {:ok, first_slot} <- first_key() do
fold_keys(
first_slot,
nil,
fn slot, _acc ->
case get(slot) do
{:ok, block_root} ->
block_root

other ->
Logger.error(
"[Block pruning] Failed to find last slot root #{inspect(slot)}. Reason: #{inspect(other)}"
)
end
end,
direction: :next,
include_first: true
)
end
end
end
26 changes: 26 additions & 0 deletions lib/lambda_ethereum_consensus/store/state_db/state_info_by_root.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
defmodule LambdaEthereumConsensus.Store.StateDb.StateInfoByRoot do
@moduledoc """
KvSchema that stores states indexed by their roots.
"""

alias LambdaEthereumConsensus.Store.KvSchema
alias Types.StateInfo
use KvSchema, prefix: "statedb_state_by_root"

@impl KvSchema
@spec encode_key(Types.root()) :: {:ok, binary()}
def encode_key(root) when is_binary(root), do: {:ok, root}

@impl KvSchema
@spec decode_key(binary()) :: {:ok, Types.root()}
def decode_key(root) when is_binary(root), do: {:ok, root}

@impl KvSchema
@spec encode_value(StateInfo.t()) :: {:ok, binary()} | {:error, binary()}
def encode_value(%StateInfo{} = state_info), do: {:ok, StateInfo.encode(state_info)}

@impl KvSchema
@spec decode_value(binary()) :: {:ok, StateInfo.t()} | {:error, binary()}
def decode_value(encoded_state) when is_binary(encoded_state),
do: StateInfo.decode(encoded_state)
end
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
defmodule LambdaEthereumConsensus.Store.StateDb.StateRootByBlockRoot do
@moduledoc """
KvSchema that stores state roots indexed by BeaconBlock roots.
"""

alias LambdaEthereumConsensus.Store.KvSchema
use KvSchema, prefix: "statedb_state_root_by_block_root"

@impl KvSchema
@spec encode_key(Types.root()) :: {:ok, binary()}
def encode_key(<<_::256>> = root), do: {:ok, root}

@impl KvSchema
@spec decode_key(Types.root()) :: {:ok, Types.root()}
def decode_key(<<_::256>> = root), do: {:ok, root}

@impl KvSchema
@spec encode_value(Types.root()) :: {:ok, binary()}
def encode_value(<<_::256>> = root), do: {:ok, root}

@impl KvSchema
@spec decode_value(Types.root()) :: {:ok, Types.root()} | {:error, binary()}
def decode_value(<<_::256>> = root), do: {:ok, root}
end
16 changes: 9 additions & 7 deletions lib/types/state_info.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ defmodule Types.StateInfo do
- root: The hash tree root of the state, so that we don't recalculate it before saving.
- encoded: The ssz encoded version of the state. It's common that we save a
state after
Warning: Do not modify this manually. If you do, you may need to re-encode the beacon state using `from_beacon_state`.
"""
alias Types.BeaconState

Expand Down Expand Up @@ -37,12 +38,12 @@ defmodule Types.StateInfo do

@spec encode(t()) :: binary()
def encode(%__MODULE__{} = state_info) do
{state_info.encoded, state_info.root} |> :erlang.term_to_binary()
{state_info.encoded, state_info.root, state_info.block_root} |> :erlang.term_to_binary()
end

@spec decode(binary(), Types.root()) :: {:ok, t()} | {:error, binary()}
def decode(bin, block_root) do
with {:ok, encoded, root} <- :erlang.binary_to_term(bin) |> validate_term(),
@spec decode(binary()) :: {:ok, t()} | {:error, binary()}
def decode(bin) do
with {:ok, encoded, root, block_root} <- :erlang.binary_to_term(bin) |> validate_term(),
{:ok, beacon_state} <- Ssz.from_ssz(encoded, BeaconState) do
{:ok,
%__MODULE__{
Expand All @@ -58,9 +59,10 @@ defmodule Types.StateInfo do
with :error <- Keyword.fetch(keyword, key), do: fun.()
end

@spec validate_term(term()) :: {:ok, binary(), Types.root()} | {:error, binary()}
defp validate_term({ssz_encoded, root}) when is_binary(ssz_encoded) and is_binary(root) do
{:ok, ssz_encoded, root}
@spec validate_term(term()) :: {:ok, binary(), Types.root(), Types.root()} | {:error, binary()}
defp validate_term({ssz_encoded, root, block_root})
when is_binary(ssz_encoded) and is_binary(root) and is_binary(root) do
{:ok, ssz_encoded, root, block_root}
end

defp validate_term(other) do
Expand Down
4 changes: 2 additions & 2 deletions test/unit/beacon_api/beacon_api_v1_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ defmodule Unit.BeaconApiTest.V1 do
beacon_state = Fixtures.Block.beacon_state()

patch(
LambdaEthereumConsensus.Store.StateDb,
:get_state_by_state_root,
LambdaEthereumConsensus.Store.StateDb.StateInfoByRoot,
:get,
{:ok, beacon_state}
)

Expand Down
Loading

0 comments on commit 1809aaa

Please sign in to comment.