Skip to content

Commit

Permalink
Add README content
Browse files Browse the repository at this point in the history
  • Loading branch information
RaphaelManke committed Feb 9, 2024
1 parent 3216e27 commit 080289a
Show file tree
Hide file tree
Showing 3 changed files with 37 additions and 72 deletions.
41 changes: 35 additions & 6 deletions packages/@aws-cdk/aws-pipes-sources-alpha/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,46 @@
<!--END STABILITY BANNER-->


EventBridge Pipes let you create source to target connections between several
aws services. While transporting messages from a source to a target the messages
can be filtered, transformed and enriched.
EventBridge Pipes Sources let you create a source for a EventBridge Pipe.

![diagram of pipes](https://d1.awsstatic.com/product-marketing/EventBridge/Product-Page-Diagram_Amazon-EventBridge-Pipes.cd7961854be4432d63f6158ffd18271d6c9fa3ec.png)

For more details see the service documentation:

[Documentation](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes.html)
[Documentation](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-event-source.html)

## Pipe sources

// TODO
Pipe sources are the starting point of a EventBridge Pipe. They are the source of the events that are sent to the pipe.

### Amazon SQS

A SQS message queue can be used as a source for a pipe. The queue will be polled for new messages and the messages will be sent to the pipe.

```ts
declare const sourceQueue: sqs.Queue;
declare const targetQueue: sqs.Queue;

const pipeSource = new sources.SqsSource(sourceQueue);

const pipe = new pipes.Pipe(this, 'Pipe', {
source: pipeSource,
target: new SomeTarget(targetQueue)
});
```

The polling configuration can be customized:

```ts
declare const sourceQueue: sqs.Queue;
declare const targetQueue: sqs.Queue;

const pipeSource = new sources.SqsSource(sourceQueue, {
batchSize: 10,
maximumBatchingWindow: cdk.Duration.seconds(10)
});

const pipe = new pipes.Pipe(this, 'Pipe', {
source: pipeSource,
target: new SomeTarget(targetQueue)
});
```
Original file line number Diff line number Diff line change
Expand Up @@ -5,25 +5,9 @@ import * as lambda from 'aws-cdk-lib/aws-lambda';
import * as logs from 'aws-cdk-lib/aws-logs';
import { Construct } from 'constructs';
import * as pipes from '@aws-cdk/aws-pipes-alpha';
import * as sources from '@aws-cdk/aws-pipes-sources-alpha';

class SqsSource implements pipes.ISource {
sourceArn: string;
sourceParameters = undefined;
constructor(private readonly queue: sqs.Queue) {
this.queue = queue;
this.sourceArn = queue.queueArn;
}
bind(_pipe: pipes.IPipe): pipes.SourceConfig {
return {
sourceParameters: this.sourceParameters,
};
}
grantRead(pipeRole: cdk.aws_iam.IRole): void {
this.queue.grantConsumeMessages(pipeRole);
}
}

class SqsTarget implements pipes.ITarget {
class SomeTarget implements pipes.ITarget {
targetArn: string;
inputTransformation: pipes.InputTransformation | undefined;

Expand All @@ -46,47 +30,6 @@ class SqsTarget implements pipes.ITarget {
}
}

class LambdaEnrichment implements pipes.IEnrichment {
enrichmentArn: string;

private inputTransformation: pipes.InputTransformation | undefined;
constructor(private readonly lambda: lambda.Function, props: {inputTransformation?: pipes.InputTransformation} = {}) {
this.enrichmentArn = lambda.functionArn;
this.inputTransformation = props?.inputTransformation
}
bind(pipe: pipes.IPipe): pipes.EnrichmentParametersConfig {
return {
enrichmentParameters: {
inputTemplate: this.inputTransformation?.bind(pipe).inputTemplate,
},
};
}
grantInvoke(pipeRole: cdk.aws_iam.IRole): void {
this.lambda.grantInvoke(pipeRole);
}
}

class CloudwatchDestination implements pipes.ILogDestination {
parameters: pipes.LogDestinationParameters;
constructor(private readonly logGroup: cdk.aws_logs.LogGroup) {
this.logGroup = logGroup;
this.parameters = {
cloudwatchLogsLogDestination: {
logGroupArn: logGroup.logGroupArn,
},
};
}
bind(_pipe: pipes.IPipe): pipes.LogDestinationConfig {
return {
parameters: this.parameters,
};
}

grantPush(pipeRole: cdk.aws_iam.IRole): void {
this.logGroup.grantWrite(pipeRole);
}
}

class Fixture extends cdk.Stack {
constructor(scope: Construct, id: string) {
super(scope, id);
Expand Down

This file was deleted.

0 comments on commit 080289a

Please sign in to comment.