Skip to content

Commit

Permalink
Add mocked candidate exchange functionalities
Browse files Browse the repository at this point in the history
  • Loading branch information
LVala committed Sep 25, 2023
1 parent 0401fad commit 66e474d
Show file tree
Hide file tree
Showing 5 changed files with 182 additions and 45 deletions.
52 changes: 41 additions & 11 deletions examples/example.exs
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
Mix.install([{:gun, "~> 2.0.1"}, {:ex_webrtc, path: "./", force: true}, {:jason, "~> 1.4.0"}])

require Logger
Logger.configure(level: :info)
Logger.configure(level: :debug)

defmodule Peer do
use GenServer

require Logger

alias ExWebRTC.{PeerConnection, SessionDescription}
alias ExWebRTC.{IceCandidate, PeerConnection, SessionDescription}

@ice_servers [
%{urls: "stun:stun.stunprotocol.org:3478"},
# %{urls: "stun:stun.stunprotocol.org:3478"},
%{urls: "stun:stun.l.google.com:19302"}
]

Expand Down Expand Up @@ -60,7 +60,7 @@ defmodule Peer do
def handle_info({:gun_ws, _, _, {:text, msg}}, state) do
msg
|> Jason.decode!()
|> handle_ws_message(state.peer_connection)
|> handle_ws_message(state)

{:noreply, state}
end
Expand All @@ -71,25 +71,55 @@ defmodule Peer do
exit(:ws_down)
end

@impl true
def handle_info({:ex_webrtc, msg}, state) do
Logger.info("Received ExWebRTC message: #{inspect(msg)}")
handle_webrtc_message(msg, state)

{:noreply, state}
end

@impl true
def handle_info(msg, state) do
Logger.warning("Received unknown msg: #{inspect(msg)}")
{:noreply, state}
end

defp handle_ws_message(%{type: "offer", data: data}, pc) do
Logger.info("Received SDP offer: #{data}")
{:ok, desc} = SessionDescription.from_init(data)
PeerConnection.addRemoteDescription(desc)
defp handle_ws_message(%{"type" => "offer", "sdp" => sdp}, state) do
Logger.info("Received SDP offer: #{inspect(sdp)}")
offer = %SessionDescription{type: :offer, sdp: sdp}
:ok = PeerConnection.set_remote_description(state.peer_connection, offer)
{:ok, answer} = PeerConnection.create_answer(state.peer_connection)
msg = %{"type" => "answer", "sdp" => answer.sdp}
:gun.ws_send(state.conn, state.stream, {:text, Jason.encode!(msg)})
end

defp handle_ws_message(%{type: "ice", data: data}, pc) do
Logger.info("Received remote ICE candidate: #{data}")
defp handle_ws_message(%{"type" => "ice", "data" => data}, state) do
Logger.info("Received remote ICE candidate: #{inspect(data)}")
candidate = %IceCandidate{
candidate: data["candidate"],
sdp_mid: data["sdpMid"],
sdp_m_line_index: data["sdpMLineIndex"],
username_fragment: data["usernameFragment"]
}
:ok = PeerConnection.add_ice_candidate(state.peer_connection, candidate)
end

defp handle_ws_message(msg, _pc) do
defp handle_ws_message(msg, _state) do
Logger.info("Received unexpected message: #{inspect(msg)}")
end

defp handle_webrtc_message({:ice_candidate, candidate}, state) do
candidate = %{
"candidate" => candidate.candidate,
"sdpMid" => candidate.sdp_mid,
"sdpMLineIndex" => candidate.sdp_m_line_index,
"usernameFragment" => candidate.username_fragment
}
msg = %{"type" => "ice", "data" => candidate}
:gun.ws_send(state.conn, state.stream, {:text, Jason.encode!(msg)})

end
end

