Skip to content

Commit

Permalink
Merge branch 'main' into feat/ping_metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
karasakalmt authored Dec 12, 2023
2 parents ebc38b5 + 3f6ac3b commit 936668e
Show file tree
Hide file tree
Showing 36 changed files with 2,063 additions and 422 deletions.
131 changes: 131 additions & 0 deletions docs/concurrency_design.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,131 @@
# Store concurrency design

## Current situation

The following is a sequence diagram on the lifecycle of a block, from the moment its notification arrives in the LibP2P port and until it's processed and saved. Each lane is a separate process and may encompass many different modules.

```mermaid
sequenceDiagram
participant port as LibP2P Port <br> (Genserver)
participant sub as Subscriber <br> (GenStage)
participant consumer as GossipConsumer <br> (broadway)
participant pending as Pending Blocks <br> (GenServer)
participant store as Fork-choice store (GenServer)
participant DB as KV Store
port ->> sub: gossip(id)
sub ->> port: accept(id)
sub ->> consumer: handle_demand
consumer ->> consumer: decompress <br> decode <br> call handler
consumer ->> pending: decoded block
pending ->> store: has_block(block)
store -->> pending: false
pending ->> store: has_block(parent)
store -->> pending: true
pending ->> store: on_block(block)
store ->> store: validate block <br> calculate state transition <br> add state
store ->> DB: store block
store -->> pending: :ok
```

Let's look at the main issues and some improvements that may help with them.

### Blocking Calls

`Store.on_block(block)` (write operation) is blocking. This operation is particularly big, as it performs the state transition. These causes some issues:

- It's a call, so the calling process (in our case the pending blocks processor) will be blocked until the state transition is finished. No further blocks will be downloaded while this happens.
- Any other store call (adding an attestation, checking if a block is present) will be blocked.

Improvements:

- Making it a `cast`. The caller doesn't immediately need to know what's the result of the state transition. We can do that an async operation.
- Making the state transition be calculated in an async way, so the store can take other work like adding attestations while the cast happens.

### Concurrent downloads

Downloading a block is:

- A heavy IO operation (non-cpu consuming).
- Independent from downloading a different block.

Improvements:
- We should consider, instead of downloading them in sequence, downloading them in different tasks.

### Big Objects in Mailboxes

Blocks are pretty big objects and they are passed around in process mailboxes even for simple calls like `Store.has_block(block)`. We should minimize this kind of interactions as putting big structures in mailboxes slows their processing down.

Improvements:

- We could store the blocks in the DB immediately after downloading them.
- Checking if a block is present could be done directly with the DB, without need to check the store.
- If we want faster access for blocks, we can build an ETS block cache.

### Other issues

- States aren't ever stored in the DB. This is not a concurrency issue, but we should fix it.
- Low priority, but we should evaluate dropping the Subscriber genserver and broadway, and have one task per message under a supervisor.

## State Diagram

These are the states that a block may have:

- New: just downloaded, decompressed and decoded
- Pending: no parent.
- Child. Parent is present and downloaded.
- BlockChild: Parent is a valid block.
- StateChild: Parent’s state transition is calculated.
- Included: we calculated the state transition for this block and the state is available. It's now part of the fork tree.

The block diagram looks something like this:

```mermaid
stateDiagram-v2
[*] --> New: Download, decompress, decode
New --> Child: Parent is present
New --> Pending: Parent is not present
Pending --> Child: Parent is downloaded
Child --> BlockChild: Parent is a valid block (but not a state)
Child --> Invalid: Parent is Invalid
BlockChild --> Invalid: store validation fails
BlockChild --> StateChild: Parent state is present
StateChild --> NewState: state transition calculated
StateChild --> Invalid: state transition fails
```

### A possible new design

```mermaid
sequenceDiagram
participant port as LibP2P Port <br> (Genserver)
participant decoder as Decoder <br> (Supervised task)
participant tracker as Block Tracker <br> (GenServer)
participant down as Downloader <br> (Supervised task)
participant store as Fork Choice Store <br> (Genserver)
participant state_t as State Transition Task <br> (Supervised task)
participant DB as KV Store
port ->> decoder: gossip(id)
decoder ->> port: accept(id)
decoder ->> decoder: decompress <br> decode <br> call handler
decoder ->> DB: store_block_if_not_present(block)
decoder ->> tracker: new_block(root)
tracker ->> DB: present?(parent_root)
DB -->> tracker: false
tracker ->> down: download(parent_root)
down ->> DB: store_block_if_not_present(parent_root)
down ->> tracker: downloaded(parent_root)
tracker ->> store: on_block(root)
store ->> DB: get_block(root)
store ->> store: validate block
store ->> state_t: state_transition(block)
state_t ->> DB: store_state(new_state)
state_t ->> store: on_state(new_state)
state_t ->> tracker: on_state(new_state)
```

