Skip to content

Commit

Permalink
add sync block download. fix lint dialyzer and sync_blocks.
Browse files Browse the repository at this point in the history
  • Loading branch information
Arkenan committed Jun 17, 2024
1 parent fd1ed80 commit 5ac39ee
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 8 deletions.
2 changes: 1 addition & 1 deletion lib/lambda_ethereum_consensus/beacon/sync_blocks.ex
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ defmodule LambdaEthereumConsensus.Beacon.SyncBlocks do
@spec fetch_blocks_by_slot(Types.slot(), non_neg_integer()) ::
{:ok, [SignedBeaconBlock.t()]} | {:error, String.t()}
def fetch_blocks_by_slot(from, count) do
case BlockDownloader.request_blocks_by_range(from, count, 0) do
case BlockDownloader.request_blocks_by_range_sync(from, count, 0) do
{:ok, blocks} ->
{:ok, blocks}

Expand Down
4 changes: 4 additions & 0 deletions lib/lambda_ethereum_consensus/metrics.ex
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
defmodule LambdaEthereumConsensus.Metrics do
@moduledoc """
Basic telemetry metric generation to be used across the node.
"""

def tracer({:joined, %{topic: topic}}, _state) do
:telemetry.execute([:network, :pubsub_topic_active], %{active: 1}, %{
topic: get_topic_name(topic)
Expand Down
39 changes: 32 additions & 7 deletions lib/lambda_ethereum_consensus/p2p/block_downloader.ex
Original file line number Diff line number Diff line change
Expand Up @@ -17,15 +17,40 @@ defmodule LambdaEthereumConsensus.P2P.BlockDownloader do
# so we want to try again with a different peer
@default_retries 5

@spec request_blocks_by_range(
Types.slot(),
non_neg_integer(),
({:ok, [SignedBeaconBlock.t()]} | {:error, any()} -> term()),
non_neg_integer()
) :: :ok
@type download_result :: {:ok, [SignedBeaconBlock.t()]} | {:error, any()}
@type on_blocks :: (download_result() -> term())

@doc """
Requests a series of blocks in batch, and synchronously (the caller will block waiting for the
result). As this is a synchronous function, the caller process must be different than Libp2pPort.
Arguments:
- slot: the slot that marks the start of the requested range.
- count: the amount of blocks that will be requested for download.
- retries (optional): if the download fails the request will retry, using a different random
peer. This argument determines the amount of times that will happen before returning an error.
"""
@spec request_blocks_by_range_sync(Types.slot(), non_neg_integer(), non_neg_integer()) ::
download_result()
def request_blocks_by_range_sync(slot, count, retries \\ @default_retries)

def request_blocks_by_range_sync(_slot, 0, _retries), do: {:ok, []}

def request_blocks_by_range_sync(slot, count, retries) do
pid = self()
request_blocks_by_range(slot, count, fn result -> send(pid, result) end, retries)

receive do
result -> result
end
end

@spec request_blocks_by_range(Types.slot(), non_neg_integer(), on_blocks(), non_neg_integer()) ::
:ok
@spec request_blocks_by_range(Types.slot(), non_neg_integer(), on_blocks()) :: :ok
def request_blocks_by_range(slot, count, on_blocks, retries \\ @default_retries)

def request_blocks_by_range(_slot, 0, _on_blocks, _retries), do: {:ok, []}
def request_blocks_by_range(_slot, 0, _on_blocks, _retries), do: :ok

def request_blocks_by_range(slot, count, on_blocks, retries) do
Logger.debug("Requesting block", slot: slot)
Expand Down

0 comments on commit 5ac39ee

Please sign in to comment.