Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallel datagen #21

Open
wants to merge 4 commits into
base: frameworks
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
131 changes: 93 additions & 38 deletions Pilot2/P2B1/p2b1.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@
# from PIL import Image
# import cv2
# from tqdm import *

import multiprocessing as mp
from multiprocessing import Process, Manager
from functools import partial
import numpy as np
import keras.backend as K
import threading
Expand All @@ -30,6 +32,7 @@
import p2_common
import helper
import random
import theano

def common_parser(parser):

Expand Down Expand Up @@ -213,6 +216,44 @@ def get_activations(model, layer, X_batch):
activations = get_activations([X_batch,0])
return activations

def datagen_helper(frame, Xnorm, nbrs, conv_net, nbr_type, molecular_nbrs, full_conv_net):
    """Build the (input, target) arrays for a single frame.

    Written as a module-level function taking plain arguments so it can be
    bound with ``functools.partial`` and dispatched through
    ``multiprocessing.Pool.map`` with the frame index as the only varying
    argument.

    Args:
        frame: index of the frame to process; used to index both ``Xnorm``
            and ``nbrs``.
        Xnorm: indexable collection of per-frame feature arrays.
        nbrs: indexable collection of per-frame neighbour data, passed
            through to the ``helper.append_nbrs_*`` functions.
        conv_net: when truthy, the per-frame input is reshaped to
            ``(rows, 1, cols, 1)``.
        nbr_type: ``'relative'`` or ``'invariant'``; selects which
            ``helper.append_nbrs_*`` function is applied.
        molecular_nbrs: forwarded unchanged to the ``helper.append_nbrs_*``
            function.
        full_conv_net: only consulted when ``conv_net`` is truthy; when
            truthy, the target is a copy of the reshaped input rather than
            of the un-reshaped array.

    Returns:
        Tuple ``(xt_all, yt_all)``: the input and target arrays, each with
        a new leading axis of length 1 so that per-frame results can be
        concatenated along axis 0.

    Raises:
        ValueError: if ``nbr_type`` is neither ``'relative'`` nor
            ``'invariant'``.
    """
    if nbr_type == 'relative':
        append_nbrs = helper.append_nbrs_relative
    elif nbr_type == 'invariant':
        append_nbrs = helper.append_nbrs_invariant
    else:
        # Raise instead of print + exit(): an exception is reported back
        # through pool.map, whereas exit() inside a pool worker would only
        # terminate that worker process.
        raise ValueError('Invalid nbr_type: %r' % (nbr_type,))

    # Both network types start from the same neighbour-augmented frame.
    xt = append_nbrs(Xnorm[frame], nbrs[frame], molecular_nbrs)
    yt = xt.copy()

    if conv_net:
        xt = xt.reshape(xt.shape[0], 1, xt.shape[1], 1)
        if full_conv_net:
            # The fully convolutional variant reconstructs the reshaped input.
            yt = xt.copy()

    # One frame per call, so the result is always a batch of size one.
    return np.expand_dims(xt, axis=0), np.expand_dims(yt, axis=0)