Some pending definitions:

- The block tracker could eventually be a block cache, and maintain blocks and their state in an ETS that can be accessed easily by other processes.
111 changes: 75 additions & 36 deletions lib/constants.ex
Original file line number Diff line number Diff line change
@@ -1,81 +1,120 @@
defmodule Constants do
@moduledoc """
Constants module with 0-arity functions.
The following values are (non-configurable) constants used throughout the specification.
"""

@spec genesis_epoch() :: integer
### Misc

@spec genesis_epoch() :: SszTypes.slot()
def genesis_epoch, do: 0

@spec genesis_slot() :: integer
@spec genesis_slot() :: SszTypes.slot()
def genesis_slot, do: 0

@spec bls_withdrawal_prefix() :: <<_::8>>
@far_future_epoch 2 ** 64 - 1

@spec far_future_epoch() :: non_neg_integer()
def far_future_epoch, do: @far_future_epoch

@spec base_rewards_per_epoch() :: non_neg_integer()
def base_rewards_per_epoch, do: 4

@spec deposit_contract_tree_depth() :: non_neg_integer()
def deposit_contract_tree_depth, do: 32

@spec justification_bits_length() :: non_neg_integer()
def justification_bits_length, do: 4

@spec endianness() :: atom()
def endianness, do: :little

@spec participation_flag_weights() :: list(non_neg_integer())
def participation_flag_weights,
do: [timely_source_weight(), timely_target_weight(), timely_head_weight()]

### Withdrawal prefixes

@spec bls_withdrawal_prefix() :: SszTypes.bytes1()
def bls_withdrawal_prefix, do: <<0>>

@spec eth1_address_withdrawal_prefix() :: <<_::8>>
@spec eth1_address_withdrawal_prefix() :: SszTypes.bytes1()
def eth1_address_withdrawal_prefix, do: <<1>>

@spec domain_beacon_attester() :: SszTypes.domain_type()
def domain_beacon_attester, do: <<1, 0, 0, 0>>
### Domain types

@spec domain_beacon_proposer() :: SszTypes.domain_type()
def domain_beacon_proposer, do: <<0, 0, 0, 0>>

@spec domain_deposit() :: SszTypes.domain_type()
def domain_deposit, do: <<3, 0, 0, 0>>
@spec domain_beacon_attester() :: SszTypes.domain_type()
def domain_beacon_attester, do: <<1, 0, 0, 0>>

@spec domain_randao() :: SszTypes.domain_type()
def domain_randao, do: <<2, 0, 0, 0>>

@spec domain_sync_committee() :: SszTypes.domain_type()
def domain_sync_committee, do: <<7, 0, 0, 0>>
@spec domain_deposit() :: SszTypes.domain_type()
def domain_deposit, do: <<3, 0, 0, 0>>

@spec domain_voluntary_exit() :: SszTypes.domain_type()
def domain_voluntary_exit, do: <<4, 0, 0, 0>>

@spec domain_selection_proof() :: SszTypes.domain_type()
def domain_selection_proof, do: <<5, 0, 0, 0>>

@spec domain_aggregate_and_proof() :: SszTypes.domain_type()
def domain_aggregate_and_proof, do: <<6, 0, 0, 0>>

@spec domain_application_mask() :: SszTypes.domain_type()
def domain_application_mask, do: <<0, 0, 0, 1>>

@spec domain_sync_committee() :: SszTypes.domain_type()
def domain_sync_committee, do: <<7, 0, 0, 0>>

@spec domain_sync_committee_selection_proof() :: SszTypes.domain_type()
def domain_sync_committee_selection_proof, do: <<8, 0, 0, 0>>

@spec domain_contribution_and_proof() :: SszTypes.domain_type()
def domain_contribution_and_proof, do: <<9, 0, 0, 0>>

@spec domain_bls_to_execution_change() :: SszTypes.domain_type()
def domain_bls_to_execution_change, do: <<10, 0, 0, 0>>

@spec timely_source_flag_index() :: integer
### Participation flag indices

@spec timely_source_flag_index() :: non_neg_integer()
def timely_source_flag_index, do: 0

@spec timely_target_flag_index() :: integer
@spec timely_target_flag_index() :: non_neg_integer()
def timely_target_flag_index, do: 1

@spec timely_head_flag_index() :: integer
@spec timely_head_flag_index() :: non_neg_integer()
def timely_head_flag_index, do: 2

@spec proposer_weight() :: integer
def proposer_weight, do: 8

