Skip to content

Commit

Permalink
adding doc for integrationpattern
Browse files Browse the repository at this point in the history
  • Loading branch information
Rizbir Fahmid Khan committed Feb 7, 2024
1 parent 0e6e37e commit 86999f3
Show file tree
Hide file tree
Showing 2 changed files with 111 additions and 40 deletions.
16 changes: 16 additions & 0 deletions packages/aws-cdk-lib/aws-stepfunctions/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -708,6 +708,22 @@ const submitJob = new tasks.LambdaInvoke(this, 'Submit Job', {
See [the AWS documentation](https://docs.aws.amazon.com/step-functions/latest/dg/concepts-access-cross-acct-resources.html)
to learn more about AWS Step Functions support for accessing resources in other AWS accounts.

## Service Integration Patterns

AWS Step functions integrate directly with other services, either through an optimised integration pattern, or through the AWS SDK.
Therefore, it is possible to change the `integrationPattern` of services, to enable additional functionality of the said AWS Service:

```ts
import * as glue from "@aws-cdk/aws-glue-alpha";

declare const submitGlue: glue.Job;

const submitJob = new tasks.GlueStartJobRun(this, "Submit Job", {
glueJobName: submitGlue.jobName,
integrationPattern: sfn.IntegrationPattern.RUN_JOB,
});
```

## State Machine Fragments

It is possible to define reusable (or abstracted) mini-state machines by
Expand Down
135 changes: 95 additions & 40 deletions packages/aws-cdk-lib/aws-stepfunctions/lib/states/task-base.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
import { Construct } from 'constructs';
import { renderJsonPath, State } from './state';
import * as cloudwatch from '../../../aws-cloudwatch';
import * as iam from '../../../aws-iam';
import * as cdk from '../../../core';
import { Chain } from '../chain';
import { FieldUtils } from '../fields';
import { StateGraph } from '../state-graph';
import { Credentials } from '../task-credentials';
import { CatchProps, IChainable, INextable, RetryProps } from '../types';
import { Construct } from "constructs";
import { renderJsonPath, State } from "./state";
import * as cloudwatch from "../../../aws-cloudwatch";
import * as iam from "../../../aws-iam";
import * as cdk from "../../../core";
import { Chain } from "../chain";
import { FieldUtils } from "../fields";
import { StateGraph } from "../state-graph";
import { Credentials } from "../task-credentials";
import { CatchProps, IChainable, INextable, RetryProps } from "../types";

/**
* Props that are common to all tasks
Expand Down Expand Up @@ -111,9 +111,11 @@ export interface TaskStateBaseProps {

/**
* AWS Step Functions integrates with services directly in the Amazon States Language.
* You can control these AWS services using service integration patterns
* You can control these AWS services using service integration patterns.
*
* @see https://docs.aws.amazon.com/step-functions/latest/dg/connect-to-resource.html#connect-wait-token
* Depending on the AWS Service, the Service Integration Pattern availability will vary.
*
* @see https://docs.aws.amazon.com/step-functions/latest/dg/connect-supported-services.html
*
* @default - `IntegrationPattern.REQUEST_RESPONSE` for most tasks.
* `IntegrationPattern.RUN_JOB` for the following exceptions:
Expand Down Expand Up @@ -144,7 +146,6 @@ export interface TaskStateBaseProps {
* which are more convenient to use.
*/
export abstract class TaskStateBase extends State implements INextable {

public readonly endStates: INextable[];

protected abstract readonly taskMetrics?: TaskMetricsConfig;
Expand Down Expand Up @@ -214,12 +215,15 @@ export abstract class TaskStateBase extends State implements INextable {
*
* @default - sum over 5 minutes
*/
public metric(metricName: string, props?: cloudwatch.MetricOptions): cloudwatch.Metric {
public metric(
metricName: string,
props?: cloudwatch.MetricOptions
): cloudwatch.Metric {
return new cloudwatch.Metric({
namespace: 'AWS/States',
namespace: "AWS/States",
metricName,
dimensionsMap: this.taskMetrics?.metricDimensions,
statistic: 'sum',
statistic: "sum",
...props,
}).attachTo(this);
}
Expand All @@ -230,16 +234,25 @@ export abstract class TaskStateBase extends State implements INextable {
* @default - average over 5 minutes
*/
public metricRunTime(props?: cloudwatch.MetricOptions): cloudwatch.Metric {
return this.taskMetric(this.taskMetrics?.metricPrefixSingular, 'RunTime', { statistic: 'avg', ...props });
return this.taskMetric(this.taskMetrics?.metricPrefixSingular, "RunTime", {
statistic: "avg",
...props,
});
}

/**
* The interval, in milliseconds, for which the activity stays in the schedule state.
*
* @default - average over 5 minutes
*/
public metricScheduleTime(props?: cloudwatch.MetricOptions): cloudwatch.Metric {
return this.taskMetric(this.taskMetrics?.metricPrefixSingular, 'ScheduleTime', { statistic: 'avg', ...props });
public metricScheduleTime(
props?: cloudwatch.MetricOptions
): cloudwatch.Metric {
return this.taskMetric(
this.taskMetrics?.metricPrefixSingular,
"ScheduleTime",
{ statistic: "avg", ...props }
);
}

/**
Expand All @@ -248,7 +261,10 @@ export abstract class TaskStateBase extends State implements INextable {
* @default - average over 5 minutes
*/
public metricTime(props?: cloudwatch.MetricOptions): cloudwatch.Metric {
return this.taskMetric(this.taskMetrics?.metricPrefixSingular, 'Time', { statistic: 'avg', ...props });
return this.taskMetric(this.taskMetrics?.metricPrefixSingular, "Time", {
statistic: "avg",
...props,
});
}

/**
Expand All @@ -257,7 +273,11 @@ export abstract class TaskStateBase extends State implements INextable {
* @default - sum over 5 minutes
*/
public metricScheduled(props?: cloudwatch.MetricOptions): cloudwatch.Metric {
return this.taskMetric(this.taskMetrics?.metricPrefixPlural, 'Scheduled', props);
return this.taskMetric(
this.taskMetrics?.metricPrefixPlural,
"Scheduled",
props
);
}

/**
Expand All @@ -266,7 +286,11 @@ export abstract class TaskStateBase extends State implements INextable {
* @default - sum over 5 minutes
*/
public metricTimedOut(props?: cloudwatch.MetricOptions): cloudwatch.Metric {
return this.taskMetric(this.taskMetrics?.metricPrefixPlural, 'TimedOut', props);
return this.taskMetric(
this.taskMetrics?.metricPrefixPlural,
"TimedOut",
props
);
}

/**
Expand All @@ -275,7 +299,11 @@ export abstract class TaskStateBase extends State implements INextable {
* @default - sum over 5 minutes
*/
public metricStarted(props?: cloudwatch.MetricOptions): cloudwatch.Metric {
return this.taskMetric(this.taskMetrics?.metricPrefixPlural, 'Started', props);
return this.taskMetric(
this.taskMetrics?.metricPrefixPlural,
"Started",
props
);
}

/**
Expand All @@ -284,7 +312,11 @@ export abstract class TaskStateBase extends State implements INextable {
* @default - sum over 5 minutes
*/
public metricSucceeded(props?: cloudwatch.MetricOptions): cloudwatch.Metric {
return this.taskMetric(this.taskMetrics?.metricPrefixPlural, 'Succeeded', props);
return this.taskMetric(
this.taskMetrics?.metricPrefixPlural,
"Succeeded",
props
);
}

/**
Expand All @@ -293,16 +325,26 @@ export abstract class TaskStateBase extends State implements INextable {
* @default - sum over 5 minutes
*/
public metricFailed(props?: cloudwatch.MetricOptions): cloudwatch.Metric {
return this.taskMetric(this.taskMetrics?.metricPrefixPlural, 'Failed', props);
return this.taskMetric(
this.taskMetrics?.metricPrefixPlural,
"Failed",
props
);
}

/**
* Metric for the number of times the heartbeat times out for this activity
*
* @default - sum over 5 minutes
*/
public metricHeartbeatTimedOut(props?: cloudwatch.MetricOptions): cloudwatch.Metric {
return this.taskMetric(this.taskMetrics?.metricPrefixPlural, 'HeartbeatTimedOut', props);
public metricHeartbeatTimedOut(
props?: cloudwatch.MetricOptions
): cloudwatch.Metric {
return this.taskMetric(
this.taskMetrics?.metricPrefixPlural,
"HeartbeatTimedOut",
props
);
}

protected whenBoundToGraph(graph: StateGraph) {
Expand All @@ -311,11 +353,13 @@ export abstract class TaskStateBase extends State implements INextable {
graph.registerPolicyStatement(policyStatement);
}
if (this.credentials) {
graph.registerPolicyStatement(new iam.PolicyStatement({
effect: iam.Effect.ALLOW,
actions: ['sts:AssumeRole'],
resources: [this.credentials.role.resource],
}));
graph.registerPolicyStatement(
new iam.PolicyStatement({
effect: iam.Effect.ALLOW,
actions: ["sts:AssumeRole"],
resources: [this.credentials.role.resource],
})
);
}
}

Expand All @@ -324,24 +368,35 @@ export abstract class TaskStateBase extends State implements INextable {
*/
protected abstract _renderTask(): any;

private taskMetric(prefix: string | undefined, suffix: string, props?: cloudwatch.MetricOptions): cloudwatch.Metric {
private taskMetric(
prefix: string | undefined,
suffix: string,
props?: cloudwatch.MetricOptions
): cloudwatch.Metric {
if (prefix === undefined) {
throw new Error('Task does not expose metrics. Use the \'metric()\' function to add metrics.');
throw new Error(
"Task does not expose metrics. Use the 'metric()' function to add metrics."
);
}
return this.metric(prefix + suffix, props);
}

private renderCredentials() {
return this.credentials ? FieldUtils.renderObject({ Credentials: { RoleArn: this.credentials.role.roleArn } }) : undefined;
return this.credentials
? FieldUtils.renderObject({
Credentials: { RoleArn: this.credentials.role.roleArn },
})
: undefined;
}

private renderTaskBase() {
return {
Type: 'Task',
Type: "Task",
Comment: this.comment,
TimeoutSeconds: this.timeout?.toSeconds() ?? this.taskTimeout?.seconds,
TimeoutSecondsPath: this.taskTimeout?.path,
HeartbeatSeconds: this.heartbeat?.toSeconds() ?? this.heartbeatTimeout?.seconds,
HeartbeatSeconds:
this.heartbeat?.toSeconds() ?? this.heartbeatTimeout?.seconds,
HeartbeatSecondsPath: this.heartbeatTimeout?.path,
InputPath: renderJsonPath(this.inputPath),
OutputPath: renderJsonPath(this.outputPath),
Expand Down Expand Up @@ -392,22 +447,22 @@ export enum IntegrationPattern {
*
* @see https://docs.aws.amazon.com/step-functions/latest/dg/connect-to-resource.html#connect-default
*/
REQUEST_RESPONSE = 'REQUEST_RESPONSE',
REQUEST_RESPONSE = "REQUEST_RESPONSE",

/**
* Step Functions can wait for a request to complete before progressing to the next state.
*
* @see https://docs.aws.amazon.com/step-functions/latest/dg/connect-to-resource.html#connect-sync
*/
RUN_JOB = 'RUN_JOB',
RUN_JOB = "RUN_JOB",

/**
* Callback tasks provide a way to pause a workflow until a task token is returned.
* You must set a task token when using the callback pattern
*
* @see https://docs.aws.amazon.com/step-functions/latest/dg/connect-to-resource.html#connect-wait-token
*/
WAIT_FOR_TASK_TOKEN = 'WAIT_FOR_TASK_TOKEN',
WAIT_FOR_TASK_TOKEN = "WAIT_FOR_TASK_TOKEN",
}

/**
Expand Down

0 comments on commit 86999f3

Please sign in to comment.