diff --git a/examples/save_to_file/lib/save_to_file/peer_handler.ex b/examples/save_to_file/lib/save_to_file/peer_handler.ex index b28373b..0ecc520 100644 --- a/examples/save_to_file/lib/save_to_file/peer_handler.ex +++ b/examples/save_to_file/lib/save_to_file/peer_handler.ex @@ -54,6 +54,7 @@ defmodule SaveToFile.PeerHandler do video_writer: nil, video_depayloader: nil, audio_writer: nil, + audio_payloader: nil, frames_cnt: 0 } @@ -154,7 +155,13 @@ defmodule SaveToFile.PeerHandler do # by default uses 1 mono channel and 48k clock rate {:ok, audio_writer} = Ogg.Writer.open(@audio_file) - state = %{state | audio_writer: audio_writer, audio_track_id: id} + state = %{ + state + | audio_depayloader: Opus.Depayloader.new(), + audio_writer: audio_writer, + audio_track_id: id + } + {:ok, state} end @@ -166,11 +173,11 @@ defmodule SaveToFile.PeerHandler do defp handle_webrtc_msg({:rtp, id, nil, packet}, %{video_track_id: id} = state) do state = - case VP8.Depayloader.write(state.video_depayloader, packet) do - {:ok, video_depayloader} -> + case VP8.Depayloader.depayload(state.video_depayloader, packet) do + {nil, video_depayloader} -> %{state | video_depayloader: video_depayloader} - {:ok, vp8_frame, video_depayloader} -> + {vp8_frame, video_depayloader} -> frame = %IVF.Frame{timestamp: state.frames_cnt, data: vp8_frame} {:ok, video_writer} = IVF.Writer.write_frame(state.video_writer, frame) @@ -186,10 +193,10 @@ defmodule SaveToFile.PeerHandler do end defp handle_webrtc_msg({:rtp, id, nil, packet}, %{audio_track_id: id} = state) do - opus_packet = Opus.Depayloader.depayload(packet) + {opus_packet, depayloader} = Opus.Depayloader.depayload(state.audio_depayloader, packet) {:ok, audio_writer} = Ogg.Writer.write_packet(state.audio_writer, opus_packet) - {:ok, %{state | audio_writer: audio_writer}} + {:ok, %{state | audio_depayloader: depayloader, audio_writer: audio_writer}} end defp handle_webrtc_msg(_msg, state), do: {:ok, state} diff --git a/examples/send_from_file/lib/send_from_file/peer_handler.ex b/examples/send_from_file/lib/send_from_file/peer_handler.ex index eeb37c2..dcf9b18 100644 --- a/examples/send_from_file/lib/send_from_file/peer_handler.ex +++ b/examples/send_from_file/lib/send_from_file/peer_handler.ex @@ -63,6 +63,7 @@ defmodule SendFromFile.PeerHandler do video_payloader = VP8.Payloader.new(800) {:ok, audio_reader} = Ogg.Reader.open(@audio_file) + audio_payloader = Opus.Payloader.new() state = %{ peer_connection: pc, @@ -71,6 +72,7 @@ defmodule SendFromFile.PeerHandler do video_reader: video_reader, video_payloader: video_payloader, audio_reader: audio_reader, + audio_payloader: audio_payloader, next_video_timestamp: Enum.random(0..@max_rtp_timestamp), next_audio_timestamp: Enum.random(0..@max_rtp_timestamp), next_video_sequence_number: Enum.random(0..@max_rtp_seq_no), @@ -158,7 +160,7 @@ defmodule SendFromFile.PeerHandler do # and time spent on reading and parsing the file Process.send_after(self(), :send_audio, duration) - rtp_packet = Opus.Payloader.payload(packet) + {[rtp_packet], payloader} = Opus.Payloader.payload(state.audio_payloader, packet) rtp_packet = %{ rtp_packet @@ -177,6 +179,7 @@ defmodule SendFromFile.PeerHandler do state = %{ state | audio_reader: reader, + audio_payloader: payloader, next_audio_timestamp: next_timestamp, next_audio_sequence_number: next_sequence_number } diff --git a/lib/ex_webrtc/rtp/depayloader.ex b/lib/ex_webrtc/rtp/depayloader.ex new file mode 100644 index 0000000..11294ea --- /dev/null +++ b/lib/ex_webrtc/rtp/depayloader.ex @@ -0,0 +1,23 @@ +defmodule ExWebRTC.RTP.Depayloader do + @moduledoc """ + Behaviour for ExWebRTC Depayloaders. + """ + + @type depayloader :: struct() + + @doc """ + Creates a new depayloader struct. + + Refer to the modules implementing the behaviour for available options. + """ + @callback new(options :: any()) :: depayloader() + + @doc """ + Processes binary data from a single RTP packet, and outputs a frame if assembled. + + Returns the frame (or `nil` if a frame could not be decoded yet) + together with the updated depayloader struct. + """ + @callback depayload(depayloader(), packet :: ExRTP.Packet.t()) :: + {binary() | nil, depayloader()} +end diff --git a/lib/ex_webrtc/rtp/opus/depayloader.ex b/lib/ex_webrtc/rtp/opus/depayloader.ex index ad89c5b..89e9d11 100644 --- a/lib/ex_webrtc/rtp/opus/depayloader.ex +++ b/lib/ex_webrtc/rtp/opus/depayloader.ex @@ -7,9 +7,31 @@ defmodule ExWebRTC.RTP.Opus.Depayloader do alias ExRTP.Packet + @behaviour ExWebRTC.RTP.Depayloader + + @opaque t :: %__MODULE__{} + + @enforce_keys [] + defstruct @enforce_keys + + @doc """ + Creates a new Opus depayloader struct. + + Does not take any options/parameters. + """ + @impl true + @spec new(any()) :: t() + def new(_unused \\ nil) do + %__MODULE__{} + end + @doc """ Takes Opus packet out of an RTP packet. + + Always returns a binary as the first element. """ - @spec depayload(Packet.t()) :: binary() - def depayload(%Packet{payload: payload}), do: payload + @impl true + @spec depayload(t(), Packet.t()) :: {binary(), t()} + def depayload(%__MODULE__{} = depayloader, %Packet{payload: payload}), + do: {payload, depayloader} end diff --git a/lib/ex_webrtc/rtp/opus/payloader.ex b/lib/ex_webrtc/rtp/opus/payloader.ex index ef1f3f7..ed91b5f 100644 --- a/lib/ex_webrtc/rtp/opus/payloader.ex +++ b/lib/ex_webrtc/rtp/opus/payloader.ex @@ -5,13 +5,33 @@ defmodule ExWebRTC.RTP.Opus.Payloader do Based on [RFC 7587: RTP Payload Format for the Opus Speech and Audio Codec](https://datatracker.ietf.org/doc/html/rfc7587). """ + @behaviour ExWebRTC.RTP.Payloader + + @opaque t :: %__MODULE__{} + + @enforce_keys [] + defstruct @enforce_keys + + @doc """ + Creates a new Opus payloader struct. + + Does not take any options/parameters. + """ + @impl true + @spec new(any()) :: t() + def new(_unused \\ nil) do + %__MODULE__{} + end + @doc """ Packs Opus packet into an RTP packet. Fields from RTP header like ssrc, timestamp etc. are set to 0. + Always returns a single RTP packet. """ - @spec payload(binary()) :: ExRTP.Packet.t() - def payload(packet) when packet != <<>> do - ExRTP.Packet.new(packet) + @impl true + @spec payload(t(), binary()) :: {[ExRTP.Packet.t()], t()} + def payload(%__MODULE__{} = payloader, packet) when packet != <<>> do + {[ExRTP.Packet.new(packet)], payloader} end end diff --git a/lib/ex_webrtc/rtp/payloader.ex b/lib/ex_webrtc/rtp/payloader.ex new file mode 100644 index 0000000..c3a4fd9 --- /dev/null +++ b/lib/ex_webrtc/rtp/payloader.ex @@ -0,0 +1,21 @@ +defmodule ExWebRTC.RTP.Payloader do + @moduledoc """ + Behaviour for ExWebRTC Payloaders. + """ + + @type payloader :: struct() + + @doc """ + Creates a new payloader struct. + + Refer to the modules implementing the behaviour for available options. + """ + @callback new(options :: any()) :: payloader() + + @doc """ + Packs a frame into one or more RTP packets. + + Returns the packets together with the updated payloader struct. + """ + @callback payload(payloader(), frame :: binary()) :: {[ExRTP.Packet.t()], payloader()} +end diff --git a/lib/ex_webrtc/rtp/vp8/depayloader.ex b/lib/ex_webrtc/rtp/vp8/depayloader.ex index c56cf15..4456143 100644 --- a/lib/ex_webrtc/rtp/vp8/depayloader.ex +++ b/lib/ex_webrtc/rtp/vp8/depayloader.ex @@ -4,6 +4,9 @@ defmodule ExWebRTC.RTP.VP8.Depayloader do Based on [RFC 7741: RTP Payload Format for VP8 Video](https://datatracker.ietf.org/doc/html/rfc7741). """ + + @behaviour ExWebRTC.RTP.Depayloader + require Logger alias ExWebRTC.RTP.VP8.Payload @@ -15,17 +18,30 @@ defmodule ExWebRTC.RTP.VP8.Depayloader do defstruct [:current_frame, :current_timestamp] - @spec new() :: t() - def new() do + @doc """ + Creates a new VP8 depayloader struct. + + Does not take any options/parameters. + """ + @impl true + @spec new(any()) :: t() + def new(_unused \\ nil) do %__MODULE__{} end - @spec write(t(), ExRTP.Packet.t()) :: {:ok, t()} | {:ok, binary(), t()} - def write(depayloader, packet) + @doc """ + Reassembles VP8 frames from subsequent RTP packets. + + Returns the frame (or `nil` if a frame could not be decoded yet) + together with the updated depayloader struct. + """ + @impl true + @spec depayload(t(), ExRTP.Packet.t()) :: {binary() | nil, t()} + def depayload(depayloader, packet) - def write(depayloader, %ExRTP.Packet{payload: <<>>, padding: true}), do: {:ok, depayloader} + def depayload(depayloader, %ExRTP.Packet{payload: <<>>, padding: true}), do: {nil, depayloader} - def write(depayloader, packet) do + def depayload(depayloader, packet) do case Payload.parse(packet.payload) do {:ok, vp8_payload} -> do_write(depayloader, packet, vp8_payload) @@ -80,10 +96,10 @@ defmodule ExWebRTC.RTP.VP8.Depayloader do case {depayloader.current_frame, packet.marker} do {current_frame, true} when current_frame != nil -> - {:ok, current_frame, %{depayloader | current_frame: nil, current_timestamp: nil}} + {current_frame, %{depayloader | current_frame: nil, current_timestamp: nil}} _ -> - {:ok, depayloader} + {nil, depayloader} end end end diff --git a/lib/ex_webrtc/rtp/vp8/payloader.ex b/lib/ex_webrtc/rtp/vp8/payloader.ex index 27b24d1..09447be 100644 --- a/lib/ex_webrtc/rtp/vp8/payloader.ex +++ b/lib/ex_webrtc/rtp/vp8/payloader.ex @@ -8,6 +8,8 @@ defmodule ExWebRTC.RTP.VP8.Payloader do does not pay attention to VP8 partition boundaries (see RFC 7741 sec. 4.4). """ + @behaviour ExWebRTC.RTP.Payloader + @first_chunk_descriptor <<0::1, 0::1, 0::1, 1::1, 0::1, 0::3>> @next_chunk_descriptor <<0::1, 0::1, 0::1, 0::1, 0::1, 0::3>> @@ -18,8 +20,16 @@ defmodule ExWebRTC.RTP.VP8.Payloader do max_payload_size: non_neg_integer() } - defstruct [:max_payload_size] + @enforce_keys [:max_payload_size] + defstruct @enforce_keys + + @doc """ + Creates a new VP8 payloader struct. + The parameter `max_payload_size` determines the maximum size of a single RTP packet + outputted by the payloader. It must be greater than `100`, and is set to `1000` by default. + """ + @impl true @spec new(non_neg_integer()) :: t() def new(max_payload_size \\ 1000) when max_payload_size > 100 do %__MODULE__{max_payload_size: max_payload_size} @@ -30,8 +40,9 @@ defmodule ExWebRTC.RTP.VP8.Payloader do Fields from RTP header like ssrc, timestamp etc. are set to 0. """ + @impl true @spec payload(t(), frame :: binary()) :: {[ExRTP.Packet.t()], t()} - def payload(payloader, frame) when frame != <<>> do + def payload(%__MODULE__{} = payloader, frame) when frame != <<>> do rtp_payloads = chunk(frame, payloader.max_payload_size - @desc_size_bytes) [first_rtp_payload | next_rtp_payloads] = rtp_payloads diff --git a/test/ex_webrtc/rtp/vp8/depayloader_test.exs b/test/ex_webrtc/rtp/vp8/depayloader_test.exs index 7ff1c46..ada0ddc 100644 --- a/test/ex_webrtc/rtp/vp8/depayloader_test.exs +++ b/test/ex_webrtc/rtp/vp8/depayloader_test.exs @@ -14,8 +14,8 @@ defmodule ExWebRTC.RTP.VP8.DepayloaderTest do packet = ExRTP.Packet.new(vp8_payload, marker: true) - assert {:ok, ^data, %{current_frame: nil, current_timestamp: nil} = depayloader} = - Depayloader.write(depayloader, packet) + assert {^data, %{current_frame: nil, current_timestamp: nil} = depayloader} = + Depayloader.depayload(depayloader, packet) # packet that doesn't start a new frame vp8_payload = %Payload{n: 0, s: 0, pid: 0, payload: data} @@ -23,8 +23,8 @@ defmodule ExWebRTC.RTP.VP8.DepayloaderTest do packet = ExRTP.Packet.new(vp8_payload) - assert {:ok, %{current_frame: nil, current_timestamp: nil} = depayloader} = - Depayloader.write(depayloader, packet) + assert {nil, %{current_frame: nil, current_timestamp: nil} = depayloader} = + Depayloader.depayload(depayloader, packet) # packet that starts a new frame without finishing the previous one vp8_payload = %Payload{n: 0, s: 1, pid: 0, payload: data} @@ -32,8 +32,8 @@ defmodule ExWebRTC.RTP.VP8.DepayloaderTest do packet = ExRTP.Packet.new(vp8_payload) - assert {:ok, %{current_frame: ^data, current_timestamp: 0} = depayloader} = - Depayloader.write(depayloader, packet) + assert {nil, %{current_frame: ^data, current_timestamp: 0} = depayloader} = + Depayloader.depayload(depayloader, packet) data2 = data <> <<0>> vp8_payload = %Payload{n: 0, s: 1, pid: 0, payload: data2} @@ -41,8 +41,8 @@ defmodule ExWebRTC.RTP.VP8.DepayloaderTest do packet = ExRTP.Packet.new(vp8_payload, timestamp: 3000) - assert {:ok, %{current_frame: ^data2, current_timestamp: 3000} = depayloader} = - Depayloader.write(depayloader, packet) + assert {nil, %{current_frame: ^data2, current_timestamp: 3000} = depayloader} = + Depayloader.depayload(depayloader, packet) # packet with timestamp from a new frame that is not a beginning of this frame data2 = data @@ -51,7 +51,7 @@ defmodule ExWebRTC.RTP.VP8.DepayloaderTest do packet = ExRTP.Packet.new(vp8_payload, timestamp: 6000) - assert {:ok, %{current_frame: nil, current_timestamp: nil}} = - Depayloader.write(depayloader, packet) + assert {nil, %{current_frame: nil, current_timestamp: nil}} = + Depayloader.depayload(depayloader, packet) end end