From 953cc91a32ebcb4759b3ff879779f4024c052a19 Mon Sep 17 00:00:00 2001
From: Ruhong
Date: Fri, 31 Aug 2018 13:04:53 +0800
Subject: [PATCH] make it work on ml engine, google cloud storage

---
 config.yaml                |  5 +++
 eval_local.sh              | 15 +++++++
 infer_local.sh             | 15 +++++++
 infer_single_pass_local.sh | 16 +++++++
 setup.py                   |  4 +-
 train.sh                   | 23 ++++++++++
 train_local.sh             | 16 +++++++
 trainer/data.py            | 25 +++++------
 trainer/decode.py          |  7 +--
 trainer/model.py           | 23 +++++-----
 trainer/my_model.py        |  2 +-
 trainer/task.py            | 49 ++++++++-------------
 trainer/util.py            | 88 +++++++++++++++++---------------------
 13 files changed, 180 insertions(+), 108 deletions(-)
 create mode 100644 config.yaml
 create mode 100644 eval_local.sh
 create mode 100644 infer_local.sh
 create mode 100644 infer_single_pass_local.sh
 create mode 100644 train.sh
 create mode 100644 train_local.sh

diff --git a/config.yaml b/config.yaml
new file mode 100644
index 0000000..f746daf
--- /dev/null
+++ b/config.yaml
@@ -0,0 +1,5 @@
+trainingInput:
+  scaleTier: BASIC #BASIC_GPU
+  pythonVersion: "3.5"
+  runtimeVersion: "1.9"
+  region: "us-east1"
\ No newline at end of file
diff --git a/eval_local.sh b/eval_local.sh
new file mode 100644
index 0000000..9e1a734
--- /dev/null
+++ b/eval_local.sh
@@ -0,0 +1,15 @@
+#!/usr/bin/env bash
+set -x #echo on
+
+source ./venv/bin/activate
+
+gcloud ml-engine local train \
+    --module-name trainer.task \
+    --package-path trainer/ \
+    --job-dir /Users/me/googledrive/suma/log/huat \
+    -- \
+    --mode eval \
+    --data_path "/Users/me/googledrive/suma/finished_files/chunked/val_*" \
+    --vocab_path /Users/me/googledrive/suma/finished_files/vocab \
+
+deactivate
diff --git a/infer_local.sh b/infer_local.sh
new file mode 100644
index 0000000..1707d60
--- /dev/null
+++ b/infer_local.sh
@@ -0,0 +1,15 @@
+#!/usr/bin/env bash
+set -x #echo on
+
+source ./venv/bin/activate
+
+gcloud ml-engine local train \
+    --module-name trainer.task \
+    --package-path trainer/ \
+    --job-dir /Users/me/googledrive/suma/log/huat \
+    -- \
+    --mode infer \
+    --data_path "/Users/me/googledrive/suma/finished_files/chunked/val_*" \
+    --vocab_path /Users/me/googledrive/suma/finished_files/vocab \
+
+deactivate
diff --git a/infer_single_pass_local.sh b/infer_single_pass_local.sh
new file mode 100644
index 0000000..cb965ad
--- /dev/null
+++ b/infer_single_pass_local.sh
@@ -0,0 +1,16 @@
+#!/usr/bin/env bash
+set -x #echo on
+
+source ./venv/bin/activate
+
+gcloud ml-engine local train \
+    --module-name trainer.task \
+    --package-path trainer/ \
+    --job-dir /Users/me/googledrive/suma/log/huat \
+    -- \
+    --mode infer \
+    --data_path "/Users/me/googledrive/suma/finished_files/chunked/val_*" \
+    --vocab_path /Users/me/googledrive/suma/finished_files/vocab \
+    --single_pass 1
+
+deactivate
diff --git a/setup.py b/setup.py
index bba95bb..2af353f 100644
--- a/setup.py
+++ b/setup.py
@@ -5,9 +5,9 @@
 setup(
     name='sgcharts-pointer-generator',
     version=__version__,
-    python_requires='>=3.6.0',
+    python_requires='>=3.5.0',
     install_requires=[
-        'tensorflow==1.10.0',
+        'tensorflow==1.9.0',
         'pyrouge==0.1.3'
     ],
     packages=find_packages(exclude=["*.tests", "*.tests.*", "tests.*", "tests"]),
diff --git a/train.sh b/train.sh
new file mode 100644
index 0000000..eeb04b9
--- /dev/null
+++ b/train.sh
@@ -0,0 +1,23 @@
+#!/usr/bin/env bash
+set -x #echo on
+
+DATE=`date '+%Y%m%d_%H%M%S'`
+MAX_STEP=400
+BUCKET_NAME="mybucket"
+MODEL_NAME="mymodel"
+JOB_NAME="${MODEL_NAME}_${DATE}"
+JOB_DIR="gs://${BUCKET_NAME}/models/${MODEL_NAME}"
+DATA_PATH="gs://${BUCKET_NAME}/data/finished_files/chunked/train_*"
+VOCAB_PATH="gs://${BUCKET_NAME}/data/finished_files/vocab"
+
+
+gcloud ml-engine jobs submit training ${JOB_NAME} \
+    --module-name trainer.task \
+    --package-path trainer/ \
+    --job-dir ${JOB_DIR} \
+    --config config.yaml \
+    -- \
+    --mode train \
+    --data_path ${DATA_PATH} \
+    --vocab_path ${VOCAB_PATH} \
+    --max_step ${MAX_STEP}
diff --git a/train_local.sh b/train_local.sh
new file mode 100644
index 0000000..3e14697
--- /dev/null
+++ b/train_local.sh
@@ -0,0 +1,16 @@
+#!/usr/bin/env bash
+set -x #echo on
+
+source ./venv/bin/activate
+
+gcloud ml-engine local train \
+    --module-name trainer.task \
+    --package-path trainer/ \
+    --job-dir /Users/me/googledrive/suma/log/huat \
+    -- \
+    --mode train \
+    --data_path "/Users/my/googledrive/suma/finished_files/chunked/train_*" \
+    --vocab_path /Users/my/googledrive/suma/finished_files/vocab \
+    --max_step 30
+
+deactivate
diff --git a/trainer/data.py b/trainer/data.py
index 1d11f86..5cd7128 100644
--- a/trainer/data.py
+++ b/trainer/data.py
@@ -16,12 +16,13 @@
 """This file contains code to read the train/eval/test data from file and process it, and read the vocab data from file and process it"""
 
-import glob
+from tensorflow.gfile import Glob
 import random
 import struct
 import csv
 from tensorflow.core.example import example_pb2
 from tensorflow import logging as log
+from tensorflow.python.lib.io import file_io
 
 # <s> and </s> are used in the data files to segment the abstracts into sentences. They don't receive vocab ids.
 SENTENCE_START = '<s>'
 SENTENCE_END = '</s>'
@@ -56,7 +57,7 @@ def __init__(self, vocab_file, max_size):
             self._count += 1
 
         # Read the vocab file and add words up to max_size
-        with open(vocab_file, 'r', encoding='utf8') as vocab_f:
+        with file_io.FileIO(vocab_file, 'r') as vocab_f:
             for line in vocab_f:
                 pieces = line.split()
                 if len(pieces) != 2:
@@ -102,8 +103,8 @@ def write_metadata(self, fpath):
         Args:
             fpath: place to write the metadata file
         """
-        log.info("Writing word embedding metadata file to %s..." % (fpath))
-        with open(fpath, "w", encoding='utf8') as f:
+        log.info("Writing word embedding metadata file to %s..." % fpath)
+        with file_io.FileIO(fpath, "w") as f:
             fieldnames = ['word']
             writer = csv.DictWriter(f, delimiter="\t", fieldnames=fieldnames)
             for i in range(self.size()):
@@ -127,20 +128,20 @@
         Deserialized tf.Example.
     """
     while True:
-        filelist = glob.glob(data_path) # get the list of datafiles
+        filelist = Glob(data_path) # get the list of datafiles
         assert filelist, ('Error: Empty filelist at %s' % data_path) # check filelist isn't empty
         if single_pass:
             filelist = sorted(filelist)
         else:
             random.shuffle(filelist)
         for f in filelist:
-            reader = open(f, 'rb')
-            while True:
-                len_bytes = reader.read(8)
-                if not len_bytes: break # finished reading this file
-                str_len = struct.unpack('q', len_bytes)[0]
-                example_str = struct.unpack('%ds' % str_len, reader.read(str_len))[0]
-                yield example_pb2.Example.FromString(example_str)
+            with file_io.FileIO(f, 'rb') as reader:
+                while True:
+                    len_bytes = reader.read(8)
+                    if not len_bytes: break # finished reading this file
+                    str_len = struct.unpack('q', len_bytes)[0]
+                    example_str = struct.unpack('%ds' % str_len, reader.read(str_len))[0]
+                    yield example_pb2.Example.FromString(example_str)
         if single_pass:
             log.info("example_generator completed reading all datafiles. No more data.")
No more data.") break diff --git a/trainer/decode.py b/trainer/decode.py index 6624a17..af26f27 100644 --- a/trainer/decode.py +++ b/trainer/decode.py @@ -28,6 +28,7 @@ import trainer.util as util import logging from tensorflow.python.estimator.model_fn import ModeKeys as Modes +from tensorflow.python.lib.io import file_io SECS_UNTIL_NEW_CKPT = 60 # max number of seconds before loading new checkpoint @@ -192,10 +193,10 @@ def write_for_rouge(self, reference_sents, decoded_words, ex_index): ref_file = os.path.join(self._rouge_ref_dir, "%06d_reference.txt" % ex_index) decoded_file = os.path.join(self._rouge_dec_dir, "%06d_decoded.txt" % ex_index) - with open(ref_file, "w") as f: + with file_io.FileIO(ref_file, "w") as f: for idx, sent in enumerate(reference_sents): f.write(sent) if idx == len(reference_sents) - 1 else f.write(sent + "\n") - with open(decoded_file, "w") as f: + with file_io.FileIO(decoded_file, "w") as f: for idx, sent in enumerate(decoded_sents): f.write(sent) if idx == len(decoded_sents) - 1 else f.write(sent + "\n") @@ -251,7 +252,7 @@ def rouge_log(results_dict, dir_to_write): log.info(log_str) # log to screen results_file = os.path.join(dir_to_write, "ROUGE_results.txt") log.info("Writing final ROUGE results to %s...", results_file) - with open(results_file, "w") as f: + with file_io.FileIO(results_file, "w") as f: f.write(log_str) diff --git a/trainer/model.py b/trainer/model.py index ebc0b4f..8e5bea4 100644 --- a/trainer/model.py +++ b/trainer/model.py @@ -356,18 +356,17 @@ def build_graph(self): """Add the placeholders, model, global step, train_op and summaries to the graph""" log.info('Building graph...') t0 = time.time() - with tf.device(tf.train.replica_device_setter(cluster=self._conf.cluster_spec)): - self._add_placeholders() - self._add_seq2seq() - self.global_step = tf.get_variable( - 'global_step', - dtype=tf.int32, - initializer=tf.constant(0), - trainable=False - ) - if self._mode == Modes.TRAIN: - self._add_train_op() - self._summaries = tf.summary.merge_all() + self._add_placeholders() + self._add_seq2seq() + self.global_step = tf.get_variable( + 'global_step', + dtype=tf.int32, + initializer=tf.constant(0), + trainable=False + ) + if self._mode == Modes.TRAIN: + self._add_train_op() + self._summaries = tf.summary.merge_all() t1 = time.time() log.info('Time to build graph: %i seconds', t1 - t0) diff --git a/trainer/my_model.py b/trainer/my_model.py index c0d9db2..218885f 100644 --- a/trainer/my_model.py +++ b/trainer/my_model.py @@ -6,7 +6,7 @@ def generate_model_fn(): def _model_fn(features, labels, mode, params, config): modes = [Modes.TRAIN, Modes.EVAL, Modes.PREDICT] if mode not in modes: - raise ValueError(f'mode must be one of {repr(modes)} but mode={mode}') + raise ValueError('mode must be one of {} but mode={}'.format(repr(modes), mode)) if mode == Modes.TRAIN: loss = 0 # TODO filler grads = [0] # TODO filler diff --git a/trainer/task.py b/trainer/task.py index c53edbc..e49515e 100644 --- a/trainer/task.py +++ b/trainer/task.py @@ -140,17 +140,21 @@ def setup_training( def __train_session(train_dir, debug, conf): + log.info('RunConfig is_chief={}, master={}, task_id={}'.format( + repr(conf.is_chief), + repr(conf.master), + repr(conf.task_id))) sess = tf.train.MonitoredTrainingSession( checkpoint_dir=train_dir, # required to restore variables! 
-        summary_dir=train_dir,
+        master=conf.master,
         is_chief=conf.is_chief,
         save_summaries_secs=60,
-        save_checkpoint_secs=60,
-        max_wait_secs=60,
+        save_checkpoint_secs=conf.save_checkpoints_secs,
+        max_wait_secs=120,
         stop_grace_period_secs=60,
         config=conf.session_config,
         scaffold=tf.train.Scaffold(
-            saver=tf.train.Saver(max_to_keep=3)
+            saver=tf.train.Saver(max_to_keep=conf.keep_checkpoint_max)
         )
     )
     if debug: # start the tensorflow debugger
@@ -164,6 +168,7 @@ def run_training(model, batcher, train_dir, coverage, debug, max_step, conf):
     log.debug("starting run_training")
     summary_writer = tf.summary.FileWriterCache.get(train_dir)
     with __train_session(train_dir=train_dir, debug=debug, conf=conf) as sess:
+        log.debug('after MonitoredTrainingSession')
         train_step = 0
         # repeats until max_step is reached
         while not sess.should_stop() and train_step <= max_step:
@@ -172,6 +177,7 @@
             t0 = time.time()
             results = model.run_train_step(sess, batch)
             t1 = time.time()
+            log.debug('after session.run')
             loss = results['loss']
             if not np.isfinite(loss):
                 raise Exception("Loss is not finite. Stopping.")
@@ -260,30 +266,18 @@ def __log_verbosity(level):
         log.set_verbosity(log.DEBUG)
 
 
-def __log_root(base_path, exp_name, mode):
-    """Change log_root to FLAGS.log_root/FLAGS.exp_name and create the dir if necessary"""
-    res = os.path.join(base_path, exp_name)
-    if not os.path.exists(res):
-        if mode == "train":
-            os.makedirs(res)
-        else:
-            raise Exception("Logdir %s doesn't exist. Run in train mode to create it." % res)
-    return res
-
-
 def __main(
+        job_dir,
         mode,
         data_path,
         vocab_path,
         vocab_size,
-        log_root,
         beam_size,
         pointer_gen,
         convert_to_coverage_model,
         coverage,
         restore_best_model,
         log_level,
-        exp_name,
        single_pass,
        random_seed,
        debug,
@@ -291,11 +285,10 @@
 ):
     __log_verbosity(log_level)
     log.info('Starting seq2seq_attention in %s mode...', mode)
-    log_root = __log_root(log_root, exp_name, mode)
     vocab = Vocab(vocab_path, vocab_size) # create a vocabulary
     hps = __hparams(**hparams)
-    conf = util.run_config(model_dir=log_root, random_seed=random_seed)
-    log.info(f'hps={repr(hps)}\nconf={util.repr_run_config(conf)}')
+    conf = util.run_config(model_dir=job_dir, random_seed=random_seed)
+    log.info('hps={}\nconf={}'.format(repr(hps), util.repr_run_config(conf)))
 
     # Create a batcher object that will create minibatches of data
     batcher = Batcher(
@@ -375,15 +368,11 @@
         and log the results to screen, indefinitely.\
         """)
     parser.add_argument(
-        '--log_root',
-        type=str,
-        required=True,
-        help='Root directory for all logging.')
-    parser.add_argument(
-        '--exp_name',
+        '--job-dir',
         type=str,
-        default='default_exp_name',
-        help='Name for experiment. Logs will be saved in a directory with this name, under log_root.')
+        help='GCS location to write checkpoints and export models',
+        required=True
+    )
     parser.add_argument(
         '--hidden_dim',
         type=int,
@@ -521,9 +510,9 @@
     args = parser.parse_args()
     modes = [Modes.TRAIN, Modes.EVAL, Modes.PREDICT]
     if args.mode not in modes:
-        raise ValueError(f'--mode flag must be one of {repr(modes)}')
+        raise ValueError('--mode flag must be one of {}'.format(repr(modes)))
     if args.single_pass and args.mode != Modes.PREDICT:
-        raise ValueError(f'--single_pass flag should only be True in {repr(Modes.PREDICT)} mode')
+        raise ValueError('--single_pass flag should only be True in {} mode'.format(repr(Modes.PREDICT)))
     # If in decode mode, set batch_size = beam_size
     # Reason: in decode mode, we decode one example at a time.
     # On each step, we have beam_size-many hypotheses in the beam,
diff --git a/trainer/util.py b/trainer/util.py
index 7a01db4..dc38c80 100644
--- a/trainer/util.py
+++ b/trainer/util.py
@@ -48,34 +48,6 @@ def __tf_config_json():
     return json.loads(conf)
 
 
-def __tf_config(): # TODO remove
-    """Parse TF_CONFIG to cluster_spec
-    TF_CONFIG environment variable is available when running using
-    gcloud either locally or on cloud. It has all the information required
-    to create a ClusterSpec which is important for running distributed code.
-    """
-    config_json = __tf_config_json()
-    res = {
-        'cluster_spec': None,
-        'server': None,
-        'is_chief': True
-    }
-    if config_json is None:
-        return res
-    cluster = config_json.get('cluster')
-    job_name = config_json.get('task', {}).get('type')
-    task_index = config_json.get('task', {}).get('index')
-    # If cluster information is empty run local
-    if job_name is None or task_index is None:
-        return res
-    res['cluster_spec'] = tf.train.ClusterSpec(cluster)
-    res['server'] = tf.train.Server(res['cluster_spec'],
-                                    job_name=job_name,
-                                    task_index=task_index)
-    res['is_chief'] = (job_name == 'master')
-    return res
-
-
 def __session_config():
     """Returns a tf.ConfigProto instance that has appropriate device_filters set.
     """
@@ -98,6 +70,7 @@ def __session_config():
         allow_soft_placement=True,
         log_device_placement=False,
         device_filters=device_filters,
+        operation_timeout_in_ms=120000,
         gpu_options=tf.GPUOptions(
             allow_growth=True
         )
@@ -117,24 +90,43 @@ def run_config(model_dir, random_seed):
 
 def repr_run_config(conf):
     assert isinstance(conf, tf.estimator.RunConfig)
-    return f"""tf.estimator.RunConfig(
-    model_dir={repr(conf.model_dir)},
-    cluster_spec={repr(conf.cluster_spec.as_dict())},
-    is_chief={repr(conf.is_chief)},
-    master={repr(conf.master)},
-    num_worker_replicas={repr(conf.num_worker_replicas)},
-    num_ps_replicas={repr(conf.num_ps_replicas)},
-    task_id={repr(conf.task_id)},
-    task_type={repr(conf.task_type)},
-    tf_random_seed={repr(conf.tf_random_seed)},
-    save_summary_steps={repr(conf.save_summary_steps)},
-    save_checkpoints_steps={repr(conf.save_checkpoints_steps)},
-    save_checkpoints_secs={repr(conf.save_checkpoints_secs)},
-    session_config={repr(conf.session_config)},
-    keep_checkpoint_max={repr(conf.keep_checkpoint_max)},
-    keep_checkpoint_every_n_hours={repr(conf.keep_checkpoint_every_n_hours)},
-    log_step_count_steps={repr(conf.log_step_count_steps)},
-    train_distribute={repr(conf.train_distribute)},
-    device_fn={repr(conf.device_fn)}
+    return """tf.estimator.RunConfig(
+    model_dir={},
+    cluster_spec={},
+    is_chief={},
+    master={},
+    num_worker_replicas={},
+    num_ps_replicas={},
+    task_id={},
+    task_type={},
+    tf_random_seed={},
+    save_summary_steps={},
+    save_checkpoints_steps={},
+    save_checkpoints_secs={},
+    session_config={},
+    keep_checkpoint_max={},
+    keep_checkpoint_every_n_hours={},
+    log_step_count_steps={},
+    train_distribute={},
+    device_fn={}
+    )
+    """.format(
+        repr(conf.model_dir),
+        repr(conf.cluster_spec.as_dict()),
+        repr(conf.is_chief),
+        repr(conf.master),
+        repr(conf.num_worker_replicas),
+        repr(conf.num_ps_replicas),
+        repr(conf.task_id),
+        repr(conf.task_type),
+        repr(conf.tf_random_seed),
+        repr(conf.save_summary_steps),
+        repr(conf.save_checkpoints_steps),
+        repr(conf.save_checkpoints_secs),
+        repr(conf.session_config),
+        repr(conf.keep_checkpoint_max),
+        repr(conf.keep_checkpoint_every_n_hours),
+        repr(conf.log_step_count_steps),
+        repr(conf.train_distribute),
+        repr(conf.device_fn)
     )
-    """