From 8794cd75852eae0c79a33f7b576b198ff75b7435 Mon Sep 17 00:00:00 2001
From: Ryan Lane Berg
Date: Wed, 23 May 2018 15:31:36 -0700
Subject: [PATCH 1/4] began work on parallelizing datagen

---
 Pilot2/P2B1/p2b1.py | 97 +++++++++++++++++++++++++++++++++++++++++++--
 1 file changed, 94 insertions(+), 3 deletions(-)

diff --git a/Pilot2/P2B1/p2b1.py b/Pilot2/P2B1/p2b1.py
index 816fd998..d80e54ad 100755
--- a/Pilot2/P2B1/p2b1.py
+++ b/Pilot2/P2B1/p2b1.py
@@ -10,7 +10,8 @@
 # from PIL import Image
 # import cv2
 # from tqdm import *
-
+import multiprocessing as mp
+from multiprocessing import Process, Manager
 import numpy as np
 import keras.backend as K
 import threading
@@ -214,6 +215,40 @@ def get_activations(model, layer, X_batch):
     return activations


+def datagen_helper(Xnorm, frame, xt, yt):
+    if self.conv_net:
+        xt = Xnorm[frame]
+        if self.nbr_type == 'relative':
+            xt = helper.append_nbrs_relative(xt, nbrs[frame], self.molecular_nbrs)
+        elif self.nbr_type == 'invariant':
+            xt = helper.append_nbrs_invariant(xt, nbrs[frame], self.molecular_nbrs)
+        else:
+            print ('Invalid nbr_type')
+            exit()
+
+        yt = xt.copy()
+        xt = xt.reshape(xt.shape[0], 1, xt.shape[1], 1)
+        if self.full_conv_net:
+            yt = xt.copy()
+
+    else:
+        xt = Xnorm[frame]
+        if self.nbr_type == 'relative':
+            xt = helper.append_nbrs_relative(xt, nbrs[frame], self.molecular_nbrs)
+        elif self.nbr_type == 'invariant':
+            xt = helper.append_nbrs_invariant(xt, nbrs[frame], self.molecular_nbrs)
+        else:
+            print ('Invalid nbr_type')
+            exit()
+        yt = xt.copy()
+
+    if not len(xt_all):
+        xt_all = np.expand_dims(xt, axis=0)
+        yt_all = np.expand_dims(yt, axis=0)
+    else:
+        xt_all = np.append(xt_all, np.expand_dims(xt, axis=0), axis=0)
+        yt_all = np.append(yt_all, np.expand_dims(yt, axis=0), axis=0)
+
 class Candle_Molecular_Train():
     def __init__(self, molecular_model, molecular_encoder, files, mb_epochs, callbacks, save_path='.',
                  batch_size=32, nbr_type='relative', len_molecular_hidden_layers=1, molecular_nbrs=0,
@@ -236,7 +271,43 @@ def __init__(self, molecular_model, molecular_encoder, files, mb_epochs, callbac
         self.test_ind = random.sample(range(len(self.files)), 1)
         self.train_ind = np.setdiff1d(range(len(self.files)), self.test_ind)

+
+    def datagen_helper(self, Xnorm, frame, xt_all, yt_all, nbrs):
+        if self.conv_net:
+            xt = Xnorm[frame]
+            if self.nbr_type == 'relative':
+                xt = helper.append_nbrs_relative(xt, nbrs[frame], self.molecular_nbrs)
+            elif self.nbr_type == 'invariant':
+                xt = helper.append_nbrs_invariant(xt, nbrs[frame], self.molecular_nbrs)
+            else:
+                print ('Invalid nbr_type')
+                exit()
+
+            yt = xt.copy()
+            xt = xt.reshape(xt.shape[0], 1, xt.shape[1], 1)
+            if self.full_conv_net:
+                yt = xt.copy()
+
+        else:
+            xt = Xnorm[frame]
+            if self.nbr_type == 'relative':
+                xt = helper.append_nbrs_relative(xt, nbrs[frame], self.molecular_nbrs)
+            elif self.nbr_type == 'invariant':
+                xt = helper.append_nbrs_invariant(xt, nbrs[frame], self.molecular_nbrs)
+            else:
+                print ('Invalid nbr_type')
+                exit()
+            yt = xt.copy()
+
+        if not len(xt_all):
+            xt_all.expand_dims(xt, axis=0)
+            yt_all.expand_dims(yt, axis=0)
+        else:
+            xt_all.append(xt_all, np.expand_dims(xt, axis=0), axis=0)
+            yt_all.append(yt_all, np.expand_dims(yt, axis=0), axis=0)
+
     def datagen(self, epoch=0, print_out=1, test=0):
+        print("Cpu count", mp.cpu_count())
         files = self.files
         # order = range(13, 17)  # Temporarily train on only a few files range(len(files))
         # Randomize files after first training epoch
@@ -266,14 +337,20 @@ def datagen(self, epoch=0, print_out=1, test=0):

             num_frames = X.shape[0]

-            xt_all = np.array([])
-            yt_all = np.array([])
+            # these lists can be shared across porcesses. Array has to be sized,
+            # and cannot be appended to. Need to find a way to deal with the shared array size
+            xt_all = Manager().list()
+            yt_all = Manager().list()

             #for i in range(num_frames):
             num_active_frames = random.sample(range(num_frames), int(self.sampling_density*num_frames))

             print('Datagen on the following frames', num_active_frames)

+            #f = lambda x: return datagen_helper(a, b, c, d, x)
+
+            #np.array(Pool.map(f, num_active_frames)) look into this in the morning
+            processes = []
             for i in num_active_frames:

                 if self.conv_net:
@@ -308,6 +385,20 @@ def datagen(self, epoch=0, print_out=1, test=0):
                 else:
                     xt_all = np.append(xt_all, np.expand_dims(xt, axis=0), axis=0)
                     yt_all = np.append(yt_all, np.expand_dims(yt, axis=0), axis=0)
+                """
+                process = Process(target=self.datagen_helper, args=(Xnorm, i, xt_all, yt_all, nbrs))
+                processes.append(process)
+
+            print("proceses:", len(processes))
+
+            for i in range(len(processes)):
+                print('Starting process', i)
+                processes[i].start()
+
+            for i in range(len(processes)):
+                processes[i].join()
+            """
+
             print('xt_all: ', len(xt_all))

             yield files[f_ind], xt_all, yt_all

From 889fae3d0abed93e4b5349c68396bf996c8bf79d Mon Sep 17 00:00:00 2001
From: Ryan Lane Berg
Date: Thu, 31 May 2018 16:34:11 -0700
Subject: [PATCH 2/4] updated datagen to use multiprocessing pool function

---
 Pilot2/P2B1/p2b1.py | 59 +++++++++++++++++++++++++++------------------
 1 file changed, 35 insertions(+), 24 deletions(-)

diff --git a/Pilot2/P2B1/p2b1.py b/Pilot2/P2B1/p2b1.py
index d80e54ad..06a13da6 100755
--- a/Pilot2/P2B1/p2b1.py
+++ b/Pilot2/P2B1/p2b1.py
@@ -12,6 +12,7 @@
 # from tqdm import *
 import multiprocessing as mp
 from multiprocessing import Process, Manager
+from functools import partial
 import numpy as np
 import keras.backend as K
 import threading
@@ -214,29 +215,31 @@ def get_activations(model, layer, X_batch):
     activations = get_activations([X_batch,0])
     return activations

+def datagen_helper(frame, Xnorm, nbrs, conv_net, nbr_type, molecular_nbrs, full_conv_net):
+    xt_all = np.array([])
+    yt_all = np.array([])

-def datagen_helper(Xnorm, frame, xt, yt):
-    if self.conv_net:
+    if conv_net:
         xt = Xnorm[frame]
-        if self.nbr_type == 'relative':
-            xt = helper.append_nbrs_relative(xt, nbrs[frame], self.molecular_nbrs)
-        elif self.nbr_type == 'invariant':
-            xt = helper.append_nbrs_invariant(xt, nbrs[frame], self.molecular_nbrs)
+        if nbr_type == 'relative':
+            xt = helper.append_nbrs_relative(xt, nbrs[frame], molecular_nbrs)
+        elif nbr_type == 'invariant':
+            xt = helper.append_nbrs_invariant(xt, nbrs[frame], molecular_nbrs)
         else:
             print ('Invalid nbr_type')
             exit()

         yt = xt.copy()
         xt = xt.reshape(xt.shape[0], 1, xt.shape[1], 1)
-        if self.full_conv_net:
+        if full_conv_net:
             yt = xt.copy()

     else:
         xt = Xnorm[frame]
-        if self.nbr_type == 'relative':
-            xt = helper.append_nbrs_relative(xt, nbrs[frame], self.molecular_nbrs)
-        elif self.nbr_type == 'invariant':
-            xt = helper.append_nbrs_invariant(xt, nbrs[frame], self.molecular_nbrs)
+        if nbr_type == 'relative':
+            xt = helper.append_nbrs_relative(xt, nbrs[frame], molecular_nbrs)
+        elif nbr_type == 'invariant':
+            xt = helper.append_nbrs_invariant(xt, nbrs[frame], molecular_nbrs)
         else:
             print ('Invalid nbr_type')
             exit()
@@ -249,6 +252,8 @@ def datagen_helper(Xnorm, frame, xt, yt):
         xt_all = np.append(xt_all, np.expand_dims(xt, axis=0), axis=0)
         yt_all = np.append(yt_all, np.expand_dims(yt, axis=0), axis=0)

+    return xt_all, yt_all
+
 class Candle_Molecular_Train():
     def __init__(self, molecular_model, molecular_encoder, files, mb_epochs, callbacks, save_path='.',
                  batch_size=32, nbr_type='relative', len_molecular_hidden_layers=1, molecular_nbrs=0,
@@ -267,18 +272,20 @@ def __init__(self, molecular_model, molecular_encoder, files, mb_epochs, callbac
         self.type_feature = type_bool
         self.save_path = save_path+'/'
         self.sampling_density = sampling_density
-
         self.test_ind = random.sample(range(len(self.files)), 1)
         self.train_ind = np.setdiff1d(range(len(self.files)), self.test_ind)

-    def datagen_helper(self, Xnorm, frame, xt_all, yt_all, nbrs):
+    def datagen_helper(self, frame):
+        xt_all = np.array([])
+        yt_all = np.array([])
+
         if self.conv_net:
             xt = Xnorm[frame]
             if self.nbr_type == 'relative':
-                xt = helper.append_nbrs_relative(xt, nbrs[frame], self.molecular_nbrs)
+                xt = helper.append_nbrs_relative(xt, self.nbrs[frame], self.molecular_nbrs)
             elif self.nbr_type == 'invariant':
-                xt = helper.append_nbrs_invariant(xt, nbrs[frame], self.molecular_nbrs)
+                xt = helper.append_nbrs_invariant(xt, self.nbrs[frame], self.molecular_nbrs)
             else:
                 print ('Invalid nbr_type')
                 exit()
@@ -291,9 +298,9 @@ def datagen_helper(self, Xnorm, frame, xt_all, yt_all, nbrs):
         else:
             xt = Xnorm[frame]
             if self.nbr_type == 'relative':
-                xt = helper.append_nbrs_relative(xt, nbrs[frame], self.molecular_nbrs)
+                xt = helper.append_nbrs_relative(xt, self.nbrs[frame], self.molecular_nbrs)
             elif self.nbr_type == 'invariant':
-                xt = helper.append_nbrs_invariant(xt, nbrs[frame], self.molecular_nbrs)
+                xt = helper.append_nbrs_invariant(xt, self.nbrs[frame], self.molecular_nbrs)
             else:
                 print ('Invalid nbr_type')
                 exit()
@@ -306,6 +313,8 @@ def datagen_helper(self, Xnorm, frame, xt_all, yt_all, nbrs):
             xt_all.append(xt_all, np.expand_dims(xt, axis=0), axis=0)
             yt_all.append(yt_all, np.expand_dims(yt, axis=0), axis=0)

+        return xt_all, yt_all
+
     def datagen(self, epoch=0, print_out=1, test=0):
         print("Cpu count", mp.cpu_count())
         files = self.files
@@ -325,7 +334,7 @@ def datagen(self, epoch=0, print_out=1, test=0):
             print (files[f_ind], '\n')

             (X, nbrs, resnums) = helper.get_data_arrays(files[f_ind])
-
+            self.nbrs = nbrs
             # normalizing the location coordinates and bond lengths and scale type encoding
             # Changed the xyz normalization from 255 to 350
             if self.type_feature:
@@ -339,8 +348,6 @@ def datagen(self, epoch=0, print_out=1, test=0):

             # these lists can be shared across porcesses. Array has to be sized,
             # and cannot be appended to. Need to find a way to deal with the shared array size
-            xt_all = Manager().list()
-            yt_all = Manager().list()

             #for i in range(num_frames):
             num_active_frames = random.sample(range(num_frames), int(self.sampling_density*num_frames))
@@ -348,9 +355,13 @@ def datagen(self, epoch=0, print_out=1, test=0):
             print('Datagen on the following frames', num_active_frames)

             #f = lambda x: return datagen_helper(a, b, c, d, x)
+            pool = mp.Pool()
+            helper_partial = partial(datagen_helper, Xnorm=Xnorm, nbrs=nbrs, conv_net=self.conv_net, nbr_type=self.nbr_type, molecular_nbrs=self.molecular_nbrs, full_conv_net=self.full_conv_net)
+            results = pool.map(helper_partial, num_active_frames)

-            #np.array(Pool.map(f, num_active_frames)) look into this in the morning
-            processes = []
+            xt_all = np.concatenate([x[0] for x in results])
+            yt_all = np.concatenate([x[1] for x in results])
+            """
             for i in num_active_frames:

                 if self.conv_net:
@@ -385,7 +396,7 @@ def datagen(self, epoch=0, print_out=1, test=0):
                 else:
                     xt_all = np.append(xt_all, np.expand_dims(xt, axis=0), axis=0)
                     yt_all = np.append(yt_all, np.expand_dims(yt, axis=0), axis=0)
-                """
+
                 process = Process(target=self.datagen_helper, args=(Xnorm, i, xt_all, yt_all, nbrs))
                 processes.append(process)

             print("proceses:", len(processes))
@@ -398,7 +409,7 @@ def datagen(self, epoch=0, print_out=1, test=0):
             for i in range(len(processes)):
                 processes[i].join()
             """
-            print('xt_all: ', len(xt_all))
+            #print('xt_all: ', len(xt_all))

             yield files[f_ind], xt_all, yt_all

From c8b9353e4232908e983c473a006588e22c490fc6 Mon Sep 17 00:00:00 2001
From: Ryan Lane Berg
Date: Fri, 1 Jun 2018 13:10:03 -0700
Subject: [PATCH 3/4] added some debug statements for testing

---
 Pilot2/P2B1/p2b1.py | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/Pilot2/P2B1/p2b1.py b/Pilot2/P2B1/p2b1.py
index 06a13da6..5804d467 100755
--- a/Pilot2/P2B1/p2b1.py
+++ b/Pilot2/P2B1/p2b1.py
@@ -32,6 +32,7 @@
 import p2_common
 import helper
 import random
+import theano


 def common_parser(parser):
@@ -251,7 +252,7 @@ def datagen_helper(frame, Xnorm, nbrs, conv_net, nbr_type, molecular_nbrs, full_
     else:
         xt_all = np.append(xt_all, np.expand_dims(xt, axis=0), axis=0)
         yt_all = np.append(yt_all, np.expand_dims(yt, axis=0), axis=0)
-
+    print("Finished Process")
     return xt_all, yt_all

 class Candle_Molecular_Train():
@@ -313,6 +314,7 @@ def datagen_helper(self, frame):
             xt_all.append(xt_all, np.expand_dims(xt, axis=0), axis=0)
             yt_all.append(yt_all, np.expand_dims(yt, axis=0), axis=0)

+
         return xt_all, yt_all

     def datagen(self, epoch=0, print_out=1, test=0):

From 8b3bff53a4a7c7c462218b8e55ce8ec2dd52e1c3 Mon Sep 17 00:00:00 2001
From: Ryan Lane Berg
Date: Fri, 22 Jun 2018 10:41:28 -0700
Subject: [PATCH 4/4] Removed commented out code

---
 Pilot2/P2B1/p2b1.py | 51 +--------------------------------------------
 1 file changed, 1 insertion(+), 50 deletions(-)

diff --git a/Pilot2/P2B1/p2b1.py b/Pilot2/P2B1/p2b1.py
index 5804d467..228831f9 100755
--- a/Pilot2/P2B1/p2b1.py
+++ b/Pilot2/P2B1/p2b1.py
@@ -252,7 +252,7 @@ def datagen_helper(frame, Xnorm, nbrs, conv_net, nbr_type, molecular_nbrs, full_
     else:
         xt_all = np.append(xt_all, np.expand_dims(xt, axis=0), axis=0)
         yt_all = np.append(yt_all, np.expand_dims(yt, axis=0), axis=0)
-    print("Finished Process")
+
     return xt_all, yt_all

 class Candle_Molecular_Train():
@@ -363,55 +363,6 @@ def datagen(self, epoch=0, print_out=1, test=0):

             xt_all = np.concatenate([x[0] for x in results])
             yt_all = np.concatenate([x[1] for x in results])
-            """
-            for i in num_active_frames:
-
-                if self.conv_net:
-                    xt = Xnorm[i]
-                    if self.nbr_type == 'relative':
-                        xt = helper.append_nbrs_relative(xt, nbrs[i], self.molecular_nbrs)
-                    elif self.nbr_type == 'invariant':
-                        xt = helper.append_nbrs_invariant(xt, nbrs[i], self.molecular_nbrs)
-                    else:
-                        print ('Invalid nbr_type')
-                        exit()
-
-                    yt = xt.copy()
-                    xt = xt.reshape(xt.shape[0], 1, xt.shape[1], 1)
-                    if self.full_conv_net:
-                        yt = xt.copy()
-
-                else:
-                    xt = Xnorm[i]
-                    if self.nbr_type == 'relative':
-                        xt = helper.append_nbrs_relative(xt, nbrs[i], self.molecular_nbrs)
-                    elif self.nbr_type == 'invariant':
-                        xt = helper.append_nbrs_invariant(xt, nbrs[i], self.molecular_nbrs)
-                    else:
-                        print ('Invalid nbr_type')
-                        exit()
-                    yt = xt.copy()
-
-                if not len(xt_all):
-                    xt_all = np.expand_dims(xt, axis=0)
-                    yt_all = np.expand_dims(yt, axis=0)
-                else:
-                    xt_all = np.append(xt_all, np.expand_dims(xt, axis=0), axis=0)
-                    yt_all = np.append(yt_all, np.expand_dims(yt, axis=0), axis=0)
-
-                process = Process(target=self.datagen_helper, args=(Xnorm, i, xt_all, yt_all, nbrs))
-                processes.append(process)
-
-            print("proceses:", len(processes))
-
-            for i in range(len(processes)):
-                print('Starting process', i)
-                processes[i].start()
-
-            for i in range(len(processes)):
-                processes[i].join()
-            """
-
-            #print('xt_all: ', len(xt_all))

             yield files[f_ind], xt_all, yt_all
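
For context, the idea PATCH 1 experiments with, spawning one multiprocessing.Process per frame and letting each child append its arrays to a shared Manager().list(), looks roughly like the minimal sketch below. The worker name (frame_worker), the toy array shapes, and the driver code are illustrative assumptions, not code from p2b1.py:

    import multiprocessing as mp

    import numpy as np


    def frame_worker(frame, Xnorm, xt_shared):
        # Child process: append this frame's (1, molecules, features) batch
        # to the manager-backed shared list.
        xt_shared.append(np.expand_dims(Xnorm[frame], axis=0))


    if __name__ == '__main__':
        Xnorm = np.random.rand(6, 4, 3)          # toy stand-in for normalized frames

        manager = mp.Manager()
        xt_shared = manager.list()
        procs = [mp.Process(target=frame_worker, args=(i, Xnorm, xt_shared))
                 for i in range(6)]
        for p in procs:
            p.start()
        for p in procs:
            p.join()

        # The proxies still have to be copied back into one ndarray, and entries
        # arrive in completion order rather than frame order.
        xt_all = np.concatenate(list(xt_shared))
        print(xt_all.shape)

Every append here is pickled through the manager's server process, and the proxy list must be copied back into a plain ndarray at the end, which is part of why the series moves away from this approach.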
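PATCH 2 through PATCH 4 converge on a different pattern: a module-level (and therefore picklable) helper that returns its per-frame arrays, bound to the constant arguments with functools.partial and fanned out with multiprocessing.Pool.map. A minimal, self-contained sketch of that pattern follows; process_frame and the toy dimensions are hypothetical stand-ins for the real datagen_helper call, not the repository's code:

    import multiprocessing as mp
    import random
    from functools import partial

    import numpy as np


    def process_frame(frame, Xnorm, conv_net=False):
        # Stand-in for datagen_helper: build one (xt, yt) pair for a single frame.
        xt = Xnorm[frame]
        yt = xt.copy()
        if conv_net:
            xt = xt.reshape(xt.shape[0], 1, xt.shape[1], 1)
        return np.expand_dims(xt, axis=0), np.expand_dims(yt, axis=0)


    if __name__ == '__main__':
        n_frames, n_molecules, n_features = 20, 8, 12       # toy dimensions
        Xnorm = np.random.rand(n_frames, n_molecules, n_features)

        # Sample a subset of frames, mirroring sampling_density in datagen().
        active_frames = random.sample(range(n_frames), n_frames // 2)

        # Bind the constant arguments once; Pool.map then only varies the frame index.
        worker = partial(process_frame, Xnorm=Xnorm, conv_net=False)
        pool = mp.Pool()
        results = pool.map(worker, active_frames)
        pool.close()
        pool.join()

        # Each worker returns a pair of (1, ...) arrays, so concatenation rebuilds the batch.
        xt_all = np.concatenate([r[0] for r in results])
        yt_all = np.concatenate([r[1] for r in results])
        print(xt_all.shape, yt_all.shape)

Pool.map preserves the input order of the sampled frames, so concatenating the returned pairs rebuilds the batch deterministically and no shared state between processes is needed.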
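Once the pool-based datagen() is in place, it is still consumed as an ordinary generator yielding one (filename, xt_all, yt_all) triple per trajectory file. Assuming trainer is an already-configured Candle_Molecular_Train instance (an assumption for illustration; the repository's actual training driver may differ), a consuming loop might look like:

    for fname, xt_all, yt_all in trainer.datagen(epoch=0, print_out=1):
        # xt_all / yt_all hold one (frames, molecules, features) batch per file;
        # per-frame fitting or further batching happens downstream.
        print(fname, xt_all.shape, yt_all.shape)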