Skip to content

Commit

Permalink
Add outbound retransmissions
Browse files Browse the repository at this point in the history
  • Loading branch information
LVala committed Apr 18, 2024
1 parent 945615b commit 9effff3
Show file tree
Hide file tree
Showing 6 changed files with 226 additions and 64 deletions.
85 changes: 60 additions & 25 deletions lib/ex_webrtc/peer_connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -201,14 +201,12 @@ defmodule ExWebRTC.PeerConnection do
"""
@spec send_rtp(
peer_connection(),
MediaStreamTrack.t() | MediaStreamTrack.id(),
ExRTP.Packet.t()
MediaStreamTrack.id(),
ExRTP.Packet.t(),
rtx?: boolean()
) :: :ok
def send_rtp(peer_connection, %MediaStreamTrack{id: track_id}, packet),
do: send_rtp(peer_connection, track_id, packet)

def send_rtp(peer_connection, track_id, packet) when is_integer(track_id) do
GenServer.cast(peer_connection, {:send_rtp, track_id, packet})
def send_rtp(peer_connection, track_id, packet, opts \\ []) do
GenServer.cast(peer_connection, {:send_rtp, track_id, packet, opts})
end

@doc """
Expand Down Expand Up @@ -479,7 +477,9 @@ defmodule ExWebRTC.PeerConnection do
@impl true
def handle_call({:add_transceiver, kind, options}, _from, state)
when kind in [:audio, :video] do
options = Keyword.put(options, :ssrc, generate_ssrc(state))
{ssrc, rtx_ssrc} = generate_ssrcs(state)
options = [{:ssrc, ssrc}, {:rtx_ssrc, rtx_ssrc} | options]

transceiver = RTPTransceiver.new(kind, nil, state.config, options)
state = %{state | transceivers: state.transceivers ++ [transceiver]}

Expand All @@ -490,7 +490,9 @@ defmodule ExWebRTC.PeerConnection do

@impl true
def handle_call({:add_transceiver, %MediaStreamTrack{} = track, options}, _from, state) do
options = Keyword.put(options, :ssrc, generate_ssrc(state))
{ssrc, rtx_ssrc} = generate_ssrcs(state)
options = [{:ssrc, ssrc}, {:rtx_ssrc, rtx_ssrc} | options]

transceiver = RTPTransceiver.new(track.kind, track, state.config, options)
state = %{state | transceivers: state.transceivers ++ [transceiver]}

Expand Down Expand Up @@ -558,16 +560,24 @@ defmodule ExWebRTC.PeerConnection do
false
end)

{ssrc, rtx_ssrc} = generate_ssrcs(state)

{transceivers, sender} =
case free_transceiver_idx do
nil ->
options = [direction: :sendrecv, ssrc: generate_ssrc(state), added_by_add_track: true]
options = [
direction: :sendrecv,
added_by_add_track: true,
ssrc: ssrc,
rtx_ssrc: rtx_ssrc
]

tr = RTPTransceiver.new(kind, track, state.config, options)
{state.transceivers ++ [tr], tr.sender}

idx ->
tr = Enum.at(state.transceivers, idx)
tr = RTPTransceiver.add_track(tr, track, generate_ssrc(state))
tr = RTPTransceiver.add_track(tr, track, ssrc, rtx_ssrc)
{List.replace_at(state.transceivers, idx, tr), tr.sender}
end

Expand All @@ -594,7 +604,8 @@ defmodule ExWebRTC.PeerConnection do
{:reply, {:error, :invalid_track_type}, state}

tr.direction in [:sendrecv, :sendonly] ->
tr = RTPTransceiver.replace_track(tr, track, generate_ssrc(state))
{ssrc, rtx_ssrc} = generate_ssrcs(state)
tr = RTPTransceiver.replace_track(tr, track, ssrc, rtx_ssrc)
transceivers = List.replace_at(state.transceivers, tr_idx, tr)
state = %{state | transceivers: transceivers}
{:reply, :ok, state}
Expand Down Expand Up @@ -760,7 +771,9 @@ defmodule ExWebRTC.PeerConnection do
end

