Skip to content

Commit

Permalink
Merge pull request #51 from membraneframework/elem-rewrite-timestamps
Browse files Browse the repository at this point in the history
fix pts handling
  • Loading branch information
bartkrak authored Jan 15, 2024
2 parents 2149415 + 5b26b23 commit 512e9d2
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 16 deletions.
8 changes: 4 additions & 4 deletions lib/membrane_aac_fdk_plugin/decoder.ex
Original file line number Diff line number Diff line change
Expand Up @@ -48,21 +48,21 @@ defmodule Membrane.AAC.FDK.Decoder do
# since they should stay consistent for the whole stream.
# 4. In case an unhandled error is returned during this flow, returns error message.
@impl true
def handle_buffer(:input, %Buffer{payload: payload}, ctx, state) do
def handle_buffer(:input, %Buffer{payload: payload, pts: pts}, ctx, state) do
:ok = Native.fill!(payload, state.native)
decoded_frames = decode_buffer!(payload, state.native)
decoded_frames = decode_buffer!(payload, pts, state.native)

format_action = get_format_if_needed(ctx.pads.output.stream_format, state)
buffer_actions = [buffer: {:output, decoded_frames}]

{format_action ++ buffer_actions, state}
end

defp decode_buffer!(payload, native, acc \\ []) do
defp decode_buffer!(payload, pts, native, acc \\ []) do
case Native.decode_frame(payload, native) do
{:ok, decoded_frame} ->
# Accumulate decoded frames
decode_buffer!(payload, native, [%Buffer{payload: decoded_frame} | acc])
decode_buffer!(payload, pts, native, [%Buffer{payload: decoded_frame, pts: pts} | acc])

{:error, :not_enough_bits} ->
# Means that we've parsed the whole buffer.
Expand Down
54 changes: 42 additions & 12 deletions lib/membrane_aac_fdk_plugin/encoder.ex
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,8 @@ defmodule Membrane.AAC.FDK.Encoder do
|> Map.from_struct()
|> Map.merge(%{
native: nil,
queue: <<>>
queue: <<>>,
current_pts: nil
})

{[], state}
Expand Down Expand Up @@ -135,21 +136,35 @@ defmodule Membrane.AAC.FDK.Encoder do
end

@impl true
def handle_buffer(:input, buffer, ctx, state) do
def handle_buffer(:input, %Buffer{payload: payload, pts: input_pts}, ctx, state) do
%{native: native, queue: queue} = state

to_encode = queue <> buffer.payload
to_encode = queue <> payload

raw_frame_size =
aac_frame_size(state.aot) * ctx.pads.input.stream_format.channels * @sample_size

case encode_buffer(to_encode, native, raw_frame_size) do
{encoded_buffers, bytes_used} when bytes_used > 0 ->
check_pts_integrity? = state.queue != <<>>

state =
if state.queue == <<>> do
%{state | current_pts: input_pts}
else
state
end

case encode_buffer(to_encode, native, raw_frame_size, state) do
{encoded_buffers, bytes_used, state} when bytes_used > 0 ->
<<_handled::binary-size(bytes_used), rest::binary>> = to_encode

if check_pts_integrity? and length(encoded_buffers) >= 2 and
Enum.at(encoded_buffers, 1).pts != input_pts do
raise "PTS values are not continuous"
end

{[buffer: {:output, encoded_buffers}], %{state | queue: rest}}

{[], 0} ->
{[], 0, state} ->
{[], %{state | queue: to_encode}}
end
end
Expand All @@ -173,29 +188,44 @@ defmodule Membrane.AAC.FDK.Encoder do
end
end

defp encode_buffer(buffer, native, raw_frame_size, acc \\ [], bytes_used \\ 0)
defp encode_buffer(buffer, native, raw_frame_size, acc \\ [], bytes_used \\ 0, state)

# Encode a single frame if buffer contains at least one frame
defp encode_buffer(buffer, native, raw_frame_size, acc, bytes_used)
defp encode_buffer(buffer, native, raw_frame_size, acc, bytes_used, state)
when byte_size(buffer) >= raw_frame_size do
<<raw_frame::binary-size(raw_frame_size), rest::binary>> = buffer

encoded_buffer = %Buffer{payload: Native.encode_frame!(raw_frame, native)}
encoded_buffer = %Buffer{
payload: Native.encode_frame!(raw_frame, native),
pts: state.current_pts
}

# Continue encoding the rest until no more frames are available in the queue
encode_buffer(
rest,
native,
raw_frame_size,
[encoded_buffer | acc],
bytes_used + raw_frame_size
bytes_used + raw_frame_size,
bump_current_pts(state, raw_frame)
)
end

# Not enough samples for a frame
defp encode_buffer(_partial_buffer, _native, _raw_frame_size, acc, bytes_used) do
defp encode_buffer(_partial_buffer, _native, _raw_frame_size, acc, bytes_used, state) do
# Return accumulated encoded frames
{acc |> Enum.reverse(), bytes_used}
{acc |> Enum.reverse(), bytes_used, state}
end

defp bump_current_pts(%{current_pts: nil} = state, _raw_frame), do: state

defp bump_current_pts(state, raw_frame) do
duration =
raw_frame
|> byte_size()
|> RawAudio.bytes_to_time(state.input_stream_format)

Map.update!(state, :current_pts, &(&1 + duration))
end

defp mk_native!(channels, sample_rate, aot, bitrate_mode, bitrate) do
Expand Down

0 comments on commit 512e9d2

Please sign in to comment.