minor updates for distributed tokenization (intel#64)
* slim dockerfile

* remove credentials

* rename

* add postfix

* update

* push 1 version of dp

* add new code

* remove the old code

* revert

* remove unused libs

* add new package

* add parquet support

* change name

* use output_dir instead of output_prefix

* merge

* remove unused file

* fix typo

* add more automation

* add dp config

* add saving csv

* add dp config yaml

* add stop containers

* add stop containers

* remove dp config

* tokenizer as input

* add a file to count row numbers

* change dockerfile name

* some refinement

* add file numbers

* add real script

* add multi-processing code

* add file name

* add pyrecdp

* refine

* remove

* remove developer name

* remove pyrecdp

* change name order

* remove files

* add use-slow flag

---------

Co-authored-by: N <matrix.yao@intel.com>
faaany and yao-matrix authored Sep 21, 2023
1 parent f7a3126 commit 71cc3ce
Showing 5 changed files with 87 additions and 87 deletions.
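For orientation before the per-file diffs: the reworked driver is typically launched as sketched below. The flag names mirror the updated tokenize_and_save.py in this commit; the input and output paths are placeholders, not values recorded here.

# Sketch only: paths are hypothetical; flag names come from the diff below.
# --file-type accepts json, jsonl, or parquet; --data-field names the text column;
# --use-slow opts out of the fast (Rust) tokenizer implementation.
python tokenize_and_save.py \
    --input-dir /path/to/input_shards \
    --file-type parquet \
    --data-field text \
    --tokenizer togethercomputer/LLaMA-2-7B-32K \
    --output-dir /path/to/tokenized_output \
    --load-batch-size 10000 \
    --cpu-per-node 220 \
    --use-slow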
22 changes: 0 additions & 22 deletions tools/tokenize_and_save/count_parquet_row_num.py

This file was deleted.

31 changes: 19 additions & 12 deletions tools/tokenize_and_save/run-dp.sh
@@ -1,23 +1,30 @@
-echo -e "\n distributed tokenization with ray"
 
+start=`date +%s`
+echo -e "\n distributed tokenization with ray for Book"
 python tokenize_and_save.py \
-    --input-dir /home/user/shared/PILE_dedup/EuroParl \
+    --input-dir /home/user/shared/user/Book \
+    --file-type parquet \
+    --output-dir /home/user/shared/user/tokenized_Book \
     --data-field text \
     --tokenizer togethercomputer/LLaMA-2-7B-32K \
-    --output-dir /home/user/shared/EuroParl_tokenized \
-    --load-batch-size 1000 \
-    --cpu-per-node 90
+    --load-batch-size 10000 \
+    --cpu-per-node 220 \
+    --use-slow
+end=`date +%s`
+echo "Execution Time is: $(($end-$start)) seconds" | tee tokenized_Book.log
 
-sleep 30
+sleep 10
 echo -e "\n merging multiple megatron data files.."
-python merge_datasets.py --input /home/user/shared/EuroParl_tokenized --output-prefix /home/user/shared/EuroParl_tokenized
+python merge_datasets.py --input /home/user/shared/user/tokenized_Book --output-prefix /home/user/shared/user/tokenized_Book >> tokenized_Book.log
 
-sleep 15
+sleep 5
 echo -e "\n removing multiple megatron files.."
-rm -fr /home/user/shared/EuroParl_tokenized
+rm -fr /home/user/shared/user/tokenized_Book
 
 sleep 5
 echo -e "\n counting token numbers.."
-python count_tokens.py /home/user/shared/EuroParl_tokenized /home/user/shared/EuroParl_tokenized.stat
-
-
+python count_tokens.py /home/user/shared/user/tokenized_Book /home/user/shared/user/tokenized_Book.stat >> tokenized_Book.log
+
+sleep 5
+mkdir /home/user/shared/user/tokenized_Book
+mv /home/user/shared/user/tokenized_Book.* /home/user/shared/user/tokenized_Book
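Not part of this commit, but a quick way to sanity-check a run of the script above: assuming count_tokens.py writes its summary to the .stat path it is given, the merged Megatron files and the stat should end up under the output directory, and the timing plus merge/count output in the log.

# Hypothetical post-run check, reusing the paths from run-dp.sh above.
ls -lh /home/user/shared/user/tokenized_Book/           # merged .bin/.idx plus the .stat file
cat /home/user/shared/user/tokenized_Book/tokenized_Book.stat
cat tokenized_Book.log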
117 changes: 66 additions & 51 deletions tools/tokenize_and_save/tokenize_and_save.py
@@ -32,7 +32,13 @@ def get_args():
         "--file-type",
         type=str,
         default="json",
-        help="the file type of the input data, only json, jsonl and parquet are supported, default type is json."
+        help="the file type of the input data, only json, jsonl and parquet are supported, default type is json"
     )
+    group.add_argument(
+        "--ray-load",
+        action='store_true',
+        default=False,
+        help="whether or not to use native ray data API to load data; if set true, you need to ensure that the data is accessible by each worker, e.g. it NFS"
+    )
     group.add_argument(
         "--tokenizer",
@@ -41,7 +47,13 @@ def get_args():
         help="tokenizer name"
     )
     group.add_argument(
-        "--model-max-length", type=int, default=100000000, help="batch size"
+        "--use-slow",
+        action='store_true',
+        default=False,
+        help="whether or not to use slow tokenizer"
+    )
+    group.add_argument(
+        "--model-max-length", type=int, default=100000000000, help="batch size"
     )
     group.add_argument(
         "--data-field",
@@ -69,30 +81,28 @@ def get_args():
     return args


-def tokenize_batch(tokenizer_name, batch, data_field, model_max_length):
+def tokenize_batch(tokenizer_name, batch, data_field, model_max_length, use_fast):
 
-    tokenizer = AutoTokenizer.from_pretrained(tokenizer_name, use_fast=True)
-    tokenizer.model_max_length = model_max_length
+    tokenizer = AutoTokenizer.from_pretrained(tokenizer_name, use_fast= use_fast)
+    tokenizer.model_max_length = model_max_length
 
-    eos_token = tokenizer.eos_token
-    eos_id = tokenizer(eos_token)['input_ids']
+    eos_token = tokenizer.eos_token
+    eos_id = tokenizer(eos_token)['input_ids']
 
-    samples = batch[data_field].tolist()
-
-    ids = []
-    lens = []
-
-    for sample in samples:
-        encoded = tokenizer(sample,
-                            truncation=False,
-                            padding=False)
-        sample_id = encoded['input_ids'] + eos_id
-        ids.append(sample_id)
-        lens.append(len(sample))
+    samples = batch[data_field].tolist()
+
+    ids = []
+
+    for sample in samples:
+        encoded = tokenizer(sample,
+                            truncation=False,
+                            padding=False)
+        sample_id = encoded['input_ids'] + eos_id
+        ids.append(sample_id)
 
-    tokenized_batch = pd.DataFrame({"tokens": ids, "length": lens})
+    tokenized_batch = pd.DataFrame({"tokens": ids})
 
-    return tokenized_batch
+    return tokenized_batch


 def __best_fitting_dtype(vocab_size=None):
@@ -103,12 +113,11 @@ def __best_fitting_dtype(vocab_size=None):


 def save_megatron(output_dir, task_id, tokenized_batch, vocab_size):
-    save_dir = os.path.join(output_dir)
-    if not os.path.exists(save_dir):
-        os.makedirs(save_dir)
+    if not os.path.exists(output_dir):
+        os.makedirs(output_dir)
 
-    out_file = os.path.join(save_dir, f"{task_id}.bin")
-    idx_file = os.path.join(save_dir, f"{task_id}.idx")
+    out_file = os.path.join(output_dir, f"{task_id}.bin")
+    idx_file = os.path.join(output_dir, f"{task_id}.idx")
 
     data_builder = MMapIndexedDatasetBuilder(out_file, dtype=__best_fitting_dtype(vocab_size))
 
@@ -128,9 +137,11 @@ def main():
     data_field = args.data_field
     input_dir = args.input_dir
     file_type = args.file_type
+    use_fast = not args.use_slow
+    ray_load = args.ray_load
 
-    # ctx = ray.data.DataContext.get_current()
-    # ctx.execution_options.verbose_progress = True
+    ctx = ray.data.DataContext.get_current()
+    ctx.execution_options.verbose_progress = True
 
     if file_type == 'json':
         input_files = sorted(glob.glob(os.path.join(input_dir, "*.json")))
@@ -140,8 +151,8 @@ def main():
         input_files = sorted(glob.glob(os.path.join(input_dir, "*.parquet")))
     else:
         raise ValueError("Please specify the correct file type. Choose one from json, jsonl and parquet.")
-    vocab_size = AutoTokenizer.from_pretrained(tokenizer_name, use_fast=True).vocab_size
 
+    vocab_size = AutoTokenizer.from_pretrained(tokenizer_name, use_fast=use_fast).vocab_size
 
     # init ray
     ray.init(address='auto')
@@ -151,43 +162,47 @@ def main():
 
     ray_job_id = ray.runtime_context.get_runtime_context().get_job_id()
 
+    timestr = time.strftime("%Y%m%d-%H%M%S")
     output_parent = os.path.dirname(output_dir)
-    name_dir = os.path.join(output_parent, f"{ray_job_id}_csv")
+    name_dir = os.path.join(output_parent, f"{timestr}_{ray_job_id}.csv")
     if not os.path.exists(name_dir):
         os.makedirs(name_dir)
 
     def preprocess_megatron(batch: Dict[str, np.ndarray]) -> pd.DataFrame:
 
         task_id = ray.get_runtime_context().get_task_id()
-        tokenized_batch = tokenize_batch(tokenizer_name, batch, data_field, model_max_length)
+        tokenized_batch = tokenize_batch(tokenizer_name, batch, data_field, model_max_length, use_fast)
         save_megatron(output_dir, task_id, tokenized_batch, vocab_size)
 
         return pd.DataFrame({'task_id': [task_id]})
 
-    if file_type == 'parquet':
-        dataset = load_dataset("parquet", data_files=input_files, streaming=True)['train']
-    else:
-        dataset = load_dataset("json", data_files=input_files, streaming=True)['train']
-
-    idx = 1
-    for rows in dataset.iter(batch_size=args.load_batch_size):
-        print("-----------------------------")
-        df = pd.DataFrame(rows)
-        ray_dataset = ray.data.from_pandas(df)
-        ray_dataset = ray_dataset.repartition(parallelism)
-
+    if ray_load:
+        if file_type == 'parquet':
+            ray_dataset = ray.data.read_parquet(input_files, columns=[args.data_field])
+        else:
+            ray_dataset = ray.data.read_json(input_files)
         tokenized_data = ray_dataset.map_batches(preprocess_megatron, batch_format="numpy", batch_size=None)
         tokenized_data = tokenized_data.repartition(1)
         tokenized_data.write_csv(name_dir)
-
-        print(f"{idx} * {args.load_batch_size} samples were written to disk.")
-        idx += 1
-        print("============================")
+    else:
+        if file_type == 'parquet':
+            dataset = load_dataset("parquet", data_files=input_files, streaming=True)['train']
+        else:
+            dataset = load_dataset("json", data_files=input_files, streaming=True)['train']
+        dataset = dataset.select_columns(args.data_field)
+
+        idx = 1
+        for rows in dataset.iter(batch_size=args.load_batch_size):
+            df = pd.DataFrame(rows)
+            ray_dataset = ray.data.from_pandas(df)
+            ray_dataset = ray_dataset.repartition(parallelism)
+
+            tokenized_data = ray_dataset.map_batches(preprocess_megatron, batch_format="numpy", batch_size=None)
+            tokenized_data = tokenized_data.repartition(1)
+            tokenized_data.write_csv(name_dir)
+            idx += 1
 
 if __name__ == "__main__":
     start = time.time()
     main()
     end = time.time()
-    print(f"\nthis script took {end-start}s.")
-
+    print(f"\nthis script took {end-start}s.")
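In short, the rewritten main() now has two loading modes: with --ray-load, Ray workers read the parquet/JSON shards directly, so the input must be reachable from every worker (e.g. on NFS); without it, the driver streams rows through Hugging Face datasets and hands them to Ray in --load-batch-size chunks. A rough illustration of the two invocations, with placeholder paths:

# Ray-native reads: every Ray worker must be able to see /shared/corpus.
python tokenize_and_save.py --input-dir /shared/corpus --file-type parquet \
    --data-field text --tokenizer togethercomputer/LLaMA-2-7B-32K \
    --output-dir /shared/corpus_tokenized --cpu-per-node 220 --ray-load

# Default streamed loading: rows are pulled on the driver in batches of --load-batch-size.
python tokenize_and_save.py --input-dir /shared/corpus --file-type parquet \
    --data-field text --tokenizer togethercomputer/LLaMA-2-7B-32K \
    --output-dir /shared/corpus_tokenized --cpu-per-node 220 --load-batch-size 10000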
@@ -18,7 +18,7 @@ RUN pip install -U ray[default,data]
 
 RUN pip install astunparse nltk gymnasium pyyaml datasets presidio_analyzer presidio_anonymizer sentencepiece transformers
 RUN pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cpu
-RUN python -m spacy download en_core_web_lg
+RUN python -m spacy download en_core_web_lg parquet-tools
 
 #install PII detection/redaction related libs for code
 RUN pip install gibberish-detector
2 changes: 1 addition & 1 deletion tools/workload_in_containers/build-image.sh
@@ -3,7 +3,7 @@ dockerfile=Dockerfile
 if [[ $1 = "megatron-gpu" ]]; then
     dockerfile=Dockerfile.megatron.gpu
 elif [[ $1 = "dp" ]]; then
-    dockerfile=Dockerfile-dp
+    dockerfile=Dockerfile.dp
 elif [[ $1 = "megatron-habana" ]]; then
     dockerfile=Dockerfile.megatron.habana
 fi
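With the rename, the data-processing image is still built through the same helper; only the Dockerfile it resolves changes. A minimal usage sketch (the docker build command itself lives further down in the script and is not shown in this hunk):

# Select the renamed Dockerfile.dp and build the data-processing image.
bash build-image.sh dp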

