Skip to content

Commit

Permalink
Convert RTPTransceiver to GenServer
Browse files Browse the repository at this point in the history
  • Loading branch information
LVala committed Nov 21, 2023
1 parent daa448a commit b732ece
Show file tree
Hide file tree
Showing 3 changed files with 177 additions and 149 deletions.
82 changes: 69 additions & 13 deletions lib/ex_webrtc/peer_connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ defmodule ExWebRTC.PeerConnection do
DTLSTransport,
IceCandidate,
MediaStreamTrack,
RTPCodecParameters,
RTPTransceiver,
SDPUtils,
SessionDescription,
Expand Down Expand Up @@ -91,13 +92,15 @@ defmodule ExWebRTC.PeerConnection do
GenServer.call(peer_connection, {:add_ice_candidate, candidate})
end

@spec get_transceivers(peer_connection()) :: [RTPTransceiver.t()]
@spec get_transceivers(peer_connection()) :: [
{mid :: String.t(), RTPTransceiver.rtp_transceiver()}
]
def get_transceivers(peer_connection) do
GenServer.call(peer_connection, :get_transceivers)
end

@spec add_transceiver(peer_connection(), RTPTransceiver.kind(), transceiver_options()) ::
{:ok, RTPTransceiver.t()} | {:error, :TODO}
{:ok, RTPTransceiver.rtp_transceiver()} | {:error, :TODO}
def add_transceiver(peer_connection, kind, options \\ []) do
GenServer.call(peer_connection, {:add_transceiver, kind, options})
end
Expand Down Expand Up @@ -161,7 +164,7 @@ defmodule ExWebRTC.PeerConnection do
rtcp: true
]

mlines = Enum.map(transceivers, &RTPTransceiver.to_offer_mline(&1, opts))
mlines = Enum.map(transceivers, fn {_, t} -> SDPUtils.to_offer_mline(t, opts) end)

