Skip to content

Commit

Permalink
Merge pull request #59 from lifeomic/support-sqs-partial-batching
Browse files Browse the repository at this point in the history
support SQS partial batching
  • Loading branch information
aecorredor authored Nov 29, 2023
2 parents 4430f04 + dfc1004 commit 2645f7f
Show file tree
Hide file tree
Showing 11 changed files with 340 additions and 26 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/pull-request.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ jobs:
- uses: actions/checkout@v2
- uses: actions/setup-node@v2
with:
node-version: 14
node-version: 16
- name: Build & Test
run: |
echo "//registry.npmjs.org/:_authToken=${NPM_TOKEN}" > .npmrc
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/release.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ jobs:
- uses: actions/checkout@v2
- uses: actions/setup-node@v2
with:
node-version: 14
node-version: 16
- name: Test
run: |
echo "//registry.npmjs.org/:_authToken=${NPM_TOKEN}" > .npmrc
Expand Down
3 changes: 3 additions & 0 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"typescript.tsdk": "node_modules/typescript/lib"
}
51 changes: 51 additions & 0 deletions src/dynamo-streams.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,57 @@ describe('DynamoStreamHandler', () => {
});
});

test('throws error if there are unprocessed records', async () => {
  expect.assertions(4);

  // The insert action succeeds only for the first record; the other two
  // records should be surfaced together as an AggregateError.
  const handler = new DynamoStreamHandler({
    logger,
    parse: testSerializer.parse,
    createRunContext: () => ({ logger, dataSources }),
  })
    .onInsert((ctx, entity) => {
      if (entity.id !== 'new-insert-1') {
        throw new Error(`Failed to process ${entity.id}`);
      }
      ctx.dataSources.doSomething(entity);
    })
    .lambda();

  // Helper keeps the three INSERT records visually identical except for id.
  const insertEvent = (id: string) => ({
    eventName: 'INSERT' as const,
    dynamodb: { NewImage: { id: { S: id } } },
  });

  try {
    await handler(
      {
        Records: [
          insertEvent('new-insert-1'),
          insertEvent('new-insert-2'),
          insertEvent('new-insert-3'),
        ],
      },
      {} as any,
      {} as any,
    );
  } catch (e) {
    expect(e).toBeInstanceOf(AggregateError);
    expect(e.errors).toEqual([
      new Error('Failed to process new-insert-2'),
      new Error('Failed to process new-insert-3'),
    ]);
  }

  // Only the first record made it to the data source.
  expect(dataSources.doSomething).toHaveBeenCalledTimes(1);
  expect(dataSources.doSomething).toHaveBeenCalledWith({
    id: 'new-insert-1',
  });
});

