EventBridge Pipes L2 Construct #473

Closed
10 of 11 tasks
RaphaelManke opened this issue Jan 4, 2023 · 31 comments · Fixed by #488
Assignees
Labels
bar-raiser/assigned l2-request request for new L2 construct management/tracking status/done Implementation complete

Comments

@RaphaelManke
Contributor

RaphaelManke commented Jan 4, 2023

Description

Amazon EventBridge Pipes (User Guide, CloudFormation resource) enables connections between several AWS services and provides filtering, transformation, and enrichment capabilities that make connecting AWS services much easier.

However, there is no L2 support for this new AWS feature yet.
This results in a user experience that falls short of the experience the AWS console offers.

The current L1 CFN constructs give users no hint which services can be connected and what needs to be done, for example with regard to IAM permissions.
The AWS console provides a nice UI that is split into four phases:

[screenshot: pipes overview]

The source, enrichment, and target each have a dropdown list of possible options.
On top of that, the console creates the IAM policy needed to access all the configured sources.

The current L1 construct API has no type safety and gives the user no clue which sources, enrichments, and targets can be used. On top of that, the user has to create IAM roles and permissions themselves.

Example of an L1 construct connecting two SQS queues

import { Role, ServicePrincipal } from "aws-cdk-lib/aws-iam";
import { CfnPipe } from "aws-cdk-lib/aws-pipes";
import { Queue } from "aws-cdk-lib/aws-sqs";

// The role the pipe assumes to read from the source and push to the target.
const pipeRole = new Role(this, "PipeRole", {
  assumedBy: new ServicePrincipal("pipes.amazonaws.com"),
});

const sourceQueue = new Queue(this, "SourceQueue");
const targetQueue = new Queue(this, "TargetQueue");

// The L1 construct only accepts plain ARNs - no type safety at all.
const pipe = new CfnPipe(this, "MyPipe", {
  roleArn: pipeRole.roleArn,
  source: sourceQueue.queueArn,
  target: targetQueue.queueArn,
});

// Permissions must be wired up manually.
sourceQueue.grantConsumeMessages(pipeRole);
targetQueue.grantSendMessages(pipeRole);

I'd suggest building an L2 construct that gives the user guidance on how to build pipes.

Possible class diagram

classDiagram 
  direction LR
  
  class Pipe {
    source PipeSource
    target PipeTarget
    filter? PipeFilter
    enrichment? PipeEnrichment
  }
  
  Pipe --> PipeSource
  Pipe --> PipeTarget
  Pipe --> PipeFilter
  Pipe --> PipeEnrichment
  
  PipeSource --> DynamoDBSource
  PipeSource --> KinesisSource
  PipeSource --> AmazonMqSource
  PipeSource --> AmazonMSKSource
  PipeSource --> SelfManagedKafkaSource
  PipeSource --> SqsSource
  
  PipeTarget --> ApiDestinationTarget
  PipeTarget --> ApiGatewayTarget
  PipeTarget --> BatchJobQueueTarget
  PipeTarget --> CloudwatchLoggroupTarget
  PipeTarget --> EcsTaskTarget
  PipeTarget --> AllOtherTarget
  
  
  class PipeFilter {
    fromObject()
    fromString()
  }
  
  class PipeEnrichment 
  
  PipeEnrichment --> ApiDestinationEnrichment
  PipeEnrichment --> ApiGatewayEnrichment
  PipeEnrichment --> LambdaEnrichment
  PipeEnrichment --> StepFunctionEnrichment


Example usage of the L2 construct

const sourceQueue = new Queue(this, "SourceQueue");
const targetQueue = new Queue(this, "TargetQueue");
const lambdaFunction = new NodejsFunction(this, "LambdaFunction")

const pipe = new Pipe(this, "MyPipe", {
  source: new SqsSource(sourceQueue),
  target: new SqsTarget(targetQueue, {
    inputTemplate: JSON.stringify({
      body: "<$.body>",
      messageId: "<$.messageId>",
      messageAttributes: "<$.messageAttributes>",
      nestedBody: {
        body: "<$.body>",
      },
    }),
  }),
  filter: PipeFilter.fromObject({ body: [{ prefix: "Test" }] }),
  enrichment: new LambdaEnrichment(lambdaFunction)
});

PoC implementation

https://github.com/RaphaelManke/aws-cdk-pipes-rfc-473

Roles

Role           | User
-------------- | -------------
Proposed by    | @RaphaelManke
Author(s)      | @RaphaelManke
API Bar Raiser | @mrgrain
Stakeholders   | @nikp

See RFC Process for details

Workflow

  • Tracking issue created (label: status/proposed)
  • API bar raiser assigned (ping us at #aws-cdk-rfcs if needed)
  • Kick off meeting
  • RFC pull request submitted (label: status/review)
  • Community reach out (via Slack and/or Twitter)
  • API signed-off (label api-approved applied to pull request)
  • Final comments period (label: status/final-comments-period)
  • Approved and merged (label: status/approved)
  • Execution plan submitted (label: status/planning)
  • Plan approved and merged (label: status/implementing)
  • Implementation complete (label: status/done)

The author is responsible for progressing the RFC according to this checklist and
applying the relevant labels to this issue so that the RFC table in the README gets
updated.

@RaphaelManke
Contributor Author

I created a PoC implementation here: https://github.com/RaphaelManke/aws-cdk-pipes-rfc-473

@RaphaelManke
Contributor Author

RaphaelManke commented Jan 15, 2023

Pipe

An AWS EventBridge Pipe is itself a fully managed service that does the heavy lifting of polling a source and filtering out payloads based on filter criteria. This reduces the number of target invocations and can reduce costs.
After filtering, the resulting events can be enriched in the enrichment phase of a Pipe. The result of the enrichment is then pushed to the target.
Before passing a payload to the enrichment and the target, the payload can be transformed using an input transformation.
To give the EventBridge Pipe access to the services that are connected in a pipe, each Pipe assumes an IAM role. This role must have IAM policies attached to read from a source, invoke an enrichment service, and finally push to a target service.

So a Pipe has the following components (see the diagram below).

Besides these (core) components that are used while processing data, there are additional attributes that describe a Pipe:

  • Name
    • This is the (physical) identifier for the AWS resource, the actual Pipe. It is used in the ARN of the provisioned resource.
  • Description
    • This is a text field for humans to identify what the pipe does.
  • Tags
    • AWS tags for the resource
graph LR
classDef required fill:#00941b 
classDef optional fill:#5185f5

Source:::required
Filter:::optional
Enrichment_Input_Transformation[Input transformation]:::optional
Enrichment:::optional
Target_Input_Transformation[Input transformation]:::optional
Target:::required

Source --> Filter --> Enrichment_Input_Transformation --> Enrichment --> Target_Input_Transformation --> Target


Example implementation

interface PipeProps {
	readonly source : PipeSource
	readonly target : PipeTarget
	
	readonly filter? : PipeFilter
	readonly enrichment? : PipeEnrichment
	readonly role? : IRole // role is optional, if not provided a new role is created
	readonly description : string
	readonly tags? : Tags
}

class Pipe {
	readonly role : IRole
	readonly source : PipeSource
	readonly target : PipeTarget

	readonly filter? : PipeFilter
	readonly enrichment? : PipeEnrichment
	readonly description : string
	readonly tags? : Tags

	constructor(scope: Construct, id: string, props: PipeProps)
}

Open questions

  1. Should the input transformation be part of the PipeProps (alternative: a property of the PipeEnrichment and PipeTarget props)?
    1. Pro PipeProps:
      1. In the case of a refactoring, for example replacing the target, the input transformation doesn't have to be touched/moved.
    2. Con PipeProps:
      1. An input transformation can occur twice in a Pipe definition. The naming needs to make clear which phase the transformation is meant for, e.g. EnrichmentInputTransformation and TargetInputTransformation.
      2. Setting the EnrichmentInputTransformation without a PipeEnrichment makes no sense and needs additional validation code. This can be omitted if the inputTransformation is a property of the PipeEnrichment or PipeTarget classes.
  2. Should the PipeFilter be part of the PipeSource property definition instead of an attribute on the Pipe class?
    1. Pro:
      1. The possible filter keys depend on the source.
      2. CloudFormation itself puts the FilterCriteria into the PipeSourceParameters.
    2. Con:
      1. To align with the AWS console it should be on the same level as the source itself. Users that have tested pipes in the console can understand the API more easily.
      2. It would be more robust to future AWS changes, because the filter can always be defined based on the CloudFormation-generated type definitions and doesn't have to be explicitly built for a new source.

Source

A source is an AWS service that needs to be polled.
The following sources are possible:

The CfnPipe resource references the source only by its ARN. Right now there is no validation in the CDK framework that checks whether an ARN is valid or not.
To overcome this shortcoming, a PipeSource class representing a source is needed. This PipeSource is then implemented by all the supported sources.

export abstract class PipeSource {

	public readonly sourceArn: string;
	
	public readonly sourceParameters?:
	| CfnPipe.PipeSourceParametersProperty
	| IResolvable;
	
	constructor(
		sourceArn: string,
		props?: CfnPipe.PipeSourceParametersProperty,
	) {
		this.sourceArn = sourceArn;
		this.sourceParameters = props;
	}
	
	public abstract grantRead(grantee: IRole): void;
}

This PipeSource class has a sourceArn that is mapped to the CfnPipe sourceArn attribute.
The sourceParameters are the config options for the source. Depending on the source, these attributes are present under a different key. E.g. for an SQS queue the configuration attributes are:

{
	sqsQueueParameters : {...}
}

The specific source class implementation hides this detail from the user and provides an interface with only the configuration options that are possible for the specific source.

interface PipeSourceSqsQueueParametersProperty {
	readonly batchSize?: number;
	readonly maximumBatchingWindowInSeconds?: number;
}

This interface, for example, is provided by the CloudFormation specification and can be used as a base for possible configurations (additional validation can be added if useful).

To be able to consume a source, the EventBridge Pipe has an IAM role. This role needs to have a policy to read from the source.
The grantRead method needs to be implemented for that purpose.
E.g. the SQS source can leverage the queue's L2 .grantConsumeMessages() method.

Example implementation

An example API for a source that polls an SQS queue could look like this:

export class SqsSource extends PipeSource {
	private queue: IQueue;

	constructor(queue: IQueue, props?: CfnPipe.PipeSourceSqsQueueParametersProperty) {
		super(queue.queueArn, { sqsQueueParameters: props });
		this.queue = queue;
	}

	public grantRead(grantee: IRole): void {
		this.queue.grantConsumeMessages(grantee);
	}
}

It takes an existing SQS queue and the polling properties that are possible for that kind of source, and implements a grantRead method which creates the required IAM policy for the Pipe role.

Role

An IAM role is required that can be assumed by the pipes.amazonaws.com principal. This role needs IAM policies attached to read from a PipeSource, invoke a PipeEnrichment, and push to a PipeTarget.
The user can bring their own role. If the user does not provide a role, a new role will be created. In both cases the role should be exposed by the Pipe class, so it is transparent to the user which role is used within the Pipe.
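As a rough sketch (not a final API), the role resolution inside the Pipe construct could look like this, assuming the PipeProps interface above and the grantRead/grantPush methods from the source and target sketches:

import { IRole, Role, ServicePrincipal } from 'aws-cdk-lib/aws-iam';

// Inside the Pipe constructor: use the provided role, or create a new one
// that the pipes service principal can assume.
const role: IRole =
  props.role ??
  new Role(this, 'Role', {
    assumedBy: new ServicePrincipal('pipes.amazonaws.com'),
  });

// Grant the per-stage permissions through the source/target abstractions.
props.source.grantRead(role);
props.target.grantPush(role);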

Open questions

  1. How can it be assured that the pipes service has access to encrypted sources and targets? The role or pipes principal needs access to KMS.
  2. Can we allow IRole, or do we need to restrict it to Role only?
    1. We have to make sure the generated policies are attached to the role in both cases. If restricted to Role, this can easily be done by using the L2 construct methods of the role or of the source, enrichment, or target, and passing the role along. If an IRole is provided, the role's policies cannot be extended.

Filter

A filter does pattern matching based on the incoming payload and the specified filter criteria. The matching is done in the same way EventBridge patterns are matched.
The possible fields that can be used to filter incoming payloads depend on the source.

Example Implementation

The implementation is split into two types.

  1. Generic filter
    1. This filter is the basic class for defining a filter. It represents the CloudFormation filter specification 1:1.
  2. Source-specific filter
    1. This filter gives the user guidance on which attributes of the specific source a filter can be created. It also takes care of the actual data key, e.g. data, body, dynamodb (see docs).
interface IPipeFilterPattern {
	pattern: string;
}

class PipeGenericFilterPattern {
	static fromJson(patternObject: Record<string, any>) :IPipeFilterPattern {
		return { pattern: JSON.stringify(patternObject) };
	}
}

interface SqsMessageAttributes {
	messageId?: string;
	receiptHandle?: string;
	body?: any;
	attributes?: {
		ApproximateReceiveCount?: string;
		SentTimestamp?: string;
		SequenceNumber?: string;
		MessageGroupId?: string;
		SenderId?: string;
		MessageDeduplicationId?: string;
		ApproximateFirstReceiveTimestamp?: string;
	};
	messageAttributes?: any;
	md5OfBody?: string;
}

class PipeSqsFilterPattern extends PipeGenericFilterPattern {
	static fromSqsMessageAttributes(attributes: SqsMessageAttributes): IPipeFilterPattern {
		return {
			pattern: JSON.stringify(attributes),
		};
	}
}
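For illustration, a hypothetical usage of both filter types sketched above, matching SQS messages whose body starts with "Test" (EventBridge prefix-matching syntax):

// Generic variant: paste a pattern straight from the AWS console simulator.
const genericFilter: IPipeFilterPattern = PipeGenericFilterPattern.fromJson({
  body: [{ prefix: 'Test' }],
});

// Source-specific variant: the SQS attribute names guide the user.
const sqsFilter: IPipeFilterPattern = PipeSqsFilterPattern.fromSqsMessageAttributes({
  body: [{ prefix: 'Test' }],
});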

Target

A target is the end of the Pipe. After the payload from the source is pulled, filtered, and enriched, it is forwarded to the target.
For now, the following targets are supported:

  • API destination
  • API Gateway
  • Batch job queue
  • CloudWatch log group
  • ECS task
  • Event bus in the same account and Region
  • Firehose delivery stream
  • Inspector assessment template
  • Kinesis stream
  • Lambda function (SYNC or ASYNC)
  • Redshift cluster data API queries
  • SageMaker Pipeline
  • SNS topic
  • SQS queue
  • Step Functions state machine
    • Express workflows (ASYNC)
    • Standard workflows (SYNC or ASYNC)

The CfnPipe resource references the target only by its ARN. Right now there is no validation in the CDK framework that checks whether an ARN is valid or not.
To overcome this shortcoming, a PipeTarget class representing a target is needed. This PipeTarget is then implemented by all the supported targets.

The implementation is then similar to the Source implementation:

Example implementation

interface IPipeTarget {
	targetArn: string;
	targetParameters: CfnPipe.PipeTargetParametersProperty;

	grantPush(grantee: IRole): void;
}


export interface SqsTargetProps {
	queue: IQueue;
	sqsQueueParameters?: CfnPipe.PipeTargetSqsQueueParametersProperty;
}

export class SqsTarget implements IPipeTarget {
	private queue: IQueue;
	targetArn: string;
	targetParameters: CfnPipe.PipeTargetParametersProperty;

	constructor(props: SqsTargetProps) {
		this.queue = props.queue;
		this.targetArn = props.queue.queueArn;
		this.targetParameters = { sqsQueueParameters: props.sqsQueueParameters };
	}
	
	public grantPush(grantee: IRole): void {
		this.queue.grantSendMessages(grantee);
	}
}
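Usage would then mirror the source side. A sketch, reusing the targetQueue from the earlier examples (messageGroupId is one of the CFN-level SQS target parameters):

const target = new SqsTarget({
  queue: targetQueue,
  // Optional CFN pass-through parameters, e.g. for FIFO queues:
  sqsQueueParameters: { messageGroupId: 'orders' },
});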

Enrichment

In the enrichment step the filtered payloads can be used to invoke one of the following services:

  • API destination
  • Amazon API Gateway
  • Lambda function
  • Step Functions state machine
    • only express workflow

The invocation is a synchronous call to the service. The result of the enrichment step can then be combined with the filtered payload and passed to the target.
The enrichment has two main properties for all types of supported services:

  • enrichment ARN
  • input transformation

The enrichment ARN is the ARN of the AWS resource that should be invoked. The role must have access to invoke this ARN.
The input transformation is used to map values from the output of the filter step to the input of the enrichment step.
For API destination and API Gateway enrichments, additional request parameters can be set, like headers and query params. These properties can be either static or dynamic, based on the payload from the previous step or extracted from the input transformation.

Example implementation

export abstract class PipeEnrichment {
	public readonly enrichmentArn: string;
	public enrichmentParameters: CfnPipe.PipeEnrichmentParametersProperty;
	
	constructor( enrichmentArn: string, props: CfnPipe.PipeEnrichmentParametersProperty) {
		this.enrichmentParameters = props;
		this.enrichmentArn = enrichmentArn;
	}
	
	abstract grantInvoke(grantee: IRole): void;
}

export class LambdaEnrichment extends PipeEnrichment {
	private lambda: IFunction;

	constructor(lambda: IFunction, props?: { inputTransformation?: PipeInputTransformation }) {
		super(lambda.functionArn, { inputTemplate: props?.inputTransformation?.inputTemplate });
		this.lambda = lambda;
	}

	grantInvoke(grantee: IRole): void {
		this.lambda.grantInvoke(grantee);
	}
}
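A usage sketch, assuming the lambdaFunction from the earlier examples and the PipeInputTransformation class described in the next section:

// Invoke the function with a transformed payload instead of the raw event.
const enrichment = new LambdaEnrichment(lambdaFunction, {
  inputTransformation: PipeInputTransformation.fromJson({
    body: '<$.body>',
  }),
});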

Input Transformation

Input transformations are used to transform or extend payloads into a desired structure. This transformation mechanism can be used prior to the enrichment or target step.

There are two types of mappings. Both types can contain either static values or values taken from the output of the previous step. Additionally, there are a few values that come from the pipe itself (see the reservedVariables enum).

  • string
    • static
    • dynamic
  • json
    • static
    • dynamic

Example implementation

enum reservedVariables {
	PIPES_ARN = '<aws.pipes.pipe-arn>',
	PIPES_NAME = '<aws.pipes.pipe-name>',
	PIPES_TARGET_ARN = '<aws.pipes.target-arn>',
	PIPE_EVENT_INGESTION_TIME = '<aws.pipes.event.ingestion-time>',
	PIPE_EVENT = '<aws.pipes.event>',
	PIPE_EVENT_JSON = '<aws.pipes.event.json>'
}

type StaticString = string;
type JsonPath = `<$.${string}>`;
type KeyValue = Record<string, string | reservedVariables>;
type StaticJsonFlat = Record<string, StaticString| JsonPath | KeyValue >;
type InputTransformJson = Record<string, StaticString| JsonPath | KeyValue | StaticJsonFlat>;

type PipeInputTransformationValue = StaticString | InputTransformJson

export interface IInputTransformationProps {
	inputTemplate: PipeInputTransformationValue;
} 

export class PipeInputTransformation {
	static fromJson(inputTemplate: Record<string, any>): PipeInputTransformation {
		return new PipeInputTransformation({ inputTemplate });
	}

	readonly inputTemplate: string;

	constructor(props: IInputTransformationProps) {
		this.inputTemplate = JSON.stringify(props.inputTemplate);
	}
}
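A usage sketch of the class above, mixing the three kinds of values:

const transformation = PipeInputTransformation.fromJson({
  orderType: 'online',            // static value
  orderId: '<$.body.orderId>',    // dynamic, from the source event
  pipe: '<aws.pipes.pipe-name>',  // reserved pipe variable
});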

Open Question

  1. The EventBridge L2 construct has an InputTransformation as well (see the CDK docs). Should this be reused/extended?
  2. Should there be InputTransformation helpers that are specific to a source, similar to the source filters?

@nikp

nikp commented Jan 16, 2023

I am an engineer on the Pipes service team. Thanks for contributing this! I just came back from vacation and will allocate some time in the next couple of weeks to review this.

@RaphaelManke
Contributor Author

@nikp is there something I can do in the meantime? Should I start with a draft RFC pull request? Extend my PoC implementation?

@nikp

nikp commented Feb 7, 2023

@RaphaelManke I'm truly sorry for the delay. I did not mean to become a bottleneck. I've been unable to allocate time to this due to some other emergent issues. I am not on the CDK team, and am a customer of theirs as well. Please follow the process they recommend; in the checklist, I think a bar raiser needs to be assigned. That said, a draft RFC seems like the right move too.

I don't want to overpromise and underdeliver again but I will get back to provide feedback as soon as I can.

@mrgrain mrgrain changed the title Eventbridge Pipe L2 Construct Eventbridge Pipes L2 Construct Feb 21, 2023
@Mdev303

Mdev303 commented Mar 4, 2023

It's been a month; is there any news about this? It seems stuck in stage 2 for a few months now.

@RaphaelManke
Contributor Author

Yes, there is 😃 @mrgrain got assigned as bar raiser. We will meet in the coming weeks and discuss next steps.

@mrgrain mrgrain changed the title Eventbridge Pipes L2 Construct EventBridge Pipes L2 Construct Mar 13, 2023
@RaphaelManke
Contributor Author

After a kickoff this week with @mrgrain, I created a PR for the RFC: #488

@niklaswallerstedt

@RaphaelManke thanks for this, I just implemented CfnPipe this weekend.

The source (DynamoDB Stream) and target (EventBridge) were pretty easy to set up; I used the same approach with .grantRead and .grantPut on a custom role.

The filter was also quite easy. The thing that tripped me up for a bit was how to transform the target input for EventBridge: I could not find a way to dynamically set the source and event detail from the event itself.

It would be helpful to understand what kind of default transformation Pipes sets for each target.

I can provide some code later to explain better. Looking forward to having this as an L2.

@spac3lord

Hi @RaphaelManke! Thanks for your contribution - an L2 construct for Pipes would be a great asset. @nikp asked me to take a look so as not to keep you waiting longer.

Would it be fair to separate the construct into several key concepts (which unsurprisingly match the Pipes stages)?

  1. Sources
  2. Filters
  3. Enrichers (really invoking an external function that can do more than enrich, e.g. it can also filter out events)
  4. Targets (with Input Filters)

Sources

I'll have to double-check, but I believe all sources have batchSize and maximumBatchingWindowInSeconds parameters, as those define the Pipes polling behavior. The remaining parameters are specific to the source. Do you think it would be possible (and useful) to separate these concepts? (See the sketch after this paragraph.)
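A sketch of what that separation could look like (illustrative interface names, not part of the RFC):

// Common polling settings shared by every polled source...
interface PipeSourceCommonParameters {
  batchSize?: number;
  maximumBatchingWindowInSeconds?: number;
}

// ...with source-specific parameters layered on top (Kinesis as an example).
interface KinesisSourceParameters extends PipeSourceCommonParameters {
  startingPosition?: 'TRIM_HORIZON' | 'LATEST' | 'AT_TIMESTAMP';
}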

Filters

The fields and expressions changing with the source is indeed a bit cumbersome (it also affects writing enrichers), but I'd be worried that trying to wrap this into classes for each source could end up becoming a liability when Pipes supports more sources. EventBridge rules use a common EventPattern class for this reason.

Enrichment

Would you plan on providing wrappers for each kind (I saw Lambda in your repo, so I assume yes)?

Targets

I'd have to take a closer look, but I would favor reusing the Input Transformers from EB if possible. I'd be cautious again about tailoring them for each target type, for the same reason as the sources.

I'd be happy to have a live chat also.

RaphaelManke added a commit to RaphaelManke/aws-cdk-rfcs that referenced this issue Mar 24, 2023
@RaphaelManke
Contributor Author

Sources

I'll have to double-check, but I believe all sources have batchSize and maximumBatchingWindowInSeconds parameters, as those define the Pipes polling behavior.

Nice 😃 I didn't notice that yet. I updated the RFC PR to add this info.

The remaining parameters are specific to the source. Would you think that it'd be possible (and useful) to separate these concepts?

My idea would be that the source class constructors provide these source-specific attributes as constructor params.

Filters

The fields and expressions changing with the source is indeed a bit cumbersome (it also affects writing enrichers), but I'd be worried that trying to wrap this into classes for each source could end up becoming a liability when Pipes supports more sources. EventBridge rules use a common EventPattern class for this reason.

Reusing it would be a good idea; I am not sure if it matches 100%.
I would at least build a generic class (or static class method) that can take the JSON input from the AWS console, so that the developer can use the pattern simulation there and copy and paste the result into their codebase. This also allows new sources.

A source-specific filter class would be an addition to make creating these patterns easier.

Targets

Reusing existing Input Transformers would be very time efficient, because this part is the trickiest of all due to the <> syntax, which is not JSON.

I am open to having a live chat 😃 you can reach me on the CDK Slack or any other social media.

@RaphaelManke
Contributor Author

I just found the pipe settings section:
[screenshot: pipe settings]

But I don't know how to configure these things in CloudFormation.
@mrgrain, @nikp
do you know, or can you find out, whether this is possible, or is this config not possible via CFN right now?

@NicholasFields

Thanks for your great work and initiative @RaphaelManke

Legitimately my favorite PR of all time

@RaphaelManke
Contributor Author

Just wanted to give you an update:
I published my current progress on the pipes implementation on npmjs:
https://www.npmjs.com/package/@raphaelmanke/aws-cdk-pipes-rfc
and here you can see some examples of how to use the construct:

https://github.com/RaphaelManke/aws-cdk-pipes-rfc-473/tree/main/e2e

I would be happy if you would play around with it and give feedback or report bugs.
I am mainly interested in how you like the API and whether it can be improved.

I am also happy to take PRs 😄

Note: this is a PoC implementation and subject to change. I would not recommend using it in production.

@niklaswallerstedt

@RaphaelManke thanks for putting this up, will try it out!

Is it possible to set the detail-type and source dynamically from the event? I could not find a way to do it using CfnPipe, so maybe that's a hard limitation in CloudFormation.

"detail-type": <$.property>,
"source": <$.property>

@RaphaelManke
Contributor Author

Thanks for this example. I checked it, and I now think I understand your question.
Let me try to rephrase it:

You want to use a value from the source/enrichment step as a value for the target invocation.
Let's say you have an order system which puts orders on an SQS queue, and you want to produce an event on EventBridge for each order.

Source: SQS
Target: EventBridge eventbus

Source Event:

{ 
  "orderSystem" : "offline", 
  "orderId" : "Order-123"
 }

this will be an SQS message in the format:

{
  "messageId": "...",
  "receiptHandle": "...",
  "body": "{ \n  \"orderSystem\" : \"offline\", \n  \"orderId\" : \"Order-123\"\n }",
  "attributes": {
    "ApproximateReceiveCount": "...",
    "SentTimestamp": "....",
    "SenderId": "...",
    "ApproximateFirstReceiveTimestamp": "..."
  },
  "messageAttributes": {},
  "md5OfBody": "...",
  "eventSource": "...",
  "eventSourceARN": "...",
  "awsRegion": "..."
}

the target event should be in the format:

{
    "version": "...",
    "id": "...",
    "detail-type": "newOrder", // <-- static string
    "source": "offline", // <-- dynamic from the source event
    "account": "...",
    "time": "...",
    "region": "...",
    "resources": [],
    "detail": {
          "orderId" : "Order-123" // <-- dynamic from the source event
    }
}

AFAIK this is not possible with the tools provided in the AWS console, because it lacks the capability to set the targetParameters (it only shows them once they are set).

But it is possible via the AWS API or CloudFormation.

The solution requires two parts.
To set the detail object in the target, you have to provide an inputTemplate.
For this example it would look like this:

{
  "orderId" : <$.body.orderId>
}

which needs to be stringified to:

"{ \"orderId\" : <$.body.orderId>}"

The second part is that the pipe target takes an object called targetParameters, which has a key eventBridgeEventBusParameters.
Here you can set the other fields of an EventBridge target invocation, like the detail-type field.

For this example the parameters look like this:

{
    "EventBridgeEventBusParameters": {
        "Source": "$.body.orderSystem",
        "DetailType": "newOrder"
    }
}

This is described in the docs.
And in the CloudFormation docs, for example, you can see what can be set.

These docs are, however, not very clear and miss examples.
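For completeness, a sketch of wiring both parts up on the L1 construct; the property names follow the CfnPipe target parameters, while the pipeRole, sourceQueue, and targetEventBus variables are assumed from the surrounding examples:

new CfnPipe(this, 'OrderPipe', {
  roleArn: pipeRole.roleArn,
  source: sourceQueue.queueArn,
  target: targetEventBus.eventBusArn,
  targetParameters: {
    // Part 1: the template that becomes the event detail.
    inputTemplate: '{ "orderId": <$.body.orderId> }',
    // Part 2: the remaining event-bus fields.
    eventBridgeEventBusParameters: {
      source: '$.body.orderSystem',
      detailType: 'newOrder',
    },
  },
});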

In the CDK construct you can already use the targetParameters like this:

const target = new EventBridgeEventBusTarget(targetEventBus, {
  source: '$.body.orderSystem',
  detailType: 'newOrder',
});

The inputTemplate is currently missing on the construct, but it will follow shortly (I just forgot to add this property).

@niklaswallerstedt

You got it right; interesting, I will test this out!

@mrgrain
Contributor

mrgrain commented Apr 11, 2023

I just found the pipe settings section:
[screenshot: pipe settings]

But I don't know how to configure these things in CloudFormation.
@mrgrain, @nikp
do you know, or can you find out, whether this is possible, or is this config not possible via CFN right now?

Is this maybe just for the DynamoDB Stream source?

https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipesourcedynamodbstreamparameters.html

@bpgould
Copy link

bpgould commented Apr 22, 2023

Would be great to have this construct!

@nikp

nikp commented Apr 24, 2023

@mrgrain sorry, I am losing my GitHub notifications. Today, pipe retry policy and dead-letter destinations are only supported for Kinesis and DynamoDB Stream sources.

@RaphaelManke
Contributor Author

RaphaelManke commented May 19, 2023

The L2 construct is still in the making 😃

While using the current API I noticed that it feels a little unintuitive to use the inputTransformation attribute to specify a transformation. I think the problem is that you don't know where the event payload is put in the enrichment or target field.

For example, when using ApiDestination as enrichment or target, there are the dynamic fields pathParameters, headers and queryParameters. The body of the API call is then the input event.

If I want to transform the body, I need to know that this can be done via the inputTransformation property.

How about making this explicit?

See the following example:

current implementation

const enrichment = new ApiDestinationEnrichment(apiDestination, {
  headerParameters: {
    'Content-Type': 'application/json',
    'Dynamic-Header': '$.messageId',
  },

  inputTransformation: PipeInputTransformation.fromJson({ // <-- generic transformation which will be the body
    bodyFromSqs: '<$.body>',
    rawSqsMessage: '<aws.pipes.event.json>',
  }),
});

alternative implementation

const enrichment = new ApiDestinationEnrichment(apiDestination, {
  headerParameters: {
    'Content-Type': 'application/json',
    'Dynamic-Header': '$.messageId',
  },

  body: PipeInputTransformation.fromJson({  // <-- the body is a attribute of the enrichment
    bodyFromSqs: '<$.body>',
    rawSqsMessage: '<aws.pipes.event.json>',
  }),
});

What do you think?
👍 if you like the alternative implementation
👎 if you prefer the current implementation

@mrgrain
Contributor

mrgrain commented May 22, 2023

How about making this explicit?

Is it basically a rename from inputTransformation to body you are suggesting? I'm in favor of that. Just input might be another alternative name. Same for target.

@nikp

nikp commented May 30, 2023

Hi folks, I've finally carved out some time and am doing a thorough review of this whole thread. I will reply with detailed thoughts on all the outstanding work @RaphaelManke has done so far to push this forward.

But I'll go LIFO and reply to the last point first. You're very correct that the input transformation for an ApiDestination is the "body" of the request and is intrinsically tied to the rest of the target.

Unfortunately, things are a bit more fuzzy for the other targets. The trouble would be with suggesting domain language that isn't valid for that target. The event goes into the SQS message body, but the Kinesis record data, and the Lambda function invocation payload.

For some targets, the event is ignored entirely, or rather it's only used as a possible source of dynamic values for the target's parameters - e.g. Redshift and ECS, which let you execute Data API queries and run tasks, respectively, but where the event itself isn't passed through. That's just the nature of the semantics of trying to be a universal resource for a variety of different AWS APIs.

In the long term, Pipes will support "Universal targets" the way that EventBridge Scheduler does: https://docs.aws.amazon.com/scheduler/latest/UserGuide/managing-targets-universal.html and each invocation will become an "AWS SDK call" with the event as a source of parameter data for it.

So I would suggest NOT renaming inputTransformer to body everywhere. However, I do like the idea of having a body provide the clarity you're seeking on the Api{Destination|Gateway}{Enrichment|Target}. What if each of those had a body that just delegated to the inputTransformer on the base class for some syntactic sugar? Something like the sketch below.
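A rough sketch of that sugar (prop names assumed, not part of the current PoC):

// `body` is plain sugar: the HTTP-style wrappers forward it to the generic
// input transformation on the base class.
interface ApiDestinationEnrichmentProps {
  headerParameters?: Record<string, string>;
  queryStringParameters?: Record<string, string>;
  body?: PipeInputTransformation; // forwarded to inputTransformation
}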

I will reply with more comments later tonight

@nikp

nikp commented May 30, 2023

First of all, I want to say kudos, @RaphaelManke: you've done really creative work on this RFC, and I think it's going to be a great user experience for CDK users everywhere. I'm learning a lot from your ideas, as this is not really my area of expertise, building mostly web services. :)

I welcome disagreement with any of my proposals or ideas below, from you or anyone else in the community, or anyone more experienced like @mrgrain.

Now, on to some of the detailed comments.

Pipe

Open questions

  1. Should the input transformation be part of the PipeProps (alternative: a property of the PipeEnrichment and PipeTarget props)?

The object type, i.e. IInputTransformation, should certainly be the same, but the instances should be inside PipeEnrichment/PipeTarget separately, rather than at the root level - to make sure it’s really clear which one it’s transforming. As you noted in your latest comment, for ApiDestinations the InputTransformer is actually a key part of the target construction, so we should strive for clarity by keeping them close.

However, I'm noticing from looking at the code in your repo that you currently have it on individual Enrichment/Target objects - e.g. ApiDestinationEnrichment, LambdaEnrichment, etc. Is that intentional? There shouldn't be any reason why these can't be reused. Input transformer support is identical in syntax for both enrichment and Target.

Speaking of reuse, just while we're here, the ApiDestination and ApiGateway parameters for both Enrichment and Target are identical, and could also be reused. In the API/CFN we call it HttpParameters - header/query-string/path parameters. It's just HTTP APIs. The reason why ApiGateway and ApiDestinations are otherwise separate targets is ApiGateway is an AWS resource and has an ARN, and an ApiDestination is any HTTP target, and is an EventBridge resource for managed authN/authZ and rate limiting.

  1. Should the PipeFilter be part of the PipeSource property definition instead of an attribute on the Pipe class?

I don’t feel strongly about this one either way; there are good pros and cons to both. So I would default to maintaining consistency with CFN/API and putting it inside PipeSource, to minimize user surprise.

Source

As a matter of guidance, I would suggest not pulling common parameters for the individual sources up to the top level, and keeping them on the individual source type objects. The reason is that there are subtle differences between even the concepts that seem common. For example, Kinesis/DynamoDB stream batches are always going to be within a single shard, whereas SQS batches are across the entire source. These are just the natural realities of a generic resource that integrates different types of services. The batch size limits are also quite different.

Role

Open questions

  1. How can it be assured that the pipes service has access to encrypted sources and targets? The role or pipes principal needs access to KMS.

Good question; let’s look at some prior art for how SNS→SQS handles this: https://github.com/aws/aws-cdk/blob/cea1039e3664fdfa89c6f00cdaeb1a0185a12678/packages/%40aws-cdk/aws-sns-subscriptions/lib/sqs.ts#L54

Would that work?
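Following that prior art, a sketch of what the L2 construct could do when the source queue is encrypted with a customer-managed key (sourceQueue and pipeRole as in the earlier examples):

// IQueue exposes the KMS key (if any); the pipe's role needs to be able to
// decrypt the messages it polls from an encrypted queue.
sourceQueue.encryptionMasterKey?.grant(pipeRole, 'kms:Decrypt');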

  1. Can we allow IRole, or do we need to restrict it to Role only?

I don’t know enough about IRole vs Role in CDK-land to comment on this.

In a broad sense, the specific policies to configure permissions for different sources can actually get fairly complex, and I can provide some help with this. SQS and Kinesis polling is fairly straightforward, but permissions to poll, for instance, a self-managed Kafka source from a VPC can get fairly advanced and diverse, depending on the authorization types. I can share the policy templates the console uses for each scenario, and we can probably encode them into the logic of the L2 construct.

Just to give you an example of that last one:

{
      "Version": "2012-10-17",
      "Statement": [
        {  
           "Effect": "Allow",
           "Action": [
              "secretsmanager:GetSecretValue"
           ],  
           "Resource": [    
              "arn:aws:secretsmanager:{{region}}:{{accountId}}:secret:{{secretId}}"  
           ]
        },
        {
            "Effect": "Allow",
            "Action": [
                "ec2:DescribeNetworkInterfaces",
                "ec2:DescribeSubnets",
                "ec2:DescribeSecurityGroups",
                "ec2:DescribeVpcs"
            ],
            "Resource": "*"
        },
        {
            "Effect": "Allow",
            "Action": [
                "ec2:CreateNetworkInterface",
                "ec2:DeleteNetworkInterface"
            ],
            "Resource": "*",
            "Condition": {
                "StringEqualsIfExists": {
                    "ec2:SubnetID": ["[[subnetIds]]"]
                }
            }
        }
    ]
}

A good starting point for the policies is the evb-cli: https://github.com/mhlabs/evb-cli/blob/master/src/commands/pipes/pipes-config.json. We worked with @ljacobsson to share some of the policies to make it easy to configure permissions.

I'd be happy to provide more help here.

Filter

I can't quite tell if I'm agreeing or disagreeing with @spac3lord, but I like the concept of strongly typed fields for each filter - the source documentation has payload examples for each source.

For event buses, pattern matching is a bit easier because each one has a standard envelope, allowing us to use EventPattern. With Pipes, each source is slightly different, and we strived to be (mostly) backwards compatible with the payloads of Lambda EventSourceMapping, resulting in what you see here.

Target

	public grantPush(grantee: IRole): void {

My suggestion would be to use the terminology grantInvoke instead of grantPush, because it’s more in line with EventBridge domain language for target invocation.

Input Transformation

There are two types of mappings. Both types can contain either static values or values taken from the output of the previous step. Additionally, there are a few values that come from the pipe itself (see the reservedVariables enum).

  • string

    • static
    • dynamic
  • json

    • static
    • dynamic

At first I wasn't following this point... static and dynamic mappings are the same; the only question is whether they contain inline variable usage. The string vs. JSON transformer type may be considered somewhat distinct, if only because a user may want to explicitly choose JSON and want client-side validation. (This is extra desirable because a “JSON” input transformer template is itself not necessarily valid JSON, to allow for the possibility of referencing JSON objects, such as:
InputTemplate: > {"key1": "value1","key2": <$.body.someObjectOrArray> })

But then I saw your code, and I think I understand now: you're describing how you want to strongly type the validation for these, right? Very cool!

Example implementation

type StaticString = string;
type JsonPath = `<$.${string}>`;
type KeyValue = Record<string, string | reservedVariables>;
type StaticJsonFlat = Record<string, StaticString| JsonPath | KeyValue >;
type InputTransformJson = Record<string, StaticString| JsonPath | KeyValue | StaticJsonFlat>;

This syntax is very compelling to me. Can we use the same for the enrichment/target parameters that all support JSON values?

Open Question

  1. The EventBridge L2 construct has an InputTransformation as well (see the CDK docs). Should this be reused/extended?

This one I will disagree with @spac3lord on, though I’m willing to hear other arguments. While the semantic purpose behind input transformation and the overall capabilities remain the same, the Pipes input transformation syntax was simplified by merging everything into one field (whereas EventBridge Rule transformers separate variable declaration from usage). Further, while Pipes are part of the EventBridge “product” and live in the same console, from an API and CFN perspective they do not share common types, and are distinct. So maybe the CDK should stay distinct too?

  1. Should there be InputTransformation helpers that are specific to a source, similar to the source filters?

That’s an interesting concept, but I’m not sure how it would help. The best approach would probably be to lean on the fact that JSON-escaped body fields for SQS sources and Base64-encoded data fields can be used as if they were standard JSON, for both filters and input transformers. (https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-input-transformation.html#input-transform-implicit)

@mrgrain
Contributor

mrgrain commented Jul 14, 2023

Filter

I can't quite tell if I'm agreeing or disagreeing with @spac3lord, but I like the concept of strongly typed fields for each filter - the source documentation has payload examples for each source.

For event buses, pattern matching is a bit easier because each one has a standard envelope, allowing us to use EventPattern. With Pipes, each source is slightly different, and we strived to be (mostly) backwards compatible with the payloads of Lambda EventSourceMapping, resulting in what you see here.

I would like to see the existing Matcher included in the design for filters. In reality this will have to be a subset, because not all matchers are supported, but the difference is now well documented. For the purpose of the RFC you could call it MatchSubSet or PipesMatch. We have a similar need for Lambda event sources, so an implementation would be very beneficial.

@mrgrain
Contributor

mrgrain commented Jul 31, 2023

To Do

@moltar

moltar commented Jul 31, 2023

* Example for enum like classes to avoid unions -> see https://docs.aws.amazon.com/cdk/api/v2/docs/aws-cdk-lib.aws_events.Match.htm

@mrgrain FYI this 404s

@mrgrain
Contributor

mrgrain commented Jul 31, 2023

Fixed, thank you.

@evgenyka evgenyka added l2-request request for new L2 construct bar-raiser/assigned labels Aug 10, 2023
RaphaelManke added a commit to RaphaelManke/aws-cdk-rfcs that referenced this issue Sep 6, 2023
@RaphaelManke
Contributor Author

I've updated the corresponding PR and would like to move the discussion over to the PR, so we can discuss the RFC directly with the goal of getting it merged. That way we can start the work on an alpha module.

@mrgrain mrgrain added status/final-comment-period Pending final approval and removed status/proposed Newly proposed RFC labels Oct 17, 2023
@mrgrain
Contributor

mrgrain commented Oct 17, 2023

Thanks @RaphaelManke for the iterations! I'm very happy with this now and have progressed the RFC to the final comments period.

@mrgrain mrgrain added status/implementing RFC is being implemented and removed status/final-comment-period Pending final approval labels Oct 25, 2023
@mrgrain
Contributor

mrgrain commented Oct 25, 2023

Approved and moving straight to implementing, since you already have a prototype, @RaphaelManke.

You are most welcome to start submitting PRs. The actual code review and merging will be handled by whoever is currently doing PR reviews, which is not me at the moment. However I will be hanging around to provide context if needed. Feel free to ping me if something is stuck.

@mergify mergify bot closed this as completed in #488 Oct 25, 2023
mergify bot pushed a commit that referenced this issue Oct 25, 2023
This is a request for comments about the EventBridge Pipes L2 Construct. See #473 for
additional details.

Closes: #473

APIs are signed off by @mrgrain 

---

_By submitting this pull request, I confirm that my contribution is made under
the terms of the Apache-2.0 license_
mergify bot pushed a commit to aws/aws-cdk that referenced this issue Jan 30, 2024
This PR is the starting point of the implementation for a L2 construct as defined in aws/aws-cdk-rfcs#473

In this PR the basic Pipe class is introduced including the api interfaces for how to define a pipe. 


Closes #23495

----

*By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license*
SankyRed pushed a commit to aws/aws-cdk that referenced this issue Feb 8, 2024
This PR is the starting point of the implementation for a L2 construct as defined in aws/aws-cdk-rfcs#473

In this PR the basic Pipe class is introduced including the api interfaces for how to define a pipe. 


Closes #23495

----

*By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license*
@mrgrain mrgrain added status/done Implementation complete and removed status/implementing RFC is being implemented labels Apr 1, 2024