-
Notifications
You must be signed in to change notification settings - Fork 493
/
Airflow_CloudFormation.yaml
267 lines (264 loc) · 8.79 KB
/
Airflow_CloudFormation.yaml
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
AWSTemplateFormatVersion: '2010-09-09'
Description: Airflow server backed by Postgres RDS
Parameters:
KeyName:
Description: Name of an existing EC2 KeyPair to enable SSH access into the Airflow web server
Type: AWS::EC2::KeyPair::KeyName
ConstraintDescription: Must be the name of an existing EC2 KeyPair
S3BucketName:
Description: REQUIRED - A new S3 Bucket name. This bucket will be used to read and write the Movielens dataset.
Type: String
AllowedPattern: '.+'
DBPassword:
Default: airflowpassword
NoEcho: 'true'
Description: Airflow database admin account password
Type: String
MinLength: '8'
MaxLength: '41'
AllowedPattern: '[a-zA-Z0-9]*'
ConstraintDescription: Must contain only alphanumeric characters
# Mapping to find the Amazon Linux AMI in each region.
Mappings:
RegionMap:
us-east-1:
AMI: ami-97785bed
us-east-2:
AMI: ami-f63b1193
us-west-1:
AMI: ami-824c4ee2
us-west-2:
AMI: ami-f2d3638a
ca-central-1:
AMI: ami-a954d1cd
eu-west-1:
AMI: ami-d834aba1
eu-west-2:
AMI: ami-403e2524
eu-west-3:
AMI: ami-8ee056f3
eu-central-1:
AMI: ami-5652ce39
sa-east-1:
AMI: ami-84175ae8
ap-south-1:
AMI: ami-531a4c3c
ap-southeast-1:
AMI: ami-68097514
ap-southeast-2:
AMI: ami-942dd1f6
ap-northeast-1:
AMI: ami-ceafcba8
ap-northeast-2:
AMI: ami-863090e8
Resources:
EC2Instance:
Type: AWS::EC2::Instance
Properties:
KeyName: !Ref 'KeyName'
SecurityGroups: [!Ref 'AirflowEC2SecurityGroup']
InstanceType: 'm4.xlarge'
IamInstanceProfile:
Ref: EC2InstanceProfile
Tags:
-
Key: Name
Value: Airflow
ImageId: !FindInMap
- RegionMap
- !Ref 'AWS::Region'
- AMI
UserData:
Fn::Base64: !Sub |
#!/bin/bash
set -x
exec > >(tee /var/log/user-data.log|logger -t user-data ) 2>&1
# Get the latest CloudFormation package
echo "Installing aws-cfn"
yum install -y aws-cfn-bootstrap
# Start cfn-init
/opt/aws/bin/cfn-init -v -c install --stack ${AWS::StackId} --resource EC2Instance --region ${AWS::Region}
# Download and unzip the Movielens dataset
wget http://files.grouplens.org/datasets/movielens/ml-latest.zip && unzip ml-latest.zip
# Upload the movielens dataset files to the S3 bucket
aws s3 cp ml-latest s3://${S3BucketName} --recursive
# Install git
sudo yum install -y git
# Clone the git repository
git clone https://github.com/aws-samples/aws-concurrent-data-orchestration-pipeline-emr-livy.git
sudo pip install boto3
# Install airflow using pip
echo "Install Apache Airflow"
sudo SLUGIFY_USES_TEXT_UNIDECODE=yes pip install -U apache-airflow
# Encrypt connection passwords in metadata db
sudo pip install apache-airflow[crypto]
# Postgres operators and hook, support as an Airflow backend
sudo pip install apache-airflow[postgres]
sudo -H pip install six==1.10.0
sudo pip install --upgrade six
sudo pip install markupsafe
sudo pip install --upgrade MarkupSafe
echo 'export PATH=/usr/local/bin:$PATH' >> /root/.bash_profile
source /root/.bash_profile
# Initialize Airflow
airflow initdb
# Update the RDS connection in the Airflow Config file
sed -i '/sql_alchemy_conn/s/^/#/g' ~/airflow/airflow.cfg
sed -i '/sql_alchemy_conn/ a sql_alchemy_conn = postgresql://airflow:${DBPassword}@${DBInstance.Endpoint.Address}:${DBInstance.Endpoint.Port}/airflowdb' ~/airflow/airflow.cfg
# Update the type of executor in the Airflow Config file
sed -i '/executor = SequentialExecutor/s/^/#/g' ~/airflow/airflow.cfg
sed -i '/executor = SequentialExecutor/ a executor = LocalExecutor' ~/airflow/airflow.cfg
airflow initdb
# Move all the files to the ~/airflow directory. The Airflow config file is setup to hold all the DAG related files in the ~/airflow/ folder.
mv aws-concurrent-data-orchestration-pipeline-emr-livy/* ~/airflow/
# Delete the higher-level git repository directory
rm -rf aws-concurrent-data-orchestration-pipeline-emr-livy
# Replace the name of the S3 bucket in each of the .scala files. CHANGE THE HIGHLIGHTED PORTION BELOW TO THE NAME OF THE S3 BUCKET YOU CREATED IN STEP 1. The below command replaces the instance of the string ‘<s3-bucket>’ in each of the scripts to the name of the actual bucket.
sed -i 's/<s3-bucket>/${S3BucketName}/g' /root/airflow/dags/transform/*
# Run Airflow webserver
airflow webserver
Metadata:
AWS::CloudFormation::Init:
configSets:
install:
- gcc
gcc:
packages:
yum:
gcc: []
DependsOn:
- DBInstance
- AirflowEC2SecurityGroup
DBInstance:
Type: AWS::RDS::DBInstance
DeletionPolicy: Delete
Properties:
DBName: airflowdb
Engine: postgres
MasterUsername: airflow
MasterUserPassword: !Ref 'DBPassword'
DBInstanceClass: db.t2.small
AllocatedStorage: 5
DBSecurityGroups:
- Ref: DBSecurityGroup
AirflowEC2SecurityGroup:
Type: AWS::EC2::SecurityGroup
Properties:
GroupName: AirflowEC2SG
GroupDescription: Enable HTTP access via port 80 + SSH access
SecurityGroupIngress:
- IpProtocol: tcp
FromPort: 80
ToPort: 80
CidrIp: 0.0.0.0/0
- IpProtocol: tcp
FromPort: 8080
ToPort: 8080
CidrIp: 0.0.0.0/0
- IpProtocol: tcp
FromPort: 22
ToPort: 22
CidrIp: 0.0.0.0/0
AirflowEMRMasterEC2SecurityGroup:
Type: AWS::EC2::SecurityGroup
Properties:
GroupName: AirflowEMRMasterSG
GroupDescription: Airflow EMR Master SG
DependsOn:
- AirflowEC2SecurityGroup
AirflowEMRMasterInboundRule:
Type: AWS::EC2::SecurityGroupIngress
Properties:
IpProtocol: tcp
FromPort: '8998'
ToPort: '8998'
SourceSecurityGroupName: !Ref 'AirflowEC2SecurityGroup'
GroupName: !Ref 'AirflowEMRMasterEC2SecurityGroup'
AirflowEMRSlaveEC2SecurityGroup:
Type: AWS::EC2::SecurityGroup
Properties:
GroupName: AirflowEMRSlaveSG
GroupDescription: Airflow EMR Slave SG
DBSecurityGroup:
Type: AWS::RDS::DBSecurityGroup
Properties:
GroupDescription: Frontend Access
DBSecurityGroupIngress:
EC2SecurityGroupName:
Ref: AirflowEC2SecurityGroup
EC2Role:
Type: AWS::IAM::Role
Properties:
RoleName: AirflowInstanceRole
AssumeRolePolicyDocument:
Version: "2012-10-17"
Statement:
-
Effect: "Allow"
Principal:
Service:
- "ec2.amazonaws.com"
Action:
- "sts:AssumeRole"
ManagedPolicyArns:
- arn:aws:iam::aws:policy/AmazonS3FullAccess
- arn:aws:iam::aws:policy/AmazonElasticMapReduceFullAccess
EC2InstanceProfile:
Type: AWS::IAM::InstanceProfile
Properties:
InstanceProfileName: AirflowInstanceProfile
Roles:
-
Ref: EC2Role
EmrRole:
Type: AWS::IAM::Role
Properties:
RoleName: EmrRole
AssumeRolePolicyDocument:
Version: "2012-10-17"
Statement:
-
Effect: "Allow"
Principal:
Service:
- "elasticmapreduce.amazonaws.com"
- "s3.amazonaws.com"
Action:
- "sts:AssumeRole"
ManagedPolicyArns:
- arn:aws:iam::aws:policy/AmazonS3FullAccess
- arn:aws:iam::aws:policy/AmazonElasticMapReduceFullAccess
EmrEc2Role:
Type: AWS::IAM::Role
Properties:
RoleName: EmrEc2Role
AssumeRolePolicyDocument:
Version: "2012-10-17"
Statement:
-
Effect: "Allow"
Principal:
Service:
- "ec2.amazonaws.com"
Action:
- "sts:AssumeRole"
ManagedPolicyArns:
- arn:aws:iam::aws:policy/service-role/AmazonElasticMapReduceforEC2Role
- arn:aws:iam::aws:policy/AmazonS3FullAccess
EmrEc2InstanceProfile:
Type: AWS::IAM::InstanceProfile
Properties:
InstanceProfileName: EmrEc2InstanceProfile
Roles:
-
Ref: EmrEc2Role
S3Bucket:
Type: AWS::S3::Bucket
DeletionPolicy: Retain
Properties:
AccessControl: BucketOwnerFullControl
BucketName: !Ref 'S3BucketName'
Outputs:
AirflowEC2PublicDNSName:
Description: Public DNS Name of the Airflow EC2 instance
Value: !Join ["", ["http://", !GetAtt EC2Instance.PublicDnsName, ":8080"]]