diff --git a/lib/libp2p/libp2p.ex b/lib/libp2p/libp2p.ex deleted file mode 100644 index e23bf2e7f..000000000 --- a/lib/libp2p/libp2p.ex +++ /dev/null @@ -1,312 +0,0 @@ -defmodule Libp2p do - @moduledoc """ - Bindings for P2P network primitives. - """ - - @on_load :load_nifs - - def load_nifs() do - dir = :code.priv_dir(:lambda_ethereum_consensus) - :erlang.load_nif(dir ++ ~c"/native/libp2p_nif", 0) - end - - @typedoc """ - A handle to a Go resource. - """ - @opaque handle :: reference - - @typedoc """ - A handle to a host.Host. - """ - @opaque host :: handle - - @typedoc """ - A handle to a peerstore.Peerstore. - """ - @opaque peerstore :: handle - - @typedoc """ - A handle to a peer.ID. - """ - @opaque peer_id :: handle - - @typedoc """ - A handle to a []multiaddr.MultiAddr. - """ - @opaque addrs :: handle - - @typedoc """ - A handle to a stream. - """ - @opaque stream :: handle - - @typedoc """ - A handle to an Option. - """ - @opaque option :: handle - - @typedoc """ - A handle to a discv5 listener. - """ - @opaque listener :: handle - - @typedoc """ - A discv5 node iterator. - """ - @opaque iterator :: handle - - @typedoc """ - A node using discv5. - """ - @opaque discv5_node :: handle - - @typedoc """ - PubSub handle. - """ - @opaque pub_sub :: handle - - @typedoc """ - Topic handle. - """ - @opaque topic :: handle - - @typedoc """ - PubSub topic subscription. - """ - @opaque subscription :: handle - - @typedoc """ - PubSub message. - """ - @opaque message :: handle - - @typedoc """ - An error returned by this module. - """ - @type error :: {:error, binary} - - @doc """ - The ttl for a "permanent address" (e.g. bootstrap nodes). - """ - @spec ttl_permanent_addr :: integer - def ttl_permanent_addr(), do: 2 ** 63 - 1 - - @doc """ - Gets next subscription message. - """ - @spec next_subscription_message(timeout) :: {:ok, message} | :cancelled | :timeout - def next_subscription_message(timeout \\ :infinity) do - receive do - {:sub, result} -> result - after - timeout -> :timeout - end - end - - @doc """ - Synchronously connects to the given peer. - """ - @spec host_connect(host, peer_id) :: :ok | error - def host_connect(host, peer_id) do - :ok = host_connect_async(host, peer_id) - - receive do - {:connect, result} -> result - end - end - - @doc """ - Returns an `Option` that can be passed to `host_new` - as an argument to configures libp2p to listen on the - given (unparsed) addresses. - """ - @spec listen_addr_strings(binary) :: {:ok, option} | error - def listen_addr_strings(_addr), - do: :erlang.nif_error(:not_implemented) - - @doc """ - Creates a Host, with the given options. - """ - @spec host_new(list(option)) :: {:ok, host} | error - def host_new(_option_list \\ []), - do: :erlang.nif_error(:not_implemented) - - @doc """ - Deletes a Host. - """ - @spec host_close(host) :: :ok | error - def host_close(_host), - do: :erlang.nif_error(:not_implemented) - - @doc """ - Sets the stream handler associated to a protocol id. - """ - @spec host_set_stream_handler(host, binary) :: :ok | error - def host_set_stream_handler(_host, _protocol_id), - do: :erlang.nif_error(:not_implemented) - - @doc """ - Creates a new `Stream` connected to the - peer with the given id, using the protocol with given id. - """ - @spec host_new_stream(host, peer_id, binary) :: {:ok, stream} | error - def host_new_stream(_host, _peer_id, _protocol_id), - do: :erlang.nif_error(:not_implemented) - - @doc """ - Asynchronously connects to the given peer. The result is sent - to `self()` in the shape of `{connect, :ok | {:error, reason}}`. - See `host_connect/2` for a synchronous version. - """ - @spec host_connect_async(host, peer_id) :: :ok - def host_connect_async(_host, _peer_id), - do: :erlang.nif_error(:not_implemented) - - @doc """ - Gets the `Peerstore` of the given `Host`. - """ - @spec host_peerstore(host) :: {:ok, peerstore} | error - def host_peerstore(_host), - do: :erlang.nif_error(:not_implemented) - - @doc """ - Gets the `ID` of the given `Host`. - """ - @spec host_id(host) :: {:ok, peer_id} | error - def host_id(_host), - do: :erlang.nif_error(:not_implemented) - - @doc """ - Gets the addresses of the given `Host`. - """ - @spec host_addrs(host) :: {:ok, addrs} | error - def host_addrs(_host), - do: :erlang.nif_error(:not_implemented) - - @doc """ - Adds the addresses of the peer with the given ID to - the `Peerstore`. The addresses are valid for the given - TTL. - """ - @spec peerstore_add_addrs(peerstore, peer_id, addrs, integer) :: :ok | error - def peerstore_add_addrs(_peerstore, _peer_id, _addrs, _ttl), - do: :erlang.nif_error(:not_implemented) - - @doc """ - Reads bytes from the stream (up to a predefined maximum). - """ - @spec stream_read(stream) :: {:ok, binary} | error - def stream_read(_stream), - do: :erlang.nif_error(:not_implemented) - - @doc """ - Writes data into the stream. - """ - @spec stream_write(stream, binary) :: :ok | error - def stream_write(_stream, _data), - do: :erlang.nif_error(:not_implemented) - - @doc """ - Closes the stream. - """ - @spec stream_close(stream) :: :ok | error - def stream_close(_stream), - do: :erlang.nif_error(:not_implemented) - - @doc """ - Closes the write side of the stream. - """ - @spec stream_close_write(stream) :: :ok | error - def stream_close_write(_stream), - do: :erlang.nif_error(:not_implemented) - - @doc """ - Returns the ID of the underlying protocol. - """ - @spec stream_protocol(stream) :: {:ok, binary} | error - def stream_protocol(_stream), - do: :erlang.nif_error(:not_implemented) - - @doc """ - Creates a discv5 listener. - """ - @spec listen_v5(binary, list(binary)) :: {:ok, listener} | error - def listen_v5(_addr, _bootnodes), - do: :erlang.nif_error(:not_implemented) - - @doc """ - Creates a discv5 nodes iterator for random nodes. - """ - @spec listener_random_nodes(listener) :: {:ok, iterator} | error - def listener_random_nodes(_listener), - do: :erlang.nif_error(:not_implemented) - - @doc """ - Moves the iterator to the next node. - Returns false if there are no more nodes. - """ - @spec iterator_next(iterator) :: boolean - def iterator_next(_iterator), do: :erlang.nif_error(:not_implemented) - - @doc """ - Returns the current node. - WARN: you need to call iterator_next before calling this function! - """ - @spec iterator_node(iterator) :: {:ok, discv5_node} | error - def iterator_node(_iterator), do: :erlang.nif_error(:not_implemented) - - @doc """ - Returns the published TCP port of the node, or nil. - """ - @spec node_tcp(discv5_node) :: integer | nil - def node_tcp(_node), do: :erlang.nif_error(:not_implemented) - - @doc """ - Returns the multiaddresses of the node. - """ - @spec node_multiaddr(discv5_node) :: {:ok, addrs} | error - def node_multiaddr(_node), do: :erlang.nif_error(:not_implemented) - - @doc """ - Returns the ID of the node. - """ - @spec node_id(discv5_node) :: {:ok, peer_id} | error - def node_id(_node), do: :erlang.nif_error(:not_implemented) - - @doc """ - Creates a new GossipSub router. - """ - @spec new_gossip_sub(host) :: {:ok, pub_sub} | error - def new_gossip_sub(_host), do: :erlang.nif_error(:not_implemented) - - @doc """ - Joins a topic. If the topic was already joined, this will fail. - """ - @spec pub_sub_join(pub_sub, binary) :: {:ok, topic} | error - def pub_sub_join(_pubsub, _topic_str), do: :erlang.nif_error(:not_implemented) - - @doc """ - Subscribes to the given topic. After this call, `self()` will start receiving - messages from the topic. Those can be received via `get_subscription_message/0`. - """ - @spec topic_subscribe(topic) :: {:ok, subscription} | error - def topic_subscribe(_topic), do: :erlang.nif_error(:not_implemented) - - @doc """ - Publishes data on the given topic. - """ - @spec topic_publish(topic, binary) :: :ok | error - def topic_publish(_topic, _data), do: :erlang.nif_error(:not_implemented) - - @doc """ - Cancels a given subscription. - """ - @spec subscription_cancel(subscription) :: :ok - def subscription_cancel(_subscription), do: :erlang.nif_error(:not_implemented) - - @doc """ - Gets the application data from a message. - """ - @spec message_data(message) :: {:ok, binary} | error - def message_data(_message), do: :erlang.nif_error(:not_implemented) -end diff --git a/lib/libp2p/libp2p_stream.ex b/lib/libp2p/libp2p_stream.ex deleted file mode 100644 index 89e52adcc..000000000 --- a/lib/libp2p/libp2p_stream.ex +++ /dev/null @@ -1,29 +0,0 @@ -defmodule Libp2p.Stream do - @moduledoc """ - Wrapper over a stream handle. - TODO: make this a RW stream - """ - - @doc """ - Creates a new `Stream` connected to the - peer with the given id, using the protocol with given id. - It returns an `Enumerable` that can be used to read. - """ - @spec from(Libp2p.stream()) :: Enumerable.t() - def from(stream) do - Stream.resource( - fn -> {stream, :ok} end, - &stream_next/1, - fn {st, _} -> Libp2p.stream_close(st) end - ) - end - - defp stream_next({stream, :error}), do: {:halt, {stream, :error}} - - defp stream_next({stream, :ok}) do - case Libp2p.stream_read(stream) do - {:ok, ""} -> {:halt, {stream, :ok}} - {res, chunk} -> {[{res, chunk}], {stream, res}} - end - end -end diff --git a/test/integration/libp2p_test.exs b/test/integration/libp2p_test.exs deleted file mode 100644 index e72b3c0dd..000000000 --- a/test/integration/libp2p_test.exs +++ /dev/null @@ -1,161 +0,0 @@ -defmodule Integration.Libp2pTest do - use ExUnit.Case - - setup_all do - bootnodes = YamlElixir.read_from_file!("config/networks/mainnet/boot_enr.yaml") - {:ok, bootnodes} - end - - @tag :skip - test "discover peer and add it to peerstore", bootnodes do - IO.puts("test1") - - {:ok, host} = Libp2p.host_new() - - {:ok, peerstore} = Libp2p.host_peerstore(host) - - {:ok, listener} = - Libp2p.listen_v5("0.0.0.0:25000", bootnodes) - - {:ok, iterator} = Libp2p.listener_random_nodes(listener) - - true = Libp2p.iterator_next(iterator) - {:ok, node} = Libp2p.iterator_node(iterator) - - {:ok, id} = Libp2p.node_id(node) - {:ok, addrs} = Libp2p.node_multiaddr(node) - - :ok = Libp2p.peerstore_add_addrs(peerstore, id, addrs, Libp2p.ttl_permanent_addr()) - - :ok = Libp2p.host_close(host) - end - - defp find_peers(host, iterator, fun) do - {:ok, peerstore} = Libp2p.host_peerstore(host) - - true = Libp2p.iterator_next(iterator) - - {:ok, node} = Libp2p.iterator_node(iterator) - - if Libp2p.node_tcp(node) != nil do - {:ok, id} = Libp2p.node_id(node) - {:ok, addrs} = Libp2p.node_multiaddr(node) - - # 1 minute - ttl = 6 * 10 ** 10 - :ok = Libp2p.peerstore_add_addrs(peerstore, id, addrs, ttl) - - fun.(id) - end - - find_peers(host, iterator, fun) - end - - defp receive_response(stream) do - case Libp2p.stream_read(stream) do - {:ok, msg} -> IO.puts(["\"#{Base.encode16(msg)}\""]) - _ -> :ok - end - end - - defp read_gossip_msg(sub) do - receive do - {:sub, {:ok, msg}} -> - # NOTE: gossip messages are Snappy-compressed with BLOCK format (not frame) - msg - |> Libp2p.message_data() - |> then(fn {:ok, d} -> d end) - |> Base.encode16() - |> then(&IO.puts(["\"#{&1}\""])) - - {:sub, {:error, err}} -> - IO.puts(err) - end - - read_gossip_msg(sub) - end - - @tag :skip - @tag timeout: :infinity - test "discover new peers", bootnodes do - {:ok, host} = Libp2p.host_new() - - # ask for metadata - protocol_id = "/eth2/beacon_chain/req/metadata/2/ssz_snappy" - - :ok = Libp2p.host_set_stream_handler(host, protocol_id) - - {:ok, listener} = - Libp2p.listen_v5("0.0.0.0:45122", bootnodes) - - {:ok, iterator} = Libp2p.listener_random_nodes(listener) - - find_peers(host, iterator, fn id -> - with {:ok, stream} <- Libp2p.host_new_stream(host, id, protocol_id) do - receive_response(stream) - end - end) - - :ok = Libp2p.host_close(host) - end - - @tag :skip - @tag timeout: :infinity - test "ping peers", bootnodes do - {:ok, host} = Libp2p.host_new() - - # ping peers - protocol_id = "/eth2/beacon_chain/req/ping/1/ssz_snappy" - # uncompressed payload - payload = Base.decode16!("0000000000000000") - {:ok, compressed_payload} = Snappy.compress(payload) - msg = <<8, compressed_payload::binary>> - - :ok = Libp2p.host_set_stream_handler(host, protocol_id) - - {:ok, listener} = - Libp2p.listen_v5("0.0.0.0:45123", bootnodes) - - {:ok, iterator} = Libp2p.listener_random_nodes(listener) - - find_peers(host, iterator, fn id -> - with {:ok, stream} <- Libp2p.host_new_stream(host, id, protocol_id), - :ok <- Libp2p.stream_write(stream, msg) do - receive_response(stream) - else - {:error, err} -> IO.puts(err) - end - end) - - :ok = Libp2p.host_close(host) - end - - @tag :skip - @tag timeout: :infinity - test "Gossip with CL peers", bootnodes do - # Setup host - {:ok, host} = Libp2p.host_new() - - # Create GossipSubs - {:ok, gsub} = Libp2p.new_gossip_sub(host) - - # Topic for Mainnet, Capella fork, attestation subnet 0 - topic_str = "/eth2/bba4da96/beacon_attestation_0/ssz_snappy" - - # Join the topic - {:ok, topic} = Libp2p.pub_sub_join(gsub, topic_str) - - # Subscribe to the topic - {:ok, sub} = Libp2p.topic_subscribe(topic) - - # Start discovery in another process - {:ok, listener} = Libp2p.listen_v5("0.0.0.0:45124", bootnodes) - {:ok, iterator} = Libp2p.listener_random_nodes(listener) - spawn(fn -> find_peers(host, iterator, &Libp2p.host_connect(host, &1)) end) - - # Read gossip messages - read_gossip_msg(sub) - - :ok = Libp2p.host_close(host) - end -end diff --git a/test/unit/libp2p_test.exs b/test/unit/libp2p_test.exs deleted file mode 100644 index eb7ee4427..000000000 --- a/test/unit/libp2p_test.exs +++ /dev/null @@ -1,157 +0,0 @@ -defmodule Unit.Libp2pTest do - use ExUnit.Case - doctest Libp2p - - @tag :skip - test "Create and destroy host" do - {:ok, host} = Libp2p.host_new() - assert host != 0 - :ok = Libp2p.host_close(host) - end - - @tag :skip - test "Use peerstore in place of host fails" do - {:ok, host} = Libp2p.host_new() - {:ok, peerstore} = Libp2p.host_peerstore(host) - {:error, "invalid Host"} = Libp2p.host_close(peerstore) - :ok = Libp2p.host_close(host) - end - - @tag :skip - test "Set stream handler" do - {:ok, host} = Libp2p.host_new() - assert host != 0 - :ok = Libp2p.host_set_stream_handler(host, "/my-app/amazing-protocol/1.0.1") - :ok = Libp2p.host_close(host) - end - - @tag :skip - test "listen_addr_strings parsing" do - {:ok, option} = Libp2p.listen_addr_strings("/ip4/127.0.0.1/tcp/48787") - assert option != 0 - end - - @tag :skip - test "Start two hosts, and play one round of ping-pong" do - # Setup sender - {:ok, addr} = Libp2p.listen_addr_strings("/ip4/127.0.0.1/tcp/48787") - {:ok, sender} = Libp2p.host_new([addr]) - # Setup receiver - {:ok, addr} = Libp2p.listen_addr_strings("/ip4/127.0.0.1/tcp/48789") - {:ok, recver} = Libp2p.host_new([addr]) - - protocol_id = "/pong" - - # (recver) Set stream handler - :ok = Libp2p.host_set_stream_handler(recver, protocol_id) - - # (sender) Add recver address to peerstore - {:ok, peerstore} = Libp2p.host_peerstore(sender) - {:ok, id} = Libp2p.host_id(recver) - {:ok, addrs} = Libp2p.host_addrs(recver) - - :ok = Libp2p.peerstore_add_addrs(peerstore, id, addrs, Libp2p.ttl_permanent_addr()) - - # (sender) Create stream sender -> recver - {:ok, send} = Libp2p.host_new_stream(sender, id, protocol_id) - - # (sender) Write "ping" to stream - :ok = Libp2p.stream_write(send, "ping") - :ok = Libp2p.stream_close_write(send) - - # (recver) Receive the stream via the configured stream handler - {:ok, recv} = - receive do - {:req, msg} -> msg - after - 1000 -> :timeout - end - - # (recver) Read the "ping" message from the stream - {:ok, "ping"} = Libp2p.stream_read(recv) - {:ok, ""} = Libp2p.stream_read(recv) - - # (recver) Write "pong" to the stream - :ok = Libp2p.stream_write(recv, "pong") - :ok = Libp2p.stream_close_write(recv) - - # (sender) Read the "pong" message from the stream - {:ok, "pong"} = Libp2p.stream_read(send) - - :ok = Libp2p.stream_close(send) - :ok = Libp2p.stream_close(recv) - - # Close both hosts - :ok = Libp2p.host_close(sender) - :ok = Libp2p.host_close(recver) - end - - defp retrying_receive(topic_sender, msg) do - # (sender) Give a head start to the other process - Process.sleep(1) - - # (sender) Publish a message to the topic - :ok = Libp2p.topic_publish(topic_sender, msg) - - receive do - :ok -> :ok - after - 20 -> retrying_receive(topic_sender, msg) - end - end - - @tag :skip - test "start two hosts, and gossip about" do - # Setup sender - {:ok, addr} = Libp2p.listen_addr_strings("/ip4/127.0.0.1/tcp/48787") - {:ok, sender} = Libp2p.host_new([addr]) - # Setup receiver - {:ok, addr} = Libp2p.listen_addr_strings("/ip4/127.0.0.1/tcp/48789") - {:ok, recver} = Libp2p.host_new([addr]) - - # (sender) Connect to recver - {:ok, peerstore} = Libp2p.host_peerstore(sender) - {:ok, id} = Libp2p.host_id(recver) - {:ok, addrs} = Libp2p.host_addrs(recver) - - :ok = Libp2p.peerstore_add_addrs(peerstore, id, addrs, Libp2p.ttl_permanent_addr()) - :ok = Libp2p.host_connect(sender, id) - - # Create GossipSubs - {:ok, gsub_sender} = Libp2p.new_gossip_sub(sender) - {:ok, gsub_recver} = Libp2p.new_gossip_sub(recver) - - topic = "/test/gossipping" - - # Join the topic - {:ok, topic_sender} = Libp2p.pub_sub_join(gsub_sender, topic) - {:ok, topic_recver} = Libp2p.pub_sub_join(gsub_recver, topic) - - pid = self() - msg = "hello world!" - - spawn_link(fn -> - # (recver) Subscribe to the topic - {:ok, sub_recver} = Libp2p.topic_subscribe(topic_recver) - - assert {:ok, message} = Libp2p.next_subscription_message() - - Libp2p.subscription_cancel(sub_recver) - - # Subscription returns error before cancelling - assert :cancelled = Libp2p.next_subscription_message() - - # (recver) Get the application data from the message - {:ok, data} = Libp2p.message_data(message) - - assert data == msg - send(pid, :ok) - end) - - retrying_receive(topic_sender, msg) - - # Close both hosts - :ok = Libp2p.host_close(sender) - :ok = Libp2p.host_close(recver) - end -end