diff --git a/.github/workflows/pull-request.yaml b/.github/workflows/pull-request.yaml
index 0d8f7d3..67b3fa6 100644
--- a/.github/workflows/pull-request.yaml
+++ b/.github/workflows/pull-request.yaml
@@ -11,7 +11,7 @@ jobs:
       - uses: actions/checkout@v2
       - uses: actions/setup-node@v2
         with:
-          node-version: 14
+          node-version: 16
       - name: Build & Test
         run: |
          echo "//registry.npmjs.org/:_authToken=${NPM_TOKEN}" > .npmrc
diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml
index 6a4bf7f..02cf9bc 100644
--- a/.github/workflows/release.yaml
+++ b/.github/workflows/release.yaml
@@ -16,7 +16,7 @@ jobs:
       - uses: actions/checkout@v2
       - uses: actions/setup-node@v2
         with:
-          node-version: 14
+          node-version: 16
       - name: Test
         run: |
          echo "//registry.npmjs.org/:_authToken=${NPM_TOKEN}" > .npmrc
diff --git a/.vscode/settings.json b/.vscode/settings.json
new file mode 100644
index 0000000..3662b37
--- /dev/null
+++ b/.vscode/settings.json
@@ -0,0 +1,3 @@
+{
+  "typescript.tsdk": "node_modules/typescript/lib"
+}
\ No newline at end of file
diff --git a/src/dynamo-streams.test.ts b/src/dynamo-streams.test.ts
index e9389b7..e7233ec 100644
--- a/src/dynamo-streams.test.ts
+++ b/src/dynamo-streams.test.ts
@@ -76,6 +76,57 @@ describe('DynamoStreamHandler', () => {
     });
   });
 
+  test('throws error if there are unprocessed records', async () => {
+    expect.assertions(4);
+    const handler = new DynamoStreamHandler({
+      logger,
+      parse: testSerializer.parse,
+      createRunContext: () => ({ logger, dataSources }),
+    })
+      .onInsert((ctx, entity) => {
+        if (entity.id === 'new-insert-1') {
+          ctx.dataSources.doSomething(entity);
+        } else {
+          throw new Error(`Failed to process ${entity.id}`);
+        }
+      })
+      .lambda();
+
+    try {
+      await handler(
+        {
+          Records: [
+            {
+              eventName: 'INSERT',
+              dynamodb: { NewImage: { id: { S: 'new-insert-1' } } },
+            },
+            {
+              eventName: 'INSERT',
+              dynamodb: { NewImage: { id: { S: 'new-insert-2' } } },
+            },
+            {
+              eventName: 'INSERT',
+              dynamodb: { NewImage: { id: { S: 'new-insert-3' } } },
+            },
+          ],
+        },
+        {} as any,
+        {} as any,
+      );
+    } catch (e) {
+      expect(e).toBeInstanceOf(AggregateError);
+      expect(e.errors).toEqual([
+        new Error('Failed to process new-insert-2'),
+        new Error('Failed to process new-insert-3'),
+      ]);
+    }
+
+    expect(dataSources.doSomething).toHaveBeenCalledTimes(1);
+    expect(dataSources.doSomething).toHaveBeenCalledWith({
+      id: 'new-insert-1',
+    });
+  });
+
   test('handles insert events', async () => {
     const lambda = new DynamoStreamHandler({
       logger,
diff --git a/src/dynamo-streams.ts b/src/dynamo-streams.ts
index 627072e..5809dc7 100644
--- a/src/dynamo-streams.ts
+++ b/src/dynamo-streams.ts
@@ -231,7 +231,7 @@ export class DynamoStreamHandler {
         'Processing DynamoDB stream event',
       );
 
-      await processWithOrdering(
+      const processingResult = await processWithOrdering(
         {
           items: event.Records,
           orderBy: (record) => {
@@ -258,7 +258,6 @@
             );
           },
           concurrency: this.config.concurrency ?? 5,
-          stopOnError: false,
         },
         async (record) => {
           const recordLogger = this.config.logger.child({
@@ -325,6 +324,9 @@
           }
         },
       );
+
+      processingResult.throwOnUnprocessedRecords();
+      context.logger.info('Successfully processed all DynamoDB stream records');
     });
   }
 }
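With processWithOrdering now surfacing failures (see src/utils.ts below), a stream handler that cannot process every record rejects with a standard AggregateError. A minimal consumer-side sketch, where `lambda`, `event`, and `context` stand in for a handler and invocation shaped like the test above:

    try {
      await lambda(event, context, () => {});
    } catch (err) {
      if (err instanceof AggregateError) {
        // One underlying error per ordering group that stopped early.
        for (const cause of err.errors) {
          console.error('record group failed:', cause);
        }
      }
      throw err;
    }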
diff --git a/src/kinesis.test.ts b/src/kinesis.test.ts
index 62f8ac1..48e5e97 100644
--- a/src/kinesis.test.ts
+++ b/src/kinesis.test.ts
@@ -207,6 +207,56 @@ describe('KinesisEventHandler', () => {
     });
   });
 
+  test('throws aggregate error if there are unprocessed records', async () => {
+    expect.assertions(2);
+
+    const handler = new KinesisEventHandler({
+      logger,
+      parseEvent: testSerializer.parseEvent,
+      createRunContext: () => ({}),
+    })
+      .onEvent((ctx, message) => {
+        if (message.data !== 'test-event-1') {
+          throw new Error(`Failed to process ${message.data}`);
+        }
+      })
+      .lambda();
+
+    try {
+      await handler(
+        {
+          Records: [
+            {
+              kinesis: {
+                partitionKey: uuid(),
+                data: JSON.stringify({ data: 'test-event-1' }),
+              },
+            },
+            {
+              kinesis: {
+                partitionKey: uuid(),
+                data: JSON.stringify({ data: 'test-event-2' }),
+              },
+            },
+            {
+              kinesis: {
+                partitionKey: uuid(),
+                data: JSON.stringify({ data: 'test-event-3' }),
+              },
+            },
+          ] as any,
+        },
+        {} as any,
+      );
+    } catch (e) {
+      expect(e).toBeInstanceOf(AggregateError);
+      expect(e.errors).toEqual([
+        new Error('Failed to process test-event-2'),
+        new Error('Failed to process test-event-3'),
+      ]);
+    }
+  });
+
   test('allows overriding context and logger', async () => {
     const testValue = uuid();
diff --git a/src/kinesis.ts b/src/kinesis.ts
index 6c12158..e99c7c6 100644
--- a/src/kinesis.ts
+++ b/src/kinesis.ts
@@ -90,13 +90,11 @@ export class KinesisEventHandler {
       Object.assign(context, await this.config.createRunContext(context));
 
       // 2. Process all the records.
-
-      await processWithOrdering(
+      const processingResult = await processWithOrdering(
         {
           items: event.Records,
           orderBy: (record) => record.kinesis.partitionKey,
           concurrency: this.config.concurrency ?? 5,
-          stopOnError: false,
         },
         async (record) => {
           const eventLogger = context.logger.child({
@@ -109,11 +107,12 @@
            await action({ ...context, logger: eventLogger }, parsedEvent);
          }
 
-          eventLogger.info('Successfully processed event');
+          eventLogger.info('Successfully processed Kinesis record');
         },
       );
 
-      context.logger.info('Succesfully processed all events');
+      processingResult.throwOnUnprocessedRecords();
+      context.logger.info('Successfully processed all Kinesis records');
     });
   }
diff --git a/src/sqs.test.ts b/src/sqs.test.ts
index 3dac844..86390c7 100644
--- a/src/sqs.test.ts
+++ b/src/sqs.test.ts
@@ -1,9 +1,10 @@
 import { v4 as uuid } from 'uuid';
 import { LoggerInterface } from '@lifeomic/logging';
 
-import { SQSMessageHandler } from './sqs';
+import { SQSMessageAction, SQSMessageHandler } from './sqs';
 
 const logger: jest.Mocked<LoggerInterface> = {
   info: jest.fn(),
+  error: jest.fn(),
   child: jest.fn(),
 } as any;
@@ -49,6 +50,8 @@ describe('SQSMessageHandler', () => {
   });
 
   test('generates a correlation id', async () => {
+    expect.assertions(3);
+
     const lambda = new SQSMessageHandler({
       logger,
       parseMessage: testSerializer.parseMessage,
@@ -58,7 +61,7 @@ createRunContext: () => {
       },
     }).lambda();
 
-    await lambda(
+    const response = await lambda(
       {
         Records: [
           { attributes: {}, body: JSON.stringify({ data: 'test-event-1' }) },
@@ -67,11 +70,124 @@ describe('SQSMessageHandler', () => {
       {} as any,
     );
 
+    // Assert that when all messages are processed successfully and partial
+    // batch responses are not used (the default setting), nothing is returned
+    // as the lambda response.
+    expect(response).toBeUndefined();
     expect(logger.child).toHaveBeenCalledWith(
       expect.objectContaining({ correlationId: expect.any(String) }),
     );
+  });
+
+  describe('error handling', () => {
+    const records = [
+      {
+        attributes: { MessageGroupId: '1' },
+        messageId: 'message-1',
+        body: JSON.stringify({ name: 'test-event-1' }),
+      },
+      {
+        attributes: { MessageGroupId: '1' },
+        messageId: 'message-2',
+        body: JSON.stringify({ name: 'test-event-2' }),
+      },
+      {
+        attributes: { MessageGroupId: '1' },
+        messageId: 'message-3',
+        body: JSON.stringify({ name: 'test-event-3' }),
+      },
+      {
+        attributes: { MessageGroupId: '1' },
+        messageId: 'message-4',
+        body: JSON.stringify({ name: 'test-event-4' }),
+      },
+      {
+        attributes: { MessageGroupId: '2' },
+        messageId: 'message-5',
+        body: JSON.stringify({ name: 'test-event-5' }),
+      },
+      {
+        attributes: { MessageGroupId: '2' },
+        messageId: 'message-6',
+        body: JSON.stringify({ name: 'test-event-6' }),
+      },
+      {
+        attributes: { MessageGroupId: '2' },
+        messageId: 'message-7',
+        body: JSON.stringify({ name: 'test-event-7' }),
+      },
+      {
+        attributes: { MessageGroupId: '2' },
+        messageId: 'message-8',
+        body: JSON.stringify({ name: 'test-event-8' }),
+      },
+    ];
+    const messageHandler: SQSMessageAction<{ name: string }, any> = (
+      ctx,
+      message,
+    ) => {
+      // Fail on the third message of each group (each group has 4 messages).
+      if (message.name === 'test-event-3' || message.name === 'test-event-7') {
+        throw new Error(`Failed to process message ${message.name}`);
+      }
+    };
+
+    test('throws on unprocessed events by default', async () => {
+      expect.assertions(2);
+      const handler = new SQSMessageHandler({
+        logger,
+        parseMessage: testSerializer.parseMessage,
+        createRunContext: () => ({}),
+        concurrency: 2,
+      })
+        .onMessage(messageHandler)
+        .lambda();
+
+      try {
+        await handler(
+          {
+            Records: records,
+          } as any,
+          {} as any,
+        );
+      } catch (e) {
+        expect(e).toBeInstanceOf(AggregateError);
+        expect(e.errors).toEqual([
+          new Error('Failed to process message test-event-3'),
+          new Error('Failed to process message test-event-7'),
+        ]);
+      }
+    });
+
+    test('returns partial batch response when setting is enabled', async () => {
+      const handler = new SQSMessageHandler({
+        logger,
+        parseMessage: testSerializer.parseMessage,
+        createRunContext: () => ({}),
+        usePartialBatchResponses: true,
+        // Make sure partial batch responses are returned in order even
+        // when using concurrency.
+        concurrency: 2,
+      })
+        .onMessage(messageHandler)
+        .lambda();
 
-    expect.assertions(2);
+      const result = await handler(
+        {
+          Records: records,
+        } as any,
+        {} as any,
+      );
+
+      expect(result).toEqual({
+        batchItemFailures: [
+          { itemIdentifier: 'message-3' },
+          { itemIdentifier: 'message-4' },
+          { itemIdentifier: 'message-7' },
+          { itemIdentifier: 'message-8' },
+        ],
+      });
+    });
   });
 
   test('sending messages with context', async () => {
diff --git a/src/sqs.ts b/src/sqs.ts
index 32246ef..57fbeeb 100644
--- a/src/sqs.ts
+++ b/src/sqs.ts
@@ -26,6 +26,14 @@ export type SQSMessageHandlerConfig<Message, Context> = {
    * @default 5
    */
   concurrency?: number;
+  /**
+   * Whether or not to use SQS partial batch responses. If set to true, make
+   * sure to also turn on partial batch responses when configuring your event
+   * source mapping by specifying ReportBatchItemFailures for the
+   * FunctionResponseTypes action. For more details see:
+   * https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html#services-sqs-batchfailurereporting
+   */
+  usePartialBatchResponses?: boolean;
 };
 
 export type SQSMessageAction<Message, Context> = (
@@ -55,6 +63,12 @@ export type SQSMessageHandlerHarnessContext<Message> = {
   sendEvent: (event: { messages: Message[] }) => Promise<void>;
 };
 
+export type SQSPartialBatchResponse = {
+  batchItemFailures: {
+    itemIdentifier: string;
+  }[];
+};
+
 /**
  * An abstraction for an SQS message handler.
  */
@@ -75,7 +89,10 @@ export class SQSMessageHandler<Message, Context> {
     return this;
   }
 
-  lambda(): (event: SQSEvent, context: AWSContext) => Promise<void> {
+  lambda(): (
+    event: SQSEvent,
+    context: AWSContext,
+  ) => Promise<void | SQSPartialBatchResponse> {
     return withHealthCheckHandling(async (event, awsContext) => {
       // 1. Build the context.
       const correlationId = uuid();
@@ -92,7 +109,7 @@ export class SQSMessageHandler<Message, Context> {
       // 2. Process all the records.
       context.logger.info({ event }, 'Processing SQS topic message');
 
-      await processWithOrdering(
+      const processingResult = await processWithOrdering(
         {
           items: event.Records,
           // If there is not a MessageGroupId, then we don't care about
@@ -100,7 +117,6 @@
           // the ordering for the event. We can just generate a UUID for the
           // ordering key.
           orderBy: (record) => record.attributes.MessageGroupId ?? uuid(),
           concurrency: this.config.concurrency ?? 5,
-          stopOnError: false,
         },
         async (record) => {
           const messageLogger = context.logger.child({
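The usePartialBatchResponses doc comment above calls out the event source mapping requirement. One way to satisfy it is with the AWS CDK v2 SqsEventSource; a sketch, where the construct names and the surrounding stack are assumptions rather than part of this change:

    import { Stack } from 'aws-cdk-lib';
    import { Function as LambdaFunction } from 'aws-cdk-lib/aws-lambda';
    import { SqsEventSource } from 'aws-cdk-lib/aws-lambda-event-sources';
    import { Queue } from 'aws-cdk-lib/aws-sqs';

    // Assumed to be defined elsewhere in the stack.
    declare const stack: Stack;
    declare const handlerFn: LambdaFunction;

    const queue = new Queue(stack, 'Queue');

    handlerFn.addEventSource(
      new SqsEventSource(queue, {
        batchSize: 10,
        // Surfaces as FunctionResponseTypes: ['ReportBatchItemFailures'] on
        // the underlying event source mapping.
        reportBatchItemFailures: true,
      }),
    );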
@@ -113,11 +129,49 @@
            await action({ ...context, logger: messageLogger }, parsedMessage);
          }
 
-          messageLogger.info('Successfully processed message');
+          messageLogger.info('Successfully processed SQS message');
         },
       );
 
-      context.logger.info('Succesfully processed all messages');
+      if (!processingResult.hasUnprocessedRecords) {
+        context.logger.info('Successfully processed all SQS messages');
+      }
+
+      if (!this.config.usePartialBatchResponses) {
+        processingResult.throwOnUnprocessedRecords();
+        return;
+      }
+
+      // SQS partial batching expects that you return an ordered list of
+      // failures. We map through each group and add them to the batch item
+      // failures in order for each group.
+      const batchItemFailures = Object.entries(
+        processingResult.unprocessedRecords,
+      )
+        .map(([groupId, record]) => {
+          const [failedRecord, ...subsequentUnprocessedRecords] = record.items;
+          context.logger.error(
+            {
+              groupId,
+              err: record.error,
+              failedRecord,
+              subsequentUnprocessedRecords,
+            },
+            'Failed to fully process message group',
+          );
+
+          return record.items.map((item) => ({
+            itemIdentifier: item.messageId,
+          }));
+        })
+        .flat();
+
+      context.logger.info(
+        { batchItemFailures },
+        'Sending SQS partial batch response',
+      );
+
+      return { batchItemFailures };
     });
   }
diff --git a/src/utils.ts b/src/utils.ts
index 0085df4..733270a 100644
--- a/src/utils.ts
+++ b/src/utils.ts
@@ -2,6 +2,7 @@ import { LoggerInterface } from '@lifeomic/logging';
 import { Context } from 'aws-lambda';
 import pMap from 'p-map';
 import groupBy from 'lodash/groupBy';
+import zipObject from 'lodash/zipObject';
 
 export type BaseContext = {
   logger: LoggerInterface;
@@ -9,8 +10,10 @@
 };
 
 export const withHealthCheckHandling =
-  <Event>(handler: (event: Event, context: Context) => Promise<void>) =>
-  (event: Event, context: Context): Promise<void> => {
+  <Event, Result>(
+    handler: (event: Event, context: Context) => Promise<Result>,
+  ) =>
+  (event: Event, context: Context): Promise<Result> => {
     if ((event as any).httpMethod) {
       return {
         statusCode: 200,
@@ -29,7 +32,6 @@ export type ProcessWithOrderingParams<T> = {
   items: T[];
   orderBy: (msg: T) => string;
   concurrency: number;
-  stopOnError: boolean;
 };
 
 /**
@@ -54,19 +56,56 @@
  *
  * This same scenario is true for SQS FIFO queues, which will order messages
  * by MessageGroupId.
+ *
  */
 export const processWithOrdering = async <T>(
   params: ProcessWithOrderingParams<T>,
   process: (item: T) => Promise<void>,
 ) => {
-  const lists = Object.values(groupBy(params.items, params.orderBy));
+  const groupedItems = groupBy(params.items, params.orderBy);
+  const listIds = Object.keys(groupedItems);
+  const lists = Object.values(groupedItems);
+  const unprocessedRecordsByListId = zipObject<{ error: any; items: T[] }>(
+    listIds,
+    lists.map(() => ({ error: null, items: [] })),
+  );
+
   await pMap(
     lists,
-    async (list) => {
-      for (const item of list) {
-        await process(item);
+    async (list, listIndex) => {
+      for (let i = 0; i < list.length; i++) {
+        const item = list[i];
+
+        try {
+          await process(item);
+        } catch (error) {
+          // Keep track of all unprocessed items and stop processing the current
+          // list as soon as we encounter the first error.
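Concretely, the new return value makes the ordering behavior observable to callers. A small usage sketch of processWithOrdering, with illustrative inputs:

    import { processWithOrdering } from './utils';

    async function demo() {
      const result = await processWithOrdering(
        { items: ['a1', 'a2', 'b1'], orderBy: (item) => item[0], concurrency: 2 },
        async (item) => {
          if (item === 'a1') throw new Error(`boom: ${item}`);
        },
      );

      // Group 'a' fails at its first item, so 'a2' is also left unprocessed;
      // group 'b' is unaffected and completes.
      console.log(result.hasUnprocessedRecords); // true
      console.log(result.unprocessedRecords['a'].items); // ['a1', 'a2']
      result.throwOnUnprocessedRecords(); // throws AggregateError([Error('boom: a1')])
    }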
+          unprocessedRecordsByListId[listIds[listIndex]] = {
+            error,
+            items: list.slice(i),
+          };
+          return;
+        }
       }
     },
-    { concurrency: params.concurrency, stopOnError: params.stopOnError },
+    {
+      concurrency: params.concurrency,
+    },
   );
+
+  const aggregateErrors = Object.values(unprocessedRecordsByListId)
+    .map((record) => record.error)
+    .filter(Boolean)
+    .flat();
+
+  return {
+    hasUnprocessedRecords: aggregateErrors.length > 0,
+    unprocessedRecords: unprocessedRecordsByListId,
+    throwOnUnprocessedRecords: () => {
+      if (aggregateErrors.length) {
+        throw new AggregateError(aggregateErrors);
+      }
+    },
+  };
 };
diff --git a/tsconfig.json b/tsconfig.json
index 88738aa..4438ebd 100644
--- a/tsconfig.json
+++ b/tsconfig.json
@@ -4,7 +4,7 @@
   "compilerOptions": {
     "target": "es2019",
     "module": "commonjs",
-    "lib": ["ES2020"],
+    "lib": ["ES2021"],
     "outDir": "./dist",
     "inlineSources": false,
     "inlineSourceMap": false,
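The lib bump to ES2021 is what gives the compiler the AggregateError type now used in src/utils.ts and the tests, and it pairs with the Node 14 to 16 upgrade in the workflows above, since AggregateError ships natively starting in Node 15. A quick illustration of the API in question:

    // Requires lib >= ES2021 in tsconfig and Node >= 15 at runtime.
    const err = new AggregateError([new Error('a'), new Error('b')], 'two failures');
    console.log(err.errors.length); // 2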