-
Notifications
You must be signed in to change notification settings - Fork 0
/
scrub.py
211 lines (154 loc) · 5.71 KB
/
scrub.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
"""Module to scrub text of all PII/PHI.
Usage:
$ python scrub.py scrub_text str_arg
$ python scrub.py scrub_image Image_arg
"""
from io import BytesIO
from mss import base
from PIL import Image
from presidio_anonymizer.entities import OperatorConfig
import fire
import config
import utils
def scrub_text(text: str, is_hyphenated: bool = False) -> str:
    """
    Scrub the text of all PII/PHI using Presidio Analyzer and Anonymizer

    Every recognized entity is masked with ``*`` characters.

    Args:
        text (str): Text to be scrubbed. ``None`` is passed through unchanged.
        is_hyphenated (bool): When True, every ``-`` is removed from the text
            before it is analyzed.

    Returns:
        str: Scrubbed text, or ``None`` when ``text`` is ``None``.
    """
    if text is None:
        return None

    if is_hyphenated:
        text = text.replace("-", "")

    analyzer_results = config.ANALYZER_TRF.analyze(
        text=text, entities=config.SCRUBBING_ENTITIES, language="en"
    )

    # Operators are keyed by entity type, so a single mask length has to serve
    # every entity of that type. Track the widest span per type: using whichever
    # entity happened to come last would leave a longer entity of the same type
    # only partially masked.
    mask_lengths = {}
    for entity in analyzer_results:
        span_length = entity.end - entity.start
        mask_lengths[entity.entity_type] = max(
            mask_lengths.get(entity.entity_type, 0), span_length
        )
        # TODO: remove this print statement after testing
        print(
            f"Recognized entity: {entity.entity_type} - start: {entity.start} end: {entity.end}"
        )

    # NOTE(review): relies on the Presidio mask operator never masking more
    # characters than the matched text contains, so the per-type maximum is
    # safe for shorter entities of the same type - confirm against the docs.
    operators = {
        entity_type: OperatorConfig(
            "mask",
            {
                "masking_char": "*",
                "chars_to_mask": length,
                "from_end": True,
            },
        )
        for entity_type, length in mask_lengths.items()
    }

    anonymized_results = config.ANONYMIZER.anonymize(
        text=text,
        analyzer_results=analyzer_results,
        operators=operators,
    )
    return anonymized_results.text
def scrub_image(image: Image, fill_color=config.DEFAULT_SCRUB_FILL_COLOR) -> Image:
    """
    Redact PII/PHI from an image with the Presidio Image Redactor

    Args:
        image (PIL.Image): A PIL.Image object to be scrubbed
        fill_color: Fill passed to the redactor for the redacted regions.
            Defaults to config.DEFAULT_SCRUB_FILL_COLOR.

    Returns:
        PIL.Image: The image returned by the redactor.
    """
    # Only the entity types listed in config.SCRUBBING_ENTITIES are redacted.
    return config.IMAGE_REDACTOR.redact(
        image,
        fill=fill_color,
        entities=config.SCRUBBING_ENTITIES,
    )
