Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rewrite timestamps #55

Merged
merged 40 commits into from
Jan 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
957a363
vip
bartkrak Dec 12, 2023
9bb6bc0
encoder ready for testing
bartkrak Dec 14, 2023
a3282d1
encoder works, custom test data
bartkrak Dec 14, 2023
46c9c78
backup
bartkrak Dec 15, 2023
0b16507
parser seems to work
bartkrak Dec 15, 2023
2d9455c
parser tests fail
bartkrak Dec 15, 2023
16d5302
parser modified backup
bartkrak Dec 19, 2023
eca9e83
looks good
bartkrak Dec 19, 2023
26bb5fe
before merge
bartkrak Dec 19, 2023
3066e89
Merge branch 'master' into elem-rewrite-timestamps
bartkrak Dec 19, 2023
2c6a94e
ready for review
bartkrak Dec 19, 2023
8f988be
encoder test unused var fix
bartkrak Dec 19, 2023
f9bbd36
another quick fix
bartkrak Dec 19, 2023
80c9cfa
updated parser tests
bartkrak Dec 20, 2023
8fd695e
formatting fix
bartkrak Dec 20, 2023
79e5503
paser review fixes, parser tests
bartkrak Jan 2, 2024
60f7e47
encoder review requested fixes
bartkrak Jan 2, 2024
59ff90a
encoder pts logic changed, encoder tests
bartkrak Jan 2, 2024
ad83232
parser encoder new pts alg
bartkrak Jan 8, 2024
5fa6534
parser refractor
bartkrak Jan 8, 2024
35cdb59
formatting fix
bartkrak Jan 8, 2024
98ac265
another formatting miss
bartkrak Jan 8, 2024
830c580
code readability
bartkrak Jan 8, 2024
9c7735f
review requested changes, cleanup
bartkrak Jan 9, 2024
63b5e20
format
bartkrak Jan 9, 2024
0d9a686
refractor
bartkrak Jan 9, 2024
473d201
check pts of first full frame fix
bartkrak Jan 11, 2024
3541bf0
pattern matching rework
bartkrak Jan 11, 2024
7808af4
set_current_pts refractor
bartkrak Jan 15, 2024
003678f
check_pts_integrity refractor
bartkrak Jan 15, 2024
d7640df
Update lib/membrane_opus/parser.ex
bartkrak Jan 15, 2024
863fd82
refractor once again
bartkrak Jan 15, 2024
8275ff8
parser tests fix
bartkrak Jan 15, 2024
7f60ce1
check_pts_integrity complexity fix
bartkrak Jan 15, 2024
b71b6cc
credo update, little refactor
bartkrak Jan 15, 2024
41341d9
encoder fix, gitignore
bartkrak Jan 15, 2024
4909396
pts overlapping values fix
bartkrak Jan 17, 2024
1ac385b
pts overlapping warning
bartkrak Jan 17, 2024
ad0abdc
code readability fix
bartkrak Jan 17, 2024
aa4cdca
version bump
bartkrak Jan 17, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ compile_commands.json
.gdb_history
bundlex.sh
bundlex.bat

.vscode/
# Dir generated by tmp_dir ExUnit tag
/tmp/

Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ The package can be installed by adding `membrane_opus_plugin` to your list of de
```elixir
def deps do
[
{:membrane_opus_plugin, "~> 0.19.1"}
{:membrane_opus_plugin, "~> 0.19.2"}
]
end
```
Expand Down
51 changes: 41 additions & 10 deletions lib/membrane_opus/encoder.ex
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ defmodule Membrane.Opus.Encoder do
options
|> Map.from_struct()
|> Map.merge(%{
current_pts: nil,
native: nil,
queue: <<>>
})
Expand Down Expand Up @@ -132,15 +133,26 @@ defmodule Membrane.Opus.Encoder do
end

@impl true
def handle_buffer(:input, %Buffer{payload: data}, _ctx, state) do
case encode_buffer(state.queue <> data, state, frame_size_in_bytes(state)) do
{:ok, {[], rest}} ->
def handle_buffer(:input, %Buffer{payload: data, pts: input_pts}, _ctx, state) do
check_pts_integrity? = state.queue != <<>>

