Merge branch 'main' into p2p-beacon-blocks-by-root
h3lio5 authored Dec 8, 2023
2 parents fd2b09e + 99f49f9 commit 4917cf3
Showing 30 changed files with 2,072 additions and 822 deletions.
60 changes: 0 additions & 60 deletions README.md
@@ -167,66 +167,6 @@ Our aim is to infuse these strengths into the Ethereum consensus client ecosystem

We also aim to bootstrap an Ethereum Elixir community, and to make Elixir a first-class citizen in the Ethereum ecosystem.

## Roadmap

**1. Block Subscription - Mid September**
- Libp2p discovery and block retrieval
- SSZ + snappy
- `on_block` callback: Save the latest block in fork-choice store, conduct basic checks. GetHead returns the last obtained block.
- Beacon API: Return block root (`GET /beacon/states/{state_id}/root`)

**2. Checkpoint Sync - October**
- Libp2p primitives for sync
- Support checkpoint sync from a known provider
- Sync from the latest finalized block
- BeaconAPI: Return headers for head block
- EngineAPI: Validate incoming blocks

**3. Attestations - Mid October**
- Libp2p attestation retrieval
- Basic beacon state representation
- Store attestations (last message sent by each validator)
- `on_attestation` callback for attestations sent via Gossip
- Process attestations from blocks
- Beacon API: Return head block root (`GET /beacon/states/head/root`)

**4. Deposits - November**
- BLS signature checks
- Update consensus state view of deposit contract (`process_eth1_data`)
- Process deposit operation to update validator list (`process_deposit`)
- Verify block signatures (`verify_block_signature`)

**5. Slots and Fork-choice - Mid November**
- `on_tick`/`process_slot` in state transition; a GenServer that calls this periodically
- `on_block`: Add slot-related checks and epoch calculations (excluding finalization)
- Get-head uses the messages
- Block header validation
- EngineAPI: Process execution payload
- BeaconAPI: Ensure the returned head values point to the heaviest branch

**6. Finality and Slashing - Mid November**
- Epoch processing
- `on_block`: Prune fork-choice store; reject blocks before finalization
- Add RANDAO mix to the beacon state
- BeaconAPI: Retrieve finality checkpoints, randao mixes
- Process attester slashings and proposer slashings
- EngineAPI: fork-choice updates

**7. Rewards, Shuffling - December**
- Process rewards `on_epoch` for a checkpoint
- Handle Deposits and Withdrawals
- Implement RANDAO
- Calculate committee for a given state
- Conduct shuffling
- Integrate with Grafana
- BeaconAPI: Retrieve randao mix for a given block

**8. Validator Features - Mid December/January 2024**
- Create attestations
- Monitor for slashings
- Create slashing proofs
- BeaconAPI: Post blocks, slashings, voluntary exits, and withdrawals

## Contributor Package

Dream of becoming an Ethereum core developer? Eager to shape the protocol that will underpin tomorrow's world? Want to collaborate with a passionate team, learn, grow, and be a pivotal part of the Ethereum Elixir community?
131 changes: 131 additions & 0 deletions docs/concurrency_design.md
@@ -0,0 +1,131 @@
# Store concurrency design

## Current situation

The following is a sequence diagram of the lifecycle of a block, from the moment its notification arrives in the LibP2P port until it's processed and saved. Each lane is a separate process and may encompass many different modules.

```mermaid
sequenceDiagram
participant port as LibP2P Port <br> (Genserver)
participant sub as Subscriber <br> (GenStage)
participant consumer as GossipConsumer <br> (broadway)
participant pending as Pending Blocks <br> (GenServer)
participant store as Fork-choice store (GenServer)
participant DB as KV Store
port ->> sub: gossip(id)
sub ->> port: accept(id)
sub ->> consumer: handle_demand
consumer ->> consumer: decompress <br> decode <br> call handler
consumer ->> pending: decoded block
pending ->> store: has_block(block)
store -->> pending: false
pending ->> store: has_block(parent)
store -->> pending: true
pending ->> store: on_block(block)
store ->> store: validate block <br> calculate state transition <br> add state
store ->> DB: store block
store -->> pending: :ok
```

Let's look at the main issues and some improvements that may help with them.

### Blocking Calls

`Store.on_block(block)` (a write operation) is blocking. This operation is particularly expensive, as it performs the state transition. This causes some issues:

- It's a call, so the calling process (in our case the pending blocks processor) will be blocked until the state transition is finished. No further blocks will be downloaded while this happens.
- Any other store call (adding an attestation, checking if a block is present) will be blocked.

Improvements:

- Making it a `cast`. The caller doesn't immediately need to know the result of the state transition, so we can make it an async operation.
- Calculating the state transition asynchronously, so the store can take on other work (like adding attestations) while it runs (see the sketch below).
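
As an illustration, here is a minimal sketch of what a cast-based `on_block` with an asynchronous state transition could look like. The module and function names (`ForkChoice.TaskSupervisor`, `StateTransition.transition/2`, `add_block_and_state/3`) are assumptions made for the sketch, not the current implementation:

```elixir
defmodule ForkChoice.Store do
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)

  # Fire-and-forget: the caller is no longer blocked on the state transition.
  def on_block(block), do: GenServer.cast(__MODULE__, {:on_block, block})

  @impl true
  def init(_opts), do: {:ok, %{}}

  @impl true
  def handle_cast({:on_block, block}, store) do
    parent = self()

    # Run the expensive state transition in a supervised task (assumes a
    # Task.Supervisor named ForkChoice.TaskSupervisor is started elsewhere),
    # so the Store keeps serving other requests (e.g. attestations) meanwhile.
    Task.Supervisor.start_child(ForkChoice.TaskSupervisor, fn ->
      result = StateTransition.transition(store, block)
      send(parent, {:transition_result, block, result})
    end)

    {:noreply, store}
  end

  @impl true
  def handle_info({:transition_result, block, {:ok, new_state}}, store),
    do: {:noreply, add_block_and_state(store, block, new_state)}

  def handle_info({:transition_result, _block, {:error, _reason}}, store),
    do: {:noreply, store}

  # Placeholder: add the block and its post-state to the fork tree.
  defp add_block_and_state(store, _block, _new_state), do: store
end
```

The trade-off is that errors are reported back asynchronously, so the pending-blocks processor needs some other way to learn that a block was rejected.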

### Concurrent downloads

Downloading a block is:

- A heavy IO operation (not CPU-bound).
- Independent from downloading a different block.

Improvements:
- Instead of downloading blocks in sequence, we should consider downloading them in separate tasks, as sketched below.
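
For example, a hedged sketch using `Task.Supervisor.async_stream_nolink/4` to download a batch of roots concurrently (the supervisor name and `BlockDownloader.download_by_root/1` are assumptions):

```elixir
defmodule Blocks.ConcurrentDownload do
  # Sketch: download several block roots concurrently instead of one by one.
  # Assumes a Task.Supervisor named P2P.DownloadSupervisor is running and that
  # BlockDownloader.download_by_root/1 fetches a block by root over req/resp.
  def download_blocks(roots) do
    Task.Supervisor.async_stream_nolink(
      P2P.DownloadSupervisor,
      roots,
      &BlockDownloader.download_by_root/1,
      max_concurrency: 16,
      timeout: 30_000,
      on_timeout: :kill_task
    )
    |> Enum.map(fn
      {:ok, result} -> result
      {:exit, reason} -> {:error, reason}
    end)
  end
end
```

The `:kill_task` option keeps a stuck download from bringing the caller down; failed roots can simply be retried later.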

### Big Objects in Mailboxes

Blocks are pretty big objects, and they are passed around in process mailboxes even for simple calls like `Store.has_block(block)`. We should minimize this kind of interaction, as putting big structures in mailboxes slows message processing down.

Improvements:

- We could store the blocks in the DB immediately after downloading them.
- Checking if a block is present could be done directly against the DB, without needing to go through the store.
- If we want faster access to blocks, we can build an ETS block cache (see the sketch below).
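
A possible shape for such a cache, as a rough sketch (the table name and API are assumptions):

```elixir
defmodule Blocks.Cache do
  # Sketch of an ETS-backed block cache: presence checks and reads no longer
  # require sending whole blocks through a GenServer's mailbox.
  @table :block_cache

  # Create the table once, from the process that should own it.
  def setup, do: :ets.new(@table, [:set, :public, :named_table, read_concurrency: true])

  def put(root, block), do: :ets.insert(@table, {root, block})

  def has_block?(root), do: :ets.member(@table, root)

  def get(root) do
    case :ets.lookup(@table, root) do
      [{^root, block}] -> {:ok, block}
      [] -> :not_found
    end
  end
end
```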

### Other issues

- States are never stored in the DB. This is not a concurrency issue, but we should fix it.
- Low priority, but we should evaluate dropping the Subscriber GenServer and Broadway, and having one task per message under a supervisor, as in the sketch below.
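
For the last point, a minimal sketch of the one-task-per-message approach (all names here are placeholders for the existing decompress/decode/handler steps):

```elixir
defmodule Gossip.MessageTask do
  # Sketch: spawn one supervised task per gossip message, replacing the
  # Subscriber GenStage + Broadway pipeline. Assumes a Task.Supervisor named
  # Gossip.TaskSupervisor is started under the application supervisor.
  def handle(topic, compressed) do
    Task.Supervisor.start_child(Gossip.TaskSupervisor, fn ->
      with {:ok, raw} <- decompress(compressed),
           {:ok, message} <- decode(raw, topic) do
        dispatch(topic, message)
      end
    end)
  end

  # Placeholders for the existing decompression, SSZ decoding and handler calls.
  defp decompress(compressed), do: {:ok, compressed}
  defp decode(raw, _topic), do: {:ok, raw}
  defp dispatch(_topic, _message), do: :ok
end
```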

## State Diagram

These are the states that a block may have:

- New: just downloaded, decompressed and decoded.
- Pending: parent is not present.
- Child: parent is present and downloaded.
- BlockChild: parent is a valid block.
- StateChild: parent's state transition is calculated.
- Included: we calculated the state transition for this block and the state is available. It's now part of the fork tree.

The state diagram looks something like this:

```mermaid
stateDiagram-v2
[*] --> New: Download, decompress, decode
New --> Child: Parent is present
New --> Pending: Parent is not present
Pending --> Child: Parent is downloaded
Child --> BlockChild: Parent is a valid block (but not a state)
Child --> Invalid: Parent is Invalid
BlockChild --> Invalid: store validation fails
BlockChild --> StateChild: Parent state is present
StateChild --> Included: state transition calculated
StateChild --> Invalid: state transition fails
```

### A possible new design

```mermaid
sequenceDiagram
participant port as LibP2P Port <br> (Genserver)
participant decoder as Decoder <br> (Supervised task)
participant tracker as Block Tracker <br> (GenServer)
participant down as Downloader <br> (Supervised task)
participant store as Fork Choice Store <br> (Genserver)
participant state_t as State Transition Task <br> (Supervised task)
participant DB as KV Store
port ->> decoder: gossip(id)
decoder ->> port: accept(id)
decoder ->> decoder: decompress <br> decode <br> call handler
decoder ->> DB: store_block_if_not_present(block)
decoder ->> tracker: new_block(root)
tracker ->> DB: present?(parent_root)
DB -->> tracker: false
tracker ->> down: download(parent_root)
down ->> DB: store_block_if_not_present(parent_root)
down ->> tracker: downloaded(parent_root)
tracker ->> store: on_block(root)
store ->> DB: get_block(root)
store ->> store: validate block
store ->> state_t: state_transition(block)
state_t ->> DB: store_state(new_state)
state_t ->> store: on_state(new_state)
state_t ->> tracker: on_state(new_state)
```

Some pending definitions:

- The block tracker could eventually become a block cache, maintaining blocks and their states in an ETS table that other processes can access easily. A rough sketch of its interface follows.
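
For illustration, here is a hedged sketch of the block tracker under this design (module names, the `DB` functions and the message shapes are assumptions, not settled decisions):

```elixir
defmodule BlockTracker do
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)

  # Called by the decoder after it stored a new block in the DB.
  def new_block(root), do: GenServer.cast(__MODULE__, {:new_block, root})

  # Called by a downloader task once a missing parent has been stored.
  def downloaded(root), do: GenServer.cast(__MODULE__, {:downloaded, root})

  @impl true
  def init(_opts), do: {:ok, %{pending: %{}}}

  @impl true
  def handle_cast({:new_block, root}, state) do
    # Assumes DB.get_block/1 and DB.present?/1 exist as sketched in the diagram.
    %{parent_root: parent_root} = DB.get_block(root)

    if DB.present?(parent_root) do
      ForkChoice.Store.on_block(root)
      {:noreply, state}
    else
      # Download the missing parent in a supervised task.
      Task.Supervisor.start_child(Downloader.TaskSupervisor, fn ->
        :ok = Downloader.fetch_and_store(parent_root)
        downloaded(parent_root)
      end)

      {:noreply, put_in(state, [:pending, parent_root], root)}
    end
  end

  def handle_cast({:downloaded, parent_root}, state) do
    # The parent is now present, so the waiting child can go through fork choice.
    {child_root, pending} = Map.pop(state.pending, parent_root)
    if child_root, do: ForkChoice.Store.on_block(child_root)
    {:noreply, %{state | pending: pending}}
  end
end
```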
43 changes: 22 additions & 21 deletions lib/lambda_ethereum_consensus/fork_choice/handlers.ex
@@ -16,7 +16,7 @@ defmodule LambdaEthereumConsensus.ForkChoice.Handlers do
Store
}

import LambdaEthereumConsensus.Utils, only: [if_then_update: 3, map: 2]
import LambdaEthereumConsensus.Utils, only: [if_then_update: 3, map_ok: 2]

### Public API ###

@@ -172,7 +172,6 @@ defmodule LambdaEthereumConsensus.ForkChoice.Handlers do
|> update_checkpoints(state.current_justified_checkpoint, state.finalized_checkpoint)
# Eagerly compute unrealized justification and finality
|> compute_pulled_up_tip(block_root)
|> then(&{:ok, &1})
end
end

@@ -216,27 +215,29 @@ defmodule LambdaEthereumConsensus.ForkChoice.Handlers do
end)
end

# Pull up the post-state of the block to the next epoch boundary
def compute_pulled_up_tip(%Store{block_states: states} = store, block_root) do
# Pull up the post-state of the block to the next epoch boundary
# TODO: handle possible errors
{:ok, state} = EpochProcessing.process_justification_and_finalization(states[block_root])
result = EpochProcessing.process_justification_and_finalization(states[block_root])

block_epoch = Misc.compute_epoch_at_slot(store.blocks[block_root].slot)
current_epoch = store |> Store.get_current_slot() |> Misc.compute_epoch_at_slot()
with {:ok, state} <- result do
block_epoch = Misc.compute_epoch_at_slot(store.blocks[block_root].slot)
current_epoch = store |> Store.get_current_slot() |> Misc.compute_epoch_at_slot()

unrealized_justifications =
Map.put(store.unrealized_justifications, block_root, state.current_justified_checkpoint)
unrealized_justifications =
Map.put(store.unrealized_justifications, block_root, state.current_justified_checkpoint)

%Store{store | unrealized_justifications: unrealized_justifications}
|> update_unrealized_checkpoints(
state.current_justified_checkpoint,
state.finalized_checkpoint
)
|> if_then_update(
block_epoch < current_epoch,
# If the block is from a prior epoch, apply the realized values
&update_checkpoints(&1, state.current_justified_checkpoint, state.finalized_checkpoint)
)
%Store{store | unrealized_justifications: unrealized_justifications}
|> update_unrealized_checkpoints(
state.current_justified_checkpoint,
state.finalized_checkpoint
)
|> if_then_update(
block_epoch < current_epoch,
# If the block is from a prior epoch, apply the realized values
&update_checkpoints(&1, state.current_justified_checkpoint, state.finalized_checkpoint)
)
|> then(&{:ok, &1})
end
end

# Update unrealized checkpoints in store if necessary
@@ -323,7 +324,7 @@ defmodule LambdaEthereumConsensus.ForkChoice.Handlers do
else: {:ok, &1}
)
)
|> map(
|> map_ok(
&{:ok, %Store{store | checkpoint_states: Map.put(store.checkpoint_states, target, &1)}}
)
end
@@ -335,7 +336,7 @@ defmodule LambdaEthereumConsensus.ForkChoice.Handlers do
message = %Checkpoint{epoch: target.epoch, root: beacon_block_root}

attesting_indices
|> Stream.filter(&MapSet.member?(store.equivocating_indices, &1))
|> Stream.reject(&MapSet.member?(store.equivocating_indices, &1))
|> Stream.filter(&(not Map.has_key?(messages, &1) or target.epoch > messages[&1].epoch))
|> Enum.reduce(messages, &Map.put(&2, &1, message))
|> then(&{:ok, %Store{store | latest_messages: &1}})
41 changes: 19 additions & 22 deletions lib/lambda_ethereum_consensus/fork_choice/helpers.ex
@@ -58,35 +58,31 @@ defmodule LambdaEthereumConsensus.ForkChoice.Helpers do

Stream.cycle([nil])
|> Enum.reduce_while(head, fn nil, head ->
children =
blocks
|> Stream.filter(fn {_, block} -> block.parent_root == head end)
|> Enum.map(fn {root, _} -> root end)

if Enum.empty?(children) do
{:halt, head}
else
{:cont, Enum.max_by(children, fn root -> get_weight(store, root) end)}
end
blocks
|> Stream.filter(fn {_, block} -> block.parent_root == head end)
|> Stream.map(fn {root, _} -> root end)
# Ties broken by favoring block with lexicographically higher root
|> Enum.sort(:desc)
|> then(fn
[] -> {:halt, head}
c -> {:cont, Enum.max_by(c, &get_weight(store, &1))}
end)
end)
|> then(&{:ok, &1})
end

defp get_weight(%Store{} = store, root) do
state = store.checkpoint_states[store.justified_checkpoint]

unslashed_and_active_indices =
Accessors.get_active_validator_indices(state, Accessors.get_current_epoch(state))
|> Enum.filter(fn i -> not Enum.at(state.validators, i).slashed end)

attestation_score =
unslashed_and_active_indices
Accessors.get_active_validator_indices(state, Accessors.get_current_epoch(state))
|> Stream.reject(&Enum.at(state.validators, &1).slashed)
|> Stream.filter(&Map.has_key?(store.latest_messages, &1))
|> Stream.filter(&(not MapSet.member?(store.equivocating_indices, &1)))
|> Stream.reject(&MapSet.member?(store.equivocating_indices, &1))
|> Stream.filter(fn i ->
Store.get_ancestor(store, store.latest_messages[i].root, store.blocks[root].slot) == root
end)
|> Stream.map(fn i -> Enum.at(state.validators, i).effective_balance end)
|> Stream.map(&Enum.at(state.validators, &1).effective_balance)
|> Enum.sum()

if store.proposer_boost_root == <<0::256>> or
@@ -123,14 +119,15 @@ defmodule LambdaEthereumConsensus.ForkChoice.Helpers do

# If any children branches contain expected finalized/justified checkpoints,
# add to filtered block-tree and signal viability to parent.
filter_block_tree_result = Enum.map(children, &filter_block_tree(store, &1, blocks))
{filter_block_tree_result, new_blocks} =
Enum.map_reduce(children, blocks, fn root, acc -> filter_block_tree(store, root, acc) end)

cond do
Enum.any?(filter_block_tree_result, fn {b, _} -> b end) ->
{true, Map.put(blocks, block_root, block)}
Enum.any?(filter_block_tree_result) ->
{true, Map.put(new_blocks, block_root, block)}

Enum.any?(children) ->
{false, blocks}
not Enum.empty?(children) ->
{false, new_blocks}

true ->
filter_leaf_block(store, block_root, block, blocks)
6 changes: 2 additions & 4 deletions lib/lambda_ethereum_consensus/p2p/gossip_handler.ex
@@ -7,6 +7,7 @@ defmodule LambdaEthereumConsensus.P2P.GossipHandler do

alias LambdaEthereumConsensus.Beacon.PendingBlocks
alias LambdaEthereumConsensus.ForkChoice.Store
alias LambdaEthereumConsensus.Utils.BitVector
alias SszTypes.{AggregateAndProof, SignedAggregateAndProof, SignedBeaconBlock}

@spec handle_message(String.t(), struct) :: :ok
@@ -31,7 +32,7 @@
"/eth2/bba4da96/beacon_aggregate_and_proof/ssz_snappy",
%SignedAggregateAndProof{message: %AggregateAndProof{aggregate: aggregate}}
) do
votes = count_bits(aggregate.aggregation_bits)
votes = BitVector.count(aggregate.aggregation_bits)
slot = aggregate.data.slot
root = aggregate.data.beacon_block_root |> Base.encode16()

@@ -49,7 +50,4 @@
|> then(&"[#{topic_name}] decoded: '#{&1}'")
|> Logger.debug()
end

defp count_bits(bitstring),
do: for(<<bit::1 <- bitstring>>, do: bit) |> Enum.sum()
end