Skip to content

Commit

Permalink
do not assume kinesis json data
Browse files Browse the repository at this point in the history
  • Loading branch information
aecorredor committed Mar 12, 2024
1 parent dcaa997 commit b5b7772
Show file tree
Hide file tree
Showing 2 changed files with 19 additions and 9 deletions.
8 changes: 6 additions & 2 deletions src/kinesis.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,8 @@ beforeEach(() => {
});

const testSerializer = {
parseEvent: (msg: Record<string, unknown>) => msg,
parseEvent: (msg: string) => JSON.parse(msg),
stringifyEvent: (msg: any) => JSON.stringify(msg),
toKinesisNativeRecord: (msg: Record<string, unknown>) =>
Buffer.from(JSON.stringify(msg)).toString('base64'),
};
Expand Down Expand Up @@ -201,7 +202,9 @@ describe('KinesisEventHandler', () => {
.onEvent((ctx, msg) => {
ctx.doSomething(msg);
})
.harness();
.harness({
stringifyEvent: testSerializer.stringifyEvent,
});

await sendEvent({
events: [{ data: 'test-event-1' }, { data: 'test-event-2' }],
Expand Down Expand Up @@ -296,6 +299,7 @@ describe('KinesisEventHandler', () => {
ctx.dataSources.doSomething((ctx as any).testValue);
})
.harness({
stringifyEvent: testSerializer.stringifyEvent,
logger: overrideLogger as any,
createRunContext: () => ({ dataSources, testValue }),
});
Expand Down
20 changes: 13 additions & 7 deletions src/kinesis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,15 +13,19 @@ export type KinesisEventHandlerConfig<Event, Context> =
/**
* A function for parsing the Kinesis event data into your custom type.
*/
parseEvent: (body: Record<string, unknown>) => Event;
parseEvent: (body: string) => Event;
};

export type KinesisEventAction<Event, Context> = (
context: Context & BaseContext,
event: Event,
) => void | Promise<void>;

export type KinesisEventHandlerHarnessOptions<Context> = {
export type KinesisEventHandlerHarnessOptions<Event, Context> = {
/**
* A function for stringifying events.
*/
stringifyEvent: (event: Event) => string;
/**
* An optional override for the logger.
*/
Expand Down Expand Up @@ -85,9 +89,7 @@ export class KinesisEventHandler<Event, Context> {
});

const parsedEvent = this.config.parseEvent(
JSON.parse(
Buffer.from(record.kinesis.data, 'base64').toString('utf8'),
) as Record<string, unknown>,
Buffer.from(record.kinesis.data, 'base64').toString('utf8'),
);

for (const action of this.actions) {
Expand All @@ -104,8 +106,12 @@ export class KinesisEventHandler<Event, Context> {
}

harness({
stringifyEvent,
...overrides
}: KinesisEventHandlerHarnessOptions<Context> = {}): KinesisEventHandlerHarnessContext<Event> {
}: KinesisEventHandlerHarnessOptions<
Event,
Context
>): KinesisEventHandlerHarnessContext<Event> {
// Make a copy of the handler.
let handler = new KinesisEventHandler({ ...this.config, ...overrides });
for (const action of this.actions) {
Expand All @@ -121,7 +127,7 @@ export class KinesisEventHandler<Event, Context> {
eventID: uuid(),
kinesis: {
partitionKey: uuid(),
data: Buffer.from(JSON.stringify(e)).toString('base64'),
data: Buffer.from(stringifyEvent(e)).toString('base64'),
},
})),
};
Expand Down

0 comments on commit b5b7772

Please sign in to comment.