diff --git a/scripts/azure/machine_learning/run_job.py b/scripts/azure/machine_learning/run_job.py
index 485cf3a..658b8c8 100644
--- a/scripts/azure/machine_learning/run_job.py
+++ b/scripts/azure/machine_learning/run_job.py
@@ -32,16 +32,13 @@ def connect_to_workspace():
 
 def get_compute(ml_client):
     cpu_compute_target = os.getenv("AZURE_COMPUTE_TARGET")
-    size = os.getenv("AZURE_COMPUTE_SIZE")
-    min_instances = os.getenv("AZURE_COMPUTE_MIN_INSTANCES")
-    max_instances = os.getenv("AZURE_COMPUTE_MAX_INSTANCES")
 
     try:
         ml_client.compute.get(cpu_compute_target)
     except Exception:
         click.echo(f"Compute {cpu_compute_target} not found.")
 
 
-def submit_job(ml_client, model, optimizer, loss, epochs, batch_size):
+def submit_job(ml_client, model, optimizer, loss, epochs, batch_size, distributed):
     code = os.getenv("AZURE_CODE_PATH")
     environment = os.getenv("AZURE_ENVIRONMENT")
     type_ = os.getenv("AZURE_STORAGE_TYPE")
@@ -53,15 +50,20 @@ def submit_job(ml_client, model, optimizer, loss, epochs, batch_size):
 
     job_name = f"train_{model}_{datetime.now().strftime('%Y%m%d%H%M%S')}"
 
+    command_var = (
+        "python -m scripts.azure.machine_learning.train"
+        " --train ${{inputs.train}} --test ${{inputs.test}}"
+        " --epochs ${{inputs.epochs}} --optimizer ${{inputs.optimizer}}"
+        " --loss ${{inputs.loss}} --batch_size ${{inputs.batch_size}}"
+        " --model ${{inputs.model}} --job_name ${{inputs.job_name}}"
+    )
+
+    if distributed:
+        command_var += " --distributed"
+
     command_job = command(
         code=code,
-        command=(
-            "python -m scripts.azure.machine_learning.train"
-            " --train ${{inputs.train}} --test ${{inputs.test}}"
-            " --epochs ${{inputs.epochs}} --optimizer ${{inputs.optimizer}}"
-            " --loss ${{inputs.loss}} --batch_size ${{inputs.batch_size}}"
-            " --model ${{inputs.model}} --job_name ${{inputs.job_name}}"
-        ),
+        command=command_var,
         environment=environment,
         inputs={
             "train": Input(
@@ -115,7 +117,8 @@ def register_model(ml_client, returned_job, run_name, run_description):
 )
 @click.option("--epochs", type=int, default=10, help="Number of epochs to train for")
 @click.option("--batch_size", type=int, default=32, help="Batch size to use")
-def run(model, optimizer, loss, epochs, batch_size):
+@click.option("--distributed", is_flag=True, help="Use distributed strategy")
+def run(model, optimizer, loss, epochs, batch_size, distributed):
     if model not in MODELS:
         raise ValueError(f"Model {model} not supported")
 
@@ -130,6 +133,7 @@ def run(model, optimizer, loss, epochs, batch_size):
         loss=loss,
         epochs=epochs,
         batch_size=batch_size,
+        distributed=distributed,
     )
 
     click.echo(
diff --git a/scripts/azure/machine_learning/test_distributed_mobilenet.sh b/scripts/azure/machine_learning/test_distributed_mobilenet.sh
new file mode 100755
index 0000000..a42506b
--- /dev/null
+++ b/scripts/azure/machine_learning/test_distributed_mobilenet.sh
@@ -0,0 +1,9 @@
+#!/bin/sh
+
+python -m scripts.azure.machine_learning.run_job \
+    --model mobilenet \
+    --optimizer adam \
+    --loss binary_crossentropy \
+    --epochs 2 \
+    --batch_size 128 \
+    --distributed
diff --git a/scripts/azure/machine_learning/test_distributed_models.sh b/scripts/azure/machine_learning/test_distributed_models.sh
new file mode 100755
index 0000000..ffcd62b
--- /dev/null
+++ b/scripts/azure/machine_learning/test_distributed_models.sh
@@ -0,0 +1,18 @@
+#!/bin/sh
+
+OPTIMIZER="adam"
+LOSS="binary_crossentropy"
+EPOCHS=5
+BATCH_SIZE=128
+
+for model in mobilenet nasnet efficientnet efficientnetv2 densenet inceptionnet xception resnet resnetv2 convnext inceptionresnet vgg;
+  do
+    python -m scripts.azure.machine_learning.run_job \
+        --model "$model" \
+        --optimizer "$OPTIMIZER" \
+        --loss "$LOSS" \
+        --epochs "$EPOCHS" \
+        --batch_size "$BATCH_SIZE" \
+        --distributed
+  done
+
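[Note] The --distributed flag above only appends an argument to the training command; the multi-worker topology itself comes from tf.distribute.MultiWorkerMirroredStrategy (train.py, below), which discovers its peers through the TF_CONFIG environment variable that the job platform is expected to set on each node. A minimal sketch of what one worker would see; the host names, port, and index are hypothetical:

import json
import os

# Hypothetical two-worker cluster. Azure ML populates an equivalent value on
# every node when the job is configured for distributed TensorFlow; the
# training script never has to set this itself.
os.environ["TF_CONFIG"] = json.dumps({
    "cluster": {"worker": ["host-0:12345", "host-1:12345"]},  # same order on all nodes
    "task": {"type": "worker", "index": 0},                   # this node's slot
})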
diff --git a/scripts/azure/machine_learning/test_mobilenet.sh b/scripts/azure/machine_learning/test_mobilenet.sh
index 995a490..6f43a83 100755
--- a/scripts/azure/machine_learning/test_mobilenet.sh
+++ b/scripts/azure/machine_learning/test_mobilenet.sh
@@ -3,4 +3,4 @@ python -m scripts.azure.machine_learning.run_job \
     --optimizer adam \
     --loss binary_crossentropy \
     --epochs 2 \
-    --batch_size 64
+    --batch_size 16
diff --git a/scripts/azure/machine_learning/test_models.sh b/scripts/azure/machine_learning/test_models.sh
index d3bae4e..610d268 100755
--- a/scripts/azure/machine_learning/test_models.sh
+++ b/scripts/azure/machine_learning/test_models.sh
@@ -2,7 +2,7 @@
 
 OPTIMIZER="adam"
 LOSS="binary_crossentropy"
-EPOCHS=1
+EPOCHS=5
 BATCH_SIZE=32
 
 for model in mobilenet nasnet efficientnet efficientnetv2 densenet inceptionnet xception resnet resnetv2 convnext inceptionresnet vgg;
diff --git a/scripts/azure/machine_learning/train.py b/scripts/azure/machine_learning/train.py
index 2de56bb..c4e7a2c 100644
--- a/scripts/azure/machine_learning/train.py
+++ b/scripts/azure/machine_learning/train.py
@@ -18,7 +18,8 @@
     MODELS,
     BUILDERS,
     CALLBACKS,
-    METRICS,
+    METRICS,
+    config_logging,
 )
@@ -27,6 +28,40 @@ logger = logging.getLogger("azure")
 
 
+def get_compiled_model(model, optimizer, loss):
+    builder = BUILDERS[model]()
+
+    director = ModelDirector(builder)
+    model_nn = director.make()
+    logger.info(f"Built model_nn with {str(builder)}")
+
+    optimizer_cls = {
+        "adam": tf.keras.optimizers.Adam,
+        "sgd": tf.keras.optimizers.SGD,
+    }[optimizer]()
+
+    loss_cls = {
+        "binary_crossentropy": tf.keras.losses.BinaryCrossentropy,
+        "categorical_crossentropy": tf.keras.losses.CategoricalCrossentropy,
+    }[loss]()
+
+    metrics = [metric() for metric in METRICS]
+
+    model_nn.compile(optimizer=optimizer_cls, loss=loss_cls, metrics=metrics, run_eagerly=True)
+    logger.info("Compiled model")
+
+    return model_nn
+
+
+def get_compiled_distributed_model(model, optimizer, loss):
+    strategy = tf.distribute.MultiWorkerMirroredStrategy()
+
+    with strategy.scope():
+        model_nn = get_compiled_model(model, optimizer, loss)
+
+    return model_nn
+
+
 @click.command()
 @click.option(
     "--model", type=click.Choice(MODELS), default="mobilenet", help="Model to train"
 )
@@ -50,7 +85,8 @@
 @click.option("--epochs", type=click.INT, default=10, help="Number of epochs to train for")
 @click.option("--batch_size", type=click.INT, default=64, help="Batch size for dataset loaders")
 @click.option("--job_name", type=click.STRING, help="Azure Machine Learning job name")
-def run(model, train, test, optimizer, loss, epochs, batch_size, job_name):
+@click.option("--distributed", is_flag=True, help="Use distributed strategy")
+def run(model, train, test, optimizer, loss, epochs, batch_size, job_name, distributed):
     mlflow.set_experiment("lung-cancer-detection")
 
     mlflow_run = mlflow.start_run(run_name=f"train_{model}_{datetime.now().strftime('%Y%m%d%H%M%S')}")
@@ -64,12 +100,11 @@ def run(model, train, test, optimizer, loss, epochs, batch_size, job_name):
     logger.info(
         f"Run parameters - optimizer: {optimizer}, loss: {loss}"
     )
-
-    builder = BUILDERS[model]()
-
-    director = ModelDirector(builder)
-    model_nn = director.make()
-    logger.info(f"Built model_nn with {str(builder)}")
+
+    if distributed:
+        model_nn = get_compiled_distributed_model(model, optimizer, loss)
+    else:
+        model_nn = get_compiled_model(model, optimizer, loss)
 
     train_loader = DatasetLoader(train)
     test_loader = DatasetLoader(test)
@@ -81,9 +116,6 @@ def run(model, train, test, optimizer, loss, epochs, batch_size, job_name):
     test_dataset = test_loader.get_dataset()
     logger.info("Loaded train and test datasets")
 
-    model_nn.compile(optimizer=optimizer, loss=loss, metrics=METRICS)
-    logger.info("Compiled model")
-
     history = model_nn.fit(train_dataset, epochs=epochs, callbacks=CALLBACKS)
     logger.info("Trained model")
 
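[Note] The split into get_compiled_model / get_compiled_distributed_model follows the tf.distribute rule that every object owning variables (layers, optimizer slots, metric accumulators) must be created inside strategy.scope(). A toy, self-contained illustration of the same pattern, using a throwaway model rather than the project's builders:

import tensorflow as tf

strategy = tf.distribute.MultiWorkerMirroredStrategy()

with strategy.scope():
    # Variables created here are mirrored across workers. Building or
    # compiling outside the scope and then training under the strategy
    # is rejected by Keras with a strategy-scope mismatch error.
    toy = tf.keras.Sequential([tf.keras.layers.Dense(1, activation="sigmoid")])
    toy.compile(
        optimizer=tf.keras.optimizers.Adam(),
        loss=tf.keras.losses.BinaryCrossentropy(),
        metrics=[tf.keras.metrics.BinaryAccuracy(threshold=0.5)],
    )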
diff --git a/scripts/azure/machine_learning/train_distributed_models.sh b/scripts/azure/machine_learning/train_distributed_models.sh
new file mode 100755
index 0000000..70798fc
--- /dev/null
+++ b/scripts/azure/machine_learning/train_distributed_models.sh
@@ -0,0 +1,18 @@
+#!/bin/sh
+
+OPTIMIZER="adam"
+LOSS="binary_crossentropy"
+EPOCHS=100
+BATCH_SIZE=128
+
+for model in mobilenet nasnet efficientnet efficientnetv2 densenet inceptionnet xception resnet resnetv2 convnext inceptionresnet vgg;
+  do
+    python -m scripts.azure.machine_learning.run_job \
+        --model "$model" \
+        --optimizer "$OPTIMIZER" \
+        --loss "$LOSS" \
+        --epochs "$EPOCHS" \
+        --batch_size "$BATCH_SIZE" \
+        --distributed
+  done
+
diff --git a/src/config.py b/src/config.py
index 1236aac..e597eef 100644
--- a/src/config.py
+++ b/src/config.py
@@ -64,12 +64,14 @@ def config_logging():
 }
 
 _threshold = 0.5
+
+# Lambdas are needed because metric objects must be created inside the strategy scope.
 METRICS = [
-    tf.keras.metrics.BinaryAccuracy(threshold=_threshold, name="accuracy"),
-    tfa.metrics.F1Score(num_classes=1, threshold=_threshold, name="f1"),
-    tf.keras.metrics.Precision(thresholds=_threshold, name="precision"),
-    tf.keras.metrics.Recall(thresholds=_threshold, name="recall"),
-    tf.keras.metrics.AUC(thresholds=[_threshold], curve="ROC", name="roc_auc"),
+    lambda: tf.keras.metrics.BinaryAccuracy(threshold=_threshold, name="accuracy"),
+    lambda: tfa.metrics.F1Score(num_classes=1, threshold=_threshold, name="f1"),
+    lambda: tf.keras.metrics.Precision(thresholds=_threshold, name="precision"),
+    lambda: tf.keras.metrics.Recall(thresholds=_threshold, name="recall"),
+    lambda: tf.keras.metrics.AUC(thresholds=[_threshold], curve="ROC", name="roc_auc"),
 ]
 
 EARLY_STOPPING_CONFIG = {
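[Note] The METRICS entries above are now zero-argument factories, consumed in train.py as [metric() for metric in METRICS] once the strategy scope has been entered. functools.partial expresses the same deferral without bare lambdas; a sketch of the equivalent list, under the same assumptions as src/config.py:

from functools import partial

import tensorflow as tf
import tensorflow_addons as tfa

_threshold = 0.5

# Each entry is still a callable; nothing is instantiated until train.py
# calls it inside strategy.scope().
METRICS = [
    partial(tf.keras.metrics.BinaryAccuracy, threshold=_threshold, name="accuracy"),
    partial(tfa.metrics.F1Score, num_classes=1, threshold=_threshold, name="f1"),
    partial(tf.keras.metrics.Precision, thresholds=_threshold, name="precision"),
    partial(tf.keras.metrics.Recall, thresholds=_threshold, name="recall"),
    partial(tf.keras.metrics.AUC, thresholds=[_threshold], curve="ROC", name="roc_auc"),
]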
diff --git a/src/dataset/dataset_loader.py b/src/dataset/dataset_loader.py
index fd335b5..b2d1ae5 100644
--- a/src/dataset/dataset_loader.py
+++ b/src/dataset/dataset_loader.py
@@ -75,10 +75,21 @@ def _shuffle(self, paths, labels):
 
         return paths, labels
 
-    def _load_batch_data(self, paths_batch):
+    def _load_batch_data(self, paths_batch, labels_batch):
         """Loads batch of data"""
-        batch_data = [np.load(path) for path in paths_batch]
-        return batch_data
+        # def load(path: str) -> np.ndarray | None:
+        #     try:
+        #         data = np.load(path)
+        #     except ValueError as e:
+        #         logger.error(f"Error while loading {path}.\n{e}")
+        #         return None
+        #     return data
+        #
+        # batch_data = [(load(path), label) for path, label in zip(paths_batch, labels_batch)]
+        # batch_data = [(data, label) for data, label in batch_data if data is not None]
+        #
+        # return batch_data
+        return [(np.load(path), label) for path, label in zip(paths_batch, labels_batch)]
 
     def _data_generator(self):
         """Loads and yields batches of data"""
@@ -89,9 +100,18 @@ def _data_generator(self):
         for i in range(0, len(paths), self.batch_size):
             paths_batch = paths[i : i + self.batch_size]
             labels_batch = labels[i : i + self.batch_size]
-
-            batch_data = self._load_batch_data(paths_batch)
+
+            try:
+                batch_data = self._load_batch_data(paths_batch, labels_batch)
+            except Exception as e:
+                logger.error(f"Error while loading batch {i}.\n{e}")
+                continue
 
             logger.info(f"Batch {i} loaded")
 
-            yield np.array(batch_data), np.array(labels_batch)
+            if not batch_data:
+                continue
+
+            data_batch, batch_labels = zip(*batch_data)
+
+            yield np.array(data_batch), np.array(batch_labels)
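[Note] The commented-out block in _load_batch_data sketches per-file error handling, while the live code lets a single corrupt .npy fail the whole batch (caught in _data_generator, which skips that batch). If per-file skipping is ever enabled, keeping data and labels zipped until after filtering is what preserves their alignment; a self-contained sketch with hypothetical file names:

import logging

import numpy as np

logger = logging.getLogger("azure")

def load_or_none(path):
    # Wrap np.load, logging and skipping unreadable files; np.load raises
    # ValueError or OSError for bad magic bytes or truncated input.
    try:
        return np.load(path)
    except (ValueError, OSError) as e:
        logger.error(f"Error while loading {path}.\n{e}")
        return None

paths_batch = ["scan_000.npy", "scan_001.npy"]  # hypothetical paths
labels_batch = [0, 1]

pairs = [(load_or_none(p), y) for p, y in zip(paths_batch, labels_batch)]
pairs = [(d, y) for d, y in pairs if d is not None]  # labels stay aligned with surviving arrays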