Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/main' into ssz_implement_bitlist
Browse files Browse the repository at this point in the history
  • Loading branch information
f3r10 committed Dec 11, 2023
2 parents 131110c + 99f49f9 commit 99455c9
Show file tree
Hide file tree
Showing 17 changed files with 1,423 additions and 209 deletions.
131 changes: 131 additions & 0 deletions docs/concurrency_design.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
# Store concurrency design

## Current situation

The following is a sequence diagram on the lifecycle of a block, from the moment its notification arrives in the LibP2P port and until it's processed and saved. Each lane is a separate process and may encompass many different modules.

```mermaid
sequenceDiagram
participant port as LibP2P Port <br> (Genserver)
participant sub as Subscriber <br> (GenStage)
participant consumer as GossipConsumer <br> (broadway)
participant pending as Pending Blocks <br> (GenServer)
participant store as Fork-choice store (GenServer)
participant DB as KV Store
port ->> sub: gossip(id)
sub ->> port: accept(id)
sub ->> consumer: handle_demand
consumer ->> consumer: decompress <br> decode <br> call handler
consumer ->> pending: decoded block
pending ->> store: has_block(block)
store -->> pending: false
pending ->> store: has_block(parent)
store -->> pending: true
pending ->> store: on_block(block)
store ->> store: validate block <br> calculate state transition <br> add state
store ->> DB: store block
store -->> pending: :ok
```

Let's look at the main issues and some improvements that may help with them.

### Blocking Calls

`Store.on_block(block)` (write operation) is blocking. This operation is particularly big, as it performs the state transition. These causes some issues:

- It's a call, so the calling process (in our case the pending blocks processor) will be blocked until the state transition is finished. No further blocks will be downloaded while this happens.
- Any other store call (adding an attestation, checking if a block is present) will be blocked.

Improvements:

- Making it a `cast`. The caller doesn't immediately need to know what's the result of the state transition. We can do that an async operation.
- Making the state transition be calculated in an async way, so the store can take other work like adding attestations while the cast happens.

### Concurrent downloads

Downloading a block is:

- A heavy IO operation (non-cpu consuming).
- Independent from downloading a different block.

Improvements:
- We should consider, instead of downloading them in sequence, downloading them in different tasks.

### Big Objects in Mailboxes

Blocks are pretty big objects and they are passed around in process mailboxes even for simple calls like `Store.has_block(block)`. We should minimize this kind of interactions as putting big structures in mailboxes slows their processing down.

Improvements:

- We could store the blocks in the DB immediately after downloading them.
- Checking if a block is present could be done directly with the DB, without need to check the store.
- If we want faster access for blocks, we can build an ETS block cache.

### Other issues

- States aren't ever stored in the DB. This is not a concurrency issue, but we should fix it.
- Low priority, but we should evaluate dropping the Subscriber genserver and broadway, and have one task per message under a supervisor.

## State Diagram

These are the states that a block may have:

- New: just downloaded, decompressed and decoded
- Pending: no parent.
- Child. Parent is present and downloaded.
- BlockChild: Parent is a valid block.
- StateChild: Parent’s state transition is calculated.
- Included: we calculated the state transition for this block and the state is available. It's now part of the fork tree.

The block diagram looks something like this:

```mermaid
stateDiagram-v2
[*] --> New: Download, decompress, decode
New --> Child: Parent is present
New --> Pending: Parent is not present
Pending --> Child: Parent is downloaded
Child --> BlockChild: Parent is a valid block (but not a state)
Child --> Invalid: Parent is Invalid
BlockChild --> Invalid: store validation fails
BlockChild --> StateChild: Parent state is present
StateChild --> NewState: state transition calculated
StateChild --> Invalid: state transition fails
```

### A possible new design