class Candle_Molecular_Train():
def __init__(self, molecular_model, molecular_encoder, files, mb_epochs, callbacks, save_path='.', batch_size=32,
Expand All @@ -232,11 +273,52 @@ def __init__(self, molecular_model, molecular_encoder, files, mb_epochs, callbac
self.type_feature = type_bool
self.save_path = save_path+'/'
self.sampling_density = sampling_density

self.test_ind = random.sample(range(len(self.files)), 1)
self.train_ind = np.setdiff1d(range(len(self.files)), self.test_ind)


def datagen_helper(self, frame):
    """Build the (input, target) arrays for a single frame from instance state.

    Reads ``self.conv_net``, ``self.full_conv_net``, ``self.nbr_type``,
    ``self.molecular_nbrs``, ``self.nbrs`` and ``self.Xnorm``.

    Args:
        frame: index of the frame to process; used to index both
            ``self.Xnorm`` and ``self.nbrs``.

    Returns:
        Tuple ``(xt_all, yt_all)``: the input and target arrays, each with
        a new leading axis of length 1.

    Raises:
        ValueError: if ``self.nbr_type`` is neither ``'relative'`` nor
            ``'invariant'``.
    """
    if self.nbr_type == 'relative':
        append_nbrs = helper.append_nbrs_relative
    elif self.nbr_type == 'invariant':
        append_nbrs = helper.append_nbrs_invariant
    else:
        raise ValueError('Invalid nbr_type: %r' % (self.nbr_type,))

    # NOTE(review): the previous body read a bare ``Xnorm``, which is not
    # defined in this method's scope. It is read from ``self.Xnorm`` here,
    # on the assumption that the caller stores it on the instance the same
    # way it stores ``self.nbrs`` -- TODO confirm the caller assigns it.
    xt = append_nbrs(self.Xnorm[frame], self.nbrs[frame], self.molecular_nbrs)
    yt = xt.copy()

    if self.conv_net:
        xt = xt.reshape(xt.shape[0], 1, xt.shape[1], 1)
        if self.full_conv_net:
            # The fully convolutional variant reconstructs the reshaped input.
            yt = xt.copy()

    # expand_dims is a numpy function, not an ndarray method, and its result
    # must be returned rather than discarded.
    return np.expand_dims(xt, axis=0), np.expand_dims(yt, axis=0)

def datagen(self, epoch=0, print_out=1, test=0):
print("Cpu count", mp.cpu_count())
files = self.files
# order = range(13, 17) # Temporarily train on only a few files range(len(files))
# Randomize files after first training epoch
Expand All @@ -254,7 +336,7 @@ def datagen(self, epoch=0, print_out=1, test=0):
print (files[f_ind], '\n')

(X, nbrs, resnums) = helper.get_data_arrays(files[f_ind])

self.nbrs = nbrs
# normalizing the location coordinates and bond lengths and scale type encoding
# Changed the xyz normalization from 255 to 350
if self.type_feature:
Expand All @@ -266,48 +348,21 @@ def datagen(self, epoch=0, print_out=1, test=0):

num_frames = X.shape[0]

xt_all = np.array([])
yt_all = np.array([])
# These lists can be shared across processes. A shared array has to be sized
# up front and cannot be appended to, so we need a way to deal with the shared array size.

#for i in range(num_frames):
num_active_frames = random.sample(range(num_frames), int(self.sampling_density*num_frames))

print('Datagen on the following frames', num_active_frames)

for i in num_active_frames:

if self.conv_net:
xt = Xnorm[i]
if self.nbr_type == 'relative':
xt = helper.append_nbrs_relative(xt, nbrs[i], self.molecular_nbrs)
elif self.nbr_type == 'invariant':
xt = helper.append_nbrs_invariant(xt, nbrs[i], self.molecular_nbrs)
else:
print ('Invalid nbr_type')
exit()
#f = lambda x: return datagen_helper(a, b, c, d, x)
pool = mp.Pool()
helper_partial = partial(datagen_helper, Xnorm=Xnorm, nbrs=nbrs, conv_net=self.conv_net, nbr_type=self.nbr_type, molecular_nbrs=self.molecular_nbrs, full_conv_net=self.full_conv_net)
results = pool.map(helper_partial, num_active_frames)

yt = xt.copy()
xt = xt.reshape(xt.shape[0], 1, xt.shape[1], 1)
if self.full_conv_net:
yt = xt.copy()

else:
xt = Xnorm[i]
if self.nbr_type == 'relative':
xt = helper.append_nbrs_relative(xt, nbrs[i], self.molecular_nbrs)
elif self.nbr_type == 'invariant':
xt = helper.append_nbrs_invariant(xt, nbrs[i], self.molecular_nbrs)
else:
print ('Invalid nbr_type')
exit()
yt = xt.copy()

if not len(xt_all):
xt_all = np.expand_dims(xt, axis=0)
yt_all = np.expand_dims(yt, axis=0)
else:
xt_all = np.append(xt_all, np.expand_dims(xt, axis=0), axis=0)
yt_all = np.append(yt_all, np.expand_dims(yt, axis=0), axis=0)
xt_all = np.concatenate([x[0] for x in results])
yt_all = np.concatenate([x[1] for x in results])

yield files[f_ind], xt_all, yt_all

Expand Down