Skip to content

Commit

Permalink
fix: async emitter
Browse files Browse the repository at this point in the history
  • Loading branch information
noomorph committed May 17, 2024
1 parent 3a41251 commit 75e9c41
Show file tree
Hide file tree
Showing 7 changed files with 79 additions and 14 deletions.
6 changes: 3 additions & 3 deletions src/emitters/ReadonlyEmitterBase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ export abstract class ReadonlyEmitterBase<EventMap> implements ReadonlyEmitter<E
order?: number,
): this {
if (!listener[ONCE]) {
this._log.trace(__LISTENERS(listener), `on(${String(type)})`);
this._log.trace(__LISTENERS(listener), 'on(%s)', type);
}

if (!this._listeners.has(type)) {
Expand All @@ -50,7 +50,7 @@ export abstract class ReadonlyEmitterBase<EventMap> implements ReadonlyEmitter<E
}

once<K extends keyof EventMap>(type: K | '*', listener: Function, order?: number): this {
this._log.trace(__LISTENERS(listener), `once(${String(type)})`);
this._log.trace(__LISTENERS(listener), 'once(%s)', type);
return this.on(type, this.#createOnceListener(type, listener), order);
}

Expand All @@ -60,7 +60,7 @@ export abstract class ReadonlyEmitterBase<EventMap> implements ReadonlyEmitter<E
_order?: number,
): this {
if (!listener[ONCE]) {
this._log.trace(__LISTENERS(listener), `off(${String(type)})`);
this._log.trace(__LISTENERS(listener), 'off(%s)', type);
}

const listeners = this._listeners.get(type) || [];
Expand Down
13 changes: 10 additions & 3 deletions src/emitters/SerialAsyncEmitter.test.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
jest.mock('./logError');
import { SerialAsyncEmitter } from './SerialAsyncEmitter';

describe('SerialAsyncEmitter', () => {
Expand Down Expand Up @@ -50,22 +51,28 @@ describe('SerialAsyncEmitter', () => {

it('should not delay emits within emits', async () => {
const emitter = new SerialAsyncEmitter<TestEventMap>('test-emitter');
const listener1 = jest.fn(() => emitter.emit('test', { type: 'test', payload: 84 }));
const listener1 = jest.fn(() => {
emitter.emit('test', { type: 'test', payload: 84 });
});
const listener2 = jest.fn();
emitter.once('test', listener1);
emitter.on('test', listener2);
await emitter.emit('test', { type: 'test', payload: 42 });
expect(listener2).toHaveBeenCalledTimes(1);
expect(listener1).toHaveBeenCalledTimes(1);
expect(listener2).toHaveBeenCalledTimes(2);
});

it('should tolerate errors in listeners', async () => {
const error = new Error('This listener failed');
const emitter = new SerialAsyncEmitter<TestEventMap>('test-emitter');
const listener1 = jest.fn().mockRejectedValue(new Error('This listener failed'));
const listener1 = () => Promise.reject(error);
const listener2 = jest.fn();
emitter.once('test', listener1);
emitter.once('test', listener2);
await emitter.emit('test', { type: 'test', payload: 42 });
expect(listener2).toHaveBeenCalledTimes(1);
const { logError } = jest.requireMock('./logError');
expect(logError).toHaveBeenCalledWith(error, 'test', expect.any(Function));
});
});

Expand Down
19 changes: 15 additions & 4 deletions src/emitters/SerialAsyncEmitter.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { arraysEqual } from '../utils';
import type { AsyncEmitter } from './AsyncEmitter';
import { logError } from './logError';
import { ReadonlyEmitterBase } from './ReadonlyEmitterBase';
Expand All @@ -7,14 +8,24 @@ export class SerialAsyncEmitter<EventMap>
extends ReadonlyEmitterBase<EventMap>
implements AsyncEmitter<EventMap>
{
#promise = Promise.resolve();
#tasks: Promise<void>[] = [];
#idle?: Promise<void>;

emit<K extends keyof EventMap>(eventType: K, event: EventMap[K]): Promise<void> {
return this.#enqueue(eventType, event);
this.#tasks.push(this.#doEmit(eventType, event));
this.#idle ??= this.#waitForIdle();
return this.#idle;
}

#enqueue<K extends keyof EventMap>(eventType: K, event: EventMap[K]) {
return (this.#promise = this.#promise.then(() => this.#doEmit(eventType, event)));
async #waitForIdle() {
let $promises: Promise<void>[] = [];
while (!arraysEqual($promises, this.#tasks)) {
$promises = [...this.#tasks];
await Promise.all($promises);
}

this.#tasks.splice(0, this.#tasks.length);
this.#idle = undefined;
}

async #doEmit<K extends keyof EventMap>(eventType: K, event: EventMap[K]) {
Expand Down
9 changes: 5 additions & 4 deletions src/emitters/logError.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import { logger } from '../utils';

// eslint-disable-next-line @typescript-eslint/ban-types
export function logError(error: unknown, eventType: string, listener: Function) {
const errorDetails = (error instanceof Error && error.stack) || String(error);
export function logError(error: unknown, eventType: unknown, listener: unknown) {
logger.warn(
`Caught an error while emitting "${eventType}" event:\n${errorDetails}\nThe listener function was:\n${listener}`,
error,
`Caught an error while emitting %j event in a listener function:\n%s`,
eventType,
listener,
);
}
33 changes: 33 additions & 0 deletions src/utils/arraysEqual.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import { arraysEqual } from './arraysEqual';

const EMPTY_OBJECT = {};
const EMPTY_ARRAY: unknown[] = [];
const FUNCTION = () => {};
const STRING = 'string';
const NUMBER = 123;
const BOOLEAN = true;
const ARRAY = [EMPTY_OBJECT, EMPTY_ARRAY, FUNCTION, STRING, NUMBER, BOOLEAN];

describe('arraysEqual', () => {
it('should return true for empty arrays', () => {
expect(arraysEqual([], [])).toBe(true);
});

it('should return true for the same array', () => {
expect(arraysEqual(ARRAY, ARRAY)).toBe(true);
});

it('should return true for arrays with the same elements', () => {
expect(arraysEqual([...ARRAY], [...ARRAY])).toBe(true);
});

it('should return false for arrays with different lengths', () => {
const arr1 = Array.from({ length: 1 });
const arr2 = Array.from({ length: 2 });
expect(arraysEqual(arr1, arr2)).toBe(false);
});

it('should return false for arrays with different elements', () => {
expect(arraysEqual([{}], [{}])).toBe(false);
});
});
12 changes: 12 additions & 0 deletions src/utils/arraysEqual.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
/* eslint-disable unicorn/no-for-loop */
export function arraysEqual<T>(a: readonly T[], b: readonly T[]): boolean {
if (a === b) return true;

if (a.length !== b.length) return false;

const length = a.length;
for (let index = 0; index < length; index++) {
if (a[index] !== b[index]) return false;
}
return true;
}
1 change: 1 addition & 0 deletions src/utils/index.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
export * from './arraysEqual';
export * from './assertions';
export * from './getHierarchy';
export * from './iterateSorted';
Expand Down

0 comments on commit 75e9c41

Please sign in to comment.