Merge pull request #1982 from ejschaefer/ejschaefer-firehose-lambda-transformation-cdk-typescript

New serverless pattern -  Firehose Transformation pattern (CDK + Typescript)
Ben Smith authored Jan 15, 2024
2 parents b113c5e + 0780df3 commit f3cfe2d
Showing 11 changed files with 574 additions and 0 deletions.
74 changes: 74 additions & 0 deletions firehose-transformation-cdk-typescript/README.md
@@ -0,0 +1,74 @@
# Amazon Kinesis Data Firehose Data Transformation with AWS Lambda

This pattern demonstrates how to transform streaming data received by Amazon Kinesis Data Firehose using AWS Lambda before delivering the transformed data to Amazon S3.

The pattern uses the AWS Cloud Development Kit (AWS CDK) to deploy a Kinesis Data Firehose delivery stream, a Lambda function to transform source data, and an Amazon S3 bucket to receive the transformed data.

Learn more about this pattern at Serverless Land Patterns: https://serverlessland.com/patterns/firehose-transformation-cdk-typescript

Important: this application uses various AWS services and there are costs associated with these services after the Free Tier usage - please see the [AWS Pricing page](https://aws.amazon.com/pricing/) for details. You are responsible for any AWS costs incurred. No warranty is implied in this example.

## Requirements

* [Create an AWS account](https://portal.aws.amazon.com/gp/aws/developer/registration/index.html) if you do not already have one and log in. The IAM user that you use must have sufficient permissions to make necessary AWS service calls and manage AWS resources.
* [AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html) installed and configured
* [Git Installed](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git)
* [AWS CDK](https://docs.aws.amazon.com/cdk/latest/guide/cli.html) installed and configured

## Deployment Instructions

1. Create a new directory, navigate to that directory in a terminal and clone the GitHub repository:
```
git clone https://github.com/aws-samples/serverless-patterns
```
1. Change directory to the pattern directory:
```
cd serverless-patterns/firehose-transformation-cdk-typescript/src
```
1. Install dependencies:
```
npm install
```
1. Bootstrap the environment (if you have not done so already):
```
cdk bootstrap
```
1. Deploy the stack to your default AWS account and region.
```
cdk deploy
```
## How it works
Kinesis Data Firehose can invoke a Lambda function to transform incoming source data and deliver the transformed data to destinations. In this architecture, Kinesis Data Firehose buffers incoming records and invokes the specified Lambda function synchronously with each buffered batch. The function returns the transformed records to Kinesis Data Firehose, which then delivers them to the destination S3 bucket when the specified destination buffering size or buffering interval is reached, whichever happens first.
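
To make the record contract concrete, here is a minimal sketch of a single record's round trip through a transformation. It assumes a payload shaped like the console demo stock ticker data. The `InputRecord` and `OutputRecord` interfaces and the `transformRecord` helper are hypothetical names used only for illustration; they are not part of this pattern's source or of the `aws-lambda` typings.

```typescript
import { Buffer } from 'buffer';

// Hypothetical local types that mirror the shape of one transformation record.
interface InputRecord { recordId: string; data: string; } // data is base64-encoded
type Result = 'Ok' | 'Dropped' | 'ProcessingFailed';
interface OutputRecord { recordId: string; result: Result; data: string; }

// Decode one record, apply `fn` to the parsed JSON payload, and re-encode it.
function transformRecord(
  record: InputRecord,
  fn: (payload: Record<string, unknown>) => Record<string, unknown>,
): OutputRecord {
  const payload = JSON.parse(Buffer.from(record.data, 'base64').toString('utf8'));
  return {
    recordId: record.recordId, // the response must echo the incoming recordId
    result: 'Ok',
    data: Buffer.from(JSON.stringify(fn(payload)), 'utf8').toString('base64'),
  };
}

// Sample input (assumed values), encoded the way Firehose would hand it to Lambda.
const input: InputRecord = {
  recordId: 'record-1',
  data: Buffer.from(
    JSON.stringify({ TICKER_SYMBOL: 'ABC', PRICE: 10.5, CHANGE: 0.5 }),
    'utf8',
  ).toString('base64'),
};

// Add the previous price as a new field, similar to what the pattern's handler does.
const output = transformRecord(input, (p) => ({
  ...p,
  CUSTOM_OLDPRICE: (Number(p['PRICE']) - Number(p['CHANGE'])).toFixed(2),
}));

const decoded = JSON.parse(Buffer.from(output.data, 'base64').toString('utf8'));
console.log(decoded);
```

Each returned record carries the same `recordId` it arrived with, and the payload is base64-encoded in both directions.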
## Testing
1. Open the Kinesis Data Firehose console at https://console.aws.amazon.com/firehose/
2. Choose the {stack-name}-firehosestream-{stream-id} delivery stream
3. Under **Test with demo data**, choose **Start sending demo data** to generate sample stock ticker data.
4. After a few seconds, choose **Stop sending demo data**
5. Verify that test events are being sent to the destination S3 bucket. Note that it might take a few minutes for new objects to appear in the bucket, based on the buffering configuration.
```
aws s3 ls s3://{destination_bucket_name} --recursive --human-readable --summarize
```
Alternatively, navigate to the S3 console and verify that the demo data has been delivered to the bucket.
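
When inspecting a delivered object, note that the transformed records may appear back to back with no delimiter between them, because the transformation in this pattern does not append a newline to each record. The following is a minimal sketch for splitting such a body into individual JSON objects. It assumes the object is uncompressed and that every record is a top-level JSON object; `splitConcatenatedJson` is a hypothetical helper name and the sample string is made up for illustration.

```typescript
// Split a string of back-to-back JSON objects, e.g. '{"a":1}{"b":2}', into parsed values.
// Braces inside string literals are ignored so they do not confuse the depth counter.
function splitConcatenatedJson(body: string): unknown[] {
  const results: unknown[] = [];
  let depth = 0;        // current nesting depth of curly braces
  let inString = false; // true while scanning inside a JSON string literal
  let escaped = false;  // true when the previous character was a backslash
  let start = -1;       // index where the current top-level object began

  for (let i = 0; i < body.length; i++) {
    const ch = body[i];
    if (inString) {
      if (escaped) escaped = false;
      else if (ch === '\\') escaped = true;
      else if (ch === '"') inString = false;
      continue;
    }
    if (ch === '"') {
      inString = true;
    } else if (ch === '{') {
      if (depth === 0) start = i;
      depth++;
    } else if (ch === '}' && depth > 0) {
      depth--;
      if (depth === 0) {
        results.push(JSON.parse(body.slice(start, i + 1)));
        start = -1;
      }
    }
  }
  return results;
}

// Made-up sample: two records, the second containing braces inside a string value.
const sample = '{"TICKER_SYMBOL":"ABC","PRICE":10.5}{"TICKER_SYMBOL":"X}{Y","PRICE":2}';
const parsed = splitConcatenatedJson(sample);
console.log(parsed.length);
```

If newline-delimited output is preferred, one alternative is to have the transformation append `\n` to each record before encoding it, in which case a plain line split is enough.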
## Cleanup
Run the following command to delete the resources:
```bash
cdk destroy
```

----
Copyright 2023 Amazon.com, Inc. or its affiliates. All Rights Reserved.

SPDX-License-Identifier: MIT-0
65 changes: 65 additions & 0 deletions firehose-transformation-cdk-typescript/example-pattern.json
@@ -0,0 +1,65 @@
{
"title": "Kinesis Firehose Data Transformation with Lambda",
"description": "Transform incoming source data and deliver the transformed data to S3.",
"language": "TypeScript",
"level": "200",
"framework": "CDK",
"introBox": {
"headline": "How it works",
"text": [
"This pattern demonstrates how to transform streaming data received by Kinesis Data Firehose using AWS Lambda before delivering the transformed data to Amazon S3.",
"To transform incoming source data, Kinesis Data Firehose buffers incoming records and invokes the specified Lambda function synchronously with each buffered batch. The function returns the transformed records to Kinesis Data Firehose, which then delivers them to the destination S3 bucket when the specified destination buffering size or buffering interval is reached, whichever happens first."
]
},
"gitHub": {
"template": {
"repoURL": "https://github.com/aws-samples/serverless-patterns/tree/main/firehose-transformation-cdk-typescript",
"templateURL": "serverless-patterns/firehose-transformation-cdk-typescript",
"projectFolder": "firehose-transformation-cdk-typescript",
"templateFile": "src/lib/firehose-lambda-stack.ts"
}
},
"resources": {
"bullets": [
{
"text": "Amazon Kinesis Data Firehose Data Transformation",
"link": "https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html"
},
{
"text": "Using AWS Lambda with Amazon Kinesis Data Firehose",
"link": "https://docs.aws.amazon.com/lambda/latest/dg/services-kinesisfirehose.html"
},
{
"text": "Ingesting enriched IoT data into Amazon S3 using Amazon Kinesis Data Firehose",
"link": "https://aws.amazon.com/blogs/iot/ingesting-enriched-iot-data-into-amazon-s3-using-amazon-kinesis-data-firehose/"
},
{
"text": "Capture clickstream data using AWS serverless services",
"link": "https://aws.amazon.com/blogs/industries/capture-clickstream-data-using-aws-serverless-services/"
}
]
},
"deploy": {
"text": [
"cdk deploy"
]
},
"testing": {
"text": [
"See the GitHub repo for detailed testing instructions."
]
},
"cleanup": {
"text": [
"cdk destroy"
]
},
"authors": [
{
"name": "Edward Schaefer",
"image": "https://d2siip5gg18ho0.cloudfront.net/images/schaeedw-photo-centered_250x250.jpg",
"bio": "Solutions Architect @ Amazon Web Services",
"linkedin": "ejschaefer"
}
]
}
@@ -0,0 +1,95 @@
{
"title": "Kinesis Firehose Data Transformation with Lambda",
"description": "Transform incoming source data and deliver the transformed data to S3.",
"language": "TypeScript",
"level": "200",
"framework": "CDK",
"introBox": {
"headline": "How it works",
"text": [
"This pattern demonstrates how to transform streaming data received by Kinesis Data Firehose using AWS Lambda before delivering the transformed data to Amazon S3.",
"To transform incoming source data, Kinesis Data Firehose buffers incoming records and invokes the specified Lambda function synchronously with each buffered batch. The function returns the transformed records to Kinesis Data Firehose, which then delivers them to the destination S3 bucket when the specified destination buffering size or buffering interval is reached, whichever happens first."
]
},
"gitHub": {
"template": {
"repoURL": "https://github.com/aws-samples/serverless-patterns/tree/main/firehose-transformation-cdk-typescript",
"templateURL": "serverless-patterns/firehose-transformation-cdk-typescript",
"projectFolder": "firehose-transformation-cdk-typescript",
"templateFile": "src/lib/firehose-lambda-stack.ts"
}
},
"resources": {
"bullets": [
{
"text": "Amazon Kinesis Data Firehose Data Transformation",
"link": "https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html"
},
{
"text": "Using AWS Lambda with Amazon Kinesis Data Firehose",
"link": "https://docs.aws.amazon.com/lambda/latest/dg/services-kinesisfirehose.html"
},
{
"text": "Ingesting enriched IoT data into Amazon S3 using Amazon Kinesis Data Firehose",
"link": "https://aws.amazon.com/blogs/iot/ingesting-enriched-iot-data-into-amazon-s3-using-amazon-kinesis-data-firehose/"
},
{
"text": "Capture clickstream data using AWS serverless services",
"link": "https://aws.amazon.com/blogs/industries/capture-clickstream-data-using-aws-serverless-services/"
}
]
},
"deploy": {
"text": [
"cdk deploy"
]
},
"testing": {
"text": [
"See the GitHub repo for detailed testing instructions."
]
},
"cleanup": {
"text": [
"<code>cdk destroy</code>"
]
},
"authors": [
{
"name": "Edward Schaefer",
"image": "https://d2siip5gg18ho0.cloudfront.net/images/schaeedw-photo-centered_250x250.jpg",
"bio": "Solutions Architect @ Amazon Web Services",
"linkedin": "ejschaefer"
}
],
"patternArch": {
"icon1": {
"x": 20,
"y": 50,
"service": "kinesis-firehose",
"label": "Amazon Kinesis Firehose"
},
"icon2": {
"x": 50,
"y": 50,
"service": "lambda",
"label": "AWS Lambda"
},
"icon3": {
"x": 80,
"y": 50,
"service": "s3",
"label": "S3"
},
"line1": {
"from": "icon1",
"to": "icon2",
"label": ""
},
"line2": {
"from": "icon2",
"to": "icon3",
"label": ""
}
}
}
8 changes: 8 additions & 0 deletions firehose-transformation-cdk-typescript/src/.gitignore
@@ -0,0 +1,8 @@
*.js
!jest.config.js
*.d.ts
node_modules

# CDK asset staging directory
.cdk.staging
cdk.out
6 changes: 6 additions & 0 deletions firehose-transformation-cdk-typescript/src/.npmignore
@@ -0,0 +1,6 @@
*.ts
!*.d.ts

# CDK asset staging directory
.cdk.staging
cdk.out
@@ -0,0 +1,9 @@
#!/usr/bin/env node
import 'source-map-support/register';
import * as cdk from 'aws-cdk-lib';
import { FirehoseLambdaStack } from '../lib/firehose-lambda-stack';

const app = new cdk.App();
new FirehoseLambdaStack(app, 'FirehoseLambdaStack', {

});
63 changes: 63 additions & 0 deletions firehose-transformation-cdk-typescript/src/cdk.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
{
"app": "npx ts-node --prefer-ts-exts bin/firehose-lambda-transform.ts",
"watch": {
"include": [
"**"
],
"exclude": [
"README.md",
"cdk*.json",
"**/*.d.ts",
"**/*.js",
"tsconfig.json",
"package*.json",
"yarn.lock",
"node_modules",
"test"
]
},
"context": {
"@aws-cdk/aws-lambda:recognizeLayerVersion": true,
"@aws-cdk/core:checkSecretUsage": true,
"@aws-cdk/core:target-partitions": [
"aws",
"aws-cn"
],
"@aws-cdk-containers/ecs-service-extensions:enableDefaultLogDriver": true,
"@aws-cdk/aws-ec2:uniqueImdsv2TemplateName": true,
"@aws-cdk/aws-ecs:arnFormatIncludesClusterName": true,
"@aws-cdk/aws-iam:minimizePolicies": true,
"@aws-cdk/core:validateSnapshotRemovalPolicy": true,
"@aws-cdk/aws-codepipeline:crossAccountKeyAliasStackSafeResourceName": true,
"@aws-cdk/aws-s3:createDefaultLoggingPolicy": true,
"@aws-cdk/aws-sns-subscriptions:restrictSqsDescryption": true,
"@aws-cdk/aws-apigateway:disableCloudWatchRole": true,
"@aws-cdk/core:enablePartitionLiterals": true,
"@aws-cdk/aws-events:eventsTargetQueueSameAccount": true,
"@aws-cdk/aws-iam:standardizedServicePrincipals": true,
"@aws-cdk/aws-ecs:disableExplicitDeploymentControllerForCircuitBreaker": true,
"@aws-cdk/aws-iam:importedRoleStackSafeDefaultPolicyName": true,
"@aws-cdk/aws-s3:serverAccessLogsUseBucketPolicy": true,
"@aws-cdk/aws-route53-patters:useCertificate": true,
"@aws-cdk/customresources:installLatestAwsSdkDefault": false,
"@aws-cdk/aws-rds:databaseProxyUniqueResourceName": true,
"@aws-cdk/aws-codedeploy:removeAlarmsFromDeploymentGroup": true,
"@aws-cdk/aws-apigateway:authorizerChangeDeploymentLogicalId": true,
"@aws-cdk/aws-ec2:launchTemplateDefaultUserData": true,
"@aws-cdk/aws-secretsmanager:useAttachedSecretResourcePolicyForSecretTargetAttachments": true,
"@aws-cdk/aws-redshift:columnId": true,
"@aws-cdk/aws-stepfunctions-tasks:enableEmrServicePolicyV2": true,
"@aws-cdk/aws-ec2:restrictDefaultSecurityGroup": true,
"@aws-cdk/aws-apigateway:requestValidatorUniqueId": true,
"@aws-cdk/aws-kms:aliasNameRef": true,
"@aws-cdk/aws-autoscaling:generateLaunchTemplateInsteadOfLaunchConfig": true,
"@aws-cdk/core:includePrefixInUniqueNameGeneration": true,
"@aws-cdk/aws-efs:denyAnonymousAccess": true,
"@aws-cdk/aws-opensearchservice:enableOpensearchMultiAzWithStandby": true,
"@aws-cdk/aws-lambda-nodejs:useLatestRuntimeVersion": true,
"@aws-cdk/aws-efs:mountTargetOrderInsensitiveLogicalId": true,
"@aws-cdk/aws-rds:auroraClusterChangeScopeOfInstanceParameterGroupWithEachParameters": true,
"@aws-cdk/aws-appsync:useArnForSourceApiAssociationIdentifier": true,
"@aws-cdk/aws-rds:preventRenderingDeprecatedCredentials": true
}
}
44 changes: 44 additions & 0 deletions firehose-transformation-cdk-typescript/src/lambda/index.ts
@@ -0,0 +1,44 @@
import {
  Context,
  FirehoseTransformationEvent,
  FirehoseTransformationResult,
  FirehoseTransformationResultRecord
} from 'aws-lambda';
import { Buffer } from 'buffer';

export const handler = async (event: FirehoseTransformationEvent, context: Context): Promise<FirehoseTransformationResult> => {

  console.log(`Event: ${JSON.stringify(event, null, 2)}`);
  console.log(`Context: ${JSON.stringify(context, null, 2)}`);
  const records: FirehoseTransformationResultRecord[] = [];

  for (const record of event.records) {
    // Each record payload arrives base64-encoded; decode it and parse the JSON.
    const recordData = JSON.parse(Buffer.from(record.data, 'base64').toString('utf8'));

    /** do some record transformation here */

    /**
     * This example assumes source data is similar to Kinesis Data Firehose console demo data
     * simulated stock ticker data: https://docs.aws.amazon.com/firehose/latest/dev/test-drive-firehose.html
     */

    // Derive the previous price and attach it, along with the record ID, as custom fields.
    const oldPrice = recordData["PRICE"] - recordData["CHANGE"];
    recordData["CUSTOM_RECORDID"] = record.recordId;
    recordData["CUSTOM_OLDPRICE"] = oldPrice.toFixed(2);

    // Return the record under its original recordId, re-encoded as base64.
    records.push({
      recordId: record.recordId,
      result: 'Ok',
      data: Buffer.from(JSON.stringify(recordData), 'utf8').toString('base64')
    });
  }

  console.log(`Processing completed. Successful records ${records.length}.`);
  console.log(`Results: ${JSON.stringify(records)}`);

  return {
    records: records
  };

};
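
The handler above parses every record without a guard, so a single malformed payload would throw and fail the whole invocation. One possible variation, sketched below, marks such records as `ProcessingFailed` and returns their original data so that the rest of the batch still succeeds. This is an assumption-laden sketch rather than part of the pattern: `InputRecord`, `OutputRecord`, and `safeTransform` are hypothetical names, and the sample payloads are made up.

```typescript
import { Buffer } from 'buffer';

// Hypothetical local types mirroring one transformation record and its result.
interface InputRecord { recordId: string; data: string; }
interface OutputRecord {
  recordId: string;
  result: 'Ok' | 'Dropped' | 'ProcessingFailed';
  data: string;
}

// Transform one record; on any parsing or validation error, report ProcessingFailed
// and hand back the untouched payload instead of throwing.
function safeTransform(record: InputRecord): OutputRecord {
  try {
    const payload = JSON.parse(Buffer.from(record.data, 'base64').toString('utf8'));
    const price = payload?.PRICE;
    const change = payload?.CHANGE;
    if (typeof price !== 'number' || typeof change !== 'number') {
      throw new Error('PRICE or CHANGE is missing or not numeric');
    }
    payload.CUSTOM_OLDPRICE = (price - change).toFixed(2);
    return {
      recordId: record.recordId,
      result: 'Ok',
      data: Buffer.from(JSON.stringify(payload), 'utf8').toString('base64'),
    };
  } catch (err) {
    return { recordId: record.recordId, result: 'ProcessingFailed', data: record.data };
  }
}

// Helper used only to build sample input for this sketch.
const encode = (text: string): string => Buffer.from(text, 'utf8').toString('base64');

const good = safeTransform({ recordId: 'r1', data: encode('{"PRICE":20,"CHANGE":1.5}') });
const bad = safeTransform({ recordId: 'r2', data: encode('not json') });
console.log(good.result, bad.result);
```

Returning a status per record keeps one bad payload from blocking the others in the same batch; whether failed records should be retried, dropped, or inspected later depends on the workload.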