
Commit

Merge branch 'main' into auto_cudagraph
pablo-garay authored Sep 3, 2024
2 parents 9de6189 + a1fd899 commit c975fb5
Showing 53 changed files with 2,934 additions and 753 deletions.
502 changes: 298 additions & 204 deletions .github/workflows/cicd-main.yml

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Dockerfile.ci
@@ -34,7 +34,7 @@ WORKDIR /workspace
# Install NeMo requirements
ARG TE_TAG=7d576ed25266a17a7b651f2c12e8498f67e0baea
ARG MODELOPT_VERSION=0.15.0
ARG MCORE_TAG=34e607ef41cf1c0ed481a678df9c76952d0ec00c
ARG MCORE_TAG=9ab31cbd6265f83640008801e1c3efbf80892cea
ARG APEX_TAG=810ffae374a2b9cb4b5c5e28eaeca7d7998fca0c
RUN \
--mount=type=bind,source=requirements,target=requirements \
1 change: 1 addition & 0 deletions docs/source/nlp/nemo_megatron/intro.rst
@@ -20,6 +20,7 @@ To learn more about using NeMo to train Large Language Models at scale, please r
peft/landing_page
positional_embeddings
mcore_customization
reset_learning_rate
rampup_batch_size


30 changes: 30 additions & 0 deletions docs/source/nlp/nemo_megatron/reset_learning_rate.rst
@@ -0,0 +1,30 @@
.. _reset_learning_rate:

Reset Learning Rate
-------------------

The reset learning rate feature lets you reset the learning rate of an existing checkpoint to its initial value (either 0 or ``optim.min_lr``, depending on the warmup steps) when performing continual pretraining.

Parameters
----------

* ``reset_lr`` (boolean): Enables resetting the learning rate to its initial value. This feature is only supported with the distributed optimizer and ``megatron_amp_O2``.
* ``reset_lr_steps`` (boolean): Enables adjusting the learning rate schedule's ``max_steps`` and ``decay_steps`` by subtracting the number of steps already completed at the checkpoint.

Use Cases
---------

1. ``reset_lr=True, reset_lr_steps=False``
   Use this when pretraining an existing checkpoint "from scratch" on a different dataset. The learning rate is reset to its initial value, so the model starts training on the new dataset with the same learning rate dynamics as if it were training from scratch.

2. ``reset_lr=True, reset_lr_steps=True``
   Use this when continuing training from an existing checkpoint with the same configuration. The learning rate is reset to its initial value, and the ``max_steps`` and ``decay_steps`` of the learning rate schedule are recalculated by subtracting the number of steps already completed at the checkpoint. Specifically:

   * ``max_steps`` is recalculated as ``max_steps -= completed_steps``.
   * ``decay_steps`` is recalculated as ``decay_steps -= completed_steps``.

   This ensures that the learning rate reaches the ``min_lr`` value by the end of training without changing ``trainer.max_steps``:

.. image:: https://github.com/NVIDIA/NeMo/releases/download/v2.0.0rc0/asset-post-reset-learning-rate-example.png
:alt: Learning rate schedule after resetting the learning rate
:width: 1080px
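
The recalculation in the second use case is simple step arithmetic. Below is a minimal illustrative sketch, not NeMo's actual implementation; the function name and variables are made up for this example:

.. code-block:: python

    def reset_lr_schedule(max_steps: int, decay_steps: int, completed_steps: int):
        """Shift the LR schedule so it restarts at the initial LR and still ends at min_lr."""
        # Both values shrink by the number of steps already taken at the checkpoint,
        # so trainer.max_steps itself is unchanged.
        return max_steps - completed_steps, decay_steps - completed_steps

    # Example: a run configured for 100k steps, resumed from a 40k-step checkpoint,
    # decays the learning rate over the remaining 60k steps.
    print(reset_lr_schedule(100_000, 100_000, 40_000))  # (60000, 60000)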


Changes to an additional file (file name not rendered)
@@ -35,14 +35,17 @@ model:
name: ${model.data_name}
split: 'train.clean.360'
streaming: ${model.streaming}
trust_remote_code: true
- path: ${model.data_path}
name: ${model.data_name}
split: 'train.clean.100'
streaming: ${model.streaming}
trust_remote_code: true
- path: ${model.data_path}
name: ${model.data_name}
split: 'train.other.500'
streaming: ${model.streaming}
trust_remote_code: true

sample_rate: ${model.sample_rate}
batch_size: 16 # you may increase batch_size if your memory allows
@@ -65,6 +68,7 @@ model:
name: ${model.data_name}
split: 'validation.other'
streaming: ${model.streaming}
trust_remote_code: true

sample_rate: ${model.sample_rate}
batch_size: 8
@@ -87,10 +91,12 @@ model:
name: ${model.data_name}
split: 'test.other'
streaming: ${model.streaming}
trust_remote_code: true
- path: ${model.data_path}
name: ${model.data_name}
split: 'test.clean'
streaming: ${model.streaming}
trust_remote_code: true

sample_rate: ${model.sample_rate}
batch_size: 8
4 changes: 1 addition & 3 deletions examples/llm/run/llama3_pretraining.py
@@ -140,7 +140,7 @@ def main():
# Uses configs from NeMo directly
pretrain = MODEL_SIZE_MAPPING[args.size]["nemo"]["pretrain"](
name=exp_name,
ckpt_dir=f"/{exp_name}/checkpoints",
ckpt_dir="/nemo_run/checkpoints",
)

# Overwrite the dataloader in the recipe to use your custom dataloader.
@@ -170,8 +170,6 @@ def main():
executor = local_executor_torchrun(nodes=pretrain.trainer.num_nodes, devices=pretrain.trainer.devices)

with run.Experiment(f"{exp_name}{args.tag}") as exp:
pretrain.log.dir = f"/{exp_name}/checkpoints"

for i in range(1):
exp.add(
pretrain,
Changes to an additional file (file name not rendered)
@@ -175,7 +175,7 @@ model:
fsdp_sharded_checkpoint: False # Store and load FSDP shared checkpoint.

# Distributed checkpoint setup
dist_ckpt_format: 'zarr' # Set to 'torch_dist' to use PyTorch distributed checkpoint format.
dist_ckpt_format: 'torch_dist' # Set to 'torch_dist' to use PyTorch distributed checkpoint format.
dist_ckpt_load_on_device: True # whether to load checkpoint weights directly on GPU or to CPU
dist_ckpt_parallel_save: True # if true, each worker will write its own part of the dist checkpoint
dist_ckpt_parallel_save_within_dp: False # if true, save will be parallelized only within a DP group (whole world otherwise), which might slightly reduce the save overhead
3 changes: 1 addition & 2 deletions nemo/collections/asr/parts/utils/transcribe_utils.py
@@ -535,9 +535,8 @@ def transcribe_partial_audio(
if isinstance(asr_model, EncDecHybridRNNTCTCModel) and decoder_type == "ctc":
logits = asr_model.ctc_decoder(encoder_output=logits)

logits = logits.cpu()

if logprobs:
logits = logits.cpu()
logits = logits.numpy()
# dump log probs per file
for idx in range(logits.shape[0]):
36 changes: 29 additions & 7 deletions nemo/collections/llm/gpt/data/mock.py
@@ -9,6 +9,12 @@

from nemo.lightning.pytorch.plugins import MegatronDataSampler

HAVE_TE = True
try:
import transformer_engine
except (ImportError, ModuleNotFoundError):
HAVE_TE = False

if TYPE_CHECKING:
from nemo.collections.common.tokenizers.tokenizer_spec import TokenizerSpec

@@ -27,6 +33,7 @@ def __init__(
num_workers: int = 8,
pin_memory: bool = True,
persistent_workers: bool = False,
create_attention_mask: bool = False,
):
super().__init__()
self.seq_length = seq_length
@@ -36,6 +43,7 @@
self.num_workers = num_workers
self.pin_memory = pin_memory
self.persistent_workers = persistent_workers
self.create_attention_mask = create_attention_mask or not HAVE_TE

from nemo.collections.nlp.modules.common.tokenizer_utils import get_nmt_tokenizer

@@ -48,9 +56,15 @@
)

def setup(self, stage: str = "") -> None:
self._train_ds = _MockGPTDataset(self.tokenizer, "train", self.num_train_samples, self.seq_length)
self._validation_ds = _MockGPTDataset(self.tokenizer, "valid", self.num_val_samples, self.seq_length)
self._test_ds = _MockGPTDataset(self.tokenizer, "test", self.num_test_samples, self.seq_length)
self._train_ds = _MockGPTDataset(
self.tokenizer, "train", self.num_train_samples, self.seq_length, self.create_attention_mask
)
self._validation_ds = _MockGPTDataset(
self.tokenizer, "valid", self.num_val_samples, self.seq_length, self.create_attention_mask
)
self._test_ds = _MockGPTDataset(
self.tokenizer, "test", self.num_test_samples, self.seq_length, self.create_attention_mask
)

def train_dataloader(self) -> TRAIN_DATALOADERS:
if not hasattr(self, "_train_ds"):
@@ -86,16 +100,20 @@ def __init__(
num_samples: int,
seq_length: int,
seed: int = 42,
create_attention_mask: bool = False,
) -> None:
super().__init__()
self.name = name
self.seq_length = seq_length
self.vocab_size = tokenizer.vocab_size
self.length = num_samples
self.seed = seed
self.create_attention_mask = create_attention_mask

if create_attention_mask:
self.attention_mask = torch.tril(torch.ones((self.seq_length, self.seq_length), device='cpu')).unsqueeze(0)
self.attention_mask = self.attention_mask < 0.5

self.attention_mask = torch.tril(torch.ones((self.seq_length, self.seq_length))).unsqueeze(0)
self.attention_mask = self.attention_mask < 0.5
self.loss_mask = torch.ones(self.seq_length, dtype=torch.float)
self.position_ids = torch.arange(self.seq_length, dtype=torch.int64)

@@ -112,14 +130,18 @@ def __getitem__(self, idx) -> Dict[str, torch.Tensor]:
tokens = torch.from_numpy(np_gen.integers(self.vocab_size, size=[self.seq_length], dtype=np.int64))
labels = torch.from_numpy(np_gen.integers(self.vocab_size, size=[self.seq_length], dtype=np.int64))

return {
batch = {
"tokens": tokens,
"labels": labels,
"attention_mask": self.attention_mask,
"loss_mask": self.loss_mask,
"position_ids": self.position_ids,
}

if self.create_attention_mask:
batch["attention_mask"] = self.attention_mask

return batch

def _collate_fn(self, batch):
"""
A default implementation of a collation function.
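With this change, the mock GPT data module only materializes an attention mask when it is explicitly requested, or automatically when Transformer Engine is unavailable. A minimal usage sketch, assuming NeMo is installed, that seq_length is a constructor argument, and that arguments not shown in the hunks above keep their defaults:

    from nemo.collections.llm.gpt.data.mock import MockDataModule

    data = MockDataModule(seq_length=2048, create_attention_mask=True)
    data.setup()
    sample = data._train_ds[0]  # inspect the underlying mock dataset directly
    # "attention_mask" appears only because create_attention_mask=True
    # (or automatically when Transformer Engine is not installed).
    print(sample.keys())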
19 changes: 18 additions & 1 deletion nemo/collections/llm/gpt/data/pre_training.py
@@ -11,6 +11,12 @@
from nemo.lightning.io.mixin import IOMixin
from nemo.lightning.pytorch.plugins import MegatronDataSampler

HAVE_TE = True
try:
import transformer_engine
except (ImportError, ModuleNotFoundError):
HAVE_TE = False

if TYPE_CHECKING:
from megatron.core.datasets.gpt_dataset import GPTDatasetConfig

@@ -54,6 +60,7 @@ class PreTrainingDataModule(pl.LightningDataModule, IOMixin):
split (str): A string of 3 comma-separated integers denoting how much of the distribution
to allocate to train, validation, and test sets, respectively. Unused if ``paths`` is a dict.
index_mapping_dir (Optional[str]): Path to a directory to write index mapping files.
num_dataset_builder_threads (int): The number of threads to use for dataset building.
"""

def __init__(
@@ -68,11 +75,13 @@ def __init__(
pin_memory: bool = True,
persistent_workers: bool = False,
reset_position_ids: bool = False,
create_attention_mask: bool = False,
reset_attention_mask: bool = False,
eod_mask_loss: bool = False,
seed: int = 1234,
split: str = "900,50,50",
index_mapping_dir: Optional[str] = None,
num_dataset_builder_threads: int = 1,
) -> None:
super().__init__()
if not isinstance(paths, (list, tuple, dict)):
@@ -105,11 +114,13 @@ def __init__(
self.pin_memory = pin_memory
self.persistent_workers = persistent_workers
self.reset_position_ids = reset_position_ids
self.create_attention_mask = create_attention_mask or not HAVE_TE
self.reset_attention_mask = reset_attention_mask
self.eod_mask_loss = eod_mask_loss
self.seed = seed
self.split = split
self.index_mapping_dir = index_mapping_dir
self.num_dataset_builder_threads = num_dataset_builder_threads
self.init_global_step = 0

from nemo.collections.nlp.modules.common.tokenizer_utils import get_nmt_tokenizer
@@ -139,7 +150,11 @@ def setup(self, stage: str = "") -> None:
num_val_samples = int(eval_iters * self.data_sampler.global_batch_size)
num_test_samples = int(test_iters * self.data_sampler.global_batch_size)

if self.trainer.limit_val_batches <= 1.0 and isinstance(self.trainer.limit_val_batches, float):
if (
self.trainer.limit_val_batches > 0.0
and self.trainer.limit_val_batches <= 1.0
and isinstance(self.trainer.limit_val_batches, float)
):
assert "blend" not in self.build_kwargs, (
"When using a single data distribution, limit_val_batches <= 1.0 is not supported. If you'd "
"like to run with a fractional value of limit_val_batches, please pass in separate datasets for "
@@ -212,8 +227,10 @@ def gpt_dataset_config(self) -> "GPTDatasetConfig":
tokenizer=self.tokenizer,
path_to_cache=self.index_mapping_dir,
reset_position_ids=self.reset_position_ids,
create_attention_mask=self.create_attention_mask,
reset_attention_mask=self.reset_attention_mask,
eod_mask_loss=self.eod_mask_loss,
num_dataset_builder_threads=self.num_dataset_builder_threads,
**self.build_kwargs,
)

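Both new options flow into the Megatron GPTDatasetConfig built by gpt_dataset_config(). A sketch of passing them through; the dataset prefix is a placeholder, and seq_length plus any arguments not visible in the hunks above are assumed to exist with their usual defaults:

    from nemo.collections.llm.gpt.data.pre_training import PreTrainingDataModule

    data = PreTrainingDataModule(
        paths=["/data/my_corpus_text_document"],  # placeholder dataset prefix
        seq_length=4096,
        split="900,50,50",
        create_attention_mask=False,   # forced to True when Transformer Engine is absent
        num_dataset_builder_threads=4, # new: parallelize dataset/index building
    )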
16 changes: 9 additions & 7 deletions nemo/collections/llm/gpt/model/base.py
@@ -47,8 +47,6 @@ def gpt_data_step(dataloader_iter) -> Dict[str, torch.Tensor]:
required_keys.update(("tokens", "position_ids"))
if parallel_state.is_pipeline_last_stage():
required_keys.update(("labels", "loss_mask"))
# if self.get_attention_mask_from_fusion:
# required_keys.remove('attention_mask')

_batch = {key: val.cuda(non_blocking=True) if key in required_keys else None for key, val in _batch.items()}
# slice batch along sequence dimension for context parallelism
@@ -61,10 +59,17 @@ def gpt_forward_step(model, batch) -> torch.Tensor:
forward_args = {
"input_ids": batch["tokens"],
"position_ids": batch["position_ids"],
"attention_mask": batch["attention_mask"],
"labels": batch["labels"],
}

if 'attention_mask' not in batch:
assert (
HAVE_TE
), "The dataloader did not provide an attention mask, however Transformer Engine was not detected. \
This requires Transformer Engine's implementation of fused or flash attention."
else:
forward_args["attention_mask"] = batch['attention_mask']

if 'cu_seqlens' in batch:
forward_args['packed_seq_params'] = get_packed_seq_params(batch)

@@ -110,9 +115,6 @@ class GPTConfig(TransformerConfig, io.IOMixin):
masked_softmax_fusion: bool = True
deallocate_pipeline_outputs = True

# TODO: Move this to better places?
get_attention_mask_from_fusion: bool = False

transformer_layer_spec: Union[ModuleSpec, Callable[["GPTConfig"], ModuleSpec]] = default_layer_spec
forward_step_fn: Callable = gpt_forward_step
data_step_fn: Callable = gpt_data_step
@@ -184,7 +186,7 @@ def forward(
self,
input_ids: torch.Tensor,
position_ids: torch.Tensor,
attention_mask: torch.Tensor,
attention_mask: Optional[torch.Tensor] = None,
labels: Optional[torch.Tensor] = None,
decoder_input: Optional[torch.Tensor] = None,
inference_params=None,
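Since attention_mask is now optional, a dataloader only needs to supply one when Transformer Engine's fused or flash attention is unavailable. A sketch of building an explicit causal mask, following the same convention as the mock dataset above (True marks positions to mask out); the batch contents are illustrative:

    import torch

    def causal_attention_mask(seq_length: int) -> torch.Tensor:
        mask = torch.tril(torch.ones((seq_length, seq_length))).unsqueeze(0)
        return mask < 0.5  # shape [1, seq, seq]; True above the diagonal

    batch = {
        "tokens": torch.randint(0, 32000, (1, 8)),
        "position_ids": torch.arange(8).unsqueeze(0),
    }
    batch["attention_mask"] = causal_attention_mask(8)  # omit to rely on Transformer Engine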
3 changes: 3 additions & 0 deletions nemo/collections/llm/recipes/mixtral_8x22b.py
@@ -30,6 +30,7 @@ def trainer(
virtual_pipeline_parallelism: Optional[int],
context_parallelism: int,
sequence_parallelism: bool,
expert_parallelism: int,
num_nodes: int = 1,
num_gpus_per_node: int = 8,
max_steps: int = 1168251,
@@ -43,6 +44,7 @@
virtual_pipeline_model_parallel_size=virtual_pipeline_parallelism,
context_parallel_size=context_parallelism,
sequence_parallel=sequence_parallelism,
expert_model_parallel_size=expert_parallelism,
gradient_as_bucket_view=True,
ckpt_include_optimizer=True,
ckpt_async_save=True,
@@ -88,6 +90,7 @@ def pretrain_recipe(
virtual_pipeline_parallelism=None,
context_parallelism=1,
sequence_parallelism=True,
expert_parallelism=1,
num_nodes=num_nodes,
num_gpus_per_node=num_gpus_per_node,
callbacks=[Config(TimingCallback)],
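The new expert_parallelism argument is forwarded to MegatronStrategy as expert_model_parallel_size, which shards the MoE experts across expert-parallel ranks. A small illustrative calculation; the values are chosen for this example, and the recipe above defaults to expert_parallelism=1:

    num_experts = 8          # Mixtral 8x22B routes tokens over 8 experts
    expert_parallelism = 8   # expert_model_parallel_size
    assert num_experts % expert_parallelism == 0, "experts must divide evenly across EP ranks"
    experts_per_rank = num_experts // expert_parallelism
    print(experts_per_rank)  # 1 expert resident on each expert-parallel rank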
Diffs for the remaining changed files are not rendered.
