Skip to content

Commit

Permalink
Merge pull request #50 from lifeomic/marshall-helper
Browse files Browse the repository at this point in the history
feat!: automatically marshall + unmarshall from Dynamo format
  • Loading branch information
swain authored Jul 13, 2023
2 parents fb30577 + 153958c commit a558691
Show file tree
Hide file tree
Showing 6 changed files with 915 additions and 79 deletions.
13 changes: 5 additions & 8 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ import { DynamoStreamHandler } from '@lifeomic/delta';

const stream = new DynamoStreamHandler({
logger,
unmarshall: (object) => {
/* ... unmarshall from unknown stream format -> your custom type ... */
parse: (item) => {
// parse the item using your custom logic, e.g. using zod or ajv.
return { id: object.id };
},
createRunContext: () => {
Expand All @@ -27,7 +27,7 @@ const stream = new DynamoStreamHandler({
})
.onInsert(async (ctx, entity) => {
// INSERT actions receive a single strongly typed new entities
// (entities are typed based on the `unmarshall` function)
// (entities are typed based on the `parse` function)
entity.id;

// `ctx` contains the nice result of `createRunContext`
Expand Down Expand Up @@ -67,9 +67,6 @@ const context = {
}

const harness = stream.harness({
marshall: () => {
/* marshall from your custom type -> stream format */
},
/* optionally override the logger */
logger,
createRunContext: () => {
Expand Down Expand Up @@ -103,7 +100,7 @@ import { SQSMessageHandler } from '@lifeomic/delta';
const queue = new SQSMessageHandler({
logger,
parseMessage: (message) => {
/* ... unmarshall from message string -> your custom type ... */
/* ... parse from message string -> your custom type ... */
return JSON.parse(message);
},
createRunContext: () => {
Expand Down Expand Up @@ -137,7 +134,7 @@ const context = {

const harness = queue.harness({
stringifyMessage: (message) => {
/* marshall from your custom type -> string */
/* stringify from your custom type -> string */
return JSON.stringify(message)
},
/* optionally override the logger */
Expand Down
5 changes: 4 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
"build": "node build.js"
},
"devDependencies": {
"@aws-sdk/client-dynamodb": "^3.369.0",
"@lifeomic/eslint-config-standards": "^2.1.1",
"@lifeomic/jest-config": "^1.1.2",
"@lifeomic/logging": "^4.0.0",
Expand All @@ -37,9 +38,11 @@
"prettier": "^2.5.1",
"semantic-release": "^19.0.2",
"ts-jest": "^27.1.3",
"typescript": "^4.5.5"
"typescript": "^4.5.5",
"zod": "^3.21.4"
},
"dependencies": {
"@aws-sdk/util-dynamodb": "^3.369.0",
"@types/aws-lambda": "^8.10.92",
"uuid": "^8.3.2"
},
Expand Down
82 changes: 40 additions & 42 deletions src/dynamo-streams.test.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,13 @@
import { LoggerInterface } from '@lifeomic/logging';
import { v4 as uuid } from 'uuid';
import { DynamoStreamHandler } from './dynamo-streams';
import { marshall } from '@aws-sdk/util-dynamodb';
import { z } from 'zod';

const TestSchema = z.object({ id: z.string(), name: z.string().optional() });

const testSerializer = {
unmarshall: (object: any) => object.marshalled,
marshall: (object: any) => ({ marshalled: object }),
parse: (object: any) => TestSchema.parse(object),
};

const logger: jest.Mocked<LoggerInterface> = {
Expand All @@ -29,7 +32,7 @@ describe('DynamoStreamHandler', () => {
test('responds to HTTP health checks', async () => {
const lambda = new DynamoStreamHandler({
logger,
unmarshall: testSerializer.unmarshall,
parse: testSerializer.parse,
createRunContext: () => ({}),
}).lambda();

Expand All @@ -48,7 +51,7 @@ describe('DynamoStreamHandler', () => {
test('responds to healthCheck events', async () => {
const lambda = new DynamoStreamHandler({
logger,
unmarshall: testSerializer.unmarshall,
parse: testSerializer.parse,
createRunContext: () => ({}),
}).lambda();

Expand All @@ -66,7 +69,7 @@ describe('DynamoStreamHandler', () => {
test('handles insert events', async () => {
const lambda = new DynamoStreamHandler({
logger,
unmarshall: testSerializer.unmarshall,
parse: testSerializer.parse,
createRunContext: () => ({ logger, dataSources }),
})
.onInsert((ctx, entity) => {
Expand All @@ -79,7 +82,7 @@ describe('DynamoStreamHandler', () => {
Records: [
{
eventName: 'INSERT',
dynamodb: { NewImage: { marshalled: { id: 'new-insert' } } as any },
dynamodb: { NewImage: { id: { S: 'new-insert' } } },
},
],
},
Expand All @@ -96,7 +99,7 @@ describe('DynamoStreamHandler', () => {
test('handles modify events', async () => {
const lambda = new DynamoStreamHandler({
logger,
unmarshall: testSerializer.unmarshall,
parse: testSerializer.parse,
createRunContext: () => ({ logger, dataSources }),
})
.onModify((ctx, oldEntity, newEntity) => {
Expand All @@ -110,8 +113,8 @@ describe('DynamoStreamHandler', () => {
{
eventName: 'MODIFY',
dynamodb: {
OldImage: { marshalled: { id: 'old-modify' } } as any,
NewImage: { marshalled: { id: 'new-modify' } } as any,
OldImage: marshall({ id: 'old-modify' }) as any,
NewImage: marshall({ id: 'new-modify' }) as any,
},
},
],
Expand All @@ -130,7 +133,7 @@ describe('DynamoStreamHandler', () => {
test('handles remove events', async () => {
const lambda = new DynamoStreamHandler({
logger,
unmarshall: testSerializer.unmarshall,
parse: testSerializer.parse,
createRunContext: () => ({ logger, dataSources }),
})
.onRemove((ctx, entity) => {
Expand All @@ -144,7 +147,7 @@ describe('DynamoStreamHandler', () => {
{
eventName: 'REMOVE',
dynamodb: {
OldImage: { marshalled: { id: 'old-remove' } } as any,
OldImage: marshall({ id: 'old-remove' }) as any,
},
},
],
Expand All @@ -162,7 +165,7 @@ describe('DynamoStreamHandler', () => {
test('handles a variety of events', async () => {
const lambda = new DynamoStreamHandler({
logger,
unmarshall: testSerializer.unmarshall,
parse: testSerializer.parse,
createRunContext: () => ({ logger, dataSources }),
})
// onInsert twice to test same event through multiple actions
Expand All @@ -186,37 +189,27 @@ describe('DynamoStreamHandler', () => {
{
eventName: 'INSERT',
dynamodb: {
NewImage: {
marshalled: { id: 'new-insert-varied-lambda' },
} as any,
NewImage: marshall({ id: 'new-insert-varied-lambda' }) as any,
},
},
{
eventName: 'MODIFY',
dynamodb: {
OldImage: {
marshalled: { id: 'old-modify-varied-lambda' },
} as any,
NewImage: {
marshalled: { id: 'new-modify-varied-lambda' },
} as any,
OldImage: marshall({ id: 'old-modify-varied-lambda' }) as any,
NewImage: marshall({ id: 'new-modify-varied-lambda' }) as any,
},
},
{
eventName: 'REMOVE',
dynamodb: {
OldImage: {
marshalled: { id: 'old-remove-varied-lambda' },
} as any,
OldImage: marshall({ id: 'old-remove-varied-lambda' }) as any,
},
},
// A second remove event to test multiple events through a single action
{
eventName: 'REMOVE',
dynamodb: {
OldImage: {
marshalled: { id: 'old-remove-varied-lambda-second' },
} as any,
OldImage: marshall({ id: 'old-remove-varied-lambda-second' }),
},
},
],
Expand Down Expand Up @@ -250,13 +243,13 @@ describe('DynamoStreamHandler', () => {
test('sends insert event', async () => {
const { sendEvent } = new DynamoStreamHandler({
logger,
unmarshall: testSerializer.unmarshall,
parse: testSerializer.parse,
createRunContext: () => ({ dataSources }),
})
.onInsert((ctx, entity) => {
ctx.dataSources.doSomething(entity);
})
.harness({ marshall: testSerializer.marshall });
.harness();

await sendEvent({
records: [{ type: 'insert', entity: { id: 'new-insert' } }],
Expand All @@ -271,13 +264,13 @@ describe('DynamoStreamHandler', () => {
test('sends modify event', async () => {
const { sendEvent } = new DynamoStreamHandler({
logger,
unmarshall: testSerializer.unmarshall,
parse: testSerializer.parse,
createRunContext: () => ({ dataSources }),
})
.onModify((ctx, oldEntity, newEntity) => {
ctx.dataSources.doSomething(oldEntity, newEntity);
})
.harness({ marshall: testSerializer.marshall });
.harness();

await sendEvent({
records: [
Expand All @@ -299,13 +292,13 @@ describe('DynamoStreamHandler', () => {
test('sends remove event', async () => {
const { sendEvent } = new DynamoStreamHandler({
logger,
unmarshall: testSerializer.unmarshall,
parse: testSerializer.parse,
createRunContext: () => ({ dataSources }),
})
.onRemove((ctx, entity) => {
ctx.dataSources.doSomething(entity);
})
.harness({ marshall: testSerializer.marshall });
.harness();

await sendEvent({
records: [{ type: 'remove', entity: { id: 'old-remove' } }],
Expand All @@ -320,7 +313,7 @@ describe('DynamoStreamHandler', () => {
test('sends a variety of events', async () => {
const { sendEvent } = new DynamoStreamHandler({
logger,
unmarshall: testSerializer.unmarshall,
parse: testSerializer.parse,
createRunContext: () => ({ dataSources }),
})
// onInsert twice to test same event through multiple actions
Expand All @@ -336,7 +329,7 @@ describe('DynamoStreamHandler', () => {
.onRemove((ctx, entity) => {
ctx.dataSources.doSomething(entity);
})
.harness({ marshall: testSerializer.marshall });
.harness();

await sendEvent({
records: [
Expand Down Expand Up @@ -387,15 +380,14 @@ describe('DynamoStreamHandler', () => {
overrideLogger.child.mockImplementation(() => overrideLogger);
const { sendEvent } = new DynamoStreamHandler({
logger,
unmarshall: testSerializer.unmarshall,
parse: testSerializer.parse,
createRunContext: () => ({ dataSources }),
})
.onInsert((ctx) => {
ctx.logger.info(testValue);
ctx.dataSources.doSomething((ctx as any).testValue);
})
.harness({
marshall: testSerializer.marshall,
logger: overrideLogger as any,
createRunContext: () => ({ dataSources, testValue }),
});
Expand All @@ -414,12 +406,12 @@ describe('DynamoStreamHandler', () => {
test('generates a correlation id', async () => {
const { sendEvent } = new DynamoStreamHandler({
logger,
unmarshall: testSerializer.unmarshall,
parse: testSerializer.parse,
createRunContext: (ctx) => {
expect(typeof ctx.correlationId === 'string').toBe(true);
return {};
},
}).harness({ marshall: () => ({}) });
}).harness();

await sendEvent({ records: [] });

Expand All @@ -433,7 +425,7 @@ describe('DynamoStreamHandler', () => {
describe('error scenarios', () => {
const lambda = new DynamoStreamHandler({
logger,
unmarshall: testSerializer.unmarshall,
parse: testSerializer.parse,
createRunContext: () => ({ logger, dataSources }),
}).lambda();

Expand Down Expand Up @@ -467,7 +459,10 @@ describe('DynamoStreamHandler', () => {
await lambda(
{
Records: [
{ eventName: 'MODIFY', dynamodb: { OldImage: { marshalled: {} } } },
{
eventName: 'MODIFY',
dynamodb: { OldImage: { id: { S: 'test-id' } } },
},
],
},
{} as any,
Expand All @@ -484,7 +479,10 @@ describe('DynamoStreamHandler', () => {
await lambda(
{
Records: [
{ eventName: 'MODIFY', dynamodb: { NewImage: { marshalled: {} } } },
{
eventName: 'MODIFY',
dynamodb: { NewImage: { id: { S: 'test-id' } } },
},
],
},
{} as any,
Expand Down
Loading

0 comments on commit a558691

Please sign in to comment.