Skip to content

Commit

Permalink
Implement basic DataChannel establishment functionalities
Browse files Browse the repository at this point in the history
  • Loading branch information
LVala committed Aug 20, 2024
1 parent 2133705 commit 66d05eb
Show file tree
Hide file tree
Showing 5 changed files with 519 additions and 17 deletions.
31 changes: 31 additions & 0 deletions lib/ex_webrtc/data_channel.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
defmodule ExWebRTC.DataChannel do
@moduledoc """
TODO
"""

@type order() :: :ordered | :unordered

@type id() :: non_neg_integer()

@type ready_state() :: :connecting | :open | :closing | :closed

@type options() :: [
ordered: order(),
max_packet_life_time: non_neg_integer(),
max_retransmits: non_neg_integer(),
protocol: String.t()
]

@type t() :: %__MODULE__{
id: non_neg_integer() | nil,
label: String.t(),
max_packet_life_time: non_neg_integer() | nil,
max_retransmits: non_neg_integer() | nil,
ordered: order(),
protocol: String.t(),
ready_state: ready_state()
}

@enforce_keys [:id, :label, :ordered, :protocol, :ready_state]
defstruct @enforce_keys ++ [:max_packet_life_time, :max_retransmits]
end
180 changes: 166 additions & 14 deletions lib/ex_webrtc/peer_connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,12 +12,14 @@ defmodule ExWebRTC.PeerConnection do
alias __MODULE__.{Configuration, Demuxer, TWCCRecorder}

