Skip to content
This repository has been archived by the owner on Jul 23, 2024. It is now read-only.

Commit

Permalink
Implement different 'unclogging' methods
Browse files Browse the repository at this point in the history
  • Loading branch information
sbalko committed Jun 10, 2024
1 parent 8eb614f commit 0181451
Show file tree
Hide file tree
Showing 6 changed files with 90 additions and 13 deletions.
3 changes: 2 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
"h264-sps-parser": "^0.2.1",
"jquery": "^3.7.1",
"mp4box": "^0.5.2",
"nal-extractor": "^1.0.1"
"nal-extractor": "^1.0.1",
"rxjs": "^7.8.1"
}
}
69 changes: 60 additions & 9 deletions src/encoder/h264-encoder.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import { AsyncSubject, Observable, Subject, Subscription } from "rxjs";
import { BlockingQueue, ReadQueue } from "../shared/blocking-queue";
import { LeaderFollowerQueue } from "../shared/leader-follower";
import { ENCODER_RESOLUTION, EncoderResolution, H264Profile, H264_PROFILES } from "./interfaces";
import { ENCODER_RESOLUTION, EncoderResolution, H264Profile, H264_PROFILES, UncloggingMethod } from "./interfaces";

const H264_LEVEL = 52;
const BITSTREAM_CAPCITY = 10;
const ENCODER_QUEUE_SIZE = 2;

const UNCLOG_POLLING_INTERVAL_MILLIS = 100;

