Skip to content

Commit

Permalink
Add behaviour for (de)payloaders
Browse files Browse the repository at this point in the history
  • Loading branch information
sgfn committed Aug 6, 2024
1 parent af1fec4 commit eb41e01
Show file tree
Hide file tree
Showing 9 changed files with 155 additions and 32 deletions.
19 changes: 13 additions & 6 deletions examples/save_to_file/lib/save_to_file/peer_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ defmodule SaveToFile.PeerHandler do
video_writer: nil,
video_depayloader: nil,
audio_writer: nil,
audio_payloader: nil,
frames_cnt: 0
}

Expand Down Expand Up @@ -154,7 +155,13 @@ defmodule SaveToFile.PeerHandler do
# by default uses 1 mono channel and 48k clock rate
{:ok, audio_writer} = Ogg.Writer.open(@audio_file)

state = %{state | audio_writer: audio_writer, audio_track_id: id}
state = %{
state
| audio_depayloader: Opus.Depayloader.new(),
audio_writer: audio_writer,
audio_track_id: id
}

{:ok, state}
end

Expand All @@ -166,11 +173,11 @@ defmodule SaveToFile.PeerHandler do

defp handle_webrtc_msg({:rtp, id, nil, packet}, %{video_track_id: id} = state) do
state =
case VP8.Depayloader.write(state.video_depayloader, packet) do
{:ok, video_depayloader} ->
case VP8.Depayloader.depayload(state.video_depayloader, packet) do
{nil, video_depayloader} ->
%{state | video_depayloader: video_depayloader}

{:ok, vp8_frame, video_depayloader} ->
{vp8_frame, video_depayloader} ->
frame = %IVF.Frame{timestamp: state.frames_cnt, data: vp8_frame}
{:ok, video_writer} = IVF.Writer.write_frame(state.video_writer, frame)

Expand All @@ -186,10 +193,10 @@ defmodule SaveToFile.PeerHandler do
end

defp handle_webrtc_msg({:rtp, id, nil, packet}, %{audio_track_id: id} = state) do
opus_packet = Opus.Depayloader.depayload(packet)
{opus_packet, depayloader} = Opus.Depayloader.depayload(state.audio_depayloader, packet)
{:ok, audio_writer} = Ogg.Writer.write_packet(state.audio_writer, opus_packet)

{:ok, %{state | audio_writer: audio_writer}}
{:ok, %{state | audio_depayloader: depayloader, audio_writer: audio_writer}}
end

defp handle_webrtc_msg(_msg, state), do: {:ok, state}
Expand Down
5 changes: 4 additions & 1 deletion examples/send_from_file/lib/send_from_file/peer_handler.ex
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ defmodule SendFromFile.PeerHandler do
video_payloader = VP8.Payloader.new(800)

{:ok, audio_reader} = Ogg.Reader.open(@audio_file)
audio_payloader = Opus.Payloader.new()

