From 3293adaa8d9c1b136ff5aba2ec66cb416ee524f3 Mon Sep 17 00:00:00 2001 From: Swain Molster Date: Wed, 22 Nov 2023 10:50:45 -0500 Subject: [PATCH 1/2] feat: support kinesis processing --- src/index.ts | 1 + src/kinesis.test.ts | 395 ++++++++++++++++++++++++++++++++++++++++++++ src/kinesis.ts | 157 ++++++++++++++++++ 3 files changed, 553 insertions(+) create mode 100644 src/kinesis.test.ts create mode 100644 src/kinesis.ts diff --git a/src/index.ts b/src/index.ts index f792b4f..459e39b 100644 --- a/src/index.ts +++ b/src/index.ts @@ -1,3 +1,4 @@ export * from './dynamo-streams'; export * from './sqs'; +export * from './kinesis'; export { BaseContext } from './utils'; diff --git a/src/kinesis.test.ts b/src/kinesis.test.ts new file mode 100644 index 0000000..62f8ac1 --- /dev/null +++ b/src/kinesis.test.ts @@ -0,0 +1,395 @@ +import { v4 as uuid } from 'uuid'; +import { LoggerInterface } from '@lifeomic/logging'; +import { KinesisEventHandler } from './kinesis'; + +const logger: jest.Mocked<LoggerInterface> = { + info: jest.fn(), + child: jest.fn(), +} as any; + +beforeEach(() => { + logger.info.mockReset(); + logger.child.mockReset(); + logger.child.mockImplementation(() => logger); +}); + +const testSerializer = { + parseEvent: (msg: string) => JSON.parse(msg), + stringifyEvent: (msg: any) => JSON.stringify(msg), +}; + +describe('KinesisEventHandler', () => { + test('responds to HTTP health checks', async () => { + const lambda = new KinesisEventHandler({ + logger, + parseEvent: testSerializer.parseEvent, + createRunContext: () => ({}), + }).lambda(); + + const result = await lambda({ httpMethod: 'GET' } as any, {} as any); + + expect(result).toStrictEqual({ + statusCode: 200, + body: '{"healthy":true}', + }); + }); + + test('responds to healthCheck events', async () => { + const lambda = new KinesisEventHandler({ + logger, + parseEvent: testSerializer.parseEvent, + createRunContext: () => ({}), + }).lambda(); + + const result = await lambda({ healthCheck: true } as any, {} as any); 
+ expect(result).toStrictEqual({ + healthy: true, + }); + }); + + test('generates a correlation id', async () => { + const lambda = new KinesisEventHandler({ + logger, + parseEvent: testSerializer.parseEvent, + createRunContext: (ctx) => { + expect(typeof ctx.correlationId === 'string').toBe(true); + return {}; + }, + }).lambda(); + + await lambda( + { + Records: [ + { + kinesis: { + partitionKey: uuid(), + data: JSON.stringify({ data: 'test-event-1' }), + }, + }, + ], + } as any, + {} as any, + ); + + expect(logger.child).toHaveBeenCalledWith( + expect.objectContaining({ correlationId: expect.any(String) }), + ); + + expect.assertions(2); + }); + + test('sending events with context', async () => { + const dataSources = { + doSomething: jest.fn(), + }; + + const lambda = new KinesisEventHandler({ + logger, + parseEvent: testSerializer.parseEvent, + createRunContext: () => dataSources, + concurrency: 1, + }) + .onEvent((ctx, event) => { + ctx.doSomething('first-handler', event); + }) + .onEvent((ctx, event) => { + ctx.doSomething('second-handler', event); + }) + .lambda(); + + await lambda( + { + Records: [ + { + kinesis: { + partitionKey: uuid(), + data: JSON.stringify({ data: 'test-event-1' }), + }, + }, + { + kinesis: { + partitionKey: uuid(), + data: JSON.stringify({ data: 'test-event-2' }), + }, + }, + { + kinesis: { + partitionKey: uuid(), + data: JSON.stringify({ data: 'test-event-3' }), + }, + }, + { + kinesis: { + partitionKey: uuid(), + data: JSON.stringify({ data: 'test-event-4' }), + }, + }, + ], + } as any, + {} as any, + ); + + // Expect 8 calls. 2 event handlers * 4 events. + expect(dataSources.doSomething).toHaveBeenCalledTimes(8); + + // Now, confirm the ordering. 
+ expect(dataSources.doSomething).toHaveBeenNthCalledWith( + 1, + 'first-handler', + { data: 'test-event-1' }, + ); + expect(dataSources.doSomething).toHaveBeenNthCalledWith( + 2, + 'second-handler', + { data: 'test-event-1' }, + ); + expect(dataSources.doSomething).toHaveBeenNthCalledWith( + 3, + 'first-handler', + { data: 'test-event-2' }, + ); + expect(dataSources.doSomething).toHaveBeenNthCalledWith( + 4, + 'second-handler', + { data: 'test-event-2' }, + ); + expect(dataSources.doSomething).toHaveBeenNthCalledWith( + 5, + 'first-handler', + { data: 'test-event-3' }, + ); + expect(dataSources.doSomething).toHaveBeenNthCalledWith( + 6, + 'second-handler', + { data: 'test-event-3' }, + ); + expect(dataSources.doSomething).toHaveBeenNthCalledWith( + 7, + 'first-handler', + { data: 'test-event-4' }, + ); + expect(dataSources.doSomething).toHaveBeenNthCalledWith( + 8, + 'second-handler', + { data: 'test-event-4' }, + ); + }); + + describe('harness', () => { + test('sends events correctly', async () => { + const dataSources = { + doSomething: jest.fn(), + }; + + const { sendEvent } = new KinesisEventHandler({ + logger, + parseEvent: testSerializer.parseEvent, + createRunContext: () => dataSources, + }) + .onEvent((ctx, msg) => { + ctx.doSomething(msg); + }) + .harness({ + stringifyEvent: testSerializer.stringifyEvent, + }); + + await sendEvent({ + events: [{ data: 'test-event-1' }, { data: 'test-event-2' }], + }); + + expect(dataSources.doSomething).toHaveBeenCalledTimes(2); + expect(dataSources.doSomething).toHaveBeenNthCalledWith(1, { + data: 'test-event-1', + }); + expect(dataSources.doSomething).toHaveBeenNthCalledWith(2, { + data: 'test-event-2', + }); + }); + + test('allows overriding context and logger', async () => { + const testValue = uuid(); + + const overrideLogger = { + info: jest.fn(), + error: jest.fn(), + child: jest.fn(), + }; + overrideLogger.child.mockImplementation(() => overrideLogger); + + const dataSources = { + doSomething: jest.fn(), + }; + + 
const { sendEvent } = new KinesisEventHandler({ + logger, + parseEvent: testSerializer.parseEvent, + createRunContext: () => ({ dataSources }), + }) + .onEvent((ctx) => { + ctx.logger.info((ctx as any).testValue); + ctx.dataSources.doSomething((ctx as any).testValue); + }) + .harness({ + stringifyEvent: testSerializer.stringifyEvent, + logger: overrideLogger as any, + createRunContext: () => ({ dataSources, testValue }), + }); + + await sendEvent({ events: [{}] }); + + expect(logger.info).not.toHaveBeenCalled(); + expect(overrideLogger.info).toHaveBeenCalledWith(testValue); + + expect(dataSources.doSomething).toHaveBeenCalledWith(testValue); + }); + }); + + const wait = (ms: number) => + new Promise((resolve) => setTimeout(resolve, ms)); + + describe('parallelization', () => { + test('processes events in parallel', async () => { + const handler = new KinesisEventHandler({ + logger, + parseEvent: testSerializer.parseEvent, + createRunContext: () => ({}), + }) + .onEvent(async () => { + // We'll wait 100ms per event, then use that to make timing + // assertions below. + await wait(100); + }) + .lambda(); + + const start = Date.now(); + + await handler( + { + Records: [ + { + kinesis: { + partitionKey: uuid(), + data: JSON.stringify({ data: 'test-event-1' }), + }, + }, + { + kinesis: { + partitionKey: uuid(), + data: JSON.stringify({ data: 'test-event-2' }), + }, + }, + { + kinesis: { + partitionKey: uuid(), + data: JSON.stringify({ data: 'test-event-3' }), + }, + }, + ] as any, + }, + {} as any, + ); + + const end = Date.now(); + + // This assertion confirms there is parallel processing. 
+ expect(end - start).toBeLessThan(200); + }); + + test('maintains ordering by partitionKey', async () => { + const eventStarted = jest.fn(); + const eventFinished = jest.fn(); + const handler = new KinesisEventHandler({ + logger, + parseEvent: testSerializer.parseEvent, + createRunContext: () => ({}), + }) + .onEvent(async (ctx, msg) => { + eventStarted(msg, Date.now()); + // We'll wait 100ms per event, then use that to make timing + // assertions below. + await wait(100); + eventFinished(msg, Date.now()); + }) + .lambda(); + + const start = Date.now(); + + await handler( + { + Records: [ + { + kinesis: { + partitionKey: 'group-id', + data: JSON.stringify({ data: 'test-event-1' }), + }, + }, + { + kinesis: { + partitionKey: uuid(), + data: JSON.stringify({ data: 'test-event-2' }), + }, + }, + { + kinesis: { + partitionKey: 'group-id-2', + data: JSON.stringify({ data: 'test-event-other-1' }), + }, + }, + { + kinesis: { + partitionKey: 'group-id', + data: JSON.stringify({ data: 'test-event-3' }), + }, + }, + { + kinesis: { + partitionKey: 'group-id-2', + data: JSON.stringify({ data: 'test-event-other-2' }), + }, + }, + { + kinesis: { + partitionKey: uuid(), + data: JSON.stringify({ data: 'test-event-4' }), + }, + }, + ] as any, + }, + {} as any, + ); + + const end = Date.now(); + + // This assertion confirms that the group doesn't process in less than 200ms. + // If it did, then the events would be fully parallelized, which would be bad. + expect(end - start).toBeGreaterThan(200); + + // This assertion confirms that there is at least some parallelization happening. + expect(end - start).toBeLessThanOrEqual(450); + + // Now, let's also assert that event 3 was processed _after_ the end of event 1. 
+ const event1FinishedTime = eventFinished.mock.calls.find( + (call) => call[0].data === 'test-event-1', + )[1]; + + const event3StartedTime = eventStarted.mock.calls.find( + (call) => call[0].data === 'test-event-3', + )[1]; + + expect(event3StartedTime).toBeGreaterThanOrEqual(event1FinishedTime); + + const eventOther1FinishedTime = eventFinished.mock.calls.find( + (call) => call[0].data === 'test-event-other-1', + )[1]; + + const eventOther2StartedTime = eventStarted.mock.calls.find( + (call) => call[0].data === 'test-event-other-2', + )[1]; + + expect(eventOther2StartedTime).toBeGreaterThanOrEqual( + eventOther1FinishedTime, + ); + }); + }); +}); diff --git a/src/kinesis.ts b/src/kinesis.ts new file mode 100644 index 0000000..6c12158 --- /dev/null +++ b/src/kinesis.ts @@ -0,0 +1,157 @@ +import { LoggerInterface } from '@lifeomic/logging'; +import { v4 as uuid } from 'uuid'; +import { KinesisStreamEvent, Context as AWSContext } from 'aws-lambda'; +import { + BaseContext, + processWithOrdering, + withHealthCheckHandling, +} from './utils'; + +export type KinesisEventHandlerConfig<Event, Context> = { + /** + * A logger to use in the context. + */ + logger: LoggerInterface; + /** + * A function for parsing the Kinesis event data into your custom type. + */ + parseEvent: (body: string) => Event; + /** + * Create a "context" for the lambda execution. (e.g. "data sources") + */ + createRunContext: (base: BaseContext) => Context | Promise<Context>; + /** + * The maximum concurrency for processing events. + * + * @default 5 + */ + concurrency?: number; +}; + +export type KinesisEventAction<Event, Context> = ( + context: Context & BaseContext, + event: Event, +) => void | Promise<void>; + +export type KinesisEventHandlerHarnessOptions<Event, Context> = { + /** + * A function for stringifying events. + */ + stringifyEvent: (event: Event) => string; + + /** + * An optional override for the logger. + */ + logger?: LoggerInterface; + + /** + * An optional override for creating the run context. 
+ */ + createRunContext?: () => Context | Promise<Context>; +}; + +export type KinesisEventHandlerHarnessContext<Event> = { + /** Sends the specified event through the handler. */ + sendEvent: (event: { events: Event[] }) => Promise<void>; +}; + +/** + * An abstraction for a Kinesis event handler. + */ +export class KinesisEventHandler<Event, Context> { + private actions: KinesisEventAction<Event, Context>[] = []; + + constructor(readonly config: KinesisEventHandlerConfig<Event, Context>) {} + + /** + * Adds an event action to the handler. + * + * @returns The handler, for additional chaining. + */ + onEvent( + action: KinesisEventAction<Event, Context>, + ): KinesisEventHandler<Event, Context> { + this.actions.push(action); + return this; + } + + lambda(): (event: KinesisStreamEvent, context: AWSContext) => Promise<void> { + return withHealthCheckHandling(async (event, awsContext) => { + // 1. Build the context. + const correlationId = uuid(); + const context: BaseContext & Context = { + correlationId, + logger: this.config.logger.child({ + requestID: awsContext.awsRequestId, + correlationId, + }), + } as any; + + Object.assign(context, await this.config.createRunContext(context)); + + // 2. Process all the records. + + await processWithOrdering( + { + items: event.Records, + orderBy: (record) => record.kinesis.partitionKey, + concurrency: this.config.concurrency ?? 5, + stopOnError: false, + }, + async (record) => { + const eventLogger = context.logger.child({ + eventId: record.eventID, + }); + + const parsedEvent = this.config.parseEvent(record.kinesis.data); + + for (const action of this.actions) { + await action({ ...context, logger: eventLogger }, parsedEvent); + } + + eventLogger.info('Successfully processed event'); + }, + ); + + context.logger.info('Succesfully processed all events'); + }); + } + + harness({ + stringifyEvent, + ...overrides + }: KinesisEventHandlerHarnessOptions< + Event, + Context + >): KinesisEventHandlerHarnessContext<Event> { + // Make a copy of the handler. 
+ let handler = new KinesisEventHandler({ ...this.config, ...overrides }); + for (const action of this.actions) { + handler = handler.onEvent(action); + } + const lambda = handler.lambda(); + + return { + sendEvent: async ({ events }) => { + const event: KinesisStreamEvent = { + // We don't need to mock every field on this event -- there are lots. + // @ts-expect-error + Records: events.map((e) => ({ + eventID: uuid(), + kinesis: { + partitionKey: uuid(), + data: stringifyEvent(e), + }, + })), + }; + + await lambda( + event, + // We don't need to mock every field on the context -- there are lots. + // eslint-disable-next-line @typescript-eslint/no-unsafe-argument + { awsRequestId: uuid() } as any, + ); + }, + }; + } +} From f19f8a81a182402fe5239b475c04420302c258e2 Mon Sep 17 00:00:00 2001 From: Swain Molster Date: Wed, 22 Nov 2023 10:55:25 -0500 Subject: [PATCH 2/2] docs: add docs for KinesisEventHandler --- README.md | 98 ++++++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 86 insertions(+), 12 deletions(-) diff --git a/README.md b/README.md index a2c76c0..8063f8f 100644 --- a/README.md +++ b/README.md @@ -16,7 +16,7 @@ import { DynamoStreamHandler } from '@lifeomic/delta'; const stream = new DynamoStreamHandler({ logger, - // Optionally specify a list of image keys to obfuscate the values of + // Optionally specify a list of image keys to obfuscate the values of loggerObfuscateImageKeys: ['api-secret'], parse: (item) => { // parse the item using your custom logic, e.g. using zod or ajv. @@ -154,7 +154,7 @@ const harness = queue.harness({ test('something', async () => { // Provides a simple `sendEvent` function await harness.sendEvent({ - message: [ + messages: [ { /* message 1 */} { /* message 2 */} { /* message 3 */} @@ -165,24 +165,98 @@ test('something', async () => { }) ``` +### `KinesisEventHandler` + +This helper provides an abstraction over a Kinesis stream Lambda handler. 
+ +```typescript +import { KinesisEventHandler } from '@lifeomic/delta'; + +const queue = new KinesisEventHandler({ + logger, + parseEvent: (event) => { + /* ... parse from event data -> your custom type ... */ + return JSON.parse(event); + }, + createRunContext: () => { + /* ... create the "context", e.g. data sources ... */ + return { doSomething: () => null }; + }, + // Optionally specify a concurrency setting for processing events. + concurrency: 5, +}) + .onEvent(async (ctx, event) => { + // `ctx` contains the nice result of `createRunContext`: + await ctx.doSomething(); + + // `ctx` contains a logger by default, which already includes niceties like + // the AWS request id + ctx.logger.info('blah blah'); + }) + // Add multiple event handlers for code organization. + .onEvent(async (ctx, event) => { + // do something else + }); + +// Provides a dead-simple API for creating the Lambda. +export const handler = queue.lambda(); +``` + +`KinesisEventHandler` also comes with a nice helper for testing: `harness(...)` + +```typescript +const context = { + doSomething: jest.fn() +} + +const harness = queue.harness({ + stringifyEvent: (event) => { + /* stringify from your custom type -> string */ + return JSON.stringify(event) + }, + /* optionally override the logger */ + logger, + createRunContext: () => { + /* optionally override the context, to mock e.g. data sources */ + return context; + } +}) + +test('something', async () => { + // Provides a simple `sendEvent` function + await harness.sendEvent({ + events: [ + { /* event 1 */}, + { /* event 2 */}, + { /* event 3 */}, + ] + }) + + expect(context.doSomething).toHaveBeenCalledTimes(3) +}) +``` + ### Parallel Processing + Ordering -By default, the abstractions in `@lifeomic/delta` (`DynamoStreamHandler` and -`SQSMessageHandler`) will process events in parallel. To control the +By default, the abstractions in `@lifeomic/delta` will process events in parallel. 
To control the parallelization, specify a `concurrency` value when creating the handler. -These abstractions also ensure that within a batch of events correct _ordering_ -of events is maintained according to the ordering semantics of the upstream +These abstractions also ensure that within a batch of events correct _ordering_ +of events is maintained according to the ordering semantics of the upstream event source, even when processing in parallel. -In `DynamoStreamHandler`, events for the same _key_ will always be processed +In `DynamoStreamHandler`, events for the same _key_ will always be processed serially -- events from different keys will be processed in parallel. -In `SQSMessageHandler`, events with the same `MessageGroupId` will always -processed serially -- events with different `MessageGroupId` values will be +In `SQSMessageHandler`, events with the same `MessageGroupId` will always be +processed serially -- events with different `MessageGroupId` values will be +processed in parallel. + +In `KinesisEventHandler`, events with the same `partitionKey` will always be +processed serially -- events with different `partitionKey` values will be processed in parallel. -**Note**: while the ordering semantics above will always be preserved, events -that do _not_ need to be ordered will not necessarily be processed in the same -order they were received in the batch (even when using a `concurrency` value of +**Note**: while the ordering semantics above will always be preserved, events +that do _not_ need to be ordered will not necessarily be processed in the same +order they were received in the batch (even when using a `concurrency` value of `1`).