@impl true
def handle_cast({:send_rtp, track_id, packet}, state) do
def handle_cast({:send_rtp, track_id, packet, opts}, state) do
rtx? = Keyword.get(opts, :rtx?, false)

# TODO: iterating over transceivers is not optimal
# but this is, most likely, going to be refactored anyways
{transceiver, idx} =
Expand Down Expand Up @@ -792,7 +805,7 @@ defmodule ExWebRTC.PeerConnection do
{packet, state}
end

{packet, transceiver} = RTPTransceiver.send_packet(transceiver, packet)
{packet, transceiver} = RTPTransceiver.send_packet(transceiver, packet, rtx?)
:ok = DTLSTransport.send_rtp(state.dtls_transport, packet)

transceivers = List.replace_at(state.transceivers, idx, transceiver)
Expand Down Expand Up @@ -932,13 +945,13 @@ defmodule ExWebRTC.PeerConnection do
def handle_info({:dtls_transport, _pid, {:rtcp, data}}, state) do
case ExRTCP.CompoundPacket.decode(data) do
{:ok, packets} ->
transceivers =
Enum.reduce(packets, state.transceivers, fn packet, transceivers ->
handle_report(packet, transceivers)
state =
Enum.reduce(packets, state, fn packet, state ->
handle_rtcp_packet(state, packet)
end)

notify(state.owner, {:rtcp, packets})
{:noreply, %{state | transceivers: transceivers}}
{:noreply, state}

{:error, _res} ->
case data do
Expand Down Expand Up @@ -1588,23 +1601,42 @@ defmodule ExWebRTC.PeerConnection do
end
end

defp handle_report(%ExRTCP.Packet.SenderReport{} = report, transceivers) do
defp handle_rtcp_packet(state, %ExRTCP.Packet.SenderReport{} = report) do
transceiver =
transceivers
state.transceivers
|> Enum.with_index()
|> Enum.find(fn {tr, _idx} -> tr.receiver.ssrc == report.ssrc end)

case transceiver do
nil ->
transceivers
state

{tr, idx} ->
tr = RTPTransceiver.receive_report(tr, report)
List.replace_at(transceivers, idx, tr)
transceivers = List.replace_at(state.transceivers, idx, tr)
%{state | transceivers: transceivers}
end
end

defp handle_rtcp_packet(state, %ExRTCP.Packet.TransportFeedback.NACK{} = nack) do
transceiver =
state.transceivers
|> Enum.with_index()
|> Enum.find(fn {tr, _idx} -> tr.sender.ssrc == nack.media_ssrc end)

case transceiver do
nil ->
state

{tr, idx} ->
{packets, tr} = RTPTransceiver.receive_nack(tr, nack)
for packet <- packets, do: send_rtp(self(), tr.sender.track.id, packet, rtx?: true)
transceivers = List.replace_at(state.transceivers, idx, tr)
%{state | transceivers: transceivers}
end
end

defp handle_report(_report, transceivers), do: transceivers
defp handle_rtcp_packet(state, _packet), do: state

defp do_get_description(nil, _candidates), do: nil

Expand All @@ -1613,10 +1645,13 @@ defmodule ExWebRTC.PeerConnection do
%SessionDescription{type: type, sdp: to_string(sdp)}
end

defp generate_ssrc(state) do
defp generate_ssrcs(state) do
# we need other ssrcs when generating 2 ssrcs back to back (i.e. ssrc and rtx ssrc)
rtp_sender_ssrcs = Enum.map(state.transceivers, & &1.sender.ssrc)
ssrcs = MapSet.new(Map.keys(state.demuxer.ssrc_to_mid) ++ rtp_sender_ssrcs)
do_generate_ssrc(ssrcs, 200)
ssrc = do_generate_ssrc(ssrcs, 200)
rtx_ssrc = do_generate_ssrc(MapSet.put(ssrcs, ssrc), 200)
{ssrc, rtx_ssrc}
end

# this is practically impossible so it's easier to raise
Expand Down
66 changes: 49 additions & 17 deletions lib/ex_webrtc/rtp_sender.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ defmodule ExWebRTC.RTPSender do

alias ExWebRTC.{MediaStreamTrack, RTPCodecParameters, Utils}
alias ExSDP.Attribute.Extmap
alias __MODULE__.ReportRecorder
alias __MODULE__.{NackResponder, ReportRecorder}

@mid_uri "urn:ietf:params:rtp-hdrext:sdes:mid"

Expand All @@ -18,21 +18,26 @@ defmodule ExWebRTC.RTPSender do
rtp_hdr_exts: %{Extmap.extension_id() => Extmap.t()},
mid: String.t() | nil,
pt: non_neg_integer() | nil,
rtx_pt: non_neg_integer() | nil,
ssrc: non_neg_integer() | nil,
rtx_ssrc: non_neg_integer() | nil,
packets_sent: non_neg_integer(),
bytes_sent: non_neg_integer(),
markers_sent: non_neg_integer(),
report_recorder: ReportRecorder.t()
report_recorder: ReportRecorder.t(),
nack_responder: NackResponder.t()
}

