Add distributed example of PyTorch Lightning with Ray Train #170

Draft · wants to merge 1 commit into master
1 change: 1 addition & 0 deletions .github/workflows/test-examples.yml
@@ -33,6 +33,7 @@ jobs:
- integrations/model-training/pytorch/notebooks/Comet_Pytorch_TensorboardX.ipynb
- integrations/model-training/pytorch/notebooks/Histogram_Logging_Pytorch.ipynb
- integrations/model-training/ray-train/notebooks/Comet_with_ray_train_keras.ipynb
+ - integrations/model-training/ray-train/notebooks/Comet_with_ray_train_lightning.ipynb
- integrations/model-training/ray-train/notebooks/Comet_with_ray_train_xgboost.ipynb
- integrations/model-training/tensorflow/notebooks/Comet_and_Tensorflow.ipynb
- integrations/model-training/yolov5/notebooks/Comet_and_YOLOv5.ipynb
281 changes: 281 additions & 0 deletions integrations/model-training/ray-train/notebooks/Comet_with_ray_train_lightning.ipynb
@@ -0,0 +1,281 @@
{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"<img src=\"https://cdn.comet.ml/img/notebook_logo.png\">"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"[Comet](https://www.comet.com/site/products/ml-experiment-tracking/?utm_campaign=ray_train&utm_medium=colab) is an MLOps Platform that is designed to help Data Scientists and Teams build better models faster! Comet provides tooling to track, Explain, Manage, and Monitor your models in a single place! It works with Jupyter Notebooks and Scripts and most importantly it's 100% free to get started!\n",
"\n",
"[Ray Train](https://docs.ray.io/en/latest/train/train.html) abstracts away the complexity of setting up a distributed training system.\n",
"\n",
"Instrument your runs with Comet to start managing experiments, create dataset versions and track hyperparameters for faster and easier reproducibility and collaboration.\n",
"\n",
"[Find more information about our integration with Ray Train](https://www.comet.ml/docs/v2/integrations/ml-frameworks/ray/)\n",
"\n",
"Get a preview for what's to come. Check out a completed experiment created from this notebook [here](https://www.comet.com/examples/comet-example-ray-train-xgboost/43c968fda9e74260996f8cafb5b9f32c).\n",
"\n",
"This example is based on the [following Ray Train XGBoost example](https://docs.ray.io/en/latest/train/distributed-xgboost-lightgbm.html)."
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "ZYchV5RWwdv5"
},
"source": [
"# Install Dependencies"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "DJnmqphuY2eI"
},
"outputs": [],
"source": [
"%pip install -U comet_ml \"ray[air]>=2.1.0\" \"lightning\" torch torchvision"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "crOcPHobwhGL"
},
"source": [
"# Initialize Comet"
]
},
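{
"cell_type": "markdown",
"metadata": {},
"source": [
"Optionally, Comet can be configured non-interactively (for example on a remote cluster) by setting the standard `COMET_API_KEY` environment variable before calling `comet_ml.init`. A minimal sketch; the placeholder value is an assumption, replace it with your own key:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import os\n",
"\n",
"# Optional non-interactive configuration (assumption: you have an API key)\n",
"# os.environ[\"COMET_API_KEY\"] = \"<your-api-key>\""
]
},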
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "HNQRM0U3caiY"
},
"outputs": [],
"source": [
"import comet_ml\n",
"import comet_ml.integration.ray\n",
"\n",
"comet_ml.init(project_name=\"comet-example-ray-train-lightning\")"
]
},
{
"cell_type": "markdown",
"metadata": {
"id": "cgqwGSwtzVWD"
},
"source": [
"# Import Dependencies"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"id": "e-5rRYaUw5AF"
},
"outputs": [],
"source": [
"import os\n",
"import tempfile\n",
"\n",
"import torch\n",
"from torch.utils.data import DataLoader\n",
"from torchvision.models import resnet18\n",
"from torchvision.datasets import FashionMNIST\n",
"from torchvision.transforms import ToTensor, Normalize, Compose\n",
"import lightning.pytorch as pl\n",
"\n",
"import ray.train.lightning\n",
"from ray.train.torch import TorchTrainer"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Define model"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Model, Loss, Optimizer\n",
"class ImageClassifier(pl.LightningModule):\n",
" def __init__(self):\n",
" super(ImageClassifier, self).__init__()\n",
" self.model = resnet18(num_classes=10)\n",
" self.model.conv1 = torch.nn.Conv2d(\n",
" 1, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False\n",
" )\n",
" self.criterion = torch.nn.CrossEntropyLoss()\n",
"\n",
" def forward(self, x):\n",
" return self.model(x)\n",
"\n",
" def training_step(self, batch, batch_idx):\n",
" x, y = batch\n",
" outputs = self.forward(x)\n",
" loss = self.criterion(outputs, y)\n",
" self.log(\"loss\", loss, on_step=True, prog_bar=True)\n",
" return loss\n",
"\n",
" def configure_optimizers(self):\n",
" return torch.optim.Adam(self.model.parameters(), lr=0.001)"
]
},
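{
"cell_type": "markdown",
"metadata": {},
"source": [
"As a quick sanity check (not part of the original example), you can run the untrained model on a dummy FashionMNIST-shaped batch to confirm that the adapted `conv1` accepts single-channel input:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"model = ImageClassifier()\n",
"dummy_batch = torch.randn(4, 1, 28, 28)  # FashionMNIST images are 1x28x28\n",
"logits = model(dummy_batch)\n",
"print(logits.shape)  # expected: torch.Size([4, 10])"
]
},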
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Define your distributed training function\n",
"\n",
"This function is gonna be distributed and executed on each distributed worker."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def train_func(config):\n",
" from comet_ml.integration.ray import comet_worker_logger\n",
"\n",
" with comet_worker_logger(config) as comet_experiment:\n",
" # Data\n",
" transform = Compose([ToTensor(), Normalize((0.5,), (0.5,))])\n",
" data_dir = os.path.join(tempfile.gettempdir(), \"data\")\n",
" train_data = FashionMNIST(\n",
" root=data_dir, train=True, download=True, transform=transform\n",
" )\n",
" train_dataloader = DataLoader(train_data, batch_size=128, shuffle=True)\n",
"\n",
" # Training\n",
" model = ImageClassifier()\n",
"\n",
" comet_logger = pl.loggers.CometLogger(\n",
" experiment_key=comet_experiment.get_key(),\n",
" )\n",
" # Force the Comet Logger to use the same experiment to enable system metrics logging for all workers\n",
" comet_logger._experiment = comet_experiment\n",
"\n",
" # Configure PyTorch Lightning Trainer.\n",
" trainer = pl.Trainer(\n",
" max_epochs=5,\n",
" devices=\"auto\",\n",
" accelerator=\"auto\",\n",
" strategy=ray.train.lightning.RayDDPStrategy(),\n",
" plugins=[ray.train.lightning.RayLightningEnvironment()],\n",
" callbacks=[ray.train.lightning.RayTrainReportCallback()],\n",
" logger=comet_logger,\n",
" enable_checkpointing=False,\n",
" # Enable fast iteration\n",
" limit_train_batches=50,\n",
" )\n",
" trainer = ray.train.lightning.prepare_trainer(trainer)\n",
" trainer.fit(model, train_dataloaders=train_dataloader)"
]
},
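{
"cell_type": "markdown",
"metadata": {},
"source": [
"The `comet_worker_logger` pattern used above is not specific to Lightning; it works with any Ray Train training function. A minimal sketch with a hypothetical metric name:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def my_train_func(config):\n",
"    from comet_ml.integration.ray import comet_worker_logger\n",
"\n",
"    # `config` must be the train_loop_config prepared on the driver side\n",
"    # by CometTrainLoggerCallback (see the scheduling function below).\n",
"    with comet_worker_logger(config) as experiment:\n",
"        for step in range(10):\n",
"            # Hypothetical metric, logged from each worker\n",
"            experiment.log_metric(\"my_metric\", step * 0.1, step=step)"
]
},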
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Define the function that schedule the distributed job"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def train_lightning(num_workers: int = 2, use_gpu: bool = False) -> ray.train.Result:\n",
" # Configure scaling and resource requirements.\n",
" scaling_config = ray.train.ScalingConfig(num_workers=num_workers, use_gpu=use_gpu)\n",
"\n",
" # Define the configuration dictionary that is gonna be sent to the training function.\n",
" # The Comet Ray Integration rely on this configuration dictionary so it's important to pass it, even if it looks empty\n",
" train_loop_config = {}\n",
"\n",
" # Comet callback\n",
" comet_callback = comet_ml.integration.ray.CometTrainLoggerCallback(\n",
" train_loop_config\n",
" )\n",
"\n",
" run_config = ray.train.RunConfig(callbacks=[comet_callback])\n",
"\n",
" # Launch distributed training job.\n",
" trainer = TorchTrainer(\n",
" train_func,\n",
" train_loop_config=train_loop_config,\n",
" scaling_config=scaling_config,\n",
" run_config=run_config,\n",
" )\n",
"\n",
" return trainer.fit()"
]
},
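{
"cell_type": "markdown",
"metadata": {},
"source": [
"If you need finer-grained resource control, `ray.train.ScalingConfig` also accepts `resources_per_worker`. A hypothetical variant of the configuration above, assuming the cluster has GPUs:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Hypothetical sizing: 2 CPUs and 1 GPU per worker\n",
"gpu_scaling_config = ray.train.ScalingConfig(\n",
"    num_workers=2,\n",
"    use_gpu=True,\n",
"    resources_per_worker={\"CPU\": 2, \"GPU\": 1},\n",
")"
]
},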
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Train the model\n",
"\n",
"Ray will wait indefinitely if we request more num_workers that the available resources, the code below ensure we never request more CPU than available locally."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"ideal_num_workers = 2\n",
"\n",
"available_local_cpu_count = os.cpu_count() - 1\n",
"num_workers = min(ideal_num_workers, available_local_cpu_count)\n",
"\n",
"if num_workers < 1:\n",
" num_workers = 1\n",
"\n",
"train_lightning(num_workers, use_gpu=False)"
]
},
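{
"cell_type": "markdown",
"metadata": {},
"source": [
"`train_lightning` returns a `ray.train.Result`; assuming the run above succeeded, you can inspect the metrics last reported by `RayTrainReportCallback`:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"print(result.metrics)"
]
}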
],
"metadata": {
"colab": {
"provenance": []
},
"kernelspec": {
"display_name": "Python 3 (ipykernel)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.10.12"
}
},
"nbformat": 4,
"nbformat_minor": 4
}