diff --git a/src/kinesis.test.ts b/src/kinesis.test.ts index a351269..26a235c 100644 --- a/src/kinesis.test.ts +++ b/src/kinesis.test.ts @@ -16,6 +16,8 @@ beforeEach(() => { const testSerializer = { parseEvent: (msg: string) => JSON.parse(msg), stringifyEvent: (msg: any) => JSON.stringify(msg), + toKinesisNativeRecord: (msg: Record) => + Buffer.from(JSON.stringify(msg)).toString('base64'), }; describe('KinesisEventHandler', () => { @@ -64,7 +66,9 @@ describe('KinesisEventHandler', () => { { kinesis: { partitionKey: uuid(), - data: JSON.stringify({ data: 'test-event-1' }), + data: testSerializer.toKinesisNativeRecord({ + data: 'test-event-1', + }), }, }, ], @@ -104,25 +108,33 @@ describe('KinesisEventHandler', () => { { kinesis: { partitionKey: uuid(), - data: JSON.stringify({ data: 'test-event-1' }), + data: testSerializer.toKinesisNativeRecord({ + data: 'test-event-1', + }), }, }, { kinesis: { partitionKey: uuid(), - data: JSON.stringify({ data: 'test-event-2' }), + data: testSerializer.toKinesisNativeRecord({ + data: 'test-event-2', + }), }, }, { kinesis: { partitionKey: uuid(), - data: JSON.stringify({ data: 'test-event-3' }), + data: testSerializer.toKinesisNativeRecord({ + data: 'test-event-3', + }), }, }, { kinesis: { partitionKey: uuid(), - data: JSON.stringify({ data: 'test-event-4' }), + data: testSerializer.toKinesisNativeRecord({ + data: 'test-event-4', + }), }, }, ], @@ -229,19 +241,25 @@ describe('KinesisEventHandler', () => { { kinesis: { partitionKey: uuid(), - data: JSON.stringify({ data: 'test-event-1' }), + data: testSerializer.toKinesisNativeRecord({ + data: 'test-event-1', + }), }, }, { kinesis: { partitionKey: uuid(), - data: JSON.stringify({ data: 'test-event-2' }), + data: testSerializer.toKinesisNativeRecord({ + data: 'test-event-2', + }), }, }, { kinesis: { partitionKey: uuid(), - data: JSON.stringify({ data: 'test-event-3' }), + data: testSerializer.toKinesisNativeRecord({ + data: 'test-event-3', + }), }, }, ] as any, @@ -320,19 +338,25 @@ describe('KinesisEventHandler', () => { { kinesis: { partitionKey: uuid(), - data: JSON.stringify({ data: 'test-event-1' }), + data: testSerializer.toKinesisNativeRecord({ + data: 'test-event-1', + }), }, }, { kinesis: { partitionKey: uuid(), - data: JSON.stringify({ data: 'test-event-2' }), + data: testSerializer.toKinesisNativeRecord({ + data: 'test-event-2', + }), }, }, { kinesis: { partitionKey: uuid(), - data: JSON.stringify({ data: 'test-event-3' }), + data: testSerializer.toKinesisNativeRecord({ + data: 'test-event-3', + }), }, }, ] as any, @@ -371,37 +395,49 @@ describe('KinesisEventHandler', () => { { kinesis: { partitionKey: 'group-id', - data: JSON.stringify({ data: 'test-event-1' }), + data: testSerializer.toKinesisNativeRecord({ + data: 'test-event-1', + }), }, }, { kinesis: { partitionKey: uuid(), - data: JSON.stringify({ data: 'test-event-2' }), + data: testSerializer.toKinesisNativeRecord({ + data: 'test-event-2', + }), }, }, { kinesis: { partitionKey: 'group-id-2', - data: JSON.stringify({ data: 'test-event-other-1' }), + data: testSerializer.toKinesisNativeRecord({ + data: 'test-event-other-1', + }), }, }, { kinesis: { partitionKey: 'group-id', - data: JSON.stringify({ data: 'test-event-3' }), + data: testSerializer.toKinesisNativeRecord({ + data: 'test-event-3', + }), }, }, { kinesis: { partitionKey: 'group-id-2', - data: JSON.stringify({ data: 'test-event-other-2' }), + data: testSerializer.toKinesisNativeRecord({ + data: 'test-event-other-2', + }), }, }, { kinesis: { partitionKey: uuid(), - data: JSON.stringify({ data: 'test-event-4' }), + data: testSerializer.toKinesisNativeRecord({ + data: 'test-event-4', + }), }, }, ] as any, diff --git a/src/kinesis.ts b/src/kinesis.ts index 864ea05..d77906c 100644 --- a/src/kinesis.ts +++ b/src/kinesis.ts @@ -26,7 +26,6 @@ export type KinesisEventHandlerHarnessOptions = { * A function for stringifying events. */ stringifyEvent: (event: Event) => string; - /** * An optional override for the logger. */ @@ -89,7 +88,9 @@ export class KinesisEventHandler { eventId: record.eventID, }); - const parsedEvent = this.config.parseEvent(record.kinesis.data); + const parsedEvent = this.config.parseEvent( + Buffer.from(record.kinesis.data, 'base64').toString('utf8'), + ); for (const action of this.actions) { await action({ ...context, logger: eventLogger }, parsedEvent); @@ -126,7 +127,7 @@ export class KinesisEventHandler { eventID: uuid(), kinesis: { partitionKey: uuid(), - data: stringifyEvent(e), + data: Buffer.from(stringifyEvent(e)).toString('base64'), }, })), };