Skip to content

Commit

Permalink
fix: make listeners resilient to errors (#13)
Browse files Browse the repository at this point in the history
  • Loading branch information
noomorph authored May 17, 2024
1 parent 5d494d0 commit eaae58e
Show file tree
Hide file tree
Showing 7 changed files with 81 additions and 15 deletions.
10 changes: 5 additions & 5 deletions src/emitters/ReadonlyEmitterBase.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ export abstract class ReadonlyEmitterBase<EventMap> implements ReadonlyEmitter<E
order?: number,
): this {
if (!listener[ONCE]) {
this._log.trace(__LISTENERS(listener), `on(${String(type)})`);
this._log.trace(__LISTENERS(listener), 'on(%s)', type);
}

if (!this._listeners.has(type)) {
Expand All @@ -50,7 +50,7 @@ export abstract class ReadonlyEmitterBase<EventMap> implements ReadonlyEmitter<E
}

once<K extends keyof EventMap>(type: K | '*', listener: Function, order?: number): this {
this._log.trace(__LISTENERS(listener), `once(${String(type)})`);
this._log.trace(__LISTENERS(listener), 'once(%s)', type);
return this.on(type, this.#createOnceListener(type, listener), order);
}

Expand All @@ -60,7 +60,7 @@ export abstract class ReadonlyEmitterBase<EventMap> implements ReadonlyEmitter<E
_order?: number,
): this {
if (!listener[ONCE]) {
this._log.trace(__LISTENERS(listener), `off(${String(type)})`);
this._log.trace(__LISTENERS(listener), 'off(%s)', type);
}

const listeners = this._listeners.get(type) || [];
Expand All @@ -82,9 +82,9 @@ export abstract class ReadonlyEmitterBase<EventMap> implements ReadonlyEmitter<E
#createOnceListener<K extends keyof EventMap>(type: K | '*', listener: Function) {
const onceListener = ((event: Event) => {
this.off(type, onceListener);
listener(event);
return listener(event);
}) as Function & { [ONCE]?: true };

onceListener.toString = listener.toString.bind(listener);
onceListener[ONCE] = true as const;
return onceListener;
}
Expand Down
1 change: 0 additions & 1 deletion src/emitters/SemiAsyncEmitter.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@ describe('SemiAsyncEmitter', () => {
emitter.on('async_event', listener);
const promise = emitter.emit('async_event', 42);
expect(promise).toBeInstanceOf(Promise);
expect(listener).toHaveBeenCalledTimes(0);
await promise;
expect(listener).toHaveBeenCalledTimes(1);
expect(listener).toHaveBeenCalledWith(42);
Expand Down
19 changes: 18 additions & 1 deletion src/emitters/SerialAsyncEmitter.test.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
jest.mock('./logError');
import { SerialAsyncEmitter } from './SerialAsyncEmitter';

describe('SerialAsyncEmitter', () => {
Expand Down Expand Up @@ -50,12 +51,28 @@ describe('SerialAsyncEmitter', () => {

it('should not delay emits within emits', async () => {
const emitter = new SerialAsyncEmitter<TestEventMap>('test-emitter');
const listener1 = jest.fn(() => emitter.emit('test', { type: 'test', payload: 84 }));
const listener1 = jest.fn(() => {
emitter.emit('test', { type: 'test', payload: 84 });
});
const listener2 = jest.fn();
emitter.once('test', listener1);
emitter.on('test', listener2);
await emitter.emit('test', { type: 'test', payload: 42 });
expect(listener1).toHaveBeenCalledTimes(1);
expect(listener2).toHaveBeenCalledTimes(2);
});

it('should tolerate errors in listeners', async () => {
const error = new Error('This listener failed');
const emitter = new SerialAsyncEmitter<TestEventMap>('test-emitter');
const listener1 = () => Promise.reject(error);
const listener2 = jest.fn();
emitter.once('test', listener1);
emitter.once('test', listener2);
await emitter.emit('test', { type: 'test', payload: 42 });
expect(listener2).toHaveBeenCalledTimes(1);
const { logError } = jest.requireMock('./logError');
expect(logError).toHaveBeenCalledWith(error, 'test', expect.any(Function));
});
});

Expand Down
30 changes: 24 additions & 6 deletions src/emitters/SerialAsyncEmitter.ts
Original file line number Diff line number Diff line change
@@ -1,28 +1,46 @@
import type { AsyncEmitter } from './AsyncEmitter';
import { logError } from './logError';
import { ReadonlyEmitterBase } from './ReadonlyEmitterBase';
import { __EMIT, __INVOKE } from './syncEmitterCommons';

export class SerialAsyncEmitter<EventMap>
extends ReadonlyEmitterBase<EventMap>
implements AsyncEmitter<EventMap>
{
#promise = Promise.resolve();
#idle?: Promise<void>;
readonly #tasks: Promise<void>[] = [];

emit<K extends keyof EventMap>(eventType: K, event: EventMap[K]): Promise<void> {
return this.#enqueue(eventType, event);
this.#tasks.push(this.#doEmit(eventType, event));
this.#idle ??= this.#waitForIdle();
return this.#idle;
}

#enqueue<K extends keyof EventMap>(eventType: K, event: EventMap[K]) {
return (this.#promise = this.#promise.then(() => this.#doEmit(eventType, event)));
async #waitForIdle() {
do {
const $promises = new Set(this.#tasks);
await Promise.all(this.#tasks);
for (let index = this.#tasks.length - 1; index >= 0; index--) {
if ($promises.has(this.#tasks[index])) {
this.#tasks.splice(index, 1);
}
}
} while (this.#tasks.length > 0);
this.#idle = undefined;
}

async #doEmit<K extends keyof EventMap>(eventType: K, event: EventMap[K]) {
const listeners = [...this._getListeners(eventType)];
const $eventType = String(eventType);

await this._log.trace.complete(__EMIT(event), String(eventType), async () => {
await this._log.trace.complete(__EMIT(event), $eventType, async () => {
if (listeners) {
for (const listener of listeners) {
await this._log.trace.complete(__INVOKE(listener), 'invoke', () => listener(event));
try {
await this._log.trace.complete(__INVOKE(listener), 'invoke', () => listener(event));
} catch (error: unknown) {
logError(error, $eventType, listener);
}
}
}
});
Expand Down
16 changes: 16 additions & 0 deletions src/emitters/SerialSyncEmitter.test.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
jest.mock('./logError');
import { SerialSyncEmitter } from './SerialSyncEmitter';

describe('SerialSyncEmitter', () => {
Expand Down Expand Up @@ -59,6 +60,21 @@ describe('SerialSyncEmitter', () => {
expect(listener2.mock.calls[0][0]).toEqual({ type: 'test', payload: 42 });
expect(listener2.mock.calls[1][0]).toEqual({ type: 'test', payload: 84 });
});

it('should tolerate errors in listeners', () => {
const error = new Error('This listener failed');
const emitter = new SerialSyncEmitter<TestEventMap>('test-emitter');
const listener1 = () => {
throw error;
};
const listener2 = jest.fn();
emitter.once('test', listener1);
emitter.once('test', listener2);
expect(() => emitter.emit('test', { type: 'test', payload: 42 })).not.toThrow();
expect(listener2).toHaveBeenCalledTimes(1);
const { logError } = jest.requireMock('./logError');
expect(logError).toHaveBeenCalledWith(error, 'test', expect.any(Function));
});
});

type TestEventMap = {
Expand Down
10 changes: 8 additions & 2 deletions src/emitters/SerialSyncEmitter.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import type { Emitter } from './Emitter';
import { logError } from './logError';
import { ReadonlyEmitterBase } from './ReadonlyEmitterBase';
import { __EMIT, __ENQUEUE, __INVOKE } from './syncEmitterCommons';

Expand All @@ -24,11 +25,16 @@ export class SerialSyncEmitter<EventMap>
while (this.#queue.length > 0) {
const [eventType, event] = this.#queue[0];
const listeners = [...this._getListeners(eventType)];
const $eventType = String(eventType);

this._log.trace.complete(__EMIT(event), String(eventType), () => {
this._log.trace.complete(__EMIT(event), $eventType, () => {
if (listeners) {
for (const listener of listeners) {
this._log.trace.complete(__INVOKE(listener), 'invoke', () => listener(event));
try {
this._log.trace.complete(__INVOKE(listener), 'invoke', () => listener(event));
} catch (error: unknown) {
logError(error, $eventType, listener);
}
}
}
});
Expand Down
10 changes: 10 additions & 0 deletions src/emitters/logError.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
import { logger } from '../utils';

export function logError(error: unknown, eventType: unknown, listener: unknown) {
logger.warn(
error,
`Caught an error while emitting %j event in a listener function:\n%s`,
eventType,
listener,
);
}

0 comments on commit eaae58e

Please sign in to comment.