case encode_buffer(
state.queue <> data,
set_current_pts(state, input_pts),
frame_size_in_bytes(state)
) do
{:ok, [], state} ->
# nothing was encoded
{[], %{state | queue: rest}}
{[], state}
FelonEkonom marked this conversation as resolved.
Show resolved Hide resolved

{:ok, {encoded_buffers, rest}} ->
{:ok, encoded_buffers, state} ->
# something was encoded
{[buffer: {:output, encoded_buffers}], %{state | queue: rest}}
if check_pts_integrity? and length(encoded_buffers) >= 2 and
Enum.at(encoded_buffers, 1).pts > input_pts do
Membrane.Logger.warning("PTS values are overlapping")
end

{[buffer: {:output, encoded_buffers}], state}
end
end

Expand All @@ -160,6 +172,12 @@ defmodule Membrane.Opus.Encoder do
end
end

defp set_current_pts(%{queue: <<>>} = state, input_pts) do
%{state | current_pts: input_pts}
end

defp set_current_pts(state, _input_pts), do: state

defp mk_native!(state) do
with {:ok, channels} <- validate_channels(state.input_stream_format.channels),
{:ok, input_rate} <- validate_sample_rate(state.input_stream_format.sample_rate),
Expand Down Expand Up @@ -207,16 +225,29 @@ defmodule Membrane.Opus.Encoder do
{:ok, raw_encoded} = Native.encode_packet(state.native, raw_frame, frame_size(state))

# maybe keep encoding if there are more frames
out_buffer = [%Buffer{payload: raw_encoded, pts: state.current_pts} | encoded_frames]

encode_buffer(
rest,
state,
bump_current_pts(state, raw_frame),
target_byte_size,
[%Buffer{payload: raw_encoded} | encoded_frames]
out_buffer
)
end

defp encode_buffer(raw_buffer, _state, _target_byte_size, encoded_frames) do
defp encode_buffer(raw_buffer, state, _target_byte_size, encoded_frames) do
# Invariant for encode_buffer - return what we have encoded
{:ok, {encoded_frames |> Enum.reverse(), raw_buffer}}
{:ok, encoded_frames |> Enum.reverse(), %{state | queue: raw_buffer}}
end

defp bump_current_pts(%{current_pts: nil} = state, _raw_frame), do: state

defp bump_current_pts(state, raw_frame) do
duration =
raw_frame
|> byte_size()
|> RawAudio.bytes_to_time(state.input_stream_format)

Map.update!(state, :current_pts, &(&1 + duration))
end
end
146 changes: 92 additions & 54 deletions lib/membrane_opus/parser.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ defmodule Membrane.Opus.Parser do
"""

use Membrane.Filter

require Membrane.Logger
alias __MODULE__.{Delimitation, FrameLengths}
alias Membrane.{Buffer, Opus, RemoteStream}
alias Membrane.Opus.Util
Expand Down Expand Up @@ -42,6 +42,15 @@ defmodule Membrane.Opus.Parser do
to true to force the Parser to assume the input is self-delimitted? and
ignore upstream stream_format information on self-delimitation.
"""
],
generate_best_effort_timestamps?: [
spec: boolean(),
default: false,
description: """
If this is set to true parser will try to generate timestamps
starting from 0 and increasing them by frame duration,
otherwise it will pass pts from input to output, even if it's nil.
"""
]

def_input_pad :input,
Expand All @@ -56,8 +65,8 @@ defmodule Membrane.Opus.Parser do
options
|> Map.from_struct()
|> Map.merge(%{
pts: 0,
buffer: <<>>
current_pts: nil,
queue: <<>>
})

{[], state}
Expand All @@ -69,96 +78,125 @@ defmodule Membrane.Opus.Parser do
{[], state}
end

defp set_current_pts(
%{generate_best_effort_timestamps?: true, current_pts: nil} = state,
_input_pts
) do
%{state | current_pts: 0}
end

defp set_current_pts(%{generate_best_effort_timestamps?: false, queue: <<>>} = state, input_pts) do
%{state | current_pts: input_pts}
end

defp set_current_pts(state, _input_pts), do: state

@impl true
def handle_buffer(:input, %Buffer{payload: data}, ctx, state) do
def handle_buffer(:input, %Buffer{payload: data, pts: input_pts}, ctx, state) do
{delimitation_processor, self_delimiting?} =
Delimitation.get_processor(state.delimitation, state.input_delimitted?)

