Skip to content

Commit

Permalink
Refactor the public modules, use internal maps for transceivers, send…
Browse files Browse the repository at this point in the history
…ers and receivers
  • Loading branch information
LVala committed May 8, 2024
1 parent f21de60 commit 15875fb
Show file tree
Hide file tree
Showing 8 changed files with 291 additions and 186 deletions.
81 changes: 51 additions & 30 deletions lib/ex_webrtc/peer_connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -465,6 +465,26 @@ defmodule ExWebRTC.PeerConnection do
{:reply, :ok, state}
end

@impl true
def handle_call(:get_connection_state, _from, state) do
{:reply, state.conn_state, state}
end

@impl true
def handle_call(:get_ice_connection_state, _from, state) do
{:reply, state.ice_state, state}
end

@impl true
def handle_call(:get_ice_gathering_state, _from, state) do
{:reply, state.ice_gathering_state, state}
end

@impl true
def handle_call(:get_signaling_state, _from, state) do
{:reply, state.signaling_state, state}
end

@impl true
def handle_call({:create_offer, _options}, _from, %{signaling_state: ss} = state)
when ss not in [:stable, :have_local_offer] do
Expand Down Expand Up @@ -658,7 +678,8 @@ defmodule ExWebRTC.PeerConnection do

@impl true
def handle_call(:get_transceivers, _from, state) do
{:reply, state.transceivers, state}
transceivers = Enum.map(state.transceivers, &RTPTransceiver.to_struct/1)
{:reply, transceivers, state}
end

@impl true
Expand All @@ -667,25 +688,25 @@ defmodule ExWebRTC.PeerConnection do
{ssrc, rtx_ssrc} = generate_ssrcs(state)
options = [{:ssrc, ssrc}, {:rtx_ssrc, rtx_ssrc} | options]

transceiver = RTPTransceiver.new(kind, nil, state.config, options)
state = %{state | transceivers: state.transceivers ++ [transceiver]}
tr = RTPTransceiver.new(kind, nil, state.config, options)
state = %{state | transceivers: state.transceivers ++ [tr]}

state = update_negotiation_needed(state)

{:reply, {:ok, transceiver}, state}
{:reply, {:ok, RTPTransceiver.to_struct(tr)}, state}
end

@impl true
def handle_call({:add_transceiver, %MediaStreamTrack{} = track, options}, _from, state) do
{ssrc, rtx_ssrc} = generate_ssrcs(state)
options = [{:ssrc, ssrc}, {:rtx_ssrc, rtx_ssrc} | options]

transceiver = RTPTransceiver.new(track.kind, track, state.config, options)
state = %{state | transceivers: state.transceivers ++ [transceiver]}
tr = RTPTransceiver.new(track.kind, track, state.config, options)
state = %{state | transceivers: state.transceivers ++ [tr]}

state = update_negotiation_needed(state)

{:reply, {:ok, transceiver}, state}
{:reply, {:ok, RTPTransceiver.to_struct(tr)}, state}
end

@impl true
Expand All @@ -698,7 +719,7 @@ defmodule ExWebRTC.PeerConnection do

