-
Notifications
You must be signed in to change notification settings - Fork 2
/
chunkserver.py
357 lines (296 loc) · 15.1 KB
/
chunkserver.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
import os
import threading
from typing import Dict, List
from xmlrpc.server import SimpleXMLRPCServer
from commons.datastructures import ChunkInfo
from commons.errors import FileNotFoundErr
from commons.loggers import request_logger
from commons.metadata_manager import load_metadata, OplogActions, update_metadata
from commons.settings import DEFAULT_MASTER_ADDR, DEFAULT_IP, CHUNK_SIZE
from commons.utils import rpc_call, ensure_dir
class ChunkServer:
    """XML-RPC chunk server that stores chunk replicas as files under ``path``.

    Clients first push raw bytes with :meth:`push_data`. A later :meth:`write`
    or :meth:`append` on the primary replica applies those bytes to local disk
    and forwards the request to every other replica via :meth:`serialized_write`.
    """

    # Mapping chunk handle -> ChunkInfo (path, chunk_handle, chunk_index, length).
    chunks: Dict[int, 'ChunkInfo']
    pending_extensions: List[int]
    # Pushed-but-not-yet-applied client data, keyed by "client_id|timestamp".
    data: Dict[str, bytes]
    __slots__ = ('my_addr', 'master_addr', 'metadata_file', 'path', 'chunks', 'mutex',
                 'pending_extensions', 'pendingextensions_lock', 'data', 'data_mutex')

    def __init__(self, my_addr, master_addr, path, metadata_file):
        self.my_addr = my_addr
        self.master_addr = master_addr
        # Filename of the oplog file that persists this chunkserver's metadata.
        self.metadata_file = metadata_file
        # Directory in which each chunk is stored as a file named after its handle.
        self.path = path
        # Mapping from chunk handle to ChunkInfo; guarded by self.mutex.
        self.chunks = {}
        self.mutex = threading.Lock()
        # Client data pushed via push_data, held in memory until it is applied
        # to disk; guarded by self.data_mutex.
        self.data = {}
        self.data_mutex = threading.Lock()

    # ---- pushed-data helpers (underscore names are not exposed over XML-RPC) ----

    @staticmethod
    def _data_key(client_id, timestamp):
        """Return the key under which data pushed by (client_id, timestamp) is stored."""
        return f'{client_id}|{timestamp}'

    def _get_pushed_data(self, key):
        """Return the pushed bytes stored under ``key``, or None if there are none."""
        with self.data_mutex:
            return self.data.get(key)

    def _discard_pushed_data(self, key):
        """Forget the pushed bytes under ``key`` once they have been applied to disk."""
        with self.data_mutex:
            self.data.pop(key, None)

    # ---- RPC handlers ----

    def push_data(self, client_id, timestamp, data):
        """Store client data in memory until a later write/append applies it.

        Data is identified by the pair (client_id, timestamp). If non-empty data
        is already stored for that pair, the call is a no-op.

        :param data: the value received over XML-RPC; its ``.data`` bytes are
            stored (the server runs with the default ``use_builtin_types=False``,
            so binary payloads arrive as ``xmlrpc.client.Binary``).
        """
        log.debug("me=%s: client_id=%s, timestamp=%s, data=%s", self.my_addr, client_id, timestamp, data)
        key = self._data_key(client_id, timestamp)
        with self.data_mutex:
            if self.data.get(key):
                return
            self.data[key] = data.data

    def write(self, client_id, timestamp, path, chunk_index, chunk_handle, offset, chunk_locations):
        """Handle a client write request on the primary replica.

        The pushed data for (client_id, timestamp) is written at ``offset`` of
        the local chunk file, chunk metadata is updated, and the write is then
        forwarded to every other replica in ``chunk_locations``.

        :returns: None on success, otherwise an error value (string, or the
            error returned by apply_write / a secondary).
        """
        log.debug("ChunkServer addr: %s", self.my_addr)
        key = self._data_key(client_id, timestamp)
        with self.mutex:
            log.debug("ChunkServer: Write RPC. Lock Acquired")
            data = self._get_pushed_data(key)
            if not data:
                log.debug("ChunkServer: Write RPC. Lock Released.")
                return "ChunkServer.Write: requested data is not in memory"
            # Apply write request to local state.
            err = self.apply_write(f"{chunk_handle}", data, offset)
            if err:
                log.debug("ChunkServer: Write RPC. Lock Released.")
                return err
            self._discard_pushed_data(key)
            self.report_chunk_info(chunk_handle, chunk_index, path, len(data), offset)
        # Forward to the secondaries after releasing the lock; every replica is
        # told the same explicit offset, so ordering between replicas is fixed.
        err = self.apply_to_secondary(client_id, timestamp, path, chunk_index, chunk_handle, offset, chunk_locations)
        if err:
            log.debug("ChunkServer: Write RPC. Lock Released.")
            return err
        # TODO: while the chunk is being written we should request lease extensions.
        log.debug("ChunkServer: Write RPC. Lock Released.")
        return None

    def apply_write(self, filename, data, offset):
        """Write ``data`` into the chunk file ``filename`` starting at byte ``offset``.

        The file is created if it does not exist; bytes outside
        ``[offset, offset + len(data))`` are preserved. Callers should hold
        ``self.mutex``.

        :returns: None on success, ``FileNotFoundErr`` if the file cannot be
            opened because a path component is missing.
        """
        chunk_path = f'{self.path}/{filename}'
        try:
            # 'ab' is used only to create the file if needed. The write itself
            # uses 'r+b': it neither truncates (unlike 'w') nor forces writes to
            # the end of the file (unlike 'a'), so seek() is honoured.
            with open(chunk_path, 'ab'):
                pass
            with open(chunk_path, 'r+b') as fp:
                fp.seek(offset)
                fp.write(data)
        except FileNotFoundError:
            return FileNotFoundErr
        return None

    def report_chunk_info(self, chunk_handle, chunk_index, path, length, offset):
        """Update local metadata after ``length`` bytes were written at ``offset``.

        If the chunk was never seen before, or the write extended it, the new
        state is appended to the oplog and reported to the master immediately.
        Callers should hold ``self.mutex``.
        """
        chunk_info = self.chunks.get(chunk_handle)
        is_new = chunk_info is None
        if is_new:
            chunk_info = ChunkInfo(path, chunk_handle, chunk_index)  # length defaults to zero
            self.chunks[chunk_handle] = chunk_info
        new_end = offset + length
        size_changed = new_end > chunk_info.length
        if size_changed:
            chunk_info.length = new_end
        if is_new or size_changed:
            update_metadata(self.metadata_file, OplogActions.REPORT_CHUNK, (chunk_info.path,
                                                                            chunk_info.chunk_handle,
                                                                            chunk_info.chunk_index,
                                                                            chunk_info.length))
            report_chunk(self, chunk_info)

    def apply_to_secondary(self, client_id, timestamp, path, chunk_index, chunk_handle, offset, chunk_locations):
        """Ask every replica in ``chunk_locations`` other than this one to apply a write.

        :returns: None if all secondaries succeeded, otherwise the first error
            returned by a secondary (remaining secondaries are not contacted).
        """
        for address in chunk_locations:
            if address == self.my_addr:
                continue
            secondary = rpc_call(address)
            err = secondary.serialized_write(client_id, timestamp, path, chunk_index, chunk_handle, offset,
                                             chunk_locations, False)
            if err:
                return err
        return None

    def serialized_write(self, client_id, timestamp, path, chunk_index, chunk_handle, offset, chunk_locations,
                         append_mode):
        """Apply, on a secondary replica, a write already ordered by the primary.

        :param append_mode: when true, no pushed data is used; instead the rest
            of the chunk from ``offset`` up to ``CHUNK_SIZE`` is padded with zero
            bytes (the padding step of record append, as the original
            commented-out code intended).
        :returns: None on success, otherwise an error value.
        """
        log.debug(self.my_addr)
        key = self._data_key(client_id, timestamp)
        with self.mutex:
            if append_mode:
                data = bytes(max(CHUNK_SIZE - offset, 0))
            else:
                data = self._get_pushed_data(key)
                if not data:
                    return "ChunkServer.SerializedWrite: requested data is not in memory"
            err = self.apply_write(f'{chunk_handle}', data, offset)
            if err:
                return err
            if not append_mode:
                self._discard_pushed_data(key)
            self.report_chunk_info(chunk_handle, chunk_index, path, len(data), offset)
        return None

    def read(self, chunk_handle, offset, length):
        """Called by client to read ``length`` bytes of a chunk starting at ``offset``.

        :returns: ``(content, None)`` on success or ``(None, message)`` on failure.
            The error is returned as a string because XML-RPC has no encoding
            for exception objects: an exception instance would be marshalled as
            its (normally empty) ``__dict__`` and reach the caller as a falsy ``{}``.
        """
        log.debug("CHUNK SERVER READ CALLED")
        try:
            with open(f'{self.path}/{chunk_handle}', 'rb') as file:
                file.seek(int(offset))  # go to the requested offset in the chunk
                filecontent = file.read(length)
        except Exception as err:  # RPC boundary: report any failure to the caller
            log.error("ChunkServer.read failed for chunk %s: %s", chunk_handle, err)
            return None, str(err) or repr(err)
        log.debug("FileContent %s", filecontent)
        return filecontent, None

    def append(self, client_id, timestamp, chunk_handle, chunk_index, path, chunk_locations):
        """Handle a client record-append on the primary replica.

        The primary picks the offset (the current chunk length), applies the
        pushed data there, and forwards the same offset to the secondaries.
        If the record would make the chunk exceed ``CHUNK_SIZE``, the append is
        rejected and the client must retry on another chunk.

        :returns: on success, the file-level offset of the appended record
            (``chunk_length + chunk_index * CHUNK_SIZE``); otherwise an error value.
        """
        log.debug("ChunkServer addr: %s", self.my_addr)
        key = self._data_key(client_id, timestamp)
        with self.mutex:
            log.debug("ChunkServer: Append RPC. Lock Acquired")
            data = self._get_pushed_data(key)
            if not data:
                log.debug("ChunkServer: Append RPC. Lock Released.")
                return "ChunkServer.Append: requested data is not in memory"
            length = len(data)
            # An unknown chunk is new, so the record goes at offset zero.
            chunk_info = self.chunks.get(chunk_handle)
            chunk_length = 0 if chunk_info is None else chunk_info.length
            # Reject only if the record would exceed the chunk; an exact fit is allowed.
            if chunk_length + length > CHUNK_SIZE:
                # TODO error by padding chunk
                log.debug("ChunkServer: Append RPC. Lock Released.")
                return "error by padding chunk"
            err = self.apply_append(f"{chunk_handle}", data, chunk_length)
            if err:
                log.debug("ChunkServer: Append RPC. Lock Released.")
                return err
            self._discard_pushed_data(key)
            self.report_chunk_info(chunk_handle, chunk_index, path, length, chunk_length)
        # Apply append to all secondary replicas at the offset chosen above.
        err = self.apply_to_secondary(client_id, timestamp, path, chunk_index, chunk_handle, chunk_length,
                                      chunk_locations)
        if err:
            return "cant Apply append to all secondary replicas"
        # TODO chunk lease extension
        global_offset = chunk_length + (chunk_index * CHUNK_SIZE)
        log.debug("ChunkServer: Append RPC. Record appended at offset %s", global_offset)
        return global_offset

    def apply_append(self, filename, data, offset):
        """Write appended ``data`` at the primary-chosen ``offset`` of chunk ``filename``.

        Secondaries apply the same append with :meth:`apply_write` at the same
        explicit offset, so the primary uses a positional write too (a file
        opened in 'a' mode ignores seek() and would always write at EOF).
        """
        return self.apply_write(filename, data, offset)

    def order_chunk_copy_from_peer(self, peer_address, chunk_handle):
        """This RPC is called by master to order a chunkserver to copy some chunks from a peer chunk server
        so as to meet the replication goal for that chunk.

        :returns: None on success, otherwise an error value.
        """
        peer_chunk_server = rpc_call(peer_address)
        # Chunk metadata from the peer, then the chunk's actual bytes.
        chunk_index, path, length = peer_chunk_server.get_chunk_info_from_peer(chunk_handle)
        data, err = peer_chunk_server.read(chunk_handle, 0, length)
        if err or data is None:
            err = err or f"peer {peer_address} returned no data for chunk {chunk_handle}"
            log.error(err)
            return err
        # apply_write/report_chunk_info require self.mutex, like every other writer.
        with self.mutex:
            err = self.apply_write(f"{chunk_handle}", data.data, 0)
            if err:
                return err
            self.report_chunk_info(chunk_handle, chunk_index, path, length, 0)
        return None

    def get_chunk_info_from_peer(self, chunk_handle):
        """Called by a chunkserver for another chunkserver to get a chunk's metadata.

        :returns: ``(chunk_index, path, length)``.
        :raises KeyError: if this server does not hold ``chunk_handle`` (seen by
            the remote caller as an XML-RPC Fault).
        """
        chunk_info = self.chunks.get(chunk_handle)
        if chunk_info is None:
            raise KeyError(f"chunk {chunk_handle} is not stored on {self.my_addr}")
        return chunk_info.chunk_index, chunk_info.path, chunk_info.length

    def delete_bad_chunk(self, bad_chunk):
        """Delete every chunk handle in ``bad_chunk`` from disk and from metadata.

        All handles are attempted even if some fail.

        :returns: True if every chunk was deleted, False if any was unknown or
            had no file on disk.
        """
        all_deleted = True
        with self.mutex:
            for chunk in bad_chunk:
                chunk_file = f'{self.path}/{chunk}'
                if chunk in self.chunks and os.path.exists(chunk_file):
                    log.info("Deleting Chunk with chunk handle %s", chunk)
                    os.remove(chunk_file)
                    del self.chunks[chunk]
                    update_metadata(self.metadata_file, OplogActions.DEL_BAD_CHUNK, chunk)
                else:
                    log.error("Unable to delete chunk %s", chunk)
                    all_deleted = False
        return all_deleted  # TODO: should we return success message to master

    def get_chunk_handles(self):
        """RPC called by master to get list of chunk handles in this server."""
        return list(self.chunks)