case maybe_parse(
state.buffer <> data,
state.pts,
state.input_delimitted?,
delimitation_processor
) do
{:ok, buffer, pts, packets, channels} ->
stream_format = %Opus{
self_delimiting?: self_delimiting?,
channels: channels
}
check_pts_integrity? = state.queue != <<>> and not state.generate_best_effort_timestamps?

packets_len = length(packets)
{:ok, queue, packets, channels, state} =
maybe_parse(
state.queue <> data,
delimitation_processor,
set_current_pts(state, input_pts)
)

if check_pts_integrity? and length(packets) >= 2 and
Enum.at(packets, 1).pts > input_pts do
Membrane.Logger.warning("PTS values are overlapping")
end

packet_actions =
cond do
packets_len > 0 and stream_format != ctx.pads.output.stream_format ->
[stream_format: {:output, stream_format}, buffer: {:output, packets}]
stream_format = %Opus{
self_delimiting?: self_delimiting?,
channels: channels
}

packets_len > 0 ->
[buffer: {:output, packets}]
packets_len = length(packets)

true ->
[]
end
packet_actions =
cond do
packets_len > 0 and stream_format != ctx.pads.output.stream_format ->
[stream_format: {:output, stream_format}, buffer: {:output, packets}]

{packet_actions, %{state | buffer: buffer, pts: pts}}
packets_len > 0 ->
[buffer: {:output, packets}]

:error ->
{{:error, "An error occured in parsing"}, state}
end
true ->
[]
end

{packet_actions, %{state | queue: queue}}
end

@spec maybe_parse(
data :: binary,
pts :: Membrane.Time.t(),
input_delimitted? :: boolean,
processor :: Delimitation.processor_t(),
packets :: [Buffer.t()],
channels :: 0..2
) ::
{:ok, remaining_buffer :: binary, pts :: Membrane.Time.t(), packets :: [Buffer.t()],
channels :: 0..2}
| :error
defp maybe_parse(data, pts, input_delimitted?, processor, packets \\ [], channels \\ 0)

defp maybe_parse(data, pts, input_delimitted?, processor, packets, channels)
defp maybe_parse(
data,
processor,
packets \\ [],
channels \\ 0,
state
)

defp maybe_parse(
data,
processor,
packets,
channels,
state
)
when byte_size(data) > 0 do
with {:ok, configuration_number, stereo_flag, frame_packing} <- Util.parse_toc_byte(data),
channels <- max(channels, Util.parse_channels(stereo_flag)),
{:ok, _mode, _bandwidth, frame_duration} <-
Util.parse_configuration(configuration_number),
{:ok, header_size, frame_lengths, padding_size} <-
FrameLengths.parse(frame_packing, data, input_delimitted?),
FrameLengths.parse(frame_packing, data, state.input_delimitted?),
expected_packet_size <- header_size + Enum.sum(frame_lengths) + padding_size,
{:ok, raw_packet, rest} <- rest_of_packet(data, expected_packet_size) do
duration = elapsed_time(frame_lengths, frame_duration)

packet = %Buffer{
pts: pts,
pts: state.current_pts,
payload: processor.process(raw_packet, frame_lengths, header_size),
metadata: %{
duration: duration
}
}

state =
if state.current_pts == nil do
state
else
%{state | current_pts: state.current_pts + duration}
end

maybe_parse(
rest,
pts + duration,
input_delimitted?,
processor,
[packet | packets],
channels
channels,
state
)
else
{:error, :cont} ->
{:ok, data, pts, packets |> Enum.reverse(), channels}

:error ->
:error
raise "An error occured in parsing"

{:error, :cont} ->
{:ok, data, packets |> Enum.reverse(), channels, state}
end
end

defp maybe_parse(data, pts, _input_delimitted?, _processor, packets, channels) do
{:ok, data, pts, packets |> Enum.reverse(), channels}
defp maybe_parse(
data,
_processor,
packets,
channels,
state
) do
{:ok, data, packets |> Enum.reverse(), channels, state}
end

@spec rest_of_packet(data :: binary, expected_packet_size :: pos_integer) ::
Expand Down
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
defmodule Membrane.Opus.Plugin.Mixfile do
use Mix.Project

