diff --git a/docs/concurrency_design.md b/docs/concurrency_design.md
new file mode 100644
index 000000000..cf0f13552
--- /dev/null
+++ b/docs/concurrency_design.md
@@ -0,0 +1,131 @@
+# Store concurrency design
+
+## Current situation
+
+The following sequence diagram shows the lifecycle of a block, from the moment its notification arrives at the LibP2P port until it's processed and saved. Each lane is a separate process and may encompass many different modules.
+
+```mermaid
+sequenceDiagram
+
+participant port as LibP2P Port (GenServer)
+participant sub as Subscriber (GenStage)
+participant consumer as GossipConsumer (Broadway)
+participant pending as Pending Blocks (GenServer)
+participant store as Fork-choice store (GenServer)
+participant DB as KV Store
+
+port ->> sub: gossip(id)
+sub ->> port: accept(id)
+sub ->> consumer: handle_demand
+consumer ->> consumer: decompress, decode, call handler
+consumer ->> pending: decoded block
+pending ->> store: has_block(block)
+store -->> pending: false
+pending ->> store: has_block(parent)
+store -->> pending: true
+pending ->> store: on_block(block)
+store ->> store: validate block, calculate state transition, add state
+store ->> DB: store block
+store -->> pending: :ok
+```
+
+Let's look at the main issues and some improvements that may help with them.
+
+### Blocking Calls
+
+`Store.on_block(block)` (a write operation) is blocking. This operation is particularly heavy, as it performs the state transition. This causes some issues:
+
+- It's a call, so the calling process (in our case the pending blocks processor) will be blocked until the state transition is finished. No further blocks will be downloaded while this happens.
+- Any other store call (adding an attestation, checking if a block is present) will be blocked.
+
+Improvements:
+
+- Making it a `cast`. The caller doesn't immediately need to know the result of the state transition, so it can be an async operation.
+- Calculating the state transition asynchronously, so the store can take on other work, like adding attestations, while it runs (see the sketch below).
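+
+Below is a minimal sketch of both ideas. The module and function names (`Sketch.Store`, `Sketch.TaskSupervisor`, `state_transition/1`) are illustrative placeholders, not the current API:
+
+```elixir
+# Sketch: `on_block/1` becomes a cast, and the heavy state transition runs in a
+# supervised task, so the Store GenServer stays free to serve other requests
+# (e.g. adding attestations) in the meantime.
+defmodule Sketch.Store do
+  use GenServer
+
+  def start_link(opts), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)
+
+  # Fire-and-forget: the caller is not blocked on the state transition.
+  def on_block(block), do: GenServer.cast(__MODULE__, {:on_block, block})
+
+  @impl true
+  def init(_opts), do: {:ok, %{states: %{}}}
+
+  @impl true
+  def handle_cast({:on_block, block}, state) do
+    store = self()
+
+    # Assumes a Task.Supervisor named Sketch.TaskSupervisor in the supervision tree.
+    Task.Supervisor.start_child(Sketch.TaskSupervisor, fn ->
+      send(store, {:new_state, block.root, state_transition(block)})
+    end)
+
+    {:noreply, state}
+  end
+
+  @impl true
+  def handle_info({:new_state, root, new_state}, state) do
+    # Back in the GenServer: record the result and update the fork tree.
+    {:noreply, put_in(state.states[root], new_state)}
+  end
+
+  # Stand-in for the real state transition computation.
+  defp state_transition(_block), do: %{}
+end
+```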
+
+### Concurrent downloads
+
+Downloading a block is:
+
+- A heavy IO operation (not CPU-bound).
+- Independent of downloading a different block.
+
+Improvements:
+
+- Instead of downloading blocks in sequence, we should consider downloading them in separate tasks (sketched below).
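+
+A minimal sketch, assuming a `Task.Supervisor` (here `Sketch.DownloadSupervisor`) in the supervision tree and a placeholder `fetch_block/1` for the real network request:
+
+```elixir
+defmodule Sketch.Downloader do
+  # Downloads each block root in its own supervised task (IO-bound work),
+  # capping concurrency so we don't flood peers.
+  def download_blocks(roots) do
+    Task.Supervisor.async_stream_nolink(
+      Sketch.DownloadSupervisor,
+      roots,
+      &fetch_block/1,
+      max_concurrency: 16,
+      timeout: 30_000,
+      on_timeout: :kill_task
+    )
+    |> Enum.flat_map(fn
+      {:ok, {:ok, block}} -> [block]
+      _failed_or_timed_out -> []
+    end)
+  end
+
+  # Stand-in for the real peer request.
+  defp fetch_block(_root), do: {:ok, %{}}
+end
+```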
+
+### Big Objects in Mailboxes
+
+Blocks are pretty big objects, and they are passed around in process mailboxes even for simple calls like `Store.has_block(block)`. We should minimize these kinds of interactions, as putting big structures in mailboxes slows their processing down.
+
+Improvements:
+
+- We could store the blocks in the DB immediately after downloading them.
+- Checking if a block is present could be done directly against the DB, without needing to go through the store.
+- If we want faster access to blocks, we can build an ETS block cache (sketched below).
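+
+A minimal sketch of such a cache, with `Sketch.DB` standing in for the real persistence module (all names hypothetical):
+
+```elixir
+defmodule Sketch.BlockCache do
+  @table :block_cache
+
+  # Create the public, named ETS table; call once from the owning process.
+  def init, do: :ets.new(@table, [:set, :public, :named_table, read_concurrency: true])
+
+  # Persist to the DB right after download, and cache for fast access.
+  def put(root, block) do
+    :ets.insert(@table, {root, block})
+    Sketch.DB.store_block(root, block)
+  end
+
+  # Presence checks hit the cache or the DB, never the Store's mailbox.
+  def has_block?(root), do: :ets.member(@table, root) or Sketch.DB.has_block?(root)
+
+  def get(root) do
+    case :ets.lookup(@table, root) do
+      [{^root, block}] -> {:ok, block}
+      [] -> Sketch.DB.get_block(root)
+    end
+  end
+end
+```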
+
+### Other issues
+
+- States are never stored in the DB. This is not a concurrency issue, but we should fix it.
+- Low priority, but we should evaluate dropping the Subscriber GenServer and Broadway, and having one task per message under a supervisor.
+
+## State Diagram
+
+These are the states that a block may have:
+
+- New: just downloaded, decompressed and decoded.
+- Pending: no parent.
+- Child: parent is present and downloaded.
+- BlockChild: parent is a valid block.
+- StateChild: parent's state transition is calculated.
+- Included: we calculated the state transition for this block and the state is available. It's now part of the fork tree.
+
+The state diagram for a block looks something like this:
+
+```mermaid
+stateDiagram-v2
+ [*] --> New: Download, decompress, decode
+ New --> Child: Parent is present
+ New --> Pending: Parent is not present
+ Pending --> Child: Parent is downloaded
+ Child --> BlockChild: Parent is a valid block (but not a state)
+ Child --> Invalid: Parent is Invalid
+ BlockChild --> Invalid: store validation fails
+ BlockChild --> StateChild: Parent state is present
+ StateChild --> NewState: state transition calculated
+ StateChild --> Invalid: state transition fails
+```
+
+## A possible new design
+
+```mermaid
+sequenceDiagram
+    participant port as LibP2P Port (GenServer)
+    participant decoder as Decoder (Supervised task)
+    participant tracker as Block Tracker (GenServer)
+    participant down as Downloader (Supervised task)
+    participant store as Fork Choice Store (GenServer)
+    participant state_t as State Transition Task (Supervised task)
+    participant DB as KV Store
+
+ port ->> decoder: gossip(id)
+ decoder ->> port: accept(id)
+    decoder ->> decoder: decompress, decode, call handler
+ decoder ->> DB: store_block_if_not_present(block)
+ decoder ->> tracker: new_block(root)
+ tracker ->> DB: present?(parent_root)
+ DB -->> tracker: false
+ tracker ->> down: download(parent_root)
+ down ->> DB: store_block_if_not_present(parent_root)
+ down ->> tracker: downloaded(parent_root)
+ tracker ->> store: on_block(root)
+ store ->> DB: get_block(root)
+ store ->> store: validate block
+ store ->> state_t: state_transition(block)
+ state_t ->> DB: store_state(new_state)
+ state_t ->> store: on_state(new_state)
+ state_t ->> tracker: on_state(new_state)
+```
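+
+A minimal sketch of the Block Tracker lane above, with hypothetical `Sketch.*` names standing in for the real modules:
+
+```elixir
+# The tracker receives block roots (never full blocks), checks parent presence
+# against the DB, triggers a download task for missing parents, and only then
+# hands the root to the fork-choice store.
+defmodule Sketch.BlockTracker do
+  use GenServer
+
+  def start_link(opts), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__)
+
+  def new_block(root), do: GenServer.cast(__MODULE__, {:new_block, root})
+  def downloaded(root), do: GenServer.cast(__MODULE__, {:downloaded, root})
+
+  @impl true
+  def init(_opts), do: {:ok, %{waiting: %{}}}
+
+  @impl true
+  def handle_cast({:new_block, root}, state) do
+    {:ok, block} = Sketch.DB.get_block(root)
+
+    if Sketch.DB.present?(block.parent_root) do
+      Sketch.Store.on_block(root)
+      {:noreply, state}
+    else
+      # Download the missing parent in a task (fetch_and_store/1 is a placeholder)
+      # and remember which block is waiting for it.
+      Task.Supervisor.start_child(Sketch.TaskSupervisor, fn ->
+        Sketch.Downloader.fetch_and_store(block.parent_root)
+        downloaded(block.parent_root)
+      end)
+
+      {:noreply, update_in(state.waiting[block.parent_root], &[root | &1 || []])}
+    end
+  end
+
+  def handle_cast({:downloaded, parent_root}, state) do
+    {children, waiting} = Map.pop(state.waiting, parent_root, [])
+    # Re-process blocks that were waiting for this parent.
+    Enum.each(children, &new_block/1)
+    {:noreply, %{state | waiting: waiting}}
+  end
+end
+```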
+
+Some pending definitions:
+
+- The block tracker could eventually be a block cache, maintaining blocks and their states in an ETS table that can be accessed easily by other processes.
diff --git a/lib/lambda_ethereum_consensus/fork_choice/handlers.ex b/lib/lambda_ethereum_consensus/fork_choice/handlers.ex
index 5d06630f5..e6223e954 100644
--- a/lib/lambda_ethereum_consensus/fork_choice/handlers.ex
+++ b/lib/lambda_ethereum_consensus/fork_choice/handlers.ex
@@ -172,7 +172,6 @@ defmodule LambdaEthereumConsensus.ForkChoice.Handlers do
|> update_checkpoints(state.current_justified_checkpoint, state.finalized_checkpoint)
# Eagerly compute unrealized justification and finality
|> compute_pulled_up_tip(block_root)
- |> then(&{:ok, &1})
end
end
@@ -216,27 +215,29 @@ defmodule LambdaEthereumConsensus.ForkChoice.Handlers do
end)
end
+ # Pull up the post-state of the block to the next epoch boundary
def compute_pulled_up_tip(%Store{block_states: states} = store, block_root) do
- # Pull up the post-state of the block to the next epoch boundary
- # TODO: handle possible errors
- {:ok, state} = EpochProcessing.process_justification_and_finalization(states[block_root])
+ result = EpochProcessing.process_justification_and_finalization(states[block_root])
- block_epoch = Misc.compute_epoch_at_slot(store.blocks[block_root].slot)
- current_epoch = store |> Store.get_current_slot() |> Misc.compute_epoch_at_slot()
+ with {:ok, state} <- result do
+ block_epoch = Misc.compute_epoch_at_slot(store.blocks[block_root].slot)
+ current_epoch = store |> Store.get_current_slot() |> Misc.compute_epoch_at_slot()
- unrealized_justifications =
- Map.put(store.unrealized_justifications, block_root, state.current_justified_checkpoint)
+ unrealized_justifications =
+ Map.put(store.unrealized_justifications, block_root, state.current_justified_checkpoint)
- %Store{store | unrealized_justifications: unrealized_justifications}
- |> update_unrealized_checkpoints(
- state.current_justified_checkpoint,
- state.finalized_checkpoint
- )
- |> if_then_update(
- block_epoch < current_epoch,
- # If the block is from a prior epoch, apply the realized values
- &update_checkpoints(&1, state.current_justified_checkpoint, state.finalized_checkpoint)
- )
+ %Store{store | unrealized_justifications: unrealized_justifications}
+ |> update_unrealized_checkpoints(
+ state.current_justified_checkpoint,
+ state.finalized_checkpoint
+ )
+ |> if_then_update(
+ block_epoch < current_epoch,
+ # If the block is from a prior epoch, apply the realized values
+ &update_checkpoints(&1, state.current_justified_checkpoint, state.finalized_checkpoint)
+ )
+ |> then(&{:ok, &1})
+ end
end
# Update unrealized checkpoints in store if necessary
@@ -335,7 +336,7 @@ defmodule LambdaEthereumConsensus.ForkChoice.Handlers do
message = %Checkpoint{epoch: target.epoch, root: beacon_block_root}
attesting_indices
- |> Stream.filter(&MapSet.member?(store.equivocating_indices, &1))
+ |> Stream.reject(&MapSet.member?(store.equivocating_indices, &1))
|> Stream.filter(&(not Map.has_key?(messages, &1) or target.epoch > messages[&1].epoch))
|> Enum.reduce(messages, &Map.put(&2, &1, message))
|> then(&{:ok, %Store{store | latest_messages: &1}})
diff --git a/lib/lambda_ethereum_consensus/fork_choice/helpers.ex b/lib/lambda_ethereum_consensus/fork_choice/helpers.ex
index ce38e83ee..1ac8c5d93 100644
--- a/lib/lambda_ethereum_consensus/fork_choice/helpers.ex
+++ b/lib/lambda_ethereum_consensus/fork_choice/helpers.ex
@@ -58,16 +58,15 @@ defmodule LambdaEthereumConsensus.ForkChoice.Helpers do
Stream.cycle([nil])
|> Enum.reduce_while(head, fn nil, head ->
- children =
- blocks
- |> Stream.filter(fn {_, block} -> block.parent_root == head end)
- |> Enum.map(fn {root, _} -> root end)
-
- if Enum.empty?(children) do
- {:halt, head}
- else
- {:cont, Enum.max_by(children, fn root -> get_weight(store, root) end)}
- end
+ blocks
+ |> Stream.filter(fn {_, block} -> block.parent_root == head end)
+ |> Stream.map(fn {root, _} -> root end)
+ # Ties broken by favoring block with lexicographically higher root
+ |> Enum.sort(:desc)
+ |> then(fn
+ [] -> {:halt, head}
+ c -> {:cont, Enum.max_by(c, &get_weight(store, &1))}
+ end)
end)
|> then(&{:ok, &1})
end
@@ -75,18 +74,15 @@ defmodule LambdaEthereumConsensus.ForkChoice.Helpers do
defp get_weight(%Store{} = store, root) do
state = store.checkpoint_states[store.justified_checkpoint]
- unslashed_and_active_indices =
- Accessors.get_active_validator_indices(state, Accessors.get_current_epoch(state))
- |> Enum.filter(fn i -> not Enum.at(state.validators, i).slashed end)
-
attestation_score =
- unslashed_and_active_indices
+ Accessors.get_active_validator_indices(state, Accessors.get_current_epoch(state))
+ |> Stream.reject(&Enum.at(state.validators, &1).slashed)
|> Stream.filter(&Map.has_key?(store.latest_messages, &1))
- |> Stream.filter(&(not MapSet.member?(store.equivocating_indices, &1)))
+ |> Stream.reject(&MapSet.member?(store.equivocating_indices, &1))
|> Stream.filter(fn i ->
Store.get_ancestor(store, store.latest_messages[i].root, store.blocks[root].slot) == root
end)
- |> Stream.map(fn i -> Enum.at(state.validators, i).effective_balance end)
+ |> Stream.map(&Enum.at(state.validators, &1).effective_balance)
|> Enum.sum()
if store.proposer_boost_root == <<0::256>> or
diff --git a/lib/lambda_ethereum_consensus/state_transition/accessors.ex b/lib/lambda_ethereum_consensus/state_transition/accessors.ex
index 376e28a54..1399d7283 100644
--- a/lib/lambda_ethereum_consensus/state_transition/accessors.ex
+++ b/lib/lambda_ethereum_consensus/state_transition/accessors.ex
@@ -120,7 +120,7 @@ defmodule LambdaEthereumConsensus.StateTransition.Accessors do
Return the current epoch.
"""
@spec get_current_epoch(BeaconState.t()) :: SszTypes.epoch()
- def get_current_epoch(%BeaconState{slot: slot} = _state) do
+ def get_current_epoch(%BeaconState{slot: slot}) do
Misc.compute_epoch_at_slot(slot)
end
@@ -373,8 +373,10 @@ defmodule LambdaEthereumConsensus.StateTransition.Accessors do
@spec get_block_root_at_slot(BeaconState.t(), SszTypes.slot()) ::
{:ok, SszTypes.root()} | {:error, binary()}
def get_block_root_at_slot(state, slot) do
- if slot < state.slot and state.slot <= slot + ChainSpec.get("SLOTS_PER_HISTORICAL_ROOT") do
- root = Enum.at(state.block_roots, rem(slot, ChainSpec.get("SLOTS_PER_HISTORICAL_ROOT")))
+ slots_per_historical_root = ChainSpec.get("SLOTS_PER_HISTORICAL_ROOT")
+
+ if slot < state.slot and state.slot <= slot + slots_per_historical_root do
+ root = Enum.at(state.block_roots, rem(slot, slots_per_historical_root))
{:ok, root}
else
{:error, "Block root not available"}
diff --git a/lib/lambda_ethereum_consensus/state_transition/epoch_processing.ex b/lib/lambda_ethereum_consensus/state_transition/epoch_processing.ex
index 9773582b7..fe31f6d41 100644
--- a/lib/lambda_ethereum_consensus/state_transition/epoch_processing.ex
+++ b/lib/lambda_ethereum_consensus/state_transition/epoch_processing.ex
@@ -364,8 +364,6 @@ defmodule LambdaEthereumConsensus.StateTransition.EpochProcessing do
previous_target_balance,
current_target_balance
)
- else
- {:error, reason} -> {:error, reason}
end
end
end
@@ -373,55 +371,28 @@ defmodule LambdaEthereumConsensus.StateTransition.EpochProcessing do
defp weigh_justification_and_finalization(
state,
total_active_balance,
- previous_epoch_target_balance,
- current_epoch_target_balance
+ previous_target_balance,
+ current_target_balance
) do
previous_epoch = Accessors.get_previous_epoch(state)
current_epoch = Accessors.get_current_epoch(state)
- old_previous_justified_checkpoint = state.previous_justified_checkpoint
- old_current_justified_checkpoint = state.current_justified_checkpoint
-
- with {:ok, previous_block_root} <- Accessors.get_block_root(state, previous_epoch),
- {:ok, current_block_root} <- Accessors.get_block_root(state, current_epoch) do
- new_state =
- state
- |> update_first_bit()
- |> update_previous_epoch_justified(
- previous_epoch_target_balance * 3 >= total_active_balance * 2,
- previous_epoch,
- previous_block_root
- )
- |> update_current_epoch_justified(
- current_epoch_target_balance * 3 >= total_active_balance * 2,
- current_epoch,
- current_block_root
- )
- |> update_checkpoint_finalization(
- old_previous_justified_checkpoint,
- current_epoch,
- 1..3,
- 3
- )
- |> update_checkpoint_finalization(
- old_previous_justified_checkpoint,
- current_epoch,
- 1..2,
- 2
- )
- |> update_checkpoint_finalization(
- old_current_justified_checkpoint,
- current_epoch,
- 0..2,
- 2
- )
- |> update_checkpoint_finalization(
- old_current_justified_checkpoint,
- current_epoch,
- 0..1,
- 1
- )
-
- {:ok, new_state}
+ old_previous_justified = state.previous_justified_checkpoint
+ old_current_justified = state.current_justified_checkpoint
+ previous_is_justified = previous_target_balance * 3 >= total_active_balance * 2
+ current_is_justified = current_target_balance * 3 >= total_active_balance * 2
+
+ new_state = update_first_bit(state)
+
+ with {:ok, new_state} <-
+ update_epoch_justified(new_state, previous_is_justified, previous_epoch, 1),
+ {:ok, new_state} <-
+ update_epoch_justified(new_state, current_is_justified, current_epoch, 0) do
+ new_state
+ |> update_checkpoint_finalization(old_previous_justified, current_epoch, 1..3, 3)
+ |> update_checkpoint_finalization(old_previous_justified, current_epoch, 1..2, 2)
+ |> update_checkpoint_finalization(old_current_justified, current_epoch, 0..2, 2)
+ |> update_checkpoint_finalization(old_current_justified, current_epoch, 0..1, 1)
+ |> then(&{:ok, &1})
end
end
@@ -439,50 +410,21 @@ defmodule LambdaEthereumConsensus.StateTransition.EpochProcessing do
}
end
- defp update_previous_epoch_justified(state, true, previous_epoch, previous_block_root) do
- new_checkpoint = %SszTypes.Checkpoint{
- epoch: previous_epoch,
- root: previous_block_root
- }
-
- bits =
- state.justification_bits
- |> BitVector.new(4)
- |> BitVector.set(1)
- |> to_byte()
-
- %BeaconState{
- state
- | current_justified_checkpoint: new_checkpoint,
- justification_bits: bits
- }
- end
-
- defp update_previous_epoch_justified(state, false, _previous_epoch, _previous_block_root) do
- state
- end
-
- defp update_current_epoch_justified(state, true, current_epoch, current_block_root) do
- new_checkpoint = %SszTypes.Checkpoint{
- epoch: current_epoch,
- root: current_block_root
- }
+ defp update_epoch_justified(state, false, _, _), do: {:ok, state}
- bits =
- state.justification_bits
- |> BitVector.new(4)
- |> BitVector.set(0)
- |> to_byte()
+ defp update_epoch_justified(state, true, epoch, index) do
+ with {:ok, block_root} <- Accessors.get_block_root(state, epoch) do
+ new_checkpoint = %SszTypes.Checkpoint{epoch: epoch, root: block_root}
- %BeaconState{
- state
- | current_justified_checkpoint: new_checkpoint,
- justification_bits: bits
- }
- end
+ bits =
+ state.justification_bits
+ |> BitVector.new(4)
+ |> BitVector.set(index)
+ |> to_byte()
- defp update_current_epoch_justified(state, false, _current_epoch, _current_block_root) do
- state
+ %{state | current_justified_checkpoint: new_checkpoint, justification_bits: bits}
+ |> then(&{:ok, &1})
+ end
end
defp update_checkpoint_finalization(
@@ -497,7 +439,7 @@ defmodule LambdaEthereumConsensus.StateTransition.EpochProcessing do
|> BitVector.new(4)
|> BitVector.all?(range)
- if bits_set && old_justified_checkpoint.epoch + offset == current_epoch do
+ if bits_set and old_justified_checkpoint.epoch + offset == current_epoch do
%BeaconState{state | finalized_checkpoint: old_justified_checkpoint}
else
state
diff --git a/lib/lambda_ethereum_consensus/state_transition/predicates.ex b/lib/lambda_ethereum_consensus/state_transition/predicates.ex
index 189f01277..cf3885d31 100644
--- a/lib/lambda_ethereum_consensus/state_transition/predicates.ex
+++ b/lib/lambda_ethereum_consensus/state_transition/predicates.ex
@@ -123,11 +123,14 @@ defmodule LambdaEthereumConsensus.StateTransition.Predicates do
SszTypes.root()
) :: boolean
def is_valid_merkle_branch?(leaf, branch, depth, index, root) do
- root ==
- branch
- |> Enum.take(depth)
- |> Enum.with_index()
- |> Enum.reduce(leaf, fn {v, i}, value -> hash_merkle_node(v, value, index, i) end)
+ root == generate_merkle_proof(leaf, branch, depth, index)
+ end
+
+ def generate_merkle_proof(leaf, branch, depth, index) do
+ branch
+ |> Enum.take(depth)
+ |> Enum.with_index()
+ |> Enum.reduce(leaf, fn {v, i}, value -> hash_merkle_node(v, value, index, i) end)
end
defp hash_merkle_node(value_1, value_2, index, i) do
diff --git a/lib/lambda_ethereum_consensus/telemetry.ex b/lib/lambda_ethereum_consensus/telemetry.ex
index 26c89d79f..79920d62e 100644
--- a/lib/lambda_ethereum_consensus/telemetry.ex
+++ b/lib/lambda_ethereum_consensus/telemetry.ex
@@ -59,6 +59,15 @@ defmodule LambdaEthereumConsensus.Telemetry do
counter("peers.connection.count", tags: [:result]),
counter("peers.challenge.count", tags: [:result]),
counter("network.request.count", tags: [:result, :type, :reason]),
+ counter("network.pubsub_peers.count", tags: [:result]),
+ sum("network.pubsub_topic_active.active", tags: [:topic]),
+ counter("network.pubsub_topics_graft.count", tags: [:topic]),
+ counter("network.pubsub_topics_prune.count", tags: [:topic]),
+ counter("network.pubsub_topics_deliver_message.count", tags: [:topic]),
+ counter("network.pubsub_topics_duplicate_message.count", tags: [:topic]),
+ counter("network.pubsub_topics_reject_message.count", tags: [:topic]),
+ counter("network.pubsub_topics_un_deliverable_message.count", tags: [:topic]),
+ counter("network.pubsub_topics_validate_message.count", tags: [:topic]),
counter("port.message.count", tags: [:function, :direction]),
sum("network.request.blocks", tags: [:result, :type, :reason]),
diff --git a/lib/libp2p_port.ex b/lib/libp2p_port.ex
index d77abd552..32593996d 100644
--- a/lib/libp2p_port.ex
+++ b/lib/libp2p_port.ex
@@ -1,8 +1,7 @@
defmodule LambdaEthereumConsensus.Libp2pPort do
@moduledoc """
A GenServer that allows other elixir processes to send and receive commands to/from
- the LibP2P server in Go. For now, it only supports subscribing and unsubscribing from
- topics.
+ the LibP2P server in Go.
Requests are generated with an ID, which is returned when calling. Those IDs appear
in the responses that might be listened to by other processes.
@@ -26,6 +25,7 @@ defmodule LambdaEthereumConsensus.Libp2pPort do
SendResponse,
SetHandler,
SubscribeToTopic,
+ Tracer,
UnsubscribeFromTopic,
ValidateMessage
}
@@ -310,6 +310,68 @@ defmodule LambdaEthereumConsensus.Libp2pPort do
send(pid, {:response, result})
end
+ defp handle_notification(%Tracer{t: {:add_peer, %{}}}, _state) do
+ :telemetry.execute([:network, :pubsub_peers], %{}, %{
+ result: "add"
+ })
+ end
+
+ defp handle_notification(%Tracer{t: {:remove_peer, %{}}}, _state) do
+ :telemetry.execute([:network, :pubsub_peers], %{}, %{
+ result: "remove"
+ })
+ end
+
+ defp handle_notification(%Tracer{t: {:joined, %{topic: topic}}}, _state) do
+ :telemetry.execute([:network, :pubsub_topic_active], %{active: 1}, %{
+ topic: get_topic_name(topic)
+ })
+ end
+
+ defp handle_notification(%Tracer{t: {:left, %{topic: topic}}}, _state) do
+ :telemetry.execute([:network, :pubsub_topic_active], %{active: -1}, %{
+ topic: get_topic_name(topic)
+ })
+ end
+
+ defp handle_notification(%Tracer{t: {:grafted, %{topic: topic}}}, _state) do
+ :telemetry.execute([:network, :pubsub_topics_graft], %{}, %{topic: get_topic_name(topic)})
+ end
+
+ defp handle_notification(%Tracer{t: {:pruned, %{topic: topic}}}, _state) do
+ :telemetry.execute([:network, :pubsub_topics_prune], %{}, %{topic: get_topic_name(topic)})
+ end
+
+ defp handle_notification(%Tracer{t: {:deliver_message, %{topic: topic}}}, _state) do
+ :telemetry.execute([:network, :pubsub_topics_deliver_message], %{}, %{
+ topic: get_topic_name(topic)
+ })
+ end
+
+ defp handle_notification(%Tracer{t: {:duplicate_message, %{topic: topic}}}, _state) do
+ :telemetry.execute([:network, :pubsub_topics_duplicate_message], %{}, %{
+ topic: get_topic_name(topic)
+ })
+ end
+
+ defp handle_notification(%Tracer{t: {:reject_message, %{topic: topic}}}, _state) do
+ :telemetry.execute([:network, :pubsub_topics_reject_message], %{}, %{
+ topic: get_topic_name(topic)
+ })
+ end
+
+ defp handle_notification(%Tracer{t: {:un_deliverable_message, %{topic: topic}}}, _state) do
+ :telemetry.execute([:network, :pubsub_topics_un_deliverable_message], %{}, %{
+ topic: get_topic_name(topic)
+ })
+ end
+
+ defp handle_notification(%Tracer{t: {:validate_message, %{topic: topic}}}, _state) do
+ :telemetry.execute([:network, :pubsub_topics_validate_message], %{}, %{
+ topic: get_topic_name(topic)
+ })
+ end
+
defp parse_args(args) do
args
|> Keyword.validate!(@default_args)
@@ -339,4 +401,11 @@ defmodule LambdaEthereumConsensus.Libp2pPort do
{:response, {res, %ResultMessage{message: message}}} -> [res | message] |> List.to_tuple()
end
end
+
+ defp get_topic_name(topic) do
+ case topic |> String.split("/") |> Enum.fetch(3) do
+ {:ok, name} -> name
+ :error -> topic
+ end
+ end
end
diff --git a/lib/mix/tasks/generate_spec_tests.ex b/lib/mix/tasks/generate_spec_tests.ex
index 17d3d39cf..56a89210c 100644
--- a/lib/mix/tasks/generate_spec_tests.ex
+++ b/lib/mix/tasks/generate_spec_tests.ex
@@ -10,23 +10,30 @@ defmodule Mix.Tasks.GenerateSpecTests do
use Mix.Task
require Logger
- @configs ["mainnet", "minimal"]
- @forks ["altair", "deneb", "phase0"]
+ @configs ["mainnet", "minimal", "general"]
+ @forks ["phase0", "altair", "bellatrix", "capella", "deneb"]
@shortdoc "Generates tests for spec test files"
@impl Mix.Task
def run(_args) do
{:ok, file_names} = File.ls(Path.join(["lib", "spec", "runners"]))
- runners = Enum.map(file_names, fn file_name -> Path.basename(file_name, ".ex") end)
+ runners = Enum.map(file_names, &Path.basename(&1, ".ex"))
+ # Generate all tests for Capella fork
for config <- @configs, runner <- runners do
generate_test(config, "capella", runner)
end
+ # Generate tests for all forks in general preset
for fork <- @forks, runner <- runners do
generate_test("general", fork, runner)
end
+ # Generate shuffling tests for all testcases
+ for config <- @configs, fork <- @forks do
+ generate_test(config, fork, "shuffling")
+ end
+
File.touch(Path.join(["test", "generated"]))
end
@@ -45,9 +52,9 @@ defmodule Mix.Tasks.GenerateSpecTests do
end
defp test_module(cases, config, fork, runner) do
- r = Macro.camelize(runner)
c = Macro.camelize(config)
f = Macro.camelize(fork)
+ r = Macro.camelize(runner)
module_name = "Elixir.#{c}.#{f}.#{r}Test" |> String.to_atom()
runner_module = "Elixir.#{r}TestRunner" |> String.to_atom()
diff --git a/lib/spec/runners/fork_choice.ex b/lib/spec/runners/fork_choice.ex
index 0e6044987..34be34a8e 100644
--- a/lib/spec/runners/fork_choice.ex
+++ b/lib/spec/runners/fork_choice.ex
@@ -12,64 +12,64 @@ defmodule ForkChoiceTestRunner do
@disabled_on_block_cases [
# "basic",
- "incompatible_justification_update_end_of_epoch",
- "incompatible_justification_update_start_of_epoch",
- "justification_update_beginning_of_epoch",
- "justification_update_end_of_epoch",
- "justification_withholding",
- "justification_withholding_reverse_order",
- "justified_update_always_if_better",
- "justified_update_monotonic",
- "justified_update_not_realized_finality",
- "new_finalized_slot_is_justified_checkpoint_ancestor",
- "not_pull_up_current_epoch_block",
+ # "incompatible_justification_update_end_of_epoch",
+ # "incompatible_justification_update_start_of_epoch",
+ # "justification_update_beginning_of_epoch",
+ # "justification_update_end_of_epoch",
+ # "justification_withholding",
+ # "justification_withholding_reverse_order",
+ # "justified_update_always_if_better",
+ # "justified_update_monotonic",
+ # "justified_update_not_realized_finality",
+ # "new_finalized_slot_is_justified_checkpoint_ancestor",
+ # "not_pull_up_current_epoch_block",
# "on_block_bad_parent_root",
- "on_block_before_finalized",
- "on_block_checkpoints",
- "on_block_finalized_skip_slots",
- "on_block_finalized_skip_slots_not_in_skip_chain",
+ # "on_block_before_finalized",
+ # "on_block_checkpoints",
+ # "on_block_finalized_skip_slots",
+ # "on_block_finalized_skip_slots_not_in_skip_chain",
# "on_block_future_block",
# "proposer_boost",
# "proposer_boost_root_same_slot_untimely_block",
- "pull_up_on_tick",
- "pull_up_past_epoch_block"
+ # "pull_up_on_tick",
+ # "pull_up_past_epoch_block"
]
@disabled_ex_ante_cases [
# "ex_ante_attestations_is_greater_than_proposer_boost_with_boost",
- "ex_ante_sandwich_with_boost_not_sufficient",
- "ex_ante_sandwich_with_honest_attestation",
- "ex_ante_sandwich_without_attestations"
+ # "ex_ante_sandwich_with_boost_not_sufficient",
+ # "ex_ante_sandwich_with_honest_attestation",
+ # "ex_ante_sandwich_without_attestations",
# "ex_ante_vanilla"
]
@disabled_get_head_cases [
# "chain_no_attestations",
- "discard_equivocations_on_attester_slashing",
- "discard_equivocations_slashed_validator_censoring",
- "filtered_block_tree",
+ # "discard_equivocations_on_attester_slashing",
+ # "discard_equivocations_slashed_validator_censoring",
+ # "filtered_block_tree",
# "genesis",
- "proposer_boost_correct_head",
- "shorter_chain_but_heavier_weight",
- "split_tie_breaker_no_attestations",
- "voting_source_beyond_two_epoch",
- "voting_source_within_two_epoch"
+ # "proposer_boost_correct_head",
+ # "shorter_chain_but_heavier_weight",
+ # "split_tie_breaker_no_attestations",
+ # "voting_source_beyond_two_epoch",
+ # "voting_source_within_two_epoch"
]
@disabled_reorg_cases [
- "delayed_justification_current_epoch",
- "delayed_justification_previous_epoch",
- "include_votes_another_empty_chain_with_enough_ffg_votes_current_epoch",
- "include_votes_another_empty_chain_with_enough_ffg_votes_previous_epoch",
- "include_votes_another_empty_chain_without_enough_ffg_votes_current_epoch",
- "simple_attempted_reorg_delayed_justification_current_epoch",
- "simple_attempted_reorg_delayed_justification_previous_epoch",
- "simple_attempted_reorg_without_enough_ffg_votes"
+ # "delayed_justification_current_epoch",
+ # "delayed_justification_previous_epoch",
+ # "include_votes_another_empty_chain_with_enough_ffg_votes_current_epoch",
+ # "include_votes_another_empty_chain_with_enough_ffg_votes_previous_epoch",
+ # "include_votes_another_empty_chain_without_enough_ffg_votes_current_epoch",
+ # "simple_attempted_reorg_delayed_justification_current_epoch",
+ # "simple_attempted_reorg_delayed_justification_previous_epoch",
+ # "simple_attempted_reorg_without_enough_ffg_votes"
]
@disabled_withholding_cases [
- "withholding_attack",
- "withholding_attack_unviable_honest_chain"
+ # "withholding_attack",
+ # "withholding_attack_unviable_honest_chain"
]
@impl TestRunner
@@ -81,9 +81,7 @@ defmodule ForkChoiceTestRunner do
Enum.member?(@disabled_withholding_cases, testcase)
end
- def skip?(_testcase) do
- true
- end
+ def skip?(_testcase), do: true
@impl TestRunner
def run_test_case(testcase) do
@@ -207,8 +205,4 @@ defmodule ForkChoiceTestRunner do
{:ok, store}
end
-
- defp apply_step(_, _, _) do
- {:error, "unknown step"}
- end
end
diff --git a/lib/spec/runners/light_client.ex b/lib/spec/runners/light_client.ex
new file mode 100644
index 000000000..131d50b37
--- /dev/null
+++ b/lib/spec/runners/light_client.ex
@@ -0,0 +1,52 @@
+defmodule LightClientTestRunner do
+ alias LambdaEthereumConsensus.StateTransition.Predicates
+ use ExUnit.CaseTemplate
+ use TestRunner
+
+ @moduledoc """
+ Runner for LightClient test cases. See: https://github.com/ethereum/consensus-specs/tree/dev/tests/formats/light_client
+ """
+
+ # Remove handler from here once you implement the corresponding functions
+ @disabled_handlers [
+ # "single_merkle_proof",
+ "sync",
+ "update_ranking"
+ ]
+
+ @impl TestRunner
+ def skip?(%SpecTestCase{} = testcase) do
+ Enum.member?(@disabled_handlers, testcase.handler)
+ end
+
+ @impl TestRunner
+ def run_test_case(%SpecTestCase{} = testcase) do
+ handle(testcase.handler, testcase)
+ end
+
+ defp handle("single_merkle_proof", testcase) do
+ case_dir = SpecTestCase.dir(testcase)
+
+ object_root =
+ SpecTestUtils.read_ssz_from_file!(
+ case_dir <> "/object.ssz_snappy",
+ String.to_existing_atom("Elixir.SszTypes." <> testcase.suite)
+ )
+ |> Ssz.hash_tree_root!()
+
+ %{leaf: leaf, leaf_index: leaf_index, branch: branch} =
+ YamlElixir.read_from_file!(case_dir <> "/proof.yaml")
+ |> SpecTestUtils.sanitize_yaml()
+
+ res =
+ Predicates.is_valid_merkle_branch?(
+ leaf,
+ branch,
+ Constants.deposit_contract_tree_depth() + 1,
+ leaf_index,
+ object_root
+ )
+
+ assert true == res
+ end
+end
diff --git a/lib/spec/runners/sanity.ex b/lib/spec/runners/sanity.ex
index 6535858a2..218ea5dc8 100644
--- a/lib/spec/runners/sanity.ex
+++ b/lib/spec/runners/sanity.ex
@@ -40,22 +40,22 @@ defmodule SanityTestRunner do
# "historical_batch",
# "inactivity_scores_full_participation_leaking",
# "inactivity_scores_leaking",
- "invalid_all_zeroed_sig",
- "invalid_duplicate_attester_slashing_same_block",
- "invalid_duplicate_bls_changes_same_block",
+ # "invalid_all_zeroed_sig",
+ # "invalid_duplicate_attester_slashing_same_block",
+ # "invalid_duplicate_bls_changes_same_block",
# "invalid_duplicate_deposit_same_block",
- "invalid_duplicate_proposer_slashings_same_block",
- "invalid_duplicate_validator_exit_same_block",
- "invalid_incorrect_block_sig",
+ # "invalid_duplicate_proposer_slashings_same_block",
+ # "invalid_duplicate_validator_exit_same_block",
+ # "invalid_incorrect_block_sig",
# "invalid_incorrect_proposer_index_sig_from_expected_proposer",
# "invalid_incorrect_proposer_index_sig_from_proposer_index",
- "invalid_incorrect_state_root",
+ # "invalid_incorrect_state_root",
# "invalid_only_increase_deposit_count",
# "invalid_parent_from_same_slot",
# "invalid_prev_slot_block_transition",
# "invalid_same_slot_block_transition",
- "invalid_similar_proposer_slashings_same_block",
- "invalid_two_bls_changes_of_different_addresses_same_validator_same_block",
+ # "invalid_similar_proposer_slashings_same_block",
+ # "invalid_two_bls_changes_of_different_addresses_same_validator_same_block",
# "invalid_withdrawal_fail_second_block_payload_isnt_compatible",
# "is_execution_enabled_false",
# "many_partial_withdrawals_in_epoch_transition",
@@ -69,7 +69,7 @@ defmodule SanityTestRunner do
# "proposer_slashing",
# "skipped_slots",
# "slash_and_exit_diff_index",
- "slash_and_exit_same_index"
+ # "slash_and_exit_same_index"
# "sync_committee_committee__empty",
# "sync_committee_committee__full",
# "sync_committee_committee__half",
@@ -87,8 +87,10 @@ defmodule SanityTestRunner do
# "slots_1",
# "slots_2",
# "over_epoch_boundary",
- "historical_accumulator",
- "double_empty_epoch"
+ # NOTE: too long to run in CI
+ # TODO: optimize
+ "historical_accumulator"
+ # "double_empty_epoch"
]
@impl TestRunner
diff --git a/lib/spec/runners/shuffling.ex b/lib/spec/runners/shuffling.ex
index f6cec793e..740c7e714 100644
--- a/lib/spec/runners/shuffling.ex
+++ b/lib/spec/runners/shuffling.ex
@@ -1,12 +1,13 @@
defmodule ShufflingTestRunner do
- alias LambdaEthereumConsensus.StateTransition.Misc
- use ExUnit.CaseTemplate
- use TestRunner
-
@moduledoc """
Runner for Operations test cases. See: https://github.com/ethereum/consensus-specs/tree/dev/tests/formats/shuffling
"""
+ use ExUnit.CaseTemplate
+ use TestRunner
+
+ alias LambdaEthereumConsensus.StateTransition.Misc
+
# Remove handler from here once you implement the corresponding functions
@disabled_handlers [
# "core"
diff --git a/metrics/grafana/provisioning/dashboards/home.json b/metrics/grafana/provisioning/dashboards/home.json
index 17eadf027..feda66b71 100644
--- a/metrics/grafana/provisioning/dashboards/home.json
+++ b/metrics/grafana/provisioning/dashboards/home.json
@@ -67,9 +67,7 @@
"minVizWidth": 75,
"orientation": "auto",
"reduceOptions": {
- "calcs": [
- "lastNotNull"
- ],
+ "calcs": ["lastNotNull"],
"fields": "",
"values": false
},
@@ -130,9 +128,7 @@
},
"pieType": "pie",
"reduceOptions": {
- "calcs": [
- "lastNotNull"
- ],
+ "calcs": ["lastNotNull"],
"fields": "",
"values": false
},
@@ -546,7 +542,7 @@
},
"gridPos": {
"h": 6,
- "w": 12,
+ "w": 24,
"x": 0,
"y": 12
},
@@ -897,7 +893,815 @@
"refId": "D"
}
],
- "title": "Libp2pPort messages",
+ "title": "Libp2pPort Messages",
+ "type": "timeseries"
+ },
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "PBFA97CFB590B2093"
+ },
+ "fieldConfig": {
+ "defaults": {
+ "color": {
+ "mode": "palette-classic"
+ },
+ "custom": {
+ "axisBorderShow": false,
+ "axisCenteredZero": false,
+ "axisColorMode": "text",
+ "axisLabel": "",
+ "axisPlacement": "auto",
+ "barAlignment": 0,
+ "drawStyle": "line",
+ "fillOpacity": 0,
+ "gradientMode": "none",
+ "hideFrom": {
+ "legend": false,
+ "tooltip": false,
+ "viz": false
+ },
+ "insertNulls": false,
+ "lineInterpolation": "linear",
+ "lineWidth": 1,
+ "pointSize": 5,
+ "scaleDistribution": {
+ "type": "linear"
+ },
+ "showPoints": "auto",
+ "spanNulls": false,
+ "stacking": {
+ "group": "A",
+ "mode": "none"
+ },
+ "thresholdsStyle": {
+ "mode": "off"
+ }
+ },
+ "mappings": [],
+ "thresholds": {
+ "mode": "absolute",
+ "steps": [
+ {
+ "color": "green",
+ "value": null
+ },
+ {
+ "color": "red",
+ "value": 80
+ }
+ ]
+ }
+ },
+ "overrides": []
+ },
+ "gridPos": {
+ "h": 6,
+ "w": 24,
+ "x": 12,
+ "y": 12
+ },
+ "id": 12,
+ "options": {
+ "legend": {
+ "calcs": [],
+ "displayMode": "list",
+ "placement": "bottom",
+ "showLegend": true
+ },
+ "tooltip": {
+ "mode": "single",
+ "sort": "none"
+ }
+ },
+ "targets": [
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "PBFA97CFB590B2093"
+ },
+ "disableTextWrap": false,
+ "editorMode": "code",
+ "exemplar": false,
+ "expr": "sum(network_pubsub_peers_count{result=\"add\"}) - sum(network_pubsub_peers_count{result=\"remove\"})",
+ "fullMetaSearch": false,
+ "includeNullMetadata": true,
+ "instant": false,
+ "interval": "",
+ "legendFormat": "__auto",
+ "range": true,
+ "refId": "A",
+ "useBackend": false
+ }
+ ],
+ "title": "Peers (Gossip)",
+ "type": "timeseries"
+ },
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "PBFA97CFB590B2093"
+ },
+ "fieldConfig": {
+ "defaults": {
+ "color": {
+ "mode": "palette-classic"
+ },
+ "custom": {
+ "axisBorderShow": false,
+ "axisCenteredZero": false,
+ "axisColorMode": "text",
+ "axisLabel": "",
+ "axisPlacement": "auto",
+ "barAlignment": 0,
+ "drawStyle": "line",
+ "fillOpacity": 0,
+ "gradientMode": "none",
+ "hideFrom": {
+ "legend": false,
+ "tooltip": false,
+ "viz": false
+ },
+ "insertNulls": false,
+ "lineInterpolation": "linear",
+ "lineWidth": 1,
+ "pointSize": 5,
+ "scaleDistribution": {
+ "type": "linear"
+ },
+ "showPoints": "auto",
+ "spanNulls": false,
+ "stacking": {
+ "group": "A",
+ "mode": "none"
+ },
+ "thresholdsStyle": {
+ "mode": "off"
+ }
+ },
+ "mappings": [],
+ "thresholds": {
+ "mode": "absolute",
+ "steps": [
+ {
+ "color": "green",
+ "value": null
+ },
+ {
+ "color": "red",
+ "value": 80
+ }
+ ]
+ }
+ },
+ "overrides": []
+ },
+ "gridPos": {
+ "h": 6,
+ "w": 24,
+ "x": 12,
+ "y": 12
+ },
+ "id": 12,
+ "options": {
+ "legend": {
+ "calcs": [],
+ "displayMode": "list",
+ "placement": "bottom",
+ "showLegend": true
+ },
+ "tooltip": {
+ "mode": "single",
+ "sort": "none"
+ }
+ },
+ "targets": [
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "PBFA97CFB590B2093"
+ },
+ "disableTextWrap": false,
+ "editorMode": "code",
+ "exemplar": false,
+ "expr": "network_pubsub_topic_active_active",
+ "fullMetaSearch": false,
+ "includeNullMetadata": true,
+ "instant": false,
+ "interval": "",
+ "legendFormat": "{{topic}}",
+ "range": true,
+ "refId": "A",
+ "useBackend": false
+ }
+ ],
+ "title": "Topics Activity",
+ "type": "heatmap"
+ },
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "PBFA97CFB590B2093"
+ },
+ "fieldConfig": {
+ "defaults": {
+ "color": {
+ "mode": "palette-classic"
+ },
+ "custom": {
+ "axisBorderShow": false,
+ "axisCenteredZero": false,
+ "axisColorMode": "text",
+ "axisLabel": "",
+ "axisPlacement": "auto",
+ "barAlignment": 0,
+ "drawStyle": "line",
+ "fillOpacity": 0,
+ "gradientMode": "none",
+ "hideFrom": {
+ "legend": false,
+ "tooltip": false,
+ "viz": false
+ },
+ "insertNulls": false,
+ "lineInterpolation": "linear",
+ "lineWidth": 1,
+ "pointSize": 5,
+ "scaleDistribution": {
+ "type": "linear"
+ },
+ "showPoints": "auto",
+ "spanNulls": false,
+ "stacking": {
+ "group": "A",
+ "mode": "none"
+ },
+ "thresholdsStyle": {
+ "mode": "off"
+ }
+ },
+ "mappings": [],
+ "thresholds": {
+ "mode": "absolute",
+ "steps": [
+ {
+ "color": "green",
+ "value": null
+ },
+ {
+ "color": "red",
+ "value": 80
+ }
+ ]
+ }
+ },
+ "overrides": []
+ },
+ "gridPos": {
+ "h": 6,
+ "w": 12,
+ "x": 12,
+ "y": 18
+ },
+ "id": 12,
+ "options": {
+ "legend": {
+ "calcs": [],
+ "displayMode": "list",
+ "placement": "bottom",
+ "showLegend": true
+ },
+ "tooltip": {
+ "mode": "single",
+ "sort": "none"
+ }
+ },
+ "targets": [
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "PBFA97CFB590B2093"
+ },
+ "disableTextWrap": false,
+ "editorMode": "code",
+ "exemplar": false,
+ "expr": "network_pubsub_topics_graft_count{} - network_pubsub_topics_prune_count{}",
+ "fullMetaSearch": false,
+ "includeNullMetadata": true,
+ "instant": false,
+ "interval": "",
+ "legendFormat": "{{topic}}",
+ "range": true,
+ "refId": "A",
+ "useBackend": false
+ }
+ ],
+ "title": "Grafted",
+ "type": "timeseries"
+ },
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "PBFA97CFB590B2093"
+ },
+ "fieldConfig": {
+ "defaults": {
+ "color": {
+ "mode": "palette-classic"
+ },
+ "custom": {
+ "axisBorderShow": false,
+ "axisCenteredZero": false,
+ "axisColorMode": "text",
+ "axisLabel": "",
+ "axisPlacement": "auto",
+ "barAlignment": 0,
+ "drawStyle": "line",
+ "fillOpacity": 0,
+ "gradientMode": "none",
+ "hideFrom": {
+ "legend": false,
+ "tooltip": false,
+ "viz": false
+ },
+ "insertNulls": false,
+ "lineInterpolation": "linear",
+ "lineWidth": 1,
+ "pointSize": 5,
+ "scaleDistribution": {
+ "type": "linear"
+ },
+ "showPoints": "auto",
+ "spanNulls": false,
+ "stacking": {
+ "group": "A",
+ "mode": "none"
+ },
+ "thresholdsStyle": {
+ "mode": "off"
+ }
+ },
+ "mappings": [],
+ "thresholds": {
+ "mode": "absolute",
+ "steps": [
+ {
+ "color": "green",
+ "value": null
+ },
+ {
+ "color": "red",
+ "value": 80
+ }
+ ]
+ }
+ },
+ "overrides": []
+ },
+ "gridPos": {
+ "h": 6,
+ "w": 12,
+ "x": 12,
+ "y": 18
+ },
+ "id": 12,
+ "options": {
+ "legend": {
+ "calcs": [],
+ "displayMode": "list",
+ "placement": "bottom",
+ "showLegend": true
+ },
+ "tooltip": {
+ "mode": "single",
+ "sort": "none"
+ }
+ },
+ "targets": [
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "PBFA97CFB590B2093"
+ },
+ "disableTextWrap": false,
+ "editorMode": "code",
+ "exemplar": false,
+ "expr": "rate(network_pubsub_topics_deliver_message_count{}[$__rate_interval])",
+ "fullMetaSearch": false,
+ "includeNullMetadata": true,
+ "instant": false,
+ "interval": "",
+ "legendFormat": "{{topic}}",
+ "range": true,
+ "refId": "A",
+ "useBackend": false
+ }
+ ],
+ "title": "Deliver Messages",
+ "type": "timeseries"
+ },
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "PBFA97CFB590B2093"
+ },
+ "fieldConfig": {
+ "defaults": {
+ "color": {
+ "mode": "palette-classic"
+ },
+ "custom": {
+ "axisBorderShow": false,
+ "axisCenteredZero": false,
+ "axisColorMode": "text",
+ "axisLabel": "",
+ "axisPlacement": "auto",
+ "barAlignment": 0,
+ "drawStyle": "line",
+ "fillOpacity": 0,
+ "gradientMode": "none",
+ "hideFrom": {
+ "legend": false,
+ "tooltip": false,
+ "viz": false
+ },
+ "insertNulls": false,
+ "lineInterpolation": "linear",
+ "lineWidth": 1,
+ "pointSize": 5,
+ "scaleDistribution": {
+ "type": "linear"
+ },
+ "showPoints": "auto",
+ "spanNulls": false,
+ "stacking": {
+ "group": "A",
+ "mode": "none"
+ },
+ "thresholdsStyle": {
+ "mode": "off"
+ }
+ },
+ "mappings": [],
+ "thresholds": {
+ "mode": "absolute",
+ "steps": [
+ {
+ "color": "green",
+ "value": null
+ },
+ {
+ "color": "red",
+ "value": 80
+ }
+ ]
+ }
+ },
+ "overrides": []
+ },
+ "gridPos": {
+ "h": 6,
+ "w": 12,
+ "x": 0,
+ "y": 18
+ },
+ "id": 12,
+ "options": {
+ "legend": {
+ "calcs": [],
+ "displayMode": "list",
+ "placement": "bottom",
+ "showLegend": true
+ },
+ "tooltip": {
+ "mode": "single",
+ "sort": "none"
+ }
+ },
+ "targets": [
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "PBFA97CFB590B2093"
+ },
+ "disableTextWrap": false,
+ "editorMode": "code",
+ "exemplar": false,
+ "expr": "rate(network_pubsub_topics_duplicate_message_count{}[$__rate_interval])",
+ "fullMetaSearch": false,
+ "includeNullMetadata": true,
+ "instant": false,
+ "interval": "",
+ "legendFormat": "{{topic}}",
+ "range": true,
+ "refId": "A",
+ "useBackend": false
+ }
+ ],
+ "title": "Duplicate Messages",
+ "type": "timeseries"
+ },
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "PBFA97CFB590B2093"
+ },
+ "fieldConfig": {
+ "defaults": {
+ "color": {
+ "mode": "palette-classic"
+ },
+ "custom": {
+ "axisBorderShow": false,
+ "axisCenteredZero": false,
+ "axisColorMode": "text",
+ "axisLabel": "",
+ "axisPlacement": "auto",
+ "barAlignment": 0,
+ "drawStyle": "line",
+ "fillOpacity": 0,
+ "gradientMode": "none",
+ "hideFrom": {
+ "legend": false,
+ "tooltip": false,
+ "viz": false
+ },
+ "insertNulls": false,
+ "lineInterpolation": "linear",
+ "lineWidth": 1,
+ "pointSize": 5,
+ "scaleDistribution": {
+ "type": "linear"
+ },
+ "showPoints": "auto",
+ "spanNulls": false,
+ "stacking": {
+ "group": "A",
+ "mode": "none"
+ },
+ "thresholdsStyle": {
+ "mode": "off"
+ }
+ },
+ "mappings": [],
+ "thresholds": {
+ "mode": "absolute",
+ "steps": [
+ {
+ "color": "green",
+ "value": null
+ },
+ {
+ "color": "red",
+ "value": 80
+ }
+ ]
+ }
+ },
+ "overrides": []
+ },
+ "gridPos": {
+ "h": 6,
+ "w": 12,
+ "x": 0,
+ "y": 18
+ },
+ "id": 12,
+ "options": {
+ "legend": {
+ "calcs": [],
+ "displayMode": "list",
+ "placement": "bottom",
+ "showLegend": true
+ },
+ "tooltip": {
+ "mode": "single",
+ "sort": "none"
+ }
+ },
+ "targets": [
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "PBFA97CFB590B2093"
+ },
+ "disableTextWrap": false,
+ "editorMode": "code",
+ "exemplar": false,
+ "expr": "rate(network_pubsub_topics_reject_message_count{}[$__rate_interval])",
+ "fullMetaSearch": false,
+ "includeNullMetadata": true,
+ "instant": false,
+ "interval": "",
+ "legendFormat": "{{topic}}",
+ "range": true,
+ "refId": "A",
+ "useBackend": false
+ }
+ ],
+ "title": "Reject Messages",
+ "type": "timeseries"
+ },
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "PBFA97CFB590B2093"
+ },
+ "fieldConfig": {
+ "defaults": {
+ "color": {
+ "mode": "palette-classic"
+ },
+ "custom": {
+ "axisBorderShow": false,
+ "axisCenteredZero": false,
+ "axisColorMode": "text",
+ "axisLabel": "",
+ "axisPlacement": "auto",
+ "barAlignment": 0,
+ "drawStyle": "line",
+ "fillOpacity": 0,
+ "gradientMode": "none",
+ "hideFrom": {
+ "legend": false,
+ "tooltip": false,
+ "viz": false
+ },
+ "insertNulls": false,
+ "lineInterpolation": "linear",
+ "lineWidth": 1,
+ "pointSize": 5,
+ "scaleDistribution": {
+ "type": "linear"
+ },
+ "showPoints": "auto",
+ "spanNulls": false,
+ "stacking": {
+ "group": "A",
+ "mode": "none"
+ },
+ "thresholdsStyle": {
+ "mode": "off"
+ }
+ },
+ "mappings": [],
+ "thresholds": {
+ "mode": "absolute",
+ "steps": [
+ {
+ "color": "green",
+ "value": null
+ },
+ {
+ "color": "red",
+ "value": 80
+ }
+ ]
+ }
+ },
+ "overrides": []
+ },
+ "gridPos": {
+ "h": 6,
+ "w": 12,
+ "x": 12,
+ "y": 18
+ },
+ "id": 12,
+ "options": {
+ "legend": {
+ "calcs": [],
+ "displayMode": "list",
+ "placement": "bottom",
+ "showLegend": true
+ },
+ "tooltip": {
+ "mode": "single",
+ "sort": "none"
+ }
+ },
+ "targets": [
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "PBFA97CFB590B2093"
+ },
+ "disableTextWrap": false,
+ "editorMode": "code",
+ "exemplar": false,
+ "expr": "rate(network_pubsub_topics_un_deliverable_message_count{}[$__rate_interval])",
+ "fullMetaSearch": false,
+ "includeNullMetadata": true,
+ "instant": false,
+ "interval": "",
+ "legendFormat": "{{topic}}",
+ "range": true,
+ "refId": "A",
+ "useBackend": false
+ }
+ ],
+ "title": "Undeliverable Messages",
+ "type": "timeseries"
+ },
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "PBFA97CFB590B2093"
+ },
+ "fieldConfig": {
+ "defaults": {
+ "color": {
+ "mode": "palette-classic"
+ },
+ "custom": {
+ "axisBorderShow": false,
+ "axisCenteredZero": false,
+ "axisColorMode": "text",
+ "axisLabel": "",
+ "axisPlacement": "auto",
+ "barAlignment": 0,
+ "drawStyle": "line",
+ "fillOpacity": 0,
+ "gradientMode": "none",
+ "hideFrom": {
+ "legend": false,
+ "tooltip": false,
+ "viz": false
+ },
+ "insertNulls": false,
+ "lineInterpolation": "linear",
+ "lineWidth": 1,
+ "pointSize": 5,
+ "scaleDistribution": {
+ "type": "linear"
+ },
+ "showPoints": "auto",
+ "spanNulls": false,
+ "stacking": {
+ "group": "A",
+ "mode": "none"
+ },
+ "thresholdsStyle": {
+ "mode": "off"
+ }
+ },
+ "mappings": [],
+ "thresholds": {
+ "mode": "absolute",
+ "steps": [
+ {
+ "color": "green",
+ "value": null
+ },
+ {
+ "color": "red",
+ "value": 80
+ }
+ ]
+ }
+ },
+ "overrides": []
+ },
+ "gridPos": {
+ "h": 6,
+ "w": 12,
+ "x": 12,
+ "y": 18
+ },
+ "id": 12,
+ "options": {
+ "legend": {
+ "calcs": [],
+ "displayMode": "list",
+ "placement": "bottom",
+ "showLegend": true
+ },
+ "tooltip": {
+ "mode": "single",
+ "sort": "none"
+ }
+ },
+ "targets": [
+ {
+ "datasource": {
+ "type": "prometheus",
+ "uid": "PBFA97CFB590B2093"
+ },
+ "disableTextWrap": false,
+ "editorMode": "code",
+ "exemplar": false,
+ "expr": "rate(network_pubsub_topics_validate_message_count{}[$__rate_interval])",
+ "fullMetaSearch": false,
+ "includeNullMetadata": true,
+ "instant": false,
+ "interval": "",
+ "legendFormat": "{{topic}}",
+ "range": true,
+ "refId": "A",
+ "useBackend": false
+ }
+ ],
+ "title": "Validate Messages",
"type": "timeseries"
}
],
diff --git a/native/libp2p_port/internal/proto_helpers/proto_helpers.go b/native/libp2p_port/internal/proto_helpers/proto_helpers.go
index 5d63947b6..9fb9b6888 100644
--- a/native/libp2p_port/internal/proto_helpers/proto_helpers.go
+++ b/native/libp2p_port/internal/proto_helpers/proto_helpers.go
@@ -20,6 +20,72 @@ func ConfigFromInitArgs(initArgs *proto_defs.InitArgs) Config {
}
}
+func AddPeerNotification() proto_defs.Notification {
+ addPeerNotification := &proto_defs.AddPeerGossip{}
+ tracer := &proto_defs.Tracer{T: &proto_defs.Tracer_AddPeer{AddPeer: addPeerNotification}}
+ return proto_defs.Notification{N: &proto_defs.Notification_Tracer{Tracer: tracer}}
+}
+
+func RemovePeerNotification() proto_defs.Notification {
+ removePeerNotification := &proto_defs.RemovePeerGossip{}
+ tracer := &proto_defs.Tracer{T: &proto_defs.Tracer_RemovePeer{RemovePeer: removePeerNotification}}
+ return proto_defs.Notification{N: &proto_defs.Notification_Tracer{Tracer: tracer}}
+}
+
+func JoinNotification(topic string) proto_defs.Notification {
+ joinNotification := &proto_defs.Join{Topic: topic}
+ tracer := &proto_defs.Tracer{T: &proto_defs.Tracer_Joined{Joined: joinNotification}}
+ return proto_defs.Notification{N: &proto_defs.Notification_Tracer{Tracer: tracer}}
+}
+
+func LeaveNotification(topic string) proto_defs.Notification {
+	leaveNotification := &proto_defs.Leave{Topic: topic}
+	tracer := &proto_defs.Tracer{T: &proto_defs.Tracer_Left{Left: leaveNotification}}
+ return proto_defs.Notification{N: &proto_defs.Notification_Tracer{Tracer: tracer}}
+}
+
+func GraftNotification(topic string) proto_defs.Notification {
+ graftNotification := &proto_defs.Graft{Topic: topic}
+ tracer := &proto_defs.Tracer{T: &proto_defs.Tracer_Grafted{Grafted: graftNotification}}
+ return proto_defs.Notification{N: &proto_defs.Notification_Tracer{Tracer: tracer}}
+}
+
+func PruneNotification(topic string) proto_defs.Notification {
+ pruneNotification := &proto_defs.Prune{Topic: topic}
+ tracer := &proto_defs.Tracer{T: &proto_defs.Tracer_Pruned{Pruned: pruneNotification}}
+ return proto_defs.Notification{N: &proto_defs.Notification_Tracer{Tracer: tracer}}
+}
+
+func ValidateMessageNotification(topic string) proto_defs.Notification {
+ validateMessageNotification := &proto_defs.ValidateMessageGossip{Topic: topic}
+ tracer := &proto_defs.Tracer{T: &proto_defs.Tracer_ValidateMessage{ValidateMessage: validateMessageNotification}}
+ return proto_defs.Notification{N: &proto_defs.Notification_Tracer{Tracer: tracer}}
+}
+
+func DeliverMessageNotification(topic string) proto_defs.Notification {
+ deliverMessageNotification := &proto_defs.DeliverMessage{Topic: topic}
+ tracer := &proto_defs.Tracer{T: &proto_defs.Tracer_DeliverMessage{DeliverMessage: deliverMessageNotification}}
+ return proto_defs.Notification{N: &proto_defs.Notification_Tracer{Tracer: tracer}}
+}
+
+func UndeliverableMessageNotification(topic string) proto_defs.Notification {
+ unDeliverableMessageNotification := &proto_defs.UnDeliverableMessage{Topic: topic}
+ tracer := &proto_defs.Tracer{T: &proto_defs.Tracer_UnDeliverableMessage{UnDeliverableMessage: unDeliverableMessageNotification}}
+ return proto_defs.Notification{N: &proto_defs.Notification_Tracer{Tracer: tracer}}
+}
+
+func RejectMessageNotification(topic string) proto_defs.Notification {
+ rejectMessageNotification := &proto_defs.RejectMessage{Topic: topic}
+ tracer := &proto_defs.Tracer{T: &proto_defs.Tracer_RejectMessage{RejectMessage: rejectMessageNotification}}
+ return proto_defs.Notification{N: &proto_defs.Notification_Tracer{Tracer: tracer}}
+}
+
+func DuplicateMessageNotification(topic string) proto_defs.Notification {
+ duplicateMessageNotification := &proto_defs.DuplicateMessage{Topic: topic}
+ tracer := &proto_defs.Tracer{T: &proto_defs.Tracer_DuplicateMessage{DuplicateMessage: duplicateMessageNotification}}
+ return proto_defs.Notification{N: &proto_defs.Notification_Tracer{Tracer: tracer}}
+}
+
func GossipNotification(topic string, handler, msgId, message []byte) proto_defs.Notification {
gossipSubNotification := &proto_defs.GossipSub{Topic: []byte(topic), Handler: handler, MsgId: msgId, Message: message}
return proto_defs.Notification{N: &proto_defs.Notification_Gossip{Gossip: gossipSubNotification}}
diff --git a/native/libp2p_port/internal/subscriptions/subscriptions.go b/native/libp2p_port/internal/subscriptions/subscriptions.go
index 74e526d6c..bdd69de95 100644
--- a/native/libp2p_port/internal/subscriptions/subscriptions.go
+++ b/native/libp2p_port/internal/subscriptions/subscriptions.go
@@ -14,6 +14,7 @@ import (
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
+ "github.com/libp2p/go-libp2p/core/protocol"
)
type subscription struct {
@@ -28,6 +29,81 @@ type Subscriber struct {
port *port.Port
}
+type GossipTracer struct {
+ port *port.Port
+}
+
+func (g GossipTracer) AddPeer(p peer.ID, proto protocol.ID) {
+ notification := proto_helpers.AddPeerNotification()
+ g.port.SendNotification(¬ification)
+}
+
+func (g GossipTracer) RemovePeer(p peer.ID) {
+ notification := proto_helpers.RemovePeerNotification()
+ g.port.SendNotification(¬ification)
+}
+
+func (g GossipTracer) Join(topic string) {
+ notification := proto_helpers.JoinNotification(topic)
+ g.port.SendNotification(¬ification)
+}
+
+func (g GossipTracer) Leave(topic string) {
+	notification := proto_helpers.LeaveNotification(topic)
+ g.port.SendNotification(¬ification)
+}
+
+func (g GossipTracer) Graft(p peer.ID, topic string) {
+ notification := proto_helpers.GraftNotification(topic)
+ g.port.SendNotification(¬ification)
+}
+
+func (g GossipTracer) Prune(p peer.ID, topic string) {
+ notification := proto_helpers.PruneNotification(topic)
+ g.port.SendNotification(¬ification)
+}
+
+func (g GossipTracer) ValidateMessage(msg *pubsub.Message) {
+ notification := proto_helpers.ValidateMessageNotification(*msg.Topic)
+ g.port.SendNotification(¬ification)
+}
+
+func (g GossipTracer) DeliverMessage(msg *pubsub.Message) {
+ notification := proto_helpers.DeliverMessageNotification(*msg.Topic)
+ g.port.SendNotification(¬ification)
+}
+
+func (g GossipTracer) UndeliverableMessage(msg *pubsub.Message) {
+ notification := proto_helpers.UndeliverableMessageNotification(*msg.Topic)
+ g.port.SendNotification(¬ification)
+}
+
+func (g GossipTracer) RejectMessage(msg *pubsub.Message, reason string) {
+ notification := proto_helpers.RejectMessageNotification(*msg.Topic)
+ g.port.SendNotification(¬ification)
+}
+
+func (g GossipTracer) DuplicateMessage(msg *pubsub.Message) {
+ notification := proto_helpers.DuplicateMessageNotification(*msg.Topic)
+ g.port.SendNotification(¬ification)
+}
+
+func (g GossipTracer) ThrottlePeer(p peer.ID) {
+ // no-op
+}
+
+func (g GossipTracer) RecvRPC(rpc *pubsub.RPC) {
+ // no-op
+}
+
+func (g GossipTracer) SendRPC(rpc *pubsub.RPC, p peer.ID) {
+ // no-op
+}
+
+func (g GossipTracer) DropRPC(rpc *pubsub.RPC, p peer.ID) {
+ // no-op
+}
+
func NewSubscriber(p *port.Port, h host.Host) Subscriber {
heartbeat := 700 * time.Millisecond
gsubParams := pubsub.DefaultGossipSubParams()
@@ -74,6 +150,7 @@ func NewSubscriber(p *port.Port, h host.Host) Subscriber {
pubsub.WithPeerOutboundQueueSize(600),
pubsub.WithValidateQueueSize(600),
pubsub.WithMaxMessageSize(10 * (1 << 20)), // 10 MB
+ pubsub.WithRawTracer(GossipTracer{port: p}),
}
gsub, err := pubsub.NewGossipSub(context.Background(), h, options...)
diff --git a/proto/libp2p.proto b/proto/libp2p.proto
index a1b65f867..936852217 100644
--- a/proto/libp2p.proto
+++ b/proto/libp2p.proto
@@ -27,6 +27,47 @@ message UnsubscribeFromTopic {
string name = 1;
}
+message AddPeerGossip {}
+message RemovePeerGossip {}
+
+message Join {
+ // topic that was joined
+ string topic = 1;
+}
+
+message Leave {
+ // topic that was abandoned
+ string topic = 1;
+}
+
+message Graft {
+ string topic = 1;
+}
+
+message Prune {
+ string topic = 1;
+}
+
+message ValidateMessageGossip {
+ string topic = 1;
+}
+
+message DeliverMessage {
+ string topic = 1;
+}
+
+message UnDeliverableMessage {
+ string topic = 1;
+}
+
+message RejectMessage {
+ string topic = 1;
+}
+
+message DuplicateMessage {
+ string topic = 1;
+}
+
message AddPeer {
bytes id = 1;
repeated string addrs = 2;
@@ -117,11 +158,28 @@ message Result {
}
}
+message Tracer {
+ oneof t {
+ Join joined = 1;
+ Leave left = 2;
+ Graft grafted = 3;
+ Prune pruned = 4;
+ ValidateMessageGossip validate_message = 5;
+ DeliverMessage deliver_message = 6;
+ UnDeliverableMessage un_deliverable_message = 7;
+ RejectMessage reject_message = 8;
+ DuplicateMessage duplicate_message = 9;
+ AddPeerGossip add_peer = 10;
+ RemovePeerGossip remove_peer = 11;
+ }
+}
+
message Notification {
oneof n {
GossipSub gossip = 1;
Request request = 2;
NewPeer new_peer = 3;
Result result = 4;
+ Tracer tracer = 5;
}
}