```mermaid
sequenceDiagram
participant port as LibP2P Port <br> (Genserver)
participant decoder as Decoder <br> (Supervised task)
participant tracker as Block Tracker <br> (GenServer)
participant down as Downloader <br> (Supervised task)
participant store as Fork Choice Store <br> (Genserver)
participant state_t as State Transition Task <br> (Supervised task)
participant DB as KV Store
port ->> decoder: gossip(id)
decoder ->> port: accept(id)
decoder ->> decoder: decompress <br> decode <br> call handler
decoder ->> DB: store_block_if_not_present(block)
decoder ->> tracker: new_block(root)
tracker ->> DB: present?(parent_root)
DB -->> tracker: false
tracker ->> down: download(parent_root)
down ->> DB: store_block_if_not_present(parent_root)
down ->> tracker: downloaded(parent_root)
tracker ->> store: on_block(root)
store ->> DB: get_block(root)
store ->> store: validate block
store ->> state_t: state_transition(block)
state_t ->> DB: store_state(new_state)
state_t ->> store: on_state(new_state)
state_t ->> tracker: on_state(new_state)
```

Some pending definitions:

- The block tracker could eventually be a block cache, and maintain blocks and their state in an ETS that can be accessed easily by other processes.
39 changes: 20 additions & 19 deletions lib/lambda_ethereum_consensus/fork_choice/handlers.ex
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,6 @@ defmodule LambdaEthereumConsensus.ForkChoice.Handlers do
|> update_checkpoints(state.current_justified_checkpoint, state.finalized_checkpoint)
# Eagerly compute unrealized justification and finality
|> compute_pulled_up_tip(block_root)
|> then(&{:ok, &1})
end
end

Expand Down Expand Up @@ -216,27 +215,29 @@ defmodule LambdaEthereumConsensus.ForkChoice.Handlers do
end)
end

# Pull up the post-state of the block to the next epoch boundary
def compute_pulled_up_tip(%Store{block_states: states} = store, block_root) do
# Pull up the post-state of the block to the next epoch boundary
# TODO: handle possible errors
{:ok, state} = EpochProcessing.process_justification_and_finalization(states[block_root])
result = EpochProcessing.process_justification_and_finalization(states[block_root])

block_epoch = Misc.compute_epoch_at_slot(store.blocks[block_root].slot)
current_epoch = store |> Store.get_current_slot() |> Misc.compute_epoch_at_slot()
with {:ok, state} <- result do
block_epoch = Misc.compute_epoch_at_slot(store.blocks[block_root].slot)
current_epoch = store |> Store.get_current_slot() |> Misc.compute_epoch_at_slot()

unrealized_justifications =
Map.put(store.unrealized_justifications, block_root, state.current_justified_checkpoint)
unrealized_justifications =
Map.put(store.unrealized_justifications, block_root, state.current_justified_checkpoint)

%Store{store | unrealized_justifications: unrealized_justifications}
|> update_unrealized_checkpoints(
state.current_justified_checkpoint,
state.finalized_checkpoint
)
|> if_then_update(
block_epoch < current_epoch,
# If the block is from a prior epoch, apply the realized values
&update_checkpoints(&1, state.current_justified_checkpoint, state.finalized_checkpoint)
)
%Store{store | unrealized_justifications: unrealized_justifications}
|> update_unrealized_checkpoints(
state.current_justified_checkpoint,
state.finalized_checkpoint
)
|> if_then_update(
block_epoch < current_epoch,
# If the block is from a prior epoch, apply the realized values
&update_checkpoints(&1, state.current_justified_checkpoint, state.finalized_checkpoint)
)
|> then(&{:ok, &1})
end
end

# Update unrealized checkpoints in store if necessary
Expand Down Expand Up @@ -335,7 +336,7 @@ defmodule LambdaEthereumConsensus.ForkChoice.Handlers do
message = %Checkpoint{epoch: target.epoch, root: beacon_block_root}