alias ExWebRTC.{
DataChannel,
DefaultICETransport,
DTLSTransport,
ICECandidate,
MediaStreamTrack,
RTPTransceiver,
RTPSender,
SCTPTransport,
SDPUtils,
SessionDescription,
Utils
Expand Down Expand Up @@ -163,6 +165,14 @@ defmodule ExWebRTC.PeerConnection do
GenServer.cast(peer_connection, {:send_pli, track_id, rid})
end

@doc """
TODO
"""
@spec send_data(peer_connection(), DataChannel.id(), binary()) :: :ok
def send_data(peer_connection, channel_id, data) do
GenServer.cast(peer_connection, {:send_data, channel_id, data})
end

#### MDN-API ####

@doc """
Expand Down Expand Up @@ -408,6 +418,15 @@ defmodule ExWebRTC.PeerConnection do
GenServer.call(peer_connection, {:remove_track, sender_id})
end

@doc """
TODO
"""
@spec create_data_channel(peer_connection(), String.t(), DataChannel.options()) ::
{:ok, DataChannel.t()} | {:error, atom()}
def create_data_channel(peer_connection, label, opts \\ []) do
GenServer.call(peer_connection, {:create_data_channel, label, opts})
end

@doc """
Closes the PeerConnection.
Expand Down Expand Up @@ -455,6 +474,7 @@ defmodule ExWebRTC.PeerConnection do
current_remote_desc: nil,
pending_remote_desc: nil,
negotiation_needed: false,
sctp_transport: SCTPTransport.new(),
ice_transport: DefaultICETransport,
ice_pid: ice_pid,
dtls_transport: dtls_transport,
Expand Down Expand Up @@ -591,8 +611,13 @@ defmodule ExWebRTC.PeerConnection do
mlines =
Enum.map(remote_offer.media, fn mline ->
{:mid, mid} = ExSDP.get_attribute(mline, :mid)
{_ix, transceiver} = find_transceiver(state.transceivers, mid)
RTPTransceiver.to_answer_mline(transceiver, mline, opts)

if SDPUtils.data_channel?(mline) do
generate_data_mline(mid, opts)
else
{_ix, transceiver} = find_transceiver(state.transceivers, mid)
RTPTransceiver.to_answer_mline(transceiver, mline, opts)
end
end)

mids = SDPUtils.get_bundle_mids(mlines)
Expand Down Expand Up @@ -851,6 +876,34 @@ defmodule ExWebRTC.PeerConnection do
end
end

@impl true
def handle_call({:create_data_channel, label, opts}, _from, state) do
ordered = Keyword.get(opts, :ordered, true)
lifetime = Keyword.get(opts, :max_packet_life_time)
max_rtx = Keyword.get(opts, :max_retransmits)
protocol = Keyword.get(opts, :protocol, "")

with true <- byte_size(label) < 65_535,
true <- lifetime == nil or max_rtx == nil do
{events, channel, sctp_transport} =
SCTPTransport.add_channel(
state.sctp_transport,
label,
ordered,
protocol,
lifetime,
max_rtx
)

# TODO: negotiation needed

handle_sctp_events(events, state)
{:reply, {:ok, channel}, %{state | sctp_transport: sctp_transport}}
else
_other -> {:reply, :error, state}
end
end

@impl true
def handle_call(:get_stats, _from, state) do
timestamp = System.os_time(:millisecond)
Expand Down Expand Up @@ -1032,6 +1085,15 @@ defmodule ExWebRTC.PeerConnection do
end
end

@impl true
def handle_cast({:send_data, channel_id, data}, state) do
# TODO: allow for configuring the type of data
{events, sctp_transport} = SCTPTransport.send(state.sctp_transport, channel_id, :string, data)
handle_sctp_events(events, state)

{:noreply, %{state | sctp_transport: sctp_transport}}
end

@impl true
def handle_info({:ex_ice, _from, {:connection_state_change, new_ice_state}}, state) do
state = %{state | ice_state: new_ice_state}
Expand Down Expand Up @@ -1078,6 +1140,18 @@ defmodule ExWebRTC.PeerConnection do
state = %{state | dtls_state: new_dtls_state}
next_conn_state = next_conn_state(state.ice_state, new_dtls_state)
state = update_conn_state(state, next_conn_state)

state =
case state.current_remote_desc do
{:answer, _} when new_dtls_state == :connected ->
{events, sctp_transport} = SCTPTransport.connect(state.sctp_transport)
handle_sctp_events(events, state)
%{state | sctp_transport: sctp_transport}

_other ->
state
end

{:noreply, state}
end

Expand Down Expand Up @@ -1159,6 +1233,14 @@ defmodule ExWebRTC.PeerConnection do
end
end

@impl true
def handle_info({:dtls_transport, _pid, {:data, data}}, state) do
{events, sctp_transport} = SCTPTransport.handle_data(state.sctp_transport, data)
handle_sctp_events(events, state)

{:noreply, %{state | sctp_transport: sctp_transport}}
end

@impl true
def handle_info(:send_twcc_feedback, %{twcc_recorder: twcc_recorder} = state) do
Process.send_after(self(), :send_twcc_feedback, @twcc_interval)
Expand Down Expand Up @@ -1229,6 +1311,14 @@ defmodule ExWebRTC.PeerConnection do
{:noreply, %{state | transceivers: transceivers}}
end

@impl true
def handle_info(:sctp_timeout, state) do
{events, sctp_transport} = SCTPTransport.handle_timeout(state.sctp_transport)
handle_sctp_events(events, state)

{:noreply, %{state | sctp_transport: sctp_transport}}
end

@impl true
def handle_info(msg, state) do
Logger.info("Received unexpected message: #{inspect(msg)}")
Expand All @@ -1248,7 +1338,7 @@ defmodule ExWebRTC.PeerConnection do
# converting them into mlines
next_mid = find_next_mid(state)

{transceivers, _next_mid} =
{transceivers, next_mid} =
Enum.map_reduce(state.transceivers, next_mid, fn
# In the initial offer, we can't have stopped transceivers, only stopping ones.
# Also, stopped transceivers are immediately removed.
Expand All @@ -1267,15 +1357,23 @@ defmodule ExWebRTC.PeerConnection do
|> Enum.reject(fn tr -> tr.stopping == true end)
|> Enum.map(&RTPTransceiver.to_offer_mline(&1, opts))

{transceivers, mlines}
data_mline =
if SCTPTransport.data_channels?(state.sctp_transport) do
[generate_data_mline(next_mid, opts)]
else
[]
end

{transceivers, mlines ++ data_mline}
end

defp generate_offer_mlines(state, opts) do
last_answer = get_last_answer(state)
next_mid = find_next_mid(state)
next_mline_idx = Enum.count(last_answer.media)

transceivers = assign_mlines(state.transceivers, last_answer, next_mid, next_mline_idx)
{transceivers, next_mid} =
assign_mlines(state.transceivers, last_answer, next_mid, next_mline_idx)

# The idea is as follows:
# * Iterate over current local mlines
Expand All @@ -1293,13 +1391,16 @@ defmodule ExWebRTC.PeerConnection do
current_local_desc.media
|> Stream.with_index()
|> Enum.map(fn {local_mline, idx} ->
case Enum.find(transceivers, &(&1.mline_idx == idx)) do
# if there is no transceiver, the mline must have been rejected
# in the past (in the offer or answer) so we always set the port to 0
nil ->
tr = Enum.find(transceivers, &(&1.mline_idx == idx))

cond do
SDPUtils.data_channel?(local_mline) ->
generate_data_mline(local_mline.mid, opts)

tr == nil ->
%{local_mline | port: 0}

tr ->
true ->
RTPTransceiver.to_offer_mline(tr, opts)
end
end)
Expand All @@ -1313,7 +1414,36 @@ defmodule ExWebRTC.PeerConnection do

final_mlines = final_mlines ++ rem_mlines

{transceivers, final_mlines}
data_mline =
if SCTPTransport.data_channels?(state.sctp_transport) and
not Enum.any?(final_mlines, &SDPUtils.data_channel?(&1)) do
[generate_data_mline(next_mid, opts)]
else
[]
end

{transceivers, final_mlines ++ data_mline}
end

def generate_data_mline(mid, opts) do
attributes =
[
{:mid, mid},
{:ice_ufrag, Keyword.fetch!(opts, :ice_ufrag)},
{:ice_pwd, Keyword.fetch!(opts, :ice_pwd)},
{:ice_options, Keyword.fetch!(opts, :ice_options)},
{:fingerprint, Keyword.fetch!(opts, :fingerprint)},
{:setup, Keyword.fetch!(opts, :setup)},
{"sctp-port", "5000"}
]

%ExSDP.Media{
ExSDP.Media.new("application", 9, "UDP/DTLS/SCTP", "webrtc-datachannel")
| # mline must be followed by a cline, which must contain
# the default value "IN IP4 0.0.0.0" (as there are no candidates yet)
connection_data: [%ExSDP.ConnectionData{address: {0, 0, 0, 0}}]
}
|> ExSDP.add_attributes(attributes)
end

# next_mline_idx is future mline idx to use if there are no mlines to recycle
Expand All @@ -1327,7 +1457,7 @@ defmodule ExWebRTC.PeerConnection do
result \\ []
)

defp assign_mlines([], _, _, _, _, result), do: Enum.reverse(result)
defp assign_mlines([], _, next_mid, _, _, result), do: {Enum.reverse(result), next_mid}

defp assign_mlines(
[%{mid: nil, mline_idx: nil, stopped: false} = tr | trs],
Expand Down Expand Up @@ -1369,7 +1499,10 @@ defmodule ExWebRTC.PeerConnection do
state.ice_transport.gather_candidates(state.ice_pid)
end

transceivers = process_mlines_local(sdp.media, state.transceivers, type, state.owner)
transceivers =
sdp.media
|> Enum.reject(&SDPUtils.data_channel?/1)
|> process_mlines_local(state.transceivers, type, state.owner)

# TODO re-think order of those functions
# and demuxer update
Expand Down Expand Up @@ -1412,11 +1545,14 @@ defmodule ExWebRTC.PeerConnection do
state = %{state | config: config, twcc_extension_id: twcc_id}

transceivers =
process_mlines_remote(sdp.media, state.transceivers, type, state.config, state.owner)
sdp.media
|> Enum.reject(&SDPUtils.data_channel?/1)
|> process_mlines_remote(state.transceivers, type, state.config, state.owner)

# infer our role from the remote role
dtls_role = if dtls_role in [:actpass, :passive], do: :active, else: :passive
DTLSTransport.start_dtls(state.dtls_transport, dtls_role, peer_fingerprint)
sctp_transport = SCTPTransport.set_role(state.sctp_transport, dtls_role)

# ice_creds will be nil if all of the mlines in the description are rejected
# in such case, if this is the first remote description, connection won't be established
Expand All @@ -1439,6 +1575,7 @@ defmodule ExWebRTC.PeerConnection do
|> Map.replace!(:transceivers, transceivers)
|> remove_stopped_transceivers(type, sdp)
|> update_signaling_state(next_sig_state)
|> Map.replace!(:sctp_transport, sctp_transport)
|> Map.update!(:demuxer, &Demuxer.update(&1, sdp))

if state.signaling_state == :stable do
Expand Down Expand Up @@ -1876,6 +2013,21 @@ defmodule ExWebRTC.PeerConnection do

defp handle_rtcp_packet(state, _packet), do: {nil, state}

defp handle_sctp_events(events, state) do
for event <- events do
case event do
{:transmit, packets} ->
Enum.each(packets, &DTLSTransport.send_data(state.dtls_transport, &1))

{:channel_opened, channel} ->
notify(state.owner, {:data_channel, channel})

{:data, id, data} ->
notify(state.owner, {:data, id, data})
end
end
end

defp do_get_description(nil, _candidates), do: nil

defp do_get_description({type, sdp}, candidates) do
Expand Down
Loading

0 comments on commit 66d05eb

Please sign in to comment.