state = %{
peer_connection: pc,
Expand All @@ -71,6 +72,7 @@ defmodule SendFromFile.PeerHandler do
video_reader: video_reader,
video_payloader: video_payloader,
audio_reader: audio_reader,
audio_payloader: audio_payloader,
next_video_timestamp: Enum.random(0..@max_rtp_timestamp),
next_audio_timestamp: Enum.random(0..@max_rtp_timestamp),
next_video_sequence_number: Enum.random(0..@max_rtp_seq_no),
Expand Down Expand Up @@ -158,7 +160,7 @@ defmodule SendFromFile.PeerHandler do
# and time spent on reading and parsing the file
Process.send_after(self(), :send_audio, duration)

rtp_packet = Opus.Payloader.payload(packet)
{[rtp_packet], payloader} = Opus.Payloader.payload(state.audio_payloader, packet)

rtp_packet = %{
rtp_packet
Expand All @@ -177,6 +179,7 @@ defmodule SendFromFile.PeerHandler do
state = %{
state
| audio_reader: reader,
audio_payloader: payloader,
next_audio_timestamp: next_timestamp,
next_audio_sequence_number: next_sequence_number
}
Expand Down
23 changes: 23 additions & 0 deletions lib/ex_webrtc/rtp/depayloader.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
defmodule ExWebRTC.RTP.Depayloader do
@moduledoc """
Behaviour for ExWebRTC Depayloaders.
"""

@type depayloader :: struct()

@doc """
Creates a new depayloader struct.
Refer to the modules implementing the behaviour for available options.
"""
@callback new(options :: any()) :: depayloader()

@doc """
Processes binary data from a single RTP packet, and outputs a frame if assembled.
Returns the frame (or `nil` if a frame could not be decoded yet)
together with the updated depayloader struct.
"""
@callback depayload(depayloader(), packet :: ExRTP.Packet.t()) ::
{binary() | nil, depayloader()}
end
26 changes: 24 additions & 2 deletions lib/ex_webrtc/rtp/opus/depayloader.ex
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,31 @@ defmodule ExWebRTC.RTP.Opus.Depayloader do

alias ExRTP.Packet

@behaviour ExWebRTC.RTP.Depayloader

@opaque t :: %__MODULE__{}

@enforce_keys []
defstruct @enforce_keys

@doc """
Creates a new Opus depayloader struct.
Does not take any options/parameters.
"""
@impl true
@spec new(any()) :: t()
def new(_unused \\ nil) do
%__MODULE__{}
end

@doc """
Takes Opus packet out of an RTP packet.
Always returns a binary as the first element.
"""
@spec depayload(Packet.t()) :: binary()
def depayload(%Packet{payload: payload}), do: payload
@impl true
@spec depayload(t(), Packet.t()) :: {binary(), t()}
def depayload(%__MODULE__{} = depayloader, %Packet{payload: payload}),
do: {payload, depayloader}
end
26 changes: 23 additions & 3 deletions lib/ex_webrtc/rtp/opus/payloader.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,33 @@ defmodule ExWebRTC.RTP.Opus.Payloader do
Based on [RFC 7587: RTP Payload Format for the Opus Speech and Audio Codec](https://datatracker.ietf.org/doc/html/rfc7587).
"""

@behaviour ExWebRTC.RTP.Payloader

@opaque t :: %__MODULE__{}

@enforce_keys []
defstruct @enforce_keys

@doc """
Creates a new Opus payloader struct.
Does not take any options/parameters.
"""
@impl true
@spec new(any()) :: t()
def new(_unused \\ nil) do
%__MODULE__{}
end

@doc """
Packs Opus packet into an RTP packet.
Fields from RTP header like ssrc, timestamp etc. are set to 0.
Always returns a single RTP packet.
"""
@spec payload(binary()) :: ExRTP.Packet.t()
def payload(packet) when packet != <<>> do
ExRTP.Packet.new(packet)
@impl true
@spec payload(t(), binary()) :: {[ExRTP.Packet.t()], t()}
def payload(%__MODULE__{} = payloader, packet) when packet != <<>> do
{[ExRTP.Packet.new(packet)], payloader}
end
end
21 changes: 21 additions & 0 deletions lib/ex_webrtc/rtp/payloader.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
defmodule ExWebRTC.RTP.Payloader do
@moduledoc """
Behaviour for ExWebRTC Payloaders.
"""

@type payloader :: struct()

@doc """
Creates a new payloader struct.
Refer to the modules implementing the behaviour for available options.
"""
@callback new(options :: any()) :: payloader()

@doc """
Packs a frame into one or more RTP packets.
Returns the packets together with the updated payloader struct.
"""
@callback payload(payloader(), frame :: binary()) :: {[ExRTP.Packet.t()], payloader()}
end
32 changes: 24 additions & 8 deletions lib/ex_webrtc/rtp/vp8/depayloader.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ defmodule ExWebRTC.RTP.VP8.Depayloader do
Based on [RFC 7741: RTP Payload Format for VP8 Video](https://datatracker.ietf.org/doc/html/rfc7741).
"""

@behaviour ExWebRTC.RTP.Depayloader

require Logger

alias ExWebRTC.RTP.VP8.Payload
Expand All @@ -15,17 +18,30 @@ defmodule ExWebRTC.RTP.VP8.Depayloader do

defstruct [:current_frame, :current_timestamp]

@spec new() :: t()
def new() do
@doc """
Creates a new VP8 depayloader struct.
Does not take any options/parameters.
"""
@impl true
@spec new(any()) :: t()
def new(_unused \\ nil) do
%__MODULE__{}
end

@spec write(t(), ExRTP.Packet.t()) :: {:ok, t()} | {:ok, binary(), t()}
def write(depayloader, packet)
@doc """
Reassembles VP8 frames from subsequent RTP packets.
Returns the frame (or `nil` if a frame could not be decoded yet)
together with the updated depayloader struct.
"""
@impl true
@spec depayload(t(), ExRTP.Packet.t()) :: {binary() | nil, t()}
def depayload(depayloader, packet)

def write(depayloader, %ExRTP.Packet{payload: <<>>, padding: true}), do: {:ok, depayloader}
def depayload(depayloader, %ExRTP.Packet{payload: <<>>, padding: true}), do: {nil, depayloader}

def write(depayloader, packet) do
def depayload(depayloader, packet) do
case Payload.parse(packet.payload) do
{:ok, vp8_payload} ->
do_write(depayloader, packet, vp8_payload)
Expand Down Expand Up @@ -80,10 +96,10 @@ defmodule ExWebRTC.RTP.VP8.Depayloader do

case {depayloader.current_frame, packet.marker} do
{current_frame, true} when current_frame != nil ->
{:ok, current_frame, %{depayloader | current_frame: nil, current_timestamp: nil}}
{current_frame, %{depayloader | current_frame: nil, current_timestamp: nil}}

_ ->
{:ok, depayloader}
{nil, depayloader}
end
end
end
15 changes: 13 additions & 2 deletions lib/ex_webrtc/rtp/vp8/payloader.ex
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ defmodule ExWebRTC.RTP.VP8.Payloader do
does not pay attention to VP8 partition boundaries (see RFC 7741 sec. 4.4).
"""

@behaviour ExWebRTC.RTP.Payloader

@first_chunk_descriptor <<0::1, 0::1, 0::1, 1::1, 0::1, 0::3>>

@next_chunk_descriptor <<0::1, 0::1, 0::1, 0::1, 0::1, 0::3>>
Expand All @@ -18,8 +20,16 @@ defmodule ExWebRTC.RTP.VP8.Payloader do
max_payload_size: non_neg_integer()
}

defstruct [:max_payload_size]
@enforce_keys [:max_payload_size]
defstruct @enforce_keys

@doc """
Creates a new VP8 payloader struct.
The parameter `max_payload_size` determines the maximum size of a single RTP packet
outputted by the payloader. It must be greater than `100`, and is set to `1000` by default.
"""
@impl true
@spec new(non_neg_integer()) :: t()
def new(max_payload_size \\ 1000) when max_payload_size > 100 do
%__MODULE__{max_payload_size: max_payload_size}
Expand All @@ -30,8 +40,9 @@ defmodule ExWebRTC.RTP.VP8.Payloader do
Fields from RTP header like ssrc, timestamp etc. are set to 0.
"""
@impl true
@spec payload(t(), frame :: binary()) :: {[ExRTP.Packet.t()], t()}
def payload(payloader, frame) when frame != <<>> do
def payload(%__MODULE__{} = payloader, frame) when frame != <<>> do
rtp_payloads = chunk(frame, payloader.max_payload_size - @desc_size_bytes)

[first_rtp_payload | next_rtp_payloads] = rtp_payloads
Expand Down
20 changes: 10 additions & 10 deletions test/ex_webrtc/rtp/vp8/depayloader_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -14,35 +14,35 @@ defmodule ExWebRTC.RTP.VP8.DepayloaderTest do

packet = ExRTP.Packet.new(vp8_payload, marker: true)

assert {:ok, ^data, %{current_frame: nil, current_timestamp: nil} = depayloader} =
Depayloader.write(depayloader, packet)
assert {^data, %{current_frame: nil, current_timestamp: nil} = depayloader} =
Depayloader.depayload(depayloader, packet)

# packet that doesn't start a new frame
vp8_payload = %Payload{n: 0, s: 0, pid: 0, payload: data}
vp8_payload = Payload.serialize(vp8_payload)

packet = ExRTP.Packet.new(vp8_payload)

assert {:ok, %{current_frame: nil, current_timestamp: nil} = depayloader} =
Depayloader.write(depayloader, packet)
assert {nil, %{current_frame: nil, current_timestamp: nil} = depayloader} =
Depayloader.depayload(depayloader, packet)

# packet that starts a new frame without finishing the previous one
vp8_payload = %Payload{n: 0, s: 1, pid: 0, payload: data}
vp8_payload = Payload.serialize(vp8_payload)

packet = ExRTP.Packet.new(vp8_payload)

assert {:ok, %{current_frame: ^data, current_timestamp: 0} = depayloader} =
Depayloader.write(depayloader, packet)
assert {nil, %{current_frame: ^data, current_timestamp: 0} = depayloader} =
Depayloader.depayload(depayloader, packet)

data2 = data <> <<0>>
vp8_payload = %Payload{n: 0, s: 1, pid: 0, payload: data2}
vp8_payload = Payload.serialize(vp8_payload)

packet = ExRTP.Packet.new(vp8_payload, timestamp: 3000)

assert {:ok, %{current_frame: ^data2, current_timestamp: 3000} = depayloader} =
Depayloader.write(depayloader, packet)
assert {nil, %{current_frame: ^data2, current_timestamp: 3000} = depayloader} =
Depayloader.depayload(depayloader, packet)

# packet with timestamp from a new frame that is not a beginning of this frame
data2 = data
Expand All @@ -51,7 +51,7 @@ defmodule ExWebRTC.RTP.VP8.DepayloaderTest do

packet = ExRTP.Packet.new(vp8_payload, timestamp: 6000)

assert {:ok, %{current_frame: nil, current_timestamp: nil}} =
Depayloader.write(depayloader, packet)
assert {nil, %{current_frame: nil, current_timestamp: nil}} =
Depayloader.depayload(depayloader, packet)
end
end

0 comments on commit eb41e01

Please sign in to comment.