Skip to content

Commit

Permalink
feat(pipes): add pipes base implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
RaphaelManke committed Dec 17, 2023
1 parent ecbbea4 commit b16b69c
Show file tree
Hide file tree
Showing 11 changed files with 657 additions and 6 deletions.
45 changes: 45 additions & 0 deletions packages/@aws-cdk/aws-pipes-alpha/lib/enrichment.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import { IRole } from 'aws-cdk-lib/aws-iam';
import { CfnPipe } from 'aws-cdk-lib/aws-pipes';

/**
* Enrichment step to enhance the data from the source before sending it to the target.
*/
export interface IPipeEnrichment {
/**
* The ARN of the enrichment resource.
*
* Length Constraints: Minimum length of 0. Maximum length of 1600.
*/
readonly enrichmentArn: string;

/**
* The parameters required to set up enrichment on your pipe.
*/
readonly enrichmentParameters: CfnPipe.PipeEnrichmentParametersProperty;

/**
* Grant the pipes role to invoke the enrichment.
*/
grantInvoke(grantee: IRole): void;
}

/**
* Enrichment step to enhance the data from the source before sending it to the target.
*/
export abstract class PipeEnrichment implements IPipeEnrichment {
public readonly enrichmentArn: string;
public readonly enrichmentParameters: CfnPipe.PipeEnrichmentParametersProperty;

constructor(
enrichmentArn: string,
props: CfnPipe.PipeEnrichmentParametersProperty,
) {
this.enrichmentParameters = props;
// TODO - validate ARN is a valid enrichment ARN based on regex from cfn
this.enrichmentArn = enrichmentArn;
}
/**
* Grant the pipes role to invoke the enrichment.
*/
abstract grantInvoke(grantee: IRole): void;
}
6 changes: 6 additions & 0 deletions packages/@aws-cdk/aws-pipes-alpha/lib/index.ts
Original file line number Diff line number Diff line change
@@ -1 +1,7 @@
export * from './pipe';
export * from './enrichment';
export * from './inputTransformation';
export * from './source';
export * from './sourceFilter';
export * from './target';
export * from './targetParameter';
55 changes: 55 additions & 0 deletions packages/@aws-cdk/aws-pipes-alpha/lib/inputTransformation.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@

enum reservedVariables {
PIPES_ARN = '<aws.pipes.pipe-arn>',
PIPES_NAME = '<aws.pipes.pipe-name>',
PIPES_TARGET_ARN = '<aws.pipes.target-arn>',
PIPE_EVENT_INGESTION_TIME = '<aws.pipes.event.ingestion-time>',
PIPE_EVENT = '<aws.pipes.event>',
PIPE_EVENT_JSON = '<aws.pipes.event.json>',
}

type StaticString = string;
// type JsonPath = `<$.${string}>`;
type KeyValue = Record<string, string | reservedVariables>;
type StaticJsonFlat = Record<string, StaticString | KeyValue>;
type InputTransformJson = Record<string, StaticString | KeyValue | StaticJsonFlat>;

type PipeInputTransformationValue = StaticString | InputTransformJson;

/**
* Transform or replace the input event payload
*/
export interface IInputTransformation {
/**
* Valid JSON text passed to the target.
*/
inputTemplate: string;
}

/**
* Transform or replace the input event payload
*/
export class PipeInputTransformation implements IInputTransformation {
/**
* Builds an input transformation from a JSON object.
* @param inputTemplate
* @returns
*/
static fromJson(inputTemplate: Record<string, any>): PipeInputTransformation {
return new PipeInputTransformation(inputTemplate);
}

inputTemplate: string;

constructor(inputTemplate: PipeInputTransformationValue) {
this.inputTemplate = this.unquoteKeyPlaceholders(inputTemplate);
}

private unquoteKeyPlaceholders(obj: any) {
const jsonString = JSON.stringify(obj);

const result = jsonString.replace(/"(<(?:\$\.|aws\.pipes)[^"]*?)"/g, '$1'); // Retain the "<>" and remove the outer quotes for the values that start with either "<$." or "<aws.pipes"

return result;
}
}
167 changes: 163 additions & 4 deletions packages/@aws-cdk/aws-pipes-alpha/lib/pipe.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,39 @@
import { IResource, Resource } from 'aws-cdk-lib';
import { IRole, Role, ServicePrincipal } from 'aws-cdk-lib/aws-iam';
import { CfnPipe } from 'aws-cdk-lib/aws-pipes';
import { Construct } from 'constructs';
import { IPipeEnrichment } from './enrichment';
import { IPipeSource } from './source';
import { IPipeSourceFilter } from './sourceFilter';
import { IPipeTarget } from './target';

