From 66e474d88c29b5d75ffabb16b47b2fd9ed38d50f Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Wala?= Date: Mon, 25 Sep 2023 15:41:12 +0200 Subject: [PATCH] Add mocked candidate exchange functionalities --- examples/example.exs | 52 ++++++++-- examples/example.js | 10 +- lib/ex_webrtc/ice_candidate.ex | 13 +++ lib/ex_webrtc/peer_connection.ex | 148 ++++++++++++++++++++++----- lib/ex_webrtc/session_description.ex | 4 +- 5 files changed, 182 insertions(+), 45 deletions(-) create mode 100644 lib/ex_webrtc/ice_candidate.ex diff --git a/examples/example.exs b/examples/example.exs index d1c3e39f..96f035ec 100644 --- a/examples/example.exs +++ b/examples/example.exs @@ -1,17 +1,17 @@ Mix.install([{:gun, "~> 2.0.1"}, {:ex_webrtc, path: "./", force: true}, {:jason, "~> 1.4.0"}]) require Logger -Logger.configure(level: :info) +Logger.configure(level: :debug) defmodule Peer do use GenServer require Logger - alias ExWebRTC.{PeerConnection, SessionDescription} + alias ExWebRTC.{IceCandidate, PeerConnection, SessionDescription} @ice_servers [ - %{urls: "stun:stun.stunprotocol.org:3478"}, + # %{urls: "stun:stun.stunprotocol.org:3478"}, %{urls: "stun:stun.l.google.com:19302"} ] @@ -60,7 +60,7 @@ defmodule Peer do def handle_info({:gun_ws, _, _, {:text, msg}}, state) do msg |> Jason.decode!() - |> handle_ws_message(state.peer_connection) + |> handle_ws_message(state) {:noreply, state} end @@ -71,25 +71,55 @@ defmodule Peer do exit(:ws_down) end + @impl true + def handle_info({:ex_webrtc, msg}, state) do + Logger.info("Received ExWebRTC message: #{inspect(msg)}") + handle_webrtc_message(msg, state) + + {:noreply, state} + end + @impl true def handle_info(msg, state) do Logger.warning("Received unknown msg: #{inspect(msg)}") {:noreply, state} end - defp handle_ws_message(%{type: "offer", data: data}, pc) do - Logger.info("Received SDP offer: #{data}") - {:ok, desc} = SessionDescription.from_init(data) - PeerConnection.addRemoteDescription(desc) + defp handle_ws_message(%{"type" => "offer", "sdp" => sdp}, state) do + Logger.info("Received SDP offer: #{inspect(sdp)}") + offer = %SessionDescription{type: :offer, sdp: sdp} + :ok = PeerConnection.set_remote_description(state.peer_connection, offer) + {:ok, answer} = PeerConnection.create_answer(state.peer_connection) + msg = %{"type" => "answer", "sdp" => answer.sdp} + :gun.ws_send(state.conn, state.stream, {:text, Jason.encode!(msg)}) end - defp handle_ws_message(%{type: "ice", data: data}, pc) do - Logger.info("Received remote ICE candidate: #{data}") + defp handle_ws_message(%{"type" => "ice", "data" => data}, state) do + Logger.info("Received remote ICE candidate: #{inspect(data)}") + candidate = %IceCandidate{ + candidate: data["candidate"], + sdp_mid: data["sdpMid"], + sdp_m_line_index: data["sdpMLineIndex"], + username_fragment: data["usernameFragment"] + } + :ok = PeerConnection.add_ice_candidate(state.peer_connection, candidate) end - defp handle_ws_message(msg, _pc) do + defp handle_ws_message(msg, _state) do Logger.info("Received unexpected message: #{inspect(msg)}") end + + defp handle_webrtc_message({:ice_candidate, candidate}, state) do + candidate = %{ + "candidate" => candidate.candidate, + "sdpMid" => candidate.sdp_mid, + "sdpMLineIndex" => candidate.sdp_m_line_index, + "usernameFragment" => candidate.username_fragment + } + msg = %{"type" => "ice", "data" => candidate} + :gun.ws_send(state.conn, state.stream, {:text, Jason.encode!(msg)}) + + end end {:ok, pid} = Peer.start_link() diff --git a/examples/example.js b/examples/example.js index db1ccdb5..97d49f64 100644 --- a/examples/example.js +++ b/examples/example.js @@ -12,11 +12,13 @@ const mediaConstraints = { const start_connection = async (ws) => { const pc = new RTCPeerConnection(pcConfig); + pc.onconnectionstatechange = _ => console.log("Connection state changed:", pc.connectionState); + pc.onicecandidate = event => { console.log("New local ICE candidate:", event.candidate); if (event.candidate !== null) { - ws.send(JSON.stringify({type: "ice", data: event.candidate.candidate})); + ws.send(JSON.stringify({type: "ice", data: event.candidate})); } }; @@ -27,10 +29,8 @@ const start_connection = async (ws) => { console.log("Received message:", msg); if (msg.type === "answer") { - console.log("Received SDP answer:", msg.data); - pc.setRemoteDescription(msg.data); + pc.setRemoteDescription(msg); } else if (msg.type === "ice") { - console.log("Received ICE candidate:", msg.data); pc.addIceCandidate(msg.data); } }; @@ -44,7 +44,7 @@ const start_connection = async (ws) => { console.log("Generated SDP offer:", desc); await pc.setLocalDescription(desc); - ws.send(JSON.stringify({type: "offer", data: desc.sdp})) + ws.send(JSON.stringify(desc)) }; const ws = new WebSocket("ws://127.0.0.1:4000/websocket"); diff --git a/lib/ex_webrtc/ice_candidate.ex b/lib/ex_webrtc/ice_candidate.ex new file mode 100644 index 00000000..ae0a2a65 --- /dev/null +++ b/lib/ex_webrtc/ice_candidate.ex @@ -0,0 +1,13 @@ +defmodule ExWebRTC.IceCandidate do + @moduledoc false + + # not exacly the same as W3 IceCandidate + @type t() :: %__MODULE__{ + candidate: term() | nil, + sdp_mid: term() | nil, + sdp_m_line_index: term() | nil, + username_fragment: term() | nil + } + + defstruct [:candidate, :username_fragment, :sdp_mid, :sdp_m_line_index] +end diff --git a/lib/ex_webrtc/peer_connection.ex b/lib/ex_webrtc/peer_connection.ex index 61d817e9..a27f6f7b 100644 --- a/lib/ex_webrtc/peer_connection.ex +++ b/lib/ex_webrtc/peer_connection.ex @@ -4,14 +4,15 @@ defmodule ExWebRTC.PeerConnection do use GenServer alias __MODULE__.Configuration - alias ExWebRTC.SessionDescription + alias ExICE.ICEAgent + alias ExWebRTC.{IceCandidate, SessionDescription} @type peer_connection() :: GenServer.server() @type offer_options() :: [ice_restart: boolean()] @type answer_options() :: [] - @enforce_keys [:config] + @enforce_keys [:config, :owner] defstruct @enforce_keys ++ [ :current_local_desc, @@ -26,11 +27,11 @@ defmodule ExWebRTC.PeerConnection do #### API #### def start_link(configuration \\ []) do - GenServer.start_link(__MODULE__, configuration) + GenServer.start_link(__MODULE__, {self(), configuration}) end def start(configuration \\ []) do - GenServer.start(__MODULE__, configuration) + GenServer.start(__MODULE__, {self(), configuration}) end @spec create_offer(peer_connection(), offer_options()) :: @@ -57,15 +58,27 @@ defmodule ExWebRTC.PeerConnection do GenServer.call(peer_connection, {:set_remote_description, description}) end + @spec add_ice_candidate(peer_connection(), IceCandidate.t()) :: + :ok | {:error, :TODO} + def add_ice_candidate(peer_connection, candidate) do + GenServer.call(peer_connection, {:add_ice_candidate, candidate}) + end + #### CALLBACKS #### @impl true - def init(config) do + def init({owner, config}) do config = struct(Configuration, config) :ok = Configuration.check_support(config) - state = %__MODULE__{config: config} + stun_servers = + config.ice_servers + |> Enum.flat_map(&if(is_list(&1.urls), do: &1.urls, else: [&1.urls])) + |> Enum.filter(&String.starts_with?(&1, "stun:")) + + {:ok, ice_agent} = ICEAgent.start_link(:controlled, stun_servers: stun_servers) + state = %__MODULE__{owner: owner, config: config, ice_agent: ice_agent} {:ok, state} end @@ -75,12 +88,44 @@ defmodule ExWebRTC.PeerConnection do end @impl true + def handle_call({:create_answer, _options}, _from, state) + when state.signaling_state in [:have_remote_offer, :have_local_pranswer] do + # hardcoded answer based generated by chrome with no added tracks + # in response to offer also generated by chrome with 1 audio track + {:ok, ufrag, pwd} = ICEAgent.get_local_credentials(state.ice_agent) + + test_answer = + "v=0\r\no=- 7596991810024734139 2 IN IP4 127.0.0.1\r\ns=-\r\nt=0 0\r\na=group:BUNDLE 0\r\na=extmap-allow-mixed\r\na=msid-semantic: WMS\r\nm=audio 9 UDP/TLS/RTP/SAVPF 111 63 9 0 8 13 110 126\r\nc=IN IP4 0.0.0.0\r\na=rtcp:9 IN IP4 0.0.0.0\r\na=ice-ufrag:vx/1\r\na=ice-pwd:ldFUrCsXvndFY2L1u0UQ7ikf\r\na=ice-options:trickle\r\na=fingerprint:sha-256 76:61:77:1E:7C:2E:BB:CD:19:B5:27:4E:A7:40:84:06:6B:17:97:AB:C4:61:90:16:EE:96:9F:9E:BD:42:96:3D\r\na=setup:active\r\na=mid:0\r\na=extmap:1 urn:ietf:params:rtp-hdrext:ssrc-audio-level\r\na=extmap:2 http://www.webrtc.org/experiments/rtp-hdrext/abs-send-time\r\na=extmap:3 http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01\r\na=extmap:4 urn:ietf:params:rtp-hdrext:sdes:mid\r\na=recvonly\r\na=rtcp-mux\r\na=rtpmap:111 opus/48000/2\r\na=rtcp-fb:111 transport-cc\r\na=fmtp:111 minptime=10;useinbandfec=1\r\na=rtpmap:63 red/48000/2\r\na=fmtp:63 111/111\r\na=rtpmap:9 G722/8000\r\na=rtpmap:0 PCMU/8000\r\na=rtpmap:8 PCMA/8000\r\na=rtpmap:13 CN/8000\r\na=rtpmap:110 telephone-event/48000\r\na=rtpmap:126 telephone-event/8000\r\n" + + sdp = ExSDP.parse!(test_answer) + media = hd(sdp.media) + + attrs = + Enum.map(media.attributes, fn + {:ice_ufrag, _} -> {:ice_ufrag, ufrag} + {:ice_pwd, _} -> {:ice_pwd, pwd} + other -> other + end) + + media = Map.put(media, :attributes, attrs) + + sdp = + sdp + |> Map.put(:media, [media]) + |> to_string() + + desc = %SessionDescription{type: :answer, sdp: sdp} + {:reply, {:ok, desc}, state} + end + def handle_call({:create_answer, _options}, _from, state) do - {:reply, :ok, state} + {:reply, {:error, :invalid_state}, state} end @impl true - def handle_call({:set_local_description, desc}, _from, state) do + def handle_call({:set_local_description, _desc}, _from, state) do + # temporary, so the dialyzer will shut up + maybe_next_state(:stable, :local, :offer) {:reply, :ok, state} end @@ -88,35 +133,84 @@ defmodule ExWebRTC.PeerConnection do def handle_call({:set_remote_description, desc}, _from, state) do %SessionDescription{type: type, sdp: sdp} = desc - cond do - # TODO handle rollback - type == :rollback -> + case type do + :rollback -> {:reply, :ok, state} - valid_transition?(:remote, state.signaling_state, type) -> - with {:ok, sdp} <- ExSDP.parse(sdp), - {:ok, state} <- apply_remote_description(type, sdp, state) do - {:reply, :ok, state} + other_type -> + with {:ok, next_state} <- maybe_next_state(state.signaling_state, :remote, other_type), + {:ok, sdp} <- ExSDP.parse(sdp), + {:ok, state} <- apply_remote_description(other_type, sdp, state) do + {:reply, :ok, %{state | signaling_state: next_state}} + else + error -> {:reply, error, state} end + end + end - true -> - {:reply, :error, state} + @impl true + def handle_call({:add_ice_candidate, candidate}, _from, state) do + with "candidate:" <> attr <- candidate.candidate do + ICEAgent.add_remote_candidate(state.ice_agent, attr) end + + {:reply, :ok, state} end - defp apply_remote_description(_type, _sdp, state) do - {:ok, state} + @impl true + def handle_info({:ex_ice, _from, {:new_candidate, candidate}}, state) do + candidate = %IceCandidate{ + candidate: "candidate:" <> candidate, + sdp_mid: 0, + sdp_m_line_index: 0 + # username_fragment: "vx/1" + } + + send(state.owner, {:ex_webrtc, {:ice_candidate, candidate}}) + + {:noreply, state} + end + + @impl true + def handle_info(msg, state) do + IO.inspect(msg, label: :OTHER_MSG) + {:noreply, state} end - defp valid_transition?(_, _, :rollback), do: false + defp apply_remote_description(_type, sdp, state) do + # TODO apply steps listed in RFC 8829 5.10 + media = hd(sdp.media) + {:ice_ufrag, ufrag} = ExSDP.Media.get_attribute(media, :ice_ufrag) + {:ice_pwd, pwd} = ExSDP.Media.get_attribute(media, :ice_pwd) + + :ok = ICEAgent.set_remote_credentials(state.ice_agent, ufrag, pwd) + :ok = ICEAgent.gather_candidates(state.ice_agent) + + {:ok, %{state | current_remote_desc: sdp}} + end + + # Signaling state machine, RFC 8829 3.2 + defp maybe_next_state(:stable, :remote, :offer), do: {:ok, :have_remote_offer} + defp maybe_next_state(:stable, :local, :offer), do: {:ok, :have_local_offer} + defp maybe_next_state(:stable, _, _), do: {:error, :invalid_transition} + + defp maybe_next_state(:have_local_offer, :local, :offer), do: {:ok, :have_local_offer} + defp maybe_next_state(:have_local_offer, :remote, :answer), do: {:ok, :stable} + defp maybe_next_state(:have_local_offer, :remote, :pranswer), do: {:ok, :have_remote_pranswer} + defp maybe_next_state(:have_local_offer, _, _), do: {:error, :invalid_transition} + + defp maybe_next_state(:have_remote_offer, :remote, :offer), do: {:ok, :have_remote_offer} + defp maybe_next_state(:have_remote_offer, :local, :answer), do: {:ok, :stable} + defp maybe_next_state(:have_remote_offer, :local, :pranswer), do: {:ok, :stable} + defp maybe_next_state(:have_remote_offer, _, _), do: {:error, :invalid_transition} - defp valid_transition?(:remote, state, :offer) - when state in [:stable, :have_remote_offer], - do: true + defp maybe_next_state(:have_local_pranswer, :local, :pranswer), do: {:ok, :have_local_pranswer} + defp maybe_next_state(:have_local_pranswer, :local, :answer), do: {:ok, :stable} + defp maybe_next_state(:have_local_pranswer, _, _), do: {:error, :invalid_transition} - defp valid_transition?(:remote, state, type) - when state in [:have_local_offer, :have_remote_pranswer] and type in [:answer, :pranswer], - do: true + defp maybe_next_state(:have_remote_pranswer, :remote, :pranswer), + do: {:ok, :have_remote_pranswer} - defp valid_transition?(:remote, _, _), do: false + defp maybe_next_state(:have_remote_pranswer, :remote, :answer), do: {:ok, :stable} + defp maybe_next_state(:have_remote_pranswer, _, _), do: {:error, :invalid_transition} end diff --git a/lib/ex_webrtc/session_description.ex b/lib/ex_webrtc/session_description.ex index 42996c71..d26f6e79 100644 --- a/lib/ex_webrtc/session_description.ex +++ b/lib/ex_webrtc/session_description.ex @@ -15,12 +15,12 @@ defmodule ExWebRTC.SessionDescription do @enforce_keys [:type, :sdp] defstruct @enforce_keys - @spec from_init(%{String.t() => String.t()}) :: {:ok, t()} | :error + @spec from_json(%{String.t() => String.t()}) :: {:ok, t()} | :error def from_init(%{"type" => type}) when type not in ["answer", "offer", "pranswer", "rollback"], do: :error - def from_init(%{"type" => type, "sdp" => sdp}) do + def from_json(%{"type" => type, "sdp" => sdp}) do type = String.to_atom(type) {:ok, %__MODULE__{type: type, sdp: sdp}} end