From dcaa997b9a0fafc0d24dd93a8ac2afe7ed668ba0 Mon Sep 17 00:00:00 2001 From: Alejandro Corredor Date: Tue, 12 Mar 2024 16:27:55 -0400 Subject: [PATCH] feat: handle kinesis parsing --- src/kinesis.test.ts | 78 ++++++++++++++++++++++++++++++++------------- src/kinesis.ts | 23 ++++++------- 2 files changed, 64 insertions(+), 37 deletions(-) diff --git a/src/kinesis.test.ts b/src/kinesis.test.ts index a351269..e6a883a 100644 --- a/src/kinesis.test.ts +++ b/src/kinesis.test.ts @@ -14,8 +14,9 @@ beforeEach(() => { }); const testSerializer = { - parseEvent: (msg: string) => JSON.parse(msg), - stringifyEvent: (msg: any) => JSON.stringify(msg), + parseEvent: (msg: Record) => msg, + toKinesisNativeRecord: (msg: Record) => + Buffer.from(JSON.stringify(msg)).toString('base64'), }; describe('KinesisEventHandler', () => { @@ -64,7 +65,9 @@ describe('KinesisEventHandler', () => { { kinesis: { partitionKey: uuid(), - data: JSON.stringify({ data: 'test-event-1' }), + data: testSerializer.toKinesisNativeRecord({ + data: 'test-event-1', + }), }, }, ], @@ -104,25 +107,33 @@ describe('KinesisEventHandler', () => { { kinesis: { partitionKey: uuid(), - data: JSON.stringify({ data: 'test-event-1' }), + data: testSerializer.toKinesisNativeRecord({ + data: 'test-event-1', + }), }, }, { kinesis: { partitionKey: uuid(), - data: JSON.stringify({ data: 'test-event-2' }), + data: testSerializer.toKinesisNativeRecord({ + data: 'test-event-2', + }), }, }, { kinesis: { partitionKey: uuid(), - data: JSON.stringify({ data: 'test-event-3' }), + data: testSerializer.toKinesisNativeRecord({ + data: 'test-event-3', + }), }, }, { kinesis: { partitionKey: uuid(), - data: JSON.stringify({ data: 'test-event-4' }), + data: testSerializer.toKinesisNativeRecord({ + data: 'test-event-4', + }), }, }, ], @@ -190,9 +201,7 @@ describe('KinesisEventHandler', () => { .onEvent((ctx, msg) => { ctx.doSomething(msg); }) - .harness({ - stringifyEvent: testSerializer.stringifyEvent, - }); + .harness(); await sendEvent({ events: [{ data: 'test-event-1' }, { data: 'test-event-2' }], @@ -229,19 +238,25 @@ describe('KinesisEventHandler', () => { { kinesis: { partitionKey: uuid(), - data: JSON.stringify({ data: 'test-event-1' }), + data: testSerializer.toKinesisNativeRecord({ + data: 'test-event-1', + }), }, }, { kinesis: { partitionKey: uuid(), - data: JSON.stringify({ data: 'test-event-2' }), + data: testSerializer.toKinesisNativeRecord({ + data: 'test-event-2', + }), }, }, { kinesis: { partitionKey: uuid(), - data: JSON.stringify({ data: 'test-event-3' }), + data: testSerializer.toKinesisNativeRecord({ + data: 'test-event-3', + }), }, }, ] as any, @@ -281,7 +296,6 @@ describe('KinesisEventHandler', () => { ctx.dataSources.doSomething((ctx as any).testValue); }) .harness({ - stringifyEvent: testSerializer.stringifyEvent, logger: overrideLogger as any, createRunContext: () => ({ dataSources, testValue }), }); @@ -320,19 +334,25 @@ describe('KinesisEventHandler', () => { { kinesis: { partitionKey: uuid(), - data: JSON.stringify({ data: 'test-event-1' }), + data: testSerializer.toKinesisNativeRecord({ + data: 'test-event-1', + }), }, }, { kinesis: { partitionKey: uuid(), - data: JSON.stringify({ data: 'test-event-2' }), + data: testSerializer.toKinesisNativeRecord({ + data: 'test-event-2', + }), }, }, { kinesis: { partitionKey: uuid(), - data: JSON.stringify({ data: 'test-event-3' }), + data: testSerializer.toKinesisNativeRecord({ + data: 'test-event-3', + }), }, }, ] as any, @@ -371,37 +391,49 @@ describe('KinesisEventHandler', () => { { kinesis: { partitionKey: 'group-id', - data: JSON.stringify({ data: 'test-event-1' }), + data: testSerializer.toKinesisNativeRecord({ + data: 'test-event-1', + }), }, }, { kinesis: { partitionKey: uuid(), - data: JSON.stringify({ data: 'test-event-2' }), + data: testSerializer.toKinesisNativeRecord({ + data: 'test-event-2', + }), }, }, { kinesis: { partitionKey: 'group-id-2', - data: JSON.stringify({ data: 'test-event-other-1' }), + data: testSerializer.toKinesisNativeRecord({ + data: 'test-event-other-1', + }), }, }, { kinesis: { partitionKey: 'group-id', - data: JSON.stringify({ data: 'test-event-3' }), + data: testSerializer.toKinesisNativeRecord({ + data: 'test-event-3', + }), }, }, { kinesis: { partitionKey: 'group-id-2', - data: JSON.stringify({ data: 'test-event-other-2' }), + data: testSerializer.toKinesisNativeRecord({ + data: 'test-event-other-2', + }), }, }, { kinesis: { partitionKey: uuid(), - data: JSON.stringify({ data: 'test-event-4' }), + data: testSerializer.toKinesisNativeRecord({ + data: 'test-event-4', + }), }, }, ] as any, diff --git a/src/kinesis.ts b/src/kinesis.ts index 864ea05..b539021 100644 --- a/src/kinesis.ts +++ b/src/kinesis.ts @@ -13,7 +13,7 @@ export type KinesisEventHandlerConfig = /** * A function for parsing the Kinesis event data into your custom type. */ - parseEvent: (body: string) => Event; + parseEvent: (body: Record) => Event; }; export type KinesisEventAction = ( @@ -21,12 +21,7 @@ export type KinesisEventAction = ( event: Event, ) => void | Promise; -export type KinesisEventHandlerHarnessOptions = { - /** - * A function for stringifying events. - */ - stringifyEvent: (event: Event) => string; - +export type KinesisEventHandlerHarnessOptions = { /** * An optional override for the logger. */ @@ -89,7 +84,11 @@ export class KinesisEventHandler { eventId: record.eventID, }); - const parsedEvent = this.config.parseEvent(record.kinesis.data); + const parsedEvent = this.config.parseEvent( + JSON.parse( + Buffer.from(record.kinesis.data, 'base64').toString('utf8'), + ) as Record, + ); for (const action of this.actions) { await action({ ...context, logger: eventLogger }, parsedEvent); @@ -105,12 +104,8 @@ export class KinesisEventHandler { } harness({ - stringifyEvent, ...overrides - }: KinesisEventHandlerHarnessOptions< - Event, - Context - >): KinesisEventHandlerHarnessContext { + }: KinesisEventHandlerHarnessOptions = {}): KinesisEventHandlerHarnessContext { // Make a copy of the handler. let handler = new KinesisEventHandler({ ...this.config, ...overrides }); for (const action of this.actions) { @@ -126,7 +121,7 @@ export class KinesisEventHandler { eventID: uuid(), kinesis: { partitionKey: uuid(), - data: stringifyEvent(e), + data: Buffer.from(JSON.stringify(e)).toString('base64'), }, })), };