Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: command batching SOFIE-2549 #157

Merged
merged 12 commits into from
Dec 11, 2023
34 changes: 14 additions & 20 deletions src/__tests__/atem.spec.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/* eslint-disable @typescript-eslint/unbound-method */
import { Atem, DEFAULT_PORT } from '../atem'
import { Atem, DEFAULT_MAX_PACKET_SIZE, DEFAULT_PORT } from '../atem'
import { CutCommand } from '../commands'
import { promisify } from 'util'
import { EventEmitter } from 'events'
Expand Down Expand Up @@ -35,13 +35,14 @@ describe('Atem', () => {
disableMultithreaded: true,
log: (conn as any)._log,
port: DEFAULT_PORT,
maxPacketSize: DEFAULT_MAX_PACKET_SIZE,
})
} finally {
await conn.destroy()
}
})
test('constructor test 2', async () => {
const conn = new Atem({ debugBuffers: true, address: 'test1', port: 23 })
const conn = new Atem({ debugBuffers: true, address: 'test1', port: 23, maxPacketSize: 500 })

try {
const socket = (conn as any).socket as AtemSocket
Expand All @@ -55,6 +56,7 @@ describe('Atem', () => {
disableMultithreaded: false,
log: (conn as any)._log,
port: 23,
maxPacketSize: 500,
})
} finally {
await conn.destroy()
Expand Down Expand Up @@ -108,32 +110,28 @@ describe('Atem', () => {
expect(socket).toBeTruthy()

let nextId = 123
Object.defineProperty(socket, 'nextCommandTrackingId', {
Object.defineProperty(socket, 'nextPacketTrackingId', {
get: jest.fn(() => nextId++),
set: jest.fn(),
})
expect(socket.nextCommandTrackingId).toEqual(123)
expect(socket.nextPacketTrackingId).toEqual(123)

socket.sendCommands = jest.fn(() => Promise.resolve(35) as any)
socket.sendCommands = jest.fn(() => Promise.resolve([124]) as any)

const sentQueue = (conn as any)._sentQueue as Record<string, unknown>
expect(Object.keys(sentQueue)).toHaveLength(0)

const cmd = new CutCommand(0)
const res = conn.sendCommand(cmd)
res.catch(() => null) // Dismiss UnhandledPromiseRejection
await setImmediatePromise()
expect(Object.keys(sentQueue)).toHaveLength(1)

expect(socket.sendCommands).toHaveBeenCalledTimes(1)
expect(socket.sendCommands).toHaveBeenCalledWith([
{
rawCommand: cmd,
trackingId: 124,
},
])
expect(socket.sendCommands).toHaveBeenCalledWith([cmd])

// Trigger the ack, and it should switfy resolve
socket.emit('commandsAck', [124])
socket.emit('ackPackets', [124])
expect(Object.keys(sentQueue)).toHaveLength(0)

// Finally, it should now resolve without a timeout
Expand All @@ -152,11 +150,11 @@ describe('Atem', () => {
expect(socket).toBeTruthy()

let nextId = 123
Object.defineProperty(socket, 'nextCommandTrackingId', {
Object.defineProperty(socket, 'nextPacketTrackingId', {
get: jest.fn(() => nextId++),
set: jest.fn(),
})
expect(socket.nextCommandTrackingId).toEqual(123)
expect(socket.nextPacketTrackingId).toEqual(123)

socket.sendCommands = jest.fn(() => Promise.reject(35) as any)

Expand All @@ -165,15 +163,11 @@ describe('Atem', () => {

const cmd = new CutCommand(0)
const res = conn.sendCommand(cmd)
res.catch(() => null) // Dismiss UnhandledPromiseRejection

// Send command should be called
expect(socket.sendCommands).toHaveBeenCalledTimes(1)
expect(socket.sendCommands).toHaveBeenCalledWith([
{
rawCommand: cmd,
trackingId: 124,
},
])
expect(socket.sendCommands).toHaveBeenCalledWith([cmd])

expect(Object.keys(sentQueue)).toHaveLength(0)

Expand Down
10 changes: 5 additions & 5 deletions src/__tests__/connection.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,18 +18,18 @@ class AtemSocketChildMock implements AtemSocketChild {
public onDisconnect: () => Promise<void>
public onLog: (message: string) => Promise<void>
public onCommandsReceived: (payload: Buffer, packetId: number) => Promise<void>
public onCommandsAcknowledged: (ids: Array<{ packetId: number; trackingId: number }>) => Promise<void>
public onPacketsAcknowledged: (ids: Array<{ packetId: number; trackingId: number }>) => Promise<void>

constructor(
onDisconnect: () => Promise<void>,
onLog: (message: string) => Promise<void>,
onCommandsReceived: (payload: Buffer, packetId: number) => Promise<void>,
onCommandsAcknowledged: (ids: Array<{ packetId: number; trackingId: number }>) => Promise<void>
onPacketsAcknowledged: (ids: Array<{ packetId: number; trackingId: number }>) => Promise<void>
) {
this.onDisconnect = onDisconnect
this.onLog = onLog
this.onCommandsReceived = onCommandsReceived
this.onCommandsAcknowledged = onCommandsAcknowledged
this.onPacketsAcknowledged = onPacketsAcknowledged
}

public connect = jest.fn(async () => Promise.resolve())
Expand All @@ -43,9 +43,9 @@ class AtemSocketChildMock implements AtemSocketChild {
onDisconnect: () => Promise<void>,
onLog: (message: string) => Promise<void>,
onCommandsReceived: (payload: Buffer, packetId: number) => Promise<void>,
onCommandsAcknowledged: (ids: Array<{ packetId: number; trackingId: number }>) => Promise<void>
onPacketsAcknowledged: (ids: Array<{ packetId: number; trackingId: number }>) => Promise<void>
) => {
return new AtemSocketChildMock(onDisconnect, onLog, onCommandsReceived, onCommandsAcknowledged)
return new AtemSocketChildMock(onDisconnect, onLog, onCommandsReceived, onPacketsAcknowledged)
}
)

Expand Down
60 changes: 32 additions & 28 deletions src/atem.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,10 @@ export interface AtemOptions {
debugBuffers?: boolean
disableMultithreaded?: boolean
childProcessTimeout?: number
/**
* Maximum size of packets to transmit
*/
maxPacketSize?: number
}

export type AtemEvents = {
Expand All @@ -73,8 +77,7 @@ export type AtemEvents = {
updatedTime: [TimeInfo]
}

interface SentCommand {
command: ISerializableCommand
interface SentPackets {
resolve: () => void
reject: () => void
}
Expand All @@ -86,12 +89,13 @@ export enum AtemConnectionStatus {
}

export const DEFAULT_PORT = 9910
export const DEFAULT_MAX_PACKET_SIZE = 1416 // Matching ATEM software

export class BasicAtem extends EventEmitter<AtemEvents> {
private readonly socket: AtemSocket
protected readonly dataTransferManager: DT.DataTransferManager
private _state: AtemState | undefined
private _sentQueue: { [packetId: string]: SentCommand } = {}
private _sentQueue: { [packetId: string]: SentPackets } = {}
private _status: AtemConnectionStatus

constructor(options?: AtemOptions) {
Expand All @@ -100,19 +104,20 @@ export class BasicAtem extends EventEmitter<AtemEvents> {
this._state = AtemStateUtil.Create()
this._status = AtemConnectionStatus.CLOSED
this.socket = new AtemSocket({
debugBuffers: (options || {}).debugBuffers || false,
address: (options || {}).address || '',
port: (options || {}).port || DEFAULT_PORT,
disableMultithreaded: (options || {}).disableMultithreaded || false,
childProcessTimeout: (options || {}).childProcessTimeout || 600,
debugBuffers: options?.debugBuffers ?? false,
address: options?.address || '',
port: options?.port || DEFAULT_PORT,
disableMultithreaded: options?.disableMultithreaded ?? false,
childProcessTimeout: options?.childProcessTimeout || 600,
maxPacketSize: options?.maxPacketSize ?? DEFAULT_MAX_PACKET_SIZE,
})
this.dataTransferManager = new DT.DataTransferManager(this.sendCommands.bind(this))

this.socket.on('commandsReceived', (commands) => {
this.socket.on('receivedCommands', (commands) => {
this.emit('receivedCommands', commands)
this._mutateState(commands)
})
this.socket.on('commandsAck', (trackingIds) => this._resolveCommands(trackingIds))
this.socket.on('ackPackets', (trackingIds) => this._resolveCommands(trackingIds))
this.socket.on('info', (msg) => this.emit('info', msg))
this.socket.on('debug', (msg) => this.emit('debug', msg))
this.socket.on('error', (e) => this.emit('error', e))
Expand Down Expand Up @@ -151,28 +156,27 @@ export class BasicAtem extends EventEmitter<AtemEvents> {
return this.socket.destroy()
}

private sendCommands(commands: ISerializableCommand[]): Array<Promise<void>> {
const commands2 = commands.map((cmd) => ({
rawCommand: cmd,
trackingId: this.socket.nextCommandTrackingId,
}))
public async sendCommands(commands: ISerializableCommand[]): Promise<void> {
const trackingIds = await this.socket.sendCommands(commands)

const sendPromise = this.socket.sendCommands(commands2)
const promises: Promise<void>[] = []

return commands2.map(async (cmd) => {
await sendPromise
return new Promise<void>((resolve, reject) => {
this._sentQueue[cmd.trackingId] = {
command: cmd.rawCommand,
resolve,
reject,
}
})
})
for (const trackingId of trackingIds) {
promises.push(
new Promise<void>((resolve, reject) => {
this._sentQueue[trackingId] = {
resolve,
reject,
}
})
)
}

await Promise.allSettled(promises)
}

public async sendCommand(command: ISerializableCommand): Promise<void> {
return this.sendCommands([command])[0]
return await this.sendCommands([command])
}

private _mutateState(commands: IDeserializedCommand[]): void {
Expand Down Expand Up @@ -262,7 +266,7 @@ export class BasicAtem extends EventEmitter<AtemEvents> {
const sentQueue = this._sentQueue
this._sentQueue = {}

Object.values<SentCommand>(sentQueue).forEach((sent) => sent.reject())
Object.values<SentPackets>(sentQueue).forEach((sent) => sent.reject())
}
}

Expand Down
12 changes: 5 additions & 7 deletions src/dataTransfer/__tests__/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,10 +36,10 @@ function mangleCommand(cmd: any, dir: string): any {
}

function runDataTransferTest(spec: any): DataTransferManager {
const manager = new DataTransferManager((cmds) =>
cmds.map(async (cmd) => {
const manager = new DataTransferManager(async (cmds) => {
for (const rawCmd of cmds) {
const expectedCmd = spec.shift()
const gotCmd = mangleCommand(cmd, 'send')
const gotCmd = mangleCommand(rawCmd, 'send')
expect(gotCmd).toEqual(expectedCmd)

while (spec.length > 0) {
Expand All @@ -49,10 +49,8 @@ function runDataTransferTest(spec: any): DataTransferManager {
if (!nextCmd2) throw new Error(`Failed specToCommandClass ${nextCmd.name}`)
manager.queueHandleCommand(nextCmd2)
}

return Promise.resolve()
})
)
}
})
manager.startCommandSending(true)
return manager
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
"properties": {
"name": "Label",
"description": "",
"fileHash": "T�9z�;{\u0012r����-��",
"fileHash": "VKA5etY7exJy9ayH+i3d4Q==",
"transferId": 0
},
"direction": "send"
Expand Down
8 changes: 4 additions & 4 deletions src/dataTransfer/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ export class DataTransferManager {
}

readonly #sendLockCommand = (/*lock: DataTransferLockingQueue,*/ cmd: ISerializableCommand): void => {
Promise.all(this.#rawSendCommands([cmd])).catch((e) => {
this.#rawSendCommands([cmd]).catch((e) => {
debug(`Failed to send lock command: ${e}`)
console.log('Failed to send lock command')
})
Expand All @@ -41,12 +41,12 @@ export class DataTransferManager {
readonly #labelsLock = new DataTransferSimpleQueue(this.#nextTransferId)
readonly #macroLock = new DataTransferSimpleQueue(this.#nextTransferId)

readonly #rawSendCommands: (cmds: ISerializableCommand[]) => Array<Promise<void>>
readonly #rawSendCommands: (cmds: ISerializableCommand[]) => Promise<void>

private interval?: NodeJS.Timer
private exitUnsubscribe?: () => void

constructor(rawSendCommands: (cmds: ISerializableCommand[]) => Array<Promise<void>>) {
constructor(rawSendCommands: (cmds: ISerializableCommand[]) => Promise<void>) {
this.#rawSendCommands = rawSendCommands
}

Expand Down Expand Up @@ -75,7 +75,7 @@ export class DataTransferManager {
const commandsToSend = lock.popQueuedCommands(MAX_PACKETS_TO_SEND_PER_TICK) // Take some, it is unlikely that multiple will run at once
if (commandsToSend && commandsToSend.length > 0) {
// debug(`Sending ${commandsToSend.length} commands `)
Promise.all(this.#rawSendCommands(commandsToSend)).catch((e) => {
this.#rawSendCommands(commandsToSend).catch((e) => {
// Failed to send/queue something, so abort it
lock.tryAbortTransfer(new Error(`Command send failed: ${e}`))
})
Expand Down
Loading