def scrub_screenshot(
    screenshot: base.ScreenShot, fill_color=config.DEFAULT_SCRUB_FILL_COLOR
) -> base.ScreenShot:
    """
    Scrub the screenshot of all PII/PHI using Presidio Image Redactor

    Args:
        screenshot (mss.base.ScreenShot): An mss.base.ScreenShot object to be scrubbed
        fill_color: Fill passed to the redactor for the redacted regions.
            Defaults to config.DEFAULT_SCRUB_FILL_COLOR.

    Returns:
        mss.base.ScreenShot: A new screenshot with the same position and
            dimensions as the input, built from the redacted pixels.
    """
    # Convert the MSS screenshot object (raw BGRA bytes) to a PIL Image
    image = Image.frombytes("RGBA", screenshot.size, screenshot.bgra, "raw", "BGRA")

    # Use the scrub_image function to scrub the image
    redacted_image = scrub_image(image, fill_color)

    # The raw buffer handed back to mss must use the same 4-byte BGRA layout
    # that was read from ``screenshot.bgra`` above; a 3-byte RGB buffer would
    # have the wrong length and channel order. Normalize the mode first in
    # case the redactor returned something other than RGBA.
    if redacted_image.mode != "RGBA":
        redacted_image = redacted_image.convert("RGBA")
    raw_data = bytearray(redacted_image.tobytes("raw", "BGRA"))

    # Prepare monitor information from the original screenshot
    monitor_info = {
        "left": screenshot.left,
        "top": screenshot.top,
        "width": screenshot.width,
        "height": screenshot.height,
    }

    # Construct and return a new screenshot with the redacted image data
    return base.ScreenShot(raw_data, monitor_info)
def scrub_png_data(png_data: bytes, fill_color=config.DEFAULT_SCRUB_FILL_COLOR) -> bytes:
    """
    Redact PII/PHI from PNG-encoded bytes with the Presidio Image Redactor

    Args:
        png_data (bytes): PNG data to be scrubbed
        fill_color: Fill passed to the redactor for the redacted regions.
            Defaults to config.DEFAULT_SCRUB_FILL_COLOR.

    Returns:
        bytes: The redacted image re-encoded as PNG.
    """
    # Decode the incoming bytes into a PIL image
    source_image = Image.open(BytesIO(png_data))

    # Redact the configured entity types using the requested fill
    scrubbed_image = config.IMAGE_REDACTOR.redact(
        source_image,
        fill=fill_color,
        entities=config.SCRUBBING_ENTITIES,
    )

    # Re-encode the result as PNG entirely in memory
    png_buffer = BytesIO()
    scrubbed_image.save(png_buffer, format='PNG')  # type: ignore
    return png_buffer.getvalue()
def scrub_dict(input_dict: dict, list_keys: list = None) -> dict:
    """
    Build a scrubbed copy of a dict using Presidio Analyzer and Anonymizer.

    String values (and strings directly inside list values) are scrubbed only
    when their key is in ``list_keys``. Nested dicts, including dicts found
    inside list values, are scrubbed recursively. Everything else is copied
    over unchanged.

    Args:
        input_dict (dict): A dict to be scrubbed
        list_keys (list): Keys whose string values are scrubbed. Defaults to
            config.SCRUB_KEYS_HTML when None.

    Returns:
        dict: A new dict with the selected string values scrubbed.
    """
    keys_to_scrub = config.SCRUB_KEYS_HTML if list_keys is None else list_keys

    def _scrub_value(key, value):
        # Strings are scrubbed only under a selected key; dicts always recurse.
        if isinstance(value, str):
            return scrub_text(value) if key in keys_to_scrub else value
        if isinstance(value, dict):
            return scrub_dict(value, keys_to_scrub)
        return value

    result = {}
    for key, value in input_dict.items():
        if isinstance(value, list):
            # Only one level of list is unpacked; nested lists pass through.
            result[key] = [_scrub_value(key, item) for item in value]
        else:
            result[key] = _scrub_value(key, value)
    return result
def scrub_list_dicts(input_list: list[dict], list_keys: list = None) -> list[dict]:
    """
    Scrub every dict in a list using Presidio Analyzer and Anonymizer.

    Args:
        input_list (list[dict]): A list of dicts to be scrubbed
        list_keys (list): Forwarded unchanged to scrub_dict for each element.

    Returns:
        list[dict]: A new list holding the scrubbed dicts, in the same order.
    """
    return [scrub_dict(entry, list_keys) for entry in input_list]
if __name__ == "__main__":
    # Hand whatever utils.get_functions returns for this module to Fire so it
    # can be driven from the command line (see the usage in the module docstring).
    cli_commands = utils.get_functions(__name__)
    fire.Fire(cli_commands)