diff --git a/packages/@aws-cdk/aws-pipes-sources-alpha/README.md b/packages/@aws-cdk/aws-pipes-sources-alpha/README.md index 69f4d58a4dcc9..4cdc3c48adf97 100644 --- a/packages/@aws-cdk/aws-pipes-sources-alpha/README.md +++ b/packages/@aws-cdk/aws-pipes-sources-alpha/README.md @@ -17,17 +17,46 @@ -EventBridge Pipes let you create source to target connections between several -aws services. While transporting messages from a source to a target the messages -can be filtered, transformed and enriched. +EventBridge Pipes Sources let you create a source for a EventBridge Pipe. -![diagram of pipes](https://d1.awsstatic.com/product-marketing/EventBridge/Product-Page-Diagram_Amazon-EventBridge-Pipes.cd7961854be4432d63f6158ffd18271d6c9fa3ec.png) For more details see the service documentation: -[Documentation](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes.html) +[Documentation](https://docs.aws.amazon.com/eventbridge/latest/userguide/eb-pipes-event-source.html) ## Pipe sources -// TODO +Pipe sources are the starting point of a EventBridge Pipe. They are the source of the events that are sent to the pipe. +### Amazon SQS + +A SQS message queue can be used as a source for a pipe. The queue will be polled for new messages and the messages will be sent to the pipe. + +```ts +declare const sourceQueue: sqs.Queue; +declare const targetQueue: sqs.Queue; + +const pipeSource = new sources.SqsSource(sourceQueue); + +const pipe = new pipes.Pipe(this, 'Pipe', { + source: pipeSource, + target: new SomeTarget(targetQueue) +}); +``` + +The polling configuration can be customized: + +```ts +declare const sourceQueue: sqs.Queue; +declare const targetQueue: sqs.Queue; + +const pipeSource = new sources.SqsSource(sourceQueue, { + batchSize: 10, + maximumBatchingWindow: cdk.Duration.seconds(10) +}); + +const pipe = new pipes.Pipe(this, 'Pipe', { + source: pipeSource, + target: new SomeTarget(targetQueue) +}); +``` diff --git a/packages/@aws-cdk/aws-pipes-sources-alpha/rosetta/default.ts-fixture b/packages/@aws-cdk/aws-pipes-sources-alpha/rosetta/default.ts-fixture index 782590cf38ec1..5b6e1e3adf6aa 100644 --- a/packages/@aws-cdk/aws-pipes-sources-alpha/rosetta/default.ts-fixture +++ b/packages/@aws-cdk/aws-pipes-sources-alpha/rosetta/default.ts-fixture @@ -5,25 +5,9 @@ import * as lambda from 'aws-cdk-lib/aws-lambda'; import * as logs from 'aws-cdk-lib/aws-logs'; import { Construct } from 'constructs'; import * as pipes from '@aws-cdk/aws-pipes-alpha'; +import * as sources from '@aws-cdk/aws-pipes-sources-alpha'; -class SqsSource implements pipes.ISource { - sourceArn: string; - sourceParameters = undefined; - constructor(private readonly queue: sqs.Queue) { - this.queue = queue; - this.sourceArn = queue.queueArn; - } - bind(_pipe: pipes.IPipe): pipes.SourceConfig { - return { - sourceParameters: this.sourceParameters, - }; - } - grantRead(pipeRole: cdk.aws_iam.IRole): void { - this.queue.grantConsumeMessages(pipeRole); - } -} - -class SqsTarget implements pipes.ITarget { +class SomeTarget implements pipes.ITarget { targetArn: string; inputTransformation: pipes.InputTransformation | undefined; @@ -46,47 +30,6 @@ class SqsTarget implements pipes.ITarget { } } -class LambdaEnrichment implements pipes.IEnrichment { - enrichmentArn: string; - - private inputTransformation: pipes.InputTransformation | undefined; - constructor(private readonly lambda: lambda.Function, props: {inputTransformation?: pipes.InputTransformation} = {}) { - this.enrichmentArn = lambda.functionArn; - this.inputTransformation = props?.inputTransformation - } - bind(pipe: pipes.IPipe): pipes.EnrichmentParametersConfig { - return { - enrichmentParameters: { - inputTemplate: this.inputTransformation?.bind(pipe).inputTemplate, - }, - }; - } - grantInvoke(pipeRole: cdk.aws_iam.IRole): void { - this.lambda.grantInvoke(pipeRole); - } -} - -class CloudwatchDestination implements pipes.ILogDestination { - parameters: pipes.LogDestinationParameters; - constructor(private readonly logGroup: cdk.aws_logs.LogGroup) { - this.logGroup = logGroup; - this.parameters = { - cloudwatchLogsLogDestination: { - logGroupArn: logGroup.logGroupArn, - }, - }; - } - bind(_pipe: pipes.IPipe): pipes.LogDestinationConfig { - return { - parameters: this.parameters, - }; - } - - grantPush(pipeRole: cdk.aws_iam.IRole): void { - this.logGroup.grantWrite(pipeRole); - } -} - class Fixture extends cdk.Stack { constructor(scope: Construct, id: string) { super(scope, id); diff --git a/packages/@aws-cdk/aws-pipes-sources-alpha/rosetta/pipes-imports.ts-fixture b/packages/@aws-cdk/aws-pipes-sources-alpha/rosetta/pipes-imports.ts-fixture deleted file mode 100644 index 861d0e072a8ce..0000000000000 --- a/packages/@aws-cdk/aws-pipes-sources-alpha/rosetta/pipes-imports.ts-fixture +++ /dev/null @@ -1,7 +0,0 @@ -// Fixture with packages imported, but nothing else -import * as cdk from 'aws-cdk-lib'; -import * as sqs from 'aws-cdk-lib/aws-sqs'; -import * as lambda from 'aws-cdk-lib/aws-lambda'; -import * as pipes from '@aws-cdk/aws-pipes-alpha'; - -///here \ No newline at end of file