Merge pull request #60 from lifeomic/kinesis
feat: support kinesis processing
swain authored Nov 22, 2023
2 parents 7f181b8 + f19f8a8 commit 4430f04
Showing 4 changed files with 639 additions and 12 deletions.
98 changes: 86 additions & 12 deletions README.md
@@ -16,7 +16,7 @@ import { DynamoStreamHandler } from '@lifeomic/delta';

const stream = new DynamoStreamHandler({
logger,
// Optionally specify a list of image keys to obfuscate the values of
loggerObfuscateImageKeys: ['api-secret'],
parse: (item) => {
// parse the item using your custom logic, e.g. using zod or ajv.
@@ -154,7 +154,7 @@ const harness = queue.harness({
test('something', async () => {
// Provides a simple `sendEvent` function
await harness.sendEvent({
messages: [
{ /* message 1 */ },
{ /* message 2 */ },
{ /* message 3 */ },
@@ -165,24 +165,98 @@ test('something', async () => {
})
```

### `KinesisEventHandler`

This helper provides an abstraction over a Kinesis stream Lambda handler.

```typescript
import { KinesisEventHandler } from '@lifeomic/delta';

const queue = new KinesisEventHandler({
  logger,
  parseEvent: (event) => {
    /* ... parse from event data -> your custom type ... */
    return JSON.parse(event);
  },
  createRunContext: () => {
    /* ... create the "context", e.g. data sources ... */
    return { doSomething: () => null };
  },
  // Optionally specify a concurrency setting for processing events.
  concurrency: 5,
})
  .onEvent(async (ctx, event) => {
    // `ctx` contains the nice result of `createRunContext`:
    await ctx.doSomething();

    // `ctx` contains a logger by default, which already includes niceties like
    // the AWS request id
    ctx.logger.info('blah blah');
  })
  // Add multiple event handlers for code organization.
  .onEvent(async (ctx, event) => {
    // do something else
  });

// Provides a dead-simple API for creating the Lambda.
export const handler = queue.lambda();
```
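
The `parseEvent` callback above simply calls `JSON.parse`, which returns an unvalidated value. As a minimal sketch of what a stricter parser could look like, the example below checks the payload shape by hand. The `OrderEvent` type and the `parseOrderEvent` name are hypothetical and exist only for illustration, and the sketch assumes `parseEvent` receives the record data as a string, as the `JSON.parse(event)` call above suggests.

```typescript
// Hypothetical event shape, used only for illustration.
type OrderEvent = { orderId: string; amount: number };

// Parses a raw JSON string and validates its shape.
// Throws if the payload is not valid JSON or does not match OrderEvent.
// Assumption: the raw event data arrives as a string.
const parseOrderEvent = (raw: string): OrderEvent => {
  const value: unknown = JSON.parse(raw);
  if (typeof value !== 'object' || value === null) {
    throw new Error('Expected a JSON object');
  }
  const { orderId, amount } = value as Record<string, unknown>;
  if (typeof orderId !== 'string' || typeof amount !== 'number') {
    throw new Error('Invalid OrderEvent payload');
  }
  return { orderId, amount };
};
```

A function like this could be passed as `parseEvent: parseOrderEvent`. A schema library such as zod or ajv would serve the same purpose with less hand-written checking.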

`KinesisEventHandler` also comes with a nice helper for testing: `harness(...)`

```typescript
const context = {
  doSomething: jest.fn(),
};

const harness = queue.harness({
  stringifyEvent: (event) => {
    /* stringify from your custom type -> string */
    return JSON.stringify(event);
  },
  /* optionally override the logger */
  logger,
  createRunContext: () => {
    /* optionally override the context, to mock e.g. data sources */
    return context;
  },
});

test('something', async () => {
  // Provides a simple `sendEvent` function
  await harness.sendEvent({
    events: [
      { /* event 1 */ },
      { /* event 2 */ },
      { /* event 3 */ },
    ],
  });

  expect(context.doSomething).toHaveBeenCalledTimes(3);
});
```

### Parallel Processing + Ordering

By default, the abstractions in `@lifeomic/delta` process events in parallel.
To control the degree of parallelism, specify a `concurrency` value when
creating the handler.

These abstractions also ensure that, within a batch of events, the correct
_ordering_ of events is maintained according to the ordering semantics of the
upstream event source, even when processing in parallel.

In `DynamoStreamHandler`, events for the same _key_ will always be processed
serially -- events from different keys will be processed in parallel.

In `SQSMessageHandler`, events with the same `MessageGroupId` will always be
processed serially -- events with different `MessageGroupId` values will be
processed in parallel.

In `KinesisEventHandler`, events with the same `partitionKey` will always be
processed serially -- events with different `partitionKey` values will be
processed in parallel.
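
The per-key ordering described above can be sketched as follows. This is an illustrative implementation under stated assumptions, not the library's actual code: `KeyedEvent`, `groupByKey`, and `processBatch` are hypothetical names. Events are grouped by key, each group is handled one event at a time, and at most `concurrency` groups are in flight at once.

```typescript
// Hypothetical event shape, used only for illustration.
type KeyedEvent = { partitionKey: string; data: string };

// Groups events by key, preserving arrival order within each group.
const groupByKey = (events: KeyedEvent[]): KeyedEvent[][] => {
  const groups = new Map<string, KeyedEvent[]>();
  for (const event of events) {
    const group = groups.get(event.partitionKey) ?? [];
    group.push(event);
    groups.set(event.partitionKey, group);
  }
  return Array.from(groups.values());
};

// Processes a batch with at most `concurrency` groups in flight.
// Events inside a group are awaited one by one, so they stay in order.
const processBatch = async (
  events: KeyedEvent[],
  concurrency: number,
  handle: (event: KeyedEvent) => Promise<void>,
): Promise<void> => {
  const pending = groupByKey(events);

  // Each worker repeatedly takes the next unprocessed group.
  const worker = async (): Promise<void> => {
    for (let group = pending.shift(); group; group = pending.shift()) {
      for (const event of group) {
        await handle(event);
      }
    }
  };

  const workerCount = Math.max(1, Math.min(concurrency, pending.length));
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
};
```

With this design, a slow event only delays later events that share its key. Events with other keys continue on the remaining workers.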

**Note**: while the ordering semantics above will always be preserved, events
that do _not_ need to be ordered will not necessarily be processed in the same
order they were received in the batch (even when using a `concurrency` value of
`1`).
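
One way the reordering in the note could arise is if events are grouped by key before processing. That is an assumption for illustration, not something this README confirms. The sketch below uses hypothetical names (`BatchEvent`, `serialOrder`) to compute the handling order when groups are processed one at a time.

```typescript
// Hypothetical event shape, used only for illustration.
type BatchEvent = { key: string; id: string };

// Returns the order in which event ids would be handled if events are
// grouped by key and the groups are then processed strictly one at a time.
const serialOrder = (events: BatchEvent[]): string[] => {
  const groups = new Map<string, string[]>();
  for (const event of events) {
    const ids = groups.get(event.key) ?? [];
    ids.push(event.id);
    groups.set(event.key, ids);
  }
  return Array.from(groups.values()).reduce<string[]>(
    (all, ids) => all.concat(ids),
    [],
  );
};

// Received order: a1, b1, a2
const received: BatchEvent[] = [
  { key: 'a', id: 'a1' },
  { key: 'b', id: 'b1' },
  { key: 'a', id: 'a2' },
];

// Handling order: a1, a2, b1
console.log(serialOrder(received).join(', '));
```

Here `a1` still precedes `a2`, so per-key ordering holds, but `b1` is handled after `a2` even though it arrived earlier.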
1 change: 1 addition & 0 deletions src/index.ts
@@ -1,3 +1,4 @@
export * from './dynamo-streams';
export * from './sqs';
export * from './kinesis';
export { BaseContext } from './utils';