def report_chunk(cs, chunk_info):
    """Tell the master at ``cs.master_addr`` the current state of one chunk held by ``cs``.

    Sends ``(cs.my_addr, chunk_handle, chunk_index, length, path)`` via the
    master's ``report_chunk`` RPC. Any value the master returns is ignored.
    """
    master = rpc_call(cs.master_addr)
    handle, index = chunk_info.chunk_handle, chunk_info.chunk_index
    # TODO: receive returned error if any
    master.report_chunk(cs.my_addr, handle, index, chunk_info.length, chunk_info.path)
def start_chunkserver(master_addr, my_ip, my_port, path):
    """Build a ChunkServer, restore its metadata, register with the master, then serve RPCs.

    Does not return while the XML-RPC server is running (``serve_forever``).

    :param master_addr: URL of the master server.
    :param my_ip: address the XML-RPC server binds to.
    :param my_port: port to bind; also names the oplog file ``logs/ck_<port>.txt``.
    :param path: directory holding chunk files (ensured to exist via ``ensure_dir``).
    """
    ensure_dir(path)
    server_url = f'http://{my_ip}:{my_port}'
    oplog_path = f'logs/ck_{my_port}.txt'
    chunkserver = ChunkServer(server_url, master_addr, path, oplog_path)

    # Replay the oplog first so the chunk list announced to the master below
    # already contains every chunk stored here.
    load_metadata(chunkserver)
    master = rpc_call(chunkserver.master_addr)
    master.notify_master(chunkserver.my_addr, list(chunkserver.chunks))

    rpc_server = SimpleXMLRPCServer((my_ip, my_port), logRequests=True, allow_none=True)
    rpc_server.register_introspection_functions()
    rpc_server.register_instance(chunkserver)
    rpc_server.serve_forever()
    # TODO: launch heart beat on separate thread (must start before serve_forever)
# Module-level logger used by the ChunkServer methods and the helper functions
# in this module. It is bound outside the __main__ guard so that importing this
# module (e.g. to call start_chunkserver() from another script) also defines it;
# the methods look `log` up at call time, so binding it here is early enough.
log = request_logger

if __name__ == '__main__':
    import argparse

    parser = argparse.ArgumentParser()
    parser.add_argument('--ip', default=DEFAULT_IP)
    parser.add_argument('--port', type=int, required=True)
    parser.add_argument('--master', default=DEFAULT_MASTER_ADDR, help="http://<ip address>:<port>")
    parser.add_argument('--path', help="Defaults to temp/ck<PORT>")
    args = parser.parse_args()
    start_chunkserver(args.master, args.ip, args.port, args.path or f"temp/ck{args.port}")