attesting_indices
|> Stream.filter(&MapSet.member?(store.equivocating_indices, &1))
|> Stream.reject(&MapSet.member?(store.equivocating_indices, &1))
|> Stream.filter(&(not Map.has_key?(messages, &1) or target.epoch > messages[&1].epoch))
|> Enum.reduce(messages, &Map.put(&2, &1, message))
|> then(&{:ok, %Store{store | latest_messages: &1}})
Expand Down
30 changes: 13 additions & 17 deletions lib/lambda_ethereum_consensus/fork_choice/helpers.ex
Original file line number Diff line number Diff line change
Expand Up @@ -58,35 +58,31 @@ defmodule LambdaEthereumConsensus.ForkChoice.Helpers do

Stream.cycle([nil])
|> Enum.reduce_while(head, fn nil, head ->
children =
blocks
|> Stream.filter(fn {_, block} -> block.parent_root == head end)
|> Enum.map(fn {root, _} -> root end)

if Enum.empty?(children) do
{:halt, head}
else
{:cont, Enum.max_by(children, fn root -> get_weight(store, root) end)}
end
blocks
|> Stream.filter(fn {_, block} -> block.parent_root == head end)
|> Stream.map(fn {root, _} -> root end)
# Ties broken by favoring block with lexicographically higher root
|> Enum.sort(:desc)
|> then(fn
[] -> {:halt, head}
c -> {:cont, Enum.max_by(c, &get_weight(store, &1))}
end)
end)
|> then(&{:ok, &1})
end

defp get_weight(%Store{} = store, root) do
state = store.checkpoint_states[store.justified_checkpoint]

unslashed_and_active_indices =
Accessors.get_active_validator_indices(state, Accessors.get_current_epoch(state))
|> Enum.filter(fn i -> not Enum.at(state.validators, i).slashed end)

attestation_score =
unslashed_and_active_indices
Accessors.get_active_validator_indices(state, Accessors.get_current_epoch(state))
|> Stream.reject(&Enum.at(state.validators, &1).slashed)
|> Stream.filter(&Map.has_key?(store.latest_messages, &1))
|> Stream.filter(&(not MapSet.member?(store.equivocating_indices, &1)))
|> Stream.reject(&MapSet.member?(store.equivocating_indices, &1))
|> Stream.filter(fn i ->
Store.get_ancestor(store, store.latest_messages[i].root, store.blocks[root].slot) == root
end)
|> Stream.map(fn i -> Enum.at(state.validators, i).effective_balance end)
|> Stream.map(&Enum.at(state.validators, &1).effective_balance)
|> Enum.sum()

if store.proposer_boost_root == <<0::256>> or
Expand Down
8 changes: 5 additions & 3 deletions lib/lambda_ethereum_consensus/state_transition/accessors.ex
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ defmodule LambdaEthereumConsensus.StateTransition.Accessors do
Return the current epoch.
"""
@spec get_current_epoch(BeaconState.t()) :: SszTypes.epoch()
def get_current_epoch(%BeaconState{slot: slot} = _state) do
def get_current_epoch(%BeaconState{slot: slot}) do
Misc.compute_epoch_at_slot(slot)
end

Expand Down Expand Up @@ -373,8 +373,10 @@ defmodule LambdaEthereumConsensus.StateTransition.Accessors do
@spec get_block_root_at_slot(BeaconState.t(), SszTypes.slot()) ::
{:ok, SszTypes.root()} | {:error, binary()}
def get_block_root_at_slot(state, slot) do
if slot < state.slot and state.slot <= slot + ChainSpec.get("SLOTS_PER_HISTORICAL_ROOT") do
root = Enum.at(state.block_roots, rem(slot, ChainSpec.get("SLOTS_PER_HISTORICAL_ROOT")))
slots_per_historical_root = ChainSpec.get("SLOTS_PER_HISTORICAL_ROOT")

if slot < state.slot and state.slot <= slot + slots_per_historical_root do
root = Enum.at(state.block_roots, rem(slot, slots_per_historical_root))
{:ok, root}
else
{:error, "Block root not available"}
Expand Down
Loading

0 comments on commit 99455c9

Please sign in to comment.