From 6f1be2b2c08cb24871689fbac4e0e6fd0d69b557 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20=C5=9Aled=C5=BA?= Date: Mon, 15 Apr 2024 22:39:17 +0200 Subject: [PATCH] Respond to incoming NACK --- examples/echo/lib/echo/peer_handler.ex | 20 ++++++- .../lib/send_from_file/peer_handler.ex | 2 + lib/ex_webrtc/peer_connection.ex | 37 +++++++++--- lib/ex_webrtc/rtp_sender.ex | 56 ++++++++++++++++--- lib/ex_webrtc/rtp_transceiver.ex | 7 +++ 5 files changed, 101 insertions(+), 21 deletions(-) diff --git a/examples/echo/lib/echo/peer_handler.ex b/examples/echo/lib/echo/peer_handler.ex index 088909e..eac2229 100644 --- a/examples/echo/lib/echo/peer_handler.ex +++ b/examples/echo/lib/echo/peer_handler.ex @@ -139,8 +139,15 @@ defmodule Echo.PeerHandler do {:ok, state} end - defp handle_webrtc_msg({:rtcp, _packets}, state) do - # do something with RTCP packets + defp handle_webrtc_msg({:rtcp, packets}, state) do + for packet <- packets do + %mod{} = packet + + if mod == ExRTCP.Packet.TransportFeedback.NACK do + # dbg(packet) + end + end + {:ok, state} end @@ -150,7 +157,14 @@ defmodule Echo.PeerHandler do end defp handle_webrtc_msg({:rtp, id, packet}, %{in_video_track_id: id} = state) do - PeerConnection.send_rtp(state.peer_connection, state.out_video_track_id, packet) + rnd = Enum.random(1..100) + + if rnd in 1..2 do + Logger.info("Dropping packet") + else + PeerConnection.send_rtp(state.peer_connection, state.out_video_track_id, packet) + end + {:ok, state} end diff --git a/examples/send_from_file/lib/send_from_file/peer_handler.ex b/examples/send_from_file/lib/send_from_file/peer_handler.ex index 0afc769..209e04a 100644 --- a/examples/send_from_file/lib/send_from_file/peer_handler.ex +++ b/examples/send_from_file/lib/send_from_file/peer_handler.ex @@ -14,6 +14,8 @@ defmodule SendFromFile.PeerHandler do alias ExWebRTC.Media.{IVF, Ogg} alias ExWebRTC.RTP.{OpusPayloader, VP8Payloader} + alias ExSDP.Attribute.FMTP + @behaviour WebSock @video_file "./video.ivf" diff --git a/lib/ex_webrtc/peer_connection.ex b/lib/ex_webrtc/peer_connection.ex index d77a332..4a326ef 100644 --- a/lib/ex_webrtc/peer_connection.ex +++ b/lib/ex_webrtc/peer_connection.ex @@ -932,13 +932,13 @@ defmodule ExWebRTC.PeerConnection do def handle_info({:dtls_transport, _pid, {:rtcp, data}}, state) do case ExRTCP.CompoundPacket.decode(data) do {:ok, packets} -> - transceivers = - Enum.reduce(packets, state.transceivers, fn packet, transceivers -> - handle_report(packet, transceivers) + state = + Enum.reduce(packets, state, fn packet, state -> + handle_rtcp_packet(state, packet) end) notify(state.owner, {:rtcp, packets}) - {:noreply, %{state | transceivers: transceivers}} + {:noreply, state} {:error, _res} -> case data do @@ -1588,23 +1588,42 @@ defmodule ExWebRTC.PeerConnection do end end - defp handle_report(%ExRTCP.Packet.SenderReport{} = report, transceivers) do + defp handle_rtcp_packet(state, %ExRTCP.Packet.SenderReport{} = report) do transceiver = - transceivers + state.transceivers |> Enum.with_index() |> Enum.find(fn {tr, _idx} -> tr.receiver.ssrc == report.ssrc end) case transceiver do nil -> - transceivers + state {tr, idx} -> tr = RTPTransceiver.receive_report(tr, report) - List.replace_at(transceivers, idx, tr) + transceivers = List.replace_at(state.transceivers, idx, tr) + %{state | transceivers: transceivers} + end + end + + defp handle_rtcp_packet(state, %ExRTCP.Packet.TransportFeedback.NACK{} = nack) do + transceiver = + state.transceivers + |> Enum.with_index() + |> Enum.find(fn {tr, _idx} -> tr.sender.ssrc == nack.media_ssrc end) + + case transceiver do + nil -> + state + + {tr, idx} -> + {packets, tr} = RTPTransceiver.receive_nack(tr, nack) + for packet <- packets, do: send_rtp(self(), tr.sender.track.id, packet) + transceivers = List.replace_at(state.transceivers, idx, tr) + %{state | transceivers: transceivers} end end - defp handle_report(_report, transceivers), do: transceivers + defp handle_rtcp_packet(state, _packet), do: state defp do_get_description(nil, _candidates), do: nil diff --git a/lib/ex_webrtc/rtp_sender.ex b/lib/ex_webrtc/rtp_sender.ex index 75769f9..03cc032 100644 --- a/lib/ex_webrtc/rtp_sender.ex +++ b/lib/ex_webrtc/rtp_sender.ex @@ -3,12 +3,16 @@ defmodule ExWebRTC.RTPSender do Implementation of the [RTCRtpSender](https://www.w3.org/TR/webrtc/#rtcrtpsender-interface). """ + import Bitwise + alias ExWebRTC.{MediaStreamTrack, RTPCodecParameters, Utils} alias ExSDP.Attribute.Extmap alias __MODULE__.ReportRecorder @mid_uri "urn:ietf:params:rtp-hdrext:sdes:mid" + @history_size 200 + @type id() :: integer() @type t() :: %__MODULE__{ @@ -22,7 +26,8 @@ defmodule ExWebRTC.RTPSender do packets_sent: non_neg_integer(), bytes_sent: non_neg_integer(), markers_sent: non_neg_integer(), - report_recorder: ReportRecorder.t() + report_recorder: ReportRecorder.t(), + history: %{} } @enforce_keys [:id, :report_recorder] @@ -36,7 +41,8 @@ defmodule ExWebRTC.RTPSender do rtp_hdr_exts: %{}, packets_sent: 0, bytes_sent: 0, - markers_sent: 0 + markers_sent: 0, + history: %{} ] @doc false @@ -101,27 +107,39 @@ defmodule ExWebRTC.RTPSender do %ExRTP.Packet.Extension.SourceDescription{text: sender.mid} |> ExRTP.Packet.Extension.SourceDescription.to_raw(mid_extmap.id) - packet = %{packet | payload_type: sender.pt, ssrc: sender.ssrc} + packet = + %{packet | payload_type: sender.pt, ssrc: sender.ssrc} + |> ExRTP.Packet.remove_extension(mid_extmap.id) + |> ExRTP.Packet.add_extension(mid_ext) report_recorder = ReportRecorder.record_packet(sender.report_recorder, packet) - data = - packet - |> ExRTP.Packet.remove_extension(mid_extmap.id) - |> ExRTP.Packet.add_extension(mid_ext) - |> ExRTP.Packet.encode() + data = ExRTP.Packet.encode(packet) + + history = Map.put(sender.history, rem(packet.sequence_number, @history_size), packet) sender = %{ sender | packets_sent: sender.packets_sent + 1, bytes_sent: sender.bytes_sent + byte_size(data), markers_sent: sender.markers_sent + Utils.to_int(packet.marker), - report_recorder: report_recorder + report_recorder: report_recorder, + history: history } {data, sender} end + def receive_nack(sender, nack) do + packets = + nack.nacks + |> Enum.flat_map(&get_lost_packets(&1)) + |> Enum.map(&Map.get(sender.history, rem(&1, @history_size))) + |> Enum.reject(&(&1 == nil)) + + {packets, sender} + end + @doc false @spec get_stats(t(), non_neg_integer()) :: map() def get_stats(sender, timestamp) do @@ -135,4 +153,24 @@ defmodule ExWebRTC.RTPSender do markers_sent: sender.markers_sent } end + + # TODO move this to ex_rtcp + defp get_lost_packets(nack) do + <> = nack.blp + do_get_lost_packets(nack.pid, blp) + end + + defp do_get_lost_packets(pid, blp, i \\ 0, acc \\ []) + defp do_get_lost_packets(pid, 0, _i, acc), do: [pid | Enum.reverse(acc)] + + defp do_get_lost_packets(pid, blp, i, acc) do + lost = blp >>> i &&& 1 + + if lost == 1 do + lost_seq_no = pid + i + 1 + do_get_lost_packets(pid, blp, i + 1, [lost_seq_no | acc]) + else + do_get_lost_packets(pid, blp, i + 1, acc) + end + end end diff --git a/lib/ex_webrtc/rtp_transceiver.ex b/lib/ex_webrtc/rtp_transceiver.ex index 45726b9..b525f39 100644 --- a/lib/ex_webrtc/rtp_transceiver.ex +++ b/lib/ex_webrtc/rtp_transceiver.ex @@ -231,6 +231,13 @@ defmodule ExWebRTC.RTPTransceiver do %__MODULE__{transceiver | receiver: receiver} end + def receive_nack(transceiver, nack) do + {packets, sender} = RTPSender.receive_nack(transceiver.sender, nack) + + tr = %__MODULE__{transceiver | sender: sender} + {packets, tr} + end + @doc false @spec send_packet(t(), ExRTP.Packet.t()) :: {binary(), t()} def send_packet(transceiver, packet) do