From cc1aaa04e3b72ff831a8d5b104fa3636f5dce672 Mon Sep 17 00:00:00 2001 From: David Haley Date: Sun, 7 Jul 2024 23:09:00 -0700 Subject: [PATCH 1/5] Add gs-fastcopy to requirements --- requirements.txt | 1 + 1 file changed, 1 insertion(+) diff --git a/requirements.txt b/requirements.txt index 3a7d026..335fdd4 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,6 +1,7 @@ deepcell==0.12.9 google-cloud-bigquery google-cloud-notebooks +gs-fastcopy imagecodecs numpy protobuf==3.20.3 From d41e62949cd6cea762c6f083572221e334bc6780 Mon Sep 17 00:00:00 2001 From: David Haley Date: Sun, 7 Jul 2024 23:09:25 -0700 Subject: [PATCH 2/5] Reformat --- benchmarking/deepcell-e2e/benchmark.py | 12 ++++++---- deepcell_imaging/benchmark_utils.py | 12 +++++++--- deepcell_imaging/gcloud_storage_utils.py | 19 ++++++++-------- deepcell_imaging/mesmer_app.py | 27 ++++++++++++----------- deepcell_imaging/mesmer_no_postprocess.py | 8 ++++--- deepcell_imaging/numpy_utils.py | 4 ++-- run-sample.py | 8 +++++-- scripts/gather-benchmark.py | 12 ++++++++-- scripts/postprocess.py | 13 ++++++----- scripts/predict.py | 17 +++++++++----- scripts/preprocess.py | 9 ++++---- scripts/visualize.py | 4 +--- 12 files changed, 87 insertions(+), 58 deletions(-) diff --git a/benchmarking/deepcell-e2e/benchmark.py b/benchmarking/deepcell-e2e/benchmark.py index 9d5487b..78df3c7 100644 --- a/benchmarking/deepcell-e2e/benchmark.py +++ b/benchmarking/deepcell-e2e/benchmark.py @@ -228,10 +228,14 @@ # smart_open doesn't support seeking on GCP, which tifffile uses. if output_path.startswith("gs://"): - with gcloud_storage_utils.writer("%s/predictions.tiff" % output_path) as predictions_tiff_file: + with gcloud_storage_utils.writer( + "%s/predictions.tiff" % output_path + ) as predictions_tiff_file: tifffile.imwrite(predictions_tiff_file, segmentation_predictions) else: - with smart_open.open("%s/predictions.tiff" % output_path, "wb") as predictions_tiff_file: + with smart_open.open( + "%s/predictions.tiff" % output_path, "wb" + ) as predictions_tiff_file: tifffile.imwrite(predictions_tiff_file, segmentation_predictions) if visualize_input or visualize_predictions: @@ -265,7 +269,7 @@ # The rgb values are 0..1, so normalize to 0..255 im = Image.fromarray((overlay_data * 255).astype(np.uint8)) with smart_open.open( - "%s/predictions.png" % output_path, "wb" + "%s/predictions.png" % output_path, "wb" ) as predictions_png_file: im.save(predictions_png_file, mode="RGB") @@ -408,7 +412,7 @@ def get_compute_engine_machine_type(): peak_mem = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss prediction_overhead_s = ( - prediction_time_s - preprocess_time_s - inference_time_s - postprocess_time_s + prediction_time_s - preprocess_time_s - inference_time_s - postprocess_time_s ) if gpu_count == 0: diff --git a/deepcell_imaging/benchmark_utils.py b/deepcell_imaging/benchmark_utils.py index 881f1c9..af0a658 100644 --- a/deepcell_imaging/benchmark_utils.py +++ b/deepcell_imaging/benchmark_utils.py @@ -41,7 +41,9 @@ def get_gce_region(): def get_gce_is_preemptible(): # Call the metadata server. 
try: - metadata_server = "http://metadata/computeMetadata/v1/instance/scheduling/preemptible" + metadata_server = ( + "http://metadata/computeMetadata/v1/instance/scheduling/preemptible" + ) metadata_flavor = {"Metadata-Flavor": "Google"} # This comes back like this: projects/1234567890/machineTypes/n2-standard-8 @@ -49,7 +51,9 @@ def get_gce_is_preemptible(): return preemptible_str.lower() == "true" except Exception as e: exception_string = traceback.format_exc() - logging.warning("Error getting preemptible, assuming false. Error: " + exception_string) + logging.warning( + "Error getting preemptible, assuming false. Error: " + exception_string + ) return False @@ -58,7 +62,9 @@ def get_gpu_info(): import tensorflow as tf gpu_devices = tf.config.experimental.list_physical_devices("GPU") - gpu_details = [tf.config.experimental.get_device_details(gpu) for gpu in gpu_devices] + gpu_details = [ + tf.config.experimental.get_device_details(gpu) for gpu in gpu_devices + ] gpus_by_name = { k: list(v) for k, v in groupby(gpu_details, key=lambda x: x["device_name"]) diff --git a/deepcell_imaging/gcloud_storage_utils.py b/deepcell_imaging/gcloud_storage_utils.py index 3a794e6..73325f5 100644 --- a/deepcell_imaging/gcloud_storage_utils.py +++ b/deepcell_imaging/gcloud_storage_utils.py @@ -1,4 +1,3 @@ - from contextlib import contextmanager import os import subprocess @@ -21,10 +20,10 @@ def reader(gs_uri): will be closed and the temporary directory will be deleted. """ with tempfile.TemporaryDirectory() as tmp: - if gs_uri.endswith('.gz'): - path = os.path.join(tmp, 'downloaded_file.gz') + if gs_uri.endswith(".gz"): + path = os.path.join(tmp, "downloaded_file.gz") else: - path = os.path.join(tmp, 'downloaded_file') + path = os.path.join(tmp, "downloaded_file") # Transfer the file. # TODO: handle errors @@ -33,11 +32,11 @@ def reader(gs_uri): # If necessary, decompress the file before reading. # unpigz is a parallel gunzip implementation that's # much faster when hardware is available. - if path.endswith('.gz'): + if path.endswith(".gz"): subprocess.run(["unpigz", path]) path = path[:-3] - with open(path, 'rb') as f: + with open(path, "rb") as f: yield f @@ -47,19 +46,19 @@ def writer(gs_uri): # Will be deleted when the 'with' closes. with tempfile.TemporaryDirectory() as tmp_dir: # We need an actual filename within the scratch directory. - buffer_file_name = os.path.join(tmp_dir, 'file_to_upload') + buffer_file_name = os.path.join(tmp_dir, "file_to_upload") # Yield the file object for the caller to write. - with open(buffer_file_name, 'wb') as tmp_file: + with open(buffer_file_name, "wb") as tmp_file: yield tmp_file # If requested, compress the file before uploading. # pigz is a parallel gzip implementation that's # much faster than numpy's savez_compressed. - if gs_uri.endswith('.gz'): + if gs_uri.endswith(".gz"): # TODO: handle errors subprocess.run(["pigz", buffer_file_name]) - buffer_file_name += '.gz' + buffer_file_name += ".gz" # TODO: handle errors subprocess.run(["gcloud", "storage", "cp", buffer_file_name, gs_uri]) diff --git a/deepcell_imaging/mesmer_app.py b/deepcell_imaging/mesmer_app.py index 136a73e..4f5c0de 100644 --- a/deepcell_imaging/mesmer_app.py +++ b/deepcell_imaging/mesmer_app.py @@ -99,7 +99,7 @@ def preprocess_image(model_input_shape, image, image_mpp): def predict(model, image, batch_size): logger = logging.getLogger(__name__) model_image_shape = model.input_shape[1:] - pad_mode = 'constant' + pad_mode = "constant" # TODO: we need to validate the input. But what validations? 
@@ -108,12 +108,8 @@ def predict(model, image, batch_size): # Run images through model t = timeit.default_timer() - output_tiles = batch_predict( - model=model, tiles=tiles, batch_size=batch_size - ) - logger.debug( - "Model prediction finished in %s s", timeit.default_timer() - t - ) + output_tiles = batch_predict(model=model, tiles=tiles, batch_size=batch_size) + logger.debug("Model prediction finished in %s s", timeit.default_timer() - t) # Untile images output_images = _untile_output(output_tiles, tiles_info, model_image_shape) @@ -122,7 +118,13 @@ def predict(model, image, batch_size): return format_output_mesmer(output_images) -def postprocess(output_images, input_shape, compartment="whole-cell", whole_cell_kwargs={}, nuclear_kwargs={}): +def postprocess( + output_images, + input_shape, + compartment="whole-cell", + whole_cell_kwargs={}, + nuclear_kwargs={}, +): logger = logging.getLogger(__name__) # TODO: We need to validate the input (the output_images parameter) @@ -179,7 +181,7 @@ def postprocess(output_images, input_shape, compartment="whole-cell", whole_cell "Post-processed results with %s in %s s", mesmer_postprocess.__name__, timeit.default_timer() - t, - ) + ) # Resize label_image back to original resolution if necessary return _resize_output(label_image, input_shape) @@ -346,7 +348,7 @@ def format_output_mesmer(output_list): def mesmer_postprocess( - model_output, compartment="whole-cell", whole_cell_kwargs=None, nuclear_kwargs=None + model_output, compartment="whole-cell", whole_cell_kwargs=None, nuclear_kwargs=None ): """Postprocess Mesmer output to generate predictions for distinct cellular compartments @@ -414,7 +416,7 @@ def batch_predict(model, tiles, batch_size): # loop through each batch for i in range(0, tiles.shape[0], batch_size): - batch_inputs = tiles[i: i + batch_size, ...] + batch_inputs = tiles[i : i + batch_size, ...] batch_outputs = model.predict(batch_inputs, batch_size=batch_size) @@ -430,7 +432,6 @@ def batch_predict(model, tiles, batch_size): # save each batch to corresponding index in output list for j, batch_out in enumerate(batch_outputs): - output_tiles[j][i: i + batch_size, ...] = batch_out + output_tiles[j][i : i + batch_size, ...] = batch_out return output_tiles - diff --git a/deepcell_imaging/mesmer_no_postprocess.py b/deepcell_imaging/mesmer_no_postprocess.py index b41d457..e52b7b2 100644 --- a/deepcell_imaging/mesmer_no_postprocess.py +++ b/deepcell_imaging/mesmer_no_postprocess.py @@ -1,9 +1,10 @@ - from deepcell.applications import Application, Mesmer from deepcell.applications.mesmer import mesmer_preprocess, format_output_mesmer + def noop(model_output, *args, **kwargs): - return model_output['whole-cell'][-1] + return model_output["whole-cell"][-1] + class MesmerNoPostprocess(Mesmer): def __init__(self, model): @@ -20,4 +21,5 @@ def __init__(self, model): postprocessing_fn=noop, format_model_output_fn=format_output_mesmer, dataset_metadata=Mesmer.dataset_metadata, - model_metadata=Mesmer.model_metadata) \ No newline at end of file + model_metadata=Mesmer.model_metadata, + ) diff --git a/deepcell_imaging/numpy_utils.py b/deepcell_imaging/numpy_utils.py index 69d7fe5..fafd922 100644 --- a/deepcell_imaging/numpy_utils.py +++ b/deepcell_imaging/numpy_utils.py @@ -9,9 +9,9 @@ def npz_headers(npz): """Takes a path to an .npz file, which is a Zip archive of .npy files. Generates a sequence of (name, shape, np.dtype). 
""" - with zipfile.ZipFile(smart_open.open(npz, mode='rb')) as archive: + with zipfile.ZipFile(smart_open.open(npz, mode="rb")) as archive: for name in archive.namelist(): - if not name.endswith('.npy'): + if not name.endswith(".npy"): continue npy = archive.open(name) diff --git a/run-sample.py b/run-sample.py index b56ea99..37f30bb 100755 --- a/run-sample.py +++ b/run-sample.py @@ -15,9 +15,13 @@ input_channels = loader["input_channels"] model = tf.keras.models.load_model(mesmer_app.model_path) -preprocessed_image = mesmer_app.preprocess_image(model.input_shape, input_channels[np.newaxis, ...], image_mpp=None) +preprocessed_image = mesmer_app.preprocess_image( + model.input_shape, input_channels[np.newaxis, ...], image_mpp=None +) inferred_images = mesmer_app.predict(model, preprocessed_image, batch_size=4) -predictions = mesmer_app.postprocess(inferred_images, input_channels[np.newaxis, ...].shape, compartment='whole-cell') +predictions = mesmer_app.postprocess( + inferred_images, input_channels[np.newaxis, ...].shape, compartment="whole-cell" +) print(input_channels.shape) print(predictions.shape) diff --git a/scripts/gather-benchmark.py b/scripts/gather-benchmark.py index 9bc05cd..fb11749 100755 --- a/scripts/gather-benchmark.py +++ b/scripts/gather-benchmark.py @@ -62,7 +62,11 @@ t = timeit.default_timer() -for data_uri in [preprocess_benchmarking_uri, prediction_benchmarking_uri, postprocess_benchmarking_uri]: +for data_uri in [ + preprocess_benchmarking_uri, + prediction_benchmarking_uri, + postprocess_benchmarking_uri, +]: with smart_open.open(data_uri, "r") as data_file: data = json.load(data_file) benchmarking_data.update(data) @@ -72,7 +76,11 @@ print("Loaded benchmarking data in %s s" % data_load_time_s) # Update the overall success to the logical AND of the individual steps -benchmarking_data['success'] = benchmarking_data['preprocessing_success'] and benchmarking_data['prediction_success'] and benchmarking_data['postprocessing_success'] +benchmarking_data["success"] = ( + benchmarking_data["preprocessing_success"] + and benchmarking_data["prediction_success"] + and benchmarking_data["postprocessing_success"] +) print("Sending data to BigQuery") diff --git a/scripts/postprocess.py b/scripts/postprocess.py index e1d319e..feb4c8f 100755 --- a/scripts/postprocess.py +++ b/scripts/postprocess.py @@ -77,8 +77,8 @@ with np.load(raw_predictions_file) as loader: # An array of shape [height, width, channel] containing intensity of nuclear & membrane channels raw_predictions = { - 'whole-cell': [loader["arr_0"], loader["arr_1"]], - 'nuclear': [loader["arr_2"], loader["arr_3"]], + "whole-cell": [loader["arr_0"], loader["arr_1"]], + "nuclear": [loader["arr_2"], loader["arr_3"]], } raw_predictions_load_time_s = timeit.default_timer() - t @@ -90,9 +90,7 @@ t = timeit.default_timer() try: segmentation = mesmer_app.postprocess( - raw_predictions, - (1, input_rows, input_cols, 2), - compartment=compartment + raw_predictions, (1, input_rows, input_cols, 2), compartment=compartment ) success = True except Exception as e: @@ -101,7 +99,10 @@ postprocessing_time_s = timeit.default_timer() - t -print("Postprocessed raw predictions in %s s; success: %s" % (round(postprocessing_time_s, 2), success)) +print( + "Postprocessed raw predictions in %s s; success: %s" + % (round(postprocessing_time_s, 2), success) +) if success: print("Saving postprocessed output to %s" % output_uri) diff --git a/scripts/predict.py b/scripts/predict.py index 428f1e7..2a7100d 100755 --- a/scripts/predict.py +++ 
b/scripts/predict.py @@ -14,7 +14,12 @@ """ import argparse -from deepcell_imaging import benchmark_utils, cached_open, gcloud_storage_utils, mesmer_app +from deepcell_imaging import ( + benchmark_utils, + cached_open, + gcloud_storage_utils, + mesmer_app, +) import json import numpy as np import os @@ -35,7 +40,7 @@ help="Optional integer representing batch size to use for prediction. Default is 16.", type=int, required=False, - default=16 + default=16, ) parser.add_argument( "--output_uri", @@ -119,10 +124,10 @@ with gcloud_storage_utils.writer(output_uri) as output_writer: np.savez( output_writer, - arr_0=model_output['whole-cell'][0], - arr_1=model_output['whole-cell'][1], - arr_2=model_output['nuclear'][0], - arr_3=model_output['nuclear'][1], + arr_0=model_output["whole-cell"][0], + arr_1=model_output["whole-cell"][1], + arr_2=model_output["nuclear"][0], + arr_3=model_output["nuclear"][1], ) output_time_s = timeit.default_timer() - t diff --git a/scripts/preprocess.py b/scripts/preprocess.py index e127c9f..4f7d610 100755 --- a/scripts/preprocess.py +++ b/scripts/preprocess.py @@ -77,9 +77,7 @@ try: preprocessed_image = mesmer_app.preprocess_image( - model_input_shape, - input_channels[np.newaxis, ...], - image_mpp=image_mpp + model_input_shape, input_channels[np.newaxis, ...], image_mpp=image_mpp ) success = True except Exception as e: @@ -87,7 +85,10 @@ print("Preprocessing failed with error: %s" % e) preprocessing_time_s = timeit.default_timer() - t -print("Preprocessed input in %s s; success: %s" % (round(preprocessing_time_s, 2), success)) +print( + "Preprocessed input in %s s; success: %s" + % (round(preprocessing_time_s, 2), success) +) if success: print("Saving preprocessing output to %s" % output_uri) diff --git a/scripts/visualize.py b/scripts/visualize.py index 89781df..c98f0f8 100755 --- a/scripts/visualize.py +++ b/scripts/visualize.py @@ -111,9 +111,7 @@ # The rgb values are 0..1, so normalize to 0..255 im = Image.fromarray((overlay_data * 255).astype(np.uint8)) -with smart_open.open( - visualized_predictions_uri, "wb" -) as predictions_png_file: +with smart_open.open(visualized_predictions_uri, "wb") as predictions_png_file: im.save(predictions_png_file, mode="RGB") predictions_render_time_s = timeit.default_timer() - t From ec719c8393677eb27d1b521c3dd060ab46255ba4 Mon Sep 17 00:00:00 2001 From: David Haley Date: Sun, 7 Jul 2024 23:15:31 -0700 Subject: [PATCH 3/5] Add main guards to script files --- benchmarking/deepcell-e2e/benchmark.py | 901 +++++++++++++------------ scripts/gather-benchmark.py | 186 ++--- scripts/postprocess.py | 229 ++++--- scripts/predict.py | 234 +++---- scripts/preprocess.py | 220 +++--- scripts/visualize.py | 210 +++--- 6 files changed, 1008 insertions(+), 972 deletions(-) diff --git a/benchmarking/deepcell-e2e/benchmark.py b/benchmarking/deepcell-e2e/benchmark.py index 78df3c7..a18f0ea 100644 --- a/benchmarking/deepcell-e2e/benchmark.py +++ b/benchmarking/deepcell-e2e/benchmark.py @@ -24,459 +24,468 @@ BIGQUERY_RESULTS_TABLE = "deepcell-401920.benchmarking.results_batch" -parser = argparse.ArgumentParser("benchmark") -parser.add_argument( - "--input_channels_path", - help="Path to the input channels npz file", - type=str, - required=True, -) -parser.add_argument( - "--prediction_compartment", - help="The compartment to predict: whole-cell (default), nuclear, both", - type=str, - default="whole-cell", -) -parser.add_argument( - "--batch_size", - help="How many tiles are predicted at once.", - type=int, - default=4, -) -parser.add_argument( - 
"--model_path", - help="Path to the model archive", - type=str, - default="gs://davids-genomics-data-public/cellular-segmentation/deep-cell/vanvalenlab-tf-model-multiplex-downloaded-20230706/MultiplexSegmentation.tar.gz", -) -parser.add_argument( - "--model_hash", - help="Hash of the model archive", - type=str, - default="a1dfbce2594f927b9112f23a0a1739e0", -) -parser.add_argument( - "--model_extract_directory", - help="The directory name the archive extracts to", - type=str, - default="MultiplexSegmentation", -) -parser.add_argument( - "--output_path", - help="If set, base path for predictions & other output files.", - type=str, - default=None, -) -parser.add_argument( - "--output_tiff", - help="If true, write the predictions as predictions.tiff in the output path.", - action="store_true", -) -parser.add_argument( - "--visualize_input", - help="If true, visualize the input as input.png in the output path.", - action="store_true", -) -parser.add_argument( - "--visualize_predictions", - help="If true, visualize the predictions as predictions.png in the output path.", - action="store_true", -) -parser.add_argument( - "--provisioning_model", - help="The model provisioning method", - type=str, - required=True, -) -parser.add_argument( - "--bigquery_table", - help="The BigQuery table to write results to", - type=str, - default=BIGQUERY_RESULTS_TABLE, -) - -args = parser.parse_args() - -input_channels_path = args.input_channels_path -prediction_compartment = args.prediction_compartment -batch_size = args.batch_size -model_remote_path = args.model_path -model_hash = args.model_hash -model_extract_directory = args.model_extract_directory -output_path = args.output_path or "" -output_path = output_path.rstrip("/") # remove trailing slashes -output_tiff = args.output_tiff -visualize_input = args.visualize_input -visualize_predictions = args.visualize_predictions -provisioning_model = args.provisioning_model -bigquery_table = args.bigquery_table - -if (visualize_input or visualize_predictions or output_tiff) and not output_path: - raise ValueError("Can't output/visualize without an output path") - -# Import these here, to speed up startup & arg parsing -import deepcell -from deepcell.applications import Mesmer -import tensorflow as tf - -# The local cache location -model_path = os.path.expanduser("~") + "/.keras/models/%s" % model_extract_directory - -# Model warm-up - -logger = logging.getLogger() -old_level = logger.getEffectiveLevel() -logger.setLevel(logging.INFO) - -downloaded_file_path = cached_open.get_file( - "MultiplexSegmentation.tgz", - model_remote_path, - file_hash=model_hash, - extract=True, - cache_subdir="models", -) -# Remove the .tgz extension to get the model directory path -model_path = os.path.splitext(downloaded_file_path)[0] - -logging.info("Loading model from {}.".format(model_path)) -t = timeit.default_timer() -model = tf.keras.models.load_model(model_path) -logging.info("Loaded model in %s s" % (timeit.default_timer() - t)) -app = Mesmer(model=model) - -# Need to reset top-level logging for intercept to work. -# I dunno 🤷🏻‍♂️ There's probably a better way to do this... 
-logger.setLevel(old_level) -logging.basicConfig(force=True) -# %% md -# End-to-end Prediction - -# %% -start_time = timeit.default_timer() - -# Load inputs - -t = timeit.default_timer() -with smart_open.open(input_channels_path, "rb") as input_channel_file: - with np.load(input_channel_file) as loader: - # An array of shape [height, width, channel] containing intensity of nuclear & membrane channels - input_channels = loader["input_channels"] -input_load_time_s = timeit.default_timer() - t - -print("Loaded input in %s s" % input_load_time_s) - -# Generate predictions - -## Intercept log (many shenanigans, such hacking) -logger = logging.getLogger() -old_level = logger.getEffectiveLevel() -logger.setLevel(logging.DEBUG) -logs_buffer = io.StringIO() -buffer_log_handler = logging.StreamHandler(logs_buffer) -logger.addHandler(buffer_log_handler) - -## The actual prediction - -t = timeit.default_timer() - -try: - # We're only predicting for 1 image, so extract the 1 image's predictions - segmentation_predictions = app.predict( - input_channels[np.newaxis, ...], - image_mpp=0.5, - compartment=prediction_compartment, - batch_size=batch_size, - )[0] - prediction_success = True - -except Exception as e: - # The exception is nom-nom'd. Safe? We'll see 🤔 - prediction_success = False - logger.error("Prediction exception: %s", e) - -prediction_time_s = timeit.default_timer() - t -total_time_s = timeit.default_timer() - start_time - -## Undo log intercept -logger.removeHandler(buffer_log_handler) -logger.setLevel(old_level) - -## Wrap up -print("Prediction finished in %s s" % prediction_time_s) -print("Overall operation finished in %s s" % total_time_s) -# %% -# Parse the intercepted debug logs to extract step timing. - -debug_logs = logs_buffer.getvalue() -pattern = r"(?sm)Pre-processed data with mesmer_preprocess in (.+?) s.*Model inference finished in (.+?) s.*Post-processed results with mesmer_postprocess in (.+?) s" -match = re.search(re.compile(pattern, re.MULTILINE), debug_logs) - -if match: - preprocess_time_s = float(match.group(1)) - inference_time_s = float(match.group(2)) - postprocess_time_s = float(match.group(3)) -else: - logger.warning("Couldn't parse step timings from debug_logs") - preprocess_time_s = inference_time_s = postprocess_time_s = math.nan - -if output_path: - with smart_open.open("%s/predictions.npz" % output_path, "wb") as predictions_file: - np.savez_compressed(predictions_file, predictions=segmentation_predictions) - - if output_tiff: - import tifffile - - # smart_open doesn't support seeking on GCP, which tifffile uses. 
- if output_path.startswith("gs://"): - with gcloud_storage_utils.writer( - "%s/predictions.tiff" % output_path - ) as predictions_tiff_file: - tifffile.imwrite(predictions_tiff_file, segmentation_predictions) - else: - with smart_open.open( - "%s/predictions.tiff" % output_path, "wb" - ) as predictions_tiff_file: - tifffile.imwrite(predictions_tiff_file, segmentation_predictions) - -if visualize_input or visualize_predictions: - from deepcell.utils.plot_utils import create_rgb_image - from PIL import Image - - nuclear_color = "green" - membrane_color = "blue" - - # Create rgb overlay of image data for visualization - # Note that this normalizes the values from "whatever" to rgb range 0..1 - input_rgb = create_rgb_image( - input_channels[np.newaxis, ...], channel_colors=[nuclear_color, membrane_color] - )[0] - -if visualize_input: - # The png needs to normalize rgb values from 0..1, so normalize to 0..255 - im = Image.fromarray((input_rgb * 255).astype(np.uint8)) - with smart_open.open("%s/input.png" % output_path, "wb") as input_png_file: - im.save(input_png_file, mode="RGB") - -if visualize_predictions: - from deepcell.utils.plot_utils import make_outline_overlay - from PIL import Image - - overlay_data = make_outline_overlay( - rgb_data=input_rgb[np.newaxis, ...], - predictions=segmentation_predictions[np.newaxis, ...], - )[0] - - # The rgb values are 0..1, so normalize to 0..255 - im = Image.fromarray((overlay_data * 255).astype(np.uint8)) - with smart_open.open( - "%s/predictions.png" % output_path, "wb" - ) as predictions_png_file: - im.save(predictions_png_file, mode="RGB") - -################## -# Benchmark data # -################## - -headers = [ - "input_file_id", - "numpy_size_mb", - "pixels_m", - "compartment", - "benchmark_datetime_utc", - "instance_type", - "gpu_type", - "num_gpus", - "batch_size", - "success", - "total_time_s", - "peak_memory_gb", - "load_time_s", - "total_prediction_time_s", - "prediction_overhead_s", - "predict_preprocess_time_s", - "predict_inference_time_s", - "predict_postprocess_time_s", - "deepcell_tf_version", - "machine_config", - "provisioning_model", -] - -parsed_url = urllib.parse.urlparse(input_channels_path) -filename = parsed_url.path.split("/")[-2] -image_size = round(input_channels.nbytes / 1000 / 1000, 2) - -# Multiply x * y to get pixel count. -pixels = input_channels.shape[0] * input_channels.shape[1] - -# Get the number of GPUs -gpu_devices = tf.config.experimental.list_physical_devices("GPU") -gpu_details = [tf.config.experimental.get_device_details(gpu) for gpu in gpu_devices] - -gpus_by_name = { - k: list(v) for k, v in groupby(gpu_details, key=lambda x: x["device_name"]) -} - -gpu_names = list(gpus_by_name.keys()) - -if len(gpu_names) == 0: - gpu_name = "None" - gpu_count = 0 -elif len(gpu_names) == 1: - gpu_name = gpu_names[0] - gpu_count = len(gpus_by_name[gpu_name]) -else: - raise "Dunno how to handle multiple gpu types" - - -def get_project_id(): - import json - - # In python 3.7, this works - env_project_id = os.getenv("GCP_PROJECT") - - if not env_project_id: # > python37 - # Only works on runtime. 
- import urllib.request - - url = "http://metadata.google.internal/computeMetadata/v1/project/project-id" - req = urllib.request.Request(url) - req.add_header("Metadata-Flavor", "Google") - env_project_id = urllib.request.urlopen(req).read().decode() - - if not env_project_id: # Running locally - with open(os.environ["GOOGLE_APPLICATION_CREDENTIALS"], "r") as fp: - credentials = json.load(fp) - env_project_id = credentials["project_id"] - - if not env_project_id: - raise ValueError("Could not get a value for PROJECT_ID") - - return env_project_id - - -try: - project_id = get_project_id() - logger.info("Project id: %s" % project_id) -except Exception as e: - project_id = "unknown" - logger.error("Error getting project id: %s" % e) - - -def get_compute_engine_machine_type(): - # Call the metadata server. + +def main(): + parser = argparse.ArgumentParser("benchmark") + parser.add_argument( + "--input_channels_path", + help="Path to the input channels npz file", + type=str, + required=True, + ) + parser.add_argument( + "--prediction_compartment", + help="The compartment to predict: whole-cell (default), nuclear, both", + type=str, + default="whole-cell", + ) + parser.add_argument( + "--batch_size", + help="How many tiles are predicted at once.", + type=int, + default=4, + ) + parser.add_argument( + "--model_path", + help="Path to the model archive", + type=str, + default="gs://davids-genomics-data-public/cellular-segmentation/deep-cell/vanvalenlab-tf-model-multiplex-downloaded-20230706/MultiplexSegmentation.tar.gz", + ) + parser.add_argument( + "--model_hash", + help="Hash of the model archive", + type=str, + default="a1dfbce2594f927b9112f23a0a1739e0", + ) + parser.add_argument( + "--model_extract_directory", + help="The directory name the archive extracts to", + type=str, + default="MultiplexSegmentation", + ) + parser.add_argument( + "--output_path", + help="If set, base path for predictions & other output files.", + type=str, + default=None, + ) + parser.add_argument( + "--output_tiff", + help="If true, write the predictions as predictions.tiff in the output path.", + action="store_true", + ) + parser.add_argument( + "--visualize_input", + help="If true, visualize the input as input.png in the output path.", + action="store_true", + ) + parser.add_argument( + "--visualize_predictions", + help="If true, visualize the predictions as predictions.png in the output path.", + action="store_true", + ) + parser.add_argument( + "--provisioning_model", + help="The model provisioning method", + type=str, + required=True, + ) + parser.add_argument( + "--bigquery_table", + help="The BigQuery table to write results to", + type=str, + default=BIGQUERY_RESULTS_TABLE, + ) + + args = parser.parse_args() + + input_channels_path = args.input_channels_path + prediction_compartment = args.prediction_compartment + batch_size = args.batch_size + model_remote_path = args.model_path + model_hash = args.model_hash + model_extract_directory = args.model_extract_directory + output_path = args.output_path or "" + output_path = output_path.rstrip("/") # remove trailing slashes + output_tiff = args.output_tiff + visualize_input = args.visualize_input + visualize_predictions = args.visualize_predictions + provisioning_model = args.provisioning_model + bigquery_table = args.bigquery_table + + if (visualize_input or visualize_predictions or output_tiff) and not output_path: + raise ValueError("Can't output/visualize without an output path") + + # Import these here, to speed up startup & arg parsing + import deepcell + from 
deepcell.applications import Mesmer + import tensorflow as tf + + # The local cache location + model_path = os.path.expanduser("~") + "/.keras/models/%s" % model_extract_directory + + # Model warm-up + + logger = logging.getLogger() + old_level = logger.getEffectiveLevel() + logger.setLevel(logging.INFO) + + downloaded_file_path = cached_open.get_file( + "MultiplexSegmentation.tgz", + model_remote_path, + file_hash=model_hash, + extract=True, + cache_subdir="models", + ) + # Remove the .tgz extension to get the model directory path + model_path = os.path.splitext(downloaded_file_path)[0] + + logging.info("Loading model from {}.".format(model_path)) + t = timeit.default_timer() + model = tf.keras.models.load_model(model_path) + logging.info("Loaded model in %s s" % (timeit.default_timer() - t)) + app = Mesmer(model=model) + + # Need to reset top-level logging for intercept to work. + # I dunno 🤷🏻‍♂️ There's probably a better way to do this... + logger.setLevel(old_level) + logging.basicConfig(force=True) + # %% md + # End-to-end Prediction + + # %% + start_time = timeit.default_timer() + + # Load inputs + + t = timeit.default_timer() + with smart_open.open(input_channels_path, "rb") as input_channel_file: + with np.load(input_channel_file) as loader: + # An array of shape [height, width, channel] containing intensity of nuclear & membrane channels + input_channels = loader["input_channels"] + input_load_time_s = timeit.default_timer() - t + + print("Loaded input in %s s" % input_load_time_s) + + # Generate predictions + + ## Intercept log (many shenanigans, such hacking) + logger = logging.getLogger() + old_level = logger.getEffectiveLevel() + logger.setLevel(logging.DEBUG) + logs_buffer = io.StringIO() + buffer_log_handler = logging.StreamHandler(logs_buffer) + logger.addHandler(buffer_log_handler) + + ## The actual prediction + + t = timeit.default_timer() + try: - import requests + # We're only predicting for 1 image, so extract the 1 image's predictions + segmentation_predictions = app.predict( + input_channels[np.newaxis, ...], + image_mpp=0.5, + compartment=prediction_compartment, + batch_size=batch_size, + )[0] + prediction_success = True - metadata_server = "http://metadata/computeMetadata/v1/instance/machine-type" - metadata_flavor = {"Metadata-Flavor": "Google"} + except Exception as e: + # The exception is nom-nom'd. Safe? We'll see 🤔 + prediction_success = False + logger.error("Prediction exception: %s", e) + + prediction_time_s = timeit.default_timer() - t + total_time_s = timeit.default_timer() - start_time + + ## Undo log intercept + logger.removeHandler(buffer_log_handler) + logger.setLevel(old_level) + + ## Wrap up + print("Prediction finished in %s s" % prediction_time_s) + print("Overall operation finished in %s s" % total_time_s) + # %% + # Parse the intercepted debug logs to extract step timing. + + debug_logs = logs_buffer.getvalue() + pattern = r"(?sm)Pre-processed data with mesmer_preprocess in (.+?) s.*Model inference finished in (.+?) s.*Post-processed results with mesmer_postprocess in (.+?) 
s" + match = re.search(re.compile(pattern, re.MULTILINE), debug_logs) + + if match: + preprocess_time_s = float(match.group(1)) + inference_time_s = float(match.group(2)) + postprocess_time_s = float(match.group(3)) + else: + logger.warning("Couldn't parse step timings from debug_logs") + preprocess_time_s = inference_time_s = postprocess_time_s = math.nan + + if output_path: + with smart_open.open( + "%s/predictions.npz" % output_path, "wb" + ) as predictions_file: + np.savez_compressed(predictions_file, predictions=segmentation_predictions) + + if output_tiff: + import tifffile + + # smart_open doesn't support seeking on GCP, which tifffile uses. + if output_path.startswith("gs://"): + with gcloud_storage_utils.writer( + "%s/predictions.tiff" % output_path + ) as predictions_tiff_file: + tifffile.imwrite(predictions_tiff_file, segmentation_predictions) + else: + with smart_open.open( + "%s/predictions.tiff" % output_path, "wb" + ) as predictions_tiff_file: + tifffile.imwrite(predictions_tiff_file, segmentation_predictions) + + if visualize_input or visualize_predictions: + from deepcell.utils.plot_utils import create_rgb_image + from PIL import Image + + nuclear_color = "green" + membrane_color = "blue" + + # Create rgb overlay of image data for visualization + # Note that this normalizes the values from "whatever" to rgb range 0..1 + input_rgb = create_rgb_image( + input_channels[np.newaxis, ...], + channel_colors=[nuclear_color, membrane_color], + )[0] + + if visualize_input: + # The png needs to normalize rgb values from 0..1, so normalize to 0..255 + im = Image.fromarray((input_rgb * 255).astype(np.uint8)) + with smart_open.open("%s/input.png" % output_path, "wb") as input_png_file: + im.save(input_png_file, mode="RGB") + + if visualize_predictions: + from deepcell.utils.plot_utils import make_outline_overlay + from PIL import Image + + overlay_data = make_outline_overlay( + rgb_data=input_rgb[np.newaxis, ...], + predictions=segmentation_predictions[np.newaxis, ...], + )[0] + + # The rgb values are 0..1, so normalize to 0..255 + im = Image.fromarray((overlay_data * 255).astype(np.uint8)) + with smart_open.open( + "%s/predictions.png" % output_path, "wb" + ) as predictions_png_file: + im.save(predictions_png_file, mode="RGB") + + ################## + # Benchmark data # + ################## + + headers = [ + "input_file_id", + "numpy_size_mb", + "pixels_m", + "compartment", + "benchmark_datetime_utc", + "instance_type", + "gpu_type", + "num_gpus", + "batch_size", + "success", + "total_time_s", + "peak_memory_gb", + "load_time_s", + "total_prediction_time_s", + "prediction_overhead_s", + "predict_preprocess_time_s", + "predict_inference_time_s", + "predict_postprocess_time_s", + "deepcell_tf_version", + "machine_config", + "provisioning_model", + ] + + parsed_url = urllib.parse.urlparse(input_channels_path) + filename = parsed_url.path.split("/")[-2] + image_size = round(input_channels.nbytes / 1000 / 1000, 2) + + # Multiply x * y to get pixel count. 
+    pixels = input_channels.shape[0] * input_channels.shape[1]
+
+    # Get the number of GPUs
+    gpu_devices = tf.config.experimental.list_physical_devices("GPU")
+    gpu_details = [
+        tf.config.experimental.get_device_details(gpu) for gpu in gpu_devices
+    ]
+
+    gpus_by_name = {
+        k: list(v) for k, v in groupby(gpu_details, key=lambda x: x["device_name"])
+    }
+
+    gpu_names = list(gpus_by_name.keys())
+
+    if len(gpu_names) == 0:
+        gpu_name = "None"
+        gpu_count = 0
+    elif len(gpu_names) == 1:
+        gpu_name = gpu_names[0]
+        gpu_count = len(gpus_by_name[gpu_name])
+    else:
+        raise ValueError("Dunno how to handle multiple gpu types")
+
+    def get_project_id():
+        import json
+
+        # In python 3.7, this works
+        env_project_id = os.getenv("GCP_PROJECT")
+
+        if not env_project_id:  # > python37
+            # Only works on runtime.
+            import urllib.request
+
+            url = (
+                "http://metadata.google.internal/computeMetadata/v1/project/project-id"
+            )
+            req = urllib.request.Request(url)
+            req.add_header("Metadata-Flavor", "Google")
+            env_project_id = urllib.request.urlopen(req).read().decode()
 
-        # This comes back like this: projects/1234567890/machineTypes/n2-standard-8
-        full_machine_type = requests.get(metadata_server, headers=metadata_flavor).text
-        return full_machine_type.split("/")[-1]
+        if not env_project_id:  # Running locally
+            with open(os.environ["GOOGLE_APPLICATION_CREDENTIALS"], "r") as fp:
+                credentials = json.load(fp)
+                env_project_id = credentials["project_id"]
+
+        if not env_project_id:
+            raise ValueError("Could not get a value for PROJECT_ID")
+
+        return env_project_id
+
+    try:
+        project_id = get_project_id()
+        logger.info("Project id: %s" % project_id)
     except Exception as e:
-        exception_string = traceback.format_exc()
-        logging.warning("Error getting machine type: " + exception_string)
-        return "error"
-
-
-try:
-    machine_type = get_compute_engine_machine_type()
-except Exception as e:
-    logging.warning("Error getting machine type: '%s'. Defaulting to os info" + e)
-    # assume a generic python environment
-    # See also:
-    # https://docs.python.org/3.10/library/os.html#os.cpu_count
+        project_id = "unknown"
+        logger.error("Error getting project id: %s" % e)
+
+    def get_compute_engine_machine_type():
+        # Call the metadata server.
+        try:
+            import requests
+
+            metadata_server = "http://metadata/computeMetadata/v1/instance/machine-type"
+            metadata_flavor = {"Metadata-Flavor": "Google"}
+
+            # This comes back like this: projects/1234567890/machineTypes/n2-standard-8
+            full_machine_type = requests.get(
+                metadata_server, headers=metadata_flavor
+            ).text
+            return full_machine_type.split("/")[-1]
+        except Exception as e:
+            exception_string = traceback.format_exc()
+            logging.warning("Error getting machine type: " + exception_string)
+            return "error"
+
     try:
-        num_cpus = len(os.sched_getaffinity(0))
-    except AttributeError:
-        num_cpus = os.cpu_count()
-    total_mem = psutil.virtual_memory().total
-    machine_type = "local {} CPUs {} GB RAM".format(
-        num_cpus, round(total_mem / 1000000000, 1)
+        machine_type = get_compute_engine_machine_type()
+    except Exception as e:
+        logging.warning("Error getting machine type: '%s'. 
Defaulting to os info" % e)
+        # assume a generic python environment
+        # See also:
+        # https://docs.python.org/3.10/library/os.html#os.cpu_count
+        try:
+            num_cpus = len(os.sched_getaffinity(0))
+        except AttributeError:
+            num_cpus = os.cpu_count()
+        total_mem = psutil.virtual_memory().total
+        machine_type = "local {} CPUs {} GB RAM".format(
+            num_cpus, round(total_mem / 1000000000, 1)
+        )
+
+    # The getrusage call returns different units on mac & linux.
+    # Get the OS type from the platform library,
+    # then set the memory unit factor accordingly.
+    os_type = platform.system()
+    # This is crude and impartial– but it works across my mac & Google Cloud
+    if "Darwin" == os_type:
+        memory_unit_factor = 1000000000
+    elif "Linux" == os_type:
+        memory_unit_factor = 1000000
+    else:
+        # Assume kb like linux
+        logging.warning("Couldn't infer machine type from %s", os_type)
+        memory_unit_factor = 1000000
+
+    peak_mem = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
+
+    prediction_overhead_s = (
+        prediction_time_s - preprocess_time_s - inference_time_s - postprocess_time_s
     )
 
-# The getrusage call returns different units on mac & linux.
-# Get the OS type from the platform library,
-# then set the memory unit factor accordingly.
-os_type = platform.system()
-# This is crude and impartial– but it works across my mac & Google Cloud
-if "Darwin" == os_type:
-    memory_unit_factor = 1000000000
-elif "Linux" == os_type:
-    memory_unit_factor = 1000000
-else:
-    # Assume kb like linux
-    logging.warning("Couldn't infer machine type from %s", os_type)
-    memory_unit_factor = 1000000
-
-peak_mem = resource.getrusage(resource.RUSAGE_SELF).ru_maxrss
-
-prediction_overhead_s = (
-    prediction_time_s - preprocess_time_s - inference_time_s - postprocess_time_s
-)
-
-if gpu_count == 0:
-    machine_config = machine_type
-else:
-    machine_config = "%s + %sx %s" % (machine_type, gpu_count, gpu_name)
-
-# Write benchmarking data as CSV:
-
-output = io.StringIO()
-writer = csv.writer(output, quoting=csv.QUOTE_NONNUMERIC)
-writer.writerow(headers)
-
-deepcell_version = deepcell.__version__
-
-writer.writerow(
-    [
-        filename,
-        image_size,
-        round(pixels / 1000000, 2),
-        prediction_compartment,
-        datetime.now(timezone.utc),
-        machine_type,
-        gpu_name,
-        gpu_count,
-        batch_size,
-        prediction_success,
-        round(total_time_s, 2),
-        round(peak_mem / memory_unit_factor, 1),
-        round(input_load_time_s, 2),
-        round(prediction_time_s, 2),
-        round(prediction_overhead_s, 2),
-        round(preprocess_time_s, 2),
-        round(inference_time_s, 2),
-        round(postprocess_time_s, 2),
-        deepcell_version,
-        machine_config,
-        provisioning_model,
-    ]
-)
-
-logger.info("Appending benchmark result to bigquery: %s", output.getvalue())
-# Construct a BigQuery client object. 
-bq_client = bigquery.Client() - -job_config = bigquery.LoadJobConfig( - write_disposition=bigquery.WriteDisposition.WRITE_APPEND, - source_format=bigquery.SourceFormat.CSV, - skip_leading_rows=1, -) -csv_file = io.StringIO(output.getvalue()) - - -@retry( - wait=wait_random_exponential(multiplier=1, max=60), - retry=retry_if_exception_message(match=".*403 Exceeded rate limits.*"), -) -def upload_to_bigquery(csv_string, table_id, bq_job_config): - load_job = bq_client.load_table_from_file( - csv_string, table_id, job_config=bq_job_config + if gpu_count == 0: + machine_config = machine_type + else: + machine_config = "%s + %sx %s" % (machine_type, gpu_count, gpu_name) + + # Write benchmarking data as CSV: + + output = io.StringIO() + writer = csv.writer(output, quoting=csv.QUOTE_NONNUMERIC) + writer.writerow(headers) + + deepcell_version = deepcell.__version__ + + writer.writerow( + [ + filename, + image_size, + round(pixels / 1000000, 2), + prediction_compartment, + datetime.now(timezone.utc), + machine_type, + gpu_name, + gpu_count, + batch_size, + prediction_success, + round(total_time_s, 2), + round(peak_mem / memory_unit_factor, 1), + round(input_load_time_s, 2), + round(prediction_time_s, 2), + round(prediction_overhead_s, 2), + round(preprocess_time_s, 2), + round(inference_time_s, 2), + round(postprocess_time_s, 2), + deepcell_version, + machine_config, + provisioning_model, + ] ) - load_job.result() # Waits for the job to complete. + + logger.info("Appending benchmark result to bigquery: %s", output.getvalue()) + # Construct a BigQuery client object. + bq_client = bigquery.Client() + + job_config = bigquery.LoadJobConfig( + write_disposition=bigquery.WriteDisposition.WRITE_APPEND, + source_format=bigquery.SourceFormat.CSV, + skip_leading_rows=1, + ) + csv_file = io.StringIO(output.getvalue()) + + @retry( + wait=wait_random_exponential(multiplier=1, max=60), + retry=retry_if_exception_message(match=".*403 Exceeded rate limits.*"), + ) + def upload_to_bigquery(csv_string, table_id, bq_job_config): + load_job = bq_client.load_table_from_file( + csv_string, table_id, job_config=bq_job_config + ) + load_job.result() # Waits for the job to complete. 
+ + if bigquery_table: + upload_to_bigquery(csv_file, bigquery_table, job_config) + logger.info("Appended result row to bigquery.") -if bigquery_table: - upload_to_bigquery(csv_file, bigquery_table, job_config) - logger.info("Appended result row to bigquery.") +if __name__ == "__main__": + main() diff --git a/scripts/gather-benchmark.py b/scripts/gather-benchmark.py index fb11749..0704d59 100755 --- a/scripts/gather-benchmark.py +++ b/scripts/gather-benchmark.py @@ -16,99 +16,103 @@ from tenacity import retry, retry_if_exception_message, wait_random_exponential import timeit -parser = argparse.ArgumentParser("preprocess") - -parser.add_argument( - "--preprocess_benchmarking_uri", - help="URI to benchmarking data for the preprocessing step.", - type=str, - required=True, -) -parser.add_argument( - "--prediction_benchmarking_uri", - help="URI to benchmarking data for the prediction step.", - type=str, - required=True, -) -parser.add_argument( - "--postprocess_benchmarking_uri", - help="URI to benchmarking data for the postprocessing step.", - type=str, - required=True, -) -parser.add_argument( - "--bigquery_benchmarking_table", - help="The fully qualified name (project.dataset.table) of the BigQuery table to write benchmarking data to.", - type=str, - required=True, -) - -args = parser.parse_args() - -preprocess_benchmarking_uri = args.preprocess_benchmarking_uri -prediction_benchmarking_uri = args.prediction_benchmarking_uri -postprocess_benchmarking_uri = args.postprocess_benchmarking_uri -bigquery_benchmarking_table = args.bigquery_benchmarking_table - -if not bigquery_benchmarking_table: - print("Nothing to do; empty bigquery_benchmarking_table") - exit() - -benchmarking_data = { - "cloud_region": benchmark_utils.get_gce_region(), -} - -print("Loading benchmarking data") - -t = timeit.default_timer() - -for data_uri in [ - preprocess_benchmarking_uri, - prediction_benchmarking_uri, - postprocess_benchmarking_uri, -]: - with smart_open.open(data_uri, "r") as data_file: - data = json.load(data_file) - benchmarking_data.update(data) - -data_load_time_s = timeit.default_timer() - t - -print("Loaded benchmarking data in %s s" % data_load_time_s) - -# Update the overall success to the logical AND of the individual steps -benchmarking_data["success"] = ( - benchmarking_data["preprocessing_success"] - and benchmarking_data["prediction_success"] - and benchmarking_data["postprocessing_success"] -) - -print("Sending data to BigQuery") - -t = timeit.default_timer() - -bq_client = bigquery.Client() - -job_config = bigquery.LoadJobConfig( - write_disposition=bigquery.WriteDisposition.WRITE_APPEND, - source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON, -) - -json_str = io.StringIO(json.dumps(benchmarking_data)) - - -@retry( - wait=wait_random_exponential(multiplier=1, max=60), - retry=retry_if_exception_message(match=".*403 Exceeded rate limits.*"), -) -def upload_to_bigquery(csv_string, table_id, bq_job_config): - load_job = bq_client.load_table_from_file( - csv_string, table_id, job_config=bq_job_config + +def main(): + parser = argparse.ArgumentParser("preprocess") + + parser.add_argument( + "--preprocess_benchmarking_uri", + help="URI to benchmarking data for the preprocessing step.", + type=str, + required=True, + ) + parser.add_argument( + "--prediction_benchmarking_uri", + help="URI to benchmarking data for the prediction step.", + type=str, + required=True, + ) + parser.add_argument( + "--postprocess_benchmarking_uri", + help="URI to benchmarking data for the postprocessing step.", + 
type=str, + required=True, ) - load_job.result() # Waits for the job to complete. + parser.add_argument( + "--bigquery_benchmarking_table", + help="The fully qualified name (project.dataset.table) of the BigQuery table to write benchmarking data to.", + type=str, + required=True, + ) + + args = parser.parse_args() + + preprocess_benchmarking_uri = args.preprocess_benchmarking_uri + prediction_benchmarking_uri = args.prediction_benchmarking_uri + postprocess_benchmarking_uri = args.postprocess_benchmarking_uri + bigquery_benchmarking_table = args.bigquery_benchmarking_table + + if not bigquery_benchmarking_table: + print("Nothing to do; empty bigquery_benchmarking_table") + exit() + + benchmarking_data = { + "cloud_region": benchmark_utils.get_gce_region(), + } + + print("Loading benchmarking data") + + t = timeit.default_timer() + + for data_uri in [ + preprocess_benchmarking_uri, + prediction_benchmarking_uri, + postprocess_benchmarking_uri, + ]: + with smart_open.open(data_uri, "r") as data_file: + data = json.load(data_file) + benchmarking_data.update(data) + + data_load_time_s = timeit.default_timer() - t + + print("Loaded benchmarking data in %s s" % data_load_time_s) + + # Update the overall success to the logical AND of the individual steps + benchmarking_data["success"] = ( + benchmarking_data["preprocessing_success"] + and benchmarking_data["prediction_success"] + and benchmarking_data["postprocessing_success"] + ) + + print("Sending data to BigQuery") + + t = timeit.default_timer() + + bq_client = bigquery.Client() + + job_config = bigquery.LoadJobConfig( + write_disposition=bigquery.WriteDisposition.WRITE_APPEND, + source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON, + ) + + json_str = io.StringIO(json.dumps(benchmarking_data)) + + @retry( + wait=wait_random_exponential(multiplier=1, max=60), + retry=retry_if_exception_message(match=".*403 Exceeded rate limits.*"), + ) + def upload_to_bigquery(csv_string, table_id, bq_job_config): + load_job = bq_client.load_table_from_file( + csv_string, table_id, job_config=bq_job_config + ) + load_job.result() # Waits for the job to complete. + + upload_to_bigquery(json_str, bigquery_benchmarking_table, job_config) + bigquery_upload_time_s = timeit.default_timer() - t -upload_to_bigquery(json_str, bigquery_benchmarking_table, job_config) + print("Send data to BigQuery in %s s" % bigquery_upload_time_s) -bigquery_upload_time_s = timeit.default_timer() - t -print("Send data to BigQuery in %s s" % bigquery_upload_time_s) +if __name__ == "__main__": + main() diff --git a/scripts/postprocess.py b/scripts/postprocess.py index feb4c8f..8819460 100755 --- a/scripts/postprocess.py +++ b/scripts/postprocess.py @@ -14,130 +14,135 @@ import smart_open import timeit -parser = argparse.ArgumentParser("postprocess") - -parser.add_argument( - "--raw_predictions_uri", - help="URI to model output npz file, containing 4 arrays: arr_0, arr_1, arr_2, arr_3", - type=str, - required=True, -) -parser.add_argument( - "--input_rows", - help="Number of rows in the input image.", - type=int, - required=True, -) -parser.add_argument( - "--input_cols", - help="Number of columns in the input image.", - type=int, - required=True, -) -parser.add_argument( - "--compartment", - help="Compartment to segment. 
One of 'whole-cell' (default) or 'nuclear' or 'both'.", - type=str, - required=False, - default="whole-cell", -) -parser.add_argument( - "--output_uri", - help="Where to write postprocessed segment predictions npz file containing an array named 'image'", - type=str, - required=True, -) -parser.add_argument( - "--tiff_output_uri", - help="Where to write postprocessed segment predictions TIFF file containing a segment number for each pixel", - type=str, - required=False, -) -parser.add_argument( - "--benchmark_output_uri", - help="Where to write preprocessing benchmarking data.", - type=str, - required=False, -) - -args = parser.parse_args() - -raw_predictions_uri = args.raw_predictions_uri -input_rows = args.input_rows -input_cols = args.input_cols -compartment = args.compartment -output_uri = args.output_uri -benchmark_output_uri = args.benchmark_output_uri - -print("Loading raw predictions") - -t = timeit.default_timer() - -with gcloud_storage_utils.reader(raw_predictions_uri) as raw_predictions_file: - with np.load(raw_predictions_file) as loader: - # An array of shape [height, width, channel] containing intensity of nuclear & membrane channels - raw_predictions = { - "whole-cell": [loader["arr_0"], loader["arr_1"]], - "nuclear": [loader["arr_2"], loader["arr_3"]], - } - -raw_predictions_load_time_s = timeit.default_timer() - t -print("Loaded raw predictions in %s s" % round(raw_predictions_load_time_s, 2)) +def main(): + parser = argparse.ArgumentParser("postprocess") -print("Postprocessing raw predictions") - -t = timeit.default_timer() -try: - segmentation = mesmer_app.postprocess( - raw_predictions, (1, input_rows, input_cols, 2), compartment=compartment + parser.add_argument( + "--raw_predictions_uri", + help="URI to model output npz file, containing 4 arrays: arr_0, arr_1, arr_2, arr_3", + type=str, + required=True, + ) + parser.add_argument( + "--input_rows", + help="Number of rows in the input image.", + type=int, + required=True, + ) + parser.add_argument( + "--input_cols", + help="Number of columns in the input image.", + type=int, + required=True, + ) + parser.add_argument( + "--compartment", + help="Compartment to segment. 
One of 'whole-cell' (default) or 'nuclear' or 'both'.", + type=str, + required=False, + default="whole-cell", + ) + parser.add_argument( + "--output_uri", + help="Where to write postprocessed segment predictions npz file containing an array named 'image'", + type=str, + required=True, + ) + parser.add_argument( + "--tiff_output_uri", + help="Where to write postprocessed segment predictions TIFF file containing a segment number for each pixel", + type=str, + required=False, ) - success = True -except Exception as e: - print("Postprocessing failed with error: %s" % e) - success = False + parser.add_argument( + "--benchmark_output_uri", + help="Where to write preprocessing benchmarking data.", + type=str, + required=False, + ) + + args = parser.parse_args() -postprocessing_time_s = timeit.default_timer() - t + raw_predictions_uri = args.raw_predictions_uri + input_rows = args.input_rows + input_cols = args.input_cols + compartment = args.compartment + output_uri = args.output_uri + benchmark_output_uri = args.benchmark_output_uri -print( - "Postprocessed raw predictions in %s s; success: %s" - % (round(postprocessing_time_s, 2), success) -) + print("Loading raw predictions") -if success: - print("Saving postprocessed output to %s" % output_uri) t = timeit.default_timer() - with gcloud_storage_utils.writer(output_uri) as output_writer: - np.savez(output_writer, image=segmentation) - # TODO (#253): save tiff output. + with gcloud_storage_utils.reader(raw_predictions_uri) as raw_predictions_file: + with np.load(raw_predictions_file) as loader: + # An array of shape [height, width, channel] containing intensity of nuclear & membrane channels + raw_predictions = { + "whole-cell": [loader["arr_0"], loader["arr_1"]], + "nuclear": [loader["arr_2"], loader["arr_3"]], + } - output_time_s = timeit.default_timer() - t - print("Saved output in %s s" % round(output_time_s, 2)) -else: - print("Not saving failed postprocessing output.") - output_time_s = 0.0 + raw_predictions_load_time_s = timeit.default_timer() - t + print("Loaded raw predictions in %s s" % round(raw_predictions_load_time_s, 2)) -# Gather & output timing information + print("Postprocessing raw predictions") + + t = timeit.default_timer() + try: + segmentation = mesmer_app.postprocess( + raw_predictions, (1, input_rows, input_cols, 2), compartment=compartment + ) + success = True + except Exception as e: + print("Postprocessing failed with error: %s" % e) + success = False + + postprocessing_time_s = timeit.default_timer() - t + + print( + "Postprocessed raw predictions in %s s; success: %s" + % (round(postprocessing_time_s, 2), success) + ) + + if success: + print("Saving postprocessed output to %s" % output_uri) + t = timeit.default_timer() + with gcloud_storage_utils.writer(output_uri) as output_writer: + np.savez(output_writer, image=segmentation) + + # TODO (#253): save tiff output. 
+ + output_time_s = timeit.default_timer() - t + print("Saved output in %s s" % round(output_time_s, 2)) + else: + print("Not saving failed postprocessing output.") + output_time_s = 0.0 + + # Gather & output timing information + + if benchmark_output_uri: + gpu_info = benchmark_utils.get_gpu_info() + + timing_info = { + "compartment": compartment, + "postprocessing_instance_type": benchmark_utils.get_gce_instance_type(), + "postprocessing_gpu_type": gpu_info[0], + "postprocessing_num_gpus": gpu_info[1], + "postprocessing_success": success, + "postprocessing_peak_memory_gb": benchmark_utils.get_peak_memory(), + "postprocessing_is_preemptible": benchmark_utils.get_gce_is_preemptible(), + "postprocessing_input_load_time_s": raw_predictions_load_time_s, + "postprocessing_time_s": postprocessing_time_s, + "postprocessing_output_write_time_s": output_time_s, + } -if benchmark_output_uri: - gpu_info = benchmark_utils.get_gpu_info() + with smart_open.open(benchmark_output_uri, "w") as benchmark_output_file: + json.dump(timing_info, benchmark_output_file) - timing_info = { - "compartment": compartment, - "postprocessing_instance_type": benchmark_utils.get_gce_instance_type(), - "postprocessing_gpu_type": gpu_info[0], - "postprocessing_num_gpus": gpu_info[1], - "postprocessing_success": success, - "postprocessing_peak_memory_gb": benchmark_utils.get_peak_memory(), - "postprocessing_is_preemptible": benchmark_utils.get_gce_is_preemptible(), - "postprocessing_input_load_time_s": raw_predictions_load_time_s, - "postprocessing_time_s": postprocessing_time_s, - "postprocessing_output_write_time_s": output_time_s, - } + print("Wrote benchmarking data to %s" % benchmark_output_uri) - with smart_open.open(benchmark_output_uri, "w") as benchmark_output_file: - json.dump(timing_info, benchmark_output_file) - print("Wrote benchmarking data to %s" % benchmark_output_uri) +if __name__ == "__main__": + main() diff --git a/scripts/predict.py b/scripts/predict.py index 2a7100d..bc07387 100755 --- a/scripts/predict.py +++ b/scripts/predict.py @@ -27,135 +27,141 @@ import tensorflow as tf import timeit -parser = argparse.ArgumentParser("predict") -parser.add_argument( - "--image_uri", - help="URI to preprocessed image npz file, containing an array named 'image'", - type=str, - required=True, -) -parser.add_argument( - "--batch_size", - help="Optional integer representing batch size to use for prediction. Default is 16.", - type=int, - required=False, - default=16, -) -parser.add_argument( - "--output_uri", - help="Where to write model output npz file containing arr_0, arr_1, arr_2, arr_3", - type=str, - required=True, -) -parser.add_argument( - "--benchmark_output_uri", - help="Where to write preprocessing benchmarking data.", - type=str, - required=False, -) +def main(): + parser = argparse.ArgumentParser("predict") -args = parser.parse_args() - -image_uri = args.image_uri -batch_size = args.batch_size -output_uri = args.output_uri -benchmark_output_uri = args.benchmark_output_uri - -# Hard-code remote path & hash based on model_id -# THIS IS IN US-CENTRAL1 -# If you are running outside us-central1 you should make a copy to avoid egress. 
-model_remote_path = "gs://genomics-data-public-central1/cellular-segmentation/vanvalenlab/deep-cell/vanvalenlab-tf-model-multiplex-downloaded-20230706/MultiplexSegmentation.tar.gz"
-model_hash = "a1dfbce2594f927b9112f23a0a1739e0"
-
-print("Loading model")
+    parser.add_argument(
+        "--image_uri",
+        help="URI to preprocessed image npz file, containing an array named 'image'",
+        type=str,
+        required=True,
+    )
+    parser.add_argument(
+        "--batch_size",
+        help="Optional integer representing batch size to use for prediction. Default is 16.",
+        type=int,
+        required=False,
+        default=16,
+    )
+    parser.add_argument(
+        "--output_uri",
+        help="Where to write model output npz file containing arr_0, arr_1, arr_2, arr_3",
+        type=str,
+        required=True,
+    )
+    parser.add_argument(
+        "--benchmark_output_uri",
+        help="Where to write prediction benchmarking data.",
+        type=str,
+        required=False,
+    )
 
-downloaded_file_path = cached_open.get_file(
-    "MultiplexSegmentation.tgz",
-    model_remote_path,
-    file_hash=model_hash,
-    extract=True,
-    cache_subdir="models",
-)
-# Remove the .tgz extension to get the model directory path
-model_path = os.path.splitext(downloaded_file_path)[0]
+    args = parser.parse_args()
 
-print("Reading model from {}.".format(model_path))
+    image_uri = args.image_uri
+    batch_size = args.batch_size
+    output_uri = args.output_uri
+    benchmark_output_uri = args.benchmark_output_uri
 
-t = timeit.default_timer()
-model = tf.keras.models.load_model(model_path)
-model_load_time_s = timeit.default_timer() - t
+    # Hard-coded remote path & hash based on model_id
+    # THIS IS IN US-CENTRAL1
+    # If you are running outside us-central1, make a copy to avoid egress costs.
+    model_remote_path = "gs://genomics-data-public-central1/cellular-segmentation/vanvalenlab/deep-cell/vanvalenlab-tf-model-multiplex-downloaded-20230706/MultiplexSegmentation.tar.gz"
+    model_hash = "a1dfbce2594f927b9112f23a0a1739e0"
 
-print("Loaded model in %s s" % round(model_load_time_s, 2))
+    print("Loading model")
 
-print("Loading preprocessed image")
+    downloaded_file_path = cached_open.get_file(
+        "MultiplexSegmentation.tgz",
+        model_remote_path,
+        file_hash=model_hash,
+        extract=True,
+        cache_subdir="models",
+    )
+    # Remove the .tgz extension to get the model directory path
+    model_path = os.path.splitext(downloaded_file_path)[0]
 
-t = timeit.default_timer()
+    print("Reading model from {}.".format(model_path))
 
-with gcloud_storage_utils.reader(image_uri) as image_file:
-    with np.load(image_file) as loader:
-        preprocessed_image = loader["image"]
-input_load_time_s = timeit.default_timer() - t
+    t = timeit.default_timer()
+    model = tf.keras.models.load_model(model_path)
+    model_load_time_s = timeit.default_timer() - t
 
-print("Loaded preprocessed image in %s s" % round(input_load_time_s, 2))
+    print("Loaded model in %s s" % round(model_load_time_s, 2))
 
-print("Running prediction")
+    print("Loading preprocessed image")
 
-t = timeit.default_timer()
-try:
-    model_output = mesmer_app.predict(
-        model,
-        preprocessed_image,
-        batch_size=batch_size,
-    )
-    success = True
-except Exception as e:
-    success = False
-    print("Prediction failed with error: %s" % e)
+    t = timeit.default_timer()
 
-predict_time_s = timeit.default_timer() - t
+    with gcloud_storage_utils.reader(image_uri) as image_file:
+        with np.load(image_file) as loader:
+            preprocessed_image = loader["image"]
+    input_load_time_s = timeit.default_timer() - t
 
-print("Ran prediction in %s s; success: %s" % (round(predict_time_s, 2), success))
+    print("Loaded preprocessed image in %s s" %
round(input_load_time_s, 2)) -if success: - print("Saving raw predictions output to %s" % output_uri) + print("Running prediction") t = timeit.default_timer() - with gcloud_storage_utils.writer(output_uri) as output_writer: - np.savez( - output_writer, - arr_0=model_output["whole-cell"][0], - arr_1=model_output["whole-cell"][1], - arr_2=model_output["nuclear"][0], - arr_3=model_output["nuclear"][1], + try: + model_output = mesmer_app.predict( + model, + preprocessed_image, + batch_size=batch_size, ) - output_time_s = timeit.default_timer() - t - - print("Saved output in %s s" % round(output_time_s, 2)) -else: - print("Not saving failed prediction output.") - output_time_s = 0.0 - -# Gather & output timing information - -if benchmark_output_uri: - gpu_info = benchmark_utils.get_gpu_info() - - timing_info = { - "prediction_instance_type": benchmark_utils.get_gce_instance_type(), - "prediction_gpu_type": gpu_info[0], - "prediction_num_gpus": gpu_info[1], - "prediction_success": success, - "prediction_peak_memory_gb": benchmark_utils.get_peak_memory(), - "prediction_is_preemptible": benchmark_utils.get_gce_is_preemptible(), - "prediction_model_load_time_s": model_load_time_s, - "prediction_input_load_time_s": input_load_time_s, - "prediction_batch_size": batch_size, - "prediction_time_s": predict_time_s, - "prediction_output_write_time_s": output_time_s, - } - - with smart_open.open(benchmark_output_uri, "w") as benchmark_output_file: - json.dump(timing_info, benchmark_output_file) - - print("Wrote benchmarking data to %s" % benchmark_output_uri) + success = True + except Exception as e: + success = False + print("Prediction failed with error: %s" % e) + + predict_time_s = timeit.default_timer() - t + + print("Ran prediction in %s s; success: %s" % (round(predict_time_s, 2), success)) + + if success: + print("Saving raw predictions output to %s" % output_uri) + + t = timeit.default_timer() + with gcloud_storage_utils.writer(output_uri) as output_writer: + np.savez( + output_writer, + arr_0=model_output["whole-cell"][0], + arr_1=model_output["whole-cell"][1], + arr_2=model_output["nuclear"][0], + arr_3=model_output["nuclear"][1], + ) + output_time_s = timeit.default_timer() - t + + print("Saved output in %s s" % round(output_time_s, 2)) + else: + print("Not saving failed prediction output.") + output_time_s = 0.0 + + # Gather & output timing information + + if benchmark_output_uri: + gpu_info = benchmark_utils.get_gpu_info() + + timing_info = { + "prediction_instance_type": benchmark_utils.get_gce_instance_type(), + "prediction_gpu_type": gpu_info[0], + "prediction_num_gpus": gpu_info[1], + "prediction_success": success, + "prediction_peak_memory_gb": benchmark_utils.get_peak_memory(), + "prediction_is_preemptible": benchmark_utils.get_gce_is_preemptible(), + "prediction_model_load_time_s": model_load_time_s, + "prediction_input_load_time_s": input_load_time_s, + "prediction_batch_size": batch_size, + "prediction_time_s": predict_time_s, + "prediction_output_write_time_s": output_time_s, + } + + with smart_open.open(benchmark_output_uri, "w") as benchmark_output_file: + json.dump(timing_info, benchmark_output_file) + + print("Wrote benchmarking data to %s" % benchmark_output_uri) + + +if __name__ == "__main__": + main() diff --git a/scripts/preprocess.py b/scripts/preprocess.py index 4f7d610..cb74763 100755 --- a/scripts/preprocess.py +++ b/scripts/preprocess.py @@ -16,122 +16,128 @@ import timeit import urllib -parser = argparse.ArgumentParser("preprocess") - -parser.add_argument( - 
"--image_uri", - help="URI to input image npz file, containing an array named 'input_channels' by default (see --image-array-name)", - type=str, - required=True, -) -parser.add_argument( - "--image_array_name", - help="Name of array in input image npz file, default input_channels", - type=str, - required=False, - default="input_channels", -) -parser.add_argument( - "--image_mpp", - help="Optional float representing microns per pixel of input image. Leave blank to use model's mpp", - type=float, - required=False, -) -parser.add_argument( - "--output_uri", - help="Where to write preprocessed input npz file containing an array named 'image'", - type=str, - required=True, -) -parser.add_argument( - "--benchmark_output_uri", - help="Where to write preprocessing benchmarking data.", - type=str, - required=False, -) - -args = parser.parse_args() - -image_uri = args.image_uri -image_array_name = args.image_array_name -image_mpp = args.image_mpp -output_uri = args.output_uri -benchmark_output_uri = args.benchmark_output_uri - -# This is hard-coded from the only model-id we support. -model_input_shape = (None, 256, 256, 2) - -print("Loading input") - -t = timeit.default_timer() -with gcloud_storage_utils.reader(image_uri) as input_file: - with np.load(input_file) as loader: - input_channels = loader[image_array_name] -input_load_time_s = timeit.default_timer() - t - -print("Loaded input in %s s" % round(input_load_time_s, 2)) - -print("Preprocessing input") - -t = timeit.default_timer() - -try: - preprocessed_image = mesmer_app.preprocess_image( - model_input_shape, input_channels[np.newaxis, ...], image_mpp=image_mpp + +def main(): + parser = argparse.ArgumentParser("preprocess") + + parser.add_argument( + "--image_uri", + help="URI to input image npz file, containing an array named 'input_channels' by default (see --image-array-name)", + type=str, + required=True, + ) + parser.add_argument( + "--image_array_name", + help="Name of array in input image npz file, default input_channels", + type=str, + required=False, + default="input_channels", + ) + parser.add_argument( + "--image_mpp", + help="Optional float representing microns per pixel of input image. Leave blank to use model's mpp", + type=float, + required=False, + ) + parser.add_argument( + "--output_uri", + help="Where to write preprocessed input npz file containing an array named 'image'", + type=str, + required=True, ) - success = True -except Exception as e: - success = False - print("Preprocessing failed with error: %s" % e) - -preprocessing_time_s = timeit.default_timer() - t -print( - "Preprocessed input in %s s; success: %s" - % (round(preprocessing_time_s, 2), success) -) - -if success: - print("Saving preprocessing output to %s" % output_uri) + parser.add_argument( + "--benchmark_output_uri", + help="Where to write preprocessing benchmarking data.", + type=str, + required=False, + ) + + args = parser.parse_args() + + image_uri = args.image_uri + image_array_name = args.image_array_name + image_mpp = args.image_mpp + output_uri = args.output_uri + benchmark_output_uri = args.benchmark_output_uri + + # This is hard-coded from the only model-id we support. 
+ model_input_shape = (None, 256, 256, 2) + + print("Loading input") + + t = timeit.default_timer() + with gcloud_storage_utils.reader(image_uri) as input_file: + with np.load(input_file) as loader: + input_channels = loader[image_array_name] + input_load_time_s = timeit.default_timer() - t + + print("Loaded input in %s s" % round(input_load_time_s, 2)) + + print("Preprocessing input") + t = timeit.default_timer() - with gcloud_storage_utils.writer(output_uri) as output_writer: - np.savez(output_writer, image=preprocessed_image) + try: + preprocessed_image = mesmer_app.preprocess_image( + model_input_shape, input_channels[np.newaxis, ...], image_mpp=image_mpp + ) + success = True + except Exception as e: + success = False + print("Preprocessing failed with error: %s" % e) + + preprocessing_time_s = timeit.default_timer() - t + print( + "Preprocessed input in %s s; success: %s" + % (round(preprocessing_time_s, 2), success) + ) + + if success: + print("Saving preprocessing output to %s" % output_uri) + t = timeit.default_timer() + + with gcloud_storage_utils.writer(output_uri) as output_writer: + np.savez(output_writer, image=preprocessed_image) + + output_time_s = timeit.default_timer() - t + + print("Saved output in %s s" % round(output_time_s, 2)) + else: + print("Not saving failed preprocessing output.") + output_time_s = 0.0 - output_time_s = timeit.default_timer() - t + # Gather & output timing information - print("Saved output in %s s" % round(output_time_s, 2)) -else: - print("Not saving failed preprocessing output.") - output_time_s = 0.0 + if benchmark_output_uri: + gpu_info = benchmark_utils.get_gpu_info() -# Gather & output timing information + parsed_url = urllib.parse.urlparse(image_uri) + filename = parsed_url.path.split("/")[-2] -if benchmark_output_uri: - gpu_info = benchmark_utils.get_gpu_info() + # BigQuery datetimes don't have a timezone. + benchmark_time = datetime.now(timezone.utc).replace(tzinfo=None).isoformat() - parsed_url = urllib.parse.urlparse(image_uri) - filename = parsed_url.path.split("/")[-2] + timing_info = { + "input_file_id": image_uri, + "numpy_size_mb": round(input_channels.nbytes / 1e6, 2), + "pixels_m": input_channels.shape[0] * input_channels.shape[1], + "benchmark_datetime_utc": benchmark_time, + "preprocessing_instance_type": benchmark_utils.get_gce_instance_type(), + "preprocessing_gpu_type": gpu_info[0], + "preprocessing_num_gpus": gpu_info[1], + "preprocessing_success": success, + "preprocessing_peak_memory_gb": benchmark_utils.get_peak_memory(), + "preprocessing_is_preemptible": benchmark_utils.get_gce_is_preemptible(), + "preprocessing_input_load_time_s": input_load_time_s, + "preprocessing_time_s": preprocessing_time_s, + "preprocessing_output_write_time_s": output_time_s, + } - # BigQuery datetimes don't have a timezone. 
- benchmark_time = datetime.now(timezone.utc).replace(tzinfo=None).isoformat() + with smart_open.open(benchmark_output_uri, "w") as benchmark_output_file: + json.dump(timing_info, benchmark_output_file) - timing_info = { - "input_file_id": image_uri, - "numpy_size_mb": round(input_channels.nbytes / 1e6, 2), - "pixels_m": input_channels.shape[0] * input_channels.shape[1], - "benchmark_datetime_utc": benchmark_time, - "preprocessing_instance_type": benchmark_utils.get_gce_instance_type(), - "preprocessing_gpu_type": gpu_info[0], - "preprocessing_num_gpus": gpu_info[1], - "preprocessing_success": success, - "preprocessing_peak_memory_gb": benchmark_utils.get_peak_memory(), - "preprocessing_is_preemptible": benchmark_utils.get_gce_is_preemptible(), - "preprocessing_input_load_time_s": input_load_time_s, - "preprocessing_time_s": preprocessing_time_s, - "preprocessing_output_write_time_s": output_time_s, - } + print("Wrote benchmarking data to %s" % benchmark_output_uri) - with smart_open.open(benchmark_output_uri, "w") as benchmark_output_file: - json.dump(timing_info, benchmark_output_file) - print("Wrote benchmarking data to %s" % benchmark_output_uri) +if __name__ == "__main__": + main() diff --git a/scripts/visualize.py b/scripts/visualize.py index c98f0f8..7ef55d7 100755 --- a/scripts/visualize.py +++ b/scripts/visualize.py @@ -15,105 +15,111 @@ import smart_open import timeit -parser = argparse.ArgumentParser("visualize") - -parser.add_argument( - "--image_uri", - help="URI to input image npz file, containing an array named 'input_channels' by default (see --image-array-name)", - type=str, - required=True, -) -parser.add_argument( - "--image_array_name", - help="Name of array in input image npz file, default: input_channels", - type=str, - required=False, - default="input_channels", -) -parser.add_argument( - "--predictions_uri", - help="URI to image predictions npz file, containing an array named 'image'", - type=str, - required=True, -) -parser.add_argument( - "--visualized_input_uri", - help="Where to write visualized input png file.", - type=str, - required=True, -) -parser.add_argument( - "--visualized_predictions_uri", - help="Where to write visualized predictions png file.", - type=str, - required=True, -) - -args = parser.parse_args() - -image_uri = args.image_uri -image_array_name = args.image_array_name -predictions_uri = args.predictions_uri -visualized_input_uri = args.visualized_input_uri -visualized_predictions_uri = args.visualized_predictions_uri - -print("Loading input") - -t = timeit.default_timer() -with gcloud_storage_utils.reader(image_uri) as file: - with np.load(file) as loader: - # An array of shape [height, width, channel] containing intensity of nuclear & membrane channels - input_channels = loader[image_array_name] -input_load_time_s = timeit.default_timer() - t - -print("Loaded input in %s s" % input_load_time_s) - -print("Loading predictions") - -t = timeit.default_timer() -with gcloud_storage_utils.reader(predictions_uri) as file: - with np.load(file) as loader: - # An array of shape [height, width, channel] containing intensity of nuclear & membrane channels - predictions = loader["image"] -predictions_load_time_s = timeit.default_timer() - t - -print("Loaded predictions in %s s" % predictions_load_time_s) - -nuclear_color = "green" -membrane_color = "blue" - -print("Rendering input to %s" % visualized_input_uri) - -t = timeit.default_timer() - -# Create rgb overlay of image data for visualization -# Note that this normalizes the values from "whatever" to 
rgb range 0..1
-input_rgb = create_rgb_image(
-    input_channels[np.newaxis, ...], channel_colors=[nuclear_color, membrane_color]
-)[0]
-
-# The png needs to normalize rgb values from 0..1, so normalize to 0..255
-im = Image.fromarray((input_rgb * 255).astype(np.uint8))
-with smart_open.open(visualized_input_uri, "wb") as input_png_file:
-    im.save(input_png_file, mode="RGB")
-
-input_render_time_s = timeit.default_timer() - t
-
-print("Rendered input in %s s" % input_render_time_s)
-
-print("Rendering predictions to %s" % visualized_predictions_uri)
-
-t = timeit.default_timer()
-overlay_data = make_outline_overlay(
-    rgb_data=input_rgb[np.newaxis, ...],
-    predictions=predictions,
-)[0]
-
-# The rgb values are 0..1, so normalize to 0..255
-im = Image.fromarray((overlay_data * 255).astype(np.uint8))
-with smart_open.open(visualized_predictions_uri, "wb") as predictions_png_file:
-    im.save(predictions_png_file, mode="RGB")
-
-predictions_render_time_s = timeit.default_timer() - t
-
-print("Rendered predictions in %s s" % predictions_render_time_s)
+
+def main():
+    parser = argparse.ArgumentParser("visualize")
+
+    parser.add_argument(
+        "--image_uri",
+        help="URI to input image npz file, containing an array named 'input_channels' by default (see --image_array_name)",
+        type=str,
+        required=True,
+    )
+    parser.add_argument(
+        "--image_array_name",
+        help="Name of array in input image npz file, default: input_channels",
+        type=str,
+        required=False,
+        default="input_channels",
+    )
+    parser.add_argument(
+        "--predictions_uri",
+        help="URI to image predictions npz file, containing an array named 'image'",
+        type=str,
+        required=True,
+    )
+    parser.add_argument(
+        "--visualized_input_uri",
+        help="Where to write visualized input png file.",
+        type=str,
+        required=True,
+    )
+    parser.add_argument(
+        "--visualized_predictions_uri",
+        help="Where to write visualized predictions png file.",
+        type=str,
+        required=True,
+    )
+
+    args = parser.parse_args()
+
+    image_uri = args.image_uri
+    image_array_name = args.image_array_name
+    predictions_uri = args.predictions_uri
+    visualized_input_uri = args.visualized_input_uri
+    visualized_predictions_uri = args.visualized_predictions_uri
+
+    print("Loading input")
+
+    t = timeit.default_timer()
+    with gcloud_storage_utils.reader(image_uri) as file:
+        with np.load(file) as loader:
+            # An array of shape [height, width, channel] containing intensity of nuclear & membrane channels
+            input_channels = loader[image_array_name]
+    input_load_time_s = timeit.default_timer() - t
+
+    print("Loaded input in %s s" % input_load_time_s)
+
+    print("Loading predictions")
+
+    t = timeit.default_timer()
+    with gcloud_storage_utils.reader(predictions_uri) as file:
+        with np.load(file) as loader:
+            # An array containing the postprocessed segment number for each pixel
+            predictions = loader["image"]
+    predictions_load_time_s = timeit.default_timer() - t
+
+    print("Loaded predictions in %s s" % predictions_load_time_s)
+
+    nuclear_color = "green"
+    membrane_color = "blue"
+
+    print("Rendering input to %s" % visualized_input_uri)
+
+    t = timeit.default_timer()
+
+    # Create rgb overlay of image data for visualization
+    # Note that this normalizes the values from "whatever" to rgb range 0..1
+    input_rgb = create_rgb_image(
+        input_channels[np.newaxis, ...], channel_colors=[nuclear_color, membrane_color]
+    )[0]
+
+    # The rgb values are 0..1, so normalize to 0..255 for an 8-bit png
+    im = Image.fromarray((input_rgb * 255).astype(np.uint8))
+    with 
smart_open.open(visualized_input_uri, "wb") as input_png_file:
+        im.save(input_png_file, mode="RGB")
+
+    input_render_time_s = timeit.default_timer() - t
+
+    print("Rendered input in %s s" % input_render_time_s)
+
+    print("Rendering predictions to %s" % visualized_predictions_uri)
+
+    t = timeit.default_timer()
+    overlay_data = make_outline_overlay(
+        rgb_data=input_rgb[np.newaxis, ...],
+        predictions=predictions,
+    )[0]
+
+    # The rgb values are 0..1, so normalize to 0..255
+    im = Image.fromarray((overlay_data * 255).astype(np.uint8))
+    with smart_open.open(visualized_predictions_uri, "wb") as predictions_png_file:
+        im.save(predictions_png_file, mode="RGB")
+
+    predictions_render_time_s = timeit.default_timer() - t
+
+    print("Rendered predictions in %s s" % predictions_render_time_s)
+
+
+if __name__ == "__main__":
+    main()

From 43bf4ab5d32acadb67f2ba12e32f846a2b2038cd Mon Sep 17 00:00:00 2001
From: David Haley
Date: Sun, 7 Jul 2024 23:25:28 -0700
Subject: [PATCH 4/5] Replace gcloud_storage_utils with gs_fastcopy

---
 benchmarking/deepcell-e2e/benchmark.py | 5 +++--
 scripts/postprocess.py | 7 ++++---
 scripts/predict.py | 6 +++---
 scripts/preprocess.py | 7 ++++---
 scripts/visualize.py | 6 +++---
 5 files changed, 17 insertions(+), 14 deletions(-)

diff --git a/benchmarking/deepcell-e2e/benchmark.py b/benchmarking/deepcell-e2e/benchmark.py
index a18f0ea..acb19c5 100644
--- a/benchmarking/deepcell-e2e/benchmark.py
+++ b/benchmarking/deepcell-e2e/benchmark.py
@@ -4,6 +4,7 @@
 import csv
 from datetime import datetime, timezone
 from google.cloud import bigquery
+import gs_fastcopy
 import io
 from itertools import groupby
 import logging
@@ -20,7 +21,7 @@
 import timeit
 import urllib.parse
 
-from deepcell_imaging import cached_open, gcloud_storage_utils
+from deepcell_imaging import cached_open
 
 BIGQUERY_RESULTS_TABLE = "deepcell-401920.benchmarking.results_batch"
 
@@ -232,7 +233,7 @@ def main():
 
     # smart_open doesn't support seeking on GCP, which tifffile uses.
     if output_path.startswith("gs://"):
-        with gcloud_storage_utils.writer(
+        with gs_fastcopy.write(
             "%s/predictions.tiff" % output_path
         ) as predictions_tiff_file:
             tifffile.imwrite(predictions_tiff_file, segmentation_predictions)
diff --git a/scripts/postprocess.py b/scripts/postprocess.py
index 8819460..ef944e4 100755
--- a/scripts/postprocess.py
+++ b/scripts/postprocess.py
@@ -8,7 +8,8 @@
 """
 
 import argparse
-from deepcell_imaging import benchmark_utils, gcloud_storage_utils, mesmer_app
+from deepcell_imaging import benchmark_utils, mesmer_app
+import gs_fastcopy
 import json
 import numpy as np
 import smart_open
@@ -75,7 +76,7 @@ def main():
 
     t = timeit.default_timer()
 
-    with gcloud_storage_utils.reader(raw_predictions_uri) as raw_predictions_file:
+    with gs_fastcopy.read(raw_predictions_uri) as raw_predictions_file:
         with np.load(raw_predictions_file) as loader:
             # Raw model output arrays for the whole-cell and nuclear compartments
             raw_predictions = {
@@ -109,7 +110,7 @@ def main():
     if success:
         print("Saving postprocessed output to %s" % output_uri)
         t = timeit.default_timer()
-        with gcloud_storage_utils.writer(output_uri) as output_writer:
+        with gs_fastcopy.write(output_uri) as output_writer:
             np.savez(output_writer, image=segmentation)
 
         # TODO (#253): save tiff output.
diff --git a/scripts/predict.py b/scripts/predict.py
index bc07387..fb3060a 100755
--- a/scripts/predict.py
+++ b/scripts/predict.py
@@ -17,9 +17,9 @@
 from deepcell_imaging import (
     benchmark_utils,
     cached_open,
-    gcloud_storage_utils,
     mesmer_app,
 )
+import gs_fastcopy
 import json
 import numpy as np
 import os
@@ -94,7 +94,7 @@ def main():
 
     t = timeit.default_timer()
 
-    with gcloud_storage_utils.reader(image_uri) as image_file:
+    with gs_fastcopy.read(image_uri) as image_file:
         with np.load(image_file) as loader:
             preprocessed_image = loader["image"]
     input_load_time_s = timeit.default_timer() - t
@@ -123,7 +123,7 @@ def main():
         print("Saving raw predictions output to %s" % output_uri)
 
         t = timeit.default_timer()
-        with gcloud_storage_utils.writer(output_uri) as output_writer:
+        with gs_fastcopy.write(output_uri) as output_writer:
             np.savez(
                 output_writer,
                 arr_0=model_output["whole-cell"][0],
diff --git a/scripts/preprocess.py b/scripts/preprocess.py
index cb74763..b8814d1 100755
--- a/scripts/preprocess.py
+++ b/scripts/preprocess.py
@@ -9,7 +9,8 @@
 
 import argparse
 from datetime import datetime, timezone
-from deepcell_imaging import benchmark_utils, gcloud_storage_utils, mesmer_app
+from deepcell_imaging import benchmark_utils, mesmer_app
+import gs_fastcopy
 import json
 import numpy as np
 import smart_open
@@ -66,7 +67,7 @@ def main():
     print("Loading input")
 
     t = timeit.default_timer()
-    with gcloud_storage_utils.reader(image_uri) as input_file:
+    with gs_fastcopy.read(image_uri) as input_file:
         with np.load(input_file) as loader:
             input_channels = loader[image_array_name]
     input_load_time_s = timeit.default_timer() - t
@@ -96,7 +97,7 @@ def main():
         print("Saving preprocessing output to %s" % output_uri)
         t = timeit.default_timer()
 
-        with gcloud_storage_utils.writer(output_uri) as output_writer:
+        with gs_fastcopy.write(output_uri) as output_writer:
             np.savez(output_writer, image=preprocessed_image)
 
         output_time_s = timeit.default_timer() - t
diff --git a/scripts/visualize.py b/scripts/visualize.py
index 7ef55d7..21749f6 100755
--- a/scripts/visualize.py
+++ b/scripts/visualize.py
@@ -9,7 +9,7 @@
 import argparse
 
 from deepcell.utils.plot_utils import create_rgb_image, make_outline_overlay
-from deepcell_imaging import gcloud_storage_utils
+import gs_fastcopy
 import numpy as np
 from PIL import Image
 import smart_open
@@ -62,7 +62,7 @@ def main():
     print("Loading input")
 
     t = timeit.default_timer()
-    with gcloud_storage_utils.reader(image_uri) as file:
+    with gs_fastcopy.read(image_uri) as file:
         with np.load(file) as loader:
             # An array of shape [height, width, channel] containing intensity of nuclear & membrane channels
             input_channels = loader[image_array_name]
@@ -73,7 +73,7 @@ def main():
     print("Loading predictions")
 
     t = timeit.default_timer()
-    with gcloud_storage_utils.reader(predictions_uri) as file:
+    with gs_fastcopy.read(predictions_uri) as file:
         with np.load(file) as loader:
             # An array containing the postprocessed segment number for each pixel
             predictions = loader["image"]

From fcf0d68eaa6c595626ec396bc1983311e6fb8cbd Mon Sep 17 00:00:00 2001
From: David Haley
Date: Sun, 7 Jul 2024 23:26:58 -0700
Subject: [PATCH 5/5] Remove migrated files

---
 deepcell_imaging/gcloud_storage_utils.py | 64 ------------------------
 1 file changed, 64 deletions(-)
 delete mode 100644 deepcell_imaging/gcloud_storage_utils.py

diff --git a/deepcell_imaging/gcloud_storage_utils.py b/deepcell_imaging/gcloud_storage_utils.py
deleted file mode 100644
index 73325f5..0000000
---
a/deepcell_imaging/gcloud_storage_utils.py +++ /dev/null @@ -1,64 +0,0 @@ -from contextlib import contextmanager -import os -import subprocess -import tempfile - - -@contextmanager -def reader(gs_uri): - """ - Context manager for reading a file from Google Cloud Storage. - - Usage: - ``` - with reader('gs://my-bucket/my-file.npz') as f: - npz = np.load(f) - ``` - - This will download the file to a temporary directory, and - open it for reading. When the 'with' block exits, the file - will be closed and the temporary directory will be deleted. - """ - with tempfile.TemporaryDirectory() as tmp: - if gs_uri.endswith(".gz"): - path = os.path.join(tmp, "downloaded_file.gz") - else: - path = os.path.join(tmp, "downloaded_file") - - # Transfer the file. - # TODO: handle errors - subprocess.run(["gcloud", "storage", "cp", gs_uri, path]) - - # If necessary, decompress the file before reading. - # unpigz is a parallel gunzip implementation that's - # much faster when hardware is available. - if path.endswith(".gz"): - subprocess.run(["unpigz", path]) - path = path[:-3] - - with open(path, "rb") as f: - yield f - - -@contextmanager -def writer(gs_uri): - # Create a temporary scratch directory. - # Will be deleted when the 'with' closes. - with tempfile.TemporaryDirectory() as tmp_dir: - # We need an actual filename within the scratch directory. - buffer_file_name = os.path.join(tmp_dir, "file_to_upload") - - # Yield the file object for the caller to write. - with open(buffer_file_name, "wb") as tmp_file: - yield tmp_file - - # If requested, compress the file before uploading. - # pigz is a parallel gzip implementation that's - # much faster than numpy's savez_compressed. - if gs_uri.endswith(".gz"): - # TODO: handle errors - subprocess.run(["pigz", buffer_file_name]) - buffer_file_name += ".gz" - - # TODO: handle errors - subprocess.run(["gcloud", "storage", "cp", buffer_file_name, gs_uri])
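
For reference, the gs_fastcopy.read and gs_fastcopy.write calls introduced in patch 4 are used exactly like the deleted reader/writer context managers: each yields a local file object, presumably performing the GCS transfer on entry (read) or exit (write) as the old helpers did. A minimal usage sketch, assuming the gs-fastcopy package from requirements.txt; it mirrors the call sites above and is not part of the patches, and the bucket URI is hypothetical:

```
import gs_fastcopy
import numpy as np

npz_uri = "gs://example-bucket/preprocessed/image.npz"  # hypothetical URI

# Write: yields a local temp file to write to; the upload to GCS
# happens when the 'with' block exits (like the old writer() helper).
with gs_fastcopy.write(npz_uri) as f:
    np.savez(f, image=np.zeros((128, 128, 2), dtype=np.float32))

# Read: downloads to a temp file and yields it opened for reading
# (like the old reader() helper).
with gs_fastcopy.read(npz_uri) as f:
    with np.load(f) as loader:
        image = loader["image"]
```

Because both helpers yield real, seekable files, callers such as np.savez and tifffile.imwrite work unchanged -- the same reason the deleted module buffered through a temp file instead of streaming via smart_open.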