diff --git a/lib/ex_webrtc/peer_connection.ex b/lib/ex_webrtc/peer_connection.ex index 2c98823..078de2c 100644 --- a/lib/ex_webrtc/peer_connection.ex +++ b/lib/ex_webrtc/peer_connection.ex @@ -18,7 +18,6 @@ defmodule ExWebRTC.PeerConnection do MediaStreamTrack, RTPTransceiver, RTPSender, - RTPReceiver, SDPUtils, SessionDescription, Utils @@ -907,29 +906,8 @@ defmodule ExWebRTC.PeerConnection do end) rtp_stats = - Enum.flat_map(state.transceivers, fn tr -> - tr_stats = %{kind: tr.kind, mid: tr.mid} - - case tr.current_direction do - :sendonly -> - stats = RTPSender.get_stats(tr.sender, timestamp) - [Map.merge(stats, tr_stats)] - - :recvonly -> - stats = RTPReceiver.get_stats(tr.receiver, timestamp) - Enum.map(stats, &Map.merge(&1, tr_stats)) - - :sendrecv -> - sender_stats = RTPSender.get_stats(tr.sender, timestamp) - receiver_stats = RTPReceiver.get_stats(tr.receiver, timestamp) - - [Map.merge(sender_stats, tr_stats)] ++ - Enum.map(receiver_stats, &Map.merge(&1, tr_stats)) - - _other -> - [] - end - end) + state.transceivers + |> Enum.flat_map(&RTPTransceiver.get_stats(&1, timestamp)) |> Map.new(fn stats -> {stats.id, stats} end) stats = %{ @@ -1026,23 +1004,28 @@ defmodule ExWebRTC.PeerConnection do @impl true def handle_cast({:send_pli, track_id, rid}, state) do state.transceivers - |> Enum.find(fn tr -> tr.receiver.track.id == track_id end) + |> Enum.with_index() + |> Enum.find(fn {tr, _idx} -> tr.receiver.track.id == track_id end) |> case do - %{receiver: %{layers: %{^rid => %{ssrc: ssrc}}}} when ssrc != nil -> - encoded = - %ExRTCP.Packet.PayloadFeedback.PLI{sender_ssrc: 1, media_ssrc: ssrc} - |> ExRTCP.Packet.encode() + {tr, idx} -> + case RTPTransceiver.get_pli(tr, rid) do + {pli, tr} -> + encoded = ExRTCP.Packet.encode(pli) + :ok = DTLSTransport.send_rtcp(state.dtls_transport, encoded) + {:noreply, %{state | transceivers: List.replace_at(state.transceivers, idx, tr)}} - :ok = DTLSTransport.send_rtcp(state.dtls_transport, encoded) + :error -> + Logger.warning( + "Unable to send PLI for track #{inspect(track_id)}, rid #{inspect(rid)}" + ) + + {:noreply, state} + end nil -> Logger.warning("Attempted to send PLI for non existent track #{inspect(track_id)}") - - _other -> - Logger.warning("Unable to send PLI for track #{inspect(track_id)}, rid #{inspect(rid)}") + {:noreply, state} end - - {:noreply, state} end @impl true @@ -1852,6 +1835,21 @@ defmodule ExWebRTC.PeerConnection do end end + defp handle_rtcp_packet(state, %ExRTCP.Packet.PayloadFeedback.PLI{} = pli) do + state.transceivers + |> Enum.with_index() + |> Enum.find(fn {tr, _idx} -> tr.sender.ssrc == pli.media_ssrc end) + |> case do + nil -> + state + + {tr, idx} -> + tr = RTPTransceiver.receive_pli(tr, pli) + transceivers = List.replace_at(state.transceivers, idx, tr) + %{state | transceivers: transceivers} + end + end + defp handle_rtcp_packet(state, _packet), do: state defp do_get_description(nil, _candidates), do: nil diff --git a/lib/ex_webrtc/rtp_receiver.ex b/lib/ex_webrtc/rtp_receiver.ex index 5dfbf7e..63552c6 100644 --- a/lib/ex_webrtc/rtp_receiver.ex +++ b/lib/ex_webrtc/rtp_receiver.ex @@ -5,7 +5,7 @@ defmodule ExWebRTC.RTPReceiver do require Logger - alias ExRTCP.Packet.TransportFeedback.NACK + alias ExRTCP.Packet.{TransportFeedback.NACK, PayloadFeedback.PLI} alias ExSDP.Attribute.Extmap alias ExWebRTC.{MediaStreamTrack, Utils, RTPCodecParameters} alias __MODULE__.{NACKGenerator, ReportRecorder, SimulcastDemuxer} @@ -29,6 +29,8 @@ defmodule ExWebRTC.RTPReceiver do bytes_received: non_neg_integer(), packets_received: non_neg_integer(), markers_received: non_neg_integer(), + nack_count: non_neg_integer(), + pli_count: non_neg_integer(), report_recorder: ReportRecorder.t(), nack_generator: NACKGenerator.t() } @@ -221,7 +223,13 @@ defmodule ExWebRTC.RTPReceiver do Enum.map_reduce(receiver.layers, [], fn {rid, layer}, nacks -> {nack, nack_generator} = NACKGenerator.get_feedback(layer.nack_generator) nacks = if(nack != nil, do: [nack | nacks], else: nacks) - layer = %{layer | nack_generator: nack_generator} + + layer = %{ + layer + | nack_generator: nack_generator, + nack_count: layer.nack_count + if(nack != nil, do: 1, else: 0) + } + {{rid, layer}, nacks} end) @@ -229,21 +237,39 @@ defmodule ExWebRTC.RTPReceiver do {nacks, receiver} end + @doc false + @spec get_pli(receiver(), String.t() | nil) :: {PLI.t(), receiver()} | :error + def get_pli(receiver, rid) do + case receiver do + %{layers: %{^rid => %{ssrc: ssrc} = layer}} when ssrc != nil -> + layer = %{layer | pli_count: layer.pli_count + 1} + pli = %PLI{sender_ssrc: 1, media_ssrc: ssrc} + {pli, %{receiver | layers: Map.put(receiver.layers, rid, layer)}} + + _other -> + :error + end + end + @doc false @spec get_stats(receiver(), non_neg_integer()) :: [map()] def get_stats(receiver, timestamp) do Enum.map(receiver.layers, fn {rid, layer} -> id = if(rid == nil, do: receiver.track.id, else: "#{receiver.track.id}:#{rid}") + codec = receiver.codec && String.split(receiver.codec.mime_type, "/") |> List.last() %{ id: id, rid: rid, + codec: codec, type: :inbound_rtp, timestamp: timestamp, ssrc: layer.ssrc, bytes_received: layer.bytes_received, packets_received: layer.packets_received, - markers_received: layer.markers_received + markers_received: layer.markers_received, + nack_count: layer.nack_count, + pli_count: layer.pli_count } end) end @@ -259,7 +285,9 @@ defmodule ExWebRTC.RTPReceiver do packets_received: 0, markers_received: 0, report_recorder: report_recorder, - nack_generator: %NACKGenerator{} + nack_generator: %NACKGenerator{}, + nack_count: 0, + pli_count: 0 } end end diff --git a/lib/ex_webrtc/rtp_sender.ex b/lib/ex_webrtc/rtp_sender.ex index e1c3e07..04779df 100644 --- a/lib/ex_webrtc/rtp_sender.ex +++ b/lib/ex_webrtc/rtp_sender.ex @@ -3,6 +3,7 @@ defmodule ExWebRTC.RTPSender do Implementation of the [RTCRtpSender](https://www.w3.org/TR/webrtc/#rtcrtpsender-interface). """ + alias ExRTCP.Packet.{TransportFeedback.NACK, PayloadFeedback.PLI} alias ExWebRTC.{MediaStreamTrack, RTPCodecParameters, Utils} alias ExSDP.Attribute.Extmap alias __MODULE__.{NACKResponder, ReportRecorder} @@ -24,7 +25,11 @@ defmodule ExWebRTC.RTPSender do rtx_ssrc: non_neg_integer() | nil, packets_sent: non_neg_integer(), bytes_sent: non_neg_integer(), + retransmitted_packets_sent: non_neg_integer(), + retransmitted_bytes_sent: non_neg_integer(), markers_sent: non_neg_integer(), + nack_count: non_neg_integer(), + pli_count: non_neg_integer(), reports?: boolean(), outbound_rtx?: boolean(), report_recorder: ReportRecorder.t(), @@ -86,7 +91,11 @@ defmodule ExWebRTC.RTPSender do mid: mid, packets_sent: 0, bytes_sent: 0, + retransmitted_packets_sent: 0, + retransmitted_bytes_sent: 0, markers_sent: 0, + nack_count: 0, + pli_count: 0, reports?: :rtcp_reports in features, outbound_rtx?: :outbound_rtx in features, report_recorder: %ReportRecorder{clock_rate: codec && codec.clock_rate}, @@ -159,6 +168,17 @@ defmodule ExWebRTC.RTPSender do data = ExRTP.Packet.encode(packet) + sender = + if rtx? do + %{ + sender + | retransmitted_packets_sent: sender.retransmitted_packets_sent + 1, + retransmitted_bytes_sent: sender.retransmitted_bytes_sent + byte_size(data) + } + else + sender + end + sender = %{ sender | packets_sent: sender.packets_sent + 1, @@ -172,15 +192,20 @@ defmodule ExWebRTC.RTPSender do end @doc false - @spec receive_nack(sender(), ExRTCP.Packet.TransportFeedback.NACK.t()) :: - {[ExRTP.Packet.t()], sender()} + @spec receive_nack(sender(), NACK.t()) :: {[ExRTP.Packet.t()], sender()} def receive_nack(sender, nack) do {packets, nack_responder} = NACKResponder.get_rtx(sender.nack_responder, nack) - sender = %{sender | nack_responder: nack_responder} + sender = %{sender | nack_responder: nack_responder, nack_count: sender.nack_count + 1} {packets, sender} end + @doc false + @spec receive_pli(sender(), PLI.t()) :: sender() + def receive_pli(sender, _pli) do + %{sender | pli_count: sender.pli_count + 1} + end + @doc false @spec get_reports(sender()) :: {[ExRTCP.Packet.SenderReport.t()], sender()} def get_reports(sender) do @@ -204,7 +229,11 @@ defmodule ExWebRTC.RTPSender do ssrc: sender.ssrc, packets_sent: sender.packets_sent, bytes_sent: sender.bytes_sent, - markers_sent: sender.markers_sent + markers_sent: sender.markers_sent, + retransmitted_packets_sent: sender.retransmitted_packets_sent, + retransmitted_bytes_sent: sender.retransmitted_bytes_sent, + nack_count: sender.nack_count, + pli_count: sender.pli_count } end end diff --git a/lib/ex_webrtc/rtp_transceiver.ex b/lib/ex_webrtc/rtp_transceiver.ex index 2105257..56b99cb 100644 --- a/lib/ex_webrtc/rtp_transceiver.ex +++ b/lib/ex_webrtc/rtp_transceiver.ex @@ -15,7 +15,7 @@ defmodule ExWebRTC.RTPTransceiver do Utils } - alias ExRTCP.Packet.{ReceiverReport, SenderReport, TransportFeedback.NACK} + alias ExRTCP.Packet.{ReceiverReport, SenderReport, TransportFeedback.NACK, PayloadFeedback.PLI} @report_interval 1000 @nack_interval 100 @@ -355,6 +355,13 @@ defmodule ExWebRTC.RTPTransceiver do {packets, transceiver} end + @doc false + @spec receive_pli(transceiver(), PLI.t()) :: transceiver() + def receive_pli(transceiver, pli) do + sender = RTPSender.receive_pli(transceiver.sender, pli) + %{transceiver | sender: sender} + end + @doc false @spec send_packet(transceiver(), ExRTP.Packet.t(), boolean()) :: {binary(), transceiver()} def send_packet(transceiver, packet, rtx?) do @@ -395,6 +402,15 @@ defmodule ExWebRTC.RTPTransceiver do {nacks, transceiver} end + @doc false + @spec get_pli(transceiver(), String.t() | nil) :: {PLI.t(), transceiver()} | :error + def get_pli(transceiver, rid) do + case RTPReceiver.get_pli(transceiver.receiver, rid) do + :error -> :error + {pli, receiver} -> {pli, %{transceiver | receiver: receiver}} + end + end + @doc false @spec to_answer_mline(transceiver(), ExSDP.Media.t(), Keyword.t()) :: ExSDP.Media.t() def to_answer_mline(transceiver, mline, opts) do @@ -467,6 +483,32 @@ defmodule ExWebRTC.RTPTransceiver do %{transceiver | direction: :inactive, stopping: true} end + @doc false + @spec get_stats(transceiver(), non_neg_integer()) :: [map()] + def get_stats(transceiver, timestamp) do + tr_stats = %{kind: transceiver.kind, mid: transceiver.mid} + + case transceiver.current_direction do + :sendonly -> + stats = RTPSender.get_stats(transceiver.sender, timestamp) + [Map.merge(stats, tr_stats)] + + :recvonly -> + stats = RTPReceiver.get_stats(transceiver.receiver, timestamp) + Enum.map(stats, &Map.merge(&1, tr_stats)) + + :sendrecv -> + sender_stats = RTPSender.get_stats(transceiver.sender, timestamp) + receiver_stats = RTPReceiver.get_stats(transceiver.receiver, timestamp) + + [Map.merge(sender_stats, tr_stats)] ++ + Enum.map(receiver_stats, &Map.merge(&1, tr_stats)) + + _other -> + [] + end + end + defp to_mline(transceiver, opts) do pt = Enum.map(transceiver.codecs, fn codec -> codec.payload_type end) diff --git a/test/ex_webrtc/rtp_receiver_test.exs b/test/ex_webrtc/rtp_receiver_test.exs index d1652f1..d1ac282 100644 --- a/test/ex_webrtc/rtp_receiver_test.exs +++ b/test/ex_webrtc/rtp_receiver_test.exs @@ -28,7 +28,10 @@ defmodule ExWebRTC.RTPReceiverTest do ssrc: 1234, bytes_received: byte_size(raw_packet1), packets_received: 1, - markers_received: 0 + markers_received: 0, + codec: "opus", + nack_count: 0, + pli_count: 0 } ] == RTPReceiver.get_stats(receiver, timestamp) @@ -45,7 +48,10 @@ defmodule ExWebRTC.RTPReceiverTest do ssrc: 1234, bytes_received: byte_size(raw_packet1) + byte_size(raw_packet2), packets_received: 2, - markers_received: 1 + markers_received: 1, + codec: "opus", + nack_count: 0, + pli_count: 0 } ] == RTPReceiver.get_stats(receiver, timestamp) end diff --git a/test/ex_webrtc/rtp_sender_test.exs b/test/ex_webrtc/rtp_sender_test.exs index 6793ada..c56dcd6 100644 --- a/test/ex_webrtc/rtp_sender_test.exs +++ b/test/ex_webrtc/rtp_sender_test.exs @@ -64,7 +64,11 @@ defmodule ExWebRTC.RTPSenderTest do ssrc: sender.ssrc, packets_sent: 0, bytes_sent: 0, - markers_sent: 0 + markers_sent: 0, + nack_count: 0, + pli_count: 0, + retransmitted_packets_sent: 0, + retransmitted_bytes_sent: 0 } == RTPSender.get_stats(sender, timestamp) packet = ExRTP.Packet.new(payload) @@ -77,7 +81,11 @@ defmodule ExWebRTC.RTPSenderTest do ssrc: sender.ssrc, packets_sent: 1, bytes_sent: byte_size(data1), - markers_sent: 0 + markers_sent: 0, + nack_count: 0, + pli_count: 0, + retransmitted_packets_sent: 0, + retransmitted_bytes_sent: 0 } == RTPSender.get_stats(sender, timestamp) packet = ExRTP.Packet.new(payload, marker: true) @@ -90,7 +98,11 @@ defmodule ExWebRTC.RTPSenderTest do ssrc: sender.ssrc, packets_sent: 2, bytes_sent: byte_size(data1) + byte_size(data2), - markers_sent: 1 + markers_sent: 1, + nack_count: 0, + pli_count: 0, + retransmitted_packets_sent: 0, + retransmitted_bytes_sent: 0 } == RTPSender.get_stats(sender, timestamp) end end