From d71f3dfc92e74564c24aa6a4863bec2ba446099d Mon Sep 17 00:00:00 2001 From: Michael Fuerst Date: Wed, 17 Jan 2018 23:31:35 +0100 Subject: [PATCH] Fully deployed pipeline. Gru training works. --- datasets | 2 +- examples/gru_function_classifier.json | 18 +++++ examples/gru_function_classifier_example.py | 56 +++++++++++++++ models/gru_function_classifier.py | 63 ++++++++++++++++ models/lfw.py | 58 ++++++--------- models/model.py | 73 +++++++++++++++++-- utils/prepare_training.py | 79 +++++++++++++++++++++ 7 files changed, 305 insertions(+), 44 deletions(-) create mode 100644 examples/gru_function_classifier.json create mode 100644 examples/gru_function_classifier_example.py create mode 100644 models/gru_function_classifier.py create mode 100644 utils/prepare_training.py diff --git a/datasets b/datasets index 21d11ab..f2465f9 160000 --- a/datasets +++ b/datasets @@ -1 +1 @@ -Subproject commit 21d11ab2af29642cf786eddc5d2c28a305712a30 +Subproject commit f2465f944fd8d77a8a45f8467fee460f18183a8b diff --git a/examples/gru_function_classifier.json b/examples/gru_function_classifier.json new file mode 100644 index 0000000..68ad63c --- /dev/null +++ b/examples/gru_function_classifier.json @@ -0,0 +1,18 @@ +{ + "train": { + "learning_rate": 0.0001, + "decay": 0.9, + "batch_size": 200, + "iters": 50000, + "summary_iters": 50, + "checkpoint_path": "models/checkpoints/function_classifier" + }, + "arch": { + "sequence_length": 100, + "input_dimension": 1, + "output_dimension": 2, + "hidden_layer_size": 30, + "hidden_layer_depth": 2, + "pkeep": 0.5 + } +} \ No newline at end of file diff --git a/examples/gru_function_classifier_example.py b/examples/gru_function_classifier_example.py new file mode 100644 index 0000000..f732886 --- /dev/null +++ b/examples/gru_function_classifier_example.py @@ -0,0 +1,56 @@ +import math + +import tensorflow as tf + +from datasets.classification.function_generator import function_generator +from utils.prepare_training import write_tf_records, read_tf_records +from models.gru_function_classifier import FunctionClassifier + + +GENERATE_DATA = False + + +def main(): + # Define "constants". + hyper_params_filepath = "examples/gru_function_classifier.json" + data_tmp_folder = "data/.records/gru_function_classifier" + training_examples_number = 10000 + validation_examples_number = 1000 + + if GENERATE_DATA: + # Create training data. + print("Generating data") + train_data = function_generator([lambda x, off: math.sin(x / 50.0 + off), lambda x, off: x / 50.0 + off], 100, training_examples_number) + validation_data = function_generator([lambda x, off: math.sin(x / 50.0 + off), lambda x, off: x / 50.0 + off], 100, validation_examples_number) + + # Write tf records + print("Writing data") + write_tf_records(data_tmp_folder, 4, 2, train_data, validation_data) + + # Create model. + print("Creating Model") + model = FunctionClassifier(hyper_params_filepath) + + # Load data with tf records. + print("Loading data") + train_features, train_labels = read_tf_records(data_tmp_folder, "train", model.hyper_params.train.batch_size, (100,), (2,), tf.float32, tf.uint8, 4) + validation_features, validation_labels = read_tf_records(data_tmp_folder, "validation", model.hyper_params.train.batch_size, (100,), (2,), tf.float32, tf.uint8, 2) + + # Limit used gpu memory. + config = tf.ConfigProto() + config.gpu_options.per_process_gpu_memory_fraction = 0.75 + + # train model. 
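+    # The actual training loop (queue runners, checkpointing, summaries) lives in
+    # Model.fit in models/model.py; this script only opens the session, triggers
+    # training on the record queues and calls export at the end.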
+    with tf.Session(config=config) as sess:
+        print("Setup")
+        model.setup(sess)
+
+        print("Training")
+        model.fit(train_features, train_labels, validation_examples_number, validation_features, validation_labels, verbose=True)
+
+        print("Exporting")
+        model.export()
+
+
+if __name__ == "__main__":
+    main()
diff --git a/models/gru_function_classifier.py b/models/gru_function_classifier.py
new file mode 100644
index 0000000..435e17d
--- /dev/null
+++ b/models/gru_function_classifier.py
@@ -0,0 +1,63 @@
+import numpy as np
+
+import tensorflow as tf
+from tensorflow.contrib import layers
+from tensorflow.contrib import rnn
+
+from models.model import Model
+
+
+class FunctionClassifier(Model):
+    def __init__(self, hyper_params_filepath):
+        super(FunctionClassifier, self).__init__(hyper_params_filepath)
+
+    def _create_model(self, input_tensor, reuse_weights):
+        with tf.variable_scope('NeuralNet') as scope:
+            if reuse_weights:
+                scope.reuse_variables()
+
+            input_tensor = tf.reshape(input_tensor, (self.hyper_params.train.batch_size, self.hyper_params.arch.sequence_length, 1))
+
+            Hin = tf.placeholder(tf.float32, [None, self.hyper_params.arch.hidden_layer_size * self.hyper_params.arch.hidden_layer_depth], name='Hin')  # [ BATCHSIZE, INTERNALSIZE * NLAYERS ]
+            self.feed_dict[Hin] = np.zeros([self.hyper_params.train.batch_size, self.hyper_params.arch.hidden_layer_size * self.hyper_params.arch.hidden_layer_depth])
+
+            # Stack hidden_layer_depth GRU cells with hidden_layer_size units each.
+            # dynamic_rnn infers the sequence length from the shape of input_tensor.
+
+            # "Naive" dropout: applied to the inputs of every GRU cell and to the output of the stack.
+            cells = [rnn.GRUCell(self.hyper_params.arch.hidden_layer_size) for _ in range(self.hyper_params.arch.hidden_layer_depth)]
+            dropcells = [rnn.DropoutWrapper(cell, input_keep_prob=self.hyper_params.arch.pkeep) for cell in cells]
+            multicell = rnn.MultiRNNCell(dropcells, state_is_tuple=False)
+            multicell = rnn.DropoutWrapper(multicell, output_keep_prob=self.hyper_params.arch.pkeep)  # dropout for the readout layer
+
+            Yr, H = tf.nn.dynamic_rnn(multicell, input_tensor, dtype=tf.float32, initial_state=Hin)
+            # Yr: [ BATCHSIZE, SEQLEN, INTERNALSIZE ]
+            # H:  [ BATCHSIZE, INTERNALSIZE*NLAYERS ]  # this is the last state in the sequence
+
+            H = tf.identity(H, name='H')  # just to give it a name
+
+            # Softmax readout:
+            # instead of flattening all time steps to [ BATCHSIZE x SEQLEN, INTERNALSIZE ] and classifying
+            # every step (the readout weights would be shared across the unrolled time steps anyway),
+            # only the output of the last time step is classified here.
+
+            # Select last output.
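+            # Yr is batch-major [ BATCHSIZE, SEQLEN, INTERNALSIZE ]; transposing to time-major and
+            # gathering index SEQLEN-1 yields the last output [ BATCHSIZE, INTERNALSIZE ], which is
+            # fed through a linear readout to logits of shape [ BATCHSIZE, output_dimension ].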
+ output = tf.transpose(Yr, [1, 0, 2]) + last = tf.gather(output, int(output.get_shape()[0]) - 1) + #Yflat = tf.reshape(Yr, [-1, self.hyper_params.arch.hidden_layer_size]) # [ BATCHSIZE x SEQLEN, INTERNALSIZE ] + self.outputs["logits"] = layers.linear(last, self.hyper_params.arch.output_dimension) # [ BATCHSIZE x SEQLEN, self.hyper_params.arch.output_dim ] + + + def _create_loss(self, labels, validation_labels=None): + labels = tf.reshape(labels, [-1, self.hyper_params.arch.output_dimension]) + loss_op = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(logits=self.outputs["logits"], labels=labels)) + train_op = tf.train.RMSPropOptimizer(learning_rate=self.hyper_params.train.learning_rate, decay=self.hyper_params.train.decay).minimize(loss_op) + + # Create a validation loss if possible. + validation_loss_op = None + if validation_labels is not None: + validation_labels = tf.reshape(validation_labels, [-1, self.hyper_params.arch.output_dimension]) + validation_loss_op = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(logits=self.outputs["logits"], labels=validation_labels)) + + return train_op, loss_op, validation_loss_op diff --git a/models/lfw.py b/models/lfw.py index e4ee228..efd9bf7 100644 --- a/models/lfw.py +++ b/models/lfw.py @@ -1,45 +1,31 @@ +import numpy as np + +import tensorflow as tf + from models.model import Model class LFWNetwork(Model): def __init__(self, hyper_params_filepath): - pass - - - def setup(self, session): - """ - Initialize everything for the model that needs a session. - This includes loading checkpoints if provided in the hyperparameters. - - :param session: The tensorflow session to live inside. - """ - pass - - def predict(self, features): - """ - Predict the output of the network given only the feature input. - This is handy for deployment of the network. - - :param features: The input features of the network. For a cnn this is an image. - """ - pass + super(LFWNetwork, self).__init__(hyper_params_filepath) - def fit(self, training_data, iters, validation_data=None, summary_iters=1000, verbose=True): - """ - Fit the model to given training data. + def _create_model(self, input_tensor, reuse_weights): + with tf.variable_scope('NeuralNet') as scope: + if reuse_weights: + scope.reuse_variables() - :param training_data: training_data TODO - :param validation_data: validation_data TODO (This data is optional, if not provided no validation is done.) - :param iters: iters The number of epochs to train in total. - :param summary_iters: summary_iters How many epochs to do between two summaries. - :param verbose: verbose If you want debug outputs or not. - """ - pass + # TODO define net architecture. + + self.outputs["logits"] = input_tensor + def _create_loss(self, labels, validation_labels=None): + labels = tf.reshape(labels, [-1, self.hyper_params.arch.output_dimension]) + loss_op = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(logits=self.outputs["logits"], labels=labels)) + train_op = tf.train.RMSPropOptimizer(learning_rate=self.hyper_params.train.learning_rate, decay=self.hyper_params.train.decay).minimize(loss_op) - def export(self): - """ - Export the model for deployment. + # Create a validation loss if possible. 
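+        # Both losses read the logits stored in self.outputs by _create_model;
+        # Model.fit builds the training and validation graphs before calling _create_loss.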
+ validation_loss_op = None + if validation_labels is not None: + validation_labels = tf.reshape(validation_labels, [-1, self.hyper_params.arch.output_dimension]) + validation_loss_op = tf.reduce_mean(tf.nn.softmax_cross_entropy_with_logits(logits=self.outputs["logits"], labels=validation_labels)) - The exported models can be used in an android app or a rosnode. - """ - pass + return train_op, loss_op, validation_loss_op diff --git a/models/model.py b/models/model.py index 1f44987..66299ea 100644 --- a/models/model.py +++ b/models/model.py @@ -2,6 +2,7 @@ import abc from utils.dict2obj import Dict2Obj +import tensorflow as tf class Model(object): @@ -16,8 +17,12 @@ def __init__(self, hyper_params_filepath): :param hyper_params_filepath: The path to the hyper parameters file """ self.hyper_params = Dict2Obj(json.load(open(hyper_params_filepath))) + self.model_train = None + self.model_validation = None + self.sess = None + self.feed_dict = {} + self.outputs = {} - @abc.abstractmethod def setup(self, session): """ Initialize everything for the model that needs a session. @@ -25,9 +30,22 @@ def setup(self, session): :param session: The tensorflow session to live inside. """ + self.sess = session + + @abc.abstractmethod + def _create_model(self, input_tensor, reuse_weights): + """ + Create a model. + """ pass @abc.abstractmethod + def _create_loss(self, labels, validation_labels=None): + """ + Create a loss. + """ + pass + def predict(self, features): """ Predict the output of the network given only the feature input. @@ -37,21 +55,62 @@ def predict(self, features): """ pass - @abc.abstractmethod - def fit(self, training_data, iters, validation_data=None, summary_iters=1000, verbose=True): + def fit(self, features, labels, validation_examples_num=0, validation_features=None, validation_labels=None, verbose=True): """ Fit the model to given training data. - :param training_data: training_data TODO - :param validation_data: validation_data TODO (This data is optional, if not provided no validation is done.) + :param features: features An input queue tensor as provided by prepare_training.read_tf_records(...). + :param labels: labels An input queue tensor as provided by prepare_training.read_tf_records(...). + :param validation_features: validation_features An input queue tensor. (This data is optional, if not provided no validation is done.) + :param validation_labels: validation_labels An input queue tensor. (This data is optional, if not provided no validation is done.) :param iters: iters The number of epochs to train in total. :param summary_iters: summary_iters How many epochs to do between two summaries. :param verbose: verbose If you want debug outputs or not. """ - pass + self.model_train = self._create_model(features, reuse_weights=False) + self.model_validation = self._create_model(validation_features, reuse_weights=True) + # Create loss and training op. + train_op, loss_op, validation_loss_op = self._create_loss(labels, validation_labels) + + # Init vars. + init_op = tf.group(tf.global_variables_initializer(), tf.local_variables_initializer()) + self.sess.run(init_op) + + # Prepare training. + coord = tf.train.Coordinator() + threads = tf.train.start_queue_runners(coord=coord, sess=self.sess) + saver = tf.train.Saver() + + # Train + acc_loss = 0.0 + for i_step in range(self.hyper_params.train.iters): + + # Train step. + _, loss = self.sess.run([train_op, loss_op], feed_dict=self.feed_dict) + acc_loss += loss + + # Do validation and summary. 
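+            # Every summary_iters steps: average the accumulated training loss, run the validation
+            # loss over all full validation batches (if validation data was given), print both and
+            # write a checkpoint.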
+            if i_step % self.hyper_params.train.summary_iters == 0:
+                loss_val = 0.0
+                if validation_examples_num > 0:
+                    for i_val in range(int(validation_examples_num / self.hyper_params.train.batch_size)):
+                        loss_val_loc = self.sess.run([validation_loss_op], feed_dict=self.feed_dict)[0]
+                        loss_val += loss_val_loc / float(self.hyper_params.train.batch_size)
+                    loss_val /= float(validation_examples_num)
+
+                if verbose:
+                    print("Iter: %d, Loss: %.4f, Validation Loss: %.4f" % (i_step, acc_loss / float(self.hyper_params.train.batch_size) / float(self.hyper_params.train.summary_iters), loss_val))
+                acc_loss = 0.0
+
+                saver.save(self.sess, self.hyper_params.train.checkpoint_path, global_step=i_step)
+
+        if verbose:
+            print("Training stopped.")
+
+        coord.request_stop()
+        coord.join(threads)

-    @abc.abstractmethod
     def export(self):
         """
         Export the model for deployment.
diff --git a/utils/prepare_training.py b/utils/prepare_training.py
new file mode 100644
index 0000000..cf8490d
--- /dev/null
+++ b/utils/prepare_training.py
@@ -0,0 +1,79 @@
+import os
+from multiprocessing import Pool
+import tensorflow as tf
+from os import listdir
+from os.path import isfile, join
+
+
+def _bytes_feature(value):
+    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))
+
+
+def _write_tf_record_pool_helper(args):
+    data, num_threads, i, record_filename, preprocess_feature, preprocess_label = args
+    _write_tf_record(data(num_threads, i), record_filename, preprocess_feature, preprocess_label)
+
+
+def _write_tf_record(data, record_filename, preprocess_feature=None, preprocess_label=None):
+    writer = tf.python_io.TFRecordWriter(record_filename)
+
+    for feature, label in data:
+        if preprocess_feature is not None:
+            feature = preprocess_feature(feature)
+        if preprocess_label is not None:
+            label = preprocess_label(label)
+
+        example = tf.train.Example(features=tf.train.Features(
+            feature={
+                'feature': _bytes_feature(feature.tobytes()),
+                'label': _bytes_feature(label.tobytes())
+            }))
+        writer.write(example.SerializeToString())
+    writer.close()
+
+
+def _read_tf_record(record_filename, feature_shape, label_shape, feature_type, label_type):
+    reader = tf.TFRecordReader()
+    _, serialized_example = reader.read(record_filename)
+
+    data = tf.parse_single_example(
+        serialized_example,
+        features={
+            'feature': tf.FixedLenFeature([], tf.string),
+            'label': tf.FixedLenFeature([], tf.string)
+        })
+
+    feature = tf.decode_raw(data['feature'], feature_type)
+    feature.set_shape(feature_shape)
+    label = tf.decode_raw(data['label'], label_type)
+    label.set_shape(label_shape)
+
+    return feature, label
+
+
+def read_tf_records(folder, phase, batch_size, feature_shape, label_shape, feature_type, label_type, num_threads=4):
+    filenames = [folder + "/" + f for f in listdir(folder) if isfile(join(folder, f)) and phase in f]
+
+    # Create a tf object for the filename list and the readers.
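+    # Each reader dequeues file names from the shared queue and parses single examples;
+    # shuffle_batch_join below merges the num_threads readers into shuffled batches.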
+    filename_queue = tf.train.string_input_producer(filenames, num_epochs=50000)
+    readers = [_read_tf_record(filename_queue, feature_shape, label_shape, feature_type, label_type) for _ in range(num_threads)]
+
+    feature_batch, label_batch = tf.train.shuffle_batch_join(
+        readers,
+        batch_size=batch_size,
+        capacity=10 * batch_size,
+        min_after_dequeue=5 * batch_size
+    )
+
+    return feature_batch, label_batch
+
+
+def write_tf_records(output_folder, num_threads_train, num_threads_validation, train_data, validation_data, preprocess_feature=None, preprocess_label=None):
+    args_train = [(train_data, num_threads_train, i, os.path.join(output_folder, "train_%d.tfrecords" % i), preprocess_feature, preprocess_label) for i in range(num_threads_train)]
+    args_validation = [(validation_data, num_threads_validation, i, os.path.join(output_folder, "validation_%d.tfrecords" % i), preprocess_feature, preprocess_label) for i in range(num_threads_validation)]
+
+    for arg in args_train + args_validation:
+        _write_tf_record_pool_helper(arg)
+
+    #pool = Pool(processes=(num_threads_train + num_threads_validation))
+    #pool.map(_write_tf_record_pool_helper, args_train + args_validation)
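+    # Records are currently written sequentially in this process; the multiprocessing.Pool
+    # variant above is left disabled.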