-
Notifications
You must be signed in to change notification settings - Fork 0
/
Client.py
132 lines (103 loc) · 4.75 KB
/
Client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
from utils import *
class Client():
    """Client that registers with a server and uploads a file for processing.

    Calling an instance binds three ZeroMQ sockets (PULL for server messages,
    PUB for outgoing data, PULL for pings), looks for a server through
    ``do_broadcast`` and registers with it.  Two background threads then
    answer pings and react to the messages sent back by the server.
    """

    # Size, in bytes, of each slice of the input file published to the server.
    CHUNK_SIZE = 4 * 1024

    def __init__(self, data):
        """Remember the file to upload and choose the local address to bind.

        Args:
            data: Path of the file whose contents are sent to the server.
        """
        addresses = QNetworkInterface.allAddresses()
        # NOTE(review): index 2 is a hard-coded choice of network address; it
        # raises IndexError when fewer than three addresses are reported and
        # may pick the wrong interface on another machine -- confirm.
        self.host = addresses[2].toString()
        print('my ip is ' + self.host)
        self.pull_port = None
        self.ping_port = None
        self.pub_port = None
        self.sub_port = None
        self.ping_socket = None
        self.pub_socket = None
        self.pull_socket = None
        self.data = data
        self.mapf = map1
        self.parsef = None
        self.reducef = reduce1
        self.id = None

    def __call__(self):
        """Bind the sockets, locate a server and send it the 'id' request.

        Returns ``None`` in every case.  When no server answers the broadcast
        the sockets and the context created here are released before
        returning, instead of being left bound.
        """
        c = zmq.Context()
        endpoint = 'tcp://' + self.host

        self.pull_socket = c.socket(zmq.PULL)
        self.pull_port = self.pull_socket.bind_to_random_port(endpoint)
        print('my pull port is ' + str(self.pull_port))

        self.pub_socket = c.socket(zmq.PUB)
        self.pub_port = self.pub_socket.bind_to_random_port(endpoint)
        print('my pub port is ' + str(self.pub_port))

        self.ping_socket = c.socket(zmq.PULL)
        self.ping_port = self.ping_socket.bind_to_random_port(endpoint)

        _addr = do_broadcast(BROADCAST_PORT)
        if _addr == '':
            print('I dont found a Server ')
            # Nothing was sent yet, so the sockets can be dropped immediately.
            for sock in (self.pull_socket, self.pub_socket, self.ping_socket):
                sock.close(linger=0)
            c.term()
            return

        server_host, server_pull_port = self._split_address(_addr)
        # Start listening before registering so the server's reply is not missed.
        self.make_threads(c)
        sendMessage(c=c, _name=1, _type='id',
                    _message=(self.host, self.pull_port, self.pub_port,
                              self.ping_port, self.sub_port),
                    _dadress=server_host, _dport=server_pull_port,
                    _adress=self.host, _port=self.pull_port)

    @staticmethod
    def _split_address(addr):
        """Split ``'<scheme>//<host>:<port>'`` into ``(host, int(port))``."""
        parts = addr.split('//')[1].split(':')
        return parts[0], int(parts[1])

    @staticmethod
    def _iter_chunks(fd, size, chunk_size=4 * 1024):
        """Yield up to ``size`` bytes from ``fd`` in pieces of ``chunk_size``.

        Stops early when ``fd`` runs out of data, so a file that shrank after
        its size was measured cannot cause an endless loop.
        """
        remaining = size
        while remaining > 0:
            chunk = fd.read(min(chunk_size, remaining))
            if not chunk:
                break
            remaining -= len(chunk)
            yield chunk

    def make_threads(self, c):
        """Start the task-listener and ping-listener threads.

        Args:
            c: The ZeroMQ context handed to both thread targets.
        """
        self.listen_thread = Thread(target=self.collectTask, args=(c,))
        self.listen_thread.start()
        print('Worker is On and Listen')
        self.listenp_thread = Thread(target=self.listenping, args=(c,))
        self.listenp_thread.start()
        print('Worker is ready ')

    def listenping(self, c):
        """Answer each ping by pushing an empty message to the given address.

        Every message received on the ping socket is decoded as the address
        to reply to.  The loop ends on the first failure, which is reported
        instead of being silently swallowed.

        Args:
            c: The ZeroMQ context used to create the reply sockets.
        """
        while True:
            data = self.ping_socket.recv()
            try:
                data = data.decode()
                print('.....................................................')
                print(data)
                s1 = c.socket(zmq.PUSH)
                try:
                    s1.connect(data)
                    s1.send(b'')
                finally:
                    # One socket is created per ping; close it so they do not
                    # pile up.  With the default linger the queued reply is
                    # still delivered after close().
                    s1.close()
            except Exception as err:
                print('ping listener stopped: ' + repr(err))
                break

    def _send_file(self):
        """Publish the file at ``self.data``: name/size, chunks, then 'FinishC'.

        If the file cannot be read the problem is reported and nothing is
        published; a missing file is no longer created as a side effect of
        measuring its size.
        """
        try:
            size = os.path.getsize(self.data)
            fd = open(self.data, 'rb')  # read-only: no write permission needed
        except OSError as err:
            print('cannot read ' + str(self.data) + ': ' + str(err))
            return
        with fd:
            pubMessage(s=self.pub_socket, _adress=self.host, _port=self.pull_port,
                       _type='nameData', _file=self.data, _message='nameData',
                       _size=size, _name=self.id)
            for _read in self._iter_chunks(fd, size, self.CHUNK_SIZE):
                pubMessage(s=self.pub_socket, _adress=self.host,
                           _port=self.pull_port, _type='Data', _message=_read,
                           _file=self.data, _name=self.id, _size=len(_read))
            pubMessage(s=self.pub_socket, _adress=self.host, _port=self.pull_port,
                       _message=1, _type='FinishC', _mapf=map1, _redf=reduce1,
                       _file=self.data, _name=self.id)

    def collectTask(self, c):
        """Receive server messages and act on them until 'FinishAnswer'.

        Handled message types:
            'id'           -- store the assigned id and publish 'sendData'.
            'task'         -- publish the input file (see ``_send_file``).
            'reciveData'   -- acknowledgement only; a message is printed.
            'sendAnswerF'  -- create/truncate the answer file ``my <NAME>``.
            'sendAnswer'   -- append the message text to the answer file.
            'FinishAnswer' -- stop listening.

        Args:
            c: The ZeroMQ context (not used by this method).
        """
        while True:
            print('heeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee')
            data = self.pull_socket.recv()
            # SECURITY: dill.loads can execute arbitrary code; this is only
            # safe when every peer able to reach the pull port is trusted.
            data = dill.loads(data)
            print('/////////////////////////////////////////')
            print(data)
            _type = data[TYPE]
            if _type == 'id':
                self.id = data[MESSAGE]
                print('I am the Client with id: ' + str(data[MESSAGE]))
                pubMessage(s=self.pub_socket, _type='sendData',
                           _message='sendData', _adress=self.host,
                           _port=self.pull_port)
                print('I send the message')
            elif _type == 'task':
                print('server is ready')
                self._send_file()
            elif _type == 'reciveData':
                print('Server has taken the Data ')
            elif _type == 'sendAnswerF':
                location = f'my {data[NAME]}'
                # Opening in 'w' mode creates or empties the answer file.
                with open(location, 'w'):
                    pass
            elif _type == 'sendAnswer':
                location = f'my {data[NAME]}'
                with open(location, 'a') as fd:
                    fd.write(str(data[MESSAGE]))
            elif _type == 'FinishAnswer':
                print('I get my Answer')
                break
# Script entry: ask for the file to upload, build the client and start it.
requested_file = input('> dame el archivo ')
a = Client(data=requested_file)
a()