export class H264Encoder {
private readonly bitstream = new BlockingQueue<{
chunk: EncodedVideoChunk;
Expand All @@ -21,21 +24,27 @@ export class H264Encoder {
}

private readonly encoder: VideoEncoder;
private readonly tick = new AsyncSubject<void>();

constructor(
private readonly frames: ReadQueue<VideoFrame>,
private readonly onEncoding: (frame: VideoFrame) => void,
h264Profile: H264Profile,
encoderResolution: EncoderResolution,
encoderConfig: Omit<VideoEncoderConfig, 'width' | 'height' | 'codec'>,
private readonly uncloggingMethod: UncloggingMethod
) {
this.encoder = new VideoEncoder({
output: async (chunk, metadata) => {
this.tick.next();

this.leader.enqueuePromise(async () => {
await this.bitstream.push({ chunk, metadata })
});
},
error: error => {
this.tick.error(error);

alert(`Encoder error ${error.name}: ${error.message}`);
console.error(error);
}
Expand All @@ -53,24 +62,66 @@ export class H264Encoder {
}

private async maybeWaitUntilUnclogged(): Promise<void> {
// If the inbound queue size is below the threshold, we continue feeding frames
// into the encoder.
if (this.encoder.encodeQueueSize < ENCODER_QUEUE_SIZE) {
return;
}

return await new Promise<void>((resolve, reject) => {
if (this.encoder.encodeQueueSize < ENCODER_QUEUE_SIZE) {
resolve();
// When the inbound queue overflows and the `flush_encoder` unclogging method is selected,
// we simply flush the encoder to force any queued frames to be processed.
if (this.uncloggingMethod === 'flush_encoder') {
await this.encoder.flush();

if (this.encoder.encodeQueueSize >= ENCODER_QUEUE_SIZE) {
throw new Error(`Encoder was flushed, but encode queue size is still at ${this.encoder.encodeQueueSize} (should be below ${ENCODER_QUEUE_SIZE}`);
}

this.encoder.ondequeue = () => {
return;
}

let intervalId: number | undefined = undefined;
let tickSubscription: Subscription | undefined = undefined;

try {
return await new Promise<void>(resolve => {
if (this.encoder.encodeQueueSize < ENCODER_QUEUE_SIZE) {
resolve();
} else {
this.maybeWaitUntilUnclogged().then(resolve, reject);
}
}
});

// Otherwise ('dequeue_event' or 'polling_output' unclogging method selected),
// we always wait for the "dequeue" event.
this.encoder.ondequeue = () => {
if (this.encoder.encodeQueueSize < ENCODER_QUEUE_SIZE) {
resolve();
}
}

if (this.uncloggingMethod === 'polling_output') {
// If the 'polling_output' unclogging method is selected, we also asynchronously check
// for the inbound queue size to go down and also recheck whenever an output packet is produced.

tickSubscription = this.tick.subscribe(() => {
if (this.encoder.encodeQueueSize < ENCODER_QUEUE_SIZE) {
resolve();
}
});

intervalId = setInterval(() => {
if (this.encoder.encodeQueueSize < ENCODER_QUEUE_SIZE) {
resolve();
}
}, UNCLOG_POLLING_INTERVAL_MILLIS);
}
});
} finally {
this.encoder.ondequeue = null;

if (tickSubscription !== undefined) {
(tickSubscription as Subscription).unsubscribe();
}
clearInterval(intervalId);
}
}

private async encode(): Promise<void> {
Expand Down
4 changes: 3 additions & 1 deletion src/encoder/interfaces.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,4 +14,6 @@ export const ENCODER_RESOLUTION = {
'4K': { width: 3840, height: 2160 },
}

export type EncoderResolution = keyof typeof ENCODER_RESOLUTION;
export type EncoderResolution = keyof typeof ENCODER_RESOLUTION;

export type UncloggingMethod = 'dequeue_event' | 'polling_output' | 'flush_encoder';
8 changes: 8 additions & 0 deletions src/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,14 @@ <h3>Encoder settings</h3>
</select>
</div>

<div class="mb-3">
<label for="unclogging-method" class="form-label">Encoder back-pressure resolution method:</label>
<select id="unclogging-method" class="form-select">
<option selected value="dequeue_event">VideoEncoder dequeue event (Clipchamp production)</option>
<option value="polling_output">Timer-based polling, output-based polling and dequeue event</option>
<option value="flush_encoder">Flush encoder</option>
</select>
</div>
</div>

<div class="row">
Expand Down
7 changes: 5 additions & 2 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import $ from 'jquery';
import { Mp4Demuxer } from './decoder/mp4-demuxer';
import { H264Decoder } from './decoder/h264-decoder';
import { H264Encoder } from './encoder/h264-encoder';
import { EncoderResolution, H264Profile } from './encoder/interfaces';
import { EncoderResolution, H264Profile, UncloggingMethod } from './encoder/interfaces';
import { loadInputFile } from './shared/input-files';

const DEFAULT_FRAMERATE = 30;
Expand Down Expand Up @@ -59,6 +59,8 @@ $('button#run-benchmark').click(async () => {
const bitrate = Number.parseInt($('input#encoding-bitrate').val() as string);
const bitrateMode = $('select#bitrate-mode').val() as VideoEncoderBitrateMode;
const latencyMode = $('select#latency-mode').val() as LatencyMode;
const uncloggingMethod = $('select#unclogging-method').val() as UncloggingMethod;

const showEncodingProgress = $('input#progress-update:checked').val() === 'on';


Expand Down Expand Up @@ -90,7 +92,8 @@ $('button#run-benchmark').click(async () => {
bitrate,
bitrateMode,
latencyMode
}
},
uncloggingMethod
);

for (let packet = await encoder.packets.pull(); packet !== undefined; packet = await encoder.packets.pull()) {
Expand Down
12 changes: 12 additions & 0 deletions yarn.lock
Original file line number Diff line number Diff line change
Expand Up @@ -1359,6 +1359,13 @@ run-parallel@^1.1.9:
dependencies:
queue-microtask "^1.2.2"

rxjs@^7.8.1:
version "7.8.1"
resolved "https://registry.yarnpkg.com/rxjs/-/rxjs-7.8.1.tgz#6f6f3d99ea8044291efd92e7c7fcf562c4057543"
integrity sha512-AA3TVj+0A2iuIoQkWEK/tqFjBq2j+6PO6Y0zJcvzLAFhEFIO3HL0vls9hWLncZbAAbK0mar7oZ4V079I/qPMxg==
dependencies:
tslib "^2.1.0"

safe-buffer@5.1.2:
version "5.1.2"
resolved "https://registry.yarnpkg.com/safe-buffer/-/safe-buffer-5.1.2.tgz#991ec69d296e0313747d59bdfd2b745c35f8828d"
Expand Down Expand Up @@ -1547,6 +1554,11 @@ ts-loader@^9.5.1:
semver "^7.3.4"
source-map "^0.7.4"

tslib@^2.1.0:
version "2.6.3"
resolved "https://registry.yarnpkg.com/tslib/-/tslib-2.6.3.tgz#0438f810ad7a9edcde7a241c3d80db693c8cbfe0"
integrity sha512-xNvxJEOUiWPGhUuUdQgAJPKOOJfGnIyKySOc09XkKsgdUV/3E2zvwZYdejjmRgPCgcym1juLH3226yA7sEFJKQ==

typescript@^5.4.5:
version "5.4.5"
resolved "https://registry.yarnpkg.com/typescript/-/typescript-5.4.5.tgz#42ccef2c571fdbd0f6718b1d1f5e6e5ef006f611"
Expand Down

0 comments on commit 0181451

Please sign in to comment.