-
Notifications
You must be signed in to change notification settings - Fork 0
/
demo.py
141 lines (122 loc) · 5.42 KB
/
demo.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
#!/usr/bin/env python
# coding=utf-8
import json
import time
import traceback
from aliyunsdkcore.client import AcsClient
from aliyunsdkcore.acs_exception.exceptions import ClientException, ServerException
from aliyunsdkecs.request.v20140526.RunInstancesRequest import RunInstancesRequest
from aliyunsdkecs.request.v20140526.DescribeInstancesRequest import DescribeInstancesRequest
# Instance status value that marks an instance as successfully booted.
RUNNING_STATUS = 'Running'
# Seconds to sleep between two consecutive status polls.
CHECK_INTERVAL = 3
# Maximum number of seconds to keep polling before reporting a boot failure.
CHECK_TIMEOUT = 180
class AliyunRunInstancesExample(object):
    """Create ECS instances via RunInstances and wait until they are running.

    The launch parameters are stored as instance attributes in ``__init__``
    and copied onto a ``RunInstancesRequest`` by ``run_instances``. ``run`` is
    the entry point: it launches the instances, polls their status and prints
    the outcome.
    """

    # Environment variables consulted when credentials are not passed in.
    ACCESS_KEY_ID_ENV = 'ALIBABA_CLOUD_ACCESS_KEY_ID'
    ACCESS_KEY_SECRET_ENV = 'ALIBABA_CLOUD_ACCESS_KEY_SECRET'

    def __init__(self, access_id=None, access_secret=None):
        """Store the launch parameters and build the API client.

        :param access_id: AccessKey ID; falls back to the
            ``ALIBABA_CLOUD_ACCESS_KEY_ID`` environment variable.
        :param access_secret: AccessKey secret; falls back to the
            ``ALIBABA_CLOUD_ACCESS_KEY_SECRET`` environment variable.
        :raises ValueError: if either credential cannot be resolved.
        """
        # Credentials must never be committed to source control, so they are
        # supplied by the caller or the environment instead of being literals.
        self.access_id, self.access_secret = self._resolve_credentials(
            access_id, access_secret)
        # Whether to only pre-check the request. True: send a check request
        # without creating instances or incurring charges. False: send a
        # normal request that creates (and bills for) the instances.
        self.dry_run = False
        # ID of the region the instances belong to.
        self.region_id = 'cn-shanghai'
        # Instance type (resource specification).
        self.instance_type = 'ecs.xn4.small'
        # Billing method of the instances.
        self.instance_charge_type = 'PostPaid'
        # Image ID.
        self.image_id = 'centos_7_7_64_20G_alibase_20191008.vhd'
        # Duration of the resource purchase.
        self.period = 1
        # Unit of the purchase duration.
        self.period_unit = 'Hourly'
        # ID of the availability zone the instances belong to.
        self.zone_id = 'cn-shanghai-c'
        # Network billing type.
        self.internet_charge_type = 'PayByTraffic'
        # Instance name.
        self.instance_name = 'launch-advisor-20191209'
        # Instance login password.
        # NOTE(review): demo placeholder kept from the original script;
        # replace it with a secret of your own before real use.
        self.password = 'Test123!'
        # Number of ECS instances to create.
        self.amount = 1
        # Maximum outbound public bandwidth.
        self.internet_max_bandwidth_out = 1
        # Whether the instances are I/O optimized.
        self.io_optimized = 'optimized'
        # Preemption (spot) strategy for pay-as-you-go instances.
        self.spot_strategy = 'SpotAsPriceGo'
        # Whether security hardening is enabled.
        self.security_enhancement_strategy = 'Active'
        # System disk size.
        self.system_disk_size = '40'
        # System disk category.
        self.system_disk_category = 'cloud_efficiency'
        self.client = AcsClient(self.access_id, self.access_secret, self.region_id)

    @classmethod
    def _resolve_credentials(cls, access_id=None, access_secret=None):
        """Return ``(access_id, access_secret)``, using the environment as fallback.

        :raises ValueError: if a credential is neither passed nor set in the
            environment.
        """
        # Imported locally so this block does not depend on a file-level import.
        import os

        access_id = access_id or os.environ.get(cls.ACCESS_KEY_ID_ENV)
        access_secret = access_secret or os.environ.get(cls.ACCESS_KEY_SECRET_ENV)
        if not access_id or not access_secret:
            raise ValueError(
                'Missing Aliyun credentials: pass access_id/access_secret or set '
                '{id_env} and {secret_env}'.format(
                    id_env=cls.ACCESS_KEY_ID_ENV,
                    secret_env=cls.ACCESS_KEY_SECRET_ENV))
        return access_id, access_secret

    def run(self):
        """Launch the instances, wait for them to boot and report any failure.

        SDK and unexpected errors are printed rather than propagated.
        """
        try:
            ids = self.run_instances()
            self._check_instances_status(ids)
        except ClientException as e:
            print('Fail. Something with your connection with Aliyun go incorrect.'
                  ' Code: {code}, Message: {msg}'
                  .format(code=e.error_code, msg=e.message))
        except ServerException as e:
            print('Fail. Business error.'
                  ' Code: {code}, Message: {msg}'
                  .format(code=e.error_code, msg=e.message))
        except Exception:
            # Top-level boundary of the script: show the traceback, don't crash.
            print('Unhandled error')
            print(traceback.format_exc())

    def run_instances(self):
        """Call the RunInstances API and return the IDs of the new instances.

        :return: instance_ids, the IDs whose status should be checked next
        """
        request = RunInstancesRequest()
        request.set_DryRun(self.dry_run)
        request.set_InstanceType(self.instance_type)
        request.set_InstanceChargeType(self.instance_charge_type)
        request.set_ImageId(self.image_id)
        request.set_Period(self.period)
        request.set_PeriodUnit(self.period_unit)
        request.set_ZoneId(self.zone_id)
        request.set_InternetChargeType(self.internet_charge_type)
        request.set_InstanceName(self.instance_name)
        request.set_Password(self.password)
        request.set_Amount(self.amount)
        request.set_InternetMaxBandwidthOut(self.internet_max_bandwidth_out)
        request.set_IoOptimized(self.io_optimized)
        request.set_SpotStrategy(self.spot_strategy)
        request.set_SecurityEnhancementStrategy(self.security_enhancement_strategy)
        request.set_SystemDiskSize(self.system_disk_size)
        request.set_SystemDiskCategory(self.system_disk_category)

        body = self.client.do_action_with_exception(request)
        data = json.loads(body)
        instance_ids = data['InstanceIdSets']['InstanceIdSet']
        print('Success. Instance creation succeed. InstanceIds: {}'.format(', '.join(instance_ids)))
        return instance_ids

    def _check_instances_status(self, instance_ids):
        """Poll the instances until all are running or the timeout expires.

        The status is queried every ``CHECK_INTERVAL`` seconds for at most
        ``CHECK_TIMEOUT`` seconds. The caller's list is not modified.

        :param instance_ids: IDs of the instances to check
        :return: None
        """
        # Work on a copy so the caller's list is left untouched.
        pending = list(instance_ids)
        if not pending:
            # Nothing to wait for; avoid sending a query with an empty ID filter.
            return

        start = time.time()
        while True:
            request = DescribeInstancesRequest()
            request.set_InstanceIds(json.dumps(pending))
            # DescribeInstances is paginated (default page size 10 per the API
            # reference); ask for the maximum page size so launches of more
            # than 10 instances are still reported in a single page.
            request.set_PageSize(100)
            body = self.client.do_action_with_exception(request)
            data = json.loads(body)
            for instance in data['Instances']['Instance']:
                instance_id = instance['InstanceId']
                # Exact status match; the membership guard keeps remove() from
                # raising on an ID that is not (or no longer) pending.
                if instance['Status'] == RUNNING_STATUS and instance_id in pending:
                    pending.remove(instance_id)
                    print('Instance boot successfully: {}'.format(instance_id))

            if not pending:
                print('Instances all boot successfully')
                break

            if time.time() - start > CHECK_TIMEOUT:
                print('Instances boot failed within {timeout}s: {ids}'
                      .format(timeout=CHECK_TIMEOUT, ids=', '.join(pending)))
                break

            time.sleep(CHECK_INTERVAL)
if __name__ == '__main__':
    # Script entry point: launch the configured instances and wait for boot.
    example = AliyunRunInstancesExample()
    example.run()