test('handles insert events', async () => {
const lambda = new DynamoStreamHandler({
logger,
Expand Down
6 changes: 4 additions & 2 deletions src/dynamo-streams.ts
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ export class DynamoStreamHandler<Entity, Context> {
'Processing DynamoDB stream event',
);

await processWithOrdering(
const processingResult = await processWithOrdering(
{
items: event.Records,
orderBy: (record) => {
Expand All @@ -258,7 +258,6 @@ export class DynamoStreamHandler<Entity, Context> {
);
},
concurrency: this.config.concurrency ?? 5,
stopOnError: false,
},
async (record) => {
const recordLogger = this.config.logger.child({
Expand Down Expand Up @@ -325,6 +324,9 @@ export class DynamoStreamHandler<Entity, Context> {
}
},
);

processingResult.throwOnUnprocessedRecords();
context.logger.info('Successfully processed all DynamoDB stream records');
});
}

Expand Down
50 changes: 50 additions & 0 deletions src/kinesis.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,56 @@ describe('KinesisEventHandler', () => {
});
});

test('throws aggregate error if there are unprocessed records', async () => {
  expect.assertions(2);

  // Only the first event is processed successfully; the two failures should
  // be reported together as an AggregateError from the lambda.
  const handler = new KinesisEventHandler({
    logger,
    parseEvent: testSerializer.parseEvent,
    createRunContext: () => ({}),
  })
    .onEvent((ctx, message) => {
      if (message.data === 'test-event-1') {
        return;
      }
      throw new Error(`Failed to process ${message.data}`);
    })
    .lambda();

  // Helper to build a minimal Kinesis record carrying the given payload.
  const makeRecord = (data: string) => ({
    kinesis: {
      partitionKey: uuid(),
      data: JSON.stringify({ data }),
    },
  });

  try {
    await handler(
      {
        Records: [
          makeRecord('test-event-1'),
          makeRecord('test-event-2'),
          makeRecord('test-event-3'),
        ] as any,
      },
      {} as any,
    );
  } catch (e) {
    expect(e).toBeInstanceOf(AggregateError);
    expect(e.errors).toEqual([
      new Error('Failed to process test-event-2'),
      new Error('Failed to process test-event-3'),
    ]);
  }
});

test('allows overriding context and logger', async () => {
const testValue = uuid();

Expand Down
9 changes: 4 additions & 5 deletions src/kinesis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,13 +90,11 @@ export class KinesisEventHandler<Event, Context> {
Object.assign(context, await this.config.createRunContext(context));

// 2. Process all the records.

await processWithOrdering(
const processingResult = await processWithOrdering(
{
items: event.Records,
orderBy: (record) => record.kinesis.partitionKey,
concurrency: this.config.concurrency ?? 5,
stopOnError: false,
},
async (record) => {
const eventLogger = context.logger.child({
Expand All @@ -109,11 +107,12 @@ export class KinesisEventHandler<Event, Context> {
await action({ ...context, logger: eventLogger }, parsedEvent);
}

eventLogger.info('Successfully processed event');
eventLogger.info('Successfully processed Kinesis record');
},
);

context.logger.info('Succesfully processed all events');
processingResult.throwOnUnprocessedRecords();
context.logger.info('Successfully processed all Kinesis records');
});
}

Expand Down
122 changes: 119 additions & 3 deletions src/sqs.test.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import { v4 as uuid } from 'uuid';
import { LoggerInterface } from '@lifeomic/logging';
import { SQSMessageHandler } from './sqs';
import { SQSMessageAction, SQSMessageHandler } from './sqs';

const logger: jest.Mocked<LoggerInterface> = {
info: jest.fn(),
error: jest.fn(),
child: jest.fn(),
} as any;

Expand Down Expand Up @@ -49,6 +50,8 @@ describe('SQSMessageHandler', () => {
});

test('generates a correlation id', async () => {
expect.assertions(3);

const lambda = new SQSMessageHandler({
logger,
parseMessage: testSerializer.parseMessage,
Expand All @@ -58,7 +61,7 @@ describe('SQSMessageHandler', () => {
},
}).lambda();

await lambda(
const response = await lambda(
{
Records: [
{ attributes: {}, body: JSON.stringify({ data: 'test-event-1' }) },
Expand All @@ -67,11 +70,124 @@ describe('SQSMessageHandler', () => {
{} as any,
);

// Assert that when all messages are processed successfully and partial
// batch responses are not used (the default setting), nothing is returned
// as the lambda response.
expect(response).toBeUndefined();
expect(logger.child).toHaveBeenCalledWith(
expect.objectContaining({ correlationId: expect.any(String) }),
);
});

describe('error handling', () => {
  // Eight messages across two message groups (four per group). SQS FIFO
  // ordering is per-group, so a failure in one group stops processing of the
  // remaining messages in that group only.
  const records = [
    {
      attributes: { MessageGroupId: '1' },
      messageId: 'message-1',
      body: JSON.stringify({ name: 'test-event-1' }),
    },
    {
      attributes: { MessageGroupId: '1' },
      messageId: 'message-2',
      body: JSON.stringify({ name: 'test-event-2' }),
    },
    {
      attributes: { MessageGroupId: '1' },
      messageId: 'message-3',
      body: JSON.stringify({ name: 'test-event-3' }),
    },
    {
      attributes: { MessageGroupId: '1' },
      messageId: 'message-4',
      body: JSON.stringify({ name: 'test-event-4' }),
    },
    {
      attributes: { MessageGroupId: '2' },
      messageId: 'message-5',
      body: JSON.stringify({ name: 'test-event-5' }),
    },
    {
      attributes: { MessageGroupId: '2' },
      messageId: 'message-6',
      body: JSON.stringify({ name: 'test-event-6' }),
    },
    {
      attributes: { MessageGroupId: '2' },
      messageId: 'message-7',
      body: JSON.stringify({ name: 'test-event-7' }),
    },
    {
      attributes: { MessageGroupId: '2' },
      messageId: 'message-8',
      body: JSON.stringify({ name: 'test-event-8' }),
    },
  ];
  const messageHandler: SQSMessageAction<{ name: string }, any> = (
    ctx,
    message,
  ) => {
    // Fail on the third message on each group (each group has 4 messages),
    // which also leaves the fourth message of each group unprocessed.
    if (message.name === 'test-event-3' || message.name === 'test-event-7') {
      throw new Error(`Failed to process message ${message.name}`);
    }
  };

  test('throws on unprocessed events by default', async () => {
    // Both assertions live in the catch block, so the count also proves the
    // handler rejected.
    expect.assertions(2);
    const handler = new SQSMessageHandler({
      logger,
      parseMessage: testSerializer.parseMessage,
      createRunContext: () => ({}),
      concurrency: 2,
    })
      .onMessage(messageHandler)
      .lambda();

    try {
      await handler(
        {
          Records: records,
        } as any,
        {} as any,
      );
    } catch (e) {
      expect(e).toBeInstanceOf(AggregateError);
      // One error per failed group; the skipped messages (4 and 8) do not
      // produce their own errors.
      expect(e.errors).toEqual([
        new Error('Failed to process message test-event-3'),
        new Error('Failed to process message test-event-7'),
      ]);
    }
  });

  test('returns partial batch response when setting is enabled', async () => {
    // Exactly one assertion runs below. The previous count of 2 could never
    // be satisfied and would have failed the test on assertion count alone.
    expect.assertions(1);
    const handler = new SQSMessageHandler({
      logger,
      parseMessage: testSerializer.parseMessage,
      createRunContext: () => ({}),
      usePartialBatchResponses: true,
      // Make sure partial batch responses are returned in order even
      // when using concurrency.
      concurrency: 2,
    })
      .onMessage(messageHandler)
      .lambda();

    const result = await handler(
      {
        Records: records,
      } as any,
      {} as any,
    );

    // Each failed message AND every later message in its group is reported
    // so that SQS redelivers only those.
    expect(result).toEqual({
      batchItemFailures: [
        { itemIdentifier: 'message-3' },
        { itemIdentifier: 'message-4' },
        { itemIdentifier: 'message-7' },
        { itemIdentifier: 'message-8' },
      ],
    });
  });
});

test('sending messages with context', async () => {
Expand Down
Loading

0 comments on commit 2645f7f

Please sign in to comment.