Skip to content

Commit

Permalink
Improve inbound/outbound RTP stats (#134)
Browse files Browse the repository at this point in the history
  • Loading branch information
LVala authored Jul 10, 2024
1 parent 0eb774c commit 3afedc6
Show file tree
Hide file tree
Showing 6 changed files with 164 additions and 49 deletions.
68 changes: 33 additions & 35 deletions lib/ex_webrtc/peer_connection.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ defmodule ExWebRTC.PeerConnection do
MediaStreamTrack,
RTPTransceiver,
RTPSender,
RTPReceiver,
SDPUtils,
SessionDescription,
Utils
Expand Down Expand Up @@ -907,29 +906,8 @@ defmodule ExWebRTC.PeerConnection do
end)

rtp_stats =
Enum.flat_map(state.transceivers, fn tr ->
tr_stats = %{kind: tr.kind, mid: tr.mid}

case tr.current_direction do
:sendonly ->
stats = RTPSender.get_stats(tr.sender, timestamp)
[Map.merge(stats, tr_stats)]

:recvonly ->
stats = RTPReceiver.get_stats(tr.receiver, timestamp)
Enum.map(stats, &Map.merge(&1, tr_stats))

:sendrecv ->
sender_stats = RTPSender.get_stats(tr.sender, timestamp)
receiver_stats = RTPReceiver.get_stats(tr.receiver, timestamp)

[Map.merge(sender_stats, tr_stats)] ++
Enum.map(receiver_stats, &Map.merge(&1, tr_stats))

_other ->
[]
end
end)
state.transceivers
|> Enum.flat_map(&RTPTransceiver.get_stats(&1, timestamp))
|> Map.new(fn stats -> {stats.id, stats} end)

stats = %{
Expand Down Expand Up @@ -1026,23 +1004,28 @@ defmodule ExWebRTC.PeerConnection do
@impl true
def handle_cast({:send_pli, track_id, rid}, state) do
state.transceivers
|> Enum.find(fn tr -> tr.receiver.track.id == track_id end)
|> Enum.with_index()
|> Enum.find(fn {tr, _idx} -> tr.receiver.track.id == track_id end)
|> case do
%{receiver: %{layers: %{^rid => %{ssrc: ssrc}}}} when ssrc != nil ->
encoded =
%ExRTCP.Packet.PayloadFeedback.PLI{sender_ssrc: 1, media_ssrc: ssrc}
|> ExRTCP.Packet.encode()
{tr, idx} ->
case RTPTransceiver.get_pli(tr, rid) do
{pli, tr} ->
encoded = ExRTCP.Packet.encode(pli)
:ok = DTLSTransport.send_rtcp(state.dtls_transport, encoded)
{:noreply, %{state | transceivers: List.replace_at(state.transceivers, idx, tr)}}

:ok = DTLSTransport.send_rtcp(state.dtls_transport, encoded)
:error ->
Logger.warning(
"Unable to send PLI for track #{inspect(track_id)}, rid #{inspect(rid)}"
)

{:noreply, state}
end

nil ->
Logger.warning("Attempted to send PLI for non existent track #{inspect(track_id)}")

_other ->
Logger.warning("Unable to send PLI for track #{inspect(track_id)}, rid #{inspect(rid)}")
{:noreply, state}
end

{:noreply, state}
end

@impl true
Expand Down Expand Up @@ -1852,6 +1835,21 @@ defmodule ExWebRTC.PeerConnection do
end
end

defp handle_rtcp_packet(state, %ExRTCP.Packet.PayloadFeedback.PLI{} = pli) do
state.transceivers
|> Enum.with_index()
|> Enum.find(fn {tr, _idx} -> tr.sender.ssrc == pli.media_ssrc end)
|> case do
nil ->
state

{tr, idx} ->
tr = RTPTransceiver.receive_pli(tr, pli)
transceivers = List.replace_at(state.transceivers, idx, tr)
%{state | transceivers: transceivers}
end
end

defp handle_rtcp_packet(state, _packet), do: state

defp do_get_description(nil, _candidates), do: nil
Expand Down
36 changes: 32 additions & 4 deletions lib/ex_webrtc/rtp_receiver.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ defmodule ExWebRTC.RTPReceiver do

require Logger

alias ExRTCP.Packet.TransportFeedback.NACK
alias ExRTCP.Packet.{TransportFeedback.NACK, PayloadFeedback.PLI}
alias ExSDP.Attribute.Extmap
alias ExWebRTC.{MediaStreamTrack, Utils, RTPCodecParameters}
alias __MODULE__.{NACKGenerator, ReportRecorder, SimulcastDemuxer}
Expand All @@ -29,6 +29,8 @@ defmodule ExWebRTC.RTPReceiver do
bytes_received: non_neg_integer(),
packets_received: non_neg_integer(),
markers_received: non_neg_integer(),
nack_count: non_neg_integer(),
pli_count: non_neg_integer(),
report_recorder: ReportRecorder.t(),
nack_generator: NACKGenerator.t()
}
Expand Down Expand Up @@ -221,29 +223,53 @@ defmodule ExWebRTC.RTPReceiver do
Enum.map_reduce(receiver.layers, [], fn {rid, layer}, nacks ->
{nack, nack_generator} = NACKGenerator.get_feedback(layer.nack_generator)
nacks = if(nack != nil, do: [nack | nacks], else: nacks)
layer = %{layer | nack_generator: nack_generator}

layer = %{
layer
| nack_generator: nack_generator,
nack_count: layer.nack_count + if(nack != nil, do: 1, else: 0)
}

{{rid, layer}, nacks}
end)

