diff --git a/nemo_curator/datasets/doc_dataset.py b/nemo_curator/datasets/doc_dataset.py
index 482456a43..981762765 100644
--- a/nemo_curator/datasets/doc_dataset.py
+++ b/nemo_curator/datasets/doc_dataset.py
@@ -37,6 +37,9 @@ def __len__(self) -> int:
     def persist(self) -> "DocumentDataset":
         return DocumentDataset(self.df.persist())
 
+    def to_backend(self, backend: Optional[str] = None) -> "DocumentDataset":
+        return DocumentDataset(self.df.to_backend(backend))
+
     def head(self, n: int = 5) -> Any:
         return self.df.head(n)
 
diff --git a/nemo_curator/modules/fuzzy_dedup.py b/nemo_curator/modules/fuzzy_dedup.py
index ec642322f..f6731a1c0 100644
--- a/nemo_curator/modules/fuzzy_dedup.py
+++ b/nemo_curator/modules/fuzzy_dedup.py
@@ -195,6 +195,7 @@ def minhash64(
         """
         if not isinstance(ser, cudf.Series):
             raise TypeError("Expected data of type cudf.Series")
+
         if not MINHASH_PERMUTED_AVAILABLE:
             warnings.warn(
                 "Using an outdated minhash implementation, please update to cuDF version 24.12 "
@@ -223,6 +224,12 @@ def __call__(self, dataset: DocumentDataset) -> Union[str, DocumentDataset]:
         -------
         DocumentDataset containing IDs of all documents and the corresponding MinHash Signature
         """
+        if "cudf" not in str(type(dataset.df)):
+            raise TypeError(
+                "Dask-cuDF DataFrame is required to run minhashes. "
+                'Please convert your DocumentDataset by using .to_backend("gpu").'
+            )
+
         result = dataset.df[[self.id_field]]
         result["_minhash_signature"] = dataset.df[self.text_field].map_partitions(
             self.minhash_method,
@@ -430,6 +437,12 @@ def lsh(
         self._logger.info(f"Wrote data for buckets: {value_vars}")
 
     def __call__(self, dataset: DocumentDataset) -> DocumentDataset:
+        if "cudf" not in str(type(dataset.df)):
+            raise TypeError(
+                "Dask-cuDF DataFrame is required to run locality-sensitive hashing. "
+                'Please convert your DocumentDataset by using .to_backend("gpu").'
+            )
+
         df = dataset.df
 
         write_path = os.path.join(self.cache_dir, "_buckets.parquet")
@@ -549,18 +562,23 @@ def __call__(self, dataset: DocumentDataset):
         DocumentDataset containing IDs of all documents and the corresponding duplicate group
         they belong to. Documents in the same group are near duplicates.
         """
+        if "cudf" not in str(type(dataset.df)):
+            raise TypeError(
+                "Dask-cuDF DataFrame is required to run fuzzy deduplication. "
+                'Please convert your DocumentDataset by using .to_backend("gpu").'
+            )
 
         # Minhash + LSH
         stage_num = 1
-        print(f"Stage{stage_num}: Starting Minhash + LSH computation")
+        print(f"Stage {stage_num}: Starting Minhash + LSH computation")
         minhashLSH = Sequential([self.minhash, self.lsh])
         buckets_df = minhashLSH(dataset)
-        print(f"Stage{stage_num}: Minhash + LSH complete!")
+        print(f"Stage {stage_num}: Minhash + LSH complete!")
         stage_num += 1
 
         if self.config.false_positive_check:
             # Map buckets to lower cardinality distribution
-            print(f"Stage{stage_num} (False Positive Check): Starting Map_Buckets")
+            print(f"Stage {stage_num} (False Positive Check): Starting Map_Buckets")
             t0 = time.time()
             mapped_buckets_w_anchors_path = os.path.join(
                 self.config.cache_dir, "anchor_docs_with_bk.parquet"
@@ -578,14 +596,14 @@ def __call__(self, dataset: DocumentDataset):
                 mapped_buckets_w_anchors_path, write_index=False, overwrite=True
             )
             self._logger.info(
-                f"Time taken for Map_buckets : {time.time() - t0}s and output written at {mapped_buckets_w_anchors_path}"
+                f"Time taken for Map_Buckets: {time.time() - t0}s and output written at {mapped_buckets_w_anchors_path}"
             )
-            print(f"Stage{stage_num} (False Postive Check): Map_Buckets Complete!")
+            print(f"Stage {stage_num} (False Positive Check): Map_Buckets complete!")
             stage_num += 1
 
             # Shuffle documents based on mapped buckets
-            print(f"Stage{stage_num} (False Postive Check): Shuffle docs")
+            print(f"Stage {stage_num} (False Positive Check): Shuffle Documents")
             shuffled_docs_path = os.path.join(
                 self.config.cache_dir, "shuffled_docs.parquet"
             )
@@ -597,12 +615,14 @@ def __call__(self, dataset: DocumentDataset):
                 parts_per_worker=self.config.parts_per_worker,
                 bucket_parts_per_worker=self.config.bucket_parts_per_worker,
             )
-            print(f"Stage{stage_num} (False Postive Check): Shuffle docs complete!")
+            print(
+                f"Stage {stage_num} (False Positive Check): Shuffling Documents complete!"
+            )
             stage_num += 1
 
             # jaccard comparision within buckets
             print(
-                f"Stage{stage_num} (False Postive Check): Jaccard Similarity in Buckets"
+                f"Stage {stage_num} (False Positive Check): Jaccard Similarity in Buckets"
             )
             jaccard_pairs_path = os.path.join(
                 self.config.cache_dir, "jaccard_similarity_results.parquet"
@@ -622,26 +642,28 @@ def __call__(self, dataset: DocumentDataset):
                 overwrite=True,
             )
             self._logger.info(
-                f"Time taken for Jaccard Similarity = {time.time()-t0}s and output written at {jaccard_pairs_path}"
+                f"Time taken for Jaccard Similarity: {time.time()-t0}s and output written at {jaccard_pairs_path}"
            )
             print(
-                f"Stage{stage_num} (False Postive Check): Jaccard Similarity in Buckets Complete!"
+                f"Stage {stage_num} (False Positive Check): Jaccard Similarity in Buckets complete!"
             )
             stage_num += 1
 
         else:
             # Map buckets to lower cardinality distribution
-            print(f"Stage{stage_num}: Starting LSH Buckets to Graph edgelist")
+            print(f"Stage {stage_num}: Starting LSH Buckets to Graph Edgelist")
             self.buckets_to_edges(buckets_df)
-            print(f"Stage{stage_num}: Starting LSH Buckets to Graph edgelist Complete!")
+            print(
+                f"Stage {stage_num}: Starting LSH Buckets to Graph Edgelist complete!"
+            )
             stage_num += 1
 
         # Connected components across buckets
-        print(f"Stage{stage_num}: Connected Components across buckets")
+        print(f"Stage {stage_num}: Connected Components Across Buckets")
         cc_path = os.path.join(self.config.cache_dir, "connected_components.parquet")
-        self.connected_components.cc_workflow(cc_path)
-        print(f"Stage{stage_num}: Connected Components across buckets complete!")
+        self.connected_components(cc_path)
+        print(f"Stage {stage_num}: Connected Components Across Buckets complete!")
         stage_num += 1
 
         return DocumentDataset(dask_cudf.read_parquet(cc_path, split_row_groups=False))
@@ -739,6 +761,12 @@ def buckets_to_edges(
         return result_df
 
     def __call__(self, dataset: DocumentDataset) -> DocumentDataset:
+        if "cudf" not in str(type(dataset.df)):
+            raise TypeError(
+                "Dask-cuDF DataFrame is required to run buckets to edges. "
+                'Please convert your DocumentDataset by using .to_backend("gpu").'
+            )
+
         buckets_df = dataset.df
         if len(self.id_fields) > 1:
             buckets_df = buckets_df.map_partitions(
@@ -1281,9 +1309,6 @@ def __init__(
         self.right_id = f"{self.id_field}_y"
         self.ngram_width = ngram_width
 
-    def __call__(DocumentDataset):
-        raise NotImplementedError
-
     def jaccard_compute(self, shuffled_docs_path):
         paths = [
             entry.path
@@ -1469,7 +1494,7 @@ def __init__(
         else:
             self._logger = logger
 
-    def cc_workflow(self, output_path):
+    def __call__(self, output_path):
         deduped_parsed_id_path = self._write_dedup_parsed_id()
         encoded_jaccard_pair_path = self._write_encoded_jaccard_pair(
             deduped_parsed_id_path
diff --git a/nemo_curator/scripts/fuzzy_deduplication/connected_components.py b/nemo_curator/scripts/fuzzy_deduplication/connected_components.py
index 725a34eac..585a5b85b 100644
--- a/nemo_curator/scripts/fuzzy_deduplication/connected_components.py
+++ b/nemo_curator/scripts/fuzzy_deduplication/connected_components.py
@@ -43,7 +43,7 @@ def main(args):
         logger=args.log_dir,
         profile_dir=args.profile_path,
     )
-    components_stage.cc_workflow(output_path=output_path)
+    components_stage(output_path=output_path)
 
     print(f"All done in {time.time()-st:.1f} seconds")
     print(f"Results written to {output_path}")
diff --git a/tutorials/pretraining-data-curation/red-pajama-v2-curation-tutorial.ipynb b/tutorials/pretraining-data-curation/red-pajama-v2-curation-tutorial.ipynb
index 0d1e23a85..ffffcd375 100644
--- a/tutorials/pretraining-data-curation/red-pajama-v2-curation-tutorial.ipynb
+++ b/tutorials/pretraining-data-curation/red-pajama-v2-curation-tutorial.ipynb
@@ -2751,7 +2751,7 @@
     "    id_column=id_field,\n",
     "    jaccard_threshold=jaccard_threshold,\n",
     ")\n",
-    "components_stage.cc_workflow(output_path=output_path)\n",
+    "components_stage(output_path=output_path)\n",
     "print(f\"Connected Component took {time.time()-t0} seconds\")"
    ]
   },
@@ -4455,7 +4455,7 @@
     "    id_column=id_field,\n",
     "    jaccard_threshold=jaccard_threshold,\n",
     ")\n",
-    "components_stage.cc_workflow(output_path=output_path)\n",
+    "components_stage(output_path=output_path)\n",
     "print(f\"Connected Component took {time.time()-t0} seconds\")"
    ]
   },
diff --git a/tutorials/single_node_tutorial/single_gpu_tutorial.ipynb b/tutorials/single_node_tutorial/single_gpu_tutorial.ipynb
index 3170b3502..aa379d075 100644
--- a/tutorials/single_node_tutorial/single_gpu_tutorial.ipynb
+++ b/tutorials/single_node_tutorial/single_gpu_tutorial.ipynb
@@ -1753,7 +1753,7 @@
     ")\n",
     "\n",
     "#Load and run connected component\n",
-    "components_stage.cc_workflow(output_path=connected_component_output_path)\n",
+    "components_stage(output_path=connected_component_output_path)\n",
     "print(f\"Time taken for Connected Component: {time.time()-t0} s\")"
    ]
   },
diff --git a/tutorials/zyda2-tutorial/1_fuzzy_dedup/3_connected_components.py b/tutorials/zyda2-tutorial/1_fuzzy_dedup/3_connected_components.py
index db76ce5b3..8b212ad9a 100644
--- a/tutorials/zyda2-tutorial/1_fuzzy_dedup/3_connected_components.py
+++ b/tutorials/zyda2-tutorial/1_fuzzy_dedup/3_connected_components.py
@@ -41,5 +41,5 @@
     )
 
     # Load and run connected components
-    components_stage.cc_workflow(output_path=connected_component_output_path)
+    components_stage(output_path=connected_component_output_path)
     logging.info(f"Time taken for Connected Components: {time.time() - t0:.2f} s")