-
Notifications
You must be signed in to change notification settings - Fork 0
/
flyer.py
executable file
·125 lines (104 loc) · 3.96 KB
/
flyer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
#!/usr/bin/env python
#coding=utf-8
import gevent
from gevent import monkey;monkey.patch_all();
import argparse
import redis
from scpfile import scp
import pexpect
import time
from redisconfig import config
class flyer():
    """Small MPI-like coordination helper built on Redis keys and pub/sub.

    Each process creates one instance. The instance subscribes to the
    broadcast channel ``rootChannel`` on construction, can obtain a numeric
    id from the shared ``flyerId`` counter, and can exchange messages over
    named Redis pub/sub channels.
    """

    # Id returned by start(); stays 0 until start() has been called.
    id = 0
    # Kept at class level for backward compatibility only. __init__ gives
    # every instance its own dict so subscriptions are never shared.
    channels = {}
    redisIp = ""
    redisPort = 0
    redisPassword = ""
    # Connection pool cache used by getRedis(), plus the settings it was
    # built from so a changed attribute triggers a rebuild.
    _pool = None
    _poolSettings = None

    def __init__(self, redisIp=config["redisIp"], redisPort=config["redisPort"], redisPassword=config["redisPassword"], redisDb=config["redisDb"]):
        """Store the connection settings and subscribe to ``rootChannel``.

        The defaults come from ``redisconfig.config``. The subscription is
        only made when the server answers ``ping()``; "init ok" is printed
        in either case.
        """
        self.redisIp = redisIp
        self.redisPort = redisPort
        self.redisPassword = redisPassword
        self.redisDb = redisDb
        # Per-instance table: the class-level dict would be shared (and
        # overwritten) by every flyer object in the process.
        self.channels = {}
        r = self.getRedis()
        if r.ping():
            ps = r.pubsub()
            # Broadcast channel used by sendAll() / recieveAll().
            ps.subscribe("rootChannel")
            self.channels["rootChannel"] = ps
        print("init ok")

    def getRedis(self):
        """Return a Redis client for the configured server.

        The connection pool is created once and reused, instead of building
        a new pool (and new connections) on every call. It is rebuilt if any
        of the connection attributes has been changed since.
        """
        settings = (self.redisIp, self.redisPort, self.redisPassword, self.redisDb)
        if self._pool is None or self._poolSettings != settings:
            self._pool = redis.ConnectionPool(host=self.redisIp, port=self.redisPort, password=self.redisPassword, db=self.redisDb)
            self._poolSettings = settings
        return redis.Redis(connection_pool=self._pool)

    def getPubsub(self):
        """Return a new pub/sub object bound to the configured server."""
        return self.getRedis().pubsub()

    def start(self):
        """Increment the shared ``flyerId`` counter and return the new value.

        The value is also stored in ``self.id``.
        """
        r = self.getRedis()
        self.id = r.incr("flyerId")
        return self.id

    def stop(self):
        """Set the ``flyerStop`` key to 1 and exit this process with status 0.

        NOTE(review): other processes presumably poll ``flyerStop``; nothing
        in this file reads it.
        """
        r = self.getRedis()
        r.set("flyerStop", 1)
        # Raise SystemExit directly rather than calling the interactive
        # exit() helper, which the site module provides for the shell.
        raise SystemExit(0)

    def send(self, channelName, message):
        """Publish ``message`` on ``channelName`` and return publish()'s result.

        Use a different channel name per process to address a specific
        process.
        """
        r = self.getRedis()
        return r.publish(channelName, message)

    def recieve(self, channelName):
        """Block until one message arrives on ``channelName`` and return its data.

        A temporary subscription is opened for the call and closed again
        before returning, so repeated calls do not accumulate subscriptions.
        """
        ps = self.getPubsub()
        try:
            ps.subscribe(channelName)
            for item in ps.listen():
                # Skip subscribe confirmations; only real messages count.
                if item['type'] == 'message':
                    return item["data"]
        finally:
            ps.close()

    def sendAll(self, message):
        """Publish ``message`` on ``rootChannel`` and return publish()'s result."""
        return self.send("rootChannel", message)

    def recieveAll(self):
        """Block until one message arrives on ``rootChannel`` and return its data.

        Uses the subscription opened in ``__init__``.
        """
        ps = self.channels["rootChannel"]
        for item in ps.listen():
            if item['type'] == 'message':
                return item["data"]

    def clean(self):
        """Flush the configured Redis database (removes every key in it)."""
        r = self.getRedis()
        r.flushdb()
def runProcess(cmd):
    """Run ``cmd`` through pexpect, print what it wrote, and return 1.

    The child is closed even when reading its output raises, so a failed
    command does not leave the spawned process and its pty behind.
    """
    child = pexpect.spawn(cmd)
    try:
        print(child.read())
    finally:
        child.close()
    return 1
def stop():
    """Do nothing; placeholder greenlet target that returns None."""
def sshRun(hostFile, remotePath, localFile):
    """Start ``localFile`` on every host listed in ``hostFile`` over ssh.

    hostFile   -- text file with one host per line; blank lines are ignored.
    remotePath -- remote directory holding the script; logs are written there.
    localFile  -- name of the script inside ``remotePath`` to run with python.

    The command for the i-th host redirects its output to
    ``<remotePath>/log<i>``. All commands run concurrently as gevent
    greenlets and the call returns 1 once every greenlet has finished.
    """
    # Strip the line terminator so it does not end up inside the ssh command,
    # and drop empty lines, which would otherwise yield "ssh  <command>".
    with open(hostFile) as f:
        hostList = [line.strip() for line in f if line.strip()]

    script = remotePath + "/" + localFile
    logPrefix = remotePath + "/"
    cmdList = [
        'ssh %s "python %s >%slog%d"' % (host, script, logPrefix, i)
        for i, host in enumerate(hostList)
    ]

    jobs = [gevent.spawn(runProcess, cmd) for cmd in cmdList]
    jobs.append(gevent.spawn(stop))
    gevent.joinall(jobs)
    return 1
if __name__ == "__main__":
    # Command-line driver: reset the Redis state, copy the files to every
    # host, then launch the script on each host over ssh.
    parser = argparse.ArgumentParser(
        # Adjacent literals instead of a backslash continuation inside the
        # string, so the text no longer depends on source indentation.
        description='Flyer--An easy MPI-like distributed '
                    'tool for python, to ran applications in clusters.')
    # Host list file: one line per process.
    parser.add_argument('--h', action="store", default="host.list", dest="hostList")
    # File listing what to copy to the hosts.
    parser.add_argument('--f', action="store", default="file.list", dest="fileList")
    # Remote directory that receives the files and the logs.
    parser.add_argument('--r', action="store", default="/tmp", dest="rpath")
    # Script to execute on each host.
    parser.add_argument('--e', action="store", default="run.py", dest="exeFile")
    results = parser.parse_args()
    print(results.exeFile)

    flyerTest = flyer()
    # Flush the Redis database so the new run starts from a clean state.
    flyerTest.clean()
    print(flyerTest.start())

    # Copy the files first, then start the remote processes.
    scpInfo = scp(results.hostList, results.rpath, results.fileList)
    sshRun(results.hostList, results.rpath, results.exeFile)