@enforce_keys [:id, :report_recorder]
@enforce_keys [:id, :report_recorder, :nack_responder]
defstruct @enforce_keys ++
[
:track,
:codec,
:mid,
:pt,
:rtx_pt,
:ssrc,
:rtx_ssrc,
rtp_hdr_exts: %{},
packets_sent: 0,
bytes_sent: 0,
Expand All @@ -43,36 +48,45 @@ defmodule ExWebRTC.RTPSender do
@spec new(
MediaStreamTrack.t() | nil,
RTPCodecParameters.t() | nil,
RTPCodecParameters.t() | nil,
[Extmap.t()],
String.t() | nil,
non_neg_integer | nil
non_neg_integer() | nil,
non_neg_integer() | nil
) :: t()
def new(track, codec, rtp_hdr_exts, mid \\ nil, ssrc) do
def new(track, codec, rtx_codec, rtp_hdr_exts, mid \\ nil, ssrc, rtx_ssrc) do
# convert to a map to be able to find extension id using extension uri
rtp_hdr_exts = Map.new(rtp_hdr_exts, fn extmap -> {extmap.uri, extmap} end)
# TODO: handle cases when codec == nil (no valid codecs after negotiation)
pt = if codec != nil, do: codec.payload_type, else: nil
rtx_pt = if rtx_codec != nil, do: rtx_codec.payload_type, else: nil

%__MODULE__{
id: Utils.generate_id(),
track: track,
codec: codec,
rtp_hdr_exts: rtp_hdr_exts,
pt: pt,
rtx_pt: rtx_pt,
ssrc: ssrc,
rtx_ssrc: rtx_ssrc,
mid: mid,
report_recorder: %ReportRecorder{clock_rate: codec && codec.clock_rate}
report_recorder: %ReportRecorder{clock_rate: codec && codec.clock_rate},
nack_responder: %NackResponder{}
}
end

@doc false
@spec update(t(), String.t(), RTPCodecParameters.t() | nil, [Extmap.t()]) :: t()
def update(sender, mid, codec, rtp_hdr_exts) do
@spec update(t(), String.t(), RTPCodecParameters.t() | nil, RTPCodecParameters.t() | nil, [
Extmap.t()
]) :: t()
def update(sender, mid, codec, rtx_codec, rtp_hdr_exts) do
if sender.mid != nil and mid != sender.mid, do: raise(ArgumentError)
# convert to a map to be able to find extension id using extension uri
rtp_hdr_exts = Map.new(rtp_hdr_exts, fn extmap -> {extmap.uri, extmap} end)
# TODO: handle cases when codec == nil (no valid codecs after negotiation)
pt = if codec != nil, do: codec.payload_type, else: nil
rtx_pt = if rtx_codec != nil, do: rtx_codec.payload_type, else: nil

report_recorder = %ReportRecorder{
sender.report_recorder
Expand All @@ -85,6 +99,7 @@ defmodule ExWebRTC.RTPSender do
codec: codec,
rtp_hdr_exts: rtp_hdr_exts,
pt: pt,
rtx_pt: rtx_pt,
report_recorder: report_recorder
}
end
Expand All @@ -93,35 +108,52 @@ defmodule ExWebRTC.RTPSender do
# * assigns SSRC, pt, mid
# * serializes to binary
@doc false
@spec send_packet(t(), ExRTP.Packet.t()) :: {binary(), t()}
def send_packet(sender, packet) do
@spec send_packet(t(), ExRTP.Packet.t(), boolean()) :: {binary(), t()}
def send_packet(sender, packet, rtx?) do
%Extmap{} = mid_extmap = Map.fetch!(sender.rtp_hdr_exts, @mid_uri)

mid_ext =
%ExRTP.Packet.Extension.SourceDescription{text: sender.mid}
|> ExRTP.Packet.Extension.SourceDescription.to_raw(mid_extmap.id)

packet = %{packet | payload_type: sender.pt, ssrc: sender.ssrc}

report_recorder = ReportRecorder.record_packet(sender.report_recorder, packet)
{pt, ssrc} =
if rtx? do
{sender.rtx_pt, sender.rtx_ssrc}
else
{sender.pt, sender.ssrc}
end

data =
packet
packet =
%{packet | payload_type: pt, ssrc: ssrc}
|> ExRTP.Packet.remove_extension(mid_extmap.id)
|> ExRTP.Packet.add_extension(mid_ext)
|> ExRTP.Packet.encode()

report_recorder = ReportRecorder.record_packet(sender.report_recorder, packet)
nack_responder = NackResponder.record_packet(sender.nack_responder, packet)

data = ExRTP.Packet.encode(packet)

sender = %{
sender
| packets_sent: sender.packets_sent + 1,
bytes_sent: sender.bytes_sent + byte_size(data),
markers_sent: sender.markers_sent + Utils.to_int(packet.marker),
report_recorder: report_recorder
report_recorder: report_recorder,
nack_responder: nack_responder
}

{data, sender}
end

@doc false
@spec receive_nack(t(), ExRTCP.Packet.TransportFeedback.NACK.t()) :: {[ExRTP.Packet.t()], t()}
def receive_nack(sender, nack) do
{packets, nack_responder} = NackResponder.get_rtx(sender.nack_responder, nack)
sender = %{sender | nack_responder: nack_responder}

{packets, sender}
end

@doc false
@spec get_stats(t(), non_neg_integer()) :: map()
def get_stats(sender, timestamp) do
Expand Down
53 changes: 53 additions & 0 deletions lib/ex_webrtc/rtp_sender/nack_responder.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
defmodule ExWebRTC.RTPSender.NackResponder do
@moduledoc nil

alias ExRTP.Packet
alias ExRTCP.Packet.TransportFeedback.NACK

@max_packets 200

@type t() :: %__MODULE__{
packets: %{non_neg_integer() => Packet.t()},
seq_no: non_neg_integer()
}

defstruct packets: %{},
seq_no: Enum.random(0..0xFFFF)

@doc """
Records send RTP packets.
"""
@spec record_packet(t(), Packet.t()) :: t()
def record_packet(responder, packet) do
key = rem(packet.sequence_number, @max_packets)
packets = Map.put(responder.packets, key, packet)

%__MODULE__{responder | packets: packets}
end

@doc """
Returns RTX RTP packets to be retransmited based on received NACK feedback.
"""
@spec get_rtx(t(), NACK.t()) :: {[ExRTP.Packet.t()], t()}
def get_rtx(responder, nack) do
seq_nos = NACK.to_sequence_numbers(nack)

{packets, seq_no} =
seq_nos
|> Enum.map(fn seq_no -> {seq_no, Map.get(responder.packets, rem(seq_no, @max_packets))} end)
|> Enum.filter(fn {seq_no, packet} -> packet != nil and packet.sequence_number == seq_no end)
# ssrc will be assigned by the sender
|> Enum.map_reduce(responder.seq_no, fn {seq_no, packet}, rtx_seq_no ->
rtx_packet = %Packet{
packet
| sequence_number: rtx_seq_no,
payload: <<seq_no::16, packet.payload::binary>>
}

{rtx_packet, rtx_seq_no + 1}
end)

responder = %__MODULE__{responder | seq_no: seq_no}
{packets, responder}
end
end
Loading

0 comments on commit 9effff3

Please sign in to comment.