-
Notifications
You must be signed in to change notification settings - Fork 2
/
transfer.py
128 lines (109 loc) · 4.08 KB
/
transfer.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
import math
import os
import threading
import parse
import zmq
CHUNK_SIZE = 1024 * 1024
# ---------------------Generic Send/Receive Functions---------------------
def send_chunk(file, socket, end):
if end == -1:
chunk_size = CHUNK_SIZE
else:
chunk_size = min(CHUNK_SIZE, end - file.tell())
chunk = file.read(chunk_size)
socket.send(chunk)
if chunk:
return True
return False
def receive_chunk(file, socket):
chunk = socket.recv()
if len(chunk) == 0:
return False
file.write(chunk)
return True
# ---------------------Upload---------------------
# *********Client Master Side*********
# Client send upload request to master
# Master responds with a data keeper port to upload to
# *********Client Data Keeper Side*********
# Client calls (upload_to_server) to sends an upload request to data keeper
# Data keeper calls (download_from_client) for receiving and writing data
def download_from_client(socket, filename):
file = open(filename, "wb")
has_next = True
while has_next:
socket.send_string("")
has_next = receive_chunk(file, socket)
socket.send_string("")
size = file.tell()
file.close()
return size
def upload_to_server(filename, user_id, context, ip, port):
socket = context.socket(zmq.REQ)
socket.connect("tcp://{}:{}".format(ip, port))
socket.send_string("upload {} {}".format(user_id, filename))
file = open(filename, "rb")
has_next = True
while has_next:
request = socket.recv_string()
has_next = send_chunk(file, socket, -1)
file.close()
socket.disconnect("tcp://{}:{}".format(ip, port))
# ---------------------Download---------------------
# *********Client Master Side*********
# Client send download request to master
# Master responds with a data keeper port to download from
# *********Client Data Keeper Side*********
# Client calls (download_from_server(s)) to sends an upload request to data keeper
# Data keeper calls (upload_to_client) for sending data
def download_from_server(filename_src, context, ip, port, filename_dst=None, start=0, end=-1):
if filename_dst is None:
filename_dst = filename_src
socket = context.socket(zmq.REQ)
socket.connect("tcp://{}:{}".format(ip, port))
file = open(filename_dst, "wb")
request = "fetch {} {} {}".format(filename_src, start, end)
socket.send_string(request)
while True:
if not receive_chunk(file, socket):
break
socket.send_string("")
file.close()
socket.disconnect("tcp://{}:{}".format(ip, port))
def upload_to_client(socket, request):
parsed = parse.parse("fetch {} {} {}", request)
filename = str(parsed[0])
start = int(parsed[1])
end = int(parsed[2])
file = open(filename, "rb")
file.seek(start)
while True:
if not send_chunk(file, socket, end):
break
request = socket.recv_string()
file.close()
def async_download_from_server(filename_src, filename_dst, context, ip, port, start, end):
thread = threading.Thread(target=download_from_server,
args=(filename_src, context, ip, port, filename_dst, start, end))
thread.start()
return thread
def download_from_servers(filename, context, ips, ports, size):
file_part_size = math.floor(size / len(ips))
filename_base = filename + "_merge_"
threads = []
for i in range(len(ips) - 1):
threads.append(
async_download_from_server(filename, filename_base + str(i), context, ips[i], ports[i], i * file_part_size,
(i + 1) * file_part_size))
i = len(ips) - 1
threads.append(async_download_from_server(
filename, filename_base + str(i), context, ips[i], ports[i], i * file_part_size, -1))
file = open(filename + "_received", "wb")
for i in range(len(ips)):
threads[i].join()
file_to_merge_name = filename_base + str(i)
file_to_merge = open(file_to_merge_name, "rb")
file.write(file_to_merge.read())
file_to_merge.close()
os.remove(file_to_merge_name)
file.close()