/**
* Interface representing a created or an imported `Pipe`.
*/
export interface IPipe extends IResource {
/**
* The name of the pipe
*
* @attribute
* @link https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-pipes-pipe.html#cfn-pipes-pipe-name
*/
readonly pipeName: string;

/**
* The ARN of the pipe
*
* @attribute
* @link https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-pipes-pipe.html#Arn-fn::getatt
*/
readonly pipeArn: string;

/**
* The role used by the pipe
*
* @attribute
*/
readonly pipeRole: IRole;

}

/**
Expand All @@ -16,15 +45,145 @@ export interface PipeProps {
*/
readonly pipeName: string

}
/**
* The state the pipe should be in.
*/
export enum DesiredState {
/**
* The pipe should be running.
*/
RUNNING = 'RUNNING',
/**
* The pipe should be stopped.
* */
STOPPED = 'STOPPED',
}

export interface PipeProps {
/**
* The source of the pipe
*/
readonly source: IPipeSource;
/**
* The filter pattern for the pipe source
* @default - no filter
*/
readonly filter?: IPipeSourceFilter;
/**
* Enrichment step to enhance the data from the source before sending it to the target.
* @default - no enrichment
*/
readonly enrichment?: IPipeEnrichment;
/**
* The target of the pipe
*/
readonly target: IPipeTarget;
/**
* Name of the pipe in the AWS console
*
* @link http://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-pipes-pipe.html#cfn-pipes-pipe-name
*
* @default - automatically generated name
*/
readonly name?: string;
/**
* The role used by the pipe which has permissions to read from the source and write to the target.
* If an enriched target is used, the role also have permissions to call the enriched target.
* If no role is provided, a role will be created.
*
* @default - a new role will be created.
*/
readonly role?: IRole;
/**
* A description of the pipe displayed in the AWS console
*
* @link http://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-pipes-pipe.html#cfn-pipes-pipe-description
*
* @default - no description
*/
readonly description?: string;
/**
* The desired state of the pipe. If the state is set to STOPPED, the pipe will not process events.
*
* @link https://docs.aws.amazon.com/eventbridge/latest/pipes-reference/API_Pipe.html#eventbridge-Type-Pipe-DesiredState
*
* @default - DesiredState.RUNNING
*/
readonly desiredState?: DesiredState;
/**
* `AWS::Pipes::Pipe.Tags`
*
* @link http://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-pipes-pipe.html#cfn-pipes-pipe-tags
*
* @default - no tags
*/
readonly tags?: {
[key: string]: string;
};
}

abstract class PipeBase extends Resource implements IPipe {
public abstract readonly pipeName: string;
public abstract readonly pipeArn: string;
public abstract readonly pipeRole: IRole;

}

/**
* An EventBridge Pipe
*/
export class Pipe extends Resource implements IPipe {
*/
export class Pipe extends PipeBase {
/**
* Reference an existing pipe by ARN
*/
public static fromPipeArn(scope: Construct, id: string, pipeArn: string): IPipe {
// TODO implement fromPipeArn function
throw Error(`Not implemented! ${scope} ${id} ${pipeArn}`);
}

public readonly pipeName: string;
public readonly pipeArn: string;
public readonly pipeRole: IRole;

constructor(scope: Construct, id: string, props: PipeProps) {
super(scope, id, {
physicalName: props.pipeName,
const pipeName =
props.name;
super(scope, id, { physicalName: pipeName });

this.pipeRole =
props.role ||
new Role(this, 'Role', {
assumedBy: new ServicePrincipal('pipes.amazonaws.com'),
});

const sourceParameters = {
...props.source.sourceParameters,
filterCriteria: props.filter,
};
if (props.enrichment) {
props.enrichment.grantInvoke(this.pipeRole);
}
props.source.grantRead(this.pipeRole);
props.target.grantPush(this.pipeRole);

const resource = new CfnPipe(this, 'Resource', {
name: props.name,
source: props.source.sourceArn,
sourceParameters: sourceParameters,
target: props.target.targetArn,
targetParameters: props.target.targetParameters,
enrichment: props.enrichment?.enrichmentArn,
roleArn: this.pipeRole.roleArn,
description: props.description,
desiredState: props.desiredState,
enrichmentParameters: props.enrichment?.enrichmentParameters,
tags: props.tags,
});

this.pipeName = resource.ref;
this.pipeArn = resource.attrArn;

}

}
Loading

0 comments on commit b16b69c

Please sign in to comment.