Skip to content

Commit

Permalink
Merge pull request #70 from lifeomic/fix/sqs-partial-batching
Browse files Browse the repository at this point in the history
  • Loading branch information
aecorredor authored Mar 14, 2024
2 parents 5329d1d + b4b2b05 commit 2b90896
Show file tree
Hide file tree
Showing 3 changed files with 92 additions and 35 deletions.
73 changes: 64 additions & 9 deletions src/sqs.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -300,7 +300,8 @@ describe('SQSMessageHandler', () => {
body: JSON.stringify({ name: 'test-event-8' }),
},
];
const messageHandler: SQSMessageAction<{ name: string }, any> = (

const errorMessageHandler: SQSMessageAction<{ name: string }, any> = (
ctx,
message,
) => {
Expand All @@ -310,6 +311,13 @@ describe('SQSMessageHandler', () => {
}
};

// A no-op message action that always succeeds — drives the
// "all events processed" path in the success test below.
const successMessageHandler: SQSMessageAction<{ name: string }, any> = () =>
  Promise.resolve();

test('throws on unprocessed events by default', async () => {
expect.assertions(2);
const handler = new SQSMessageHandler({
Expand All @@ -318,7 +326,7 @@ describe('SQSMessageHandler', () => {
createRunContext: () => ({}),
concurrency: 2,
})
.onMessage(messageHandler)
.onMessage(errorMessageHandler)
.lambda();

try {
Expand Down Expand Up @@ -347,7 +355,7 @@ describe('SQSMessageHandler', () => {
// when using concurrency.
concurrency: 2,
})
.onMessage(messageHandler)
.onMessage(errorMessageHandler)
.lambda();

const result = await handler(
Expand All @@ -357,14 +365,61 @@ describe('SQSMessageHandler', () => {
{} as any,
);

const batchItemFailures = [
{ itemIdentifier: 'message-3' },
{ itemIdentifier: 'message-4' },
{ itemIdentifier: 'message-7' },
{ itemIdentifier: 'message-8' },
];

expect(result).toEqual({
batchItemFailures: [
{ itemIdentifier: 'message-3' },
{ itemIdentifier: 'message-4' },
{ itemIdentifier: 'message-7' },
{ itemIdentifier: 'message-8' },
],
batchItemFailures,
});
expect(logger.info).not.toHaveBeenCalledWith(
'Successfully processed all SQS messages',
);
expect(logger.info).toHaveBeenCalledWith(
{
batchItemFailures,
},
'Sending SQS partial batch response',
);
});

test('returns nothing when all events are processed successfully', async () => {
  const handler = new SQSMessageHandler({
    logger,
    parseMessage: testSerializer.parseMessage,
    createRunContext: () => ({}),
    usePartialBatchResponses: true,
    // Use concurrency to confirm the success path also behaves when
    // message groups are processed in parallel.
    concurrency: 2,
  })
    .onMessage(successMessageHandler)
    .lambda();

  const event = { Records: records } as any;
  const result = await handler(event, {} as any);

  // A fully successful batch yields no partial-batch response at all.
  expect(result).toBeUndefined();
  expect(logger.info).toHaveBeenCalledWith(
    'Successfully processed all SQS messages',
  );

  // None of the failure-path logging should have happened.
  expect(logger.error).not.toHaveBeenCalledWith(
    expect.any(Object),
    'Failed to fully process message group',
  );
  expect(logger.info).not.toHaveBeenCalledWith(
    { batchItemFailures: expect.any(Array) },
    'Sending SQS partial batch response',
  );
});
});

Expand Down
12 changes: 7 additions & 5 deletions src/sqs.ts
Original file line number Diff line number Diff line change
Expand Up @@ -199,21 +199,23 @@ export class SQSMessageHandler<Message, Context> {
},
);

if (!processingResult.hasUnprocessedRecords) {
const unprocessedRecordsByGroupIdEntries = Object.entries(
processingResult.unprocessedRecordsByGroupId,
);

if (!unprocessedRecordsByGroupIdEntries.length) {
context.logger.info('Successfully processed all SQS messages');
return;
}

if (!this.config.usePartialBatchResponses) {
processingResult.throwOnUnprocessedRecords();
return;
}

// SQS partial batching expects that you return an ordered list of
// failures. We map through each group and add them to the batch item
// failures in order for each group.
const batchItemFailures = Object.entries(
processingResult.unprocessedRecords,
)
const batchItemFailures = unprocessedRecordsByGroupIdEntries
.map(([groupId, record]) => {
const [failedRecord, ...subsequentUnprocessedRecords] = record.items;
context.logger.error(
Expand Down
42 changes: 21 additions & 21 deletions src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import { LoggerInterface } from '@lifeomic/logging';
import { Context } from 'aws-lambda';
import pMap from 'p-map';
import groupBy from 'lodash/groupBy';
import zipObject from 'lodash/zipObject';

export type BaseContext = {
logger: LoggerInterface;
Expand Down Expand Up @@ -91,27 +90,30 @@ export const processWithOrdering = async <T>(
process: (item: T) => Promise<void>,
) => {
const groupedItems = groupBy(params.items, params.orderBy);
const listIds = Object.keys(groupedItems);
const lists = Object.values(groupedItems);
const unprocessedRecordsByListId = zipObject<{ error: any; items: T[] }>(
listIds,
lists.map(() => ({ error: null, items: [] })),
);
const groupIds = Object.keys(groupedItems);
const groups = Object.values(groupedItems);
const unprocessedRecordsByGroupId: Record<
string,
{
error: any;
items: T[];
}
> = {};

await pMap(
lists,
async (list, listIndex) => {
for (let i = 0; i < list.length; i++) {
const item = list[i];
groups,
async (group, groupIndex) => {
for (let i = 0; i < group.length; i++) {
const item = group[i];

try {
await process(item);
} catch (error) {
// Keep track of all unprocessed items and stop processing the current
// list as soon as we encounter the first error.
unprocessedRecordsByListId[listIds[listIndex]] = {
// group as soon as we encounter the first error.
unprocessedRecordsByGroupId[groupIds[groupIndex]] = {
error,
items: list.slice(i),
items: group.slice(i),
};
return;
}
Expand All @@ -122,15 +124,13 @@ export const processWithOrdering = async <T>(
},
);

const aggregateErrors = Object.values(unprocessedRecordsByListId)
.map((record) => record.error)
.filter(Boolean)
.flat();

return {
hasUnprocessedRecords: aggregateErrors.length > 0,
unprocessedRecords: unprocessedRecordsByListId,
unprocessedRecordsByGroupId,
throwOnUnprocessedRecords: () => {
const aggregateErrors = Object.values(unprocessedRecordsByGroupId).map(
(record) => record.error,
);

if (aggregateErrors.length) {
throw new AggregateError(aggregateErrors);
}
Expand Down

0 comments on commit 2b90896

Please sign in to comment.