Skip to content

Commit

Permalink
integration test with timestamp
Browse files Browse the repository at this point in the history
  • Loading branch information
msambol committed Mar 28, 2024
1 parent a2673e4 commit f7c12af
Show file tree
Hide file tree
Showing 9 changed files with 57 additions and 33 deletions.
17 changes: 12 additions & 5 deletions packages/@aws-cdk/aws-pipes-sources-alpha/lib/kinesis.ts
Original file line number Diff line number Diff line change
Expand Up @@ -87,14 +87,14 @@ export interface KinesisSourceParameters {

/**
* With StartingPosition set to AT_TIMESTAMP, the time from which to start reading, in Unix time seconds.
*
*
* @example
* 1711576897
* '2025-01-01T00:00:00Z'
*
* @see https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-pipes-pipe-pipesourcekinesisstreamparameters.html#cfn-pipes-pipe-pipesourcekinesisstreamparameters-startingpositiontimestamp
* @default - no starting position timestamp
*/
readonly startingPositionTimestamp?: number;
readonly startingPositionTimestamp?: string;
}

/**
Expand All @@ -112,6 +112,8 @@ export class KinesisSource implements ISource {
private parallelizationFactor;
private deadLetterTarget;
private deadLetterTargetArn;
private startingPosition;
private startingPositionTimestamp;

constructor(stream: IStream, parameters: KinesisSourceParameters) {
this.stream = stream;
Expand All @@ -124,6 +126,8 @@ export class KinesisSource implements ISource {
this.maximumRetryAttempts = this.sourceParameters.maximumRetryAttempts;
this.parallelizationFactor = this.sourceParameters.parallelizationFactor;
this.deadLetterTarget = this.sourceParameters.deadLetterTarget;
this.startingPosition = this.sourceParameters.startingPosition;
this.sourceParameters = this.sourceParameters.startingPositionTimestamp;

if (this.batchSize !== undefined) {
if (this.batchSize < 1 || this.batchSize > 10000) {
Expand Down Expand Up @@ -151,6 +155,9 @@ export class KinesisSource implements ISource {
throw new Error(`Parallelization factor must be between 1 and 10, received ${this.parallelizationFactor}`);
}
}
if (this.startingPositionTimestamp && this.startingPosition !== KinesisStartingPosition.AT_TIMESTAMP) {
throw new Error(`Timestamp only valid with StartingPosition AT_TIMESTAMP for Kinesis streams, received ${this.startingPosition}`);
}

if (this.deadLetterTarget instanceof Queue) {
this.deadLetterTargetArn = this.deadLetterTarget.queueArn;
Expand All @@ -170,8 +177,8 @@ export class KinesisSource implements ISource {
maximumRetryAttempts: this.maximumRetryAttempts,
onPartialBatchItemFailure: this.sourceParameters.onPartialBatchItemFailure,
parallelizationFactor: this.sourceParameters.parallelizationFactor,
startingPosition: this.sourceParameters.startingPosition,
startingPositionTimestamp: this.sourceParameters.startingPositionTimestamp?.toString(),
startingPosition: this.startingPosition,
startingPositionTimestamp: this.startingPositionTimestamp,
},
},
};
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,8 @@
"MaximumRetryAttempts": 3,
"OnPartialBatchItemFailure": "AUTOMATIC_BISECT",
"ParallelizationFactor": 1,
"StartingPosition": "LATEST"
"StartingPosition": "AT_TIMESTAMP",
"StartingPositionTimestamp": "2024-01-01T00:00:00Z"
}
},
"Target": {
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@ const sourceUnderTest = new KinesisSource(sourceKinesisStream, {
maximumRetryAttempts: 3,
onPartialBatchItemFailure: OnPartialBatchItemFailure.AUTOMATIC_BISECT,
parallelizationFactor: 1,
startingPosition: KinesisStartingPosition.LATEST,
startingPositionTimestamp: 1711576897,
startingPosition: KinesisStartingPosition.AT_TIMESTAMP,
startingPositionTimestamp: '2024-01-01T00:00:00Z',
});

new Pipe(stack, 'Pipe', {
Expand Down
23 changes: 19 additions & 4 deletions packages/@aws-cdk/aws-pipes-sources-alpha/test/kinesis.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ describe('kinesis source', () => {
onPartialBatchItemFailure: OnPartialBatchItemFailure.AUTOMATIC_BISECT,
parallelizationFactor: 10,
startingPosition: KinesisStartingPosition.LATEST,
startingPositionTimestamp: 1711576897,
startingPositionTimestamp: '2024-01-01T00:00:00Z',
});

new Pipe(stack, 'MyPipe', {
Expand Down Expand Up @@ -87,7 +87,7 @@ describe('kinesis source', () => {
OnPartialBatchItemFailure: 'AUTOMATIC_BISECT',
ParallelizationFactor: 10,
StartingPosition: 'LATEST',
StartingPositionTimestamp: '1711576897',
StartingPositionTimestamp: '2024-01-01T00:00:00Z',
},
},
});
Expand All @@ -108,7 +108,7 @@ describe('kinesis source', () => {
onPartialBatchItemFailure: OnPartialBatchItemFailure.AUTOMATIC_BISECT,
parallelizationFactor: 10,
startingPosition: KinesisStartingPosition.LATEST,
startingPositionTimestamp: 1711576897,
startingPositionTimestamp: '2024-01-01T00:00:00Z',
});

new Pipe(stack, 'MyPipe', {
Expand Down Expand Up @@ -144,7 +144,7 @@ describe('kinesis source', () => {
OnPartialBatchItemFailure: 'AUTOMATIC_BISECT',
ParallelizationFactor: 10,
StartingPosition: 'LATEST',
StartingPositionTimestamp: '1711576897',
StartingPositionTimestamp: '2024-01-01T00:00:00Z',
},
},
});
Expand Down Expand Up @@ -356,4 +356,19 @@ describe('kinesis source parameters validation', () => {
});
}).toThrow('Parallelization factor must be between 1 and 10, received 11');
});

test('timestamp provided and starting position not set to AT_TIMESTAMP should throw', () => {
// GIVEN
const app = new App();
const stack = new Stack(app, 'demo-stack');
const stream = new Stream(stack, 'MyStream', {});

// WHEN
expect(() => {
new KinesisSource(stream, {
startingPosition: KinesisStartingPosition.LATEST,
startingPositionTimestamp: '2024-01-01T00:00:00Z',
});
}).toThrow('Timestamp only valid with StartingPosition AT_TIMESTAMP for Kinesis streams, received KinesisStartingPosition.LATEST');
});
});

0 comments on commit f7c12af

Please sign in to comment.