From 972a80f0141914f79ddc95715b3ae52144dbfb26 Mon Sep 17 00:00:00 2001 From: Julian Waller Date: Thu, 30 Nov 2023 17:20:03 +0000 Subject: [PATCH 01/12] wip: prepare as packets before passing into socketchild --- src/atem.ts | 17 ++++++++++++----- src/lib/atemSocket.ts | 18 ++++++++++++++---- src/lib/atemSocketChild.ts | 21 ++++++++------------- 3 files changed, 34 insertions(+), 22 deletions(-) diff --git a/src/atem.ts b/src/atem.ts index 9fee13ea8..11376d7cc 100644 --- a/src/atem.ts +++ b/src/atem.ts @@ -59,6 +59,10 @@ export interface AtemOptions { debugBuffers?: boolean disableMultithreaded?: boolean childProcessTimeout?: number + /** + * Maximum size of packets to transmit + */ + packetMtu?: number } export type AtemEvents = { @@ -86,6 +90,7 @@ export enum AtemConnectionStatus { } export const DEFAULT_PORT = 9910 +export const DEFAULT_MTU = 1500 export class BasicAtem extends EventEmitter { private readonly socket: AtemSocket @@ -97,14 +102,16 @@ export class BasicAtem extends EventEmitter { constructor(options?: AtemOptions) { super() + // const packetMtu = options?.packetMtu ?? DEFAULT_MTU + this._state = AtemStateUtil.Create() this._status = AtemConnectionStatus.CLOSED this.socket = new AtemSocket({ - debugBuffers: (options || {}).debugBuffers || false, - address: (options || {}).address || '', - port: (options || {}).port || DEFAULT_PORT, - disableMultithreaded: (options || {}).disableMultithreaded || false, - childProcessTimeout: (options || {}).childProcessTimeout || 600, + debugBuffers: options?.debugBuffers ?? false, + address: options?.address || '', + port: options?.port || DEFAULT_PORT, + disableMultithreaded: options?.disableMultithreaded ?? false, + childProcessTimeout: options?.childProcessTimeout || 600, }) this.dataTransferManager = new DT.DataTransferManager(this.sendCommands.bind(this)) diff --git a/src/lib/atemSocket.ts b/src/lib/atemSocket.ts index 3d8a3e9cf..d1977fe3f 100644 --- a/src/lib/atemSocket.ts +++ b/src/lib/atemSocket.ts @@ -104,7 +104,7 @@ export class AtemSocket extends EventEmitter { commands: Array<{ rawCommand: ISerializableCommand; trackingId: number }> ): Promise { if (this._socketProcess) { - const wrappedCommands = commands.map((cmd) => { + const wrappedPackets = commands.map((cmd) => { if (typeof cmd.rawCommand.serialize !== 'function') { throw new Error(`Command ${cmd.rawCommand.constructor.name} is not serializable`) } @@ -113,14 +113,24 @@ export class AtemSocket extends EventEmitter { if (this._debugBuffers) this.emit('debug', `PAYLOAD ${cmd.rawCommand.constructor.name} ${payload.toString('hex')}`) + const rawName: string = (cmd.rawCommand.constructor as any).rawName + + const buffer = Buffer.alloc(payload.length + 8) + // Command + buffer.writeUInt16BE(payload.length + 8, 0) + buffer.write(rawName, 4, 4) + + // Body + payload.copy(buffer, 8) + return { - payload: [...payload], - rawName: (cmd.rawCommand.constructor as any).rawName, + payloadLength: buffer.length, + payloadHex: buffer.toString('hex'), trackingId: cmd.trackingId, } }) - await this._socketProcess.sendCommands(wrappedCommands) + await this._socketProcess.sendPackets(wrappedPackets) } else { throw new Error('Socket process is not open') } diff --git a/src/lib/atemSocketChild.ts b/src/lib/atemSocketChild.ts index c21b24f32..115ebd57b 100644 --- a/src/lib/atemSocketChild.ts +++ b/src/lib/atemSocketChild.ts @@ -167,29 +167,24 @@ export class AtemSocketChild { void this.onLog(message) } - public sendCommands(commands: Array<{ payload: number[]; rawName: string; trackingId: number }>): void { - commands.forEach((cmd) => { - this.sendCommand(cmd.payload, cmd.rawName, cmd.trackingId) - }) + public sendPackets(packets: Array<{ payloadLength: number; payloadHex: string; trackingId: number }>): void { + for (const packet of packets) { + this.sendPacket(packet.payloadLength, packet.payloadHex, packet.trackingId) + } } - private sendCommand(payload: number[], rawName: string, trackingId: number): void { + private sendPacket(payloadLength: number, payloadHex: string, trackingId: number): void { const packetId = this._nextSendPacketId++ if (this._nextSendPacketId >= MAX_PACKET_ID) this._nextSendPacketId = 0 const opcode = PacketFlag.AckRequest << 11 - const buffer = Buffer.alloc(20 + payload.length, 0) - buffer.writeUInt16BE(opcode | (payload.length + 20), 0) // Opcode & Length + const buffer = Buffer.alloc(12 + payloadLength, 0) + buffer.writeUInt16BE(opcode | (payloadLength + 12), 0) // Opcode & Length buffer.writeUInt16BE(this._sessionId, 2) buffer.writeUInt16BE(packetId, 10) - // Command - buffer.writeUInt16BE(payload.length + 8, 12) - buffer.write(rawName, 16, 4) - - // Body - Buffer.from(payload).copy(buffer, 20) + buffer.write(payloadHex, 12, payloadLength, 'hex') this._sendPacket(buffer) this._inFlight.push({ From 67b80f79d293b92b9ecddd5b76a4bd241b86eb5c Mon Sep 17 00:00:00 2001 From: Julian Waller Date: Thu, 30 Nov 2023 17:36:12 +0000 Subject: [PATCH 02/12] wip: simplify `sendCommands` implementation to return a single promise --- src/atem.ts | 40 +++++++++++++++++++-------------------- src/dataTransfer/index.ts | 8 ++++---- src/lib/atemSocket.ts | 25 ++++++++++++++---------- 3 files changed, 38 insertions(+), 35 deletions(-) diff --git a/src/atem.ts b/src/atem.ts index 11376d7cc..138792df1 100644 --- a/src/atem.ts +++ b/src/atem.ts @@ -77,8 +77,7 @@ export type AtemEvents = { updatedTime: [TimeInfo] } -interface SentCommand { - command: ISerializableCommand +interface SentPackets { resolve: () => void reject: () => void } @@ -96,7 +95,7 @@ export class BasicAtem extends EventEmitter { private readonly socket: AtemSocket protected readonly dataTransferManager: DT.DataTransferManager private _state: AtemState | undefined - private _sentQueue: { [packetId: string]: SentCommand } = {} + private _sentQueue: { [packetId: string]: SentPackets } = {} private _status: AtemConnectionStatus constructor(options?: AtemOptions) { @@ -158,28 +157,27 @@ export class BasicAtem extends EventEmitter { return this.socket.destroy() } - private sendCommands(commands: ISerializableCommand[]): Array> { - const commands2 = commands.map((cmd) => ({ - rawCommand: cmd, - trackingId: this.socket.nextCommandTrackingId, - })) + public async sendCommands(commands: ISerializableCommand[]): Promise { + const trackingIds = await this.socket.sendCommands(commands) - const sendPromise = this.socket.sendCommands(commands2) + const promises: Promise[] = [] - return commands2.map(async (cmd) => { - await sendPromise - return new Promise((resolve, reject) => { - this._sentQueue[cmd.trackingId] = { - command: cmd.rawCommand, - resolve, - reject, - } - }) - }) + for (const trackingId of trackingIds) { + promises.push( + new Promise((resolve, reject) => { + this._sentQueue[trackingId] = { + resolve, + reject, + } + }) + ) + } + + await Promise.allSettled(promises) } public async sendCommand(command: ISerializableCommand): Promise { - return this.sendCommands([command])[0] + return await this.sendCommands([command]) } private _mutateState(commands: IDeserializedCommand[]): void { @@ -269,7 +267,7 @@ export class BasicAtem extends EventEmitter { const sentQueue = this._sentQueue this._sentQueue = {} - Object.values(sentQueue).forEach((sent) => sent.reject()) + Object.values(sentQueue).forEach((sent) => sent.reject()) } } diff --git a/src/dataTransfer/index.ts b/src/dataTransfer/index.ts index a62716b59..91cbe2585 100644 --- a/src/dataTransfer/index.ts +++ b/src/dataTransfer/index.ts @@ -30,7 +30,7 @@ export class DataTransferManager { } readonly #sendLockCommand = (/*lock: DataTransferLockingQueue,*/ cmd: ISerializableCommand): void => { - Promise.all(this.#rawSendCommands([cmd])).catch((e) => { + this.#rawSendCommands([cmd]).catch((e) => { debug(`Failed to send lock command: ${e}`) console.log('Failed to send lock command') }) @@ -41,12 +41,12 @@ export class DataTransferManager { readonly #labelsLock = new DataTransferSimpleQueue(this.#nextTransferId) readonly #macroLock = new DataTransferSimpleQueue(this.#nextTransferId) - readonly #rawSendCommands: (cmds: ISerializableCommand[]) => Array> + readonly #rawSendCommands: (cmds: ISerializableCommand[]) => Promise private interval?: NodeJS.Timer private exitUnsubscribe?: () => void - constructor(rawSendCommands: (cmds: ISerializableCommand[]) => Array>) { + constructor(rawSendCommands: (cmds: ISerializableCommand[]) => Promise) { this.#rawSendCommands = rawSendCommands } @@ -75,7 +75,7 @@ export class DataTransferManager { const commandsToSend = lock.popQueuedCommands(MAX_PACKETS_TO_SEND_PER_TICK) // Take some, it is unlikely that multiple will run at once if (commandsToSend && commandsToSend.length > 0) { // debug(`Sending ${commandsToSend.length} commands `) - Promise.all(this.#rawSendCommands(commandsToSend)).catch((e) => { + this.#rawSendCommands(commandsToSend).catch((e) => { // Failed to send/queue something, so abort it lock.tryAbortTransfer(new Error(`Command send failed: ${e}`)) }) diff --git a/src/lib/atemSocket.ts b/src/lib/atemSocket.ts index d1977fe3f..c1777f339 100644 --- a/src/lib/atemSocket.ts +++ b/src/lib/atemSocket.ts @@ -100,20 +100,20 @@ export class AtemSocket extends EventEmitter { return ++this._nextCommandTrackingId } - public async sendCommands( - commands: Array<{ rawCommand: ISerializableCommand; trackingId: number }> - ): Promise { + public async sendCommands(commands: Array): Promise { if (this._socketProcess) { + const trackingIds: number[] = [] + + // TODO - batch into less packets const wrappedPackets = commands.map((cmd) => { - if (typeof cmd.rawCommand.serialize !== 'function') { - throw new Error(`Command ${cmd.rawCommand.constructor.name} is not serializable`) + if (typeof cmd.serialize !== 'function') { + throw new Error(`Command ${cmd.constructor.name} is not serializable`) } - const payload = cmd.rawCommand.serialize(this._commandParser.version) - if (this._debugBuffers) - this.emit('debug', `PAYLOAD ${cmd.rawCommand.constructor.name} ${payload.toString('hex')}`) + const payload = cmd.serialize(this._commandParser.version) + if (this._debugBuffers) this.emit('debug', `PAYLOAD ${cmd.constructor.name} ${payload.toString('hex')}`) - const rawName: string = (cmd.rawCommand.constructor as any).rawName + const rawName: string = (cmd.constructor as any).rawName const buffer = Buffer.alloc(payload.length + 8) // Command @@ -123,14 +123,19 @@ export class AtemSocket extends EventEmitter { // Body payload.copy(buffer, 8) + const trackingId = this.nextCommandTrackingId + trackingIds.push(trackingId) + return { payloadLength: buffer.length, payloadHex: buffer.toString('hex'), - trackingId: cmd.trackingId, + trackingId, } }) await this._socketProcess.sendPackets(wrappedPackets) + + return trackingIds } else { throw new Error('Socket process is not open') } From 8b34c1d7559ab64ef4df3d14ba55106051953b38 Mon Sep 17 00:00:00 2001 From: Julian Waller Date: Thu, 30 Nov 2023 17:56:35 +0000 Subject: [PATCH 03/12] wip: attempt to combine commands into packets --- src/atem.ts | 3 +- src/lib/atemSocket.ts | 88 ++++++++++++++++++++++++++------------ src/lib/atemSocketChild.ts | 8 +++- 3 files changed, 68 insertions(+), 31 deletions(-) diff --git a/src/atem.ts b/src/atem.ts index 138792df1..58e21dd8c 100644 --- a/src/atem.ts +++ b/src/atem.ts @@ -101,8 +101,6 @@ export class BasicAtem extends EventEmitter { constructor(options?: AtemOptions) { super() - // const packetMtu = options?.packetMtu ?? DEFAULT_MTU - this._state = AtemStateUtil.Create() this._status = AtemConnectionStatus.CLOSED this.socket = new AtemSocket({ @@ -111,6 +109,7 @@ export class BasicAtem extends EventEmitter { port: options?.port || DEFAULT_PORT, disableMultithreaded: options?.disableMultithreaded ?? false, childProcessTimeout: options?.childProcessTimeout || 600, + packetMtu: options?.packetMtu ?? DEFAULT_MTU, }) this.dataTransferManager = new DT.DataTransferManager(this.sendCommands.bind(this)) diff --git a/src/lib/atemSocket.ts b/src/lib/atemSocket.ts index c1777f339..243b0e834 100644 --- a/src/lib/atemSocket.ts +++ b/src/lib/atemSocket.ts @@ -4,7 +4,7 @@ import exitHook = require('exit-hook') import { VersionCommand, ISerializableCommand, IDeserializedCommand } from '../commands' import { DEFAULT_PORT } from '../atem' import { threadedClass, ThreadedClass, ThreadedClassManager } from 'threadedclass' -import type { AtemSocketChild } from './atemSocketChild' +import type { AtemSocketChild, OutboundPacketInfo } from './atemSocketChild' export interface AtemSocketOptions { address: string @@ -12,6 +12,7 @@ export interface AtemSocketOptions { debugBuffers: boolean disableMultithreaded: boolean childProcessTimeout: number + packetMtu: number } export type AtemSocketEvents = { @@ -27,6 +28,7 @@ export class AtemSocket extends EventEmitter { private readonly _debugBuffers: boolean private readonly _disableMultithreaded: boolean private readonly _childProcessTimeout: number + private readonly _packetMtu: number private readonly _commandParser: CommandParser = new CommandParser() private _nextCommandTrackingId = 0 @@ -44,6 +46,7 @@ export class AtemSocket extends EventEmitter { this._debugBuffers = options.debugBuffers this._disableMultithreaded = options.disableMultithreaded this._childProcessTimeout = options.childProcessTimeout + this._packetMtu = options.packetMtu } public async connect(address?: string, port?: number): Promise { @@ -101,44 +104,73 @@ export class AtemSocket extends EventEmitter { } public async sendCommands(commands: Array): Promise { - if (this._socketProcess) { - const trackingIds: number[] = [] + if (!this._socketProcess) throw new Error('Socket process is not open') - // TODO - batch into less packets - const wrappedPackets = commands.map((cmd) => { - if (typeof cmd.serialize !== 'function') { - throw new Error(`Command ${cmd.constructor.name} is not serializable`) - } + const trackingIds: number[] = [] - const payload = cmd.serialize(this._commandParser.version) - if (this._debugBuffers) this.emit('debug', `PAYLOAD ${cmd.constructor.name} ${payload.toString('hex')}`) + const packets: Array = [] - const rawName: string = (cmd.constructor as any).rawName + const maxPacketSize = this._packetMtu - 28 - 12 // MTU minus UDP header and ATEM header - const buffer = Buffer.alloc(payload.length + 8) - // Command - buffer.writeUInt16BE(payload.length + 8, 0) - buffer.write(rawName, 4, 4) + let currentPacketBuffer = Buffer.alloc(maxPacketSize) + let currentPacketFilled = 0 - // Body - payload.copy(buffer, 8) + const startNewBuffer = (skipCreate?: boolean) => { + if (currentPacketFilled === 0) return - const trackingId = this.nextCommandTrackingId - trackingIds.push(trackingId) + const trackingId = this.nextCommandTrackingId + trackingIds.push(trackingId) - return { - payloadLength: buffer.length, - payloadHex: buffer.toString('hex'), - trackingId, - } + packets.push({ + payloadLength: currentPacketFilled, + payloadHex: currentPacketBuffer.subarray(0, currentPacketFilled).toString('hex'), + trackingId, }) - await this._socketProcess.sendPackets(wrappedPackets) + if (!skipCreate) { + currentPacketBuffer = Buffer.alloc(maxPacketSize) + currentPacketFilled = 0 + } + } + + for (const cmd of commands) { + if (typeof cmd.serialize !== 'function') { + throw new Error(`Command ${cmd.constructor.name} is not serializable`) + } + + const payload = cmd.serialize(this._commandParser.version) + if (this._debugBuffers) this.emit('debug', `PAYLOAD ${cmd.constructor.name} ${payload.toString('hex')}`) - return trackingIds - } else { - throw new Error('Socket process is not open') + const rawName: string = (cmd.constructor as any).rawName + + const totalLength = payload.length + 8 + if (totalLength >= maxPacketSize) { + throw new Error(`Comamnd ${cmd.constructor.name} is too large for a single packet`) + } + + // Ensure the packet will fit into the current buffer + if (totalLength + currentPacketFilled > maxPacketSize) { + startNewBuffer() + } + + // Command name + currentPacketBuffer.writeUInt16BE(payload.length + 8, currentPacketFilled + 0) + currentPacketBuffer.write(rawName, currentPacketFilled + 4, 4) + + // Body + payload.copy(currentPacketBuffer, currentPacketFilled + 8) + + currentPacketFilled += totalLength } + + // Push the buffer to the queue + startNewBuffer(true) + + if (packets.length > 0) { + await this._socketProcess.sendPackets(packets) + } + + return trackingIds } private async _createSocketProcess(): Promise { diff --git a/src/lib/atemSocketChild.ts b/src/lib/atemSocketChild.ts index 115ebd57b..da20b3f85 100644 --- a/src/lib/atemSocketChild.ts +++ b/src/lib/atemSocketChild.ts @@ -42,6 +42,12 @@ interface InFlightPacket { resent: number } +export interface OutboundPacketInfo { + payloadLength: number + payloadHex: string + trackingId: number +} + export class AtemSocketChild { private readonly _debugBuffers: boolean @@ -167,7 +173,7 @@ export class AtemSocketChild { void this.onLog(message) } - public sendPackets(packets: Array<{ payloadLength: number; payloadHex: string; trackingId: number }>): void { + public sendPackets(packets: OutboundPacketInfo[]): void { for (const packet of packets) { this.sendPacket(packet.payloadLength, packet.payloadHex, packet.trackingId) } From 37f579db7df7aaf80d2458702a5a4d703094edd1 Mon Sep 17 00:00:00 2001 From: Julian Waller Date: Fri, 1 Dec 2023 13:29:35 +0000 Subject: [PATCH 04/12] fix: tests --- src/__tests__/atem.spec.ts | 24 ++--- src/dataTransfer/__tests__/index.spec.ts | 12 +-- src/lib/__tests__/atemSocket.spec.ts | 51 +++++----- src/lib/__tests__/socket-child.spec.ts | 120 +++++++++++++---------- 4 files changed, 107 insertions(+), 100 deletions(-) diff --git a/src/__tests__/atem.spec.ts b/src/__tests__/atem.spec.ts index 6efec941a..f66b8097e 100644 --- a/src/__tests__/atem.spec.ts +++ b/src/__tests__/atem.spec.ts @@ -1,5 +1,5 @@ /* eslint-disable @typescript-eslint/unbound-method */ -import { Atem, DEFAULT_PORT } from '../atem' +import { Atem, DEFAULT_MTU, DEFAULT_PORT } from '../atem' import { CutCommand } from '../commands' import { promisify } from 'util' import { EventEmitter } from 'events' @@ -35,13 +35,14 @@ describe('Atem', () => { disableMultithreaded: true, log: (conn as any)._log, port: DEFAULT_PORT, + packetMtu: DEFAULT_MTU, }) } finally { await conn.destroy() } }) test('constructor test 2', async () => { - const conn = new Atem({ debugBuffers: true, address: 'test1', port: 23 }) + const conn = new Atem({ debugBuffers: true, address: 'test1', port: 23, packetMtu: 500 }) try { const socket = (conn as any).socket as AtemSocket @@ -55,6 +56,7 @@ describe('Atem', () => { disableMultithreaded: false, log: (conn as any)._log, port: 23, + packetMtu: 500, }) } finally { await conn.destroy() @@ -114,23 +116,19 @@ describe('Atem', () => { }) expect(socket.nextCommandTrackingId).toEqual(123) - socket.sendCommands = jest.fn(() => Promise.resolve(35) as any) + socket.sendCommands = jest.fn(() => Promise.resolve([124]) as any) const sentQueue = (conn as any)._sentQueue as Record expect(Object.keys(sentQueue)).toHaveLength(0) const cmd = new CutCommand(0) const res = conn.sendCommand(cmd) + res.catch(() => null) // Dismiss UnhandledPromiseRejection await setImmediatePromise() expect(Object.keys(sentQueue)).toHaveLength(1) expect(socket.sendCommands).toHaveBeenCalledTimes(1) - expect(socket.sendCommands).toHaveBeenCalledWith([ - { - rawCommand: cmd, - trackingId: 124, - }, - ]) + expect(socket.sendCommands).toHaveBeenCalledWith([cmd]) // Trigger the ack, and it should switfy resolve socket.emit('commandsAck', [124]) @@ -165,15 +163,11 @@ describe('Atem', () => { const cmd = new CutCommand(0) const res = conn.sendCommand(cmd) + res.catch(() => null) // Dismiss UnhandledPromiseRejection // Send command should be called expect(socket.sendCommands).toHaveBeenCalledTimes(1) - expect(socket.sendCommands).toHaveBeenCalledWith([ - { - rawCommand: cmd, - trackingId: 124, - }, - ]) + expect(socket.sendCommands).toHaveBeenCalledWith([cmd]) expect(Object.keys(sentQueue)).toHaveLength(0) diff --git a/src/dataTransfer/__tests__/index.spec.ts b/src/dataTransfer/__tests__/index.spec.ts index bd8db2a8a..3c61f94a3 100644 --- a/src/dataTransfer/__tests__/index.spec.ts +++ b/src/dataTransfer/__tests__/index.spec.ts @@ -36,10 +36,10 @@ function mangleCommand(cmd: any, dir: string): any { } function runDataTransferTest(spec: any): DataTransferManager { - const manager = new DataTransferManager((cmds) => - cmds.map(async (cmd) => { + const manager = new DataTransferManager(async (cmds) => { + for (const rawCmd of cmds) { const expectedCmd = spec.shift() - const gotCmd = mangleCommand(cmd, 'send') + const gotCmd = mangleCommand(rawCmd, 'send') expect(gotCmd).toEqual(expectedCmd) while (spec.length > 0) { @@ -49,10 +49,8 @@ function runDataTransferTest(spec: any): DataTransferManager { if (!nextCmd2) throw new Error(`Failed specToCommandClass ${nextCmd.name}`) manager.queueHandleCommand(nextCmd2) } - - return Promise.resolve() - }) - ) + } + }) manager.startCommandSending(true) return manager } diff --git a/src/lib/__tests__/atemSocket.spec.ts b/src/lib/__tests__/atemSocket.spec.ts index 406f66262..25b16cd83 100644 --- a/src/lib/__tests__/atemSocket.spec.ts +++ b/src/lib/__tests__/atemSocket.spec.ts @@ -40,7 +40,7 @@ class AtemSocketChildMock implements AtemSocketChild { public connect = jest.fn(async () => Promise.resolve()) public disconnect = jest.fn(async () => Promise.resolve()) - public sendCommands = jest.fn(async () => Promise.resolve()) + public sendPackets = jest.fn(async () => Promise.resolve()) } const AtemSocketChildSingleton = new AtemSocketChildMock() @@ -83,7 +83,7 @@ describe('AtemSocket', () => { ;(AtemSocketChild as any).mockClear() AtemSocketChildSingleton.connect.mockClear() AtemSocketChildSingleton.disconnect.mockClear() - AtemSocketChildSingleton.sendCommands.mockClear() + AtemSocketChildSingleton.sendPackets.mockClear() if (!lite) { AtemSocketChildSingleton.onLog = async (): Promise => Promise.resolve() @@ -108,6 +108,7 @@ describe('AtemSocket', () => { port: 890, disableMultithreaded: true, childProcessTimeout: 100, + packetMtu: 1500, }) } @@ -128,7 +129,7 @@ describe('AtemSocket', () => { // Connect was not called explicitly expect(AtemSocketChildSingleton.connect).toHaveBeenCalledTimes(1) expect(AtemSocketChildSingleton.disconnect).toHaveBeenCalledTimes(0) - expect(AtemSocketChildSingleton.sendCommands).toHaveBeenCalledTimes(0) + expect(AtemSocketChildSingleton.sendPackets).toHaveBeenCalledTimes(0) // New child was constructed expect(AtemSocketChild).toHaveBeenCalledTimes(1) @@ -153,7 +154,7 @@ describe('AtemSocket', () => { // Connect was not called explicitly expect(AtemSocketChildSingleton.connect).toHaveBeenCalledTimes(1) expect(AtemSocketChildSingleton.disconnect).toHaveBeenCalledTimes(0) - expect(AtemSocketChildSingleton.sendCommands).toHaveBeenCalledTimes(0) + expect(AtemSocketChildSingleton.sendPackets).toHaveBeenCalledTimes(0) // New child was constructed expect(AtemSocketChild).toHaveBeenCalledTimes(1) @@ -180,7 +181,7 @@ describe('AtemSocket', () => { expect(AtemSocketChild).toHaveBeenCalledTimes(1) expect(AtemSocketChildSingleton.connect).toHaveBeenCalledTimes(1) expect(AtemSocketChildSingleton.disconnect).toHaveBeenCalledTimes(0) - expect(AtemSocketChildSingleton.sendCommands).toHaveBeenCalledTimes(0) + expect(AtemSocketChildSingleton.sendPackets).toHaveBeenCalledTimes(0) mockClear() @@ -193,7 +194,7 @@ describe('AtemSocket', () => { expect(AtemSocketChild).toHaveBeenCalledTimes(0) expect(AtemSocketChildSingleton.connect).toHaveBeenCalledTimes(1) expect(AtemSocketChildSingleton.disconnect).toHaveBeenCalledTimes(0) - expect(AtemSocketChildSingleton.sendCommands).toHaveBeenCalledTimes(0) + expect(AtemSocketChildSingleton.sendPackets).toHaveBeenCalledTimes(0) expect(AtemSocketChildSingleton.connect).toHaveBeenCalledWith('new', 455) }) @@ -220,7 +221,7 @@ describe('AtemSocket', () => { expect(AtemSocketChild).toHaveBeenCalledTimes(0) expect(AtemSocketChildSingleton.connect).toHaveBeenCalledTimes(0) expect(AtemSocketChildSingleton.disconnect).toHaveBeenCalledTimes(1) - expect(AtemSocketChildSingleton.sendCommands).toHaveBeenCalledTimes(0) + expect(AtemSocketChildSingleton.sendPackets).toHaveBeenCalledTimes(0) expect(AtemSocketChildSingleton.disconnect).toHaveBeenCalledWith() }) @@ -234,7 +235,7 @@ describe('AtemSocket', () => { expect(AtemSocketChild).toHaveBeenCalledTimes(0) expect(AtemSocketChildSingleton.connect).toHaveBeenCalledTimes(0) expect(AtemSocketChildSingleton.disconnect).toHaveBeenCalledTimes(0) - expect(AtemSocketChildSingleton.sendCommands).toHaveBeenCalledTimes(0) + expect(AtemSocketChildSingleton.sendPackets).toHaveBeenCalledTimes(0) }) test('sendCommand - not open', async () => { @@ -242,15 +243,13 @@ describe('AtemSocket', () => { expect(getChild(socket)).toBeFalsy() const cmd = new CutCommand(0) - await expect(socket.sendCommands([{ rawCommand: cmd, trackingId: 1 }])).rejects.toEqual( - new Error('Socket process is not open') - ) + await expect(socket.sendCommands([cmd])).rejects.toEqual(new Error('Socket process is not open')) // connect was called explicitly expect(AtemSocketChild).toHaveBeenCalledTimes(0) expect(AtemSocketChildSingleton.connect).toHaveBeenCalledTimes(0) expect(AtemSocketChildSingleton.disconnect).toHaveBeenCalledTimes(0) - expect(AtemSocketChildSingleton.sendCommands).toHaveBeenCalledTimes(0) + expect(AtemSocketChildSingleton.sendPackets).toHaveBeenCalledTimes(0) }) test('sendCommand - not serializable', async () => { @@ -266,7 +265,7 @@ describe('AtemSocket', () => { productIdentifier: 'ATEM OneME', }) as any as ISerializableCommand expect(cmd.serialize).toBeFalsy() - await expect(socket.sendCommands([{ rawCommand: cmd, trackingId: 1 }])).rejects.toEqual( + await expect(socket.sendCommands([cmd])).rejects.toEqual( new Error('Command ProductIdentifierCommand is not serializable') ) @@ -274,7 +273,7 @@ describe('AtemSocket', () => { expect(AtemSocketChild).toHaveBeenCalledTimes(0) expect(AtemSocketChildSingleton.connect).toHaveBeenCalledTimes(0) expect(AtemSocketChildSingleton.disconnect).toHaveBeenCalledTimes(0) - expect(AtemSocketChildSingleton.sendCommands).toHaveBeenCalledTimes(0) + expect(AtemSocketChildSingleton.sendPackets).toHaveBeenCalledTimes(0) }) test('sendCommand', async () => { @@ -294,21 +293,25 @@ describe('AtemSocket', () => { } const cmd = new MockCommand({}) - const cmdId = 836 - await socket.sendCommands([{ rawCommand: cmd, trackingId: cmdId }]) + ;(socket as any)._nextCommandTrackingId = 835 + await socket.sendCommands([cmd]) // connect was called explicitly expect(AtemSocketChild).toHaveBeenCalledTimes(0) expect(AtemSocketChildSingleton.connect).toHaveBeenCalledTimes(0) expect(AtemSocketChildSingleton.disconnect).toHaveBeenCalledTimes(0) - expect(AtemSocketChildSingleton.sendCommands).toHaveBeenCalledTimes(1) - - const expectedBuffer = [...cmd.serialize()] - expect(AtemSocketChildSingleton.sendCommands).toHaveBeenCalledWith([ + expect(AtemSocketChildSingleton.sendPackets).toHaveBeenCalledTimes(1) + + const expectedBuffer = + Buffer.from([0, 20]).toString('hex') + + '0000' + + Buffer.from('TEST').toString('hex') + + cmd.serialize().toString('hex') + expect(AtemSocketChildSingleton.sendPackets).toHaveBeenCalledWith([ { - payload: expectedBuffer, - rawName: 'TEST', - trackingId: cmdId, + payloadLength: 20, + payloadHex: expectedBuffer, + trackingId: 836, }, ]) }) @@ -402,7 +405,7 @@ describe('AtemSocket', () => { expect(parser.version).toEqual(0x01020304) // Parsed // A change with the command - const expectedCmd = new VersionCommand(0x01020304) + const expectedCmd = new VersionCommand(0x01020304 as ProtocolVersion) expect(change).toHaveBeenCalledWith([expectedCmd]) }) test('receive - multiple commands', async () => { diff --git a/src/lib/__tests__/socket-child.spec.ts b/src/lib/__tests__/socket-child.spec.ts index 5d7881bdd..ce16dba16 100644 --- a/src/lib/__tests__/socket-child.spec.ts +++ b/src/lib/__tests__/socket-child.spec.ts @@ -396,14 +396,20 @@ describe('SocketChild', () => { } // Send something - const buf1 = [0, 1, 2] + const buf1 = Buffer.from([0, 1, 2]) const cmdName = 'test' const buf1Expected = Buffer.alloc(11) buf1Expected.writeUInt16BE(buf1Expected.length, 0) buf1Expected.write(cmdName, 4, 4) Buffer.from(buf1).copy(buf1Expected, 8) - child.sendCommands([{ payload: buf1, rawName: cmdName, trackingId: 1 }]) + child.sendPackets([ + { + payloadLength: buf1Expected.length, + payloadHex: buf1Expected.toString('hex'), + trackingId: 1, + }, + ]) expect(received).toEqual([ { id: 123, @@ -414,7 +420,13 @@ describe('SocketChild', () => { expect(getInflightIds(child)).toEqual([123]) // Send another - child.sendCommands([{ payload: buf1, rawName: cmdName, trackingId: 1 }]) + child.sendPackets([ + { + payloadLength: buf1Expected.length, + payloadHex: buf1Expected.toString('hex'), + trackingId: 1, + }, + ]) expect(received).toEqual([ { id: 124, @@ -472,16 +484,16 @@ describe('SocketChild', () => { acked = [] // Send some stuff - const buf1 = [0, 1, 2] - child.sendCommands([ - { payload: buf1, rawName: '', trackingId: 5 }, - { payload: buf1, rawName: '', trackingId: 6 }, - { payload: buf1, rawName: '', trackingId: 7 }, + const buf1 = Buffer.from([0, 1, 2]) + child.sendPackets([ + { payloadLength: buf1.length, payloadHex: buf1.toString('hex'), trackingId: 5 }, + { payloadLength: buf1.length, payloadHex: buf1.toString('hex'), trackingId: 6 }, + { payloadLength: buf1.length, payloadHex: buf1.toString('hex'), trackingId: 7 }, ]) - child.sendCommands([{ payload: buf1, rawName: '', trackingId: 8 }]) - child.sendCommands([ - { payload: buf1, rawName: '', trackingId: 9 }, - { payload: buf1, rawName: '', trackingId: 10 }, + child.sendPackets([{ payloadLength: buf1.length, payloadHex: buf1.toString('hex'), trackingId: 8 }]) + child.sendPackets([ + { payloadLength: buf1.length, payloadHex: buf1.toString('hex'), trackingId: 9 }, + { payloadLength: buf1.length, payloadHex: buf1.toString('hex'), trackingId: 10 }, ]) expect(received).toEqual([123, 124, 125, 126, 127, 128]) received = [] @@ -531,18 +543,18 @@ describe('SocketChild', () => { acked = [] // Send some stuff - const buf1 = [0, 1, 2] - child.sendCommands([ - { payload: buf1, rawName: '', trackingId: 5 }, // 32764 - { payload: buf1, rawName: '', trackingId: 6 }, // 32765 - { payload: buf1, rawName: '', trackingId: 7 }, // 32766 + const buf1 = Buffer.from([0, 1, 2]) + child.sendPackets([ + { payloadLength: buf1.length, payloadHex: buf1.toString('hex'), trackingId: 5 }, // 32764 + { payloadLength: buf1.length, payloadHex: buf1.toString('hex'), trackingId: 6 }, // 32765 + { payloadLength: buf1.length, payloadHex: buf1.toString('hex'), trackingId: 7 }, // 32766 ]) - child.sendCommands([ - { payload: buf1, rawName: '', trackingId: 8 }, // 32767 + child.sendPackets([ + { payloadLength: buf1.length, payloadHex: buf1.toString('hex'), trackingId: 8 }, // 32767 ]) - child.sendCommands([ - { payload: buf1, rawName: '', trackingId: 9 }, // 0 - { payload: buf1, rawName: '', trackingId: 10 }, // 1 + child.sendPackets([ + { payloadLength: buf1.length, payloadHex: buf1.toString('hex'), trackingId: 9 }, // 0 + { payloadLength: buf1.length, payloadHex: buf1.toString('hex'), trackingId: 10 }, // 1 ]) expect(received).toEqual([32764, 32765, 32766, 32767, 0, 1]) received = [] @@ -595,14 +607,14 @@ describe('SocketChild', () => { acked = [] // Send some stuff - const buf1 = [0, 1, 2] - child.sendCommands([ - { payload: buf1, rawName: '', trackingId: 5 }, // 32764 - { payload: buf1, rawName: '', trackingId: 6 }, // 32765 - { payload: buf1, rawName: '', trackingId: 7 }, // 32766 - { payload: buf1, rawName: '', trackingId: 8 }, // 32767 - { payload: buf1, rawName: '', trackingId: 9 }, // 0 - { payload: buf1, rawName: '', trackingId: 10 }, // 1 + const buf1 = Buffer.from([0, 1, 2]) + child.sendPackets([ + { payloadLength: buf1.length, payloadHex: buf1.toString('hex'), trackingId: 5 }, // 32764 + { payloadLength: buf1.length, payloadHex: buf1.toString('hex'), trackingId: 6 }, // 32765 + { payloadLength: buf1.length, payloadHex: buf1.toString('hex'), trackingId: 7 }, // 32766 + { payloadLength: buf1.length, payloadHex: buf1.toString('hex'), trackingId: 8 }, // 32767 + { payloadLength: buf1.length, payloadHex: buf1.toString('hex'), trackingId: 9 }, // 0 + { payloadLength: buf1.length, payloadHex: buf1.toString('hex'), trackingId: 10 }, // 1 ]) expect(received).toEqual([32764, 32765, 32766, 32767, 0, 1]) received = [] @@ -631,8 +643,8 @@ describe('SocketChild', () => { received = [] // Add another to the queue - child.sendCommands([ - { payload: buf1, rawName: '', trackingId: 11 }, // 2 + child.sendPackets([ + { payloadLength: buf1.length, payloadHex: buf1.toString('hex'), trackingId: 11 }, // 2 ]) expect(received).toEqual([2]) received = [] @@ -692,14 +704,14 @@ describe('SocketChild', () => { acked = [] // Send some stuff - const buf1 = [0, 1, 2] - child.sendCommands([ - { payload: buf1, rawName: '', trackingId: 5 }, // 32764 - { payload: buf1, rawName: '', trackingId: 6 }, // 32765 - { payload: buf1, rawName: '', trackingId: 7 }, // 32766 - { payload: buf1, rawName: '', trackingId: 8 }, // 32767 - { payload: buf1, rawName: '', trackingId: 9 }, // 0 - { payload: buf1, rawName: '', trackingId: 10 }, // 1 + const buf1 = Buffer.from([0, 1, 2]) + child.sendPackets([ + { payloadLength: buf1.length, payloadHex: buf1.toString('hex'), trackingId: 5 }, // 32764 + { payloadLength: buf1.length, payloadHex: buf1.toString('hex'), trackingId: 6 }, // 32765 + { payloadLength: buf1.length, payloadHex: buf1.toString('hex'), trackingId: 7 }, // 32766 + { payloadLength: buf1.length, payloadHex: buf1.toString('hex'), trackingId: 8 }, // 32767 + { payloadLength: buf1.length, payloadHex: buf1.toString('hex'), trackingId: 9 }, // 0 + { payloadLength: buf1.length, payloadHex: buf1.toString('hex'), trackingId: 10 }, // 1 ]) expect(received).toEqual([32764, 32765, 32766, 32767, 0, 1]) received = [] @@ -753,10 +765,10 @@ describe('SocketChild', () => { connected = true // Send some stuff - const buf1 = [0, 1, 2] - child.sendCommands([ - { payload: buf1, rawName: '', trackingId: 5 }, // 32767 - { payload: buf1, rawName: '', trackingId: 6 }, // 0 + const buf1 = Buffer.from([0, 1, 2]) + child.sendPackets([ + { payloadLength: buf1.length, payloadHex: buf1.toString('hex'), trackingId: 5 }, // 32767 + { payloadLength: buf1.length, payloadHex: buf1.toString('hex'), trackingId: 6 }, // 0 ]) expect(getInflightIds(child)).toEqual([32767, 0]) expect(acked).toEqual([]) @@ -793,10 +805,10 @@ describe('SocketChild', () => { connected = true // Send some stuff - const buf1 = [0, 1, 2] - child.sendCommands([ - { payload: buf1, rawName: '', trackingId: 5 }, // 32767 - { payload: buf1, rawName: '', trackingId: 6 }, // 0 + const buf1 = Buffer.from([0, 1, 2]) + child.sendPackets([ + { payloadLength: buf1.length, payloadHex: buf1.toString('hex'), trackingId: 5 }, // 32767 + { payloadLength: buf1.length, payloadHex: buf1.toString('hex'), trackingId: 6 }, // 0 ]) expect(getInflightIds(child)).toEqual([32767, 0]) expect(acked).toEqual([]) @@ -835,10 +847,10 @@ describe('SocketChild', () => { acked = [] // Send some stuff - const buf1 = [0, 1, 2] - child.sendCommands([ - { payload: buf1, rawName: '', trackingId: 5 }, // 32767 - { payload: buf1, rawName: '', trackingId: 6 }, // 0 + const buf1 = Buffer.from([0, 1, 2]) + child.sendPackets([ + { payloadLength: buf1.length, payloadHex: buf1.toString('hex'), trackingId: 5 }, // 32767 + { payloadLength: buf1.length, payloadHex: buf1.toString('hex'), trackingId: 6 }, // 0 ]) expect(getInflightIds(child)).toEqual([32767, 0]) expect(acked).toEqual([]) @@ -868,8 +880,8 @@ describe('SocketChild', () => { // Not quite await clock.tickAsync(1990) expect(connected).toBeTrue() - child.sendCommands([ - { payload: buf1, rawName: '', trackingId: 7 }, // 1 + child.sendPackets([ + { payloadLength: buf1.length, payloadHex: buf1.toString('hex'), trackingId: 7 }, // 1 ]) expect(getInflightIds(child)).toEqual([1]) expect(acked).toEqual([]) From 159f6e5b4e1975311e58c89bc941f44a27af3c0e Mon Sep 17 00:00:00 2001 From: Julian Waller Date: Fri, 1 Dec 2023 13:31:10 +0000 Subject: [PATCH 05/12] chore: renaming --- src/__tests__/atem.spec.ts | 2 +- src/__tests__/connection.spec.ts | 10 ++++----- src/atem.ts | 4 ++-- src/lib/__tests__/atemSocket.spec.ts | 28 +++++++++++++------------- src/lib/__tests__/socket-child.spec.ts | 4 ++-- src/lib/atemSocket.ts | 10 ++++----- src/lib/atemSocketChild.ts | 8 ++++---- 7 files changed, 33 insertions(+), 33 deletions(-) diff --git a/src/__tests__/atem.spec.ts b/src/__tests__/atem.spec.ts index f66b8097e..c0dac1a76 100644 --- a/src/__tests__/atem.spec.ts +++ b/src/__tests__/atem.spec.ts @@ -131,7 +131,7 @@ describe('Atem', () => { expect(socket.sendCommands).toHaveBeenCalledWith([cmd]) // Trigger the ack, and it should switfy resolve - socket.emit('commandsAck', [124]) + socket.emit('ackPackets', [124]) expect(Object.keys(sentQueue)).toHaveLength(0) // Finally, it should now resolve without a timeout diff --git a/src/__tests__/connection.spec.ts b/src/__tests__/connection.spec.ts index b49ff9b5a..fdfcc218e 100644 --- a/src/__tests__/connection.spec.ts +++ b/src/__tests__/connection.spec.ts @@ -18,18 +18,18 @@ class AtemSocketChildMock implements AtemSocketChild { public onDisconnect: () => Promise public onLog: (message: string) => Promise public onCommandsReceived: (payload: Buffer, packetId: number) => Promise - public onCommandsAcknowledged: (ids: Array<{ packetId: number; trackingId: number }>) => Promise + public onPacketsAcknowledged: (ids: Array<{ packetId: number; trackingId: number }>) => Promise constructor( onDisconnect: () => Promise, onLog: (message: string) => Promise, onCommandsReceived: (payload: Buffer, packetId: number) => Promise, - onCommandsAcknowledged: (ids: Array<{ packetId: number; trackingId: number }>) => Promise + onPacketsAcknowledged: (ids: Array<{ packetId: number; trackingId: number }>) => Promise ) { this.onDisconnect = onDisconnect this.onLog = onLog this.onCommandsReceived = onCommandsReceived - this.onCommandsAcknowledged = onCommandsAcknowledged + this.onPacketsAcknowledged = onPacketsAcknowledged } public connect = jest.fn(async () => Promise.resolve()) @@ -43,9 +43,9 @@ class AtemSocketChildMock implements AtemSocketChild { onDisconnect: () => Promise, onLog: (message: string) => Promise, onCommandsReceived: (payload: Buffer, packetId: number) => Promise, - onCommandsAcknowledged: (ids: Array<{ packetId: number; trackingId: number }>) => Promise + onPacketsAcknowledged: (ids: Array<{ packetId: number; trackingId: number }>) => Promise ) => { - return new AtemSocketChildMock(onDisconnect, onLog, onCommandsReceived, onCommandsAcknowledged) + return new AtemSocketChildMock(onDisconnect, onLog, onCommandsReceived, onPacketsAcknowledged) } ) diff --git a/src/atem.ts b/src/atem.ts index 58e21dd8c..4bd5f0d9d 100644 --- a/src/atem.ts +++ b/src/atem.ts @@ -113,11 +113,11 @@ export class BasicAtem extends EventEmitter { }) this.dataTransferManager = new DT.DataTransferManager(this.sendCommands.bind(this)) - this.socket.on('commandsReceived', (commands) => { + this.socket.on('receivedCommands', (commands) => { this.emit('receivedCommands', commands) this._mutateState(commands) }) - this.socket.on('commandsAck', (trackingIds) => this._resolveCommands(trackingIds)) + this.socket.on('ackPackets', (trackingIds) => this._resolveCommands(trackingIds)) this.socket.on('info', (msg) => this.emit('info', msg)) this.socket.on('debug', (msg) => this.emit('debug', msg)) this.socket.on('error', (e) => this.emit('error', e)) diff --git a/src/lib/__tests__/atemSocket.spec.ts b/src/lib/__tests__/atemSocket.spec.ts index 25b16cd83..cb32cbf01 100644 --- a/src/lib/__tests__/atemSocket.spec.ts +++ b/src/lib/__tests__/atemSocket.spec.ts @@ -25,7 +25,7 @@ class AtemSocketChildMock implements AtemSocketChild { public onDisconnect: () => Promise public onLog: (message: string) => Promise public onCommandsReceived: (payload: Buffer, packetId: number) => Promise - public onCommandsAcknowledged: (ids: Array<{ packetId: number; trackingId: number }>) => Promise + public onPacketsAcknowledged: (ids: Array<{ packetId: number; trackingId: number }>) => Promise constructor() { // this._debug = options.debug @@ -35,7 +35,7 @@ class AtemSocketChildMock implements AtemSocketChild { this.onDisconnect = async (): Promise => Promise.resolve() this.onLog = async (msg): Promise => console.log(msg) this.onCommandsReceived = async (): Promise => Promise.resolve() - this.onCommandsAcknowledged = async (): Promise => Promise.resolve() + this.onPacketsAcknowledged = async (): Promise => Promise.resolve() } public connect = jest.fn(async () => Promise.resolve()) @@ -50,12 +50,12 @@ const AtemSocketChildSingleton = new AtemSocketChildMock() onDisconnect: () => Promise, onLog: (message: string) => Promise, onCommandsReceived: (payload: Buffer, packetId: number) => Promise, - onCommandsAcknowledged: (ids: Array<{ packetId: number; trackingId: number }>) => Promise + onPacketsAcknowledged: (ids: Array<{ packetId: number; trackingId: number }>) => Promise ) => { AtemSocketChildSingleton.onDisconnect = onDisconnect AtemSocketChildSingleton.onLog = onLog AtemSocketChildSingleton.onCommandsReceived = onCommandsReceived - AtemSocketChildSingleton.onCommandsAcknowledged = onCommandsAcknowledged + AtemSocketChildSingleton.onPacketsAcknowledged = onPacketsAcknowledged return AtemSocketChildSingleton } ) @@ -88,7 +88,7 @@ describe('AtemSocket', () => { if (!lite) { AtemSocketChildSingleton.onLog = async (): Promise => Promise.resolve() AtemSocketChildSingleton.onDisconnect = async (): Promise => Promise.resolve() - AtemSocketChildSingleton.onCommandsAcknowledged = async (): Promise => Promise.resolve() + AtemSocketChildSingleton.onPacketsAcknowledged = async (): Promise => Promise.resolve() AtemSocketChildSingleton.onCommandsReceived = async (): Promise => Promise.resolve() } } @@ -328,15 +328,15 @@ describe('AtemSocket', () => { const ack = jest.fn() socket.on('disconnect', disconnect) - socket.on('commandsAck', ack) + socket.on('ackPackets', ack) expect(AtemSocketChildSingleton.onDisconnect).toBeDefined() await AtemSocketChildSingleton.onDisconnect() await clock.tickAsync(0) expect(disconnect).toHaveBeenCalledTimes(1) - expect(AtemSocketChildSingleton.onCommandsAcknowledged).toBeDefined() - await AtemSocketChildSingleton.onCommandsAcknowledged([{ packetId: 675, trackingId: 98 }]) + expect(AtemSocketChildSingleton.onPacketsAcknowledged).toBeDefined() + await AtemSocketChildSingleton.onPacketsAcknowledged([{ packetId: 675, trackingId: 98 }]) await clock.tickAsync(0) expect(ack).toHaveBeenCalledTimes(1) expect(ack).toHaveBeenCalledWith([98]) @@ -354,7 +354,7 @@ describe('AtemSocket', () => { const change = jest.fn() socket.on('error', error) - socket.on('commandsReceived', change) + socket.on('receivedCommands', change) const parser = (socket as any)._commandParser as CommandParser expect(parser).toBeTruthy() @@ -383,7 +383,7 @@ describe('AtemSocket', () => { const change = jest.fn() socket.on('error', error) - socket.on('commandsReceived', change) + socket.on('receivedCommands', change) const parser = (socket as any)._commandParser as CommandParser expect(parser).toBeTruthy() @@ -420,7 +420,7 @@ describe('AtemSocket', () => { const change = jest.fn() socket.on('error', error) - socket.on('commandsReceived', change) + socket.on('receivedCommands', change) const parser = (socket as any)._commandParser as CommandParser expect(parser).toBeTruthy() @@ -479,7 +479,7 @@ describe('AtemSocket', () => { const change = jest.fn() socket.on('error', error) - socket.on('commandsReceived', change) + socket.on('receivedCommands', change) const testBuffer = Buffer.alloc(0) const pktId = 822 @@ -502,7 +502,7 @@ describe('AtemSocket', () => { const change = jest.fn() socket.on('error', error) - socket.on('commandsReceived', change) + socket.on('receivedCommands', change) const testBuffer = Buffer.alloc(10, 0) const pktId = 822 @@ -525,7 +525,7 @@ describe('AtemSocket', () => { const change = jest.fn() socket.on('error', error) - socket.on('commandsReceived', change) + socket.on('receivedCommands', change) class BrokenCommand extends DeserializedCommand<{}> { public static readonly rawName = 'TEST' diff --git a/src/lib/__tests__/socket-child.spec.ts b/src/lib/__tests__/socket-child.spec.ts index ce16dba16..6cf1e4dba 100644 --- a/src/lib/__tests__/socket-child.spec.ts +++ b/src/lib/__tests__/socket-child.spec.ts @@ -32,7 +32,7 @@ function fakeConnect(child: AtemSocketChild): void { function createSocketChild( onCommandsReceived?: (payload: Buffer, packetId: number) => Promise, - onCommandsAcknowledged?: (ids: Array<{ packetId: number; trackingId: number }>) => Promise, + onPacketsAcknowledged?: (ids: Array<{ packetId: number; trackingId: number }>) => Promise, onDisconnect?: () => Promise ): AtemSocketChild { return new AtemSocketChild( @@ -45,7 +45,7 @@ function createSocketChild( // async msg => { console.log(msg) }, async (): Promise => Promise.resolve(), onCommandsReceived || (async (): Promise => Promise.resolve()), - onCommandsAcknowledged || (async (): Promise => Promise.resolve()) + onPacketsAcknowledged || (async (): Promise => Promise.resolve()) ) } diff --git a/src/lib/atemSocket.ts b/src/lib/atemSocket.ts index 243b0e834..460d7c1f7 100644 --- a/src/lib/atemSocket.ts +++ b/src/lib/atemSocket.ts @@ -20,8 +20,8 @@ export type AtemSocketEvents = { info: [string] debug: [string] error: [string] - commandsReceived: [IDeserializedCommand[]] - commandsAck: [number[]] + receivedCommands: [IDeserializedCommand[]] + ackPackets: [number[]] } export class AtemSocket extends EventEmitter { @@ -194,10 +194,10 @@ export class AtemSocket extends EventEmitter { }, // onCommandsReceived async (ids: Array<{ packetId: number; trackingId: number }>): Promise => { this.emit( - 'commandsAck', + 'ackPackets', ids.map((id) => id.trackingId) ) - }, // onCommandsAcknowledged + }, // onPacketsAcknowledged ], { instanceName: 'atem-connection', @@ -260,7 +260,7 @@ export class AtemSocket extends EventEmitter { } if (parsedCommands.length > 0) { - this.emit('commandsReceived', parsedCommands) + this.emit('receivedCommands', parsedCommands) } return parsedCommands } diff --git a/src/lib/atemSocketChild.ts b/src/lib/atemSocketChild.ts index da20b3f85..215eb547a 100644 --- a/src/lib/atemSocketChild.ts +++ b/src/lib/atemSocketChild.ts @@ -72,7 +72,7 @@ export class AtemSocketChild { private readonly onDisconnect: () => Promise private readonly onLog: (message: string) => Promise private readonly onCommandsReceived: (payload: Buffer, packetId: number) => Promise - private readonly onCommandsAcknowledged: (ids: Array<{ packetId: number; trackingId: number }>) => Promise + private readonly onPacketsAcknowledged: (ids: Array<{ packetId: number; trackingId: number }>) => Promise constructor( options: { address: string; port: number; debugBuffers: boolean }, @@ -88,7 +88,7 @@ export class AtemSocketChild { this.onDisconnect = onDisconnect this.onLog = onLog this.onCommandsReceived = onCommandReceived - this.onCommandsAcknowledged = onCommandAcknowledged + this.onPacketsAcknowledged = onCommandAcknowledged this._socket = this._createSocket() } @@ -308,7 +308,7 @@ export class AtemSocketChild { return true } }) - ps.push(this.onCommandsAcknowledged(ackedCommands)) + ps.push(this.onPacketsAcknowledged(ackedCommands)) // this.log(`${Date.now()} Got ack ${ackPacketId} Remaining=${this._inFlight.length}`) } } @@ -396,7 +396,7 @@ export class AtemSocketChild { // Retransmit the packet and anything after it return this._retransmitFrom(sentPacket.packetId) } else { - // A command has timed out, so we need to reset to avoid getting stuck + // A packet has timed out, so we need to reset to avoid getting stuck this.log(`Packet timed out: ${sentPacket.packetId}`) return this.restartConnection() } From 6a0518fe3502c7b6b2f1efc1ffdfbb4818f46446 Mon Sep 17 00:00:00 2001 From: Julian Waller Date: Fri, 1 Dec 2023 13:54:01 +0000 Subject: [PATCH 06/12] chore: renaming --- src/__tests__/atem.spec.ts | 8 ++++---- src/lib/__tests__/atemSocket.spec.ts | 10 +++++----- src/lib/atemSocket.ts | 12 ++++++------ 3 files changed, 15 insertions(+), 15 deletions(-) diff --git a/src/__tests__/atem.spec.ts b/src/__tests__/atem.spec.ts index c0dac1a76..13927c2e5 100644 --- a/src/__tests__/atem.spec.ts +++ b/src/__tests__/atem.spec.ts @@ -110,11 +110,11 @@ describe('Atem', () => { expect(socket).toBeTruthy() let nextId = 123 - Object.defineProperty(socket, 'nextCommandTrackingId', { + Object.defineProperty(socket, 'nextPacketTrackingId', { get: jest.fn(() => nextId++), set: jest.fn(), }) - expect(socket.nextCommandTrackingId).toEqual(123) + expect(socket.nextPacketTrackingId).toEqual(123) socket.sendCommands = jest.fn(() => Promise.resolve([124]) as any) @@ -150,11 +150,11 @@ describe('Atem', () => { expect(socket).toBeTruthy() let nextId = 123 - Object.defineProperty(socket, 'nextCommandTrackingId', { + Object.defineProperty(socket, 'nextPacketTrackingId', { get: jest.fn(() => nextId++), set: jest.fn(), }) - expect(socket.nextCommandTrackingId).toEqual(123) + expect(socket.nextPacketTrackingId).toEqual(123) socket.sendCommands = jest.fn(() => Promise.reject(35) as any) diff --git a/src/lib/__tests__/atemSocket.spec.ts b/src/lib/__tests__/atemSocket.spec.ts index cb32cbf01..346805b5f 100644 --- a/src/lib/__tests__/atemSocket.spec.ts +++ b/src/lib/__tests__/atemSocket.spec.ts @@ -198,12 +198,12 @@ describe('AtemSocket', () => { expect(AtemSocketChildSingleton.connect).toHaveBeenCalledWith('new', 455) }) - test('nextCommandTrackingId', () => { + test('nextPacketTrackingId', () => { const socket = createSocket() - expect(socket.nextCommandTrackingId).toEqual(1) - expect(socket.nextCommandTrackingId).toEqual(2) - expect(socket.nextCommandTrackingId).toEqual(3) + expect(socket.nextPacketTrackingId).toEqual(1) + expect(socket.nextPacketTrackingId).toEqual(2) + expect(socket.nextPacketTrackingId).toEqual(3) }) test('disconnect', async () => { @@ -293,7 +293,7 @@ describe('AtemSocket', () => { } const cmd = new MockCommand({}) - ;(socket as any)._nextCommandTrackingId = 835 + ;(socket as any)._nextPacketTrackingId = 835 await socket.sendCommands([cmd]) // connect was called explicitly diff --git a/src/lib/atemSocket.ts b/src/lib/atemSocket.ts index 460d7c1f7..0e3839052 100644 --- a/src/lib/atemSocket.ts +++ b/src/lib/atemSocket.ts @@ -31,7 +31,7 @@ export class AtemSocket extends EventEmitter { private readonly _packetMtu: number private readonly _commandParser: CommandParser = new CommandParser() - private _nextCommandTrackingId = 0 + private _nextPacketTrackingId = 0 private _isDisconnecting = false private _address: string private _port: number = DEFAULT_PORT @@ -96,11 +96,11 @@ export class AtemSocket extends EventEmitter { } } - get nextCommandTrackingId(): number { - if (this._nextCommandTrackingId >= Number.MAX_SAFE_INTEGER) { - this._nextCommandTrackingId = 0 + get nextPacketTrackingId(): number { + if (this._nextPacketTrackingId >= Number.MAX_SAFE_INTEGER) { + this._nextPacketTrackingId = 0 } - return ++this._nextCommandTrackingId + return ++this._nextPacketTrackingId } public async sendCommands(commands: Array): Promise { @@ -118,7 +118,7 @@ export class AtemSocket extends EventEmitter { const startNewBuffer = (skipCreate?: boolean) => { if (currentPacketFilled === 0) return - const trackingId = this.nextCommandTrackingId + const trackingId = this.nextPacketTrackingId trackingIds.push(trackingId) packets.push({ From f83e2b14edbce61b705cfb02ec984c4192fc5520 Mon Sep 17 00:00:00 2001 From: Julian Waller Date: Fri, 1 Dec 2023 14:05:47 +0000 Subject: [PATCH 07/12] wip: pull out packetbuilder helper --- src/lib/atemSocket.ts | 65 ++++++--------------------------------- src/lib/packetBuilder.ts | 66 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 76 insertions(+), 55 deletions(-) create mode 100644 src/lib/packetBuilder.ts diff --git a/src/lib/atemSocket.ts b/src/lib/atemSocket.ts index 0e3839052..fc6ff419e 100644 --- a/src/lib/atemSocket.ts +++ b/src/lib/atemSocket.ts @@ -5,6 +5,7 @@ import { VersionCommand, ISerializableCommand, IDeserializedCommand } from '../c import { DEFAULT_PORT } from '../atem' import { threadedClass, ThreadedClass, ThreadedClassManager } from 'threadedclass' import type { AtemSocketChild, OutboundPacketInfo } from './atemSocketChild' +import { PacketBuilder } from './packetBuilder' export interface AtemSocketOptions { address: string @@ -106,71 +107,25 @@ export class AtemSocket extends EventEmitter { public async sendCommands(commands: Array): Promise { if (!this._socketProcess) throw new Error('Socket process is not open') - const trackingIds: number[] = [] - - const packets: Array = [] - const maxPacketSize = this._packetMtu - 28 - 12 // MTU minus UDP header and ATEM header - - let currentPacketBuffer = Buffer.alloc(maxPacketSize) - let currentPacketFilled = 0 - - const startNewBuffer = (skipCreate?: boolean) => { - if (currentPacketFilled === 0) return - - const trackingId = this.nextPacketTrackingId - trackingIds.push(trackingId) - - packets.push({ - payloadLength: currentPacketFilled, - payloadHex: currentPacketBuffer.subarray(0, currentPacketFilled).toString('hex'), - trackingId, - }) - - if (!skipCreate) { - currentPacketBuffer = Buffer.alloc(maxPacketSize) - currentPacketFilled = 0 - } - } + const packetBuilder = new PacketBuilder(maxPacketSize, this._commandParser.version) for (const cmd of commands) { - if (typeof cmd.serialize !== 'function') { - throw new Error(`Command ${cmd.constructor.name} is not serializable`) - } - - const payload = cmd.serialize(this._commandParser.version) - if (this._debugBuffers) this.emit('debug', `PAYLOAD ${cmd.constructor.name} ${payload.toString('hex')}`) - - const rawName: string = (cmd.constructor as any).rawName - - const totalLength = payload.length + 8 - if (totalLength >= maxPacketSize) { - throw new Error(`Comamnd ${cmd.constructor.name} is too large for a single packet`) - } - - // Ensure the packet will fit into the current buffer - if (totalLength + currentPacketFilled > maxPacketSize) { - startNewBuffer() - } - - // Command name - currentPacketBuffer.writeUInt16BE(payload.length + 8, currentPacketFilled + 0) - currentPacketBuffer.write(rawName, currentPacketFilled + 4, 4) - - // Body - payload.copy(currentPacketBuffer, currentPacketFilled + 8) - - currentPacketFilled += totalLength + packetBuilder.addCommand(cmd) } - // Push the buffer to the queue - startNewBuffer(true) + const packets: OutboundPacketInfo[] = packetBuilder.getPackets().map((buffer) => ({ + payloadLength: buffer.length, + payloadHex: buffer.toString('hex'), + trackingId: this.nextPacketTrackingId, + })) + if (this._debugBuffers) this.emit('debug', `PAYLOAD PACKETS ${JSON.stringify(packets)}`) if (packets.length > 0) { await this._socketProcess.sendPackets(packets) } - return trackingIds + return packets.map((packet) => packet.trackingId) } private async _createSocketProcess(): Promise { diff --git a/src/lib/packetBuilder.ts b/src/lib/packetBuilder.ts new file mode 100644 index 000000000..3a9945ab6 --- /dev/null +++ b/src/lib/packetBuilder.ts @@ -0,0 +1,66 @@ +import type { ProtocolVersion } from '../enums' +import type { ISerializableCommand } from '../commands' + +export class PacketBuilder { + readonly #maxPacketSize: number + readonly #protocolVersion: ProtocolVersion + + readonly #completedBuffers: Buffer[] = [] + + #currentPacketBuffer: Buffer + #currentPacketFilled: number + + constructor(maxPacketSize: number, protocolVersion: ProtocolVersion) { + this.#maxPacketSize = maxPacketSize + this.#protocolVersion = protocolVersion + + this.#currentPacketBuffer = Buffer.alloc(maxPacketSize) + this.#currentPacketFilled = 0 + } + + public addCommand(cmd: ISerializableCommand): void { + if (typeof cmd.serialize !== 'function') { + throw new Error(`Command ${cmd.constructor.name} is not serializable`) + } + + const payload = cmd.serialize(this.#protocolVersion) + + const rawName: string = (cmd.constructor as any).rawName + + const totalLength = payload.length + 8 + if (totalLength >= this.#maxPacketSize) { + throw new Error(`Comamnd ${cmd.constructor.name} is too large for a single packet`) + } + + // Ensure the packet will fit into the current buffer + if (totalLength + this.#currentPacketFilled > this.#maxPacketSize) { + this.#finishBuffer() + } + + // Command name + this.#currentPacketBuffer.writeUInt16BE(payload.length + 8, this.#currentPacketFilled + 0) + this.#currentPacketBuffer.write(rawName, this.#currentPacketFilled + 4, 4) + + // Body + payload.copy(this.#currentPacketBuffer, this.#currentPacketFilled + 8) + + this.#currentPacketFilled += totalLength + } + + public getPackets(): Buffer[] { + this.#finishBuffer(true) + + return this.#completedBuffers + } + + #finishBuffer(skipCreateNext?: boolean) { + if (this.#currentPacketFilled === 0) return + + this.#completedBuffers.push(this.#currentPacketBuffer.subarray(0, this.#currentPacketFilled)) + + if (!skipCreateNext) { + this.#currentPacketBuffer = Buffer.alloc(this.#maxPacketSize) + this.#currentPacketFilled = 0 + } + } +} From 0282ac9575c861ec43db21bf69e386809c8a65db Mon Sep 17 00:00:00 2001 From: Julian Waller Date: Fri, 1 Dec 2023 14:22:50 +0000 Subject: [PATCH 08/12] wip: pull out packetbuilder helper --- src/lib/__tests__/packetBuilder.spec.ts | 120 ++++++++++++++++++++++++ src/lib/packetBuilder.ts | 16 ++-- 2 files changed, 129 insertions(+), 7 deletions(-) create mode 100644 src/lib/__tests__/packetBuilder.spec.ts diff --git a/src/lib/__tests__/packetBuilder.spec.ts b/src/lib/__tests__/packetBuilder.spec.ts new file mode 100644 index 000000000..c4a69a760 --- /dev/null +++ b/src/lib/__tests__/packetBuilder.spec.ts @@ -0,0 +1,120 @@ +import type { ISerializableCommand } from '../../commands' +import { ProtocolVersion } from '../../enums' +import { PacketBuilder } from '../packetBuilder' + +class FakeCommand implements ISerializableCommand { + static readonly rawName: string = 'FAKE' + + constructor(public readonly length: number, public readonly value: number = 1) {} + + public get lengthWithHeader(): number { + return this.length + 8 + } + + serialize = jest.fn((_version: ProtocolVersion): Buffer => { + return Buffer.alloc(this.length, this.value) + }) +} + +describe('PacketBuilder', () => { + it('No commands', () => { + const builder = new PacketBuilder(500, ProtocolVersion.V8_1_1) + expect(builder.getPackets()).toHaveLength(0) + }) + + it('Single command', () => { + const builder = new PacketBuilder(500, ProtocolVersion.V8_1_1) + + const cmd = new FakeCommand(10) + builder.addCommand(cmd) + + expect(builder.getPackets()).toHaveLength(1) + expect(builder.getPackets()).toHaveLength(1) // Ensure that calling it twice doesnt affect the output + expect(builder.getPackets()[0]).toHaveLength(cmd.lengthWithHeader) + }) + + it('Once finished cant add more commands', () => { + const builder = new PacketBuilder(500, ProtocolVersion.V8_1_1) + + const cmd = new FakeCommand(10) + builder.addCommand(cmd) + + expect(builder.getPackets()).toHaveLength(1) + + expect(() => builder.addCommand(cmd)).toThrow('finished') + }) + + it('Repeated command', () => { + const builder = new PacketBuilder(500, ProtocolVersion.V8_1_1) + + const cmd = new FakeCommand(10) + for (let i = 0; i < 5; i++) { + builder.addCommand(cmd) + } + + expect(builder.getPackets()).toHaveLength(1) + expect(builder.getPackets()[0]).toHaveLength(cmd.lengthWithHeader * 5) + }) + + it('Repeated command spanning multiple packets', () => { + const builder = new PacketBuilder(500, ProtocolVersion.V8_1_1) + + const cmd = new FakeCommand(10) + for (let i = 0; i < 60; i++) { + builder.addCommand(cmd) + } + + expect(cmd.lengthWithHeader).toBe(18) + expect(builder.getPackets()).toHaveLength(3) + + expect(builder.getPackets()[0]).toHaveLength(cmd.lengthWithHeader * 27) + expect(builder.getPackets()[1]).toHaveLength(cmd.lengthWithHeader * 27) + expect(builder.getPackets()[2]).toHaveLength(cmd.lengthWithHeader * 6) + }) + + it('Command too large to fit a packets', () => { + const builder = new PacketBuilder(500, ProtocolVersion.V8_1_1) + + const cmd = new FakeCommand(501) + expect(() => builder.addCommand(cmd)).toThrow('too large') + }) + + it('Command same size as packet', () => { + const builder = new PacketBuilder(500, ProtocolVersion.V8_1_1) + + const cmd = new FakeCommand(500 - 8) + expect(cmd.lengthWithHeader).toBe(500) + + builder.addCommand(cmd) + expect(builder.getPackets()).toHaveLength(1) + expect(builder.getPackets()[0]).toHaveLength(cmd.lengthWithHeader) + }) + + it('Commands of mixed sizes', () => { + const builder = new PacketBuilder(500, ProtocolVersion.V8_1_1) + + const largeCmd = new FakeCommand(400) + const mediumCmd = new FakeCommand(80) + const smallCmd = new FakeCommand(10) + + // packet 0: + builder.addCommand(mediumCmd) + builder.addCommand(smallCmd) + + // packet 1: + builder.addCommand(largeCmd) + builder.addCommand(mediumCmd) + + // packet 2: + builder.addCommand(smallCmd) + builder.addCommand(smallCmd) + builder.addCommand(largeCmd) + + expect(builder.getPackets()).toHaveLength(3) + expect(builder.getPackets()[0]).toHaveLength(mediumCmd.lengthWithHeader + smallCmd.lengthWithHeader) + expect(builder.getPackets()[1]).toHaveLength(largeCmd.lengthWithHeader + mediumCmd.lengthWithHeader) + expect(builder.getPackets()[2]).toHaveLength( + smallCmd.lengthWithHeader + smallCmd.lengthWithHeader + largeCmd.lengthWithHeader + ) + }) +}) diff --git a/src/lib/packetBuilder.ts b/src/lib/packetBuilder.ts index 3a9945ab6..d4db48edf 100644 --- a/src/lib/packetBuilder.ts +++ b/src/lib/packetBuilder.ts @@ -7,6 +7,7 @@ export class PacketBuilder { readonly #completedBuffers: Buffer[] = [] + #finished = false #currentPacketBuffer: Buffer #currentPacketFilled: number @@ -19,16 +20,17 @@ export class PacketBuilder { } public addCommand(cmd: ISerializableCommand): void { + if (this.#finished) throw new Error('Packets have been finished') + if (typeof cmd.serialize !== 'function') { throw new Error(`Command ${cmd.constructor.name} is not serializable`) } - const payload = cmd.serialize(this.#protocolVersion) - const rawName: string = (cmd.constructor as any).rawName + const payload = cmd.serialize(this.#protocolVersion) const totalLength = payload.length + 8 - if (totalLength >= this.#maxPacketSize) { + if (totalLength > this.#maxPacketSize) { throw new Error(`Comamnd ${cmd.constructor.name} is too large for a single packet`) } @@ -37,11 +39,9 @@ export class PacketBuilder { this.#finishBuffer() } - // Command name + // Add to packet this.#currentPacketBuffer.writeUInt16BE(payload.length + 8, this.#currentPacketFilled + 0) this.#currentPacketBuffer.write(rawName, this.#currentPacketFilled + 4, 4) - - // Body payload.copy(this.#currentPacketBuffer, this.#currentPacketFilled + 8) this.#currentPacketFilled += totalLength @@ -50,11 +50,13 @@ export class PacketBuilder { public getPackets(): Buffer[] { this.#finishBuffer(true) + this.#finished = true + return this.#completedBuffers } #finishBuffer(skipCreateNext?: boolean) { - if (this.#currentPacketFilled === 0) return + if (this.#currentPacketFilled === 0 || this.#finished) return this.#completedBuffers.push(this.#currentPacketBuffer.subarray(0, this.#currentPacketFilled)) From 3384972a01a26b433b87b69b81b61b895800a44e Mon Sep 17 00:00:00 2001 From: Julian Waller Date: Fri, 1 Dec 2023 14:25:44 +0000 Subject: [PATCH 09/12] chore: fix test --- src/dataTransfer/__tests__/upload-multiviewer-sequence.json | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/dataTransfer/__tests__/upload-multiviewer-sequence.json b/src/dataTransfer/__tests__/upload-multiviewer-sequence.json index da2b1bbfb..39eef15e8 100644 --- a/src/dataTransfer/__tests__/upload-multiviewer-sequence.json +++ b/src/dataTransfer/__tests__/upload-multiviewer-sequence.json @@ -24,7 +24,7 @@ "properties": { "name": "Label", "description": "", - "fileHash": "T�9z�;{\u0012r����-��", + "fileHash": "VKA5etY7exJy9ayH+i3d4Q==", "transferId": 0 }, "direction": "send" From edd4571fa45438e01a7b9650f9c2588903637286 Mon Sep 17 00:00:00 2001 From: Julian Waller Date: Fri, 1 Dec 2023 14:48:27 +0000 Subject: [PATCH 10/12] fix: adjust numbers based on real atem --- src/__tests__/atem.spec.ts | 8 ++++---- src/atem.ts | 6 +++--- src/lib/__tests__/atemSocket.spec.ts | 2 +- src/lib/atemSocket.ts | 8 ++++---- 4 files changed, 12 insertions(+), 12 deletions(-) diff --git a/src/__tests__/atem.spec.ts b/src/__tests__/atem.spec.ts index 13927c2e5..9f8bd0009 100644 --- a/src/__tests__/atem.spec.ts +++ b/src/__tests__/atem.spec.ts @@ -1,5 +1,5 @@ /* eslint-disable @typescript-eslint/unbound-method */ -import { Atem, DEFAULT_MTU, DEFAULT_PORT } from '../atem' +import { Atem, DEFAULT_MAX_PACKET_SIZE, DEFAULT_PORT } from '../atem' import { CutCommand } from '../commands' import { promisify } from 'util' import { EventEmitter } from 'events' @@ -35,14 +35,14 @@ describe('Atem', () => { disableMultithreaded: true, log: (conn as any)._log, port: DEFAULT_PORT, - packetMtu: DEFAULT_MTU, + maxPacketSize: DEFAULT_MAX_PACKET_SIZE, }) } finally { await conn.destroy() } }) test('constructor test 2', async () => { - const conn = new Atem({ debugBuffers: true, address: 'test1', port: 23, packetMtu: 500 }) + const conn = new Atem({ debugBuffers: true, address: 'test1', port: 23, maxPacketSize: 500 }) try { const socket = (conn as any).socket as AtemSocket @@ -56,7 +56,7 @@ describe('Atem', () => { disableMultithreaded: false, log: (conn as any)._log, port: 23, - packetMtu: 500, + maxPacketSize: 500, }) } finally { await conn.destroy() diff --git a/src/atem.ts b/src/atem.ts index 4bd5f0d9d..1e8deec5f 100644 --- a/src/atem.ts +++ b/src/atem.ts @@ -62,7 +62,7 @@ export interface AtemOptions { /** * Maximum size of packets to transmit */ - packetMtu?: number + maxPacketSize?: number } export type AtemEvents = { @@ -89,7 +89,7 @@ export enum AtemConnectionStatus { } export const DEFAULT_PORT = 9910 -export const DEFAULT_MTU = 1500 +export const DEFAULT_MAX_PACKET_SIZE = 1416 // Matching ATEM software export class BasicAtem extends EventEmitter { private readonly socket: AtemSocket @@ -109,7 +109,7 @@ export class BasicAtem extends EventEmitter { port: options?.port || DEFAULT_PORT, disableMultithreaded: options?.disableMultithreaded ?? false, childProcessTimeout: options?.childProcessTimeout || 600, - packetMtu: options?.packetMtu ?? DEFAULT_MTU, + maxPacketSize: options?.maxPacketSize ?? DEFAULT_MAX_PACKET_SIZE, }) this.dataTransferManager = new DT.DataTransferManager(this.sendCommands.bind(this)) diff --git a/src/lib/__tests__/atemSocket.spec.ts b/src/lib/__tests__/atemSocket.spec.ts index 346805b5f..b146ec91b 100644 --- a/src/lib/__tests__/atemSocket.spec.ts +++ b/src/lib/__tests__/atemSocket.spec.ts @@ -108,7 +108,7 @@ describe('AtemSocket', () => { port: 890, disableMultithreaded: true, childProcessTimeout: 100, - packetMtu: 1500, + maxPacketSize: 1416, }) } diff --git a/src/lib/atemSocket.ts b/src/lib/atemSocket.ts index fc6ff419e..26cb0d9fc 100644 --- a/src/lib/atemSocket.ts +++ b/src/lib/atemSocket.ts @@ -13,7 +13,7 @@ export interface AtemSocketOptions { debugBuffers: boolean disableMultithreaded: boolean childProcessTimeout: number - packetMtu: number + maxPacketSize: number } export type AtemSocketEvents = { @@ -29,7 +29,7 @@ export class AtemSocket extends EventEmitter { private readonly _debugBuffers: boolean private readonly _disableMultithreaded: boolean private readonly _childProcessTimeout: number - private readonly _packetMtu: number + private readonly _maxPacketSize: number private readonly _commandParser: CommandParser = new CommandParser() private _nextPacketTrackingId = 0 @@ -47,7 +47,7 @@ export class AtemSocket extends EventEmitter { this._debugBuffers = options.debugBuffers this._disableMultithreaded = options.disableMultithreaded this._childProcessTimeout = options.childProcessTimeout - this._packetMtu = options.packetMtu + this._maxPacketSize = options.maxPacketSize } public async connect(address?: string, port?: number): Promise { @@ -107,7 +107,7 @@ export class AtemSocket extends EventEmitter { public async sendCommands(commands: Array): Promise { if (!this._socketProcess) throw new Error('Socket process is not open') - const maxPacketSize = this._packetMtu - 28 - 12 // MTU minus UDP header and ATEM header + const maxPacketSize = this._maxPacketSize - 12 // MTU minus ATEM header const packetBuilder = new PacketBuilder(maxPacketSize, this._commandParser.version) for (const cmd of commands) { From fc1256e19f84721a0b286ae79b9f7907e2d2fb24 Mon Sep 17 00:00:00 2001 From: Julian Waller Date: Fri, 1 Dec 2023 14:51:01 +0000 Subject: [PATCH 11/12] chore: tidy --- src/lib/packetBuilder.ts | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/src/lib/packetBuilder.ts b/src/lib/packetBuilder.ts index d4db48edf..7958f29ed 100644 --- a/src/lib/packetBuilder.ts +++ b/src/lib/packetBuilder.ts @@ -48,19 +48,19 @@ export class PacketBuilder { } public getPackets(): Buffer[] { - this.#finishBuffer(true) + this.#finishBuffer(false) this.#finished = true return this.#completedBuffers } - #finishBuffer(skipCreateNext?: boolean) { + #finishBuffer(allocNewBuffer = true) { if (this.#currentPacketFilled === 0 || this.#finished) return this.#completedBuffers.push(this.#currentPacketBuffer.subarray(0, this.#currentPacketFilled)) - if (!skipCreateNext) { + if (allocNewBuffer) { this.#currentPacketBuffer = Buffer.alloc(this.#maxPacketSize) this.#currentPacketFilled = 0 } From 296d4e38ad45317bdc5fb5839f4e2d09b13a869f Mon Sep 17 00:00:00 2001 From: Julian Waller Date: Mon, 11 Dec 2023 10:48:35 +0000 Subject: [PATCH 12/12] fix: send oversize commands without erroring --- src/lib/__tests__/packetBuilder.spec.ts | 9 ++++++++- src/lib/packetBuilder.ts | 19 +++++++++++-------- 2 files changed, 19 insertions(+), 9 deletions(-) diff --git a/src/lib/__tests__/packetBuilder.spec.ts b/src/lib/__tests__/packetBuilder.spec.ts index c4a69a760..658c3f65b 100644 --- a/src/lib/__tests__/packetBuilder.spec.ts +++ b/src/lib/__tests__/packetBuilder.spec.ts @@ -76,7 +76,14 @@ describe('PacketBuilder', () => { const builder = new PacketBuilder(500, ProtocolVersion.V8_1_1) const cmd = new FakeCommand(501) - expect(() => builder.addCommand(cmd)).toThrow('too large') + expect(cmd.lengthWithHeader).toBe(501 + 8) + const smallCmd = new FakeCommand(10) + + builder.addCommand(cmd) + builder.addCommand(smallCmd) + expect(builder.getPackets()).toHaveLength(2) + expect(builder.getPackets()[0]).toHaveLength(cmd.lengthWithHeader) + expect(builder.getPackets()[1]).toHaveLength(smallCmd.lengthWithHeader) }) it('Command same size as packet', () => { diff --git a/src/lib/packetBuilder.ts b/src/lib/packetBuilder.ts index 7958f29ed..aa23f1add 100644 --- a/src/lib/packetBuilder.ts +++ b/src/lib/packetBuilder.ts @@ -31,11 +31,12 @@ export class PacketBuilder { const totalLength = payload.length + 8 if (totalLength > this.#maxPacketSize) { - throw new Error(`Comamnd ${cmd.constructor.name} is too large for a single packet`) + // Command is too large for a normal packet, try sending it on its own anyway + this.#finishBuffer(totalLength) } // Ensure the packet will fit into the current buffer - if (totalLength + this.#currentPacketFilled > this.#maxPacketSize) { + if (totalLength + this.#currentPacketFilled > this.#currentPacketBuffer.length) { this.#finishBuffer() } @@ -48,20 +49,22 @@ export class PacketBuilder { } public getPackets(): Buffer[] { - this.#finishBuffer(false) + this.#finishBuffer(0) this.#finished = true return this.#completedBuffers } - #finishBuffer(allocNewBuffer = true) { - if (this.#currentPacketFilled === 0 || this.#finished) return + #finishBuffer(newBufferLength = this.#maxPacketSize) { + if (this.#finished) return - this.#completedBuffers.push(this.#currentPacketBuffer.subarray(0, this.#currentPacketFilled)) + if (this.#currentPacketFilled > 0) { + this.#completedBuffers.push(this.#currentPacketBuffer.subarray(0, this.#currentPacketFilled)) + } - if (allocNewBuffer) { - this.#currentPacketBuffer = Buffer.alloc(this.#maxPacketSize) + if (newBufferLength > 0) { + this.#currentPacketBuffer = Buffer.alloc(newBufferLength) this.#currentPacketFilled = 0 } }