receiver = %{receiver | layers: Map.new(layers)}
{nacks, receiver}
end

@doc false
@spec get_pli(receiver(), String.t() | nil) :: {PLI.t(), receiver()} | :error
def get_pli(receiver, rid) do
case receiver do
%{layers: %{^rid => %{ssrc: ssrc} = layer}} when ssrc != nil ->
layer = %{layer | pli_count: layer.pli_count + 1}
pli = %PLI{sender_ssrc: 1, media_ssrc: ssrc}
{pli, %{receiver | layers: Map.put(receiver.layers, rid, layer)}}

_other ->
:error
end
end

@doc false
@spec get_stats(receiver(), non_neg_integer()) :: [map()]
def get_stats(receiver, timestamp) do
Enum.map(receiver.layers, fn {rid, layer} ->
id = if(rid == nil, do: receiver.track.id, else: "#{receiver.track.id}:#{rid}")
codec = receiver.codec && String.split(receiver.codec.mime_type, "/") |> List.last()

%{
id: id,
rid: rid,
codec: codec,
type: :inbound_rtp,
timestamp: timestamp,
ssrc: layer.ssrc,
bytes_received: layer.bytes_received,
packets_received: layer.packets_received,
markers_received: layer.markers_received
markers_received: layer.markers_received,
nack_count: layer.nack_count,
pli_count: layer.pli_count
}
end)
end
Expand All @@ -259,7 +285,9 @@ defmodule ExWebRTC.RTPReceiver do
packets_received: 0,
markers_received: 0,
report_recorder: report_recorder,
nack_generator: %NACKGenerator{}
nack_generator: %NACKGenerator{},
nack_count: 0,
pli_count: 0
}
end
end
37 changes: 33 additions & 4 deletions lib/ex_webrtc/rtp_sender.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ defmodule ExWebRTC.RTPSender do
Implementation of the [RTCRtpSender](https://www.w3.org/TR/webrtc/#rtcrtpsender-interface).
"""

alias ExRTCP.Packet.{TransportFeedback.NACK, PayloadFeedback.PLI}
alias ExWebRTC.{MediaStreamTrack, RTPCodecParameters, Utils}
alias ExSDP.Attribute.Extmap
alias __MODULE__.{NACKResponder, ReportRecorder}
Expand All @@ -24,7 +25,11 @@ defmodule ExWebRTC.RTPSender do
rtx_ssrc: non_neg_integer() | nil,
packets_sent: non_neg_integer(),
bytes_sent: non_neg_integer(),
retransmitted_packets_sent: non_neg_integer(),
retransmitted_bytes_sent: non_neg_integer(),
markers_sent: non_neg_integer(),
nack_count: non_neg_integer(),
pli_count: non_neg_integer(),
reports?: boolean(),
outbound_rtx?: boolean(),
report_recorder: ReportRecorder.t(),
Expand Down Expand Up @@ -86,7 +91,11 @@ defmodule ExWebRTC.RTPSender do
mid: mid,
packets_sent: 0,
bytes_sent: 0,
retransmitted_packets_sent: 0,
retransmitted_bytes_sent: 0,
markers_sent: 0,
nack_count: 0,
pli_count: 0,
reports?: :rtcp_reports in features,
outbound_rtx?: :outbound_rtx in features,
report_recorder: %ReportRecorder{clock_rate: codec && codec.clock_rate},
Expand Down Expand Up @@ -159,6 +168,17 @@ defmodule ExWebRTC.RTPSender do

data = ExRTP.Packet.encode(packet)

sender =
if rtx? do
%{
sender
| retransmitted_packets_sent: sender.retransmitted_packets_sent + 1,
retransmitted_bytes_sent: sender.retransmitted_bytes_sent + byte_size(data)
}
else
sender
end

sender = %{
sender
| packets_sent: sender.packets_sent + 1,
Expand All @@ -172,15 +192,20 @@ defmodule ExWebRTC.RTPSender do
end

@doc false
@spec receive_nack(sender(), ExRTCP.Packet.TransportFeedback.NACK.t()) ::
{[ExRTP.Packet.t()], sender()}
@spec receive_nack(sender(), NACK.t()) :: {[ExRTP.Packet.t()], sender()}
def receive_nack(sender, nack) do
{packets, nack_responder} = NACKResponder.get_rtx(sender.nack_responder, nack)
sender = %{sender | nack_responder: nack_responder}
sender = %{sender | nack_responder: nack_responder, nack_count: sender.nack_count + 1}

{packets, sender}
end

@doc false
@spec receive_pli(sender(), PLI.t()) :: sender()
def receive_pli(sender, _pli) do
%{sender | pli_count: sender.pli_count + 1}
end

@doc false
@spec get_reports(sender()) :: {[ExRTCP.Packet.SenderReport.t()], sender()}
def get_reports(sender) do
Expand All @@ -204,7 +229,11 @@ defmodule ExWebRTC.RTPSender do
ssrc: sender.ssrc,
packets_sent: sender.packets_sent,
bytes_sent: sender.bytes_sent,
markers_sent: sender.markers_sent
markers_sent: sender.markers_sent,
retransmitted_packets_sent: sender.retransmitted_packets_sent,
retransmitted_bytes_sent: sender.retransmitted_bytes_sent,
nack_count: sender.nack_count,
pli_count: sender.pli_count
}
end
end
44 changes: 43 additions & 1 deletion lib/ex_webrtc/rtp_transceiver.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ defmodule ExWebRTC.RTPTransceiver do
Utils
}

alias ExRTCP.Packet.{ReceiverReport, SenderReport, TransportFeedback.NACK}
alias ExRTCP.Packet.{ReceiverReport, SenderReport, TransportFeedback.NACK, PayloadFeedback.PLI}

@report_interval 1000
@nack_interval 100
Expand Down Expand Up @@ -355,6 +355,13 @@ defmodule ExWebRTC.RTPTransceiver do
{packets, transceiver}
end

@doc false
@spec receive_pli(transceiver(), PLI.t()) :: transceiver()
def receive_pli(transceiver, pli) do
sender = RTPSender.receive_pli(transceiver.sender, pli)
%{transceiver | sender: sender}
end

@doc false
@spec send_packet(transceiver(), ExRTP.Packet.t(), boolean()) :: {binary(), transceiver()}
def send_packet(transceiver, packet, rtx?) do
Expand Down Expand Up @@ -395,6 +402,15 @@ defmodule ExWebRTC.RTPTransceiver do
{nacks, transceiver}
end

@doc false
@spec get_pli(transceiver(), String.t() | nil) :: {PLI.t(), transceiver()} | :error
def get_pli(transceiver, rid) do
case RTPReceiver.get_pli(transceiver.receiver, rid) do
:error -> :error
{pli, receiver} -> {pli, %{transceiver | receiver: receiver}}
end
end

@doc false
@spec to_answer_mline(transceiver(), ExSDP.Media.t(), Keyword.t()) :: ExSDP.Media.t()
def to_answer_mline(transceiver, mline, opts) do
Expand Down Expand Up @@ -467,6 +483,32 @@ defmodule ExWebRTC.RTPTransceiver do
%{transceiver | direction: :inactive, stopping: true}
end

@doc false
@spec get_stats(transceiver(), non_neg_integer()) :: [map()]
def get_stats(transceiver, timestamp) do
tr_stats = %{kind: transceiver.kind, mid: transceiver.mid}

case transceiver.current_direction do
:sendonly ->
stats = RTPSender.get_stats(transceiver.sender, timestamp)
[Map.merge(stats, tr_stats)]

:recvonly ->
stats = RTPReceiver.get_stats(transceiver.receiver, timestamp)
Enum.map(stats, &Map.merge(&1, tr_stats))

:sendrecv ->
sender_stats = RTPSender.get_stats(transceiver.sender, timestamp)
receiver_stats = RTPReceiver.get_stats(transceiver.receiver, timestamp)

[Map.merge(sender_stats, tr_stats)] ++
Enum.map(receiver_stats, &Map.merge(&1, tr_stats))

_other ->
[]
end
end

defp to_mline(transceiver, opts) do
pt = Enum.map(transceiver.codecs, fn codec -> codec.payload_type end)

Expand Down
10 changes: 8 additions & 2 deletions test/ex_webrtc/rtp_receiver_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,10 @@ defmodule ExWebRTC.RTPReceiverTest do
ssrc: 1234,
bytes_received: byte_size(raw_packet1),
packets_received: 1,
markers_received: 0
markers_received: 0,
codec: "opus",
nack_count: 0,
pli_count: 0
}
] == RTPReceiver.get_stats(receiver, timestamp)

Expand All @@ -45,7 +48,10 @@ defmodule ExWebRTC.RTPReceiverTest do
ssrc: 1234,
bytes_received: byte_size(raw_packet1) + byte_size(raw_packet2),
packets_received: 2,
markers_received: 1
markers_received: 1,
codec: "opus",
nack_count: 0,
pli_count: 0
}
] == RTPReceiver.get_stats(receiver, timestamp)
end
Expand Down
Loading

0 comments on commit 3afedc6

Please sign in to comment.