Skip to content

Commit

Permalink
fix(stepfunctions): maxConcurrency does not support JsonPath (aws#29330)
Browse files Browse the repository at this point in the history
### Issue # (if applicable)

Relates to aws#20835 

### Reason for this change

`MaxConcurrency` does not support `JsonPath`. This change adds `MaxConcurrencyPath` so that CDK users can specify a `JsonPath` for their `MaxConcurrency`

_Note_ : This does not invalidate JsonPaths for `MaxConcurrency`, as I'm unsure how to do so without reverting aws#20279 . Open to suggestions

### Description of changes

Added a new `maxConcurrencyPath` field that accepts a `JsonPath` value. Decided to go with another explicit field as it is similar to what is done for `ErrorPath` and `CausePath`, in addition to most other Path fields

### Description of how you validated changes

Added unit tests

### Checklist
- [x] My code adheres to the [CONTRIBUTING GUIDE](https://github.com/aws/aws-cdk/blob/main/CONTRIBUTING.md) and [DESIGN GUIDELINES](https://github.com/aws/aws-cdk/blob/main/docs/DESIGN_GUIDELINES.md)

----

*By submitting this pull request, I confirm that my contribution is made under the terms of the Apache-2.0 license*
  • Loading branch information
abdelnn authored Mar 2, 2024
1 parent c9d8add commit b19f822
Show file tree
Hide file tree
Showing 10 changed files with 94 additions and 14 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
"StateMachine2E01A3A5": {
"Type": "AWS::StepFunctions::StateMachine",
"Properties": {
"DefinitionString": "{\"StartAt\":\"My-Map-State\",\"States\":{\"My-Map-State\":{\"Type\":\"Map\",\"End\":true,\"Parameters\":{\"foo\":\"foo\",\"bar.$\":\"$.bar\"},\"ItemsPath\":\"$.inputForMap\",\"ItemProcessor\":{\"ProcessorConfig\":{\"Mode\":\"DISTRIBUTED\",\"ExecutionType\":\"STANDARD\"},\"StartAt\":\"Pass State\",\"States\":{\"Pass State\":{\"Type\":\"Pass\",\"End\":true}}},\"MaxConcurrency\":1}},\"TimeoutSeconds\":30}",
"DefinitionString": "{\"StartAt\":\"My-Map-State\",\"States\":{\"My-Map-State\":{\"Type\":\"Map\",\"End\":true,\"Parameters\":{\"foo\":\"foo\",\"bar.$\":\"$.bar\"},\"ItemsPath\":\"$.inputForMap\",\"ItemProcessor\":{\"ProcessorConfig\":{\"Mode\":\"DISTRIBUTED\",\"ExecutionType\":\"STANDARD\"},\"StartAt\":\"Pass State\",\"States\":{\"Pass State\":{\"Type\":\"Pass\",\"End\":true}}},\"MaxConcurrencyPath\":\"$.maxConcurrency\"}},\"TimeoutSeconds\":30}",
"RoleArn": {
"Fn::GetAtt": [
"StateMachineRoleB840431D",
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ const stack = new cdk.Stack(app, 'cdk-stepfunctions-map-distributed-stack');

const map = new sfn.Map(stack, 'Map', {
stateName: 'My-Map-State',
maxConcurrency: 1,
maxConcurrencyPath: sfn.JsonPath.stringAt('$.maxConcurrency'),
itemsPath: sfn.JsonPath.stringAt('$.inputForMap'),
parameters: {
foo: 'foo',
Expand Down
24 changes: 23 additions & 1 deletion packages/aws-cdk-lib/aws-stepfunctions/lib/states/map-base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,24 @@ export interface MapBaseProps {
*
* An upper bound on the number of iterations you want running at once.
*
* @see
* https://docs.aws.amazon.com/step-functions/latest/dg/concepts-asl-use-map-state-inline.html#map-state-inline-additional-fields
*
* @default - full concurrency
*/
readonly maxConcurrency?: number;

/**
* MaxConcurrencyPath
*
* A JsonPath that specifies the maximum concurrency dynamically from the state input.
*
* @see
* https://docs.aws.amazon.com/step-functions/latest/dg/concepts-asl-use-map-state-inline.html#map-state-inline-additional-fields
*
* @default - full concurrency
*/
readonly maxConcurrencyPath?: string;
}

/**
Expand Down Expand Up @@ -122,13 +137,15 @@ export abstract class MapBase extends State implements INextable {
public readonly endStates: INextable[];

private readonly maxConcurrency?: number;
private readonly maxConcurrencyPath?: string;
protected readonly itemsPath?: string;
protected readonly itemSelector?: { [key: string]: any };

constructor(scope: Construct, id: string, props: MapBaseProps = {}) {
super(scope, id, props);
this.endStates = [this];
this.maxConcurrency = props.maxConcurrency;
this.maxConcurrencyPath = props.maxConcurrencyPath;
this.itemsPath = props.itemsPath;
this.itemSelector = props.itemSelector;
}
Expand Down Expand Up @@ -156,7 +173,8 @@ export abstract class MapBase extends State implements INextable {
...this.renderItemsPath(),
...this.renderItemSelector(),
...this.renderItemProcessor(),
MaxConcurrency: this.maxConcurrency,
...(this.maxConcurrency && { MaxConcurrency: this.maxConcurrency }),
...(this.maxConcurrencyPath && { MaxConcurrencyPath: renderJsonPath(this.maxConcurrencyPath) }),
};
}

Expand All @@ -174,6 +192,10 @@ export abstract class MapBase extends State implements INextable {
errors.push('maxConcurrency has to be a positive integer');
}

if (this.maxConcurrency && this.maxConcurrencyPath) {
errors.push('Provide either `maxConcurrency` or `maxConcurrencyPath`, but not both');
}

return errors;
}

Expand Down
58 changes: 58 additions & 0 deletions packages/aws-cdk-lib/aws-stepfunctions/test/map.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,49 @@ describe('Map State', () => {
});
}),

test('State Machine With Map State and MaxConcurrencyPath', () => {
// GIVEN
const stack = new cdk.Stack();

// WHEN
const map = new stepfunctions.Map(stack, 'Map State', {
stateName: 'My-Map-State',
maxConcurrencyPath: stepfunctions.JsonPath.stringAt('$.maxConcurrencyPath'),
itemsPath: stepfunctions.JsonPath.stringAt('$.inputForMap'),
parameters: {
foo: 'foo',
bar: stepfunctions.JsonPath.stringAt('$.bar'),
},
});
map.iterator(new stepfunctions.Pass(stack, 'Pass State'));

// THEN
expect(render(map)).toStrictEqual({
StartAt: 'My-Map-State',
States: {
'My-Map-State': {
Type: 'Map',
End: true,
Parameters: {
'foo': 'foo',
'bar.$': '$.bar',
},
Iterator: {
StartAt: 'Pass State',
States: {
'Pass State': {
Type: 'Pass',
End: true,
},
},
},
ItemsPath: '$.inputForMap',
MaxConcurrencyPath: '$.maxConcurrencyPath',
},
},
});
}),

test('State Machine With Map State and ResultSelector', () => {
// GIVEN
const stack = new cdk.Stack();
Expand Down Expand Up @@ -395,6 +438,21 @@ describe('Map State', () => {
expect(() => app.synth()).toThrow(/maxConcurrency has to be a positive integer/);
}),

test('fails in synthesis when maxConcurrency and maxConcurrencyPath are both defined', () => {
const app = createAppWithMap((stack) => {
const map = new stepfunctions.Map(stack, 'Map State', {
maxConcurrency: 1,
maxConcurrencyPath: stepfunctions.JsonPath.stringAt('$.maxConcurrencyPath'),
itemsPath: stepfunctions.JsonPath.stringAt('$.inputForMap'),
});
map.iterator(new stepfunctions.Pass(stack, 'Pass State'));

return map;
});

expect(() => app.synth()).toThrow(/Provide either `maxConcurrency` or `maxConcurrencyPath`, but not both/);
}),

test('does not fail synthesis when maxConcurrency is a jsonPath', () => {
const app = createAppWithMap((stack) => {
const map = new stepfunctions.Map(stack, 'Map State', {
Expand Down

0 comments on commit b19f822

Please sign in to comment.