idx ->
tr = Enum.at(state.transceivers, idx)
tr = %RTPTransceiver{tr | direction: direction}
tr = %{tr | direction: direction}
transceivers = List.replace_at(state.transceivers, idx, tr)
state = %{state | transceivers: transceivers}
state = update_negotiation_needed(state)
Expand Down Expand Up @@ -735,9 +756,9 @@ defmodule ExWebRTC.PeerConnection do
# we ignore the condition that sender has never been used to send
free_transceiver_idx =
Enum.find_index(state.transceivers, fn
%RTPTransceiver{
%{
kind: ^kind,
sender: %RTPSender{track: nil},
sender: %{track: nil},
current_direction: direction
}
when direction not in [:sendrecv, :sendonly] ->
Expand Down Expand Up @@ -972,7 +993,12 @@ defmodule ExWebRTC.PeerConnection do
end)

case transceiver do
%RTPTransceiver{} ->
nil ->
Logger.warning(
"Attempted to send packet to track with unrecognized id: #{inspect(track_id)}"
)

_other ->
{packet, state} =
case Map.fetch(state.config.video_rtp_hdr_exts, @twcc_uri) do
{:ok, %{id: id}} ->
Expand Down Expand Up @@ -1000,11 +1026,6 @@ defmodule ExWebRTC.PeerConnection do

{:noreply, state}

nil ->
Logger.warning(
"Attempted to send packet to track with unrecognized id: #{inspect(track_id)}"
)

{:noreply, state}
end
end
Expand Down Expand Up @@ -1083,7 +1104,7 @@ defmodule ExWebRTC.PeerConnection do
@impl true
def handle_info({:dtls_transport, _pid, {:rtp, data}}, state) do
with {:ok, demuxer, mid, packet} <- Demuxer.demux(state.demuxer, data),
{idx, %RTPTransceiver{} = t} <- find_transceiver(state.transceivers, mid) do
{idx, t} <- find_transceiver(state.transceivers, mid) do
# we always update the ssrc's for the one's from the latest packet
# although this is not a necessity, the feedbacks are transport-wide
twcc_recorder = %TWCCRecorder{
Expand Down Expand Up @@ -1248,13 +1269,13 @@ defmodule ExWebRTC.PeerConnection do
Enum.map_reduce(state.transceivers, next_mid, fn
# In the initial offer, we can't have stopped transceivers, only stopping ones.
# Also, stopped transceivers are immediately removed.
%RTPTransceiver{stopping: true, mid: nil} = tr, nm ->
%{stopping: true, mid: nil} = tr, nm ->
{tr, nm}

%RTPTransceiver{stopping: false, mid: nil} = tr, nm ->
%{stopping: false, mid: nil} = tr, nm ->
tr = RTPTransceiver.assign_mid(tr, to_string(nm))
# in the initial offer, mline_idx is the same as mid
tr = %RTPTransceiver{tr | mline_idx: nm}
tr = %{tr | mline_idx: nm}
{tr, nm + 1}
end)

Expand Down Expand Up @@ -1326,7 +1347,7 @@ defmodule ExWebRTC.PeerConnection do
defp assign_mlines([], _, _, _, _, result), do: Enum.reverse(result)

defp assign_mlines(
[%RTPTransceiver{mid: nil, mline_idx: nil, stopped: false} = tr | trs],
[%{mid: nil, mline_idx: nil, stopped: false} = tr | trs],
last_answer,
next_mid,
next_mline_idx,
Expand All @@ -1337,12 +1358,12 @@ defmodule ExWebRTC.PeerConnection do

case SDPUtils.find_free_mline_idx(last_answer, recycled_mlines) do
nil ->
tr = %RTPTransceiver{tr | mline_idx: next_mline_idx}
tr = %{tr | mline_idx: next_mline_idx}
result = [tr | result]
assign_mlines(trs, last_answer, next_mid + 1, next_mline_idx + 1, recycled_mlines, result)

idx ->
tr = %RTPTransceiver{tr | mline_idx: idx}
tr = %{tr | mline_idx: idx}
result = [tr | result]
recycled_mlines = [idx | recycled_mlines]
assign_mlines(trs, last_answer, next_mid + 1, next_mline_idx, recycled_mlines, result)
Expand Down Expand Up @@ -1448,7 +1469,7 @@ defmodule ExWebRTC.PeerConnection do
# This might result in unremovable transceiver when
# we add and stop it before the first offer.
# See https://github.com/w3c/webrtc-pc/issues/2923
%RTPTransceiver{mid: nil} ->
%{mid: nil} ->
false

tr ->
Expand Down Expand Up @@ -1566,7 +1587,7 @@ defmodule ExWebRTC.PeerConnection do
notify(owner, {:track_muted, tr.receiver.track.id})
end

tr = %RTPTransceiver{tr | current_direction: direction, fired_direction: direction}
tr = %{tr | current_direction: direction, fired_direction: direction}

# This is not defined in the W3C but see https://github.com/w3c/webrtc-pc/issues/2927
tr =
Expand Down Expand Up @@ -1596,12 +1617,12 @@ defmodule ExWebRTC.PeerConnection do
# after processing remote track but this shouldn't have any impact
{idx, tr} =
case find_transceiver_from_remote(transceivers, mline) do
{idx, %RTPTransceiver{} = tr} -> {idx, RTPTransceiver.update(tr, mline, config)}
{idx, tr} -> {idx, RTPTransceiver.update(tr, mline, config)}
nil -> {nil, RTPTransceiver.from_mline(mline, idx, config)}
end

tr = process_remote_track(tr, direction, owner)
tr = if sdp_type == :answer, do: %RTPTransceiver{tr | current_direction: direction}, else: tr
tr = if sdp_type == :answer, do: %{tr | current_direction: direction}, else: tr

tr =
if SDPUtils.rejected?(mline),
Expand All @@ -1623,7 +1644,7 @@ defmodule ExWebRTC.PeerConnection do
{:mid, mid} = ExSDP.get_attribute(mline, :mid)

case find_transceiver(transceivers, mid) do
{idx, %RTPTransceiver{} = tr} -> {idx, tr}
{idx, tr} -> {idx, tr}
nil -> find_associable_transceiver(transceivers, mline)
end
end
Expand All @@ -1649,7 +1670,7 @@ defmodule ExWebRTC.PeerConnection do
:ok
end

%RTPTransceiver{transceiver | fired_direction: direction}
%{transceiver | fired_direction: direction}
end

defp reverse_direction(:recvonly), do: :sendonly
Expand Down Expand Up @@ -1819,7 +1840,7 @@ defmodule ExWebRTC.PeerConnection do

# in case NACK was received, but RTX was not negotiated
# as NACK and RTX are negotited independently
{%RTPTransceiver{sender: %RTPSender{rtx_pt: nil}}, _idx} ->
{%{sender: %{rtx_pt: nil}}, _idx} ->
state

{tr, idx} ->
Expand Down
72 changes: 50 additions & 22 deletions lib/ex_webrtc/rtp_receiver.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,11 @@ defmodule ExWebRTC.RTPReceiver do
alias ExWebRTC.{MediaStreamTrack, Utils, RTPCodecParameters}
alias __MODULE__.{NACKGenerator, ReportRecorder}

@type t() :: %__MODULE__{
@type id() :: integer()

@typedoc false
@type receiver() :: %{
id: id(),
track: MediaStreamTrack.t(),
codec: RTPCodecParameters.t() | nil,
ssrc: non_neg_integer() | nil,
Expand All @@ -19,42 +23,64 @@ defmodule ExWebRTC.RTPReceiver do
nack_generator: NACKGenerator.t()
}

@enforce_keys [:track, :codec, :report_recorder]
defstruct [
ssrc: nil,
bytes_received: 0,
packets_received: 0,
markers_received: 0,
nack_generator: %NACKGenerator{}
] ++ @enforce_keys
@typedoc """
Struct representing a receiver.
The fields mostly match these of [RTCRtpReceiver](https://developer.mozilla.org/en-US/docs/Web/API/RTCRtpReceiver),
except for:
* `id` - to uniquely identify the receiver.
* `codec` - codec this receiver is expected to receive.
"""
@type t() :: %__MODULE__{
id: id(),
track: MediaStreamTrack.t(),
codec: RTPCodecParameters.t() | nil
}

@enforce_keys [:id, :track, :codec]
defstruct @enforce_keys

@doc false
@spec to_struct(receiver()) :: t()
def to_struct(receiver) do
receiver
|> Map.take([:id, :track, :codec])
|> then(&struct!(__MODULE__, &1))
end

@doc false
@spec new(MediaStreamTrack.t(), RTPCodecParameters.t() | nil) :: t()
@spec new(MediaStreamTrack.t(), RTPCodecParameters.t() | nil) :: receiver()
def new(track, codec) do
report_recorder = %ReportRecorder{
clock_rate: codec && codec.clock_rate
}

%__MODULE__{
%{
id: Utils.generate_id(),
track: track,
codec: codec,
report_recorder: report_recorder
ssrc: nil,
bytes_received: 0,
packets_received: 0,
markers_received: 0,
report_recorder: report_recorder,
nack_generator: %NACKGenerator{}
}
end

@doc false
@spec update(t(), RTPCodecParameters.t() | nil) :: t()
@spec update(receiver(), RTPCodecParameters.t() | nil) :: receiver()
def update(receiver, codec) do
report_recorder = %ReportRecorder{
receiver.report_recorder
| clock_rate: codec && codec.clock_rate
}

%__MODULE__{receiver | codec: codec, report_recorder: report_recorder}
%{receiver | codec: codec, report_recorder: report_recorder}
end

@doc false
@spec receive_packet(t(), ExRTP.Packet.t(), non_neg_integer()) :: t()
@spec receive_packet(receiver(), ExRTP.Packet.t(), non_neg_integer()) :: receiver()
def receive_packet(receiver, packet, size) do
if packet.payload_type != receiver.codec.payload_type do
Logger.warning("Received packet with unexpected payload_type \
Expand All @@ -65,7 +91,7 @@ defmodule ExWebRTC.RTPReceiver do
nack_generator = NACKGenerator.record_packet(receiver.nack_generator, packet)

# TODO assign ssrc when applying local/remote description.
%__MODULE__{
%{
receiver
| ssrc: packet.ssrc,
bytes_received: receiver.bytes_received + size,
Expand All @@ -76,7 +102,8 @@ defmodule ExWebRTC.RTPReceiver do
}
end

@spec receive_rtx(t(), ExRTP.Packet.t(), non_neg_integer()) :: {:ok, ExRTP.Packet.t()} | :error
@spec receive_rtx(receiver(), ExRTP.Packet.t(), non_neg_integer()) ::
{:ok, ExRTP.Packet.t()} | :error
def receive_rtx(receiver, rtx_packet, apt) do
with <<seq_no::16, rest::binary>> <- rtx_packet.payload,
ssrc when ssrc != nil <- receiver.ssrc do
Expand All @@ -94,23 +121,24 @@ defmodule ExWebRTC.RTPReceiver do
end
end

@spec receive_report(t(), ExRTCP.Packet.SenderReport.t()) :: t()
@spec receive_report(receiver(), ExRTCP.Packet.SenderReport.t()) :: receiver()
def receive_report(receiver, report) do
report_recorder = ReportRecorder.record_report(receiver.report_recorder, report)

%__MODULE__{receiver | report_recorder: report_recorder}
%{receiver | report_recorder: report_recorder}
end

@doc false
@spec update_sender_ssrc(t(), non_neg_integer()) :: t()
@spec update_sender_ssrc(receiver(), non_neg_integer()) :: receiver()
def update_sender_ssrc(receiver, ssrc) do
report_recorder = %ReportRecorder{receiver.report_recorder | sender_ssrc: ssrc}
nack_generator = %NACKGenerator{receiver.nack_generator | sender_ssrc: ssrc}
%__MODULE__{receiver | report_recorder: report_recorder, nack_generator: nack_generator}

%{receiver | report_recorder: report_recorder, nack_generator: nack_generator}
end

@doc false
@spec get_stats(t(), non_neg_integer()) :: map()
@spec get_stats(receiver(), non_neg_integer()) :: map()
def get_stats(receiver, timestamp) do
%{
id: receiver.track.id,
Expand Down
5 changes: 1 addition & 4 deletions lib/ex_webrtc/rtp_receiver/nack_generator.ex
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
defmodule ExWebRTC.RTPReceiver.NACKGenerator do
@moduledoc nil
@moduledoc false
# for now, it mimics the Pion implementation, but there's some issues and remarks
# 1) NACKs are send at constant interval
# 2) no timing rules (like rtt) are taken into account
Expand All @@ -25,9 +25,6 @@ defmodule ExWebRTC.RTPReceiver.NACKGenerator do
last_sn: nil,
max_nack: @max_nack

@doc """
Records incoming RTP Packet.
"""
@spec record_packet(t(), ExRTP.Packet.t()) :: t()
def record_packet(generator, packet)

Expand Down
Loading

0 comments on commit 15875fb

Please sign in to comment.