Skip to content

Commit

Permalink
feat(pipes-targets): add lambda function
Browse files Browse the repository at this point in the history
  • Loading branch information
WtfJoke committed Jun 27, 2024
1 parent e09126f commit 2ae6ddd
Show file tree
Hide file tree
Showing 20 changed files with 33,905 additions and 2 deletions.
53 changes: 51 additions & 2 deletions packages/@aws-cdk/aws-pipes-targets-alpha/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ Pipe targets are the end point of a EventBridge Pipe.
The following targets are supported:

1. `targets.SqsTarget`: [Send event source to a Queue](#amazon-sqs)
2. `targets.SfnStateMachine`: [Invoke a State Machine from an event source](#aws-step-functions)
2. `targets.SfnStateMachine`: [Invoke a State Machine from an event source](#aws-step-functions-state-machine)
3. `targets.LambdaFunction`: [Send event source to a Lambda Function](#aws-lambda-function)

### Amazon SQS

Expand Down Expand Up @@ -71,7 +72,7 @@ const pipe = new pipes.Pipe(this, 'Pipe', {

### AWS Step Functions State Machine

A State Machine can be used as a target for a pipe. The State Machine will be invoked with the (enriched/filtered) source payload.
A State Machine can be used as a target for a pipe. The State Machine will be invoked with the (enriched) source payload.

```ts
declare const sourceQueue: sqs.Queue;
Expand Down Expand Up @@ -122,3 +123,51 @@ const pipe = new pipes.Pipe(this, 'Pipe', {
target: pipeTarget
});
```

### AWS Lambda Function

A Lambda Function can be used as a target for a pipe. The Lambda Function will be invoked with the (enriched) source payload.

```ts
declare const sourceQueue: sqs.Queue;
declare const targetFunction: lambda.IFunction;

const pipeTarget = new targets.LambdaFunction(targetFunction,{});

const pipe = new pipes.Pipe(this, 'Pipe', {
source: new SomeSource(sourceQueue),
target: pipeTarget
});
```

The target Lambda Function is invoked synchronously by default. You can also choose to invoke the Lambda Function asynchronously by setting `invocationType` property to `FIRE_AND_FORGET`.

```ts
declare const sourceQueue: sqs.Queue;
declare const targetFunction: lambda.IFunction;

const pipeTarget = new targets.LambdaFunction(targetFunction, {
invocationType: targets.LambdaFunctionInvocationType.FIRE_AND_FORGET,
});

const pipe = new pipes.Pipe(this, 'Pipe', {
source: new SomeSource(sourceQueue),
target: pipeTarget
});
```

The input to the target Lambda Function can be transformed:

```ts
declare const sourceQueue: sqs.Queue;
declare const targetFunction: lambda.IFunction;

const pipeTarget = new targets.LambdaFunction(targetFunction, {
inputTransformation: pipes.InputTransformation.fromObject({ body: "👀" }),
});

const pipe = new pipes.Pipe(this, 'Pipe', {
source: new SomeSource(sourceQueue),
target: pipeTarget
});
```
1 change: 1 addition & 0 deletions packages/@aws-cdk/aws-pipes-targets-alpha/lib/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export * from './lambda';
export * from './sqs';
export * from './stepfunctions';
79 changes: 79 additions & 0 deletions packages/@aws-cdk/aws-pipes-targets-alpha/lib/lambda.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import { IInputTransformation, IPipe, ITarget, TargetConfig } from '@aws-cdk/aws-pipes-alpha';
import { IRole } from 'aws-cdk-lib/aws-iam';
import * as lambda from 'aws-cdk-lib/aws-lambda';

/**
* Parameters for the LambdaFunction target
*/
export interface LambdaFunctionParameters {

/**
* The input transformation to apply to the message before sending it to the target.
*
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipetargetparameters.html#cfn-pipes-pipe-pipetargetparameters-inputtemplate
* @default none
*/
readonly inputTransformation?: IInputTransformation;

/**
* Specify whether to invoke the Lambda Function synchronously (`REQUEST_RESPONSE`) or asynchronously (`FIRE_AND_FORGET`).
*
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipetargetlambdafunctionparameters.html
* @default LambdaFunctionInvocationType.REQUEST_RESPONSE
*/
readonly invocationType?: LambdaFunctionInvocationType;
}

/**
* InvocationType for invoking the Lambda Function.
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipetargetlambdafunctionparameters.html
*/
export enum LambdaFunctionInvocationType {
/**
* Invoke Lambda Function asynchronously (`Invoke`). `InvocationType` is set to `Event` on `Invoke`, see https://docs.aws.amazon.com/lambda/latest/api/API_Invoke.html for more details.
*/
FIRE_AND_FORGET = 'FIRE_AND_FORGET',

/**
* Invoke Lambda Function synchronously (`Invoke`) and wait for the response. `InvocationType` is set to `RequestResponse` on `Invoke`, see https://docs.aws.amazon.com/lambda/latest/api/API_Invoke.html for more details.
*/
REQUEST_RESPONSE = 'REQUEST_RESPONSE',
}

/**
* An EventBridge Pipes target that sends messages to an AWS Lambda Function.
*/
export class LambdaFunction implements ITarget {
public readonly targetArn: string;

private readonly lambdaFunction: lambda.IFunction;
private readonly invocationType: LambdaFunctionInvocationType;
private readonly inputTemplate?: IInputTransformation;

constructor(
lambdaFunction: lambda.IFunction,
parameters: LambdaFunctionParameters,
) {
this.lambdaFunction = lambdaFunction;
this.targetArn = lambdaFunction.functionArn;
this.invocationType =
parameters.invocationType ??
LambdaFunctionInvocationType.REQUEST_RESPONSE;
this.inputTemplate = parameters.inputTransformation;
}

grantPush(grantee: IRole): void {
this.lambdaFunction.grantInvoke(grantee);
}

bind(pipe: IPipe): TargetConfig {
return {
targetParameters: {
inputTemplate: this.inputTemplate?.bind(pipe).inputTemplate,
lambdaFunctionParameters: {
invocationType: this.invocationType,
},
},
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import * as cdk from 'aws-cdk-lib';
import * as sqs from 'aws-cdk-lib/aws-sqs';
import * as sfn from 'aws-cdk-lib/aws-stepfunctions';
import * as lambda from 'aws-cdk-lib/aws-lambda';
import { Construct } from 'constructs';
import * as pipes from '@aws-cdk/aws-pipes-alpha';
import * as targets from '@aws-cdk/aws-pipes-targets-alpha';
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Jest Snapshot v1, https://goo.gl/fbAQLP

exports[`lambda-function should grant lambda function invoke 1`] = `
{
"MyLambdaPipeRoleEF32F0E5": {
"Properties": {
"AssumeRolePolicyDocument": {
"Statement": [
{
"Action": "sts:AssumeRole",
"Effect": "Allow",
"Principal": {
"Service": "pipes.amazonaws.com",
},
},
],
"Version": "2012-10-17",
},
},
"Type": "AWS::IAM::Role",
},
}
`;
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
// eslint-disable-next-line import/no-extraneous-dependencies
import { LambdaClient, TagResourceCommand } from '@aws-sdk/client-lambda';

exports.handler = async (event: any, context: any) => {
const client = new LambdaClient();
const input = {
Resource: context.invokedFunctionArn,
Tags: {
Identifier: event[0].body, // event is received in batches, we just take the first message to update the tag. See https://docs.aws.amazon.com/eventbridge/latest/userguide/pipes-targets-specifics.html
},
};
const command = new TagResourceCommand(input);
await client.send(command);
};

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading

0 comments on commit 2ae6ddd

Please sign in to comment.