diff --git a/packages/@aws-cdk-testing/framework-integ/test/aws-kinesis/test/integ.resource-policy.js.snapshot/kinesis-resource-policy.assets.json b/packages/@aws-cdk-testing/framework-integ/test/aws-kinesis/test/integ.resource-policy.js.snapshot/kinesis-resource-policy.assets.json index 41aa262dfadaa..07c13c21ea421 100644 --- a/packages/@aws-cdk-testing/framework-integ/test/aws-kinesis/test/integ.resource-policy.js.snapshot/kinesis-resource-policy.assets.json +++ b/packages/@aws-cdk-testing/framework-integ/test/aws-kinesis/test/integ.resource-policy.js.snapshot/kinesis-resource-policy.assets.json @@ -1,7 +1,7 @@ { "version": "38.0.1", "files": { - "25f5843484c10a3b762cdda9cddcdbaf948c1d795dd2294a83ba77c6c1b732ef": { + "1772e18de899d6961118c074890d20c1b2f65ac8abfc63b6cb92c0aa6ccea511": { "source": { "path": "kinesis-resource-policy.template.json", "packaging": "file" @@ -9,7 +9,7 @@ "destinations": { "current_account-current_region": { "bucketName": "cdk-hnb659fds-assets-${AWS::AccountId}-${AWS::Region}", - "objectKey": "25f5843484c10a3b762cdda9cddcdbaf948c1d795dd2294a83ba77c6c1b732ef.json", + "objectKey": "1772e18de899d6961118c074890d20c1b2f65ac8abfc63b6cb92c0aa6ccea511.json", "assumeRoleArn": "arn:${AWS::Partition}:iam::${AWS::AccountId}:role/cdk-hnb659fds-file-publishing-role-${AWS::AccountId}-${AWS::Region}" } } diff --git a/packages/@aws-cdk-testing/framework-integ/test/aws-kinesis/test/integ.resource-policy.js.snapshot/kinesis-resource-policy.template.json b/packages/@aws-cdk-testing/framework-integ/test/aws-kinesis/test/integ.resource-policy.js.snapshot/kinesis-resource-policy.template.json index 7431a787d0228..9a437ed9874b7 100644 --- a/packages/@aws-cdk-testing/framework-integ/test/aws-kinesis/test/integ.resource-policy.js.snapshot/kinesis-resource-policy.template.json +++ b/packages/@aws-cdk-testing/framework-integ/test/aws-kinesis/test/integ.resource-policy.js.snapshot/kinesis-resource-policy.template.json @@ -67,6 +67,65 @@ "Version": "2012-10-17" } } + }, + "StreamConsumer58240CBA": { + "Type": "AWS::Kinesis::StreamConsumer", + "Properties": { + "ConsumerName": "stream-consumer", + "StreamARN": { + "Fn::GetAtt": [ + "MyStream5C050E93", + "Arn" + ] + } + } + }, + "StreamConsumerPolicy925BAE36": { + "Type": "AWS::Kinesis::ResourcePolicy", + "Properties": { + "ResourceArn": { + "Fn::GetAtt": [ + "StreamConsumer58240CBA", + "ConsumerARN" + ] + }, + "ResourcePolicy": { + "Statement": [ + { + "Action": [ + "kinesis:DescribeStreamConsumer", + "kinesis:SubscribeToShard" + ], + "Effect": "Allow", + "Principal": { + "AWS": { + "Fn::Join": [ + "", + [ + "arn:", + { + "Ref": "AWS::Partition" + }, + ":iam::", + { + "Ref": "AWS::AccountId" + }, + ":root" + ] + ] + } + }, + "Resource": { + "Fn::GetAtt": [ + "StreamConsumer58240CBA", + "ConsumerARN" + ] + } + } + ], + "Version": "2012-10-17" + } + } } }, "Conditions": { diff --git a/packages/@aws-cdk-testing/framework-integ/test/aws-kinesis/test/integ.resource-policy.js.snapshot/manifest.json b/packages/@aws-cdk-testing/framework-integ/test/aws-kinesis/test/integ.resource-policy.js.snapshot/manifest.json index 5b7e1577fd04f..e0a924656f394 100644 --- a/packages/@aws-cdk-testing/framework-integ/test/aws-kinesis/test/integ.resource-policy.js.snapshot/manifest.json +++ b/packages/@aws-cdk-testing/framework-integ/test/aws-kinesis/test/integ.resource-policy.js.snapshot/manifest.json @@ -19,7 +19,7 @@ "notificationArns": [], "assumeRoleArn": "arn:${AWS::Partition}:iam::${AWS::AccountId}:role/cdk-hnb659fds-deploy-role-${AWS::AccountId}-${AWS::Region}", "cloudFormationExecutionRoleArn": "arn:${AWS::Partition}:iam::${AWS::AccountId}:role/cdk-hnb659fds-cfn-exec-role-${AWS::AccountId}-${AWS::Region}", - "stackTemplateAssetObjectUrl": "s3://cdk-hnb659fds-assets-${AWS::AccountId}-${AWS::Region}/25f5843484c10a3b762cdda9cddcdbaf948c1d795dd2294a83ba77c6c1b732ef.json", + "stackTemplateAssetObjectUrl": "s3://cdk-hnb659fds-assets-${AWS::AccountId}-${AWS::Region}/1772e18de899d6961118c074890d20c1b2f65ac8abfc63b6cb92c0aa6ccea511.json", "requiresBootstrapStackVersion": 6, "bootstrapStackVersionSsmParameter": "/cdk-bootstrap/hnb659fds/version", "additionalDependencies": [ @@ -53,6 +53,18 @@ "data": "AwsCdkKinesisEncryptedStreamsUnsupportedRegions" } ], + "/kinesis-resource-policy/StreamConsumer/Resource": [ + { + "type": "aws:cdk:logicalId", + "data": "StreamConsumer58240CBA" + } + ], + "/kinesis-resource-policy/StreamConsumer/Policy/Resource": [ + { + "type": "aws:cdk:logicalId", + "data": "StreamConsumerPolicy925BAE36" + } + ], "/kinesis-resource-policy/BootstrapVersion": [ { "type": "aws:cdk:logicalId", diff --git a/packages/@aws-cdk-testing/framework-integ/test/aws-kinesis/test/integ.resource-policy.js.snapshot/tree.json b/packages/@aws-cdk-testing/framework-integ/test/aws-kinesis/test/integ.resource-policy.js.snapshot/tree.json index 1cb4e4dd9901b..5b328bf35010b 100644 --- a/packages/@aws-cdk-testing/framework-integ/test/aws-kinesis/test/integ.resource-policy.js.snapshot/tree.json +++ b/packages/@aws-cdk-testing/framework-integ/test/aws-kinesis/test/integ.resource-policy.js.snapshot/tree.json @@ -118,6 +118,101 @@ "version": "0.0.0" } }, + "StreamConsumer": { + "id": "StreamConsumer", + "path": "kinesis-resource-policy/StreamConsumer", + "children": { + "Resource": { + "id": "Resource", + "path": "kinesis-resource-policy/StreamConsumer/Resource", + "attributes": { + "aws:cdk:cloudformation:type": "AWS::Kinesis::StreamConsumer", + "aws:cdk:cloudformation:props": { + "consumerName": "stream-consumer", + "streamArn": { + "Fn::GetAtt": [ + "MyStream5C050E93", + "Arn" + ] + } + } + }, + "constructInfo": { + "fqn": "aws-cdk-lib.aws_kinesis.CfnStreamConsumer", + "version": "0.0.0" + } + }, + "Policy": { + "id": "Policy", + "path": "kinesis-resource-policy/StreamConsumer/Policy", + "children": { + "Resource": { + "id": "Resource", + "path": "kinesis-resource-policy/StreamConsumer/Policy/Resource", + "attributes": { + "aws:cdk:cloudformation:type": "AWS::Kinesis::ResourcePolicy", + "aws:cdk:cloudformation:props": { + "resourceArn": { + "Fn::GetAtt": [ + "StreamConsumer58240CBA", + "ConsumerARN" + ] + }, + "resourcePolicy": { + "Statement": [ + { + "Action": [ + "kinesis:DescribeStreamConsumer", + "kinesis:SubscribeToShard" + ], + "Effect": "Allow", + "Principal": { + "AWS": { + "Fn::Join": [ + "", + [ + "arn:", + { + "Ref": "AWS::Partition" + }, + ":iam::", + { + "Ref": "AWS::AccountId" + }, + ":root" + ] + ] + } + }, + "Resource": { + "Fn::GetAtt": [ + "StreamConsumer58240CBA", + "ConsumerARN" + ] + } + } + ], + "Version": "2012-10-17" + } + } + }, + "constructInfo": { + "fqn": "aws-cdk-lib.aws_kinesis.CfnResourcePolicy", + "version": "0.0.0" + } + } + }, + "constructInfo": { + "fqn": "aws-cdk-lib.aws_kinesis.ResourcePolicy", + "version": "0.0.0" + } + } + }, + "constructInfo": { + "fqn": "aws-cdk-lib.aws_kinesis.StreamConsumer", + "version": "0.0.0" + } + }, "BootstrapVersion": { "id": "BootstrapVersion", "path": "kinesis-resource-policy/BootstrapVersion", diff --git a/packages/@aws-cdk-testing/framework-integ/test/aws-kinesis/test/integ.resource-policy.ts b/packages/@aws-cdk-testing/framework-integ/test/aws-kinesis/test/integ.resource-policy.ts index bd9911520e5c6..0d2d651fecdb9 100644 --- a/packages/@aws-cdk-testing/framework-integ/test/aws-kinesis/test/integ.resource-policy.ts +++ b/packages/@aws-cdk-testing/framework-integ/test/aws-kinesis/test/integ.resource-policy.ts @@ -1,5 +1,5 @@ import { App, Stack } from 'aws-cdk-lib'; -import { Stream } from 'aws-cdk-lib/aws-kinesis'; +import { Stream, StreamConsumer } from 'aws-cdk-lib/aws-kinesis'; import { AccountPrincipal, PolicyStatement } from 'aws-cdk-lib/aws-iam'; import { IntegTest } from '@aws-cdk/integ-tests-alpha'; @@ -8,6 +8,11 @@ const stack = new Stack(app, 'kinesis-resource-policy'); const stream = new Stream(stack, 'MyStream'); +const streamConsumer = new StreamConsumer(stack, 'StreamConsumer', { + streamConsumerName: 'stream-consumer', + stream: stream, +}); + stream.addToResourcePolicy(new PolicyStatement({ resources: [stream.streamArn], actions: [ @@ -17,6 +22,15 @@ stream.addToResourcePolicy(new PolicyStatement({ principals: [new AccountPrincipal(stack.account)], })); +streamConsumer.addToResourcePolicy(new PolicyStatement({ + resources: [streamConsumer.streamConsumerArn], + actions: [ + 'kinesis:DescribeStreamConsumer', + 'kinesis:SubscribeToShard', + ], + principals: [new AccountPrincipal(stack.account)], +})); + new IntegTest(app, 'integ-kinesis-resource-policy', { testCases: [stack], stackUpdateWorkflow: false, diff --git a/packages/@aws-cdk-testing/framework-integ/test/aws-kinesis/test/integ.stream-consumer.js.snapshot/cdk.out b/packages/@aws-cdk-testing/framework-integ/test/aws-kinesis/test/integ.stream-consumer.js.snapshot/cdk.out new file mode 100644 index 0000000000000..c6e612584e352 --- /dev/null +++ b/packages/@aws-cdk-testing/framework-integ/test/aws-kinesis/test/integ.stream-consumer.js.snapshot/cdk.out @@ -0,0 +1 @@ +{"version":"38.0.1"} \ No newline at end of file diff --git a/packages/@aws-cdk-testing/framework-integ/test/aws-kinesis/test/integ.stream-consumer.js.snapshot/integ.json b/packages/@aws-cdk-testing/framework-integ/test/aws-kinesis/test/integ.stream-consumer.js.snapshot/integ.json new file mode 100644 index 0000000000000..e584b18a39025 --- /dev/null +++ b/packages/@aws-cdk-testing/framework-integ/test/aws-kinesis/test/integ.stream-consumer.js.snapshot/integ.json @@ -0,0 +1,13 @@ +{ + "version": "38.0.1", + "testCases": { + "integ-kinesis-stream-consumer/DefaultTest": { + "stacks": [ + "kinesis-stream-consumer" + ], + "stackUpdateWorkflow": false, + "assertionStack": "integ-kinesis-stream-consumer/DefaultTest/DeployAssert", + "assertionStackName": "integkinesisstreamconsumerDefaultTestDeployAssert5861EECB" + } + } +} \ No newline at end of file diff --git a/packages/@aws-cdk-testing/framework-integ/test/aws-kinesis/test/integ.stream-consumer.js.snapshot/integkinesisstreamconsumerDefaultTestDeployAssert5861EECB.assets.json b/packages/@aws-cdk-testing/framework-integ/test/aws-kinesis/test/integ.stream-consumer.js.snapshot/integkinesisstreamconsumerDefaultTestDeployAssert5861EECB.assets.json new file mode 100644 index 0000000000000..a256b5c892f00 --- /dev/null +++ b/packages/@aws-cdk-testing/framework-integ/test/aws-kinesis/test/integ.stream-consumer.js.snapshot/integkinesisstreamconsumerDefaultTestDeployAssert5861EECB.assets.json @@ -0,0 +1,19 @@ +{ + "version": "38.0.1", + "files": { + "21fbb51d7b23f6a6c262b46a9caee79d744a3ac019fd45422d988b96d44b2a22": { + "source": { + "path": "integkinesisstreamconsumerDefaultTestDeployAssert5861EECB.template.json", + "packaging": "file" + }, + "destinations": { + "current_account-current_region": { + "bucketName": "cdk-hnb659fds-assets-${AWS::AccountId}-${AWS::Region}", + "objectKey": "21fbb51d7b23f6a6c262b46a9caee79d744a3ac019fd45422d988b96d44b2a22.json", + "assumeRoleArn": "arn:${AWS::Partition}:iam::${AWS::AccountId}:role/cdk-hnb659fds-file-publishing-role-${AWS::AccountId}-${AWS::Region}" + } + } + } + }, + "dockerImages": {} +} \ No newline at end of file diff --git a/packages/@aws-cdk-testing/framework-integ/test/aws-kinesis/test/integ.stream-consumer.js.snapshot/integkinesisstreamconsumerDefaultTestDeployAssert5861EECB.template.json b/packages/@aws-cdk-testing/framework-integ/test/aws-kinesis/test/integ.stream-consumer.js.snapshot/integkinesisstreamconsumerDefaultTestDeployAssert5861EECB.template.json new file mode 100644 index 0000000000000..ad9d0fb73d1dd --- /dev/null +++ b/packages/@aws-cdk-testing/framework-integ/test/aws-kinesis/test/integ.stream-consumer.js.snapshot/integkinesisstreamconsumerDefaultTestDeployAssert5861EECB.template.json @@ -0,0 +1,36 @@ +{ + "Parameters": { + "BootstrapVersion": { + "Type": "AWS::SSM::Parameter::Value", + "Default": "/cdk-bootstrap/hnb659fds/version", + "Description": "Version of the CDK Bootstrap resources in this environment, automatically retrieved from SSM Parameter Store. [cdk:skip]" + } + }, + "Rules": { + "CheckBootstrapVersion": { + "Assertions": [ + { + "Assert": { + "Fn::Not": [ + { + "Fn::Contains": [ + [ + "1", + "2", + "3", + "4", + "5" + ], + { + "Ref": "BootstrapVersion" + } + ] + } + ] + }, + "AssertDescription": "CDK bootstrap stack version 6 required. Please run 'cdk bootstrap' with a recent version of the CDK CLI." + } + ] + } + } +} \ No newline at end of file diff --git a/packages/@aws-cdk-testing/framework-integ/test/aws-kinesis/test/integ.stream-consumer.js.snapshot/kinesis-stream-consumer.assets.json b/packages/@aws-cdk-testing/framework-integ/test/aws-kinesis/test/integ.stream-consumer.js.snapshot/kinesis-stream-consumer.assets.json new file mode 100644 index 0000000000000..c3ac627307e09 --- /dev/null +++ b/packages/@aws-cdk-testing/framework-integ/test/aws-kinesis/test/integ.stream-consumer.js.snapshot/kinesis-stream-consumer.assets.json @@ -0,0 +1,19 @@ +{ + "version": "38.0.1", + "files": { + "52b6c32d15bf28e7bb9a43db853fe7f4ff5190a0fc21ca12439f1916bf38123b": { + "source": { + "path": "kinesis-stream-consumer.template.json", + "packaging": "file" + }, + "destinations": { + "current_account-current_region": { + "bucketName": "cdk-hnb659fds-assets-${AWS::AccountId}-${AWS::Region}", + "objectKey": "52b6c32d15bf28e7bb9a43db853fe7f4ff5190a0fc21ca12439f1916bf38123b.json", + "assumeRoleArn": "arn:${AWS::Partition}:iam::${AWS::AccountId}:role/cdk-hnb659fds-file-publishing-role-${AWS::AccountId}-${AWS::Region}" + } + } + } + }, + "dockerImages": {} +} \ No newline at end of file diff --git a/packages/@aws-cdk-testing/framework-integ/test/aws-kinesis/test/integ.stream-consumer.js.snapshot/kinesis-stream-consumer.template.json b/packages/@aws-cdk-testing/framework-integ/test/aws-kinesis/test/integ.stream-consumer.js.snapshot/kinesis-stream-consumer.template.json new file mode 100644 index 0000000000000..c2d4b682c75ac --- /dev/null +++ b/packages/@aws-cdk-testing/framework-integ/test/aws-kinesis/test/integ.stream-consumer.js.snapshot/kinesis-stream-consumer.template.json @@ -0,0 +1,173 @@ +{ + "Resources": { + "Stream790BDEE4": { + "Type": "AWS::Kinesis::Stream", + "Properties": { + "RetentionPeriodHours": 24, + "ShardCount": 1, + "StreamEncryption": { + "Fn::If": [ + "AwsCdkKinesisEncryptedStreamsUnsupportedRegions", + { + "Ref": "AWS::NoValue" + }, + { + "EncryptionType": "KMS", + "KeyId": "alias/aws/kinesis" + } + ] + } + }, + "UpdateReplacePolicy": "Retain", + "DeletionPolicy": "Retain" + }, + "StreamConsumer58240CBA": { + "Type": "AWS::Kinesis::StreamConsumer", + "Properties": { + "ConsumerName": "stream-consumer", + "StreamARN": { + "Fn::GetAtt": [ + "Stream790BDEE4", + "Arn" + ] + } + } + }, + "Role1ABCC5F0": { + "Type": "AWS::IAM::Role", + "Properties": { + "AssumeRolePolicyDocument": { + "Statement": [ + { + "Action": "sts:AssumeRole", + "Effect": "Allow", + "Principal": { + "AWS": { + "Fn::Join": [ + "", + [ + "arn:", + { + "Ref": "AWS::Partition" + }, + ":iam::", + { + "Ref": "AWS::AccountId" + }, + ":root" + ] + ] + } + } + } + ], + "Version": "2012-10-17" + } + } + }, + "RoleDefaultPolicy5FFB7DAB": { + "Type": "AWS::IAM::Policy", + "Properties": { + "PolicyDocument": { + "Statement": [ + { + "Action": [ + "kinesis:DescribeStream", + "kinesis:DescribeStreamConsumer", + "kinesis:DescribeStreamSummary", + "kinesis:GetRecords", + "kinesis:GetShardIterator", + "kinesis:ListShards", + "kinesis:ListStreams", + "kinesis:SubscribeToShard" + ], + "Effect": "Allow", + "Resource": { + "Fn::GetAtt": [ + "Stream790BDEE4", + "Arn" + ] + } + }, + { + "Action": [ + "kinesis:DescribeStreamConsumer", + "kinesis:SubscribeToShard" + ], + "Effect": "Allow", + "Resource": { + "Fn::GetAtt": [ + "StreamConsumer58240CBA", + "ConsumerARN" + ] + } + } + ], + "Version": "2012-10-17" + }, + "PolicyName": "RoleDefaultPolicy5FFB7DAB", + "Roles": [ + { + "Ref": "Role1ABCC5F0" + } + ] + } + } + }, + "Conditions": { + "AwsCdkKinesisEncryptedStreamsUnsupportedRegions": { + "Fn::Or": [ + { + "Fn::Equals": [ + { + "Ref": "AWS::Region" + }, + "cn-north-1" + ] + }, + { + "Fn::Equals": [ + { + "Ref": "AWS::Region" + }, + "cn-northwest-1" + ] + } + ] + } + }, + "Parameters": { + "BootstrapVersion": { + "Type": "AWS::SSM::Parameter::Value", + "Default": "/cdk-bootstrap/hnb659fds/version", + "Description": "Version of the CDK Bootstrap resources in this environment, automatically retrieved from SSM Parameter Store. [cdk:skip]" + } + }, + "Rules": { + "CheckBootstrapVersion": { + "Assertions": [ + { + "Assert": { + "Fn::Not": [ + { + "Fn::Contains": [ + [ + "1", + "2", + "3", + "4", + "5" + ], + { + "Ref": "BootstrapVersion" + } + ] + } + ] + }, + "AssertDescription": "CDK bootstrap stack version 6 required. Please run 'cdk bootstrap' with a recent version of the CDK CLI." + } + ] + } + } +} \ No newline at end of file diff --git a/packages/@aws-cdk-testing/framework-integ/test/aws-kinesis/test/integ.stream-consumer.js.snapshot/manifest.json b/packages/@aws-cdk-testing/framework-integ/test/aws-kinesis/test/integ.stream-consumer.js.snapshot/manifest.json new file mode 100644 index 0000000000000..6d40605261784 --- /dev/null +++ b/packages/@aws-cdk-testing/framework-integ/test/aws-kinesis/test/integ.stream-consumer.js.snapshot/manifest.json @@ -0,0 +1,139 @@ +{ + "version": "38.0.1", + "artifacts": { + "kinesis-stream-consumer.assets": { + "type": "cdk:asset-manifest", + "properties": { + "file": "kinesis-stream-consumer.assets.json", + "requiresBootstrapStackVersion": 6, + "bootstrapStackVersionSsmParameter": "/cdk-bootstrap/hnb659fds/version" + } + }, + "kinesis-stream-consumer": { + "type": "aws:cloudformation:stack", + "environment": "aws://unknown-account/unknown-region", + "properties": { + "templateFile": "kinesis-stream-consumer.template.json", + "terminationProtection": false, + "validateOnSynth": false, + "notificationArns": [], + "assumeRoleArn": "arn:${AWS::Partition}:iam::${AWS::AccountId}:role/cdk-hnb659fds-deploy-role-${AWS::AccountId}-${AWS::Region}", + "cloudFormationExecutionRoleArn": "arn:${AWS::Partition}:iam::${AWS::AccountId}:role/cdk-hnb659fds-cfn-exec-role-${AWS::AccountId}-${AWS::Region}", + "stackTemplateAssetObjectUrl": "s3://cdk-hnb659fds-assets-${AWS::AccountId}-${AWS::Region}/52b6c32d15bf28e7bb9a43db853fe7f4ff5190a0fc21ca12439f1916bf38123b.json", + "requiresBootstrapStackVersion": 6, + "bootstrapStackVersionSsmParameter": "/cdk-bootstrap/hnb659fds/version", + "additionalDependencies": [ + "kinesis-stream-consumer.assets" + ], + "lookupRole": { + "arn": "arn:${AWS::Partition}:iam::${AWS::AccountId}:role/cdk-hnb659fds-lookup-role-${AWS::AccountId}-${AWS::Region}", + "requiresBootstrapStackVersion": 8, + "bootstrapStackVersionSsmParameter": "/cdk-bootstrap/hnb659fds/version" + } + }, + "dependencies": [ + "kinesis-stream-consumer.assets" + ], + "metadata": { + "/kinesis-stream-consumer/Stream/Resource": [ + { + "type": "aws:cdk:logicalId", + "data": "Stream790BDEE4" + } + ], + "/kinesis-stream-consumer/AwsCdkKinesisEncryptedStreamsUnsupportedRegions": [ + { + "type": "aws:cdk:logicalId", + "data": "AwsCdkKinesisEncryptedStreamsUnsupportedRegions" + } + ], + "/kinesis-stream-consumer/StreamConsumer/Resource": [ + { + "type": "aws:cdk:logicalId", + "data": "StreamConsumer58240CBA" + } + ], + "/kinesis-stream-consumer/Role/Resource": [ + { + "type": "aws:cdk:logicalId", + "data": "Role1ABCC5F0" + } + ], + "/kinesis-stream-consumer/Role/DefaultPolicy/Resource": [ + { + "type": "aws:cdk:logicalId", + "data": "RoleDefaultPolicy5FFB7DAB" + } + ], + "/kinesis-stream-consumer/BootstrapVersion": [ + { + "type": "aws:cdk:logicalId", + "data": "BootstrapVersion" + } + ], + "/kinesis-stream-consumer/CheckBootstrapVersion": [ + { + "type": "aws:cdk:logicalId", + "data": "CheckBootstrapVersion" + } + ] + }, + "displayName": "kinesis-stream-consumer" + }, + "integkinesisstreamconsumerDefaultTestDeployAssert5861EECB.assets": { + "type": "cdk:asset-manifest", + "properties": { + "file": "integkinesisstreamconsumerDefaultTestDeployAssert5861EECB.assets.json", + "requiresBootstrapStackVersion": 6, + "bootstrapStackVersionSsmParameter": "/cdk-bootstrap/hnb659fds/version" + } + }, + "integkinesisstreamconsumerDefaultTestDeployAssert5861EECB": { + "type": "aws:cloudformation:stack", + "environment": "aws://unknown-account/unknown-region", + "properties": { + "templateFile": "integkinesisstreamconsumerDefaultTestDeployAssert5861EECB.template.json", + "terminationProtection": false, + "validateOnSynth": false, + "notificationArns": [], + "assumeRoleArn": "arn:${AWS::Partition}:iam::${AWS::AccountId}:role/cdk-hnb659fds-deploy-role-${AWS::AccountId}-${AWS::Region}", + "cloudFormationExecutionRoleArn": "arn:${AWS::Partition}:iam::${AWS::AccountId}:role/cdk-hnb659fds-cfn-exec-role-${AWS::AccountId}-${AWS::Region}", + "stackTemplateAssetObjectUrl": "s3://cdk-hnb659fds-assets-${AWS::AccountId}-${AWS::Region}/21fbb51d7b23f6a6c262b46a9caee79d744a3ac019fd45422d988b96d44b2a22.json", + "requiresBootstrapStackVersion": 6, + "bootstrapStackVersionSsmParameter": "/cdk-bootstrap/hnb659fds/version", + "additionalDependencies": [ + "integkinesisstreamconsumerDefaultTestDeployAssert5861EECB.assets" + ], + "lookupRole": { + "arn": "arn:${AWS::Partition}:iam::${AWS::AccountId}:role/cdk-hnb659fds-lookup-role-${AWS::AccountId}-${AWS::Region}", + "requiresBootstrapStackVersion": 8, + "bootstrapStackVersionSsmParameter": "/cdk-bootstrap/hnb659fds/version" + } + }, + "dependencies": [ + "integkinesisstreamconsumerDefaultTestDeployAssert5861EECB.assets" + ], + "metadata": { + "/integ-kinesis-stream-consumer/DefaultTest/DeployAssert/BootstrapVersion": [ + { + "type": "aws:cdk:logicalId", + "data": "BootstrapVersion" + } + ], + "/integ-kinesis-stream-consumer/DefaultTest/DeployAssert/CheckBootstrapVersion": [ + { + "type": "aws:cdk:logicalId", + "data": "CheckBootstrapVersion" + } + ] + }, + "displayName": "integ-kinesis-stream-consumer/DefaultTest/DeployAssert" + }, + "Tree": { + "type": "cdk:tree", + "properties": { + "file": "tree.json" + } + } + } +} \ No newline at end of file diff --git a/packages/@aws-cdk-testing/framework-integ/test/aws-kinesis/test/integ.stream-consumer.js.snapshot/tree.json b/packages/@aws-cdk-testing/framework-integ/test/aws-kinesis/test/integ.stream-consumer.js.snapshot/tree.json new file mode 100644 index 0000000000000..1a40fdc285bde --- /dev/null +++ b/packages/@aws-cdk-testing/framework-integ/test/aws-kinesis/test/integ.stream-consumer.js.snapshot/tree.json @@ -0,0 +1,300 @@ +{ + "version": "tree-0.1", + "tree": { + "id": "App", + "path": "", + "children": { + "kinesis-stream-consumer": { + "id": "kinesis-stream-consumer", + "path": "kinesis-stream-consumer", + "children": { + "Stream": { + "id": "Stream", + "path": "kinesis-stream-consumer/Stream", + "children": { + "Resource": { + "id": "Resource", + "path": "kinesis-stream-consumer/Stream/Resource", + "attributes": { + "aws:cdk:cloudformation:type": "AWS::Kinesis::Stream", + "aws:cdk:cloudformation:props": { + "retentionPeriodHours": 24, + "shardCount": 1, + "streamEncryption": { + "Fn::If": [ + "AwsCdkKinesisEncryptedStreamsUnsupportedRegions", + { + "Ref": "AWS::NoValue" + }, + { + "EncryptionType": "KMS", + "KeyId": "alias/aws/kinesis" + } + ] + } + } + }, + "constructInfo": { + "fqn": "aws-cdk-lib.aws_kinesis.CfnStream", + "version": "0.0.0" + } + } + }, + "constructInfo": { + "fqn": "aws-cdk-lib.aws_kinesis.Stream", + "version": "0.0.0" + } + }, + "AwsCdkKinesisEncryptedStreamsUnsupportedRegions": { + "id": "AwsCdkKinesisEncryptedStreamsUnsupportedRegions", + "path": "kinesis-stream-consumer/AwsCdkKinesisEncryptedStreamsUnsupportedRegions", + "constructInfo": { + "fqn": "aws-cdk-lib.CfnCondition", + "version": "0.0.0" + } + }, + "StreamConsumer": { + "id": "StreamConsumer", + "path": "kinesis-stream-consumer/StreamConsumer", + "children": { + "Resource": { + "id": "Resource", + "path": "kinesis-stream-consumer/StreamConsumer/Resource", + "attributes": { + "aws:cdk:cloudformation:type": "AWS::Kinesis::StreamConsumer", + "aws:cdk:cloudformation:props": { + "consumerName": "stream-consumer", + "streamArn": { + "Fn::GetAtt": [ + "Stream790BDEE4", + "Arn" + ] + } + } + }, + "constructInfo": { + "fqn": "aws-cdk-lib.aws_kinesis.CfnStreamConsumer", + "version": "0.0.0" + } + } + }, + "constructInfo": { + "fqn": "aws-cdk-lib.aws_kinesis.StreamConsumer", + "version": "0.0.0" + } + }, + "Role": { + "id": "Role", + "path": "kinesis-stream-consumer/Role", + "children": { + "ImportRole": { + "id": "ImportRole", + "path": "kinesis-stream-consumer/Role/ImportRole", + "constructInfo": { + "fqn": "aws-cdk-lib.Resource", + "version": "0.0.0" + } + }, + "Resource": { + "id": "Resource", + "path": "kinesis-stream-consumer/Role/Resource", + "attributes": { + "aws:cdk:cloudformation:type": "AWS::IAM::Role", + "aws:cdk:cloudformation:props": { + "assumeRolePolicyDocument": { + "Statement": [ + { + "Action": "sts:AssumeRole", + "Effect": "Allow", + "Principal": { + "AWS": { + "Fn::Join": [ + "", + [ + "arn:", + { + "Ref": "AWS::Partition" + }, + ":iam::", + { + "Ref": "AWS::AccountId" + }, + ":root" + ] + ] + } + } + } + ], + "Version": "2012-10-17" + } + } + }, + "constructInfo": { + "fqn": "aws-cdk-lib.aws_iam.CfnRole", + "version": "0.0.0" + } + }, + "DefaultPolicy": { + "id": "DefaultPolicy", + "path": "kinesis-stream-consumer/Role/DefaultPolicy", + "children": { + "Resource": { + "id": "Resource", + "path": "kinesis-stream-consumer/Role/DefaultPolicy/Resource", + "attributes": { + "aws:cdk:cloudformation:type": "AWS::IAM::Policy", + "aws:cdk:cloudformation:props": { + "policyDocument": { + "Statement": [ + { + "Action": [ + "kinesis:DescribeStream", + "kinesis:DescribeStreamConsumer", + "kinesis:DescribeStreamSummary", + "kinesis:GetRecords", + "kinesis:GetShardIterator", + "kinesis:ListShards", + "kinesis:ListStreams", + "kinesis:SubscribeToShard" + ], + "Effect": "Allow", + "Resource": { + "Fn::GetAtt": [ + "Stream790BDEE4", + "Arn" + ] + } + }, + { + "Action": [ + "kinesis:DescribeStreamConsumer", + "kinesis:SubscribeToShard" + ], + "Effect": "Allow", + "Resource": { + "Fn::GetAtt": [ + "StreamConsumer58240CBA", + "ConsumerARN" + ] + } + } + ], + "Version": "2012-10-17" + }, + "policyName": "RoleDefaultPolicy5FFB7DAB", + "roles": [ + { + "Ref": "Role1ABCC5F0" + } + ] + } + }, + "constructInfo": { + "fqn": "aws-cdk-lib.aws_iam.CfnPolicy", + "version": "0.0.0" + } + } + }, + "constructInfo": { + "fqn": "aws-cdk-lib.aws_iam.Policy", + "version": "0.0.0" + } + } + }, + "constructInfo": { + "fqn": "aws-cdk-lib.aws_iam.Role", + "version": "0.0.0" + } + }, + "BootstrapVersion": { + "id": "BootstrapVersion", + "path": "kinesis-stream-consumer/BootstrapVersion", + "constructInfo": { + "fqn": "aws-cdk-lib.CfnParameter", + "version": "0.0.0" + } + }, + "CheckBootstrapVersion": { + "id": "CheckBootstrapVersion", + "path": "kinesis-stream-consumer/CheckBootstrapVersion", + "constructInfo": { + "fqn": "aws-cdk-lib.CfnRule", + "version": "0.0.0" + } + } + }, + "constructInfo": { + "fqn": "aws-cdk-lib.Stack", + "version": "0.0.0" + } + }, + "integ-kinesis-stream-consumer": { + "id": "integ-kinesis-stream-consumer", + "path": "integ-kinesis-stream-consumer", + "children": { + "DefaultTest": { + "id": "DefaultTest", + "path": "integ-kinesis-stream-consumer/DefaultTest", + "children": { + "Default": { + "id": "Default", + "path": "integ-kinesis-stream-consumer/DefaultTest/Default", + "constructInfo": { + "fqn": "constructs.Construct", + "version": "10.4.2" + } + }, + "DeployAssert": { + "id": "DeployAssert", + "path": "integ-kinesis-stream-consumer/DefaultTest/DeployAssert", + "children": { + "BootstrapVersion": { + "id": "BootstrapVersion", + "path": "integ-kinesis-stream-consumer/DefaultTest/DeployAssert/BootstrapVersion", + "constructInfo": { + "fqn": "aws-cdk-lib.CfnParameter", + "version": "0.0.0" + } + }, + "CheckBootstrapVersion": { + "id": "CheckBootstrapVersion", + "path": "integ-kinesis-stream-consumer/DefaultTest/DeployAssert/CheckBootstrapVersion", + "constructInfo": { + "fqn": "aws-cdk-lib.CfnRule", + "version": "0.0.0" + } + } + }, + "constructInfo": { + "fqn": "aws-cdk-lib.Stack", + "version": "0.0.0" + } + } + }, + "constructInfo": { + "fqn": "@aws-cdk/integ-tests-alpha.IntegTestCase", + "version": "0.0.0" + } + } + }, + "constructInfo": { + "fqn": "@aws-cdk/integ-tests-alpha.IntegTest", + "version": "0.0.0" + } + }, + "Tree": { + "id": "Tree", + "path": "Tree", + "constructInfo": { + "fqn": "constructs.Construct", + "version": "10.4.2" + } + } + }, + "constructInfo": { + "fqn": "aws-cdk-lib.App", + "version": "0.0.0" + } + } +} \ No newline at end of file diff --git a/packages/@aws-cdk-testing/framework-integ/test/aws-kinesis/test/integ.stream-consumer.ts b/packages/@aws-cdk-testing/framework-integ/test/aws-kinesis/test/integ.stream-consumer.ts new file mode 100644 index 0000000000000..e716ca06e7320 --- /dev/null +++ b/packages/@aws-cdk-testing/framework-integ/test/aws-kinesis/test/integ.stream-consumer.ts @@ -0,0 +1,25 @@ +import { App, Stack } from 'aws-cdk-lib'; +import { Stream, StreamConsumer } from 'aws-cdk-lib/aws-kinesis'; +import { IntegTest } from '@aws-cdk/integ-tests-alpha'; +import { Role, AccountRootPrincipal } from 'aws-cdk-lib/aws-iam'; + +const app = new App(); +const stack = new Stack(app, 'kinesis-stream-consumer'); + +const stream = new Stream(stack, 'Stream'); + +const streamConsumer = new StreamConsumer(stack, 'StreamConsumer', { + streamConsumerName: 'stream-consumer', + stream: stream, +}); + +const role = new Role(stack, 'Role', { + assumedBy: new AccountRootPrincipal(), +}); + +streamConsumer.grantRead(role); + +new IntegTest(app, 'integ-kinesis-stream-consumer', { + testCases: [stack], + stackUpdateWorkflow: false, +}); diff --git a/packages/aws-cdk-lib/aws-kinesis/README.md b/packages/aws-cdk-lib/aws-kinesis/README.md index 27f89e65b7626..06548f5d4d814 100644 --- a/packages/aws-cdk-lib/aws-kinesis/README.md +++ b/packages/aws-cdk-lib/aws-kinesis/README.md @@ -15,7 +15,8 @@ intake and aggregation. - [Write Permissions](#write-permissions) - [Custom Permissions](#custom-permissions) - [Metrics](#metrics) - - [Resource Policy](#resource-policy) +- [Stream Consumers](#stream-consumers) +- [Resource Policy](#resource-policy) ## Streams @@ -189,9 +190,54 @@ stream.metricGetRecordsSuccess(); stream.metricGetRecordsSuccess({ statistic: 'Maximum' }); ``` -### Resource Policy +## Stream Consumers -You can create a resource policy for a data stream. +Creating stream consumers allow consumers to receive data from the stream using enhanced fan-out at a rate of up to 2 MiB per second for every shard. +This rate is unaffected by the total number of consumers that read from the same stream. + +For more information, see [Develop enhanced fan-out consumers with dedicated throughput](https://docs.aws.amazon.com/streams/latest/dev/enhanced-consumers.html). + +To create and associate a stream consumer with a stream + +```ts +const stream = new kinesis.Stream(this, 'MyStream'); + +const streamConsumer = new StreamConsumer(stack, 'MyStreamConsumer', { + streamConsumerName: 'MyStreamConsumer', + stream, +}); +``` + +#### Read Permissions + +Grant `read` access to a stream consumer, and the stream it is registered with, by calling the `grantRead()` API. + +```ts +const lambdaRole = new iam.Role(this, 'Role', { + assumedBy: new iam.ServicePrincipal('lambda.amazonaws.com'), + description: 'Example role...', +}); + +const stream = new kinesis.Stream(this, 'MyEncryptedStream', { + encryption: kinesis.StreamEncryption.KMS, +}); +const streamConsumer = new StreamConsumer(stack, 'MyStreamConsumer', { + streamConsumerName: 'MyStreamConsumer', + stream, +}); + +// give lambda permissions to read stream via the stream consumer +streamConsumer.grantRead(lambdaRole); +``` + +In addition to stream's permissions, the following permissions are provided to a service principal by the `grantRead()` API: + +- `kinesis:DescribeStreamConsumer`, +- `kinesis:SubscribeToShard`, + +## Resource Policy + +You can create a resource policy for a data stream or a stream consumer. For more information, see [Controlling access to Amazon Kinesis Data Streams resources using IAM](https://docs.aws.amazon.com/streams/latest/dev/controlling-access.html). A resource policy is automatically created when `addToResourcePolicy` is called, if one doesn't already exist. @@ -200,13 +246,24 @@ Using `addToResourcePolicy` is the simplest way to add a resource policy: ```ts const stream = new kinesis.Stream(this, 'MyStream'); +const streamConsumer = new StreamConsumer(stack, 'MyStreamConsumer', { + streamConsumerName: 'MyStreamConsumer', + stream, +}); -// create a resource policy via addToResourcePolicy method +// create a stream resource policy via addToResourcePolicy method stream.addToResourcePolicy(new iam.PolicyStatement({ resources: [stream.streamArn], actions: ['kinesis:GetRecords'], principals: [new iam.AnyPrincipal()], })); + +// create a stream consumer resource policy via addToResourcePolicy method +streamConsumer.addToResourcePolicy(new iam.PolicyStatement({ + resources: [stream.streamArn], + actions: ['kinesis:DescribeStreamConsumer'], + principals: [new iam.AnyPrincipal()], +})); ``` You can create a resource manually by using `ResourcePolicy`. @@ -215,6 +272,10 @@ If not, a blank policy document will be set. ```ts const stream = new kinesis.Stream(this, 'MyStream'); +const streamConsumer = new StreamConsumer(stack, 'MyStreamConsumer', { + streamConsumerName: 'MyStreamConsumer', + stream, +}); // create a custom policy document const policyDocument = new iam.PolicyDocument({ @@ -228,9 +289,15 @@ const policyDocument = new iam.PolicyDocument({ ], }); -// create a resource policy manually +// create a stream resource policy manually new kinesis.ResourcePolicy(this, 'ResourcePolicy', { stream, policyDocument, }); -``` \ No newline at end of file + +// create a stream consumer resource policy manually +new kinesis.ResourcePolicy(this, 'ResourcePolicy', { + streamConsumer, + policyDocument, +}); +``` diff --git a/packages/aws-cdk-lib/aws-kinesis/lib/index.ts b/packages/aws-cdk-lib/aws-kinesis/lib/index.ts index c239368eaa87c..74a83e78d23be 100644 --- a/packages/aws-cdk-lib/aws-kinesis/lib/index.ts +++ b/packages/aws-cdk-lib/aws-kinesis/lib/index.ts @@ -1,4 +1,5 @@ export * from './stream'; +export * from './stream-consumer'; export * from './resource-policy'; // AWS::Kinesis CloudFormation Resources: diff --git a/packages/aws-cdk-lib/aws-kinesis/lib/resource-policy.ts b/packages/aws-cdk-lib/aws-kinesis/lib/resource-policy.ts index 3987268f52bfe..ab1bd2213752d 100644 --- a/packages/aws-cdk-lib/aws-kinesis/lib/resource-policy.ts +++ b/packages/aws-cdk-lib/aws-kinesis/lib/resource-policy.ts @@ -1,6 +1,7 @@ import { Construct } from 'constructs'; import { CfnResourcePolicy } from './kinesis.generated'; import { IStream } from './stream'; +import { IStreamConsumer } from './stream-consumer'; import { PolicyDocument } from '../../aws-iam'; import { Resource } from '../../core'; @@ -10,8 +11,21 @@ import { Resource } from '../../core'; export interface ResourcePolicyProps { /** * The stream this policy applies to. + * + * Note: only one of `stream` and `streamConsumer` must be set. + * + * @default - policy is not associated to a stream + */ + readonly stream?: IStream; + + /** + * The stream consumer this policy applies to. + * + * Note: only one of `stream` and `streamConsumer` must be set. + * + * @default - policy is not associated to a consumer */ - readonly stream: IStream; + readonly streamConsumer?: IStreamConsumer; /** * IAM policy document to apply to a data stream. @@ -44,11 +58,23 @@ export class ResourcePolicy extends Resource { constructor(scope: Construct, id: string, props: ResourcePolicyProps) { super(scope, id); + if (props.stream && props.streamConsumer) { + throw new Error('Only one of stream or streamConsumer can be set'); + } + if (props.stream === undefined && props.streamConsumer === undefined) { + throw new Error('One of stream or streamConsumer must be set'); + } + this.document = props.policyDocument ?? this.document; - new CfnResourcePolicy(this, 'Resource', { + if (props.stream) this.resourcePolicy(props.stream.streamArn); + else if (props.streamConsumer) this.resourcePolicy(props.streamConsumer.streamConsumerArn); + } + + private resourcePolicy(resourceArn: string): CfnResourcePolicy { + return new CfnResourcePolicy(this, 'Resource', { resourcePolicy: this.document, - resourceArn: props.stream.streamArn, + resourceArn, }); } } diff --git a/packages/aws-cdk-lib/aws-kinesis/lib/stream-consumer.ts b/packages/aws-cdk-lib/aws-kinesis/lib/stream-consumer.ts new file mode 100644 index 0000000000000..0da666526462b --- /dev/null +++ b/packages/aws-cdk-lib/aws-kinesis/lib/stream-consumer.ts @@ -0,0 +1,231 @@ +import { Construct } from 'constructs'; +import { CfnStreamConsumer } from './kinesis.generated'; +import { ResourcePolicy } from './resource-policy'; +import { IStream, Stream } from './stream'; +import * as iam from '../../aws-iam'; +import { ArnFormat, IResource, Resource, Stack } from '../../core'; + +const READ_OPERATIONS = [ + 'kinesis:DescribeStreamConsumer', + 'kinesis:SubscribeToShard', +]; + +/** + * A Kinesis Stream Consumer + */ +export interface IStreamConsumer extends IResource { + /** + * The ARN of the stream consumer. + * + * @attribute + */ + readonly streamConsumerArn: string; + + /** + * The name of the stream consumer. + * + * @attribute + */ + readonly streamConsumerName: string; + + /** + * The stream associated with this consumer. + * + * @attribute + */ + readonly stream: IStream; + + /** + * Adds a statement to the IAM resource policy associated with this stream consumer. + * + * If this stream consumer was created in this stack (`new StreamConsumer`), a resource policy + * will be automatically created upon the first call to `addToResourcePolicy`. If + * the stream consumer is imported (`StreamConsumer.from`), then this is a no-op. + */ + addToResourcePolicy(statement: iam.PolicyStatement): iam.AddToResourcePolicyResult; + + /** + * Grant read permissions for this stream consumer and its associated stream to an IAM + * principal (Role/Group/User). + */ + grantRead(grantee: iam.IGrantable): iam.Grant; + + /** + * Grant the indicated permissions on this stream consumer to the provided IAM principal. + */ + grant(grantee: iam.IGrantable, ...actions: string[]): iam.Grant; +} + +abstract class StreamConsumerBase extends Resource implements IStreamConsumer { + /** + * The ARN of the stream consumer. + */ + public abstract readonly streamConsumerArn: string; + + /** + * The name of the stream consumer. + */ + public abstract readonly streamConsumerName: string; + + /** + * The Kinesis data stream this consumer is associated with. + */ + public abstract readonly stream: IStream; + + /** + * Indicates if a resource policy should automatically be created upon + * the first call to `addToResourcePolicy`. + * + * Set by subclasses. + */ + protected abstract readonly autoCreatePolicy: boolean; + + private resourcePolicy?: ResourcePolicy; + + /** + * Adds a statement to the IAM resource policy associated with this stream consumer. + * + * If this stream consumer was created in this stack (`new StreamConsumer`), a resource policy + * will be automatically created upon the first call to `addToResourcePolicy`. If + * the stream is imported (`StreamConsumer.from`), then this is a no-op. + */ + public addToResourcePolicy(statement: iam.PolicyStatement): iam.AddToResourcePolicyResult { + if (!this.resourcePolicy && this.autoCreatePolicy) { + this.resourcePolicy = new ResourcePolicy(this, 'Policy', { streamConsumer: this }); + } + + if (this.resourcePolicy) { + this.resourcePolicy.document.addStatements(statement); + return { statementAdded: true, policyDependable: this.resourcePolicy }; + } + return { statementAdded: false }; + } + + /** + * Grant read permissions for this stream consumer and its associated stream to an IAM + * principal (Role/Group/User). + */ + public grantRead(grantee: iam.IGrantable): iam.Grant { + this.stream.grantRead(grantee); + return this.grant(grantee, ...READ_OPERATIONS); + } + + /** + * Grant the indicated permissions on this stream consumer to the given IAM principal (Role/Group/User). + */ + public grant(grantee: iam.IGrantable, ...actions: string[]): iam.Grant { + return iam.Grant.addToPrincipalOrResource({ + grantee, + actions, + resourceArns: [this.streamConsumerArn], + resource: this, + }); + } +} + +/** + * A reference to a StreamConsumer, which can be imported using `StreamConsumer.fromStreamConsumerAttributes`. + */ +export interface StreamConsumerAttributes { + /** + * The Amazon Resource Name (ARN) of the stream consumer. + */ + readonly streamConsumerArn: string; +} + +/** + * Properties for a Kinesis Stream Consumer. + */ +export interface StreamConsumerProps { + /** + * The name of the stream consumer. + */ + readonly streamConsumerName: string; + + /** + * The Kinesis data stream to associate this consumer with. + */ + readonly stream: IStream; +} + +/** + * A Kinesis Stream Consumer + */ +export class StreamConsumer extends StreamConsumerBase { + + /** + * Imports an existing Kinesis Stream Consumer by its arn. + * + * @param scope the Construct scope. + * @param id the ID of the construct. + * @param streamConsumerArn the arn of the existing stream consumer. + */ + public static fromStreamConsumerArn(scope: Construct, id: string, streamConsumerArn: string): IStreamConsumer { + return StreamConsumer.fromStreamConsumerAttributes(scope, id, { streamConsumerArn }); + } + + /** + * Imports an existing Kinesis Stream Consumer by its attributes. + * + * @param scope the Construct scope. + * @param id the ID of the construct. + * @param attrs the attributes of the existing stream consumer. + */ + public static fromStreamConsumerAttributes(scope: Construct, id: string, attrs: StreamConsumerAttributes): IStreamConsumer { + const parsedArn = Stack.of(scope).splitArn(attrs.streamConsumerArn, ArnFormat.SLASH_RESOURCE_NAME); + const [streamName, _consumer, consumerNameTimestamp] = parsedArn.resourceName!.split('/'); + const [consumerName, _creationTimestamp] = consumerNameTimestamp.split(':'); + const streamArn = Stack.of(scope).formatArn({ + ...parsedArn, + resourceName: streamName, + }); + + class Import extends StreamConsumerBase { + public readonly streamConsumerArn = attrs.streamConsumerArn; + public readonly streamConsumerName = consumerName; + public readonly stream = Stream.fromStreamArn(scope, `${id}ImportedStream`, streamArn); + + protected readonly autoCreatePolicy = false; + } + + return new Import(scope, id); + } + + /** + * The Amazon Resource Name (ARN) of the stream consumer. + */ + public readonly streamConsumerArn: string; + + /** + * The name of the stream consumer. + */ + public readonly streamConsumerName: string; + + /** + * The Kinesis data stream this consumer is associated with. + */ + public readonly stream: IStream; + + protected readonly autoCreatePolicy = true; + + constructor(scope: Construct, id: string, props: StreamConsumerProps) { + super(scope, id, { + physicalName: props.streamConsumerName, + }); + + const streamConsumer = new CfnStreamConsumer(this, 'Resource', { + consumerName: props.streamConsumerName, + streamArn: props.stream.streamArn, + }); + + this.streamConsumerArn = this.getResourceArnAttribute(streamConsumer.attrConsumerArn, { + service: 'kinesis', + resource: 'stream', + // use '*' in place of the consumer creation timestamp for cross environment references + resourceName: `${props.stream.streamName}/consumer/${this.physicalName}:*`, + arnFormat: ArnFormat.SLASH_RESOURCE_NAME, + }); + this.streamConsumerName = this.getResourceNameAttribute(streamConsumer.attrConsumerName); + this.stream = props.stream; + } +} diff --git a/packages/aws-cdk-lib/aws-kinesis/lib/stream.ts b/packages/aws-cdk-lib/aws-kinesis/lib/stream.ts index 0fb74df769f50..2c3ee36d59f61 100644 --- a/packages/aws-cdk-lib/aws-kinesis/lib/stream.ts +++ b/packages/aws-cdk-lib/aws-kinesis/lib/stream.ts @@ -357,7 +357,7 @@ abstract class StreamBase extends Resource implements IStream { /** * Adds a statement to the IAM resource policy associated with this stream. * - * If this stream was created in this stack (`new Strem`), a resource policy + * If this stream was created in this stack (`new Stream`), a resource policy * will be automatically created upon the first call to `addToResourcePolicy`. If * the stream is imported (`Stream.import`), then this is a no-op. */ @@ -422,11 +422,11 @@ abstract class StreamBase extends Resource implements IStream { * Grant the indicated permissions on this stream to the given IAM principal (Role/Group/User). */ public grant(grantee: iam.IGrantable, ...actions: string[]) { - return iam.Grant.addToPrincipal({ + return iam.Grant.addToPrincipalOrResource({ grantee, actions, resourceArns: [this.streamArn], - scope: this, + resource: this, }); } diff --git a/packages/aws-cdk-lib/aws-kinesis/test/resource-policy.test.ts b/packages/aws-cdk-lib/aws-kinesis/test/resource-policy.test.ts index 01cd727cf8aed..ee3f5500de9a2 100644 --- a/packages/aws-cdk-lib/aws-kinesis/test/resource-policy.test.ts +++ b/packages/aws-cdk-lib/aws-kinesis/test/resource-policy.test.ts @@ -1,14 +1,23 @@ import { Template } from '../../assertions'; import * as iam from '../../aws-iam'; import { Stack } from '../../core'; -import { ResourcePolicy, Stream } from '../lib'; +import { ResourcePolicy, Stream, StreamConsumer } from '../lib'; describe('Kinesis resource policy', () => { - test('create resource policy', () => { - // GIVEN - const stack = new Stack(); - const stream = new Stream(stack, 'Stream', {}); + let stack: Stack; + let stream: Stream; + let streamConsumer: StreamConsumer; + beforeEach(() => { + stack = new Stack(); + stream = new Stream(stack, 'Stream', {}); + streamConsumer = new StreamConsumer(stack, 'StreamConsumer', { + streamConsumerName: 'consumer', + stream, + }); + }); + + test('create stream resource policy', () => { // WHEN const policyDocument = new iam.PolicyDocument({ assignSids: true, @@ -42,4 +51,81 @@ describe('Kinesis resource policy', () => { }, }); }); + + test('create stream consumer resource policy', () => { + // WHEN + const policyDocument = new iam.PolicyDocument({ + assignSids: true, + statements: [ + new iam.PolicyStatement({ + actions: ['kinesis:GetRecords'], + principals: [new iam.AnyPrincipal()], + resources: [streamConsumer.streamConsumerArn], + }), + ], + }); + + new ResourcePolicy(stack, 'ResourcePolicy', { + streamConsumer, + policyDocument, + }); + + // THEN + Template.fromStack(stack).hasResourceProperties('AWS::Kinesis::ResourcePolicy', { + ResourcePolicy: { + Version: '2012-10-17', + Statement: [ + { + Sid: '0', + Action: 'kinesis:GetRecords', + Effect: 'Allow', + Principal: { AWS: '*' }, + Resource: stack.resolve(streamConsumer.streamConsumerArn), + }, + ], + }, + }); + }); + + test('fail resource policy creation with both stream and streamConsumer set', () => { + // WHEN + const policyDocument = new iam.PolicyDocument({ + assignSids: true, + statements: [ + new iam.PolicyStatement({ + actions: ['kinesis:GetRecords'], + principals: [new iam.AnyPrincipal()], + resources: [streamConsumer.streamConsumerArn], + }), + ], + }); + + expect(() => { + new ResourcePolicy(stack, 'ResourcePolicy', { + stream, + streamConsumer, + policyDocument, + }); + }).toThrow('Only one of stream or streamConsumer can be set'); + }); + + test('fail resource policy creation with neither stream nor streamConsumer set', () => { + // WHEN + const policyDocument = new iam.PolicyDocument({ + assignSids: true, + statements: [ + new iam.PolicyStatement({ + actions: ['kinesis:GetRecords'], + principals: [new iam.AnyPrincipal()], + resources: [streamConsumer.streamConsumerArn], + }), + ], + }); + + expect(() => { + new ResourcePolicy(stack, 'ResourcePolicy', { + policyDocument, + }); + }).toThrow('One of stream or streamConsumer must be set'); + }); }); diff --git a/packages/aws-cdk-lib/aws-kinesis/test/stream-consumer.test.ts b/packages/aws-cdk-lib/aws-kinesis/test/stream-consumer.test.ts new file mode 100644 index 0000000000000..552213d4c9eca --- /dev/null +++ b/packages/aws-cdk-lib/aws-kinesis/test/stream-consumer.test.ts @@ -0,0 +1,304 @@ +import { Match, Template } from '../../assertions'; +import * as iam from '../../aws-iam/index'; +import { Stack } from '../../core'; +import { IStreamConsumer, Stream, StreamConsumer } from '../lib'; + +describe('Kinesis stream consumer', () => { + let stack: Stack; + let stream: Stream; + + beforeEach(() => { + stack = new Stack(); + stream = new Stream(stack, 'Stream', {}); + }); + + describe('stream consumer from attributes', () => { + let consumer: IStreamConsumer; + + beforeEach(() => { + consumer = StreamConsumer.fromStreamConsumerAttributes(stack, 'MyConsumer', { + streamConsumerArn: 'arn:aws:kinesis:region:account-id:stream/stream-name/consumer/consumer-name:123456', + }); + }); + + test('has expected properties', () => { + expect(consumer.streamConsumerArn).toEqual('arn:aws:kinesis:region:account-id:stream/stream-name/consumer/consumer-name:123456'); + expect(consumer.streamConsumerName).toEqual('consumer-name'); + expect(consumer.stream.streamArn).toEqual('arn:aws:kinesis:region:account-id:stream/stream-name'); + }); + + test('addToResourcePolicy is a no-op', () => { + consumer.addToResourcePolicy(new iam.PolicyStatement({ + actions: ['kinesis:DescribeStreamConsumer'], + principals: [new iam.AnyPrincipal()], + resources: [consumer.streamConsumerArn], + })); + + Template.fromStack(stack).resourceCountIs('AWS::Kinesis::ResourcePolicy', 0); + }); + + test('grantRead grants stream and consumer permissions', () => { + const user = new iam.User(stack, 'MyUser'); + consumer.grantRead(user); + + Template.fromStack(stack).hasResourceProperties('AWS::IAM::Policy', { + Users: [stack.resolve(user.userName)], + PolicyDocument: { + Statement: Match.arrayWith([ + // stream read permissions + { + Action: [ + 'kinesis:DescribeStreamSummary', + 'kinesis:GetRecords', + 'kinesis:GetShardIterator', + 'kinesis:ListShards', + 'kinesis:SubscribeToShard', + 'kinesis:DescribeStream', + 'kinesis:ListStreams', + 'kinesis:DescribeStreamConsumer', + ], + Effect: 'Allow', + Resource: 'arn:aws:kinesis:region:account-id:stream/stream-name', + }, + // consumer read permissions + { + Action: [ + 'kinesis:DescribeStreamConsumer', + 'kinesis:SubscribeToShard', + ], + Effect: 'Allow', + Resource: 'arn:aws:kinesis:region:account-id:stream/stream-name/consumer/consumer-name:123456', + }, + ]), + }, + }); + Template.fromStack(stack).resourceCountIs('AWS::Kinesis::ResourcePolicy', 0); + }); + }); + + describe('new stream consumer', () => { + let consumer: StreamConsumer; + + beforeEach(() => { + consumer = new StreamConsumer(stack, 'StreamConsumer', { + streamConsumerName: 'MyStreamConsumer', + stream, + }); + }); + + test('creates stream consumer resource', () => { + Template.fromStack(stack).hasResourceProperties('AWS::Kinesis::StreamConsumer', { + ConsumerName: 'MyStreamConsumer', + StreamARN: stack.resolve(stream.streamArn), + }); + }); + + test('addToResourcePolicy creates a consumer resource policy ', () => { + consumer.addToResourcePolicy(new iam.PolicyStatement({ + actions: ['kinesis:DescribeStreamConsumer'], + principals: [new iam.AnyPrincipal()], + resources: [consumer.streamConsumerArn], + })); + + Template.fromStack(stack).hasResourceProperties('AWS::Kinesis::ResourcePolicy', { + ResourceArn: stack.resolve(consumer.streamConsumerArn), + ResourcePolicy: { + Version: '2012-10-17', + Statement: [ + { + Action: 'kinesis:DescribeStreamConsumer', + Effect: 'Allow', + Principal: { AWS: '*' }, + Resource: stack.resolve(consumer.streamConsumerArn), + }, + ], + }, + }); + }); + + test('grantRead grants stream and consumer permissions', () => { + const user = new iam.User(stack, 'MyUser'); + consumer.grantRead(user); + + Template.fromStack(stack).hasResourceProperties('AWS::IAM::Policy', { + Users: [stack.resolve(user.userName)], + PolicyDocument: { + Statement: Match.arrayWith([ + // stream read permissions + { + Action: [ + 'kinesis:DescribeStreamSummary', + 'kinesis:GetRecords', + 'kinesis:GetShardIterator', + 'kinesis:ListShards', + 'kinesis:SubscribeToShard', + 'kinesis:DescribeStream', + 'kinesis:ListStreams', + 'kinesis:DescribeStreamConsumer', + ], + Effect: 'Allow', + Resource: stack.resolve(stream.streamArn), + }, + // consumer read permissions + { + Action: [ + 'kinesis:DescribeStreamConsumer', + 'kinesis:SubscribeToShard', + ], + Effect: 'Allow', + Resource: stack.resolve(consumer.streamConsumerArn), + }, + ]), + }, + }); + }); + + test('grantRead when stream/consumer and grantee are in different accounts', () => { + const stackA = new Stack(undefined, 'StackA', { env: { account: '123456789012' } }); + const streamFromStackA = new Stream(stackA, 'Stream', { + streamName: 'MyStream', + }); + const streamConsumerFromStackA = new StreamConsumer(stackA, 'StreamConsumer', { + streamConsumerName: 'MyStreamConsumer', + stream: streamFromStackA, + }); + + const stackB = new Stack(undefined, 'StackB', { env: { account: '234567890123' } }); + const roleFromStackB = new iam.Role(stackB, 'MyRole', { + assumedBy: new iam.AccountPrincipal('234567890123'), + roleName: 'MyRole', + }); + + streamConsumerFromStackA.grantRead(roleFromStackB); + + // Grantee stack has the correct IAM Policy + Template.fromStack(stackB).hasResourceProperties('AWS::IAM::Policy', { + Roles: [stackB.resolve(roleFromStackB.roleName)], + PolicyDocument: { + Statement: Match.arrayWith([ + // stream read permissions + { + Action: [ + 'kinesis:DescribeStreamSummary', + 'kinesis:GetRecords', + 'kinesis:GetShardIterator', + 'kinesis:ListShards', + 'kinesis:SubscribeToShard', + 'kinesis:DescribeStream', + 'kinesis:ListStreams', + 'kinesis:DescribeStreamConsumer', + ], + Effect: 'Allow', + Resource: { + 'Fn::Join': ['', + [ + 'arn:', + { Ref: 'AWS::Partition' }, + ':kinesis:', + { Ref: 'AWS::Region' }, + ':123456789012:stream/MyStream', + ]], + }, + }, + // consumer read permissions + { + Action: [ + 'kinesis:DescribeStreamConsumer', + 'kinesis:SubscribeToShard', + ], + Effect: 'Allow', + Resource: { + 'Fn::Join': ['', + [ + 'arn:', + { Ref: 'AWS::Partition' }, + ':kinesis:', + { Ref: 'AWS::Region' }, + ':123456789012:stream/MyStream/consumer/MyStreamConsumer:*', + ]], + }, + }, + ]), + }, + }); + + // Stream stack + // - has the correct Stream resource policy + Template.fromStack(stackA).hasResourceProperties('AWS::Kinesis::ResourcePolicy', { + ResourceArn: { + 'Fn::GetAtt': ['Stream790BDEE4', 'Arn'], + }, + ResourcePolicy: { + Version: '2012-10-17', + Statement: [ + { + Action: [ + 'kinesis:DescribeStreamSummary', + 'kinesis:GetRecords', + 'kinesis:GetShardIterator', + 'kinesis:ListShards', + 'kinesis:SubscribeToShard', + 'kinesis:DescribeStream', + 'kinesis:ListStreams', + 'kinesis:DescribeStreamConsumer', + ], + Effect: 'Allow', + Principal: { + AWS: { + 'Fn::Join': [ + '', + [ + 'arn:', + { + Ref: 'AWS::Partition', + }, + ':iam::234567890123:role/MyRole', + ], + ], + }, + }, + Resource: { + 'Fn::GetAtt': ['Stream790BDEE4', 'Arn'], + }, + }, + ], + }, + }); + // - has the correct StreamConsumer resource policy + Template.fromStack(stackA).hasResourceProperties('AWS::Kinesis::ResourcePolicy', { + ResourceArn: { + 'Fn::GetAtt': ['StreamConsumer58240CBA', 'ConsumerARN'], + }, + ResourcePolicy: { + Version: '2012-10-17', + Statement: [ + { + Action: [ + 'kinesis:DescribeStreamConsumer', + 'kinesis:SubscribeToShard', + ], + Effect: 'Allow', + Principal: { + AWS: { + 'Fn::Join': [ + '', + [ + 'arn:', + { + Ref: 'AWS::Partition', + }, + ':iam::234567890123:role/MyRole', + ], + ], + }, + }, + Resource: { + 'Fn::GetAtt': ['StreamConsumer58240CBA', 'ConsumerARN'], + }, + }, + ], + }, + }); + }); + }); +}); diff --git a/packages/aws-cdk-lib/aws-kinesis/test/stream.test.ts b/packages/aws-cdk-lib/aws-kinesis/test/stream.test.ts index c9e7569bdf1d8..682d271970852 100644 --- a/packages/aws-cdk-lib/aws-kinesis/test/stream.test.ts +++ b/packages/aws-cdk-lib/aws-kinesis/test/stream.test.ts @@ -898,6 +898,52 @@ describe('Kinesis data streams', () => { }, }, }); + Template.fromStack(stack).resourceCountIs('AWS::Kinesis::ResourcePolicy', 0); + }), + test('grant when stream/consumer and grantee are in different accounts', () => { + const stackA = new Stack(undefined, 'StackA', { env: { account: '123456789012' } }); + const streamFromStackA = new Stream(stackA, 'Stream', { + streamName: 'MyStream', + }); + + const stackB = new Stack(undefined, 'StackB', { env: { account: '234567890123' } }); + const roleFromStackB = new iam.Role(stackB, 'MyRole', { + assumedBy: new iam.AccountPrincipal('234567890123'), + roleName: 'MyRole', + }); + + streamFromStackA.grant(roleFromStackB, 'kinesis:GetRecords'); + + Template.fromStack(stackA).hasResourceProperties('AWS::Kinesis::ResourcePolicy', { + ResourceArn: stackA.resolve(streamFromStackA.streamArn), + ResourcePolicy: { + Version: '2012-10-17', + Statement: [ + { + Action: 'kinesis:GetRecords', + Effect: 'Allow', + Principal: { + AWS: stackA.resolve(roleFromStackB.roleArn), + }, + Resource: stackA.resolve(streamFromStackA.streamArn), + }, + ], + }, + }); + + Template.fromStack(stackB).hasResourceProperties('AWS::IAM::Policy', { + Roles: [stackB.resolve(roleFromStackB.roleName)], + PolicyDocument: { + Statement: Match.arrayWith([ + // stream read permissions + { + Action: 'kinesis:GetRecords', + Effect: 'Allow', + Resource: stackB.resolve(streamFromStackA.streamArn), + }, + ]), + }, + }); }), test('cross-stack permissions - no encryption', () => { @@ -1304,6 +1350,7 @@ describe('Kinesis data streams', () => { // THEN Template.fromStack(stack).hasResourceProperties('AWS::Kinesis::ResourcePolicy', { + ResourceArn: stack.resolve(stream.streamArn), ResourcePolicy: { Version: '2012-10-17', Statement: [ diff --git a/packages/aws-cdk-lib/awslint.json b/packages/aws-cdk-lib/awslint.json index 987770519230f..51a8a369a47c2 100644 --- a/packages/aws-cdk-lib/awslint.json +++ b/packages/aws-cdk-lib/awslint.json @@ -34,6 +34,7 @@ "docs-public-apis:aws-cdk-lib.TagType", "docs-public-apis:aws-cdk-lib.TagType.*", "module-name:aws-cdk-lib", + "construct-ctor-props-optional:aws-cdk-lib.aws_kinesis.ResourcePolicy", "construct-ctor:aws-cdk-lib.App.", "props-no-cfn-types:aws-cdk-lib.CfnCodeDeployBlueGreenHookProps.applications", "props-no-cfn-types:aws-cdk-lib.CfnCodeDeployBlueGreenHookProps.additionalOptions",