failing test for select_columns
Signed-off-by: Praateek <praateekm@gmail.com>
praateekmahajan committed Nov 22, 2024
1 parent 3a0f13f commit 9c6428c
Showing 3 changed files with 47 additions and 15 deletions.
12 changes: 12 additions & 0 deletions docs/user-guide/bestpractices.rst
@@ -24,6 +24,17 @@ Handling GPU Out-of-Memory (OOM) Errors
NeMo Curator is designed to be scalable with large amounts of text data, but OOM errors occur when the available GPU memory is insufficient for a given task.
To help avoid these issues and ensure efficient processing, here are some strategies for managing memory usage and mitigating OOM challenges.

Controlling Partition Sizes
~~~~~~~~~~~~~~~~~~~~~~~~~~~

You should consider using ``files_per_partition`` or ``blocksize`` when reading data; processing large datasets in smaller chunks keeps the memory load of each task bounded. A short sketch follows the list below.

#. The ``blocksize`` argument is available for ``jsonl`` and ``parquet`` files, but for ``parquet`` files it is only supported when ``add_filename=False``.

#. For ``blocksize``, the recommendation is to use 1/32 of the total GPU memory. For example, if you have a GPU with 32GB of memory, you can set the blocksize to ``1gb``.
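
A minimal sketch of a ``blocksize`` read (assuming your NeMo Curator version exposes ``blocksize`` and ``files_per_partition`` on ``read_data``; the file paths are placeholders):

.. code-block:: python

    from nemo_curator.utils.distributed_utils import get_client, read_data

    client = get_client(cluster_type="gpu")

    # Read the JSONL shards in ~1gb partitions instead of one partition per
    # file; smaller partitions lower the peak GPU memory used by each task.
    df = read_data(
        input_files=["data/shard_00.jsonl", "data/shard_01.jsonl"],
        file_type="jsonl",
        backend="cudf",
        blocksize="1gb",
        files_per_partition=None,
        add_filename=False,
    )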



Utilize RMM Options
~~~~~~~~~~~~~~~~~~~
`RAPIDS Memory Manager (RMM) <https://github.com/rapidsai/rmm>`_ is a package that enables you to allocate device memory in a highly configurable way.
@@ -59,6 +70,7 @@ Alternatively, you can set these flags while initializing your own Dask client,
client = Client(cluster)
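
For example, a minimal sketch of such a client with RMM options enabled (assuming ``dask_cuda`` is installed; flag availability varies by version):

.. code-block:: python

    from dask.distributed import Client
    from dask_cuda import LocalCUDACluster

    # Use the asynchronous RMM allocator and pre-allocate a 1GB pool per GPU.
    cluster = LocalCUDACluster(rmm_async=True, rmm_pool_size="1GB")
    client = Client(cluster)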
Fuzzy Deduplication Guidelines
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Fuzzy deduplication is one of the most computationally expensive algorithms within the NeMo Curator pipeline.
12 changes: 7 additions & 5 deletions nemo_curator/utils/distributed_utils.py
@@ -269,18 +269,20 @@ def _set_torch_to_use_rmm():
     torch.cuda.memory.change_current_allocator(rmm_torch_allocator)


-def select_and_sort_columns(
+def select_columns(
     df: Union[dd.DataFrame, dask_cudf.DataFrame],
     columns: List[str],
-    filetype: Literal["jsonl", "json", "parquet", "pickle"],
+    filetype: Literal["jsonl", "json", "parquet"],
     add_filename: bool,
 ) -> Union[dd.DataFrame, dask_cudf.DataFrame]:
     # We exclude parquet because the parquet readers already support column selection
-    if columns is not None and filetype != "parquet":
+    if filetype in ["jsonl", "json"] and columns is not None:
         if add_filename and "filename" not in columns:
             columns.append("filename")
         df = df[columns]

     df = df[sorted(df.columns)]

     return df
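
A small usage sketch of the renamed helper (hypothetical inline data; note that at this commit the helper still sorts the selected columns alphabetically):

```python
import dask.dataframe as dd
import pandas as pd

from nemo_curator.utils.distributed_utils import select_columns

# Hypothetical frame standing in for data read from a JSONL file.
pdf = pd.DataFrame(
    {"id": [0], "text": ["hello"], "filename": ["a.jsonl"], "extra": [1]}
)
df = dd.from_pandas(pdf, npartitions=1)

# For jsonl/json, column selection is applied after the read, and
# "filename" is retained automatically when add_filename=True.
out = select_columns(df, columns=["id", "text"], filetype="jsonl", add_filename=True)
print(list(out.columns))  # ['filename', 'id', 'text'] -- sorted alphabetically
```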


@@ -366,7 +368,7 @@ def read_single_partition(
     else:
         df = read_f(files, **read_kwargs, **kwargs)

-    return select_and_sort_columns(df, columns, filetype, add_filename)
+    return select_columns(df, columns, filetype, add_filename)


def read_data_blocksize(
@@ -426,7 +428,7 @@ def extract_filename(path: str) -> str:
     df = read_func(input_files, blocksize=blocksize, **read_kwargs, **kwargs)
     if postprocessing_func is not None:
         df = postprocessing_func(df)
-    return select_and_sort_columns(df, columns, file_type, add_filename)
+    return select_columns(df, columns, file_type, add_filename)


def read_data_fpp(
Expand Down
38 changes: 28 additions & 10 deletions tests/test_read_data.py
@@ -1,5 +1,6 @@
 import pandas as pd
 import pytest
+from zict import Func

 from nemo_curator.utils.distributed_utils import read_data_blocksize, read_data_fpp

@@ -285,8 +286,7 @@ def test_read_data_fpp_add_filename(
         columns=None,
     )

-    print(f"Column names are {df.columns}")
-    assert "filename" in df.columns
+    assert set(df.columns) == {"filename", "id", "text"}
     file_names = df["filename"].unique().compute()
     if backend == "cudf":
         file_names = file_names.to_pandas()
@@ -304,16 +304,27 @@
         pytest.param("cudf", marks=pytest.mark.gpu),
     ],
 )
-@pytest.mark.parametrize("file_type", ["jsonl", "parquet"])
-@pytest.mark.parametrize("function_name", ["read_data_blocksize", "read_data_fpp"])
 @pytest.mark.parametrize(
-    "cols_to_select", [["id"], ["text"], ["text", "id"], ["id", "text"]]
+    "file_type,add_filename,function_name",
+    [
+        *[("jsonl", True, func) for func in ["read_data_blocksize", "read_data_fpp"]],
+        *[("jsonl", False, func) for func in ["read_data_blocksize", "read_data_fpp"]],
+        *[
+            ("parquet", False, func)
+            for func in ["read_data_blocksize", "read_data_fpp"]
+        ],
+        *[("parquet", True, "read_data_fpp")],
+    ],
+)
+@pytest.mark.parametrize(
+    "cols_to_select", [None, ["id"], ["text", "id"], ["id", "text"]]
 )
 def test_read_data_select_columns(
     mock_multiple_jsonl_files,
     mock_multiple_parquet_files,
     backend,
     file_type,
+    add_filename,
     function_name,
     cols_to_select,
 ):
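
For reference, the starred comprehensions above expand to the explicit matrix below; `("parquet", True, "read_data_blocksize")` is deliberately absent, matching the documented limitation that `blocksize` reads of parquet files only support `add_filename=False`:

```python
# Equivalent explicit parameter matrix (illustration only):
params = [
    ("jsonl", True, "read_data_blocksize"),
    ("jsonl", True, "read_data_fpp"),
    ("jsonl", False, "read_data_blocksize"),
    ("jsonl", False, "read_data_fpp"),
    ("parquet", False, "read_data_blocksize"),
    ("parquet", False, "read_data_fpp"),
    ("parquet", True, "read_data_fpp"),
]
```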
@@ -333,13 +344,18 @@ def test_read_data_select_columns(
         input_files=input_files,
         backend=backend,
         file_type=file_type,
-        add_filename=False,
+        add_filename=add_filename,
         input_meta=None,
-        columns=cols_to_select,
+        columns=list(cols_to_select) if cols_to_select else None,
         **read_kwargs,
     )
+    if not cols_to_select:
+        cols_to_select = ["id", "text"]

-    assert list(df.columns) == cols_to_select
+    if not add_filename:
+        assert list(df.columns) == sorted(cols_to_select)
+    else:
+        assert list(df.columns) == sorted(cols_to_select + ["filename"])


 @pytest.mark.parametrize(
@@ -373,7 +389,7 @@ def test_read_data_input_data(
         **read_kwargs,
     )

-    if function_name == "read_data_fpp":
+    if function_name == "read_data_fpp" and backend == "cudf":
         assert list(df.columns) == list(input_meta.keys())
-    elif function_name == "read_data_blocksize":
+    else:
+        # In the read_data_fpp case, because pandas doesn't support `prune_columns`, it'll always return all columns even if input_meta is specified
+        # In the `read_data_blocksize` case, `dask.read_json` also doesn't `prune_columns` so it'll always return all columns
         assert list(df.columns) == ["id", "text"]
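
A sketch of the asymmetry those comments describe (assuming a recent cudf where `read_json` accepts `prune_columns`; the path is a placeholder):

```python
import cudf
import pandas as pd

# cudf can prune to the columns named in dtype/input_meta at read time...
df_gpu = cudf.read_json(
    "data.jsonl",
    lines=True,
    dtype={"id": "str", "text": "str"},
    prune_columns=True,  # drop any column not listed in dtype
)

# ...while pandas.read_json has no equivalent switch, so all columns come back.
df_cpu = pd.read_json("data.jsonl", lines=True)
```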