@spec sync_reward_weight() :: integer
def sync_reward_weight, do: 2

@spec weight_denominator() :: integer
def weight_denominator, do: 64
### Incentivization weights

@spec participation_flag_weights() :: list(integer)
def participation_flag_weights,
do: [timely_source_weight(), timely_target_weight(), timely_head_weight()]

@spec base_reward_factor() :: integer
def base_reward_factor, do: 64

@spec timely_source_weight() :: integer
@spec timely_source_weight() :: non_neg_integer()
def timely_source_weight, do: 14

@spec timely_target_weight() :: integer
@spec timely_target_weight() :: non_neg_integer()
def timely_target_weight, do: 26

@spec timely_head_weight() :: integer
@spec timely_head_weight() :: non_neg_integer()
def timely_head_weight, do: 14

@spec far_future_epoch() :: integer
def far_future_epoch, do: 2 ** 64 - 1
@spec sync_reward_weight() :: non_neg_integer()
def sync_reward_weight, do: 2

@spec deposit_contract_tree_depth() :: integer
def deposit_contract_tree_depth, do: 32
@spec proposer_weight() :: non_neg_integer()
def proposer_weight, do: 8

@spec weight_denominator() :: non_neg_integer()
def weight_denominator, do: 64

@spec intervals_per_slot() :: integer
## Fork choice

@spec intervals_per_slot() :: non_neg_integer()
def intervals_per_slot, do: 3

@spec proposer_score_boost() :: non_neg_integer()
def proposer_score_boost, do: 3
end
39 changes: 20 additions & 19 deletions lib/lambda_ethereum_consensus/fork_choice/handlers.ex
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,6 @@ defmodule LambdaEthereumConsensus.ForkChoice.Handlers do
|> update_checkpoints(state.current_justified_checkpoint, state.finalized_checkpoint)
# Eagerly compute unrealized justification and finality
|> compute_pulled_up_tip(block_root)
|> then(&{:ok, &1})
end
end

Expand Down Expand Up @@ -216,27 +215,29 @@ defmodule LambdaEthereumConsensus.ForkChoice.Handlers do
end)
end

# Pull up the post-state of the block to the next epoch boundary
def compute_pulled_up_tip(%Store{block_states: states} = store, block_root) do
# Pull up the post-state of the block to the next epoch boundary
# TODO: handle possible errors
{:ok, state} = EpochProcessing.process_justification_and_finalization(states[block_root])
result = EpochProcessing.process_justification_and_finalization(states[block_root])

block_epoch = Misc.compute_epoch_at_slot(store.blocks[block_root].slot)
current_epoch = store |> Store.get_current_slot() |> Misc.compute_epoch_at_slot()
with {:ok, state} <- result do
block_epoch = Misc.compute_epoch_at_slot(store.blocks[block_root].slot)
current_epoch = store |> Store.get_current_slot() |> Misc.compute_epoch_at_slot()

unrealized_justifications =
Map.put(store.unrealized_justifications, block_root, state.current_justified_checkpoint)
unrealized_justifications =
Map.put(store.unrealized_justifications, block_root, state.current_justified_checkpoint)

%Store{store | unrealized_justifications: unrealized_justifications}
|> update_unrealized_checkpoints(
state.current_justified_checkpoint,
state.finalized_checkpoint
)
|> if_then_update(
block_epoch < current_epoch,
# If the block is from a prior epoch, apply the realized values
&update_checkpoints(&1, state.current_justified_checkpoint, state.finalized_checkpoint)
)
%Store{store | unrealized_justifications: unrealized_justifications}
|> update_unrealized_checkpoints(
state.current_justified_checkpoint,
state.finalized_checkpoint
)
|> if_then_update(
block_epoch < current_epoch,
# If the block is from a prior epoch, apply the realized values
&update_checkpoints(&1, state.current_justified_checkpoint, state.finalized_checkpoint)
)
|> then(&{:ok, &1})
end
end

# Update unrealized checkpoints in store if necessary
Expand Down Expand Up @@ -335,7 +336,7 @@ defmodule LambdaEthereumConsensus.ForkChoice.Handlers do
message = %Checkpoint{epoch: target.epoch, root: beacon_block_root}

attesting_indices
|> Stream.filter(&MapSet.member?(store.equivocating_indices, &1))
|> Stream.reject(&MapSet.member?(store.equivocating_indices, &1))
|> Stream.filter(&(not Map.has_key?(messages, &1) or target.epoch > messages[&1].epoch))
|> Enum.reduce(messages, &Map.put(&2, &1, message))
|> then(&{:ok, %Store{store | latest_messages: &1}})
Expand Down
Loading

0 comments on commit 936668e

Please sign in to comment.