Skip to content

Commit

Permalink
feat: handle kinesis parsing
Browse files Browse the repository at this point in the history
  • Loading branch information
aecorredor committed Mar 12, 2024
1 parent c5af1c4 commit dcaa997
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 37 deletions.
78 changes: 55 additions & 23 deletions src/kinesis.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@ beforeEach(() => {
});

const testSerializer = {
parseEvent: (msg: string) => JSON.parse(msg),
stringifyEvent: (msg: any) => JSON.stringify(msg),
parseEvent: (msg: Record<string, unknown>) => msg,
toKinesisNativeRecord: (msg: Record<string, unknown>) =>
Buffer.from(JSON.stringify(msg)).toString('base64'),
};

describe('KinesisEventHandler', () => {
Expand Down Expand Up @@ -64,7 +65,9 @@ describe('KinesisEventHandler', () => {
{
kinesis: {
partitionKey: uuid(),
data: JSON.stringify({ data: 'test-event-1' }),
data: testSerializer.toKinesisNativeRecord({
data: 'test-event-1',
}),
},
},
],
Expand Down Expand Up @@ -104,25 +107,33 @@ describe('KinesisEventHandler', () => {
{
kinesis: {
partitionKey: uuid(),
data: JSON.stringify({ data: 'test-event-1' }),
data: testSerializer.toKinesisNativeRecord({
data: 'test-event-1',
}),
},
},
{
kinesis: {
partitionKey: uuid(),
data: JSON.stringify({ data: 'test-event-2' }),
data: testSerializer.toKinesisNativeRecord({
data: 'test-event-2',
}),
},
},
{
kinesis: {
partitionKey: uuid(),
data: JSON.stringify({ data: 'test-event-3' }),
data: testSerializer.toKinesisNativeRecord({
data: 'test-event-3',
}),
},
},
{
kinesis: {
partitionKey: uuid(),
data: JSON.stringify({ data: 'test-event-4' }),
data: testSerializer.toKinesisNativeRecord({
data: 'test-event-4',
}),
},
},
],
Expand Down Expand Up @@ -190,9 +201,7 @@ describe('KinesisEventHandler', () => {
.onEvent((ctx, msg) => {
ctx.doSomething(msg);
})
.harness({
stringifyEvent: testSerializer.stringifyEvent,
});
.harness();

await sendEvent({
events: [{ data: 'test-event-1' }, { data: 'test-event-2' }],
Expand Down Expand Up @@ -229,19 +238,25 @@ describe('KinesisEventHandler', () => {
{
kinesis: {
partitionKey: uuid(),
data: JSON.stringify({ data: 'test-event-1' }),
data: testSerializer.toKinesisNativeRecord({
data: 'test-event-1',
}),
},
},
{
kinesis: {
partitionKey: uuid(),
data: JSON.stringify({ data: 'test-event-2' }),
data: testSerializer.toKinesisNativeRecord({
data: 'test-event-2',
}),
},
},
{
kinesis: {
partitionKey: uuid(),
data: JSON.stringify({ data: 'test-event-3' }),
data: testSerializer.toKinesisNativeRecord({
data: 'test-event-3',
}),
},
},
] as any,
Expand Down Expand Up @@ -281,7 +296,6 @@ describe('KinesisEventHandler', () => {
ctx.dataSources.doSomething((ctx as any).testValue);
})
.harness({
stringifyEvent: testSerializer.stringifyEvent,
logger: overrideLogger as any,
createRunContext: () => ({ dataSources, testValue }),
});
Expand Down Expand Up @@ -320,19 +334,25 @@ describe('KinesisEventHandler', () => {
{
kinesis: {
partitionKey: uuid(),
data: JSON.stringify({ data: 'test-event-1' }),
data: testSerializer.toKinesisNativeRecord({
data: 'test-event-1',
}),
},
},
{
kinesis: {
partitionKey: uuid(),
data: JSON.stringify({ data: 'test-event-2' }),
data: testSerializer.toKinesisNativeRecord({
data: 'test-event-2',
}),
},
},
{
kinesis: {
partitionKey: uuid(),
data: JSON.stringify({ data: 'test-event-3' }),
data: testSerializer.toKinesisNativeRecord({
data: 'test-event-3',
}),
},
},
] as any,
Expand Down Expand Up @@ -371,37 +391,49 @@ describe('KinesisEventHandler', () => {
{
kinesis: {
partitionKey: 'group-id',
data: JSON.stringify({ data: 'test-event-1' }),
data: testSerializer.toKinesisNativeRecord({
data: 'test-event-1',
}),
},
},
{
kinesis: {
partitionKey: uuid(),
data: JSON.stringify({ data: 'test-event-2' }),
data: testSerializer.toKinesisNativeRecord({
data: 'test-event-2',
}),
},
},
{
kinesis: {
partitionKey: 'group-id-2',
data: JSON.stringify({ data: 'test-event-other-1' }),
data: testSerializer.toKinesisNativeRecord({
data: 'test-event-other-1',
}),
},
},
{
kinesis: {
partitionKey: 'group-id',
data: JSON.stringify({ data: 'test-event-3' }),
data: testSerializer.toKinesisNativeRecord({
data: 'test-event-3',
}),
},
},
{
kinesis: {
partitionKey: 'group-id-2',
data: JSON.stringify({ data: 'test-event-other-2' }),
data: testSerializer.toKinesisNativeRecord({
data: 'test-event-other-2',
}),
},
},
{
kinesis: {
partitionKey: uuid(),
data: JSON.stringify({ data: 'test-event-4' }),
data: testSerializer.toKinesisNativeRecord({
data: 'test-event-4',
}),
},
},
] as any,
Expand Down
23 changes: 9 additions & 14 deletions src/kinesis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,15 @@ export type KinesisEventHandlerConfig<Event, Context> =
/**
* A function for parsing the Kinesis event data into your custom type.
*/
parseEvent: (body: string) => Event;
parseEvent: (body: Record<string, unknown>) => Event;
};

export type KinesisEventAction<Event, Context> = (
context: Context & BaseContext,
event: Event,
) => void | Promise<void>;

export type KinesisEventHandlerHarnessOptions<Event, Context> = {
/**
* A function for stringifying events.
*/
stringifyEvent: (event: Event) => string;

export type KinesisEventHandlerHarnessOptions<Context> = {
/**
* An optional override for the logger.
*/
Expand Down Expand Up @@ -89,7 +84,11 @@ export class KinesisEventHandler<Event, Context> {
eventId: record.eventID,
});

const parsedEvent = this.config.parseEvent(record.kinesis.data);
const parsedEvent = this.config.parseEvent(
JSON.parse(
Buffer.from(record.kinesis.data, 'base64').toString('utf8'),
) as Record<string, unknown>,
);

for (const action of this.actions) {
await action({ ...context, logger: eventLogger }, parsedEvent);
Expand All @@ -105,12 +104,8 @@ export class KinesisEventHandler<Event, Context> {
}

harness({
stringifyEvent,
...overrides
}: KinesisEventHandlerHarnessOptions<
Event,
Context
>): KinesisEventHandlerHarnessContext<Event> {
}: KinesisEventHandlerHarnessOptions<Context> = {}): KinesisEventHandlerHarnessContext<Event> {
// Make a copy of the handler.
let handler = new KinesisEventHandler({ ...this.config, ...overrides });
for (const action of this.actions) {
Expand All @@ -126,7 +121,7 @@ export class KinesisEventHandler<Event, Context> {
eventID: uuid(),
kinesis: {
partitionKey: uuid(),
data: stringifyEvent(e),
data: Buffer.from(JSON.stringify(e)).toString('base64'),
},
})),
};
Expand Down

0 comments on commit dcaa997

Please sign in to comment.