mids =
Enum.map(mlines, fn mline ->
Expand Down Expand Up @@ -218,8 +221,8 @@ defmodule ExWebRTC.PeerConnection do
mlines =
Enum.map(remote_offer.media, fn mline ->
{:mid, mid} = ExSDP.Media.get_attribute(mline, :mid)
{_ix, transceiver} = RTPTransceiver.find_by_mid(state.transceivers, mid)
RTPTransceiver.to_answer_mline(transceiver, mline, opts)
{_, transceiver} = Enum.find(state.transceivers, fn {m, _} -> m == mid end)
SDPUtils.to_answer_mline(transceiver, mline, opts)
end)

mids =
Expand Down Expand Up @@ -314,15 +317,16 @@ defmodule ExWebRTC.PeerConnection do
:video -> {state.config.video_rtp_hdr_exts, state.config.video_codecs}
end

transceiver = %RTPTransceiver{
props = %{
mid: nil,
direction: direction,
kind: kind,
codecs: codecs,
rtp_hdr_exts: rtp_hdr_exts
}

transceivers = List.insert_at(state.transceivers, -1, transceiver)
{:ok, transceiver} = RTPTransceiver.start_link(kind, props)

transceivers = state.transceivers ++ [{nil, transceiver}]
{:reply, {:ok, transceiver}, %{state | transceivers: transceivers}}
end

Expand Down Expand Up @@ -408,7 +412,7 @@ defmodule ExWebRTC.PeerConnection do
new_transceivers
# only take new transceivers that can receive tracks
|> Enum.filter(fn tr ->
RTPTransceiver.find_by_mid(state.transceivers, tr.mid) == nil and
Enum.all?(state.transceivers, fn {m, _} -> tr.mid != m end) and
tr.direction in [:recvonly, :sendrecv]
end)
|> Enum.map(fn tr -> MediaStreamTrack.from_transceiver(tr) end)
Expand Down Expand Up @@ -442,7 +446,7 @@ defmodule ExWebRTC.PeerConnection do
Enum.reduce_while(sdp.media, {:ok, transceivers}, fn mline, {:ok, transceivers} ->
case ExSDP.Media.get_attribute(mline, :mid) do
{:mid, mid} ->
transceivers = RTPTransceiver.update_or_create(transceivers, mid, mline, config)
transceivers = update_or_create_transceiver(transceivers, mid, mline, config)
{:cont, {:ok, transceivers}}

_other ->
Expand All @@ -451,11 +455,63 @@ defmodule ExWebRTC.PeerConnection do
end)
end

# searches for transceiver for a given mline
# if it exists, updates its configuration
# if it doesn't exist, creats a new one
# returns list of updated transceivers
defp update_or_create_transceiver(transceivers, mid, mline, config) do
codecs = get_codecs(mline, config)
rtp_hdr_exts = get_rtp_hdr_extensions(mline, config)
props = %{codecs: codecs, rtp_hdr_exts: rtp_hdr_exts}

Enum.find(transceivers, fn {m, _} -> m == mid end)
|> case do
{_, tr} ->
:ok = RTPTransceiver.update_properties(tr, props)
transceivers

nil ->
props = Map.merge(props, %{mid: mid, direction: :recvonly})
{:ok, tr} = RTPTransceiver.start_link(mline.type, props)
transceivers ++ [{mid, tr}]
end
end

defp get_codecs(mline, config) do
rtp_mappings = ExSDP.Media.get_attributes(mline, ExSDP.Attribute.RTPMapping)
fmtps = ExSDP.Media.get_attributes(mline, ExSDP.Attribute.FMTP)
all_rtcp_fbs = ExSDP.Media.get_attributes(mline, ExSDP.Attribute.RTCPFeedback)

rtp_mappings
|> Stream.map(fn rtp_mapping ->
fmtp = Enum.find(fmtps, &(&1.pt == rtp_mapping.payload_type))

rtcp_fbs =
all_rtcp_fbs
|> Stream.filter(&(&1.pt == rtp_mapping.payload_type))
|> Enum.filter(&Configuration.is_supported_rtcp_fb(config, &1))

RTPCodecParameters.new(mline.type, rtp_mapping, fmtp, rtcp_fbs)
end)
|> Enum.filter(fn codec -> Configuration.is_supported_codec(config, codec) end)
end

defp get_rtp_hdr_extensions(mline, config) do
mline
|> ExSDP.Media.get_attributes(ExSDP.Attribute.Extmap)
|> Enum.filter(&Configuration.is_supported_rtp_hdr_extension(config, &1, mline.type))
end

defp assign_mids(transceivers, next_mid) do
{new_transceivers, _next_mid} =
Enum.map_reduce(transceivers, next_mid, fn
%{mid: nil} = t, nm -> {%{t | mid: to_string(nm)}, nm + 1}
other, nm -> {other, nm}
{nil, tr}, nm ->
mid = to_string(nm)
:ok = RTPTransceiver.update_properties(tr, %{mid: mid})
{{mid, tr}, nm + 1}

other, nm ->
{other, nm}
end)

new_transceivers
Expand All @@ -476,7 +532,7 @@ defmodule ExWebRTC.PeerConnection do
end

tsc_mids =
for %RTPTransceiver{mid: mid} when mid != nil <- state.transceivers,
for {mid, _transceiver} when mid != nil <- state.transceivers,
{mid, ""} <- Integer.parse(mid) do
mid
end
Expand Down
170 changes: 34 additions & 136 deletions lib/ex_webrtc/rtp_transceiver.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,164 +3,62 @@ defmodule ExWebRTC.RTPTransceiver do
RTPTransceiver
"""

alias ExWebRTC.{PeerConnection.Configuration, RTPCodecParameters, RTPReceiver}
use GenServer

alias ExWebRTC.RTPCodecParameters

@type rtp_transceiver() :: GenServer.server()

@type direction() :: :sendonly | :recvonly | :sendrecv | :inactive | :stopped
@type kind() :: :audio | :video

@type t() :: %__MODULE__{
mid: String.t(),
@type properties() :: %{
mid: String.t() | nil,
direction: direction(),
kind: kind(),
rtp_hdr_exts: [ExSDP.Attribute.Extmap.t()],
codecs: [RTPCodecParameters.t()],
rtp_receiver: nil
codecs: [RTPCodecParameters.t()]
}

@enforce_keys [:mid, :direction, :kind]
defstruct @enforce_keys ++ [codecs: [], rtp_hdr_exts: [], rtp_receiver: %RTPReceiver{}]

@doc false
def find_by_mid(transceivers, mid) do
transceivers
|> Enum.with_index(fn tr, idx -> {idx, tr} end)
|> Enum.find(fn {_idx, tr} -> tr.mid == mid end)
@spec start_link(kind(), properties()) :: GenServer.on_start()
def start_link(kind, properties) do
GenServer.start_link(__MODULE__, [kind, properties])
end

@doc false
@spec to_answer_mline(t(), ExSDP.Media.t(), Keyword.t()) :: ExSDP.Media.t()
def to_answer_mline(transceiver, mline, opts) do
if transceiver.codecs == [] do
# reject mline and skip further processing
# see RFC 8299 sec. 5.3.1 and RFC 3264 sec. 6
%ExSDP.Media{mline | port: 0}
else
offered_direction = ExSDP.Media.get_attribute(mline, :direction)
direction = get_direction(offered_direction, transceiver.direction)
opts = Keyword.put(opts, :direction, direction)
to_mline(transceiver, opts)
end
@spec get_properties(rtp_transceiver()) :: {kind(), properties()}
def get_properties(transceiver) do
GenServer.call(transceiver, :get_properties)
end

@doc false
@spec to_offer_mline(t(), Keyword.t()) :: ExSDP.Media.t()
def to_offer_mline(transceiver, opts) do
to_mline(transceiver, opts)
@spec update_properties(rtp_transceiver(), map()) :: :ok
def update_properties(transceiver, properties) do
# properties should be a subset of properties() type, but typespecs suck
GenServer.call(transceiver, {:update_properties, properties})
end

# searches for transceiver for a given mline
# if it exists, updates its configuration
# if it doesn't exist, creats a new one
# returns list of updated transceivers
@doc false
def update_or_create(transceivers, mid, mline, config) do
case find_by_mid(transceivers, mid) do
{idx, %__MODULE__{} = tr} ->
List.replace_at(transceivers, idx, update(tr, mline, config))

nil ->
codecs = get_codecs(mline, config)
rtp_hdr_exts = get_rtp_hdr_extensions(mline, config)
ssrc = ExSDP.Media.get_attributes(mline, ExSDP.Attribute.SSRC)
@impl true
def init([kind, props]) do
state = %{props | kind: kind, receiver: nil, sender: nil}

tr = %__MODULE__{
mid: mid,
direction: :recvonly,
kind: mline.type,
codecs: codecs,
rtp_hdr_exts: rtp_hdr_exts,
rtp_receiver: %RTPReceiver{ssrc: ssrc}
}

transceivers ++ [tr]
end
{:ok, state}
end

defp to_mline(transceiver, opts) do
pt = Enum.map(transceiver.codecs, fn codec -> codec.payload_type end)

media_formats =
Enum.flat_map(transceiver.codecs, fn codec ->
[_type, encoding] = String.split(codec.mime_type, "/")

rtp_mapping = %ExSDP.Attribute.RTPMapping{
clock_rate: codec.clock_rate,
encoding: encoding,
params: codec.channels,
payload_type: codec.payload_type
}

[rtp_mapping, codec.sdp_fmtp_line, codec.rtcp_fbs]
end)

attributes =
if(Keyword.get(opts, :rtcp, false), do: [{"rtcp", "9 IN IP4 0.0.0.0"}], else: []) ++
[
Keyword.get(opts, :direction, transceiver.direction),
{:mid, transceiver.mid},
{:ice_ufrag, Keyword.fetch!(opts, :ice_ufrag)},
{:ice_pwd, Keyword.fetch!(opts, :ice_pwd)},
{:ice_options, Keyword.fetch!(opts, :ice_options)},
{:fingerprint, Keyword.fetch!(opts, :fingerprint)},
{:setup, Keyword.fetch!(opts, :setup)},
:rtcp_mux
] ++ transceiver.rtp_hdr_exts

%ExSDP.Media{
ExSDP.Media.new(transceiver.kind, 9, "UDP/TLS/RTP/SAVPF", pt)
| # mline must be followed by a cline, which must contain
# the default value "IN IP4 0.0.0.0" (as there are no candidates yet)
connection_data: [%ExSDP.ConnectionData{address: {0, 0, 0, 0}}]
}
|> ExSDP.Media.add_attributes(attributes ++ media_formats)
end

# RFC 3264 (6.1) + RFC 8829 (5.3.1)
# AFAIK one of the cases should always match
# bc we won't assign/create an inactive transceiver to i.e. sendonly mline
# also neither of the arguments should ever be :stopped
defp get_direction(_, :inactive), do: :inactive
defp get_direction(:sendonly, t) when t in [:sendrecv, :recvonly], do: :recvonly
defp get_direction(:recvonly, t) when t in [:sendrecv, :sendonly], do: :sendonly
defp get_direction(o, other) when o in [:sendrecv, nil], do: other
defp get_direction(:inactive, _), do: :inactive

defp update(transceiver, mline, config) do
codecs = get_codecs(mline, config)
rtp_hdr_exts = get_rtp_hdr_extensions(mline, config)
ssrc = ExSDP.Media.get_attributes(mline, ExSDP.Attribute.SSRC)
rtp_receiver = %RTPReceiver{ssrc: ssrc}

%__MODULE__{
transceiver
| codecs: codecs,
rtp_hdr_exts: rtp_hdr_exts,
rtp_receiver: rtp_receiver
}
@impl true
def handle_call(:get_properties, _from, state) do
properties = Map.take(state, [:mid, :direction, :kind, :rtp_hrd_exts, :codecs])
{:reply, properties, state}
end

defp get_codecs(mline, config) do
rtp_mappings = ExSDP.Media.get_attributes(mline, ExSDP.Attribute.RTPMapping)
fmtps = ExSDP.Media.get_attributes(mline, ExSDP.Attribute.FMTP)
all_rtcp_fbs = ExSDP.Media.get_attributes(mline, ExSDP.Attribute.RTCPFeedback)

rtp_mappings
|> Stream.map(fn rtp_mapping ->
fmtp = Enum.find(fmtps, &(&1.pt == rtp_mapping.payload_type))

rtcp_fbs =
all_rtcp_fbs
|> Stream.filter(&(&1.pt == rtp_mapping.payload_type))
|> Enum.filter(&Configuration.is_supported_rtcp_fb(config, &1))

RTPCodecParameters.new(mline.type, rtp_mapping, fmtp, rtcp_fbs)
end)
|> Enum.filter(fn codec -> Configuration.is_supported_codec(config, codec) end)
end
@impl true
def handle_call({:update_properties, properties}, _from, state) do
# TODO: there's more to it that simply overriding the state's values
state =
properties
|> Map.take([:mid, :direction, :rtp_hdr_exts, :codecs])
|> then(&Map.merge(state, &1))

defp get_rtp_hdr_extensions(mline, config) do
mline
|> ExSDP.Media.get_attributes(ExSDP.Attribute.Extmap)
|> Enum.filter(&Configuration.is_supported_rtp_hdr_extension(config, &1, mline.type))
{:reply, :ok, state}
end
end
Loading

0 comments on commit b732ece

Please sign in to comment.