From fba2c28aaef702ceaa29169cc59b3017c9ba5b97 Mon Sep 17 00:00:00 2001 From: Alejandro Corredor Date: Mon, 20 Nov 2023 17:25:29 -0500 Subject: [PATCH 01/13] feat: support SQS partial batching --- src/dynamo-streams.ts | 1 - src/sqs.ts | 105 +++++++++++++++++++++++++++++++----------- src/utils.ts | 59 ++++++++++++++++++++---- tsconfig.json | 2 +- 4 files changed, 130 insertions(+), 37 deletions(-) diff --git a/src/dynamo-streams.ts b/src/dynamo-streams.ts index 627072e..b8fa09f 100644 --- a/src/dynamo-streams.ts +++ b/src/dynamo-streams.ts @@ -258,7 +258,6 @@ export class DynamoStreamHandler { ); }, concurrency: this.config.concurrency ?? 5, - stopOnError: false, }, async (record) => { const recordLogger = this.config.logger.child({ diff --git a/src/sqs.ts b/src/sqs.ts index 32246ef..e16a60e 100644 --- a/src/sqs.ts +++ b/src/sqs.ts @@ -26,6 +26,12 @@ export type SQSMessageHandlerConfig = { * @default 5 */ concurrency?: number; + /** + * Whether or not to use SQS partial batch responses. For more details + * about SQS partial batch responses see + * https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html + */ + partialBatching?: boolean; }; export type SQSMessageAction = ( @@ -55,6 +61,12 @@ export type SQSMessageHandlerHarnessContext = { sendEvent: (event: { messages: Message[] }) => Promise; }; +export type SQSPartialBatchResponse = { + batchItemFailures: { + itemIdentifier: string; + }[]; +}; + /** * An abstraction for an SQS message handler. */ @@ -75,7 +87,10 @@ export class SQSMessageHandler { return this; } - lambda(): (event: SQSEvent, context: AWSContext) => Promise { + lambda(): ( + event: SQSEvent, + context: AWSContext, + ) => Promise { return withHealthCheckHandling(async (event, awsContext) => { // 1. Build the context. const correlationId = uuid(); @@ -92,32 +107,68 @@ export class SQSMessageHandler { // 2. Process all the records. context.logger.info({ event }, 'Processing SQS topic message'); - await processWithOrdering( - { - items: event.Records, - // If there is not a MessageGroupId, then we don't care about - // the ordering for the event. We can just generate a UUID for the - // ordering key. - orderBy: (record) => record.attributes.MessageGroupId ?? uuid(), - concurrency: this.config.concurrency ?? 5, - stopOnError: false, - }, - async (record) => { - const messageLogger = context.logger.child({ - messageId: record.messageId, - }); - - const parsedMessage = this.config.parseMessage(record.body); - - for (const action of this.messageActions) { - await action({ ...context, logger: messageLogger }, parsedMessage); - } - - messageLogger.info('Successfully processed message'); - }, - ); - - context.logger.info('Succesfully processed all messages'); + const { hasUnprocessedRecords, unprocessedRecords } = + await processWithOrdering( + { + items: event.Records, + // If there is not a MessageGroupId, then we don't care about + // the ordering for the event. We can just generate a UUID for the + // ordering key. + orderBy: (record) => record.attributes.MessageGroupId ?? uuid(), + concurrency: this.config.concurrency ?? 5, + rejectOnError: false, + }, + async (record) => { + const messageLogger = context.logger.child({ + messageId: record.messageId, + }); + + const parsedMessage = this.config.parseMessage(record.body); + + for (const action of this.messageActions) { + await action( + { ...context, logger: messageLogger }, + parsedMessage, + ); + } + + messageLogger.info('Successfully processed message'); + }, + ); + + if (hasUnprocessedRecords) { + context.logger.warn( + { unprocessedRecords }, + 'Failed to process all messages', + ); + } else { + context.logger.info('Succesfully processed all messages'); + } + + if (this.config.partialBatching) { + // SQS partial batching expects that you return an ordered list of + // failures. We map through each group and add them to the batch item + // failures in order for each group. + const batchItemFailures = Object.entries(unprocessedRecords) + .map(([groupId, record]) => { + context.logger.warn( + { groupId, error: record.error }, + 'Failed to process message group', + ); + + return record.items.map((item) => ({ + itemIdentifier: item.messageId, + })); + }) + .flat(); + + context.logger.info( + { batchItemFailures }, + 'Sending SQS partial batch response', + ); + + return { batchItemFailures }; + } }); } diff --git a/src/utils.ts b/src/utils.ts index 0085df4..354e4ce 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -2,6 +2,7 @@ import { LoggerInterface } from '@lifeomic/logging'; import { Context } from 'aws-lambda'; import pMap from 'p-map'; import groupBy from 'lodash/groupBy'; +import zipObject from 'lodash/zipObject'; export type BaseContext = { logger: LoggerInterface; @@ -9,8 +10,10 @@ export type BaseContext = { }; export const withHealthCheckHandling = - (handler: (event: Event, context: Context) => Promise) => - (event: Event, context: Context): Promise => { + ( + handler: (event: Event, context: Context) => Promise, + ) => + (event: Event, context: Context): Promise => { if ((event as any).httpMethod) { return { statusCode: 200, @@ -29,7 +32,7 @@ export type ProcessWithOrderingParams = { items: T[]; orderBy: (msg: T) => string; concurrency: number; - stopOnError: boolean; + rejectOnError?: boolean; }; /** @@ -54,19 +57,59 @@ export type ProcessWithOrderingParams = { * * This same scenario is true for SQS FIFO queues, which will order messages * by MessageGroupId. + * + * When rejectOnError is false, the method will return the list + * of unprocessed items by listId. Callers will then have access to the error + * that caused the list to stop processing events, plus all the events + * that were not processed in the batch. */ export const processWithOrdering = async ( params: ProcessWithOrderingParams, process: (item: T) => Promise, ) => { - const lists = Object.values(groupBy(params.items, params.orderBy)); + const rejectOnError = params.rejectOnError ?? true; + const groupedItems = groupBy(params.items, params.orderBy); + const listIds = Object.keys(groupedItems); + const lists = Object.values(groupedItems); + const unprocessedRecordsByListId = zipObject<{ error: any; items: T[] }>( + listIds, + lists.map(() => ({ error: null, items: [] })), + ); + await pMap( lists, - async (list) => { - for (const item of list) { - await process(item); + async (list, listIndex) => { + for (let i = 0; i < list.length; i++) { + const item = list[i]; + + try { + await process(item); + } catch (error) { + // Keep track of all unprocessed items and stop processing the current + // list as soon as we encounter the first error. + unprocessedRecordsByListId[listIds[listIndex]] = { + error, + items: list.slice(i), + }; + return; + } } }, - { concurrency: params.concurrency, stopOnError: params.stopOnError }, + { + concurrency: params.concurrency, + }, ); + + const aggregateErrors = Object.values(unprocessedRecordsByListId) + .map((record) => record.error) + .flat(); + + if (aggregateErrors.length > 0 && rejectOnError) { + throw new AggregateError(aggregateErrors); + } + + return { + hasUnprocessedRecords: aggregateErrors.length > 0, + unprocessedRecords: unprocessedRecordsByListId, + }; }; diff --git a/tsconfig.json b/tsconfig.json index 88738aa..4438ebd 100644 --- a/tsconfig.json +++ b/tsconfig.json @@ -4,7 +4,7 @@ "compilerOptions": { "target": "es2019", "module": "commonjs", - "lib": ["ES2020"], + "lib": ["ES2021"], "outDir": "./dist", "inlineSources": false, "inlineSourceMap": false, From fb6e421de45a65f6207525c34de554d171a577ae Mon Sep 17 00:00:00 2001 From: Alejandro Corredor Date: Tue, 21 Nov 2023 12:47:22 -0500 Subject: [PATCH 02/13] fix docs --- src/sqs.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/sqs.ts b/src/sqs.ts index e16a60e..7badb8e 100644 --- a/src/sqs.ts +++ b/src/sqs.ts @@ -29,7 +29,7 @@ export type SQSMessageHandlerConfig = { /** * Whether or not to use SQS partial batch responses. For more details * about SQS partial batch responses see - * https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html + * https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting */ partialBatching?: boolean; }; From 897338895414f5201f2863ab2410b4ca6bced8e7 Mon Sep 17 00:00:00 2001 From: Alejandro Corredor Date: Mon, 27 Nov 2023 15:14:21 -0500 Subject: [PATCH 03/13] sqs partial batch responses improvements pt. 1 --- src/sqs.ts | 72 +++++++++++++++++++++++++++------------------------- src/utils.ts | 2 +- 2 files changed, 39 insertions(+), 35 deletions(-) diff --git a/src/sqs.ts b/src/sqs.ts index 7badb8e..3b9329d 100644 --- a/src/sqs.ts +++ b/src/sqs.ts @@ -31,7 +31,7 @@ export type SQSMessageHandlerConfig = { * about SQS partial batch responses see * https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting */ - partialBatching?: boolean; + usePartialBatchResponses?: boolean; }; export type SQSMessageAction = ( @@ -136,39 +136,43 @@ export class SQSMessageHandler { }, ); - if (hasUnprocessedRecords) { - context.logger.warn( - { unprocessedRecords }, - 'Failed to process all messages', - ); - } else { - context.logger.info('Succesfully processed all messages'); - } - - if (this.config.partialBatching) { - // SQS partial batching expects that you return an ordered list of - // failures. We map through each group and add them to the batch item - // failures in order for each group. - const batchItemFailures = Object.entries(unprocessedRecords) - .map(([groupId, record]) => { - context.logger.warn( - { groupId, error: record.error }, - 'Failed to process message group', - ); - - return record.items.map((item) => ({ - itemIdentifier: item.messageId, - })); - }) - .flat(); - - context.logger.info( - { batchItemFailures }, - 'Sending SQS partial batch response', - ); - - return { batchItemFailures }; - } + hasUnprocessedRecords + ? context.logger.warn( + { unprocessedRecords }, + 'Failed to process all messages', + ) + : context.logger.info('Succesfully processed all messages'); + + if (!this.config.usePartialBatchResponses) return; + + // SQS partial batching expects that you return an ordered list of + // failures. We map through each group and add them to the batch item + // failures in order for each group. + const batchItemFailures = Object.entries(unprocessedRecords) + .map(([groupId, record]) => { + const [failedRecord, ...subsequentUnprocessedRecords] = record.items; + context.logger.warn( + { + groupId, + err: record.error, + failedRecord, + subsequentUnprocessedRecords, + }, + 'Failed to fully process message group', + ); + + return record.items.map((item) => ({ + itemIdentifier: item.messageId, + })); + }) + .flat(); + + context.logger.info( + { batchItemFailures }, + 'Sending SQS partial batch response', + ); + + return { batchItemFailures }; }); } diff --git a/src/utils.ts b/src/utils.ts index 354e4ce..3420e17 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -10,7 +10,7 @@ export type BaseContext = { }; export const withHealthCheckHandling = - ( + ( handler: (event: Event, context: Context) => Promise, ) => (event: Event, context: Context): Promise => { From 827e3579e4668892b71e25c17dd6f9af823283a7 Mon Sep 17 00:00:00 2001 From: Alejandro Corredor Date: Mon, 27 Nov 2023 15:29:32 -0500 Subject: [PATCH 04/13] sqs partial batch responses improvements pt. 2 --- src/sqs.ts | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/src/sqs.ts b/src/sqs.ts index 3b9329d..716e77c 100644 --- a/src/sqs.ts +++ b/src/sqs.ts @@ -136,12 +136,9 @@ export class SQSMessageHandler { }, ); - hasUnprocessedRecords - ? context.logger.warn( - { unprocessedRecords }, - 'Failed to process all messages', - ) - : context.logger.info('Succesfully processed all messages'); + if (!hasUnprocessedRecords) { + context.logger.info('Succesfully processed all messages'); + } if (!this.config.usePartialBatchResponses) return; From b3b0b1eb75e5a8d432de5742e3040864f434464b Mon Sep 17 00:00:00 2001 From: Alejandro Corredor Date: Mon, 27 Nov 2023 15:45:12 -0500 Subject: [PATCH 05/13] sqs partial batch responses improvements pt. 3 --- src/dynamo-streams.ts | 8 +++++++- src/kinesis.ts | 12 +++++++----- src/sqs.ts | 5 ++--- src/utils.ts | 18 +++--------------- 4 files changed, 19 insertions(+), 24 deletions(-) diff --git a/src/dynamo-streams.ts b/src/dynamo-streams.ts index b8fa09f..b9257f9 100644 --- a/src/dynamo-streams.ts +++ b/src/dynamo-streams.ts @@ -231,7 +231,7 @@ export class DynamoStreamHandler { 'Processing DynamoDB stream event', ); - await processWithOrdering( + const { hasUnprocessedRecords } = await processWithOrdering( { items: event.Records, orderBy: (record) => { @@ -324,6 +324,12 @@ export class DynamoStreamHandler { } }, ); + + if (hasUnprocessedRecords) { + throw new Error('Failed to process all DynamoDB stream records'); + } + + context.logger.info('Successfully processed all DynamoDB stream records'); }); } diff --git a/src/kinesis.ts b/src/kinesis.ts index 6c12158..0f8a757 100644 --- a/src/kinesis.ts +++ b/src/kinesis.ts @@ -90,13 +90,11 @@ export class KinesisEventHandler { Object.assign(context, await this.config.createRunContext(context)); // 2. Process all the records. - - await processWithOrdering( + const { hasUnprocessedRecords } = await processWithOrdering( { items: event.Records, orderBy: (record) => record.kinesis.partitionKey, concurrency: this.config.concurrency ?? 5, - stopOnError: false, }, async (record) => { const eventLogger = context.logger.child({ @@ -109,11 +107,15 @@ export class KinesisEventHandler { await action({ ...context, logger: eventLogger }, parsedEvent); } - eventLogger.info('Successfully processed event'); + eventLogger.info('Successfully processed Kinesis record'); }, ); - context.logger.info('Succesfully processed all events'); + if (hasUnprocessedRecords) { + throw new Error('Failed to process all Kinesis records'); + } + + context.logger.info('Succesfully processed all Kinesis records'); }); } diff --git a/src/sqs.ts b/src/sqs.ts index 716e77c..397057a 100644 --- a/src/sqs.ts +++ b/src/sqs.ts @@ -116,7 +116,6 @@ export class SQSMessageHandler { // ordering key. orderBy: (record) => record.attributes.MessageGroupId ?? uuid(), concurrency: this.config.concurrency ?? 5, - rejectOnError: false, }, async (record) => { const messageLogger = context.logger.child({ @@ -132,12 +131,12 @@ export class SQSMessageHandler { ); } - messageLogger.info('Successfully processed message'); + messageLogger.info('Successfully processed SQS message'); }, ); if (!hasUnprocessedRecords) { - context.logger.info('Succesfully processed all messages'); + context.logger.info('Succesfully processed all SQS messages'); } if (!this.config.usePartialBatchResponses) return; diff --git a/src/utils.ts b/src/utils.ts index 3420e17..237fa81 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -32,7 +32,6 @@ export type ProcessWithOrderingParams = { items: T[]; orderBy: (msg: T) => string; concurrency: number; - rejectOnError?: boolean; }; /** @@ -58,16 +57,11 @@ export type ProcessWithOrderingParams = { * This same scenario is true for SQS FIFO queues, which will order messages * by MessageGroupId. * - * When rejectOnError is false, the method will return the list - * of unprocessed items by listId. Callers will then have access to the error - * that caused the list to stop processing events, plus all the events - * that were not processed in the batch. */ export const processWithOrdering = async ( params: ProcessWithOrderingParams, process: (item: T) => Promise, ) => { - const rejectOnError = params.rejectOnError ?? true; const groupedItems = groupBy(params.items, params.orderBy); const listIds = Object.keys(groupedItems); const lists = Object.values(groupedItems); @@ -100,16 +94,10 @@ export const processWithOrdering = async ( }, ); - const aggregateErrors = Object.values(unprocessedRecordsByListId) - .map((record) => record.error) - .flat(); - - if (aggregateErrors.length > 0 && rejectOnError) { - throw new AggregateError(aggregateErrors); - } - return { - hasUnprocessedRecords: aggregateErrors.length > 0, + hasUnprocessedRecords: Object.values(unprocessedRecordsByListId).some( + (record) => record.items.length > 0, + ), unprocessedRecords: unprocessedRecordsByListId, }; }; From 3f3308c72b86f8976d185b9f11051d63cad074a9 Mon Sep 17 00:00:00 2001 From: Alejandro Corredor Date: Mon, 27 Nov 2023 15:57:01 -0500 Subject: [PATCH 06/13] throw on unprocessed records for kinesis and dynamo --- src/dynamo-streams.test.ts | 40 +++++++++++++++++++++++++++++++++ src/kinesis.test.ts | 45 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 85 insertions(+) diff --git a/src/dynamo-streams.test.ts b/src/dynamo-streams.test.ts index e9389b7..eb655a2 100644 --- a/src/dynamo-streams.test.ts +++ b/src/dynamo-streams.test.ts @@ -76,6 +76,46 @@ describe('DynamoStreamHandler', () => { }); }); + test('throws error if there are unprocessed records', async () => { + const handler = new DynamoStreamHandler({ + logger, + parse: testSerializer.parse, + createRunContext: () => ({ logger, dataSources }), + }) + .onInsert((ctx, entity) => { + if (entity.id === 'new-insert-2') { + ctx.dataSources.doSomething(entity); + } else { + throw new Error('poison record!'); + } + }) + .lambda(); + + await expect( + handler( + { + Records: [ + { + eventName: 'INSERT', + dynamodb: { NewImage: { id: { S: 'new-insert-1' } } }, + }, + { + eventName: 'INSERT', + dynamodb: { NewImage: { id: { S: 'new-insert-2' } } }, + }, + ], + }, + {} as any, + {} as any, + ), + ).rejects.toThrow('Failed to process all DynamoDB stream records'); + + expect(dataSources.doSomething).toHaveBeenCalledTimes(1); + expect(dataSources.doSomething).toHaveBeenCalledWith({ + id: 'new-insert-2', + }); + }); + test('handles insert events', async () => { const lambda = new DynamoStreamHandler({ logger, diff --git a/src/kinesis.test.ts b/src/kinesis.test.ts index 62f8ac1..9c00dbf 100644 --- a/src/kinesis.test.ts +++ b/src/kinesis.test.ts @@ -207,6 +207,51 @@ describe('KinesisEventHandler', () => { }); }); + test('throws error if there are unprocessed records', async () => { + let alreadyFailed = false; + + const handler = new KinesisEventHandler({ + logger, + parseEvent: testSerializer.parseEvent, + createRunContext: () => ({}), + }) + .onEvent(() => { + if (!alreadyFailed) { + alreadyFailed = true; + throw new Error('poison record!'); + } + }) + .lambda(); + + await expect( + handler( + { + Records: [ + { + kinesis: { + partitionKey: uuid(), + data: JSON.stringify({ data: 'test-event-1' }), + }, + }, + { + kinesis: { + partitionKey: uuid(), + data: JSON.stringify({ data: 'test-event-2' }), + }, + }, + { + kinesis: { + partitionKey: uuid(), + data: JSON.stringify({ data: 'test-event-3' }), + }, + }, + ] as any, + }, + {} as any, + ), + ).rejects.toThrow('Failed to process all Kinesis records'); + }); + test('allows overriding context and logger', async () => { const testValue = uuid(); From 666591f7dd81f9ecfc79ac9386b66a43bf2b9dbb Mon Sep 17 00:00:00 2001 From: Alejandro Corredor Date: Mon, 27 Nov 2023 16:25:02 -0500 Subject: [PATCH 07/13] improve processWithOrdering --- src/dynamo-streams.test.ts | 17 ++++++++--- src/dynamo-streams.ts | 7 ++--- src/kinesis.test.ts | 18 ++++++----- src/kinesis.ts | 7 ++--- src/sqs.test.ts | 4 ++- src/sqs.ts | 61 +++++++++++++++++++------------------- src/utils.ts | 14 +++++++-- 7 files changed, 72 insertions(+), 56 deletions(-) diff --git a/src/dynamo-streams.test.ts b/src/dynamo-streams.test.ts index eb655a2..3a60474 100644 --- a/src/dynamo-streams.test.ts +++ b/src/dynamo-streams.test.ts @@ -83,10 +83,10 @@ describe('DynamoStreamHandler', () => { createRunContext: () => ({ logger, dataSources }), }) .onInsert((ctx, entity) => { - if (entity.id === 'new-insert-2') { + if (entity.id === 'new-insert-1') { ctx.dataSources.doSomething(entity); } else { - throw new Error('poison record!'); + throw new Error(`Failed to process ${entity.id}`); } }) .lambda(); @@ -103,16 +103,25 @@ describe('DynamoStreamHandler', () => { eventName: 'INSERT', dynamodb: { NewImage: { id: { S: 'new-insert-2' } } }, }, + { + eventName: 'INSERT', + dynamodb: { NewImage: { id: { S: 'new-insert-3' } } }, + }, ], }, {} as any, {} as any, ), - ).rejects.toThrow('Failed to process all DynamoDB stream records'); + ).rejects.toThrowError( + new AggregateError([ + new Error('Failed to process new-insert-2'), + new Error('Failed to process new-insert-3'), + ]), + ); expect(dataSources.doSomething).toHaveBeenCalledTimes(1); expect(dataSources.doSomething).toHaveBeenCalledWith({ - id: 'new-insert-2', + id: 'new-insert-1', }); }); diff --git a/src/dynamo-streams.ts b/src/dynamo-streams.ts index b9257f9..5809dc7 100644 --- a/src/dynamo-streams.ts +++ b/src/dynamo-streams.ts @@ -231,7 +231,7 @@ export class DynamoStreamHandler { 'Processing DynamoDB stream event', ); - const { hasUnprocessedRecords } = await processWithOrdering( + const processingResult = await processWithOrdering( { items: event.Records, orderBy: (record) => { @@ -325,10 +325,7 @@ export class DynamoStreamHandler { }, ); - if (hasUnprocessedRecords) { - throw new Error('Failed to process all DynamoDB stream records'); - } - + processingResult.throwOnUnprocessedRecords(); context.logger.info('Successfully processed all DynamoDB stream records'); }); } diff --git a/src/kinesis.test.ts b/src/kinesis.test.ts index 9c00dbf..eaed039 100644 --- a/src/kinesis.test.ts +++ b/src/kinesis.test.ts @@ -207,18 +207,15 @@ describe('KinesisEventHandler', () => { }); }); - test('throws error if there are unprocessed records', async () => { - let alreadyFailed = false; - + test('throws aggregate error if there are unprocessed records', async () => { const handler = new KinesisEventHandler({ logger, parseEvent: testSerializer.parseEvent, createRunContext: () => ({}), }) - .onEvent(() => { - if (!alreadyFailed) { - alreadyFailed = true; - throw new Error('poison record!'); + .onEvent((ctx, message) => { + if (message.data !== 'test-event-1') { + throw new Error(`Failed to process ${message.data}`); } }) .lambda(); @@ -249,7 +246,12 @@ describe('KinesisEventHandler', () => { }, {} as any, ), - ).rejects.toThrow('Failed to process all Kinesis records'); + ).rejects.toThrowError( + new AggregateError([ + new Error('Failed to process test-event-2'), + new Error('Failed to process test-event-3'), + ]), + ); }); test('allows overriding context and logger', async () => { diff --git a/src/kinesis.ts b/src/kinesis.ts index 0f8a757..b7d8389 100644 --- a/src/kinesis.ts +++ b/src/kinesis.ts @@ -90,7 +90,7 @@ export class KinesisEventHandler { Object.assign(context, await this.config.createRunContext(context)); // 2. Process all the records. - const { hasUnprocessedRecords } = await processWithOrdering( + const processingResult = await processWithOrdering( { items: event.Records, orderBy: (record) => record.kinesis.partitionKey, @@ -111,10 +111,7 @@ export class KinesisEventHandler { }, ); - if (hasUnprocessedRecords) { - throw new Error('Failed to process all Kinesis records'); - } - + processingResult.throwOnUnprocessedRecords(); context.logger.info('Succesfully processed all Kinesis records'); }); } diff --git a/src/sqs.test.ts b/src/sqs.test.ts index 3dac844..d430b4b 100644 --- a/src/sqs.test.ts +++ b/src/sqs.test.ts @@ -58,7 +58,7 @@ describe('SQSMessageHandler', () => { }, }).lambda(); - await lambda( + const response = await lambda( { Records: [ { attributes: {}, body: JSON.stringify({ data: 'test-event-1' }) }, @@ -67,6 +67,8 @@ describe('SQSMessageHandler', () => { {} as any, ); + expect(response).toBeUndefined(); + expect(logger.child).toHaveBeenCalledWith( expect.objectContaining({ correlationId: expect.any(String) }), ); diff --git a/src/sqs.ts b/src/sqs.ts index 397057a..f6969da 100644 --- a/src/sqs.ts +++ b/src/sqs.ts @@ -107,44 +107,45 @@ export class SQSMessageHandler { // 2. Process all the records. context.logger.info({ event }, 'Processing SQS topic message'); - const { hasUnprocessedRecords, unprocessedRecords } = - await processWithOrdering( - { - items: event.Records, - // If there is not a MessageGroupId, then we don't care about - // the ordering for the event. We can just generate a UUID for the - // ordering key. - orderBy: (record) => record.attributes.MessageGroupId ?? uuid(), - concurrency: this.config.concurrency ?? 5, - }, - async (record) => { - const messageLogger = context.logger.child({ - messageId: record.messageId, - }); - - const parsedMessage = this.config.parseMessage(record.body); - - for (const action of this.messageActions) { - await action( - { ...context, logger: messageLogger }, - parsedMessage, - ); - } - - messageLogger.info('Successfully processed SQS message'); - }, - ); + const processingResult = await processWithOrdering( + { + items: event.Records, + // If there is not a MessageGroupId, then we don't care about + // the ordering for the event. We can just generate a UUID for the + // ordering key. + orderBy: (record) => record.attributes.MessageGroupId ?? uuid(), + concurrency: this.config.concurrency ?? 5, + }, + async (record) => { + const messageLogger = context.logger.child({ + messageId: record.messageId, + }); + + const parsedMessage = this.config.parseMessage(record.body); + + for (const action of this.messageActions) { + await action({ ...context, logger: messageLogger }, parsedMessage); + } + + messageLogger.info('Successfully processed SQS message'); + }, + ); - if (!hasUnprocessedRecords) { + if (!processingResult.hasUnprocessedRecords) { context.logger.info('Succesfully processed all SQS messages'); } - if (!this.config.usePartialBatchResponses) return; + if (!this.config.usePartialBatchResponses) { + processingResult.throwOnUnprocessedRecords(); + return; + } // SQS partial batching expects that you return an ordered list of // failures. We map through each group and add them to the batch item // failures in order for each group. - const batchItemFailures = Object.entries(unprocessedRecords) + const batchItemFailures = Object.entries( + processingResult.unprocessedRecords, + ) .map(([groupId, record]) => { const [failedRecord, ...subsequentUnprocessedRecords] = record.items; context.logger.warn( diff --git a/src/utils.ts b/src/utils.ts index 237fa81..733270a 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -94,10 +94,18 @@ export const processWithOrdering = async ( }, ); + const aggregateErrors = Object.values(unprocessedRecordsByListId) + .map((record) => record.error) + .filter(Boolean) + .flat(); + return { - hasUnprocessedRecords: Object.values(unprocessedRecordsByListId).some( - (record) => record.items.length > 0, - ), + hasUnprocessedRecords: aggregateErrors.length > 0, unprocessedRecords: unprocessedRecordsByListId, + throwOnUnprocessedRecords: () => { + if (aggregateErrors.length) { + throw new AggregateError(aggregateErrors); + } + }, }; }; From e8a55ada231bf9c1b11e91f1f3b330b4f03a67e7 Mon Sep 17 00:00:00 2001 From: Alejandro Corredor Date: Mon, 27 Nov 2023 17:33:44 -0500 Subject: [PATCH 08/13] update sqs unit test coverage --- src/sqs.test.ts | 120 ++++++++++++++++++++++++++++++++++++++++++++++-- src/sqs.ts | 2 +- 2 files changed, 118 insertions(+), 4 deletions(-) diff --git a/src/sqs.test.ts b/src/sqs.test.ts index d430b4b..86390c7 100644 --- a/src/sqs.test.ts +++ b/src/sqs.test.ts @@ -1,9 +1,10 @@ import { v4 as uuid } from 'uuid'; import { LoggerInterface } from '@lifeomic/logging'; -import { SQSMessageHandler } from './sqs'; +import { SQSMessageAction, SQSMessageHandler } from './sqs'; const logger: jest.Mocked = { info: jest.fn(), + error: jest.fn(), child: jest.fn(), } as any; @@ -49,6 +50,8 @@ describe('SQSMessageHandler', () => { }); test('generates a correlation id', async () => { + expect.assertions(3); + const lambda = new SQSMessageHandler({ logger, parseMessage: testSerializer.parseMessage, @@ -67,13 +70,124 @@ describe('SQSMessageHandler', () => { {} as any, ); + // Assert that when all messages are processed successfully and partial + // batch responses are not used (the default setting), nothing is returned + // as the lambda response. expect(response).toBeUndefined(); - expect(logger.child).toHaveBeenCalledWith( expect.objectContaining({ correlationId: expect.any(String) }), ); + }); + + describe('error handling', () => { + const records = [ + { + attributes: { MessageGroupId: '1' }, + messageId: 'message-1', + body: JSON.stringify({ name: 'test-event-1' }), + }, + { + attributes: { MessageGroupId: '1' }, + messageId: 'message-2', + body: JSON.stringify({ name: 'test-event-2' }), + }, + { + attributes: { MessageGroupId: '1' }, + messageId: 'message-3', + body: JSON.stringify({ name: 'test-event-3' }), + }, + { + attributes: { MessageGroupId: '1' }, + messageId: 'message-4', + body: JSON.stringify({ name: 'test-event-4' }), + }, + { + attributes: { MessageGroupId: '2' }, + messageId: 'message-5', + body: JSON.stringify({ name: 'test-event-5' }), + }, + { + attributes: { MessageGroupId: '2' }, + messageId: 'message-6', + body: JSON.stringify({ name: 'test-event-6' }), + }, + { + attributes: { MessageGroupId: '2' }, + messageId: 'message-7', + body: JSON.stringify({ name: 'test-event-7' }), + }, + { + attributes: { MessageGroupId: '2' }, + messageId: 'message-8', + body: JSON.stringify({ name: 'test-event-8' }), + }, + ]; + const messageHandler: SQSMessageAction<{ name: string }, any> = ( + ctx, + message, + ) => { + // Fail on the third message on each group (each group has 4 messages). + if (message.name === 'test-event-3' || message.name === 'test-event-7') { + throw new Error(`Failed to process message ${message.name}`); + } + }; + + test('throws on unprocessed events by default', async () => { + expect.assertions(2); + const handler = new SQSMessageHandler({ + logger, + parseMessage: testSerializer.parseMessage, + createRunContext: () => ({}), + concurrency: 2, + }) + .onMessage(messageHandler) + .lambda(); + + try { + await handler( + { + Records: records, + } as any, + {} as any, + ); + } catch (e) { + expect(e).toBeInstanceOf(AggregateError); + expect(e.errors).toEqual([ + new Error('Failed to process message test-event-3'), + new Error('Failed to process message test-event-7'), + ]); + } + }); + + test('returns partial batch response when setting is enabled', async () => { + const handler = new SQSMessageHandler({ + logger, + parseMessage: testSerializer.parseMessage, + createRunContext: () => ({}), + usePartialBatchResponses: true, + // Make sure partial batch responses are returned in order even + // when using concurrency. + concurrency: 2, + }) + .onMessage(messageHandler) + .lambda(); - expect.assertions(2); + const result = await handler( + { + Records: records, + } as any, + {} as any, + ); + + expect(result).toEqual({ + batchItemFailures: [ + { itemIdentifier: 'message-3' }, + { itemIdentifier: 'message-4' }, + { itemIdentifier: 'message-7' }, + { itemIdentifier: 'message-8' }, + ], + }); + }); }); test('sending messages with context', async () => { diff --git a/src/sqs.ts b/src/sqs.ts index f6969da..076f46e 100644 --- a/src/sqs.ts +++ b/src/sqs.ts @@ -148,7 +148,7 @@ export class SQSMessageHandler { ) .map(([groupId, record]) => { const [failedRecord, ...subsequentUnprocessedRecords] = record.items; - context.logger.warn( + context.logger.error( { groupId, err: record.error, From cb99993a100b432409a82e8358554327cba5ebfb Mon Sep 17 00:00:00 2001 From: Alejandro Corredor Date: Mon, 27 Nov 2023 17:38:15 -0500 Subject: [PATCH 09/13] more robust unit tests --- src/dynamo-streams.test.ts | 16 +++++++++------- src/kinesis.test.ts | 17 ++++++++++------- 2 files changed, 19 insertions(+), 14 deletions(-) diff --git a/src/dynamo-streams.test.ts b/src/dynamo-streams.test.ts index 3a60474..e7233ec 100644 --- a/src/dynamo-streams.test.ts +++ b/src/dynamo-streams.test.ts @@ -77,6 +77,7 @@ describe('DynamoStreamHandler', () => { }); test('throws error if there are unprocessed records', async () => { + expect.assertions(4); const handler = new DynamoStreamHandler({ logger, parse: testSerializer.parse, @@ -91,8 +92,8 @@ describe('DynamoStreamHandler', () => { }) .lambda(); - await expect( - handler( + try { + await handler( { Records: [ { @@ -111,13 +112,14 @@ describe('DynamoStreamHandler', () => { }, {} as any, {} as any, - ), - ).rejects.toThrowError( - new AggregateError([ + ); + } catch (e) { + expect(e).toBeInstanceOf(AggregateError); + expect(e.errors).toEqual([ new Error('Failed to process new-insert-2'), new Error('Failed to process new-insert-3'), - ]), - ); + ]); + } expect(dataSources.doSomething).toHaveBeenCalledTimes(1); expect(dataSources.doSomething).toHaveBeenCalledWith({ diff --git a/src/kinesis.test.ts b/src/kinesis.test.ts index eaed039..48e5e97 100644 --- a/src/kinesis.test.ts +++ b/src/kinesis.test.ts @@ -208,6 +208,8 @@ describe('KinesisEventHandler', () => { }); test('throws aggregate error if there are unprocessed records', async () => { + expect.assertions(2); + const handler = new KinesisEventHandler({ logger, parseEvent: testSerializer.parseEvent, @@ -220,8 +222,8 @@ describe('KinesisEventHandler', () => { }) .lambda(); - await expect( - handler( + try { + await handler( { Records: [ { @@ -245,13 +247,14 @@ describe('KinesisEventHandler', () => { ] as any, }, {} as any, - ), - ).rejects.toThrowError( - new AggregateError([ + ); + } catch (e) { + expect(e).toBeInstanceOf(AggregateError); + expect(e.errors).toEqual([ new Error('Failed to process test-event-2'), new Error('Failed to process test-event-3'), - ]), - ); + ]); + } }); test('allows overriding context and logger', async () => { From 4367973d256b0171ee99b8edc82e108ae37d0c36 Mon Sep 17 00:00:00 2001 From: Alejandro Corredor Date: Mon, 27 Nov 2023 17:45:07 -0500 Subject: [PATCH 10/13] lock vscode TS version + node 16 --- .github/workflows/pull-request.yaml | 2 +- .github/workflows/release.yaml | 2 +- .vscode/settings.json | 3 +++ 3 files changed, 5 insertions(+), 2 deletions(-) create mode 100644 .vscode/settings.json diff --git a/.github/workflows/pull-request.yaml b/.github/workflows/pull-request.yaml index 0d8f7d3..67b3fa6 100644 --- a/.github/workflows/pull-request.yaml +++ b/.github/workflows/pull-request.yaml @@ -11,7 +11,7 @@ jobs: - uses: actions/checkout@v2 - uses: actions/setup-node@v2 with: - node-version: 14 + node-version: 16 - name: Build & Test run: | echo "//registry.npmjs.org/:_authToken=${NPM_TOKEN}" > .npmrc diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml index 6a4bf7f..02cf9bc 100644 --- a/.github/workflows/release.yaml +++ b/.github/workflows/release.yaml @@ -16,7 +16,7 @@ jobs: - uses: actions/checkout@v2 - uses: actions/setup-node@v2 with: - node-version: 14 + node-version: 16 - name: Test run: | echo "//registry.npmjs.org/:_authToken=${NPM_TOKEN}" > .npmrc diff --git a/.vscode/settings.json b/.vscode/settings.json new file mode 100644 index 0000000..3662b37 --- /dev/null +++ b/.vscode/settings.json @@ -0,0 +1,3 @@ +{ + "typescript.tsdk": "node_modules/typescript/lib" +} \ No newline at end of file From a397da29992245e25efd5cbd3b74b6ec3a3a199c Mon Sep 17 00:00:00 2001 From: Alejandro Corredor Date: Tue, 28 Nov 2023 16:41:56 -0500 Subject: [PATCH 11/13] typo --- src/kinesis.ts | 2 +- src/sqs.ts | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/src/kinesis.ts b/src/kinesis.ts index b7d8389..e99c7c6 100644 --- a/src/kinesis.ts +++ b/src/kinesis.ts @@ -112,7 +112,7 @@ export class KinesisEventHandler { ); processingResult.throwOnUnprocessedRecords(); - context.logger.info('Succesfully processed all Kinesis records'); + context.logger.info('Successfully processed all Kinesis records'); }); } diff --git a/src/sqs.ts b/src/sqs.ts index 076f46e..8831f46 100644 --- a/src/sqs.ts +++ b/src/sqs.ts @@ -132,7 +132,7 @@ export class SQSMessageHandler { ); if (!processingResult.hasUnprocessedRecords) { - context.logger.info('Succesfully processed all SQS messages'); + context.logger.info('Successfully processed all SQS messages'); } if (!this.config.usePartialBatchResponses) { From cd26a3dfb44a2b6fc70cead6302fdd1717043180 Mon Sep 17 00:00:00 2001 From: Alejandro Corredor Date: Tue, 28 Nov 2023 16:44:49 -0500 Subject: [PATCH 12/13] better docs --- src/sqs.ts | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/src/sqs.ts b/src/sqs.ts index 8831f46..f9c9ac3 100644 --- a/src/sqs.ts +++ b/src/sqs.ts @@ -27,8 +27,10 @@ export type SQSMessageHandlerConfig = { */ concurrency?: number; /** - * Whether or not to use SQS partial batch responses. For more details - * about SQS partial batch responses see + * Whether or not to use SQS partial batch responses. Make sure to also + * turn on partial batch responses when configuring your event source mapping + * by specifying ReportBatchItemFailures for the FunctionResponseTypes action. + * For more details see: * https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting */ usePartialBatchResponses?: boolean; From dfc10047709fc9460df3bbc450fa9cdcdc679a63 Mon Sep 17 00:00:00 2001 From: Alejandro Corredor Date: Tue, 28 Nov 2023 16:46:07 -0500 Subject: [PATCH 13/13] better docs 2 --- src/sqs.ts | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/sqs.ts b/src/sqs.ts index f9c9ac3..57fbeeb 100644 --- a/src/sqs.ts +++ b/src/sqs.ts @@ -27,10 +27,10 @@ export type SQSMessageHandlerConfig = { */ concurrency?: number; /** - * Whether or not to use SQS partial batch responses. Make sure to also - * turn on partial batch responses when configuring your event source mapping - * by specifying ReportBatchItemFailures for the FunctionResponseTypes action. - * For more details see: + * Whether or not to use SQS partial batch responses. If set to true, make + * sure to also turn on partial batch responses when configuring your event + * source mapping by specifying ReportBatchItemFailures for the + * FunctionResponseTypes action. For more details see: * https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting */ usePartialBatchResponses?: boolean;