diff --git a/tools/pii_removal_for_contact_mp/README.md b/tools/pii_removal_for_contact_mp/README.md
new file mode 100644
index 000000000..1b8d8fd95
--- /dev/null
+++ b/tools/pii_removal_for_contact_mp/README.md
@@ -0,0 +1,27 @@
+# PII removal for contact info
+
+## Intro
+
+PII removal for contact info is to replace personal information such as email, phone number to a random-non-sense string to protect personal infomation
+This script is using multi processing method to speed up PPI removal
+
+## Expected input and Output
+
+Input format: a folder of *parquet, 'text' will required in parquet column names.
+
+Out format: a folder of *parquet, 'text' will be processed and personal info will be replaced.
+
+## How to RUN
+```
+conda create --name pyrecdp
+conda activate pyrecdp
+pip install pyrecdp --pre
+pip install presidio_analyzer
+python -m spacy download en_core_web_lg
+python pii_redaction.py -d ../falcon-refinedweb -o ../falcon-refinedweb-pii_removal -mp 224
+```
+
+## NOTICE
+
+We are running at file-wised parallism, usually a 300MB file took around 15-20min to complete, so you will see slow progress in progress bar.
+One thing to identify the activity of the process may be using 'top' to check of there are multiple activitily running python processes.
diff --git a/tools/pii_removal_for_contact_mp/pii_detection_redaction/README.md b/tools/pii_removal_for_contact_mp/pii_detection_redaction/README.md
new file mode 100644
index 000000000..dae0a7658
--- /dev/null
+++ b/tools/pii_removal_for_contact_mp/pii_detection_redaction/README.md
@@ -0,0 +1,34 @@
+# How to run PII-for-text pipeline
+
+## Overview
+The pipeline detects 5 different types of PIIs: 'PHONE_NUMBER, 'IP_ADDRESS', 'EMAIL', 'USER', 'KEY'. The detection is based on regular expressions using open-source packages including presidio and bigscience-pii. The detection precision/recall has been tuned for web scrapes based datasets, for example, Falcon-RefinedWeb, SlimPajama-StackExchange, PILE-Hackernews. But the detection precion/recall is not good for code data like Github.
+
+Two redaction methods have been implemented:
+1. Replacement with random values
+2. Replacement with tags such as [PHONE_NUMBER], [EMAIL], etc.
+Currently, the option 1) is used.
+
+
+## How to run
+### Step 1: Set up Env
+Please follow [this guide](../workload_in_containers/README.md) on how to set-up the container environment of this workload. When the containers are running, you can enter the container on head node using following command:
+```bash
+docker exec -it ray-leader bash
+```
+
+### Step 2: Run PII removal
+Once you are inside the ray-leader container, go to the scripts folder. You can change the `BATCH_SIZE` and `CPUCORES` depending on the memory and number of cores on your systems. Then you can run the pii script, for example:
+```
+bash pii-refinedweb.sh
+```
+
+### Step 3: Validate outputs
+We implemented 3 checks:
+1. Check schema and sample rows in output parquets by loading parquet with pandas
+2. Count numbers of PIIs per category by sampling from the outputs. You can further get an estimate of the total number of PIIs per category by multiplying total_num_samples/sample_used_for_this_check
+3. Visual check of a small sample by producing a html with yellow highlights of the PIIs and annotating with corresponding category (note that sometimes the highlights are not at the exact location, but should be quite close).
+
+```
+# First change the path to the data files in the python script
+python src/validate_ray_outputs.py
+```
\ No newline at end of file
diff --git a/tools/pii_removal_for_contact_mp/pii_detection_redaction/scripts/pii-pile-hn.sh b/tools/pii_removal_for_contact_mp/pii_detection_redaction/scripts/pii-pile-hn.sh
new file mode 100644
index 000000000..cc3d168b5
--- /dev/null
+++ b/tools/pii_removal_for_contact_mp/pii_detection_redaction/scripts/pii-pile-hn.sh
@@ -0,0 +1,14 @@
+#!/bin/bash
+BATCHSIZE=1000
+CPUCORES=48
+DATA=pile_hn
+OUTPUT_PREFIX=pile_hn
+DATA_DIR=/home/user/local/PILE/hn
+
+python ../src/pii_redaction_v2.py \
+--load-batch-size $BATCHSIZE \
+--cpu-per-worker $CPUCORES \
+--dataset-family $DATA \
+--output-prefix $OUTPUT_PREFIX \
+--data-dir $DATA_DIR \
+--local
\ No newline at end of file
diff --git a/tools/pii_removal_for_contact_mp/pii_detection_redaction/scripts/pii-redpj.sh b/tools/pii_removal_for_contact_mp/pii_detection_redaction/scripts/pii-redpj.sh
new file mode 100644
index 000000000..aea0eda67
--- /dev/null
+++ b/tools/pii_removal_for_contact_mp/pii_detection_redaction/scripts/pii-redpj.sh
@@ -0,0 +1,17 @@
+#!/bin/bash
+BATCHSIZE=50000
+CPUCORES=48
+INPUT=togethercomputer/RedPajama-Data-1T-Sample
+DATA=slimpajama
+OUTPUT_PREFIX=redpajama
+DATA_DIR=/home/user/local/dataset/RedPajama-Data-1T-Sample/
+
+python ../src/pii_redaction.py \
+--load-batch-size $BATCHSIZE \
+--cpu-per-worker $CPUCORES \
+--input $INPUT \
+--dataset-family $DATA \
+--output-prefix $OUTPUT_PREFIX \
+--data-dir $DATA_DIR \
+--local \
+#--skip 500000
\ No newline at end of file
diff --git a/tools/pii_removal_for_contact_mp/pii_detection_redaction/scripts/pii-refinedweb.sh b/tools/pii_removal_for_contact_mp/pii_detection_redaction/scripts/pii-refinedweb.sh
new file mode 100644
index 000000000..2b3a4b858
--- /dev/null
+++ b/tools/pii_removal_for_contact_mp/pii_detection_redaction/scripts/pii-refinedweb.sh
@@ -0,0 +1,14 @@
+#!/bin/bash
+BATCHSIZE=1000
+CPUCORES=48
+DATA=refinedweb
+OUTPUT_PREFIX=pii_test_output
+DATA_DIR=/home/user/local/refinedweb_samples
+
+python ../src/pii_redaction_v2.py \
+--load-batch-size $BATCHSIZE \
+--cpu-per-worker $CPUCORES \
+--dataset-family $DATA \
+--output-prefix $OUTPUT_PREFIX \
+--data-dir $DATA_DIR \
+--local
\ No newline at end of file
diff --git a/tools/pii_removal_for_contact_mp/pii_detection_redaction/scripts/pii-slimpj.sh b/tools/pii_removal_for_contact_mp/pii_detection_redaction/scripts/pii-slimpj.sh
new file mode 100644
index 000000000..3601b582f
--- /dev/null
+++ b/tools/pii_removal_for_contact_mp/pii_detection_redaction/scripts/pii-slimpj.sh
@@ -0,0 +1,16 @@
+#!/bin/bash
+BATCHSIZE=50000
+CPUCORES=48
+INPUT=cerebras/SlimPajama-627B
+DATA=slimpajama
+OUTPUT_PREFIX=pii_slimpajama_se
+DATA_DIR=/home/user/local/
+
+python ../src/pii_redaction_v2.py \
+--load-batch-size $BATCHSIZE \
+--cpu-per-worker $CPUCORES \
+--input $INPUT \
+--dataset-family $DATA \
+--output-prefix $OUTPUT_PREFIX \
+--data-dir $DATA_DIR \
+--local
\ No newline at end of file
diff --git a/tools/pii_removal_for_contact_mp/pii_detection_redaction/src/bigscience_pii_detect_redact.py b/tools/pii_removal_for_contact_mp/pii_detection_redaction/src/bigscience_pii_detect_redact.py
new file mode 100644
index 000000000..904477dcc
--- /dev/null
+++ b/tools/pii_removal_for_contact_mp/pii_detection_redaction/src/bigscience_pii_detect_redact.py
@@ -0,0 +1,325 @@
+# -*- coding: utf-8 -*-
+"""MST BigScience PII Code
+
+Original colab that is a source of this file is located at
+ https://colab.research.google.com/drive/1086H3-LGMz3gX0pGy9ECgr8KflosSKso
+
+# License
+
+Copyright 2022 Authors of this Notebook
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+
+# What is this colab?
+
+This colab detects the following kinds of PII for all languages in BigScience.
+Languages assumed are ["ar", "as", "bn", "ca", "en", "es", "eu", "fr", "gu", "hi", "id", "ig", "mr", "ny", "pa", "pt", "sn", "st", "sw", "ur", "vi", "xh", "yo", "zh", "zu"]
+
+## Highest Risk
+### Simple spans of characters:
+* **IDs [general]:** This is anything that is a sequence of 6 or more digits, as is common in identifiers for people internationally (national IDs, tax IDs, passport numbers, etc.), credit card numbers, IBAN codes, etc.
+* **Key [general]**: This is anything that is a sequence of digits and letters in the same string, optionally with spaces. Common for Credit Card and API, SSH, GPG keys. (Privacy group doesn't have a regex for this)
+* **Email address**, **User name**: Strings using @
+* **IP address**: Digits with periods in them
+* **Phone number**: At least 7 digits with spaces in them
+* **License plate**: (Privacy group doesn't have cross-lingual handling for this, MST group doesn't have a regex for this)
+
+### More complex spans: (WORK IN PROGRESS)
+* **Full Names**: Requires additional NER package
+* **Address**
+
+
+## Lower Risk: (We're not doing)
+* **URL**
+* **Time**: dateparser dependency
+* **Date**: dateparser dependency
+* **Age**
+
+"""
+
+
+#@title Define highest risk PII. TODO: License plate
+# NUMBER removed last minute due to false positives. See https://huggingface.slack.com/archives/C0307KE5UNT/p1647011702716159
+high_risk_tags = {'KEY', 'EMAIL', 'USER', 'IP_ADDRESS', 'IPv4', 'IPv6'} # , 'NUMBER', "ID"}
+
+"""# Regexes"""
+
+#@title Get the less sophisticated MST regexes for High Risk scenarios (baseline comparison). Not language-specific; all are general.
+import sys
+import regex
+import ipaddress
+# These are ordered so that we can return upon a match; no need to search for a substring.
+year_patterns = [
+ regex.compile(r"(?:^|[\b\s@?,!;:\'\")(.\p{Han}])([1-2][0-9]{3}[\p{Pd}/][1-2][0-9]{3})(?:$|[\s@,?!;:\'\"(.\p{Han}])"), # yyyy-yyyy or yyyy/yyyy
+ regex.compile(r"(?:^|[\b\s@?,!;:\'\")(.\p{Han}])([1-2][0-9]{3}[\p{Pd}/.][0-3][0-9][\p{Pd}/.][0-3][0-9])(?:$|[\s@,?!;:\'\"(.\p{Han}])"), # yyyy-mm-dd or yyyy-dd-mm or yyyy/mm/dd or yyyy/dd/mm or yyyy.mm.dd or yyyy.dd.mm
+ regex.compile(r"(?:^|[\b\s@?,!;:\'\")(.\p{Han}])([0-3][0-9][\p{Pd}/.][0-3][0-9][\p{Pd}/.](?:[0-9]{2}|[1-2][0-9]{3}))(?:$|[\s@,?!;:\'\"(.\p{Han}])"), # mm-dd-yyyy or dd-mm-yyyy or mm/dd/yyyy or dd/mm/yyyy or mm.dd.yyyy or dd.mm.yyyy or the same but with yy instead of yyyy
+ regex.compile(r"(?:^|[\b\s@?,!;:\'\")(.\p{Han}])([0-3][0-9][\p{Pd}/](?:[0-9]{2}|[1-2][0-9]{3}))(?:$|[\s@,?!;:\'\"(.\p{Han}])"), # mm-yyyy or mm/yyyy or the same but with yy
+ regex.compile(r"(?:^|[\b\s@?,!;:\'\")(.\p{Han}])([1-2][0-9]{3}-[0-3][0-9])(?:$|[\s@,?!;:\'\"(.\p{Han}])"), # yyyy-mm or yyyy/mm
+]
+
+# Patterns for high-risk character strings
+id_pattern = r'(?:^|[\b\s@?,!;:\'\")(.\p{Han}])([A-Za-z]*(?:[\p{Pd}]*\p{Nd}){6,})(?:$|[\b\s@?,!;:\'\")(.\p{Han}])'
+# https://regex101.com/r/JQkmh8/2
+# key_pattern = r'(?:^|[\b\s@?,!;:\'\")(.\p{Han}])((?:(?:[A-Za-z]+[\p{Nd}\p{Pd}\/\+\=:]+|[\p{Nd}\p{Pd}\/\+\=:]+[A-Za-z]+)){4,}|(?:(?:\p{Nd}{3,}|[A-Z]+\p{Nd}+[A-Z]*|\p{Nd}+[A-Z]+\p{Nd}*)[\s\p{Pd}]?){4,})(?:$|[\b\s\p{Han}@?,!;:\'\"])'
+# https://regex101.com/r/JQkmh8/5
+key_pattern = r'(?:^|[\b\s@?,!:;\'\")(.\p{Han}])((?:(?:[A-Za-z]+[\p{Nd}\p{Pd}\/\+\=:_]+|[\p{Nd}\p{Pd}\/\+\=:]+[A-Za-z]+)){4,}|(?:(?:\p{Nd}{3,}|[A-Z]+\p{Nd}+[A-Z]*|\p{Nd}+[A-Z]+\p{Nd}*)[ \p{Pd}]?){3,})(?:$|[\b\s\p{Han}@?,!;:\'\")(.])'
+ipv4_pattern = r'(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)(?:\.(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)){3}'
+ipv6_pattern = r'(?:[0-9a-fA-F]{1,4}:){7,7}[0-9a-fA-F]{1,4}|(?:[0-9a-fA-F]{1,4}:){1,7}:|(?:[0-9a-fA-F]{1,4}:){1,6}:[0-9a-fA-F]{1,4}|(?:[0-9a-fA-F]{1,4}:){1,5}(?::[0-9a-fA-F]{1,4}){1,2}|(?:[0-9a-fA-F]{1,4}:){1,4}(?::[0-9a-fA-F]{1,4}){1,3}|(?:[0-9a-fA-F]{1,4}:){1,3}(?::[0-9a-fA-F]{1,4}){1,4}|(?:[0-9a-fA-F]{1,4}:){1,2}(?::[0-9a-fA-F]{1,4}){1,5}|[0-9a-fA-F]{1,4}:(?:(?::[0-9a-fA-F]{1,4}){1,6})|:(?:(?::[0-9a-fA-F]{1,4}){1,7}|:)|fe80:(?::[0-9a-fA-F]{0,4}){0,4}%[0-9a-zA-Z]{1,}|::(?:ffff(?::0{1,4}){0,1}:){0,1}(?:(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9])|(?:[0-9a-fA-F]{1,4}:){1,4}:(?:(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9])'
+
+# presidio
+# #ipv4_pattern = r"\b(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\.(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)\b"
+# this one below gives a lot of false positives ::
+#ipv6_pattern = r"\b(([0-9a-fA-F]{1,4}:){7,7}[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,7}:|([0-9a-fA-F]{1,4}:){1,6}:[0-9a-fA-F]{1,4}|([0-9a-fA-F]{1,4}:){1,5}(:[0-9a-fA-F]{1,4}){1,2}|([0-9a-fA-F]{1,4}:){1,4}(:[0-9a-fA-F]{1,4}){1,3}|([0-9a-fA-F]{1,4}:){1,3}(:[0-9a-fA-F]{1,4}){1,4}|([0-9a-fA-F]{1,4}:){1,2}(:[0-9a-fA-F]{1,4}){1,5}|[0-9a-fA-F]{1,4}:((:[0-9a-fA-F]{1,4}){1,6})|:((:[0-9a-fA-F]{1,4}){1,7}|:)|fe80:(:[0-9a-fA-F]{0,4}){0,4}%[0-9a-zA-Z]{1,}|::(ffff(:0{1,4}){0,1}:){0,1}((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])|([0-9a-fA-F]{1,4}:){1,4}:((25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(2[0-4]|1{0,1}[0-9]){0,1}[0-9]))\b"
+ip_pattern = r"(?:^|[\b\s@?,!;:\'\")(.\p{Han}])(" + r"|".join([ipv4_pattern, ipv6_pattern]) + ")(?:$|[\s@,?!;:\'\"(.\p{Han}])"
+
+
+# bigcode-pii
+ipv4_pattern = r"(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)(?:\.(?:25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)){3}"
+ipv6_pattern = r"(?:[0-9a-fA-F]{1,4}:){7,7}[0-9a-fA-F]{1,4}|(?:[0-9a-fA-F]{1,4}:){1,7}:|(?:[0-9a-fA-F]{1,4}:){1,6}:[0-9a-fA-F]{1,4}|(?:[0-9a-fA-F]{1,4}:){1,5}(?::[0-9a-fA-F]{1,4}){1,2}|(?:[0-9a-fA-F]{1,4}:){1,4}(?::[0-9a-fA-F]{1,4}){1,3}|(?:[0-9a-fA-F]{1,4}:){1,3}(?::[0-9a-fA-F]{1,4}){1,4}|(?:[0-9a-fA-F]{1,4}:){1,2}(?::[0-9a-fA-F]{1,4}){1,5}|[0-9a-fA-F]{1,4}:(?:(?::[0-9a-fA-F]{1,4}){1,6})|:(?:(?::[0-9a-fA-F]{1,4}){1,7}|:)|fe80:(?::[0-9a-fA-F]{0,4}){0,4}%[0-9a-zA-Z]{1,}|::(?:ffff(?::0{1,4}){0,1}:){0,1}(?:(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9])|(?:[0-9a-fA-F]{1,4}:){1,4}:(?:(?:25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9])\.){3,3}(25[0-5]|(?:2[0-4]|1{0,1}[0-9]){0,1}[0-9])"
+ip_pattern = (
+ r"(?:^|[\b\s@?,!;:\'\")(.\p{Han}])("
+ + r"|".join([ipv4_pattern, ipv6_pattern])
+ + ")(?:$|[\s@,?!;:'\"(.\p{Han}])"
+)
+
+# https://regex101.com/r/EpA5B7/1
+email_pattern = r'''
+ (?<= ^ | [\b\s@,?!;:)('".\p{Han}<] )
+ (
+ [^\b\s@?!;,:)('"<]+
+ @
+ [^\b\s@!?;,/]*
+ [^\b\s@?!;,/:)('">.]
+ \.
+ \p{L} \w{1,}
+ )
+ (?= $ | [\b\s@,?!;:)('".\p{Han}>] )
+'''
+
+# https://regex101.com/r/mOqi1s/3
+#user_pattern = r'(?:^|[\s@,?!;:\'\")(\p{Han}])(@[^\s@,?!;:\'\")(]{3,})'
+user_pattern = r'''
+ (?<= ^ | [)(\s@,?!;:'"\p{Han}] )
+ (@
+ [^)(\s@,?!;:'"]{3,}
+ )
+'''
+# Examples from https://regexpattern.com/phone-number/
+# https://regex101.com/r/lZZ0XP/4
+# Also matches MLS numbers
+# phone_pattern = r'(?:^|[\s\'\"(\p{Han}])((?:\+\p{Nd}+[ \/.\p{Pd}]*)?(?:(?:\(\+?\p{Nd}+\))?(?:[ \/.\p{Pd}]*\p{Nd})){7,}(?:[\t\f #]*\p{Nd}+)?)(?:$|[\s@,?!;:\'\"(.\p{Han}])'
+
+id_regex = regex.compile(id_pattern, flags=regex.MULTILINE) #, re.MULTILINE)
+key_regex = regex.compile(key_pattern, flags=regex.MULTILINE) #, re.MULTILINE)
+ipv4_regex = regex.compile(ipv4_pattern)
+ipv6_regex = regex.compile(ipv6_pattern)
+ip_regex = regex.compile(ip_pattern, flags=regex.MULTILINE) #, re.MULTILINE)
+email_regex = regex.compile(email_pattern, flags=regex.MULTILINE|regex.VERBOSE) #, re.MULTILINE)
+user_regex = regex.compile(user_pattern, flags=regex.MULTILINE|regex.VERBOSE) #, re.MULTILINE)
+# phone_regex = regex.compile(phone_pattern, flags=regex.MULTILINE) #, re.MULTILINE)
+# TODO: license
+
+
+#sasha_regexes = copy.deepcopy(regex_rulebase)
+mst_regexes = {}
+for tag in high_risk_tags:
+ #print(tag)
+ if tag == 'ID':
+ mst_regexes['ID'] = id_regex
+ elif tag == 'KEY':
+ mst_regexes['KEY'] = key_regex
+ elif tag == 'IPv4':
+ mst_regexes['IPv4'] = ipv4_regex
+ elif tag == 'IPv6':
+ mst_regexes['IPv6'] = ipv6_regex
+ elif tag == 'IP_ADDRESS':
+ mst_regexes['IP_ADDRESS'] = ip_regex
+ elif tag == 'EMAIL':
+ mst_regexes['EMAIL'] = email_regex
+ elif tag == 'USER':
+ mst_regexes['USER'] = user_regex
+# elif tag == 'NUMBER':
+# mst_regexes['NUMBER'] = phone_regex
+ else:
+ sys.stderr.write('Dont have tag regex pattern for %s =(' % tag)
+
+#print("MST regexes under examination are:")
+#for tag, regx in mst_regexes.items():
+ #print(tag, end=":\t")
+ #print(regx)
+
+"""# PI Detection and Redaction functions are defined here! """
+
+#@title The detection functions and basic filtering functions are defined here.
+# tag_type = {'ID', 'KEY', 'EMAIL', 'IP_ADDRESS', 'PHONE', 'LICENSE_PLATE'}
+# Choose whether to put this import before or after, depending on which you're testing. =)
+
+def ip_has_digit(matched_str):
+ """Checks to make sure the PII span is not just :: or whatever that may
+ accidentally be picked up by making sure there are digits."""
+ return any(map(str.isdigit, matched_str))
+
+# from bigcode-pii
+def filter_versions(matched_str, context):
+ """Filter addresses in this format x.x.x.x, x.xx.x.x and the words dns/server
+ don't appear in the neighboring context, usually they are just versions"""
+ # count occurrence of dots
+ dot_count = matched_str.count('.')
+ exclude = (dot_count <= 3 and len(matched_str) <= 8)
+ if exclude:
+ if "dns" in context.lower() or "server" in context.lower():
+ return False
+ return exclude
+
+# from bigcode-pii
+def not_ip_address(matched_str):
+ """ make sure the string has a valid IP address format
+ e.g: 33.01.33.33 is not a valid IP address because of the 0 in front of 1
+ TODO: fix this directly in the regex"""
+ try:
+ ipaddress.ip_address(matched_str)
+ return False
+ except:
+ return True
+
+
+def matches_date_pattern(matched_str):
+ # Screen out date false positives
+ for year_regex in year_patterns:
+ if year_regex.match(matched_str):
+ return True
+ return False
+
+def is_website(matched_str):
+ # TODO
+ return False
+
+def detect_pii(text, lang, tag_types):
+ matches = []
+ for tag in tag_types:
+ label_pattern = mst_regexes[tag]
+ # !! regex.match happens here!!
+ matches_tmp = label_pattern.finditer(text)
+ for match in matches_tmp:
+ # TODO: Why does this happen?
+ if match.groups():
+ if len(match.groups()) > 1 and match.groups()[1]:
+ sys.stderr.write("Warning: Found substring matches in the main match.")
+ #print(tag)
+ #print(text)
+ #print(match.groups())
+ matched_str = match.groups()
+ #print(matched_str)
+ # Why does this happen?
+ matched_str = matched_str[0]
+ start, end = match.span(1)
+
+ if matched_str:
+ if tag in ["IP_ADDRESS"]:
+ # Filter out false positive IPs
+ if not ip_has_digit(matched_str):
+ continue
+ # this is to filter out versions, copied from bigcode-pii
+ if filter_versions(matched_str, text[start-100:end+100]):
+ #print('Detected: version: ', matched_str)
+ continue
+ # this is to filer out invalid ip address, copied from bigcode-pii
+ if not_ip_address(matched_str):
+ #print('Detected: invalid id address: ', matched_str)
+ continue
+ if tag in ["ID", "IP_ADDRESS"]: #, "NUMBER"]:
+ # Filter out date false positives
+ if matches_date_pattern(matched_str):
+ continue
+ # TODO: Implement
+ # if tag in ["KEY"]:
+ # # TODO: implement
+ # if is_website(matched_str):
+ # continue
+ matches += [(matched_str, match.span(), str(label_pattern), tag, lang)]
+ return matches
+
+
+#@title Redaction function defined here.
+def redact_pii(text, matches):
+ """Takes a match as defined in the detect_pii function and redacts it from the full string, returning a tuple."""
+ redacted_str = text
+ metadata = []
+ for match in matches:
+ matched_str = match[0]
+ tag = match[3]
+ redact_tag = "PI:" + tag
+ redacted_str = redacted_str.replace(matched_str, redact_tag)
+ # Create the "metadata" as all of the information we had before redaction
+ metadata += [(match)]
+ return (redacted_str, metadata)
+
+#@title General function to run the PII detection and redact it, saving everything else to metadata, is defined here.
+def run_pii(text, lang):
+ """
+ Runs the given set of regexes on the data "lines" and pulls out the
+ tagged items.
+ The lines structure stores the language type(s). This can be used for
+ language-specific regexes, although we're dropping that for now and using
+ only "default"/non-language-specific regexes.
+ """
+
+ #print('Detecting....')
+ # What is this for...?
+ text = text.encode().decode()
+ matches = detect_pii(text, lang, high_risk_tags)
+ #print(matches)
+ match_set = (text, {})
+ if len(matches) > 0:
+ # !!! REDACTION HAPPENS HERE !!!
+ redacted_str, metadata = redact_pii(text, matches)
+ metadata_out = {"regex metadata":metadata, "original": text, "redacted": redacted_str}
+ match_set = (redacted_str, metadata_out)
+ return match_set
+
+
+def run_pii_batch(exs, lang):
+ """
+ Runs the given set of regexes on the data "lines" and pulls out the
+ tagged items.
+ The lines structure stores the language type(s). This can be used for
+ language-specific regexes, although we're dropping that for now and using
+ only "default"/non-language-specific regexes.
+ """
+ regex_metadata = []
+ old_text = []
+ new_text = []
+ modified = []
+ for text in exs["text"]:
+ # What is this for...?
+ text = text.encode().decode()
+ matches = detect_pii(text, lang, high_risk_tags)
+ if len(matches) > 0:
+ # !!! REDACTION HAPPENS HERE !!!
+ redacted_str, metadata = redact_pii(text, matches)
+ regex_metadata.append(repr(metadata))
+ old_text.append(text)
+ new_text.append(redacted_str)
+ modified.append(True)
+ else:
+ regex_metadata.append("")
+ old_text.append(text)
+ new_text.append(text)
+ modified.append(False)
+ result = {
+ "regex_metadata": regex_metadata,
+ "old_text": old_text,
+ "text": new_text,
+ "modified": modified
+ }
+ return result
diff --git a/tools/pii_removal_for_contact_mp/pii_detection_redaction/src/detect_pii.py b/tools/pii_removal_for_contact_mp/pii_detection_redaction/src/detect_pii.py
new file mode 100644
index 000000000..b9df49d64
--- /dev/null
+++ b/tools/pii_removal_for_contact_mp/pii_detection_redaction/src/detect_pii.py
@@ -0,0 +1,80 @@
+
+#from presidio_analyzer import AnalyzerEngine
+from utils import parse_recognizer_result, high_risk_tags
+from bigscience_pii_detect_redact import matches_date_pattern, detect_pii
+
+
+
+def detect_phone_numbers(text, analyzer):
+ # use presidio phone recognizer to detect phone numbers
+ # threshold is set to 0.4 based on a sample study
+ results = analyzer.analyze(text=text,
+ entities=['PHONE_NUMBER'],
+ #language='en',
+ #score_threshold=0.4,
+ #return_decision_process=True
+ )
+
+ pii_list = []
+
+ if len(results)>0:
+ for result in results:
+ # parse the output into dictionary
+ pii_dict = parse_recognizer_result(result)
+
+ # check if the number string is a date
+ number_str = text[pii_dict['start']: pii_dict['end']]
+
+ if matches_date_pattern(number_str):
+ #print('Date, not phone number')
+ pass
+
+ else:
+ pii_dict['value']=number_str
+ pii_list.append(pii_dict)
+ #print(pii_dict)
+
+ return pii_list
+
+
+
+def detect_other_piis(text):
+ matches = detect_pii(text, None, high_risk_tags)
+ if len(matches)>0:
+ pii_list = []
+ for m in matches:
+ pii = {}
+ pii['type']=m[-2]
+ pii['start']=m[1][0]
+ pii['end']=m[1][1]
+ pii['value']=m[0]
+ #print(pii)
+ pii_list.append(pii)
+
+ return pii_list
+ else:
+ return None
+
+def merge_outputs(presidio_outputs, bigscience_outputs):
+ if bigscience_outputs!=None:
+ piis = presidio_outputs + bigscience_outputs
+ # TODO: sometimes KEY and PHONE_NUMBER overlap
+ # when merging, only keep one of them
+ # right now, the short-cut is to have the KEY and PHONE_NUMBER replacement to be the same format
+
+ # detected_spans = []
+ # piis_to_remove = []
+ # for pii in piis:
+ # span = (pii['start'], pii['end'])
+ # if span in detected_spans:
+ # #remove pii from piis
+ # print('remove this pii: ', pii)
+ # piis_to_remove.append(pii)
+
+ # detected_spans.append(span)
+
+ # piis = [pii for pii in piis if pii not in piis_to_remove]
+
+ else:
+ piis = presidio_outputs
+ return piis
diff --git a/tools/pii_removal_for_contact_mp/pii_detection_redaction/src/pii_redaction.py b/tools/pii_removal_for_contact_mp/pii_detection_redaction/src/pii_redaction.py
new file mode 100644
index 000000000..d5054a37e
--- /dev/null
+++ b/tools/pii_removal_for_contact_mp/pii_detection_redaction/src/pii_redaction.py
@@ -0,0 +1,320 @@
+"""
+this script is for processing pii-detection-dedaction
+"""
+
+# from presidio_analyzer import AnalyzerEngine
+from presidio_analyzer.predefined_recognizers import PhoneRecognizer
+
+from utils import summarize_pii_entities
+import time
+import json
+from redact_pii import redact_pii_with_random_values, redact_pii_with_tags
+from detect_pii import detect_other_piis, detect_phone_numbers, merge_outputs
+
+import os, sys
+import time
+import argparse
+from pprint import pprint
+from typing import Dict, List
+
+import ray
+import ray.data
+import pandas as pd
+import numpy as np
+from datasets import load_dataset
+
+import logging
+import glob
+
+
+def detect_redact_pii_for_one_text(text, analyzer):
+
+ detected_phone_numbers = detect_phone_numbers(text, analyzer)
+
+ # get output from bigscience-pii
+ detected_other_piis = detect_other_piis(text)
+ # merge the two outputs
+ piis = merge_outputs(detected_phone_numbers, detected_other_piis)
+ #print('Merged PIIs: ', piis)
+
+ if len(piis)>0:
+ # save result
+ #redact
+ redacted_text = redact_pii_with_random_values(text, piis)
+ #redacted_text = redact_pii_with_tags(text, piis)
+
+ output = {
+ 'redacted': redacted_text,
+ 'pii': piis,
+ "modified": True
+ }
+
+
+ else:
+ output = {
+ 'redacted': None,
+ 'pii': None,
+ "modified": False
+
+ }
+
+ return output
+
+
+def get_args():
+ parser = argparse.ArgumentParser()
+ group = parser.add_argument_group(title="input data")
+ group.add_argument(
+ "--input",
+ type=str,
+ default="tiiuae/falcon-refinedweb",
+ required=False,
+ help="Name of the dataset repository,e.g. togethercomputer/RedPajama-Data-1T"
+ )
+
+ # group.add_argument(
+ # "--format",
+ # type=str,
+ # default="parquet",
+ # required=False,
+ # help="input data format, parquet or json"
+ # )
+
+ group.add_argument(
+ "--dataset-family",
+ type=str,
+ default="refinedweb",
+ required=False,
+ help="choose from: refinedweb, slimpajama, pile"
+ )
+
+ group.add_argument(
+ "--data-dir",
+ type=str,
+ required=False,
+ help="for local mode, you need to provide local dataset repository, e.g. /home/user/local"
+ )
+ group.add_argument(
+ "--cache-dir",
+ type=str,
+ default='/root/.cache',
+ help="Hugging Face cache dir, where the hugging face dataset it stored"
+ )
+ group.add_argument(
+ '--local',
+ default=False,
+ action='store_true',
+ help="whether to use local mode to preprocess data"
+ )
+ group.add_argument(
+ "--load-batch-size", type=int, default=1000, help="only needed if you use streaming mode to read data from hugging face"
+ )
+ group.add_argument(
+ "--skip", type=int, default=None, help="how many samples to skip"
+ )
+ group = parser.add_argument_group(title="output data")
+ group.add_argument(
+ "--output-prefix",
+ type=str,
+ required=False,
+ default="processed",
+ help="Path to binary output file without suffix",
+ )
+ group = parser.add_argument_group(title="runtime")
+ group.add_argument(
+ "--cpu-per-worker", type=int, default=1, help="Number of CPUs to use per worker"
+ )
+
+ args = parser.parse_args()
+ args.output_path = '/home/user/local'
+ return args
+
+
+def main():
+ args = get_args()
+
+ # if args.format not in ['parquet', 'json']:
+ # raise ValueError('data file format must be parquet or json')
+
+ output_dir = os.path.join(args.output_path, args.output_prefix)
+ if not os.path.exists(output_dir):
+ os.mkdir(output_dir)
+ exception_dir = output_dir+'/exceptions/'
+ cache_dir = args.cache_dir
+ dataset_family = args.dataset_family
+ log_dir = output_dir+'/logs/'
+ if not os.path.exists(log_dir):
+ os.mkdir(log_dir)
+
+ logging.basicConfig(filename=log_dir+"newlog.txt",
+ format='%(asctime)s %(message)s',
+ filemode='w')
+
+ logger = logging.getLogger()
+ logger.setLevel(logging.DEBUG)
+
+ logger.info(args)
+ logger.info('processing {} data.....'.format(dataset_family))
+
+ # init ray
+ ray.init(address='auto')
+ pprint(ray.cluster_resources())
+ num_nodes = len(ray.nodes())
+ parallelism = num_nodes * args.cpu_per_worker
+
+ logger.info('num of ray nodes: {}'.format(num_nodes))
+ logger.info('parallelism: {}'.format(parallelism))
+
+ def preprocess_fn(contents, metas, analyzer) -> pd.DataFrame:
+ # inputs are in batches
+ redacted_content = []
+ modified = []
+ piis = []
+ meta_output = []
+ original_content = []
+
+ exceptions = []
+
+ for i, text in enumerate(contents):
+ try:
+ # # for testing exception
+ # if i%5 == 0:
+ # raise ValueError
+ output = detect_redact_pii_for_one_text(text, analyzer)
+ modified.append(output['modified'])
+ piis.append(output['pii'])
+ if output['pii'] != None:
+ redacted_content.append(output['redacted'])
+ else:
+ redacted_content.append(text)
+ meta_output.append(metas[i])
+ except:
+ logger.debug('exception occurred!') # seems cannot log from ray actor using this method
+ exceptions.append({
+ 'text':text,
+ 'meta': metas[i]
+ })
+ if len(exceptions)>0:
+ if not os.path.exists(exception_dir):
+ os.mkdir(exception_dir)
+ task_id = ray.get_runtime_context().get_task_id()
+ with open(exception_dir + task_id+'.json', 'w') as f:
+ json.dump(exceptions, f)
+
+ return pd.DataFrame({#"original": original_content,
+ 'redacted': redacted_content,
+ 'piis': piis,
+ 'meta': meta_output,
+ 'modified': modified})
+
+
+
+ def pii_removal_refinedweb(batch: Dict[str, List]) -> pd.DataFrame:
+ # analyzer = AnalyzerEngine()
+ analyzer = PhoneRecognizer()
+
+ contents = batch['content'].tolist()
+
+ try:
+ urls = batch['url'].tolist()
+ timestamps = batch['timestamp'].tolist()
+ dump = batch['dump'].tolist()
+ segment = batch['segment'].tolist()
+ metas = []
+
+ for i in range(len(urls)):
+ metas.append({
+ 'url': urls[i],
+ 'timestamp': timestamps[i],
+ 'dump': dump[i],
+ 'segment': segment[i]
+ })
+ except:
+ metas = [None]*len(contents)
+
+ return preprocess_fn(contents, metas, analyzer)
+
+ def pii_removal_slimpajama_pile(batch: Dict[str, List]) -> pd.DataFrame:
+ # analyzer = AnalyzerEngine()
+ analyzer = PhoneRecognizer()
+ # try:
+ contents = batch['text'].tolist()
+ try:
+ metas = batch['meta'].tolist()
+ # print(metas)
+ except:
+ metas = [None]*len(contents)
+ return preprocess_fn(contents, metas, analyzer)
+ # except:
+ # if not os.path.exists(exception_dir):
+ # os.mkdir(exception_dir)
+ # task_id = ray.get_runtime_context().get_task_id()
+ # with open(exception_dir + task_id+'.json', 'w') as f:
+ # json.dump(batch, f)
+
+
+
+ if not args.local:
+ dataset = load_dataset(args.input, streaming=True)['train']
+ else:
+ data_dir = args.data_dir
+ if dataset_family == 'refinedweb':
+ datafiles = glob.glob(data_dir + '*.parquet')
+ dataset = load_dataset('parquet', data_files = datafiles, streaming=True)['train']
+ elif dataset_family == 'slimpajama' or dataset_family == 'pile':
+ datafiles = glob.glob(data_dir+'*.jsonl')
+ dataset = load_dataset('json', data_files = datafiles, streaming=True)['train']
+ else:
+ raise ValueError('{} not supported'.format(dataset_family))
+
+ if args.skip != None:
+ dataset_to_process = dataset.skip(args.skip)
+ else:
+ dataset_to_process = dataset
+
+ idx = 1
+
+ t0 = time.time()
+ for rows in dataset_to_process.iter(batch_size=args.load_batch_size):
+ logger.info('Start processing batch # {}'.format(idx))
+ print("-----------------------------")
+ df = pd.DataFrame(rows)
+ # logger.info(df['meta'])
+ ray_dataset = ray.data.from_pandas(df)
+ ray_dataset = ray_dataset.repartition(parallelism) #, shuffle = True)
+
+ if dataset_family == 'refinedweb':
+ print('process refinedweb')
+ process_fn = pii_removal_refinedweb
+ elif dataset_family == 'slimpajama' or dataset_family == 'pile':
+ print('process slimpj or pile')
+ process_fn = pii_removal_slimpajama_pile
+ else:
+ raise ValueError('{} not supported'.format(dataset_family))
+
+ tokenized_data = ray_dataset.map_batches(process_fn, batch_format="numpy", batch_size=None)
+
+ if dataset_family == 'refinedweb':
+ tokenized_data.write_parquet(output_dir)
+ elif dataset_family == 'slimpajama' or dataset_family == 'pile':
+ tokenized_data.write_json(output_dir)
+ else:
+ raise ValueError('{} not supported'.format(dataset_family))
+
+ logger.info('Finished processing batch # {}'.format(idx))
+ logger.info(f"{idx} * {args.load_batch_size} samples were written to disk.")
+ idx += 1
+ print("============================")
+ if idx == 2:
+ #sys.exit()
+ break
+ t1 = time.time()
+ logger.info('Processing {} samples took {:.3f} sec'.format((idx-1)*args.load_batch_size, t1-t0))
+
+
+if __name__ == "__main__":
+ start = time.time()
+ main()
+ end = time.time()
+ print(f"\nthis script took {end-start}s.")
+
diff --git a/tools/pii_removal_for_contact_mp/pii_detection_redaction/src/pii_redaction_v2.py b/tools/pii_removal_for_contact_mp/pii_detection_redaction/src/pii_redaction_v2.py
new file mode 100644
index 000000000..ff78bb9c9
--- /dev/null
+++ b/tools/pii_removal_for_contact_mp/pii_detection_redaction/src/pii_redaction_v2.py
@@ -0,0 +1,294 @@
+"""
+this script is for processing pii-detection-dedaction
+"""
+
+# from presidio_analyzer import AnalyzerEngine
+from presidio_analyzer.predefined_recognizers import PhoneRecognizer
+
+from utils import summarize_pii_entities
+import time
+import json
+from redact_pii import redact_pii_with_random_values, redact_pii_with_tags
+from detect_pii import detect_other_piis, detect_phone_numbers, merge_outputs
+
+import os, sys
+import time
+import argparse
+from pprint import pprint
+from typing import Dict, List
+
+try:
+ import ray
+ import ray.data
+except:
+ pass
+import pandas as pd
+import numpy as np
+from datasets import load_dataset
+
+import logging
+import glob
+
+
+def detect_redact_pii_for_one_text(text, analyzer):
+
+ detected_phone_numbers = detect_phone_numbers(text, analyzer)
+
+ # get output from bigscience-pii
+ detected_other_piis = detect_other_piis(text)
+ # merge the two outputs
+ piis = merge_outputs(detected_phone_numbers, detected_other_piis)
+ #print('Merged PIIs: ', piis)
+
+ if len(piis)>0:
+ # save result
+ #redact
+ redacted_text = redact_pii_with_random_values(text, piis)
+ #redacted_text = redact_pii_with_tags(text, piis)
+
+ output = {
+ 'redacted': redacted_text,
+ 'pii': piis,
+ "modified": True
+ }
+
+
+ else:
+ output = {
+ 'redacted': None,
+ 'pii': None,
+ "modified": False
+
+ }
+
+ return output
+
+
+def get_args():
+ parser = argparse.ArgumentParser()
+ group = parser.add_argument_group(title="input data")
+ group.add_argument(
+ "--input",
+ type=str,
+ default="tiiuae/falcon-refinedweb",
+ required=False,
+ help="Name of the dataset repository,e.g. togethercomputer/RedPajama-Data-1T"
+ )
+
+ # group.add_argument(
+ # "--format",
+ # type=str,
+ # default="parquet",
+ # required=False,
+ # help="input data format, parquet or json"
+ # )
+
+ group.add_argument(
+ "--dataset-family",
+ type=str,
+ default="refinedweb",
+ required=False,
+ help="choose from: refinedweb, slimpajama, pile"
+ )
+
+ group.add_argument(
+ "--data-dir",
+ type=str,
+ required=False,
+ help="for local mode, you need to provide local dataset repository, e.g. /home/user/local"
+ )
+ group.add_argument(
+ "--cache-dir",
+ type=str,
+ default='/root/.cache',
+ help="Hugging Face cache dir, where the hugging face dataset it stored"
+ )
+ group.add_argument(
+ '--local',
+ default=False,
+ action='store_true',
+ help="whether to use local mode to preprocess data"
+ )
+ group.add_argument(
+ "--load-batch-size", type=int, default=1000, help="only needed if you use streaming mode to read data from hugging face"
+ )
+ group.add_argument(
+ "--skip", type=int, default=None, help="how many samples to skip"
+ )
+ group = parser.add_argument_group(title="output data")
+ group.add_argument(
+ "--output-prefix",
+ type=str,
+ required=False,
+ default="processed",
+ help="Path to binary output file without suffix",
+ )
+ group = parser.add_argument_group(title="runtime")
+ group.add_argument(
+ "--cpu-per-worker", type=int, default=1, help="Number of CPUs to use per worker"
+ )
+
+ args = parser.parse_args()
+ args.output_path = '/home/user/local'
+ return args
+
+
+def main():
+ args = get_args()
+
+ # if args.format not in ['parquet', 'json']:
+ # raise ValueError('data file format must be parquet or json')
+
+ output_dir = os.path.join(args.output_path, args.output_prefix)
+ if not os.path.exists(output_dir):
+ os.mkdir(output_dir)
+ exception_dir = output_dir+'/exceptions/'
+ cache_dir = args.cache_dir
+ dataset_family = args.dataset_family
+ log_dir = output_dir+'/logs/'
+ if not os.path.exists(log_dir):
+ os.mkdir(log_dir)
+
+ logging.basicConfig(filename=log_dir+"newlog.txt",
+ format='%(asctime)s %(message)s',
+ filemode='w')
+
+ logger = logging.getLogger()
+ logger.setLevel(logging.DEBUG)
+
+ logger.info(args)
+ logger.info('processing {} data.....'.format(dataset_family))
+
+ # init ray
+ ray.init(address='auto')
+ pprint(ray.cluster_resources())
+ num_nodes = len(ray.nodes())
+ parallelism = num_nodes * args.cpu_per_worker
+
+ logger.info('num of ray nodes: {}'.format(num_nodes))
+ logger.info('parallelism: {}'.format(parallelism))
+
+ def preprocess_fn(contents, analyzer) -> pd.DataFrame:
+ # inputs are in batches
+ text, doc_id, hash, meta, source, bytes = contents
+ redacted_content = []
+ modified = []
+ piis = []
+ meta_output = []
+ doc_id_output = []
+ hash_output = []
+ source_output = []
+ bytes_output = []
+
+ exceptions = []
+
+ for i, txt in enumerate(text):
+ try:
+ # # for testing exception
+ # if i%5 == 0:
+ # raise ValueError
+ output = detect_redact_pii_for_one_text(txt, analyzer)
+ modified.append(output['modified'])
+ piis.append(output['pii'])
+ if output['pii'] != None: # have PII so output redacted text
+ redacted_content.append(output['redacted'])
+ else: # did not have PII so output original text
+ redacted_content.append(txt)
+ meta_output.append(meta[i])
+ doc_id_output.append(doc_id[i])
+ hash_output.append(hash[i])
+ source_output.append(source[i])
+ bytes_output.append(bytes[i])
+ except:
+ logger.debug('exception occurred!') # seems cannot log from ray actor using this method
+ exceptions.append({
+ 'text':txt,
+ 'doc_id': doc_id[i]
+ })
+ if len(exceptions)>0:
+ if not os.path.exists(exception_dir):
+ os.mkdir(exception_dir)
+ task_id = ray.get_runtime_context().get_task_id()
+ with open(exception_dir + task_id+'.json', 'w') as f:
+ json.dump(exceptions, f)
+
+ return pd.DataFrame({#"original": original_content,
+ 'new_content': redacted_content,
+ 'meta': meta_output,
+ 'doc_id':doc_id_output,
+ 'hash': hash_output,
+ 'source': source_output,
+ 'bytesize':bytes_output,
+ 'secrets': piis,
+ 'modified': modified})
+
+
+ def pii_removal(batch: Dict[str, List]) -> pd.DataFrame:
+ # analyzer = AnalyzerEngine()
+ analyzer = PhoneRecognizer()
+ text = batch['text'].tolist()
+ doc_id = batch['doc_id'] #.to_list()
+ hash = batch['hash']#.to_list()
+ source = batch['source'].tolist()
+ bytes = batch['bytesize'].tolist()
+ meta = batch['meta'].tolist()
+
+ # try:
+ # meta = batch['meta'].tolist()
+ # # print(metas)
+ # except:
+ # meta = [None]*len(contents)
+
+ contents = (text, doc_id, hash, meta, source, bytes)
+
+ return preprocess_fn(contents, analyzer)
+
+
+ if not args.local:
+ dataset = load_dataset(args.input, streaming=True)['train']
+ else:
+ data_dir = args.data_dir
+ if data_dir[-1] != '/':
+ data_dir+='/'
+ datafiles = glob.glob(data_dir + '*.parquet')
+ dataset = load_dataset('parquet', data_files = datafiles, streaming=True)['train']
+
+ if args.skip != None:
+ dataset_to_process = dataset.skip(args.skip)
+ else:
+ dataset_to_process = dataset
+
+ idx = 1
+
+ t0 = time.time()
+ for rows in dataset_to_process.iter(batch_size=args.load_batch_size):
+ logger.info('Start processing batch # {}'.format(idx))
+ print("-----------------------------")
+ df = pd.DataFrame(rows)
+ # logger.info(df['meta'])
+ ray_dataset = ray.data.from_pandas(df)
+ # partition batch into total number of workers
+ ray_dataset = ray_dataset.repartition(parallelism) #, shuffle = True)
+ # processing batch
+ tokenized_data = ray_dataset.map_batches(pii_removal, batch_format="numpy", batch_size=None)
+ # gather data into one file per node
+ tokenized_data = tokenized_data.repartition(num_nodes)
+ tokenized_data.write_parquet(output_dir)
+
+ logger.info('Finished processing batch # {}'.format(idx))
+ logger.info(f"{idx} * {args.load_batch_size} samples were written to disk.")
+ idx += 1
+ print("============================")
+ # if idx == 2:
+ # #sys.exit()
+ # break
+ t1 = time.time()
+ logger.info('Processing {} samples took {:.3f} sec'.format((idx-1)*args.load_batch_size, t1-t0))
+
+
+if __name__ == "__main__":
+ start = time.time()
+ main()
+ end = time.time()
+ print(f"\nthis script took {end-start}s.")
+
diff --git a/tools/pii_removal_for_contact_mp/pii_detection_redaction/src/process_exceptions.py b/tools/pii_removal_for_contact_mp/pii_detection_redaction/src/process_exceptions.py
new file mode 100644
index 000000000..ac156c750
--- /dev/null
+++ b/tools/pii_removal_for_contact_mp/pii_detection_redaction/src/process_exceptions.py
@@ -0,0 +1,10 @@
+import json
+import glob
+
+data_dir = "/home/user/local/processed/refinedweb/exceptions/"
+filename = "2af2834dff874b4affffffffffffffffffffffff13000000.json"
+
+with open(data_dir+filename, 'r') as f:
+ data = json.load(f)
+
+print(len(data))
\ No newline at end of file
diff --git a/tools/pii_removal_for_contact_mp/pii_detection_redaction/src/redact_pii.py b/tools/pii_removal_for_contact_mp/pii_detection_redaction/src/redact_pii.py
new file mode 100644
index 000000000..0fee73d1a
--- /dev/null
+++ b/tools/pii_removal_for_contact_mp/pii_detection_redaction/src/redact_pii.py
@@ -0,0 +1,186 @@
+import ipaddress
+import random
+import string
+
+
+
+# option 1: similar to bigscience-pii redaction:
+# # replace with [TAG], e.g., [EMAIL]
+# #redacted_str, metadata = redact_pii(text, matches)
+# option 2: similar to bigcode-pii redaction:
+# # IP: replace with predefined random IP address, or DNS servers
+# # EMAIL, USERNAME, KEY: replace with random values
+# # also keeping track of pii values through a sample
+# # and replace with the same random value for the same pii value
+# #print(redacted_str)
+# redacted_str = redact_pii_with_random_values(text, matches)
+# # metadata_out = {"regex metadata":metadata, "original": text, "redacted": redacted_str}
+# # match_set = (redacted_str, metadata_out)
+
+
+# The IP replacements are copied from bigcode-pii
+# List of random private IP addresses to use as replacements
+REPLACEMENTS_IP = {
+ "IPv4": ["172.16.31.10", "172.16.58.3", "172.16.17.32", "192.168.127.12", "192.168.3.11"],
+ "IPv6": [
+ "fd00:c2b6:b24b:be67:2827:688d:e6a1:6a3b",
+ "fd00:a516:7c1b:17cd:6d81:2137:bd2a:2c5b",
+ "fc00:e968:6179::de52:7100",
+ "fc00:db20:35b:7399::5",
+ "fdf8:f53e:61e4::18",
+ ],
+}
+
+# providergs = ["google", "cloudfare", "alternate-dns", "quad9","open-dns", "comodo", "adguard"]
+POPULAR_DNS_SERVERS = [
+ "8.8.8.8",
+ "8.8.4.4",
+ "1.1.1.1",
+ "1.0.0.1",
+ "76.76.19.19",
+ "76.223.122.150",
+ "9.9.9.9",
+ "149.112.112.112",
+ "208.67.222.222",
+ "208.67.220.220",
+ "8.26.56.26",
+ "8.20.247.20",
+ "94.140.14.14",
+ "94.140.15.15",
+]
+
+letters = string.ascii_lowercase
+digits = string.digits
+lettters_digits = string.ascii_lowercase + string.digits
+
+# random emails
+n = 100
+REPLACEMENT_EMAIL = [
+ "".join(random.choice(letters) for i in range(10)) + "@example.com"
+ for i in range(n)
+ ]
+
+# random keys
+REPLACEMENT_KEY = [
+ "".join(random.choice(digits) for i in range(10))
+ for i in range(n)
+ ]
+# simple hack: make key replacement and phone replacement to be
+# both 10 random digits
+# to simplify redaction
+# [
+# "".join(random.choice(lettters_digits) for i in range(32)) for i in range(n)
+# ]
+
+# random usernames
+REPLACEMENT_USERNAME = [
+ "@"+"".join(random.choice(letters) for i in range(10))
+ for i in range(n)
+ ]
+
+REPLACEMENT_PHONE = [
+ "".join(random.choice(digits) for i in range(10))
+ for i in range(n)
+ ]
+
+REPLACEMENT_DICT={
+ 'EMAIL': REPLACEMENT_EMAIL,
+ 'KEY': REPLACEMENT_KEY,
+ 'USER': REPLACEMENT_USERNAME,
+ 'PHONE_NUMBER':REPLACEMENT_PHONE
+}
+
+def is_private_ip(ip):
+ """Check if an IP address is allocated for private networks"""
+ ip = ipaddress.ip_address(ip)
+ return ip.is_private
+
+def replace_ip(value):
+ """Replace an IP address with a synthetic IP address of the same format"""
+ # ipaddress.ip_address(ip) raises exception when ip i snot valid
+ # if is_private_ip(value) or (value in POPULAR_DNS_SERVERS):
+ # return value
+
+ if value in POPULAR_DNS_SERVERS:
+ #print('IP is one of DNS servers, return original value: ', value)
+ return value
+
+ try:
+ ipaddress.IPv4Address(value)
+ #print('IP is IPv4, return redacted value')
+ return random.choice(REPLACEMENTS_IP["IPv4"])
+ except ValueError:
+ try:
+ ipaddress.IPv6Address(value)
+ #print('IP is IPv6, return redacted value')
+ return random.choice(REPLACEMENTS_IP["IPv6"])
+ except ValueError:
+ # this doesn't happen if we already use ipaddress filter in the detection
+ # this is good as we have another layer of protection to redace false positive
+ #print("Invalid IP address:", value)
+ return value
+
+def redact_email_key_user_phone(value, tag):
+ supported_tags = {'KEY', 'EMAIL', 'USER', 'PHONE_NUMBER'}
+ if tag in supported_tags:
+ #return random.choice(REPLACEMENT_DICT[tag])
+ if tag=='KEY':
+ redact_value = "".join(random.choice(digits) for i in range(10))
+ if tag == 'EMAIL':
+ redact_value = "".join(random.choice(letters) for i in range(10)) + "@{}.com".format("".join(random.choice(letters) for i in range(5)))
+ if tag == 'USER':
+ redact_value = "@"+"".join(random.choice(letters) for i in range(10))
+ if tag == 'PHONE_NUMBER':
+ redact_value = "".join(random.choice(digits) for i in range(10))
+ return redact_value
+ else:
+ #print('{} type is not supported!'.format(tag))
+ return value
+
+
+# TODO: generate random strings on the fly, instead of choose from one of n
+def redact_pii_with_random_values(text, matches):
+ # adapted from bigcode-pii redaction
+ # however, matches here is a list of dictionaries
+ # the dictionary is of this schema:
+ # {'start': 123, 'end': 234, 'value': xyz, 'type': PHONE_NUMBER}
+ redacted_str = text
+ replaced_values = []
+ lookup_dict = {}
+ for match in matches:
+ start_idx = match['start']
+ end_idx = match['end']
+ matched_str = match['value'] #text[start_idx:end_idx]
+ tag = match['type']
+ if matched_str in replaced_values:
+ redact_tag = lookup_dict[matched_str]
+ else:
+ if tag == 'IP_ADDRESS':
+ redact_tag = replace_ip(matched_str)
+
+ else:
+ redact_tag = redact_email_key_user_phone(matched_str, tag)
+
+ replaced_values.append(matched_str)
+ lookup_dict[matched_str]=redact_tag
+
+ # print('original: ', matched_str)
+ # print('redacted tag: ', redact_tag)
+ match['redacted'] = redact_tag
+ redacted_str = redacted_str.replace(matched_str, redact_tag)
+ # Create the "metadata" as all of the information we had before redaction
+ #metadata += [(match)]
+ #print(matches)
+ return redacted_str
+
+
+def redact_pii_with_tags(text, matches):
+ # adapted from bigscience-pii
+ redacted_str = text
+ for match in matches:
+ matched_str = match['value']
+ tag = match['type']
+ redact_tag = "[" + tag +"]"
+ redacted_str = redacted_str.replace(matched_str, redact_tag)
+
+ return redacted_str
diff --git a/tools/pii_removal_for_contact_mp/pii_detection_redaction/src/utils.py b/tools/pii_removal_for_contact_mp/pii_detection_redaction/src/utils.py
new file mode 100644
index 000000000..5e1d4bdd9
--- /dev/null
+++ b/tools/pii_removal_for_contact_mp/pii_detection_redaction/src/utils.py
@@ -0,0 +1,162 @@
+import json
+from datasets import load_dataset
+import random
+from bigscience_pii_detect_redact import detect_pii
+
+piis_not_to_consider =['PERSON',
+ 'NRP',
+ 'LOCATION',
+ 'DATE_TIME',
+ 'URL'
+ ]
+
+def write_to_html(result, text_corpus):
+ idx = result['doc_idx']
+ text = text_corpus[idx]['text']
+ text_list = list(text)
+
+ entity_categories = []
+ n = 0
+ for r in result['pii']:
+ start = int(r['start'])
+ end = int(r['end'])
+ text_list.insert(start+3*n, '')
+ text_list.insert(end+1+3*n, "")
+ text_list.insert(end+2+3*n, '[[['+r['type']+']]]')
+ n+=1
+ entity_categories.append(r['type'])
+
+ bolded = ''.join(text_list)
+ #html = ""+bolded+""
+ #print(html)
+
+ summary = summarize_pii_entities(entity_categories)
+
+ bolded = summary + ''+ bolded
+ return bolded, entity_categories
+
+
+def summarize_pii_entities(entity_categories):
+ unique_categories = list(set(entity_categories))
+ summary = 'PIIs: '
+ for e in unique_categories:
+ occurences = entity_categories.count(e)
+ summary += (e + ": "+str(occurences)+'; ')
+ return summary
+
+def parse_recognizer_result(result):
+ #temp = result.split(',')
+ #assert len(temp)==4, 'a valid result should have 4 fields, but only got {} fields'.format(len(temp))
+ parsed_dict = {}
+
+ parsed_dict['type']=result.entity_type
+ parsed_dict['start']=result.start
+ parsed_dict['end']=result.end #temp[2][5:]
+ #parsed_dict['score']=result.score #temp[3][6:]
+
+ return parsed_dict
+
+
+def count_num_piis_to_consider(result):
+ #result is a dictionary of this format
+ # {doc_idx:123, num_pii: 2, pii: [{type: ABC, start:234, end: 256, score:0.6}, {}]}
+ filtered_piis = []
+ piis = result['pii']
+ num_piis_to_consider = 0
+ for pii in piis:
+ if pii['type'] in piis_not_to_consider:
+ #print('Not including {} category'.format(pii['type']))
+ continue
+ else:
+ num_piis_to_consider += 1
+ #print(pii)
+ if pii['type'] != 'IP_ADDRESS' and pii['type'] !='EMAIL_ADDRESS':
+ pii['type'] = 'ID_NUM_STR'
+ #print(pii)
+ filtered_piis.append(pii)
+ #print('number of piis to consider: ', num_piis_to_consider)
+ #print('filtered piis: ',filtered_piis)
+
+ return num_piis_to_consider, filtered_piis
+
+def filter_results_by_category(results):
+ filtered_results = []
+ for result in results:
+ num_piis_to_consider, filtered_piis = count_num_piis_to_consider(result)
+ if num_piis_to_consider>0:
+ result['pii'] = filtered_piis
+ result['num_pii']=len(filtered_piis)
+ filtered_results.append(result)
+ #print('filtered results: ',filtered_results)
+ return filtered_results
+
+def sample_results(results, number_of_samples):
+ random.seed(1234)
+ if len(results) > number_of_samples:
+ return random.sample(results, number_of_samples)
+ else:
+ return results
+
+# this tag list is copied from
+# https://github.com/bigscience-workshop/data-preparation/blob/main/preprocessing/training/02_pii/bigscience_pii_detect_redact.py#L53
+high_risk_tags = {'KEY', 'EMAIL', 'USER', 'IP_ADDRESS'} # , 'NUMBER', "ID"}
+
+def detect_with_bigscience_pii_single_sample(text):
+ matches = detect_pii(text, None, high_risk_tags)
+ if len(matches)>0:
+ pii_list = []
+ for m in matches:
+ #print(m)
+ pii = {}
+ pii['type']=m[-2]
+ pii['start']=m[1][0]
+ pii['end']=m[1][1]
+ #print(pii)
+ pii_list.append(pii)
+
+ return pii_list
+ else:
+ return None
+
+def is_phone_number(matched_str):
+ DEFAULT_SUPPORTED_REGIONS = ("US", "UK", "DE", "FE", "IL", "IN", "CA", "BR")
+ #valid = phonenumbers.is_valid_number(matched_str)
+ #print(matched_str)
+ for region in DEFAULT_SUPPORTED_REGIONS:
+ try:
+ parsed_number = phonenumbers.parse(matched_str)
+ except:
+ #print('cannot parse the string as phone number')
+ return False
+
+ flag = phonenumbers.is_possible_number(parsed_number)
+ if flag == True:
+ #print('KEY is PHONE_NUMBER')
+ return True
+
+ return False
+
+
+
+def remove_phone_numbers_from_bigscience_results(matches):
+ # use bigscience-pii to detect
+ # emails, ip addresses, usernames, id alphanumerics
+ if len(matches)>0:
+ pii_list = []
+ phone_matches = []
+ for i, m in enumerate(matches):
+ matched_str = m[0]
+ if is_phone_number(matched_str):
+ phone_matches.append(i)
+ # else:
+ # # print(m)
+ # pii = {}
+ # pii['type']=m[-2]
+ # pii['start']=m[1][0]
+ # pii['end']=m[1][1]
+ # print(pii)
+ # pii_list.append(pii)
+
+
+ matches = [matches[i] for i in range(len(matches)) if i not in phone_matches]
+ return matches
diff --git a/tools/pii_removal_for_contact_mp/pii_detection_redaction/src/validate_ray_outputs.py b/tools/pii_removal_for_contact_mp/pii_detection_redaction/src/validate_ray_outputs.py
new file mode 100644
index 000000000..c7b32203f
--- /dev/null
+++ b/tools/pii_removal_for_contact_mp/pii_detection_redaction/src/validate_ray_outputs.py
@@ -0,0 +1,158 @@
+from datasets import load_dataset
+# from presidio_analyzer import AnalyzerEngine
+from utils import summarize_pii_entities
+# import time
+import json
+# from redact_pii import redact_pii_with_random_values, redact_pii_with_tags
+# from detect_pii import detect_other_piis, detect_phone_numbers, merge_outputs
+import glob
+import random
+import os
+import pandas as pd
+# from matplotlib import pyplot as plt
+
+def write_merged_detect_redact_results_to_html(sample):
+ # sample is a dictionary {'text': xxxx, pii: []}
+ #text = sample['original']
+ piis = sample['secrets']
+ entity_categories = []
+
+ if sample['modified']:
+ #text_list = list(text)
+ redacted_text = sample['text']
+ redacted_text_list = list(redacted_text)
+
+
+ n = 0
+ for r in piis:
+ start = int(r['start'])
+ end = int(r['end'])
+ # text_list.insert(start+3*n, '')
+ # text_list.insert(end+1+3*n, "")
+ # text_list.insert(end+2+3*n, '[[['+r['type']+']]]')
+
+ redacted_text_list.insert(start+3*n, '')
+ redacted_text_list.insert(end+1+3*n, "")
+ redacted_text_list.insert(end+2+3*n, '[[['+r['type']+']]]')
+
+ n+=1
+ entity_categories.append(r['type'])
+
+
+ bolded = ''.join(redacted_text_list)
+ #html = ""+bolded+""
+ #print(html)
+
+ # redacted_marked = ''.join(redacted_text_list)
+ summary = summarize_pii_entities(entity_categories)
+ bolded = summary + ''+ bolded
+
+ else:
+ bolded = sample['text']
+ # redacted_marked = None
+
+ return bolded, entity_categories
+
+
+
+
+path = '/home/vmagent/app/falcon-refinedweb-pii-remove/'
+datafile = glob.glob(path + '*.parquet')
+# randomly pick one file from output
+filename = random.choice(datafile)
+output = 'pii_test'
+
+# Check 1: load with pd to check schema and content
+df = pd.read_parquet(filename)
+print(df.head(10))
+
+print(df.shape)
+
+
+# Check 2: get statistics from a sample
+def get_stats(row):
+ count_dict = {
+ 'PHONE_NUMBER': 0,
+ 'IP_ADDRESS':0,
+ 'EMAIL': 0,
+ 'USER':0,
+ 'KEY':0
+ }
+
+ if row['modified'] == True:
+ pii = row['secrets']
+ num_piis = len(pii)
+ for x in pii:
+ count_dict[x['type']] += 1
+
+ else:
+ num_piis = 0
+
+ return num_piis, count_dict
+
+
+sample_files = random.sample(datafile, min(10, len(datafile)))
+total_num_piis = 0
+count_dict_all = {
+ 'PHONE_NUMBER': 0,
+ 'IP_ADDRESS':0,
+ 'EMAIL': 0,
+ 'USER':0,
+ 'KEY':0
+ }
+
+for f in sample_files:
+ df = pd.read_parquet(f).sample(1000)
+ for _, row in df.iterrows():
+ num_piis, count_dict = get_stats(row)
+ total_num_piis += num_piis
+ for k, v in count_dict.items():
+ count_dict_all[k] += v
+
+print(count_dict_all)
+
+
+# Check 3: visual check with html
+df= df.sample(100)
+html=""
+num_piis = []
+entities = []
+# summary = 'Total number of samples: '+str(len(samples)) +''
+summary = ""
+
+for _, sample in df.iterrows():
+ #print(sample['meta'])
+ bolded, entity_categories = write_merged_detect_redact_results_to_html(sample)
+
+ try:
+ meta = sample['meta']
+ html += (meta + ''+bolded+"")
+ except:
+ html += '---------------------------'
+ html += ''+bolded+""
+
+ if sample['modified']:
+ # html += ''+redacted+""
+ num_piis.append(len(sample['secrets']))
+ entities.extend(entity_categories)
+
+
+assert sum(num_piis)==len(entities), 'number of entities not match'
+
+
+summary += 'Total number of PIIs: {}'.format(len(entities))
+
+summary += '' + summarize_pii_entities(entities) +''
+
+html = ''+summary+html+""
+
+
+output_path = path + 'validation/'
+if not os.path.exists(output_path):
+ os.mkdir(output_path)
+
+
+output_file = output_path + '{}-pii-validation.html'.format(output)
+f = open(output_file,"w")
+f.write(html)
+f.close()
\ No newline at end of file
diff --git a/tools/pii_removal_for_contact_mp/pii_redaction.py b/tools/pii_removal_for_contact_mp/pii_redaction.py
new file mode 100644
index 000000000..bcac8a0f9
--- /dev/null
+++ b/tools/pii_removal_for_contact_mp/pii_redaction.py
@@ -0,0 +1,23 @@
+import argparse
+import os
+from multiprocessing import Pool, cpu_count
+from pyrecdp.core.utils import Timer
+from math import ceil
+from tqdm import tqdm
+import json
+from pii_redaction_impl import *
+
+
+if __name__ == "__main__":
+ parser = argparse.ArgumentParser()
+ parser.add_argument("-d", dest="data_dir", type=str)
+ parser.add_argument("-o", dest="out_dir", type=str)
+ parser.add_argument("-mp", dest="mp", type=int, default=-1)
+ args = parser.parse_args()
+
+ data_dir = args.data_dir
+ out_dir = args.out_dir
+ n_parallel = args.mp
+
+ with Timer(f"generate hash to {data_dir}"):
+ pii_remove_MP(data_dir, out_dir, n_parallel)
\ No newline at end of file
diff --git a/tools/pii_removal_for_contact_mp/pii_redaction_impl.py b/tools/pii_removal_for_contact_mp/pii_redaction_impl.py
new file mode 100644
index 000000000..9589e7833
--- /dev/null
+++ b/tools/pii_removal_for_contact_mp/pii_redaction_impl.py
@@ -0,0 +1,142 @@
+import argparse
+import os, sys
+from pyrecdp.core.utils import Timer
+import json
+from pyrecdp.primitives.llmutils.utils import get_nchunks_and_nproc, clean_str
+import hashlib
+import pandas as pd
+from tqdm import tqdm
+import subprocess #nosec
+import io
+import re
+import time
+import hashlib
+
+from presidio_analyzer.predefined_recognizers import PhoneRecognizer
+
+import sys, pathlib
+cur_path = str(pathlib.Path(__file__).parent.resolve())
+import_path = os.path.join(cur_path, "pii_detection_redaction", "src")
+print(f"add new import_path: {import_path}")
+sys.path.append(import_path)
+
+from pii_redaction_v2 import *
+
+def pii_removal_impl_parquet_to_parquet(in_file_name, out_file_name, base_file_name):
+ analyzer = PhoneRecognizer()
+ batch = pd.read_parquet(in_file_name).reset_index(drop=True)
+ text = batch['text'].tolist()
+ redacted_content = []
+ modified = []
+ piis = []
+
+ #for txt in tqdm(text, total=len(text), desc = f"process {in_file_name}"):
+ for txt in text:
+ # # for testing exception
+ # if i%5 == 0:
+ # raise ValueError
+ output = detect_redact_pii_for_one_text(txt, analyzer)
+ modified.append(output['modified'])
+ piis.append(output['pii'])
+ if output['pii'] != None: # have PII so output redacted text
+ redacted_content.append(output['redacted'])
+ else: # did not have PII so output original text
+ redacted_content.append(txt)
+
+ batch['text'] = pd.Series(redacted_content)
+ batch['secrets'] = pd.Series(piis)
+ batch['modified'] = pd.Series(modified)
+
+ batch.to_parquet(out_file_name)
+
+# define actual work
+def pii_remove(proc_id, x_list, out_type):
+ #for x in tqdm(x_list, total=len(x_list), desc=f"proc-{proc_id}", position=proc_id+1):
+ for x in x_list:
+ try:
+ in_file_name, out_file_name, base_file_name = x
+ base_file_name = os.path.basename(base_file_name)
+ out_dir = os.path.dirname(out_file_name)
+ os.makedirs(out_dir, exist_ok=True)
+ pii_removal_impl_parquet_to_parquet(in_file_name, out_file_name, base_file_name)
+
+ except Exception as e:
+ with open(f"{out_file_name}.error.log", 'w') as f:
+ f.write(f"Failed to process {base_file_name}, error is {e}")
+ return True
+
+def wait_and_check(pool):
+ for proc_id, (process, cmd) in pool.items():
+ std_out, std_err = process.communicate()
+ rc = process.wait()
+ if rc != 0:
+ file_name = f"pii-redaction-proc-{proc_id}.error.log"
+ print(f"Task failed, please check {file_name} for detail information")
+ with open(file_name, "a") as f:
+ f.write(f"=== {time.ctime()} {' '.join(cmd)} failed. ===\n")
+ f.write(std_err.decode(sys.getfilesystemencoding()))
+ f.write("\n")
+
+def launch_cmdline_mp(args, data_dir, out_dir, mp):
+ pool = {}
+ for arg in tqdm(args, total=len(args), desc="pii redaction"):
+ proc_id, x_list = arg
+ cmd = ["python", "pii_redaction_impl.py", "--proc_id", f"{proc_id}", "--in_dir", f"{data_dir}", "--out_dir", f"{out_dir}", "--file_list", f"{x_list}"]
+ #f.write(' '.join(cmd) + "\n")
+ pool[proc_id] = (subprocess.Popen(cmd , stdout=subprocess.PIPE, stderr=subprocess.PIPE), cmd)
+
+ if len(pool) >= mp:
+ wait_and_check(pool)
+ pool = {}
+
+ wait_and_check(pool)
+
+def get_target_file_list(data_dir, file_type):
+ cmd = ["find", data_dir, "-name", f"*.{file_type}"]
+ proc = subprocess.Popen(cmd, stdout=subprocess.PIPE)
+ stdout, stderr = proc.communicate()
+ exitcode = proc.returncode
+ if exitcode != 0:
+ return []
+ else:
+ ret = stdout.decode("utf-8").split('\n')[:-1]
+ ret = [i.replace(data_dir, "") for i in ret]
+ ret = [i[1:] if i[0] == '/' else i for i in ret]
+ return ret
+
+def pii_remove_MP(data_dir, out_dir, n_part = -1):
+ files = get_target_file_list(data_dir, 'parquet')
+ #print(files)
+
+ if len(files) == 0:
+ print("Detect 0 files, exit here")
+ return
+
+ if n_part != -1:
+ n_proc = n_part
+ else:
+ _, n_proc = get_nchunks_and_nproc(len(files), n_part = n_part)
+ print(f"resetting to {n_proc} for number of processes")
+
+ args = [(idx, [i]) for idx, i in enumerate(files)]
+ launch_cmdline_mp(args, data_dir, out_dir, n_proc)
+
+
+if __name__ == "__main__":
+ parser = argparse.ArgumentParser()
+ parser.add_argument("--proc_id", dest="proc_id", type=int)
+ parser.add_argument("--in_dir", dest="in_dir", type=str)
+ parser.add_argument("--out_dir", dest="out_dir", type=str)
+ parser.add_argument("--file_list", dest="file_list", type=str)
+ args = parser.parse_args()
+
+ proc_id = args.proc_id
+ in_dir = args.in_dir
+ out_dir = args.out_dir
+ in_file_list = eval(args.file_list)
+ out_type = 'parquet'
+
+ file_args = [(os.path.join(in_dir, f_name), os.path.join(out_dir, f"{f_name}.pii_remove.{out_type}"), f_name) for f_name in in_file_list]
+
+ with Timer(f"generate hash index with proc-id {proc_id}"):
+ pii_remove(proc_id, file_args, out_type)