diff --git a/tutorials/distributed_data_classification/pytorch-ensemble-classification.ipynb b/tutorials/distributed_data_classification/pytorch-ensemble-classification.ipynb new file mode 100644 index 000000000..77a3960e1 --- /dev/null +++ b/tutorials/distributed_data_classification/pytorch-ensemble-classification.ipynb @@ -0,0 +1,716 @@ +{ + "cells": [ + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Distributed Data Classification with Multiple Classifiers\n", + "\n", + "Cross-validation is a machine learning technique in which multiple models are trained on multiple subsets of your data and validated on the remaining data portions. It is useful because it reduces the risk of overfitting to your data and provides a better estimate of how the model will perform on unseen data. This is particularly valuable when dealing with limited data, as it allows for more efficient use of the available samples.\n", + "\n", + "In this tutorial, we demonstrate how to use NeMo Curator's `DistributedDataClassifier` to build our own `PyTorchClassifier` class for loading and performing batched inference with multiple pretrained models. We assume the user has pretrained PTH model files, with [DeBERTaV3](https://huggingface.co/microsoft/deberta-v3-base) as the base model used for training. The classifiers are accelerated using [CrossFit](https://github.com/rapidsai/crossfit), a library that leverages intellegent batching and RAPIDS to accelerate the offline inference on large datasets.\n", + "\n", + "First, let's run some preliminary imports and set up our Dask client." + ] + }, + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "env: PYTHONWARNINGS=ignore\n" + ] + } + ], + "source": [ + "import os\n", + "os.environ[\"RAPIDS_NO_INITIALIZE\"] = \"1\"\n", + "\n", + "# Silence Warnings (HuggingFace internal warnings)\n", + "%env PYTHONWARNINGS=ignore\n", + "import warnings\n", + "warnings.filterwarnings(\"ignore\")" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [], + "source": [ + "from dataclasses import dataclass\n", + "from typing import List, Optional" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [], + "source": [ + "import cudf\n", + "import dask_cudf\n", + "import torch\n", + "import torch.nn as nn\n", + "from crossfit.backend.torch.hf.model import HFModel\n", + "from transformers import AutoConfig, AutoModel\n", + "from transformers.models.deberta_v2 import DebertaV2TokenizerFast" + ] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [], + "source": [ + "# NeMo Curator modules\n", + "from nemo_curator import get_client\n", + "from nemo_curator.classifiers.base import (\n", + " DistributedDataClassifier,\n", + " _run_classifier_helper,\n", + ")\n", + "from nemo_curator.datasets import DocumentDataset" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "cuDF Spilling is enabled\n" + ] + } + ], + "source": [ + "client = get_client(cluster_type=\"gpu\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Create `PyTorchClassifier` Class\n", + "\n", + "To create our `PyTorchClassifier` class, we will be extendeding NeMo Curator's `DistributedDataClassifier` class.\n", + "\n", + "The goal of the base `DistributedDataClassifier` class is to enable multi-node multi-GPU data classification of your data. NeMo Curator provides several subclasses that focus on domain, quality, content safety, and educational content classification. However, the `DistributedDataClassifier` can be extended to fit any model; the only requirement is that the model can fit on a single GPU. See NeMo Curator's [Distributed Data Classification](https://docs.nvidia.com/nemo-framework/user-guide/latest/datacuration/distributeddataclassification.html) documentation for more information.\n", + "\n", + "First, let's create a `PyTorchModelConfig` class. Its purpose is to store some of the attributes that will be used by our model, including the base model of the classifier." + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [], + "source": [ + "@dataclass\n", + "class PyTorchModelConfig:\n", + " base_model: str = \"microsoft/deberta-v3-base\"\n", + " fc_dropout: float = 0.2\n", + " max_len: int = 512" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Next, we create an `NCCustomModel` (for \"NeMo Curator Custom Model\") class. It inherits from `nn.Module`, the base class for all neural network modules in PyTorch.\n", + "\n", + "Inside `__init__`, the model loads the model configuration and model. The `autocast` boolean determines whether mixed precision (`torch.autocast`) is used during inference to speed up computations on CUDA devices. The `forward` method is required by `nn.Module` and runs the model's forward pass (the computation performed at every call)." + ] + }, + { + "cell_type": "code", + "execution_count": 7, + "metadata": {}, + "outputs": [], + "source": [ + "class NCCustomModel(nn.Module):\n", + " def __init__(\n", + " self,\n", + " config: dataclass,\n", + " out_dim: int,\n", + " config_path: str = None,\n", + " pretrained: bool = False,\n", + " autocast: bool = False,\n", + " ):\n", + " super().__init__()\n", + " self.config = config\n", + " if config_path is None:\n", + " self.config = AutoConfig.from_pretrained(\n", + " config.base_model, output_hidden_states=True\n", + " )\n", + " else:\n", + " self.config = torch.load(config_path)\n", + "\n", + " if pretrained:\n", + " self.model = AutoModel.from_pretrained(\n", + " config.base_model, config=self.config\n", + " )\n", + " else:\n", + " self.model = AutoModel(self.config)\n", + "\n", + " self.fc_dropout = nn.Dropout(config.fc_dropout)\n", + " self.fc = nn.Linear(self.config.hidden_size, out_dim)\n", + " self.autocast = autocast\n", + "\n", + " def feature(self, input_ids, attention_mask):\n", + " outputs = self.model(input_ids=input_ids, attention_mask=attention_mask)\n", + " last_hidden_states = outputs[0]\n", + " return last_hidden_states\n", + "\n", + " def _forward(self, batch):\n", + " feature = self.feature(batch[\"input_ids\"], batch[\"attention_mask\"])\n", + " output = self.fc(self.fc_dropout(feature))\n", + " output = output.to(torch.float32)\n", + " return torch.softmax(output[:, 0, :], dim=1)\n", + "\n", + " def forward(self, batch):\n", + " if self.autocast:\n", + " with torch.autocast(device_type=\"cuda\"):\n", + " return self._forward(batch)\n", + " else:\n", + " return self._forward(batch)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Now, let's create the `PyTorchModel` class, a model management class. It inherits from `HFModel`, a class created by NVIDIA's [CrossFit](https://github.com/rapidsai/crossfit) library, which enables multi-node and multi-GPU offline inference. In it, we create several methods which define how to load our model, its configuration, and its tokenizer." + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [], + "source": [ + "class PyTorchModel(HFModel):\n", + " def __init__(\n", + " self,\n", + " config: dataclass,\n", + " out_dim: int,\n", + " model_path: str,\n", + " autocast: bool = False,\n", + " ):\n", + " self.config = config\n", + " self.out_dim = out_dim\n", + " self.model_path = model_path\n", + " self.autocast = autocast\n", + " super().__init__(self.config.base_model)\n", + "\n", + " def load_model(self, device: str = \"cuda\"):\n", + " model = NCCustomModel(\n", + " self.config,\n", + " out_dim=self.out_dim,\n", + " config_path=None,\n", + " pretrained=True,\n", + " autocast=self.autocast,\n", + " )\n", + " model = model.to(device)\n", + "\n", + " if os.path.exists(self.model_path):\n", + " sd = torch.load(self.model_path, map_location=\"cpu\")\n", + " if \"model_state_dict\" in sd:\n", + " sd = sd[\"model_state_dict\"]\n", + " sd = {k[7:] if k.startswith(\"module.\") else k: sd[k] for k in sd.keys()}\n", + " model.load_state_dict(sd, strict=True)\n", + " else:\n", + " raise ValueError(f\"Model path {self.model_path} does not exist\")\n", + "\n", + " return model.eval()\n", + "\n", + " def load_tokenizer(self):\n", + " return DebertaV2TokenizerFast.from_pretrained(self.config.base_model)\n", + "\n", + " def load_config(self):\n", + " return AutoConfig.from_pretrained(self.path_or_name)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "Finally, we create the `PyTorchClassifier` class. We start with the `__init__` method, which uses the `DistributedDataClassifier`, `PyTorchModelConfig`, and `PyTorchModel` classes described above. Next is the `_run_classifier` method, which is called by `DistributedDataClassifier`'s `__call__` method; it is required for all classes that inherit the `DistributedDataClassifier` class.\n", + "\n", + "Here is a quick rundown of all the attributes of the `PyTorchClassifier` class:\n", + "- `pretrained_model_name_or_path` (`str`): The path to your PyTorch model file.\n", + "- `labels` (`list[str]`): The classes output by the model classifier.\n", + "- `out_dim` (`list[str], optional`): Set to 1 for a binary classification task. Otherwise, defaults to `len(labels)`.\n", + "- `filter_by` (`list[str], optional`): The classes to filter the dataset by. If None, all classes will be included. Defaults to None.\n", + "- `batch_size` (`int`): The number of samples per batch for inference. Defaults to 256.\n", + "- `text_field` (`str`): The field in the dataset that should be classified.\n", + "- `pred_column` (`str`): The column name where predictions will be stored. Defaults to \"pred\".\n", + "- `prob_column` (`str`): The column name where prediction probabilities will be stored. Defaults to \"prob\".\n", + "- `max_chars` (`int`): The maximum number of characters in each document to consider for classification. Defaults to 6000.\n", + "- `device_type` (`str`): The type of device to use for inference, either \"cuda\" or \"cpu\". Defaults to \"cuda\".\n", + "- `autocast` (`bool`): Whether to use mixed precision for faster inference. Defaults to True.\n", + "- `base_model` (`str`): The base model on which your PyTorch model was trained. Defaults to \"microsoft/deberta-v3-base\".\n", + "- `fc_dropout` (`str`): Dropout rate used during training. Defaults to 0.2.\n", + "- `max_len` (`str`): Maximum sequence length used during training. Defaults to 512." + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "metadata": {}, + "outputs": [], + "source": [ + "class PyTorchClassifier(DistributedDataClassifier):\n", + " \"\"\"\n", + " PyTorchClassifier is a general classifier designed for running generic PTH model files.\n", + " This class is optimized for running on multi-node, multi-GPU setups to enable fast and efficient inference on large datasets.\n", + "\n", + " \"\"\"\n", + "\n", + " def __init__(\n", + " self,\n", + " pretrained_model_name_or_path: str,\n", + " labels: List[str],\n", + " out_dim: Optional[int] = None,\n", + " filter_by: Optional[List[str]] = None,\n", + " batch_size: int = 256,\n", + " text_field: str = \"text\",\n", + " pred_column: str = \"pred\",\n", + " prob_column: str = \"prob\",\n", + " max_chars: int = 6000,\n", + " device_type: str = \"cuda\",\n", + " autocast: bool = True,\n", + " base_model: str = \"microsoft/deberta-v3-base\",\n", + " fc_dropout: float = 0.2,\n", + " max_len: int = 512,\n", + " ):\n", + " config = PyTorchModelConfig(\n", + " base_model=base_model,\n", + " fc_dropout=fc_dropout,\n", + " max_len=max_len,\n", + " )\n", + "\n", + " self.labels = labels\n", + " if out_dim:\n", + " self.out_dim = out_dim\n", + " else:\n", + " self.out_dim = len(labels)\n", + "\n", + " self.text_field = text_field\n", + " self.prob_column = prob_column\n", + "\n", + " model = PyTorchModel(\n", + " config=config,\n", + " out_dim=self.out_dim,\n", + " model_path=pretrained_model_name_or_path,\n", + " autocast=autocast,\n", + " )\n", + "\n", + " super().__init__(\n", + " model=model,\n", + " labels=self.labels,\n", + " filter_by=filter_by,\n", + " batch_size=batch_size,\n", + " out_dim=self.out_dim,\n", + " pred_column=pred_column,\n", + " max_chars=max_chars,\n", + " device_type=device_type,\n", + " autocast=autocast,\n", + " )\n", + "\n", + " def _run_classifier(self, dataset: DocumentDataset):\n", + " print(\"Starting PyTorch classifier inference\", flush=True)\n", + " df = dataset.df\n", + " df = _run_classifier_helper(\n", + " df=df,\n", + " model=self.model,\n", + " labels=self.labels,\n", + " max_chars=self.max_chars,\n", + " batch_size=self.batch_size,\n", + " label_col=self.pred_column,\n", + " text_field=self.text_field,\n", + " prob_col=self.prob_column,\n", + " )\n", + " return DocumentDataset(df)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "We have successfully built our PyTorch classifier pipeline! Now, let's demonstrate how to use it with a simple example.\n", + "\n", + "# Prepare Dataset and Set File Paths\n", + "\n", + "For our demonstration, we need to create or read the dataset on which we want to run inference. In this notebook, we provide a sample dataset with 10 text sentences to evaluate. Alternatively, the user may read in their own existing data (e.g., JSON or Parquet files) as demonstrated by the commented code." + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "metadata": {}, + "outputs": [], + "source": [ + "# Create sample DataFrame\n", + "text = [\n", + " \"Quantum computing is set to revolutionize the field of cryptography.\",\n", + " \"Investing in index funds is a popular strategy for long-term financial growth.\",\n", + " \"Recent advancements in gene therapy offer new hope for treating genetic disorders.\",\n", + " \"Online learning platforms have transformed the way students access educational resources.\",\n", + " \"Traveling to Europe during the off-season can be a more budget-friendly option.\",\n", + " \"Training regimens for athletes have become more sophisticated with the use of data analytics.\",\n", + " \"Streaming services are changing the way people consume television and film content.\",\n", + " \"Vegan recipes have gained popularity as more people adopt plant-based diets.\",\n", + " \"Climate change research is critical for developing sustainable environmental policies.\",\n", + " \"Telemedicine has become increasingly popular due to its convenience and accessibility.\",\n", + "]\n", + "df = cudf.DataFrame({\"text\": text})\n", + "dataset = DocumentDataset(dask_cudf.from_cudf(df, npartitions=1))\n", + "write_to_filename = False\n", + "\n", + "# Alternatively, read existing directory of JSONL files\n", + "# input_file_path=\"/input_data_dir/\"\n", + "# input_dataset = DocumentDataset.read_json(\n", + "# input_file_path, backend=\"cudf\", add_filename=True\n", + "# )\n", + "# write_to_filename = True" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "The user should also specify where to write the results, as well as the local file paths to the pretrained PyTorch classifiers. Finally, the user should include the labels the classifier is expected to produce." + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "output_file_path = \"output_data_dir/\"\n", + "model_paths = [\n", + " \"model0.pth\",\n", + " \"model1.pth\",\n", + " \"model2.pth\",\n", + " \"model3.pth\",\n", + " \"model4.pth\",\n", + "]\n", + "labels = [\"label_a\", \"label_b\", \"label_c\"]" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Run Classification with Multiple Models\n", + "\n", + "Now we can use the `PyTorchClassifier` class to load each of our PyTorch models and run inference. We will write the results to a JSON file." + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Starting PyTorch classifier inference\n", + "Starting PyTorch classifier inference\n", + "Starting PyTorch classifier inference\n", + "Starting PyTorch classifier inference\n", + "Starting PyTorch classifier inference\n" + ] + } + ], + "source": [ + "fold = 0\n", + "pred_columns = []\n", + "for model_path in model_paths:\n", + " pred_column = \"pred_\" + str(fold)\n", + " prob_column = \"prob_\" + str(fold)\n", + " pred_columns.append(pred_column)\n", + "\n", + " classifier = PyTorchClassifier(\n", + " pretrained_model_name_or_path=model_path,\n", + " labels=labels,\n", + " batch_size=1024,\n", + " text_field=\"text\",\n", + " pred_column=pred_column,\n", + " prob_column=prob_column,\n", + " )\n", + " dataset = classifier(dataset=dataset)\n", + " fold += 1" + ] + }, + { + "cell_type": "code", + "execution_count": 13, + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "GPU: tcp://127.0.0.1:32893, Part: 0: 100%|██████████| 10/10 [00:21<00:00, 2.13s/it]\n", + "GPU: tcp://127.0.0.1:32893, Part: 0: 100%|██████████| 10/10 [00:05<00:00, 1.81it/s]\n", + "GPU: tcp://127.0.0.1:32893, Part: 0: 100%|██████████| 10/10 [00:05<00:00, 1.81it/s]\n", + "GPU: tcp://127.0.0.1:32893, Part: 0: 100%|██████████| 10/10 [00:05<00:00, 1.84it/s]\n", + "GPU: tcp://127.0.0.1:32893, Part: 0: 100%|██████████| 10/10 [00:04<00:00, 2.01it/s]" + ] + }, + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Writing to disk complete for 1 partitions\n", + "CPU times: user 7.01 s, sys: 4.56 s, total: 11.6 s\n", + "Wall time: 1min 4s\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "GPU: tcp://127.0.0.1:32893, Part: 0: 100%|██████████| 10/10 [00:05<00:00, 1.81it/s]\n" + ] + } + ], + "source": [ + "%%time\n", + "\n", + "dataset.to_json(output_file_dir=output_file_path, write_to_filename=write_to_filename)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Inspect the Output\n", + "\n", + "Finally, let's verify that everything worked as expected." + ] + }, + { + "cell_type": "code", + "execution_count": 14, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Reading 1 files\n" + ] + }, + { + "data": { + "text/html": [ + "
\n", + " | pred_0 | \n", + "pred_1 | \n", + "pred_2 | \n", + "pred_3 | \n", + "pred_4 | \n", + "prob_0 | \n", + "prob_1 | \n", + "prob_2 | \n", + "prob_3 | \n", + "prob_4 | \n", + "text | \n", + "
---|---|---|---|---|---|---|---|---|---|---|---|
0 | \n", + "label_b | \n", + "label_b | \n", + "label_b | \n", + "label_b | \n", + "label_b | \n", + "[0.37283509970000006, 0.49910834430000006, 0.1... | \n", + "[0.3027972281, 0.5215288401, 0.1756739765] | \n", + "[0.41288739440000005, 0.5265461801999999, 0.06... | \n", + "[0.32485893370000013, 0.46514019370000004, 0.2... | \n", + "[0.3685780168000001, 0.5256645678999999, 0.105... | \n", + "Quantum computing is set to revolutionize the ... | \n", + "
1 | \n", + "label_b | \n", + "label_b | \n", + "label_b | \n", + "label_b | \n", + "label_b | \n", + "[0.34135937690000007, 0.5343321562, 0.1243084297] | \n", + "[0.34347015620000004, 0.5304207801999999, 0.12... | \n", + "[0.4346009791000001, 0.5130862594, 0.052312787... | \n", + "[0.3181181848000001, 0.4944583774000001, 0.187... | \n", + "[0.39643365140000003, 0.5143401027, 0.08922628... | \n", + "Investing in index funds is a popular strategy... | \n", + "
2 | \n", + "label_b | \n", + "label_b | \n", + "label_b | \n", + "label_b | \n", + "label_b | \n", + "[0.38975748420000006, 0.48216831680000005, 0.1... | \n", + "[0.33265304570000004, 0.5090963244, 0.1582506448] | \n", + "[0.44722059370000006, 0.4945448935000001, 0.05... | \n", + "[0.3444236219000001, 0.45550799370000006, 0.20... | \n", + "[0.3919632137000001, 0.5084934831, 0.099543325... | \n", + "Recent advancements in gene therapy offer new ... | \n", + "
3 | \n", + "label_b | \n", + "label_b | \n", + "label_b | \n", + "label_b | \n", + "label_b | \n", + "[0.38686266540000014, 0.48784771560000006, 0.1... | \n", + "[0.3482291102, 0.5138959289, 0.13787493110000001] | \n", + "[0.4499093592, 0.49849084020000006, 0.05159985... | \n", + "[0.3489176929000001, 0.45996120570000004, 0.19... | \n", + "[0.38338246940000015, 0.5131927133, 0.10342480... | \n", + "Online learning platforms have transformed the... | \n", + "
4 | \n", + "label_b | \n", + "label_b | \n", + "label_b | \n", + "label_b | \n", + "label_b | \n", + "[0.3207181096000001, 0.5833522080999999, 0.095... | \n", + "[0.3277938664, 0.5600519180000001, 0.112154245... | \n", + "[0.39969193940000003, 0.5546463728000001, 0.04... | \n", + "[0.3249147236000001, 0.5021025537999999, 0.172... | \n", + "[0.35228130220000003, 0.5585800408999999, 0.08... | \n", + "Traveling to Europe during the off-season can ... | \n", + "