{:ok, pid} = Peer.start_link()
Expand Down
10 changes: 5 additions & 5 deletions examples/example.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,11 +12,13 @@ const mediaConstraints = {
const start_connection = async (ws) => {
const pc = new RTCPeerConnection(pcConfig);

pc.onconnectionstatechange = _ => console.log("Connection state changed:", pc.connectionState);

pc.onicecandidate = event => {
console.log("New local ICE candidate:", event.candidate);

if (event.candidate !== null) {
ws.send(JSON.stringify({type: "ice", data: event.candidate.candidate}));
ws.send(JSON.stringify({type: "ice", data: event.candidate}));
}
};

Expand All @@ -27,10 +29,8 @@ const start_connection = async (ws) => {
console.log("Received message:", msg);

if (msg.type === "answer") {
console.log("Received SDP answer:", msg.data);
pc.setRemoteDescription(msg.data);
pc.setRemoteDescription(msg);
} else if (msg.type === "ice") {
console.log("Received ICE candidate:", msg.data);
pc.addIceCandidate(msg.data);
}
};
Expand All @@ -44,7 +44,7 @@ const start_connection = async (ws) => {
console.log("Generated SDP offer:", desc);
await pc.setLocalDescription(desc);

ws.send(JSON.stringify({type: "offer", data: desc.sdp}))
ws.send(JSON.stringify(desc))
};

const ws = new WebSocket("ws://127.0.0.1:4000/websocket");
Expand Down
13 changes: 13 additions & 0 deletions lib/ex_webrtc/ice_candidate.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
defmodule ExWebRTC.IceCandidate do
@moduledoc false

# not exacly the same as W3 IceCandidate
@type t() :: %__MODULE__{
candidate: term() | nil,
sdp_mid: term() | nil,
sdp_m_line_index: term() | nil,
username_fragment: term() | nil
}

defstruct [:candidate, :username_fragment, :sdp_mid, :sdp_m_line_index]
end
148 changes: 121 additions & 27 deletions lib/ex_webrtc/peer_connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@ defmodule ExWebRTC.PeerConnection do
use GenServer

alias __MODULE__.Configuration
alias ExWebRTC.SessionDescription
alias ExICE.ICEAgent
alias ExWebRTC.{IceCandidate, SessionDescription}

@type peer_connection() :: GenServer.server()

@type offer_options() :: [ice_restart: boolean()]
@type answer_options() :: []

@enforce_keys [:config]
@enforce_keys [:config, :owner]
defstruct @enforce_keys ++
[
:current_local_desc,
Expand All @@ -26,11 +27,11 @@ defmodule ExWebRTC.PeerConnection do
#### API ####

def start_link(configuration \\ []) do
GenServer.start_link(__MODULE__, configuration)
GenServer.start_link(__MODULE__, {self(), configuration})
end

def start(configuration \\ []) do
GenServer.start(__MODULE__, configuration)
GenServer.start(__MODULE__, {self(), configuration})
end

@spec create_offer(peer_connection(), offer_options()) ::
Expand All @@ -57,15 +58,27 @@ defmodule ExWebRTC.PeerConnection do
GenServer.call(peer_connection, {:set_remote_description, description})
end

@spec add_ice_candidate(peer_connection(), IceCandidate.t()) ::
:ok | {:error, :TODO}
def add_ice_candidate(peer_connection, candidate) do
GenServer.call(peer_connection, {:add_ice_candidate, candidate})
end

#### CALLBACKS ####

@impl true
def init(config) do
def init({owner, config}) do
config = struct(Configuration, config)
:ok = Configuration.check_support(config)

state = %__MODULE__{config: config}
stun_servers =
config.ice_servers
|> Enum.flat_map(&if(is_list(&1.urls), do: &1.urls, else: [&1.urls]))
|> Enum.filter(&String.starts_with?(&1, "stun:"))

{:ok, ice_agent} = ICEAgent.start_link(:controlled, stun_servers: stun_servers)

state = %__MODULE__{owner: owner, config: config, ice_agent: ice_agent}
{:ok, state}
end

Expand All @@ -75,48 +88,129 @@ defmodule ExWebRTC.PeerConnection do
end

@impl true
def handle_call({:create_answer, _options}, _from, state)
when state.signaling_state in [:have_remote_offer, :have_local_pranswer] do
# hardcoded answer based generated by chrome with no added tracks
# in response to offer also generated by chrome with 1 audio track
{:ok, ufrag, pwd} = ICEAgent.get_local_credentials(state.ice_agent)

test_answer =
"v=0\r\no=- 7596991810024734139 2 IN IP4 127.0.0.1\r\ns=-\r\nt=0 0\r\na=group:BUNDLE 0\r\na=extmap-allow-mixed\r\na=msid-semantic: WMS\r\nm=audio 9 UDP/TLS/RTP/SAVPF 111 63 9 0 8 13 110 126\r\nc=IN IP4 0.0.0.0\r\na=rtcp:9 IN IP4 0.0.0.0\r\na=ice-ufrag:vx/1\r\na=ice-pwd:ldFUrCsXvndFY2L1u0UQ7ikf\r\na=ice-options:trickle\r\na=fingerprint:sha-256 76:61:77:1E:7C:2E:BB:CD:19:B5:27:4E:A7:40:84:06:6B:17:97:AB:C4:61:90:16:EE:96:9F:9E:BD:42:96:3D\r\na=setup:active\r\na=mid:0\r\na=extmap:1 urn:ietf:params:rtp-hdrext:ssrc-audio-level\r\na=extmap:2 http://www.webrtc.org/experiments/rtp-hdrext/abs-send-time\r\na=extmap:3 http://www.ietf.org/id/draft-holmer-rmcat-transport-wide-cc-extensions-01\r\na=extmap:4 urn:ietf:params:rtp-hdrext:sdes:mid\r\na=recvonly\r\na=rtcp-mux\r\na=rtpmap:111 opus/48000/2\r\na=rtcp-fb:111 transport-cc\r\na=fmtp:111 minptime=10;useinbandfec=1\r\na=rtpmap:63 red/48000/2\r\na=fmtp:63 111/111\r\na=rtpmap:9 G722/8000\r\na=rtpmap:0 PCMU/8000\r\na=rtpmap:8 PCMA/8000\r\na=rtpmap:13 CN/8000\r\na=rtpmap:110 telephone-event/48000\r\na=rtpmap:126 telephone-event/8000\r\n"

sdp = ExSDP.parse!(test_answer)
media = hd(sdp.media)

attrs =
Enum.map(media.attributes, fn
{:ice_ufrag, _} -> {:ice_ufrag, ufrag}
{:ice_pwd, _} -> {:ice_pwd, pwd}
other -> other
end)

media = Map.put(media, :attributes, attrs)

sdp =
sdp
|> Map.put(:media, [media])
|> to_string()

desc = %SessionDescription{type: :answer, sdp: sdp}
{:reply, {:ok, desc}, state}
end

def handle_call({:create_answer, _options}, _from, state) do
{:reply, :ok, state}
{:reply, {:error, :invalid_state}, state}
end

@impl true
def handle_call({:set_local_description, desc}, _from, state) do
def handle_call({:set_local_description, _desc}, _from, state) do
# temporary, so the dialyzer will shut up
maybe_next_state(:stable, :local, :offer)
{:reply, :ok, state}
end

@impl true
def handle_call({:set_remote_description, desc}, _from, state) do
%SessionDescription{type: type, sdp: sdp} = desc

cond do
# TODO handle rollback
type == :rollback ->
case type do
:rollback ->
{:reply, :ok, state}

valid_transition?(:remote, state.signaling_state, type) ->
with {:ok, sdp} <- ExSDP.parse(sdp),
{:ok, state} <- apply_remote_description(type, sdp, state) do
{:reply, :ok, state}
other_type ->
with {:ok, next_state} <- maybe_next_state(state.signaling_state, :remote, other_type),
{:ok, sdp} <- ExSDP.parse(sdp),
{:ok, state} <- apply_remote_description(other_type, sdp, state) do
{:reply, :ok, %{state | signaling_state: next_state}}
else
error -> {:reply, error, state}
end
end
end

true ->
{:reply, :error, state}
@impl true
def handle_call({:add_ice_candidate, candidate}, _from, state) do
with "candidate:" <> attr <- candidate.candidate do
ICEAgent.add_remote_candidate(state.ice_agent, attr)
end

{:reply, :ok, state}
end

defp apply_remote_description(_type, _sdp, state) do
{:ok, state}
@impl true
def handle_info({:ex_ice, _from, {:new_candidate, candidate}}, state) do
candidate = %IceCandidate{
candidate: "candidate:" <> candidate,
sdp_mid: 0,
sdp_m_line_index: 0
# username_fragment: "vx/1"
}

send(state.owner, {:ex_webrtc, {:ice_candidate, candidate}})

{:noreply, state}
end

@impl true
def handle_info(msg, state) do
IO.inspect(msg, label: :OTHER_MSG)

Check warning on line 176 in lib/ex_webrtc/peer_connection.ex

View workflow job for this annotation

GitHub Actions / Lint (OTP 26 / Elixir 1.15)

There should be no calls to IO.inspect/1.
{:noreply, state}
end

defp valid_transition?(_, _, :rollback), do: false
defp apply_remote_description(_type, sdp, state) do
# TODO apply steps listed in RFC 8829 5.10
media = hd(sdp.media)
{:ice_ufrag, ufrag} = ExSDP.Media.get_attribute(media, :ice_ufrag)
{:ice_pwd, pwd} = ExSDP.Media.get_attribute(media, :ice_pwd)

:ok = ICEAgent.set_remote_credentials(state.ice_agent, ufrag, pwd)
:ok = ICEAgent.gather_candidates(state.ice_agent)

{:ok, %{state | current_remote_desc: sdp}}
end

# Signaling state machine, RFC 8829 3.2
defp maybe_next_state(:stable, :remote, :offer), do: {:ok, :have_remote_offer}
defp maybe_next_state(:stable, :local, :offer), do: {:ok, :have_local_offer}
defp maybe_next_state(:stable, _, _), do: {:error, :invalid_transition}

defp maybe_next_state(:have_local_offer, :local, :offer), do: {:ok, :have_local_offer}
defp maybe_next_state(:have_local_offer, :remote, :answer), do: {:ok, :stable}
defp maybe_next_state(:have_local_offer, :remote, :pranswer), do: {:ok, :have_remote_pranswer}
defp maybe_next_state(:have_local_offer, _, _), do: {:error, :invalid_transition}

defp maybe_next_state(:have_remote_offer, :remote, :offer), do: {:ok, :have_remote_offer}
defp maybe_next_state(:have_remote_offer, :local, :answer), do: {:ok, :stable}
defp maybe_next_state(:have_remote_offer, :local, :pranswer), do: {:ok, :stable}
defp maybe_next_state(:have_remote_offer, _, _), do: {:error, :invalid_transition}

defp valid_transition?(:remote, state, :offer)
when state in [:stable, :have_remote_offer],
do: true
defp maybe_next_state(:have_local_pranswer, :local, :pranswer), do: {:ok, :have_local_pranswer}
defp maybe_next_state(:have_local_pranswer, :local, :answer), do: {:ok, :stable}
defp maybe_next_state(:have_local_pranswer, _, _), do: {:error, :invalid_transition}

defp valid_transition?(:remote, state, type)
when state in [:have_local_offer, :have_remote_pranswer] and type in [:answer, :pranswer],
do: true
defp maybe_next_state(:have_remote_pranswer, :remote, :pranswer),
do: {:ok, :have_remote_pranswer}

defp valid_transition?(:remote, _, _), do: false
defp maybe_next_state(:have_remote_pranswer, :remote, :answer), do: {:ok, :stable}
defp maybe_next_state(:have_remote_pranswer, _, _), do: {:error, :invalid_transition}
end
4 changes: 2 additions & 2 deletions lib/ex_webrtc/session_description.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ defmodule ExWebRTC.SessionDescription do
@enforce_keys [:type, :sdp]
defstruct @enforce_keys

@spec from_init(%{String.t() => String.t()}) :: {:ok, t()} | :error
@spec from_json(%{String.t() => String.t()}) :: {:ok, t()} | :error
def from_init(%{"type" => type})
when type not in ["answer", "offer", "pranswer", "rollback"],
do: :error

def from_init(%{"type" => type, "sdp" => sdp}) do
def from_json(%{"type" => type, "sdp" => sdp}) do
type = String.to_atom(type)
{:ok, %__MODULE__{type: type, sdp: sdp}}
end
Expand Down

0 comments on commit 66e474d

Please sign in to comment.