diff --git a/src/kinesis.test.ts b/src/kinesis.test.ts index bb93141..01347d0 100644 --- a/src/kinesis.test.ts +++ b/src/kinesis.test.ts @@ -6,11 +6,13 @@ const logger: jest.Mocked = { info: jest.fn(), error: jest.fn(), child: jest.fn(), + warn: jest.fn(), } as any; beforeEach(() => { logger.info.mockReset(); logger.error.mockReset(); + logger.warn.mockReset(); logger.child.mockReset(); logger.child.mockImplementation(() => logger); }); @@ -593,4 +595,92 @@ describe('KinesisEventHandler', () => { ); }); }); + + test('throws when encountering an unparseable message', async () => { + const lambda = new KinesisEventHandler({ + logger, + parseEvent: testSerializer.parseEvent, + createRunContext: () => ({}), + }).lambda(); + + await expect( + lambda( + { + Records: [ + { + kinesis: { + partitionKey: uuid(), + data: Buffer.from('not-a-json-string', 'utf-8').toString( + 'base64', + ), + }, + }, + ] as any, + }, + {} as any, + ), + ).rejects.toThrow('Unexpected token o in JSON at position 1'); + + expect(logger.error).toHaveBeenCalledWith( + expect.objectContaining({ + err: expect.objectContaining({ + message: 'Unexpected token o in JSON at position 1', + }), + }), + 'Failed to parse event', + ); + }); + + test('respects ignoreUnparseableEvents', async () => { + const processor = jest.fn(); + const lambda = new KinesisEventHandler({ + logger, + parseEvent: testSerializer.parseEvent, + createRunContext: () => ({}), + ignoreUnparseableEvents: true, + }) + .onEvent((ctx, event) => processor(event)) + .lambda(); + + await lambda( + { + Records: [ + { + kinesis: { + partitionKey: uuid(), + data: Buffer.from('not-a-json-string', 'utf-8').toString( + 'base64', + ), + }, + }, + { + kinesis: { + partitionKey: uuid(), + data: Buffer.from( + JSON.stringify({ message: 'test-message' }), + 'utf-8', + ).toString('base64'), + }, + }, + ] as any, + }, + {} as any, + ); + + expect(logger.error).toHaveBeenCalledWith( + expect.objectContaining({ + err: expect.objectContaining({ + message: 'Unexpected token o in JSON at position 1', + }), + }), + 'Failed to parse event', + ); + + expect(logger.warn).toHaveBeenCalledWith( + 'ignoreUnparseableEvents is set to true. Ignoring message.', + ); + + expect(processor).toHaveBeenCalledTimes(1); + expect(processor).toHaveBeenCalledWith({ message: 'test-message' }); + }); }); diff --git a/src/kinesis.ts b/src/kinesis.ts index 0a128ab..91ba403 100644 --- a/src/kinesis.ts +++ b/src/kinesis.ts @@ -16,6 +16,13 @@ export type KinesisEventHandlerConfig = * A function for parsing the Kinesis event data into your custom type. */ parseEvent: (body: string) => Event; + + /** + * Whether to ignore events that fail to parse. If set to true, + * throwing in the custom parsing function will cause the event + * to be ignored, and never processed. + */ + ignoreUnparseableEvents?: boolean; }; export type KinesisEventAction = ( @@ -93,9 +100,21 @@ export class KinesisEventHandler { eventId: record.eventID, }); - const parsedEvent = this.config.parseEvent( - Buffer.from(record.kinesis.data, 'base64').toString('utf8'), - ); + let parsedEvent: Event; + try { + parsedEvent = this.config.parseEvent( + Buffer.from(record.kinesis.data, 'base64').toString('utf8'), + ); + } catch (err) { + eventLogger.error({ err }, 'Failed to parse event'); + if (this.config.ignoreUnparseableEvents) { + eventLogger.warn( + 'ignoreUnparseableEvents is set to true. Ignoring message.', + ); + return; + } + throw err; + } for (const action of this.actions) { await action({ ...context, logger: eventLogger }, parsedEvent); diff --git a/src/sqs.test.ts b/src/sqs.test.ts index 5d369a0..bd537a8 100644 --- a/src/sqs.test.ts +++ b/src/sqs.test.ts @@ -8,6 +8,7 @@ const logger: jest.Mocked = { info: jest.fn(), error: jest.fn(), child: jest.fn(), + warn: jest.fn(), } as any; let publicKey: string; @@ -21,6 +22,7 @@ beforeAll(async () => { beforeEach(() => { logger.info.mockReset(); logger.error.mockReset(); + logger.warn.mockReset(); logger.child.mockReset(); logger.child.mockImplementation(() => logger); }); @@ -792,4 +794,66 @@ describe('SQSMessageHandler', () => { ); }); }); + + test('throws when encountering an unparseable message', async () => { + const lambda = new SQSMessageHandler({ + logger, + parseMessage: testSerializer.parseMessage, + createRunContext: () => ({}), + }).lambda(); + + await expect( + lambda( + { Records: [{ attributes: {}, body: 'not-a-json-string' }] } as any, + {} as any, + ), + ).rejects.toThrow('Unexpected token o in JSON at position 1'); + + expect(logger.error).toHaveBeenCalledWith( + expect.objectContaining({ + err: expect.objectContaining({ + message: 'Unexpected token o in JSON at position 1', + }), + }), + 'Failed to parse message', + ); + }); + + test('respects ignoreUnparseableMessages', async () => { + const processor = jest.fn().mockResolvedValue(void 0); + const lambda = new SQSMessageHandler({ + logger, + parseMessage: testSerializer.parseMessage, + createRunContext: () => ({}), + ignoreUnparseableMessages: true, + }) + .onMessage((ctx, message) => processor(message)) + .lambda(); + + await lambda( + { + Records: [ + { attributes: {}, body: 'not-a-json-string' }, + { attributes: {}, body: JSON.stringify({ message: 'test-message' }) }, + ], + } as any, + {} as any, + ); + + expect(logger.error).toHaveBeenCalledWith( + expect.objectContaining({ + err: expect.objectContaining({ + message: 'Unexpected token o in JSON at position 1', + }), + }), + 'Failed to parse message', + ); + + expect(logger.warn).toHaveBeenCalledWith( + 'ignoreUnparseableMessages is set to true. Ignoring message.', + ); + + expect(processor).toHaveBeenCalledTimes(1); + expect(processor).toHaveBeenCalledWith({ message: 'test-message' }); + }); }); diff --git a/src/sqs.ts b/src/sqs.ts index fc7de2a..d91c2fe 100644 --- a/src/sqs.ts +++ b/src/sqs.ts @@ -37,6 +37,13 @@ export type SQSMessageHandlerConfig = */ publicKeyDescription: string; }; + + /** + * Whether to ignore messages that fail to parse. If set to true, + * throwing in the custom parsing function will cause the message + * to be ignored, and never processed. + */ + ignoreUnparseableMessages?: boolean; }; export type SQSMessageAction = ( @@ -183,8 +190,19 @@ export class SQSMessageHandler { messageId: record.messageId, }); - const parsedMessage = this.config.parseMessage(record.body); - + let parsedMessage: Message; + try { + parsedMessage = this.config.parseMessage(record.body); + } catch (err) { + messageLogger.error({ err }, 'Failed to parse message'); + if (this.config.ignoreUnparseableMessages) { + messageLogger.warn( + 'ignoreUnparseableMessages is set to true. Ignoring message.', + ); + return; + } + throw err; + } for (const action of this.messageActions) { await action({ ...context, logger: messageLogger }, parsedMessage); }