From b142d047a74456d380bd05fd0bcdf8ccbbde4e51 Mon Sep 17 00:00:00 2001 From: Jakub Pisarek <99591440+sgfn@users.noreply.github.com> Date: Thu, 8 Aug 2024 13:49:34 +0200 Subject: [PATCH] Dynamic dispatch for (de)payloaders (#152) --- .../lib/save_to_file/peer_handler.ex | 13 +++--- .../lib/send_from_file/peer_handler.ex | 10 ++--- lib/ex_webrtc/rtp/depayloader.ex | 41 +++++++++++++++-- lib/ex_webrtc/rtp/payloader.ex | 39 ++++++++++++++-- test/ex_webrtc/rtp/depayloader_test.exs | 44 +++++++++++++++++++ test/ex_webrtc/rtp/payloader_test.exs | 41 +++++++++++++++++ 6 files changed, 172 insertions(+), 16 deletions(-) create mode 100644 test/ex_webrtc/rtp/depayloader_test.exs create mode 100644 test/ex_webrtc/rtp/payloader_test.exs diff --git a/examples/save_to_file/lib/save_to_file/peer_handler.ex b/examples/save_to_file/lib/save_to_file/peer_handler.ex index b991f08..ca32a73 100644 --- a/examples/save_to_file/lib/save_to_file/peer_handler.ex +++ b/examples/save_to_file/lib/save_to_file/peer_handler.ex @@ -10,7 +10,7 @@ defmodule SaveToFile.PeerHandler do } alias ExWebRTC.Media.{IVF, Ogg} - alias ExWebRTC.RTP.{Opus, VP8} + alias ExWebRTC.RTP.Depayloader @behaviour WebSock @@ -141,9 +141,11 @@ defmodule SaveToFile.PeerHandler do timebase_num: 1 ) + {:ok, video_depayloader} = @video_codecs |> hd() |> Depayloader.new() + state = %{ state - | video_depayloader: VP8.Depayloader.new(), + | video_depayloader: video_depayloader, video_writer: video_writer, video_track_id: id } @@ -154,10 +156,11 @@ defmodule SaveToFile.PeerHandler do defp handle_webrtc_msg({:track, %MediaStreamTrack{kind: :audio, id: id}}, state) do # by default uses 1 mono channel and 48k clock rate {:ok, audio_writer} = Ogg.Writer.open(@audio_file) + {:ok, audio_depayloader} = @audio_codecs |> hd() |> Depayloader.new() state = %{ state - | audio_depayloader: Opus.Depayloader.new(), + | audio_depayloader: audio_depayloader, audio_writer: audio_writer, audio_track_id: id } @@ -173,7 +176,7 @@ defmodule SaveToFile.PeerHandler do defp handle_webrtc_msg({:rtp, id, nil, packet}, %{video_track_id: id} = state) do state = - case VP8.Depayloader.depayload(state.video_depayloader, packet) do + case Depayloader.depayload(state.video_depayloader, packet) do {nil, video_depayloader} -> %{state | video_depayloader: video_depayloader} @@ -193,7 +196,7 @@ defmodule SaveToFile.PeerHandler do end defp handle_webrtc_msg({:rtp, id, nil, packet}, %{audio_track_id: id} = state) do - {opus_packet, depayloader} = Opus.Depayloader.depayload(state.audio_depayloader, packet) + {opus_packet, depayloader} = Depayloader.depayload(state.audio_depayloader, packet) {:ok, audio_writer} = Ogg.Writer.write_packet(state.audio_writer, opus_packet) {:ok, %{state | audio_depayloader: depayloader, audio_writer: audio_writer}} diff --git a/examples/send_from_file/lib/send_from_file/peer_handler.ex b/examples/send_from_file/lib/send_from_file/peer_handler.ex index dcf9b18..bbc2f91 100644 --- a/examples/send_from_file/lib/send_from_file/peer_handler.ex +++ b/examples/send_from_file/lib/send_from_file/peer_handler.ex @@ -12,7 +12,7 @@ defmodule SendFromFile.PeerHandler do } alias ExWebRTC.Media.{IVF, Ogg} - alias ExWebRTC.RTP.{Opus, VP8} + alias ExWebRTC.RTP.Payloader @behaviour WebSock @@ -60,10 +60,10 @@ defmodule SendFromFile.PeerHandler do {:ok, _sender} = PeerConnection.add_track(pc, audio_track) {:ok, _header, video_reader} = IVF.Reader.open(@video_file) - video_payloader = VP8.Payloader.new(800) + {:ok, video_payloader} = @video_codecs |> hd() |> Payloader.new(800) {:ok, audio_reader} = Ogg.Reader.open(@audio_file) - audio_payloader = Opus.Payloader.new() + {:ok, audio_payloader} = @audio_codecs |> hd() |> Payloader.new() state = %{ peer_connection: pc, @@ -114,7 +114,7 @@ defmodule SendFromFile.PeerHandler do case IVF.Reader.next_frame(state.video_reader) do {:ok, frame} -> - {rtp_packets, payloader} = VP8.Payloader.payload(state.video_payloader, frame.data) + {rtp_packets, payloader} = Payloader.payload(state.video_payloader, frame.data) # 3_000 = 90_000 (VP8 clock rate) / 30 FPS next_sequence_number = @@ -160,7 +160,7 @@ defmodule SendFromFile.PeerHandler do # and time spent on reading and parsing the file Process.send_after(self(), :send_audio, duration) - {[rtp_packet], payloader} = Opus.Payloader.payload(state.audio_payloader, packet) + {[rtp_packet], payloader} = Payloader.payload(state.audio_payloader, packet) rtp_packet = %{ rtp_packet diff --git a/lib/ex_webrtc/rtp/depayloader.ex b/lib/ex_webrtc/rtp/depayloader.ex index 99c8d58..3037515 100644 --- a/lib/ex_webrtc/rtp/depayloader.ex +++ b/lib/ex_webrtc/rtp/depayloader.ex @@ -1,14 +1,14 @@ defmodule ExWebRTC.RTP.Depayloader do @moduledoc """ - Behaviour for ExWebRTC Depayloaders. + Dispatcher module and behaviour for ExWebRTC Depayloaders. """ + alias ExWebRTC.RTPCodecParameters + @type depayloader :: struct() @doc """ Creates a new depayloader struct. - - Refer to the modules implementing the behaviour for available options. """ @callback new(options :: any()) :: depayloader() @@ -20,4 +20,39 @@ defmodule ExWebRTC.RTP.Depayloader do """ @callback depayload(depayloader(), packet :: ExRTP.Packet.t()) :: {binary() | nil, depayloader()} + + @doc """ + Creates a new depayloader struct that matches the passed codec parameters. + + Refer to the modules implementing the behaviour for available options. + """ + @spec new(RTPCodecParameters.t(), any()) :: + {:ok, depayloader()} | {:error, :no_depayloader_for_codec} + def new(codec_params, options \\ nil) do + with {:ok, module} <- match_depayloader_module(codec_params.mime_type) do + depayloader = if is_nil(options), do: module.new(), else: module.new(options) + + {:ok, depayloader} + end + end + + @doc """ + Processes binary data from a single RTP packet using the depayloader's module, + and outputs a frame if assembled. + + Returns the frame (or `nil` if a frame could not be depayloaded yet) + together with the updated depayloader struct. + """ + @spec depayload(depayloader(), ExRTP.Packet.t()) :: {binary() | nil, depayloader()} + def depayload(%module{} = depayloader, packet) do + module.depayload(depayloader, packet) + end + + defp match_depayloader_module(mime_type) do + case String.downcase(mime_type) do + "video/vp8" -> {:ok, ExWebRTC.RTP.VP8.Depayloader} + "audio/opus" -> {:ok, ExWebRTC.RTP.Opus.Depayloader} + _other -> {:error, :no_depayloader_for_codec} + end + end end diff --git a/lib/ex_webrtc/rtp/payloader.ex b/lib/ex_webrtc/rtp/payloader.ex index c3a4fd9..8f17135 100644 --- a/lib/ex_webrtc/rtp/payloader.ex +++ b/lib/ex_webrtc/rtp/payloader.ex @@ -1,14 +1,14 @@ defmodule ExWebRTC.RTP.Payloader do @moduledoc """ - Behaviour for ExWebRTC Payloaders. + Dispatcher module and behaviour for ExWebRTC Payloaders. """ + alias ExWebRTC.RTPCodecParameters + @type payloader :: struct() @doc """ Creates a new payloader struct. - - Refer to the modules implementing the behaviour for available options. """ @callback new(options :: any()) :: payloader() @@ -18,4 +18,37 @@ defmodule ExWebRTC.RTP.Payloader do Returns the packets together with the updated payloader struct. """ @callback payload(payloader(), frame :: binary()) :: {[ExRTP.Packet.t()], payloader()} + + @doc """ + Creates a new payloader struct that matches the passed codec parameters. + + Refer to the modules implementing the behaviour for available options. + """ + @spec new(RTPCodecParameters.t(), any()) :: + {:ok, payloader()} | {:error, :no_payloader_for_codec} + def new(codec_params, options \\ nil) do + with {:ok, module} <- match_payloader_module(codec_params.mime_type) do + payloader = if is_nil(options), do: module.new(), else: module.new(options) + + {:ok, payloader} + end + end + + @doc """ + Packs a frame into one or more RTP packets using the payloader's module. + + Returns the packets together with the updated payloader struct. + """ + @spec payload(payloader(), binary()) :: {[ExRTP.Packet.t()], payloader()} + def payload(%module{} = payloader, frame) do + module.payload(payloader, frame) + end + + defp match_payloader_module(mime_type) do + case String.downcase(mime_type) do + "video/vp8" -> {:ok, ExWebRTC.RTP.VP8.Payloader} + "audio/opus" -> {:ok, ExWebRTC.RTP.Opus.Payloader} + _other -> {:error, :no_payloader_for_codec} + end + end end diff --git a/test/ex_webrtc/rtp/depayloader_test.exs b/test/ex_webrtc/rtp/depayloader_test.exs new file mode 100644 index 0000000..2f6ba3f --- /dev/null +++ b/test/ex_webrtc/rtp/depayloader_test.exs @@ -0,0 +1,44 @@ +defmodule ExWebRTC.RTP.DepayloaderTest do + use ExUnit.Case, async: true + + alias ExWebRTC.RTPCodecParameters + alias ExWebRTC.RTP.Depayloader + alias ExWebRTC.RTP.{Opus, VP8} + + @packet %ExRTP.Packet{ + payload_type: 96, + sequence_number: 0, + timestamp: 0, + ssrc: 0, + payload: <<0, 1, 2, 3>> + } + + test "creates a VP8 depayloader and dispatches calls to its module" do + assert {:ok, depayloader} = + %RTPCodecParameters{payload_type: 96, mime_type: "video/VP8", clock_rate: 90_000} + |> Depayloader.new() + + assert Depayloader.depayload(depayloader, @packet) == + VP8.Depayloader.depayload(depayloader, @packet) + end + + test "creates an Opus depayloader and dispatches calls to its module" do + assert {:ok, depayloader} = + %RTPCodecParameters{ + payload_type: 96, + mime_type: "audio/opus", + clock_rate: 48_000, + channels: 2 + } + |> Depayloader.new() + + assert Depayloader.depayload(depayloader, @packet) == + Opus.Depayloader.depayload(depayloader, @packet) + end + + test "returns error if no depayloader exists for given codec" do + assert {:error, :no_depayloader_for_codec} = + %RTPCodecParameters{payload_type: 97, mime_type: "video/H264", clock_rate: 90_000} + |> Depayloader.new() + end +end diff --git a/test/ex_webrtc/rtp/payloader_test.exs b/test/ex_webrtc/rtp/payloader_test.exs new file mode 100644 index 0000000..2f317de --- /dev/null +++ b/test/ex_webrtc/rtp/payloader_test.exs @@ -0,0 +1,41 @@ +defmodule ExWebRTC.RTP.PayloaderTest do + use ExUnit.Case, async: true + + alias ExWebRTC.RTPCodecParameters + alias ExWebRTC.RTP.Payloader + alias ExWebRTC.RTP.{Opus, VP8} + + @frame <<0, 1, 2, 3>> + + test "creates a VP8 payloader and dispatches calls to its module" do + assert {:ok, _payloader} = + %RTPCodecParameters{payload_type: 96, mime_type: "video/VP8", clock_rate: 90_000} + |> Payloader.new() + + # with options + assert {:ok, payloader} = + %RTPCodecParameters{payload_type: 96, mime_type: "video/VP8", clock_rate: 90_000} + |> Payloader.new(800) + + assert Payloader.payload(payloader, @frame) == VP8.Payloader.payload(payloader, @frame) + end + + test "creates an Opus payloader and dispatches calls to its module" do + assert {:ok, payloader} = + %RTPCodecParameters{ + payload_type: 111, + mime_type: "audio/opus", + clock_rate: 48_000, + channels: 2 + } + |> Payloader.new() + + assert Payloader.payload(payloader, @frame) == Opus.Payloader.payload(payloader, @frame) + end + + test "returns error if no payloader exists for given codec" do + assert {:error, :no_payloader_for_codec} = + %RTPCodecParameters{payload_type: 97, mime_type: "video/H264", clock_rate: 90_000} + |> Payloader.new() + end +end