Merge pull request #29 from mitre/metadata-handling-rebase
Add metadata guardrail to PPRL processes
radamson authored Sep 30, 2022
2 parents 6579224 + 51cb3dd commit de64b37
Showing 15 changed files with 314 additions and 41 deletions.
21 changes: 12 additions & 9 deletions README.md
@@ -152,7 +152,7 @@ A description of the properties in the file:
- **entity_service_url** - The RESTful service endpoint for the
anonlink-entity-service.
- **matching_threshold** - The threshold for considering a potential set of
records a match when comparing in anonlink.
records a match when comparing in anonlink. This can be either a single
number between 0 and 1 or a list of numbers between 0 and 1, one per
project (see the short sketch below).
- **mongo_uri** - The URI to use when connecting to MongoDB to store or access
results. For details on the URI structure, consult the
[Connection String URI Format documentation](https://docs.mongodb.com/manual/reference/connection-string/)
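
For reference, here is a minimal sketch (illustrative values only) of how a
list-valued `matching_threshold` is resolved to a per-project value; the
lookup mirrors `get_project_threshold` in `dcctools/config.py`, shown later
in this commit:

```python
# Illustrative only: resolve a per-project threshold from the configuration.
config = {
    "projects": ["name-sex-dob-phone", "name-sex-dob-zip", "name-sex-dob-addr"],
    "matching_threshold": [0.75, 0.8, 0.75],  # or a single float such as 0.75
}


def threshold_for(project_name):
    threshold = config["matching_threshold"]
    if isinstance(threshold, list):
        # One entry per project, matched by position in the projects list.
        return threshold[config["projects"].index(project_name)]
    return threshold


print(threshold_for("name-sex-dob-zip"))  # prints 0.8
```
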
@@ -208,19 +208,19 @@ for individuals:

```
output/
site_a.csv
site_b.csv
site_c.csv
site_a.zip
site_b.zip
site_c.zip
...
```

And the second example, for households:

```
output/
site_a_households.csv
site_b_households.csv
site_c_households.csv
site_a_households.zip
site_b_households.zip
site_c_households.zip
...
```

@@ -248,8 +248,11 @@ This project is a set of python scripts driven by a central configuration file,
matching information and use it to generate LINK_IDs, which are written to a
CSV file in the configured results folder.
1. Once all LINK_IDs have been created, run `data_owner_ids.py` which will
create a file per data owner that can be sent with only information on their
LINK_IDs.
create one ZIP file per data owner. That file will contain a metadata file
and a CSV file with only information on their LINK_IDs.

`projects.py`, `match.py` and `link_ids.py` will also generate JSON metadata
files that contain information about the corresponding process.
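
As a rough illustration, the packaged metadata can be read straight out of the
ZIP produced by `data_owner_ids.py`. The archive path and member names below
are assumptions based on the defaults shown above (an `output/` folder with
`site_a.zip` holding `output/site_a.csv` and
`output/site_a-metadata<timestamp>.json`):

```python
# Sketch: inspect the metadata JSON packed alongside a data owner's LINK_ID CSV.
import json
import zipfile

with zipfile.ZipFile("output/site_a.zip") as archive:
    # zip_and_clean() stores one CSV and one metadata file under output/.
    metadata_name = next(name for name in archive.namelist() if "metadata" in name)
    with archive.open(metadata_name) as fp:
        metadata = json.load(fp)

print(metadata["output_system_metadata"]["number_of_records"])
```

The metadata records creation timestamps, record counts, and UUIDs so that a
set of LINK_IDs can be traced back to the inputs and linkage run that produced
it.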

#### Example system folder hierarchy:

17 changes: 17 additions & 0 deletions config-metadata.json
@@ -0,0 +1,17 @@
{
"systems": ["site_a", "site_b", "site_c", "site_d", "site_e", "site_f"],
"projects": ["name-sex-dob-phone", "name-sex-dob-zip", "name-sex-dob-addr"],
"matching_threshold": [0.75, 0.75, 0.75],
"schema_folder": "test-data/envs/small/schema",
"inbox_folder": "test-data/envs/metadata-test/inbox",
"project_results_folder": "test-data/envs/metadata-test/project_results",
"matching_results_folder": "test-data/envs/metadata-test/results",
"output_folder": "test-data/envs/metadata-test/output",
"entity_service_url": "http://localhost:8851/api/v1",
"mongo_uri": "localhost:27017",
"blocked": false,
"blocking_schema": "test-data/envs/small/schema/blocking-schema/lambda.json",
"household_match": false,
"household_threshold": 0.65,
"household_schema": "test-data/envs/small/schema/household-schema/fn-phone-addr-zip.json"
}
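
A minimal sketch of how a configuration like this might be exercised, assuming
`Configuration` takes the path to the JSON configuration file (an assumption
based on how `self.filename` is used in `dcctools/config.py`); note that
`validate_all_present` now also returns the metadata issues introduced in this
commit:

```python
# Sketch: load the config and run the validators, including the metadata guardrail.
from dcctools.config import Configuration

c = Configuration("config-metadata.json")  # constructor argument assumed

config_issues = c.validate_config()
missing, unexpected, metadata_issues = c.validate_all_present()

for issue in config_issues + metadata_issues:
    print(issue)
print(f"{len(missing)} expected file(s) missing, {len(unexpected)} unexpected file(s) found")
```
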
103 changes: 93 additions & 10 deletions data_owner_ids.py
@@ -2,6 +2,12 @@

import argparse
import csv
import json
import os
import shutil
import uuid
import zipfile
from datetime import datetime
from pathlib import Path

from dcctools.config import Configuration
@@ -18,30 +24,107 @@ def parse_args():
return args


def process_csv(csv_path, system_csv_path, system):
with open(csv_path) as csvfile:
def zip_and_clean(system_output_path, system, timestamp, household_match):
if household_match:
zip_path = system_output_path.parent / f"{system}_households.zip"
else:
zip_path = system_output_path.parent / f"{system}.zip"
with zipfile.ZipFile(zip_path, mode="w") as system_archive:
if household_match:
system_archive.write(
system_output_path / "households" / f"{system}_households.csv",
arcname=str(Path("output") / "households" / f"{system}_households.csv"),
)
else:
system_archive.write(
system_output_path / f"{system}.csv",
arcname=str(Path("output") / f"{system}.csv"),
)
system_archive.write(
system_output_path / f"{system}-metadata{timestamp}.json",
arcname=str(Path("output") / f"{system}-metadata{timestamp}.json"),
)

print(zip_path.name, "created")
shutil.rmtree(system_output_path)
print("Uncompressed directory removed")


def process_output(link_id_path, output_path, system, metadata, household_match):
data_owner_id_time = datetime.now()
timestamp = data_owner_id_time.strftime("%Y%m%dT%H%M%S")
n_rows = 0
with open(link_id_path) as csvfile:
reader = csv.DictReader(csvfile)
with open(system_csv_path, "w", newline="") as system_csvfile:
if household_match:
csv_path = output_path / "households" / f"{system}_households.csv"
else:
csv_path = output_path / f"{system}.csv"

with open(csv_path, "w", newline="") as system_csvfile:
writer = csv.DictWriter(system_csvfile, fieldnames=["LINK_ID", system])
writer.writeheader()
for row in reader:
if len(row[system]) > 0:
n_rows += 1
writer.writerow({"LINK_ID": row["LINK_ID"], system: row[system]})
system_metadata = {
"link_id_metadata": {
"creation_date": metadata["creation_date"],
"uuid1": metadata["uuid1"],
},
"input_system_metadata": {
key: val for key, val in metadata["input_system_metadata"][system].items()
},
"output_system_metadata": {
"creation_date": data_owner_id_time.isoformat(),
"number_of_records": n_rows,
"uuid1": str(uuid.uuid1()),
},
}
with open(
output_path / f"{system}-metadata{timestamp}.json", "w", newline=""
) as system_metadata_file:
json.dump(system_metadata, system_metadata_file, indent=2)
return timestamp


def do_data_owner_ids(c):
if c.household_match:
csv_path = Path(c.matching_results_folder) / "household_link_ids.csv"
link_ids = sorted(
Path(c.matching_results_folder).glob("household_link_ids*.csv")
)
else:
link_ids = sorted(Path(c.matching_results_folder).glob("link_ids*.csv"))

if len(link_ids) > 1:
print("More than one link_id file found")
print(link_ids)
link_id_times = [
datetime.strptime(x.name[-19:-4], "%Y%m%dT%H%M%S") for x in link_ids
]
most_recent = link_ids[link_id_times.index(max(link_id_times))]
print(f"Using most recent link_id file: {most_recent}")

link_id_path = most_recent
else:
csv_path = Path(c.matching_results_folder) / "link_ids.csv"
link_id_path = link_ids[0]

link_id_metadata_name = link_id_path.parent / link_id_path.name.replace(
"link_ids", "link_id-metadata"
).replace(".csv", ".json")
with open(link_id_metadata_name) as metadata_file:
metadata = json.load(metadata_file)

for system in c.systems:
system_output_path = Path(c.output_folder) / "output"
os.makedirs(system_output_path, exist_ok=True)
if c.household_match:
system_csv_path = Path(c.output_folder) / "{}_households.csv".format(system)
else:
system_csv_path = Path(c.output_folder) / "{}.csv".format(system)
process_csv(csv_path, system_csv_path, system)
print(f"{system_csv_path} created")
os.makedirs(system_output_path / "households", exist_ok=True)
timestamp = process_output(
link_id_path, system_output_path, system, metadata, c.household_match
)
zip_and_clean(system_output_path, system, timestamp, c.household_match)


if __name__ == "__main__":
63 changes: 59 additions & 4 deletions dcctools/config.py
@@ -2,6 +2,8 @@
import json
import os
import sys
import zipfile
from datetime import datetime, timedelta
from pathlib import Path
from zipfile import ZipFile

@@ -41,10 +43,40 @@ def validate_config(self):
)
return config_issues

def validate_metadata(self, system_path):
metadata_issues = []
with zipfile.ZipFile(system_path) as archive:
metadata_files = []
for fname in archive.namelist():
if "metadata" in fname:
metadata_files.append(fname)
anchor = fname.rfind("T")
mname = fname[(anchor - 8) : (anchor + 7)]
timestamp = datetime.strptime(mname, "%Y%m%dT%H%M%S")
with archive.open(fname, "r") as metadata_fp:
metadata = json.load(metadata_fp)
garble_time = datetime.fromisoformat(metadata["creation_date"])
if (garble_time - timestamp) >= timedelta(seconds=1):
metadata_issues.append(
f"{system_path.name} metadata timecode {timestamp} does "
f"not match listed garble time {garble_time}"
)
if len(metadata_files) == 0:
metadata_issues.append(
f"could not find metadata file within {system_path.name}"
)
elif len(metadata_files) > 1:
metadata_issues.append(
f"Too many metadata files found in {system_path.name}:"
+ "\n\t".join([metadata_file for metadata_file in metadata_files])
)
return metadata_issues

def validate_all_present(self):
missing_paths = []
expected_paths = []
unexpected_paths = []
metadata_issues = []
root_path = Path(self.filename).parent
inbox_folder = root_path / self.config_json["inbox_folder"]
for s in self.config_json["systems"]:
@@ -54,12 +86,16 @@ def validate_all_present(self):
expected_paths.append(household_zip_path)
if not os.path.exists(household_zip_path):
missing_paths.append(household_zip_path)
else:
metadata_issues.extend(self.validate_metadata(household_zip_path))
if os.path.exists(system_zip_path):
unexpected_paths.append(system_zip_path)
else:
expected_paths.append(system_zip_path)
if not os.path.exists(system_zip_path):
missing_paths.append(system_zip_path)
else:
metadata_issues.extend(self.validate_metadata(system_zip_path))
if os.path.exists(household_zip_path):
unexpected_paths.append(household_zip_path)
if self.config_json["blocked"]:
@@ -85,7 +121,7 @@ def validate_all_present(self):
path_to_check = root_path / getattr(self, d)
if not os.path.exists(path_to_check):
missing_paths.append(path_to_check)
return (set(missing_paths), set(unexpected_paths))
return set(missing_paths), set(unexpected_paths), metadata_issues

@property
def systems(self):
@@ -124,6 +160,17 @@ def extract_blocks(self, system):
with ZipFile(block_zip_path, mode="r") as block_zip:
block_zip.extractall(Path(self.config_json["inbox_folder"]) / system)

def get_metadata(self, system):
archive_name = (
f"{system}_households.zip" if self.household_match else f"{system}.zip"
)
archive_path = Path(self.config_json["inbox_folder"]) / archive_name
with ZipFile(archive_path, mode="r") as archive:
for file_name in archive.namelist():
if "metadata" in file_name:
with archive.open(file_name) as metadata_file:
return json.load(metadata_file)

def get_clk(self, system, project):
clk_path = (
Path(self.config_json["inbox_folder"])
@@ -136,7 +183,11 @@ def get_clks_raw(self, system, project):
clks = None
clk_zip_path = Path(self.config_json["inbox_folder"]) / "{}.zip".format(system)
with ZipFile(clk_zip_path, mode="r") as clk_zip:
with clk_zip.open("output/{}.json".format(project)) as clk_file:
for file_name in clk_zip.namelist():
if f"{project}.json" in file_name:
project_file = file_name
break
with clk_zip.open(project_file) as clk_file:
clks = clk_file.read()
return clks

@@ -146,7 +197,11 @@ def get_household_clks_raw(self, system, schema):
self.config_json["inbox_folder"]
) / "{}_households.zip".format(system)
with ZipFile(clk_zip_path, mode="r") as clk_zip:
with clk_zip.open("output/households/{}.json".format(schema)) as clk_file:
for file_name in clk_zip.namelist():
if f"{schema}.json" in file_name:
project_file = file_name
break
with clk_zip.open(project_file) as clk_file:
clks = clk_file.read()
return clks

@@ -190,7 +245,7 @@ def get_project_threshold(self, project_name):
else self.matching_threshold
)
if type(config_threshold) == list:
project_index = config_threshold.index(project_name)
project_index = self.projects.index(project_name)
threshold = config_threshold[project_index]
else:
threshold = config_threshold