@version "0.19.1"
@version "0.19.2"
@github_url "https://github.com/membraneframework/membrane_opus_plugin"

def project do
Expand Down
6 changes: 3 additions & 3 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,16 @@
"bunch": {:hex, :bunch, "1.6.1", "5393d827a64d5f846092703441ea50e65bc09f37fd8e320878f13e63d410aec7", [:mix], [], "hexpm", "286cc3add551628b30605efbe2fca4e38cc1bea89bcd0a1a7226920b3364fe4a"},
"bunch_native": {:hex, :bunch_native, "0.5.0", "8ac1536789a597599c10b652e0b526d8833348c19e4739a0759a2bedfd924e63", [:mix], [{:bundlex, "~> 1.0", [hex: :bundlex, repo: "hexpm", optional: false]}], "hexpm", "24190c760e32b23b36edeb2dc4852515c7c5b3b8675b1a864e0715bdd1c8f80d"},
"bundlex": {:hex, :bundlex, "1.4.1", "60702b7f8e036a00c88bec69993329cc4aae32fe402804fe2e8db0c1e1396cd6", [:mix], [{:bunch, "~> 1.0", [hex: :bunch, repo: "hexpm", optional: false]}, {:elixir_uuid, "~> 1.2", [hex: :elixir_uuid, repo: "hexpm", optional: false]}, {:qex, "~> 0.5", [hex: :qex, repo: "hexpm", optional: false]}, {:req, "~> 0.4.0", [hex: :req, repo: "hexpm", optional: false]}, {:zarex, "~> 1.0", [hex: :zarex, repo: "hexpm", optional: false]}], "hexpm", "7511718b4b8063e457f3fa5166df177beff65c532db44631f41b496cfa2f48a3"},
"bunt": {:hex, :bunt, "0.2.1", "e2d4792f7bc0ced7583ab54922808919518d0e57ee162901a16a1b6664ef3b14", [:mix], [], "hexpm", "a330bfb4245239787b15005e66ae6845c9cd524a288f0d141c148b02603777a5"},
"bunt": {:hex, :bunt, "1.0.0", "081c2c665f086849e6d57900292b3a161727ab40431219529f13c4ddcf3e7a44", [:mix], [], "hexpm", "dc5f86aa08a5f6fa6b8096f0735c4e76d54ae5c9fa2c143e5a1fc7c1cd9bb6b5"},
"castore": {:hex, :castore, "1.0.5", "9eeebb394cc9a0f3ae56b813459f990abb0a3dedee1be6b27fdb50301930502f", [:mix], [], "hexpm", "8d7c597c3e4a64c395980882d4bca3cebb8d74197c590dc272cfd3b6a6310578"},
"coerce": {:hex, :coerce, "1.0.1", "211c27386315dc2894ac11bc1f413a0e38505d808153367bd5c6e75a4003d096", [:mix], [], "hexpm", "b44a691700f7a1a15b4b7e2ff1fa30bebd669929ac8aa43cffe9e2f8bf051cf1"},
"credo": {:hex, :credo, "1.7.1", "6e26bbcc9e22eefbff7e43188e69924e78818e2fe6282487d0703652bc20fd62", [:mix], [{:bunt, "~> 0.2.1", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2.8", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "e9871c6095a4c0381c89b6aa98bc6260a8ba6addccf7f6a53da8849c748a58a2"},
"credo": {:hex, :credo, "1.7.3", "05bb11eaf2f2b8db370ecaa6a6bda2ec49b2acd5e0418bc106b73b07128c0436", [:mix], [{:bunt, "~> 0.2.1 or ~> 1.0", [hex: :bunt, repo: "hexpm", optional: false]}, {:file_system, "~> 0.2 or ~> 1.0", [hex: :file_system, repo: "hexpm", optional: false]}, {:jason, "~> 1.0", [hex: :jason, repo: "hexpm", optional: false]}], "hexpm", "35ea675a094c934c22fb1dca3696f3c31f2728ae6ef5a53b5d648c11180a4535"},
"dialyxir": {:hex, :dialyxir, "1.4.2", "764a6e8e7a354f0ba95d58418178d486065ead1f69ad89782817c296d0d746a5", [:mix], [{:erlex, ">= 0.2.6", [hex: :erlex, repo: "hexpm", optional: false]}], "hexpm", "516603d8067b2fd585319e4b13d3674ad4f314a5902ba8130cd97dc902ce6bbd"},
"earmark_parser": {:hex, :earmark_parser, "1.4.39", "424642f8335b05bb9eb611aa1564c148a8ee35c9c8a8bba6e129d51a3e3c6769", [:mix], [], "hexpm", "06553a88d1f1846da9ef066b87b57c6f605552cfbe40d20bd8d59cc6bde41944"},
"elixir_uuid": {:hex, :elixir_uuid, "1.2.1", "dce506597acb7e6b0daeaff52ff6a9043f5919a4c3315abb4143f0b00378c097", [:mix], [], "hexpm", "f7eba2ea6c3555cea09706492716b0d87397b88946e6380898c2889d68585752"},
"erlex": {:hex, :erlex, "0.2.6", "c7987d15e899c7a2f34f5420d2a2ea0d659682c06ac607572df55a43753aa12e", [:mix], [], "hexpm", "2ed2e25711feb44d52b17d2780eabf998452f6efda104877a3881c2f8c0c0c75"},
"ex_doc": {:hex, :ex_doc, "0.31.0", "06eb1dfd787445d9cab9a45088405593dd3bb7fe99e097eaa71f37ba80c7a676", [:mix], [{:earmark_parser, "~> 1.4.39", [hex: :earmark_parser, repo: "hexpm", optional: false]}, {:makeup_c, ">= 0.1.1", [hex: :makeup_c, repo: "hexpm", optional: true]}, {:makeup_elixir, "~> 0.14", [hex: :makeup_elixir, repo: "hexpm", optional: false]}, {:makeup_erlang, "~> 0.1", [hex: :makeup_erlang, repo: "hexpm", optional: false]}], "hexpm", "5350cafa6b7f77bdd107aa2199fe277acf29d739aba5aee7e865fc680c62a110"},
"file_system": {:hex, :file_system, "0.2.10", "fb082005a9cd1711c05b5248710f8826b02d7d1784e7c3451f9c1231d4fc162d", [:mix], [], "hexpm", "41195edbfb562a593726eda3b3e8b103a309b733ad25f3d642ba49696bf715dc"},
"file_system": {:hex, :file_system, "1.0.0", "b689cc7dcee665f774de94b5a832e578bd7963c8e637ef940cd44327db7de2cd", [:mix], [], "hexpm", "6752092d66aec5a10e662aefeed8ddb9531d79db0bc145bb8c40325ca1d8536d"},
"finch": {:hex, :finch, "0.16.0", "40733f02c89f94a112518071c0a91fe86069560f5dbdb39f9150042f44dcfb1a", [:mix], [{:castore, "~> 0.1 or ~> 1.0", [hex: :castore, repo: "hexpm", optional: false]}, {:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.3", [hex: :mint, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_pool, "~> 0.2.6 or ~> 1.0", [hex: :nimble_pool, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "f660174c4d519e5fec629016054d60edd822cdfe2b7270836739ac2f97735ec5"},
"hpax": {:hex, :hpax, "0.1.2", "09a75600d9d8bbd064cdd741f21fc06fc1f4cf3d0fcc335e5aa19be1a7235c84", [:mix], [], "hexpm", "2c87843d5a23f5f16748ebe77969880e29809580efdaccd615cd3bed628a8c13"},
"jason": {:hex, :jason, "1.4.1", "af1504e35f629ddcdd6addb3513c3853991f694921b1b9368b0bd32beb9f1b63", [:mix], [{:decimal, "~> 1.0 or ~> 2.0", [hex: :decimal, repo: "hexpm", optional: true]}], "hexpm", "fbb01ecdfd565b56261302f7e1fcc27c4fb8f32d56eab74db621fc154604a7a1"},
Expand Down
2 changes: 2 additions & 0 deletions test/membrane_opus/encoder_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -62,5 +62,7 @@ defmodule Membrane.Opus.Encoder.EncoderTest do

pipeline = Pipeline.start_link_supervised!(spec: spec)
assert_start_of_stream(pipeline, :encoder, :input)

Pipeline.terminate(pipeline)
end
end
Loading