Commit
Merge pull request #1784 from MIT-LCP/sqlite_python_improvements
SQLite python improvements
Showing 1 changed file with 160 additions and 24 deletions.
@@ -1,35 +1,171 @@
from argparse import ArgumentParser
import json
import os
from pathlib import Path
import sqlite3
import sys

import typing as t
import pandas as pd

DATABASE_NAME = "mimic4.db"
THRESHOLD_SIZE = 5 * 10**7
CHUNKSIZE = 10**6

_MIMIC_TABLES = (
    # hospital EHR derived tables
    'admissions',
    'd_hcpcs',
    'd_icd_diagnoses',
    'd_icd_procedures',
    'd_labitems',
    'diagnoses_icd',
    'drgcodes',
    'emar',
    'emar_detail',
    'hcpcsevents',
    'labevents',
    'microbiologyevents',
    'omr',
    'patients',
    'pharmacy',
    'poe',
    'poe_detail',
    'prescriptions',
    'procedures_icd',
    'provider',
    'services',
    'transfers',
    # ICU derived tables
    'caregiver',
    'chartevents',
    'd_items',
    'datetimeevents',
    'icustays',
    'ingredientevents',
    'inputevents',
    'outputevents',
    'procedureevents',
)


def process_dataframe(df: pd.DataFrame, subjects: t.Optional[t.List[int]] = None) -> pd.DataFrame:
    for c in df.columns:
        if c.endswith('time') or c.endswith('date'):
            df[c] = pd.to_datetime(df[c], format='ISO8601')

    if subjects is not None and 'subject_id' in df:
        df = df.loc[df['subject_id'].isin(subjects)]

    return df


def main():
    argparser = ArgumentParser()
    argparser.add_argument(
        '--limit', type=int, default=0,
        help='Restrict the database to the first N subject_id.'
    )
    argparser.add_argument(
        '--data_dir', type=str, default='.',
        help='Path to the directory containing the MIMIC-IV CSV files.'
    )
    argparser.add_argument(
        '--overwrite', action='store_true',
        help='Overwrite existing mimic4.db file.'
    )
    args = argparser.parse_args()

    # validate that we can find all the files
    data_dir = Path(args.data_dir).resolve()
    data_files = list(data_dir.rglob('**/*.csv*'))
    if not data_files:
        print(f"No CSV files found in {data_dir}")
        sys.exit()

    # strip the .csv / .csv.gz suffixes and lower-case the filenames,
    # building a list of table names index-aligned with data_files
    tablenames = []
    for f in data_files:
        while f.suffix.lower() in {'.csv', '.gz'}:
            f = f.with_suffix('')
        tablenames.append(f.stem.lower())

    # check that all the expected tables are present
    expected_tables = set(tablenames)
    missing_tables = set(_MIMIC_TABLES) - expected_tables
    if missing_tables:
        print(expected_tables)
        print(f"Missing tables: {missing_tables}")
        sys.exit()

    pt = None
    subjects = None
    if args.limit > 0:
        for f in data_files:
            if 'patients' in f.name:
                pt = pd.read_csv(f)
                break
        if pt is None:
            raise FileNotFoundError('Unable to find a patients file in the data directory.')

        pt = pt[['subject_id']].sort_values('subject_id').head(args.limit)
        subjects = set(sorted(pt['subject_id'].tolist())[:args.limit])
        print(f'Limiting to {len(subjects)} subjects.')

    if os.path.exists(DATABASE_NAME):
        if args.overwrite:
            os.remove(DATABASE_NAME)
        else:
msg = "File {} already exists.".format(DATABASE_NAME) | ||
print(msg) | ||
sys.exit() | ||
|
||

    # For a subset of columns, we specify the data types to ensure
    # pandas loads the data correctly.
    mimic_dtypes = {
        "subject_id": pd.Int64Dtype(),
        "hadm_id": pd.Int64Dtype(),
        "stay_id": pd.Int64Dtype(),
        "caregiver_id": pd.Int64Dtype(),
        "provider_id": str,
        "category": str,  # d_hcpcs
        "parent_field_ordinal": str,
        "pharmacy_id": pd.Int64Dtype(),
        "emar_seq": pd.Int64Dtype(),
        "poe_seq": pd.Int64Dtype(),
        "ndc": str,
        "doses_per_24_hrs": pd.Int64Dtype(),
        "drg_code": str,
        "org_itemid": pd.Int64Dtype(),
        "isolate_num": pd.Int64Dtype(),
        "quantity": str,
        "ab_itemid": pd.Int64Dtype(),
        "dilution_text": str,
        "warning": pd.Int64Dtype(),
        "valuenum": float,
    }

    row_counts = {t: 0 for t in set(tablenames) | set(_MIMIC_TABLES)}
    with sqlite3.Connection(DATABASE_NAME) as connection:
        for i, f in enumerate(data_files):
            tablename = tablenames[i]
            print("Starting processing {}".format(tablename), end='.. ')
            if os.path.getsize(f) < THRESHOLD_SIZE:
                df = pd.read_csv(f, dtype=mimic_dtypes)
                df = process_dataframe(df, subjects=subjects)
                df.to_sql(tablename, connection, index=False)
                row_counts[tablename] += len(df)
            else:
                # If the file is too large, do the work in chunks, applying
                # the same datetime parsing and subject filter as above
                for chunk in pd.read_csv(f, chunksize=CHUNKSIZE, low_memory=False, dtype=mimic_dtypes):
                    chunk = process_dataframe(chunk, subjects=subjects)
                    chunk.to_sql(tablename, connection, if_exists="append", index=False)
                    row_counts[tablename] += len(chunk)
            print("done!")

    print("Should be all done! Row counts of loaded data:\n")

    print(json.dumps(row_counts, indent=2))


if __name__ == '__main__':
    main()
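
The loader can be run from, or pointed at, the directory holding the MIMIC-IV CSV files, for example with python import.py --data_dir /path/to/mimic-iv --limit 100 --overwrite (the filename import.py is an assumption here; use the actual name of the changed file in this repository, and the --limit and --overwrite flags are optional). Below is a minimal sketch, not part of the commit, of how the resulting mimic4.db could be sanity-checked afterwards, assuming the tables were loaded as in the script above:

import sqlite3

import pandas as pd

# Hypothetical sanity check of the mimic4.db file produced by the loader above.
with sqlite3.connect("mimic4.db") as connection:
    # List the tables that were created and their row counts,
    # mirroring the row_counts summary printed by the script.
    tables = pd.read_sql_query(
        "SELECT name FROM sqlite_master WHERE type = 'table' ORDER BY name",
        connection,
    )
    for name in tables["name"]:
        count = pd.read_sql_query(f"SELECT COUNT(*) AS n FROM {name}", connection)
        print(f"{name}: {count['n'].iloc[0]} rows")

    # Example query joining two of the loaded tables.
    print(pd.read_sql_query(
        """
        SELECT p.subject_id, COUNT(a.hadm_id) AS n_admissions
        FROM patients p
        LEFT JOIN admissions a ON a.subject_id = p.subject_id
        GROUP BY p.subject_id
        ORDER BY p.subject_id
        LIMIT 5
        """,
        connection,
    ))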