SQLite python improvements #1784

Merged · 3 commits · Aug 17, 2024
mimic-iv/buildmimic/sqlite/import.py (184 changes: 160 additions & 24 deletions)
from argparse import ArgumentParser
import json
import os
from pathlib import Path
import sqlite3
import sys

import typing as t
import pandas as pd

DATABASE_NAME = "mimic4.db"
THRESHOLD_SIZE = 5 * 10**7  # files larger than ~50 MB are read in chunks
CHUNKSIZE = 10**6  # rows per chunk when reading large files

_MIMIC_TABLES = (
    # hospital EHR derived tables
    'admissions',
    'd_hcpcs',
    'd_icd_diagnoses',
    'd_icd_procedures',
    'd_labitems',
    'diagnoses_icd',
    'drgcodes',
    'emar',
    'emar_detail',
    'hcpcsevents',
    'labevents',
    'microbiologyevents',
    'omr',
    'patients',
    'pharmacy',
    'poe',
    'poe_detail',
    'prescriptions',
    'procedures_icd',
    'provider',
    'services',
    'transfers',
    # ICU derived tables
    'caregiver',
    'chartevents',
    'd_items',
    'datetimeevents',
    'icustays',
    'ingredientevents',
    'inputevents',
    'outputevents',
    'procedureevents',
)

def process_dataframe(df: pd.DataFrame, subjects: t.Optional[t.List[int]] = None) -> pd.DataFrame:
    # Parse every *time/*date column as a datetime and, optionally, restrict
    # the frame to the given subject_id subset.
    for c in df.columns:
        if c.endswith('time') or c.endswith('date'):
            df[c] = pd.to_datetime(df[c], format='ISO8601')

    if subjects is not None and 'subject_id' in df:
        df = df.loc[df['subject_id'].isin(subjects)]

    return df
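
# Illustrative example (not in the original file) of what process_dataframe does
# to a toy frame; format='ISO8601' assumes pandas >= 2.0:
#
#   df = pd.DataFrame({'subject_id': [1, 2],
#                      'charttime': ['2180-07-23 14:00:00', '2180-07-24 09:30:00']})
#   out = process_dataframe(df, subjects=[1])
#   # out['charttime'] now has dtype datetime64[ns];
#   # only the subject_id == 1 row remains.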

def main():
    argparser = ArgumentParser()
    argparser.add_argument(
        '--limit', type=int, default=0,
        help='Restrict the database to the first N subject_id.'
    )
    argparser.add_argument(
        '--data_dir', type=str, default='.',
        help='Path to the directory containing the MIMIC-IV CSV files.'
    )
    argparser.add_argument(
        '--overwrite', action='store_true',
        help='Overwrite existing mimic4.db file.'
    )
    args = argparser.parse_args()

    # validate that we can find all the files
    data_dir = Path(args.data_dir).resolve()
    data_files = list(data_dir.rglob('**/*.csv*'))
    if not data_files:
        print(f"No CSV files found in {data_dir}")
        sys.exit()

    # Derive a table name for each data file: strip the .csv/.gz suffixes and
    # lower-case the stem. The resulting list is index-aligned with data_files.
    tablenames = []
    for f in data_files:
        while f.suffix.lower() in {'.csv', '.gz'}:
            f = f.with_suffix('')
        tablenames.append(f.stem.lower())

    # check that all the expected tables are present
    found_tables = set(tablenames)
    missing_tables = set(_MIMIC_TABLES) - found_tables
    if missing_tables:
        print(f"Found tables: {found_tables}")
        print(f"Missing tables: {missing_tables}")
        sys.exit()

    pt = None
    subjects = None
    if args.limit > 0:
        for f in data_files:
            if 'patients' in f.name:
                pt = pd.read_csv(f)
                break
        if pt is None:
            raise FileNotFoundError('Unable to find a patients file in the data directory.')

        # sort by subject_id so --limit selects a deterministic subset
        pt = pt[['subject_id']].sort_values('subject_id').head(args.limit)
        subjects = set(pt['subject_id'].tolist())
        print(f'Limiting to {len(subjects)} subjects.')

    if os.path.exists(DATABASE_NAME):
        if args.overwrite:
            os.remove(DATABASE_NAME)
        else:
            msg = "File {} already exists.".format(DATABASE_NAME)
            print(msg)
            sys.exit()

    # For a subset of columns, we specify the data types to ensure
    # pandas loads the data correctly.
    mimic_dtypes = {
        "subject_id": pd.Int64Dtype(),
        "hadm_id": pd.Int64Dtype(),
        "stay_id": pd.Int64Dtype(),
        "caregiver_id": pd.Int64Dtype(),
        "provider_id": str,
        "category": str,  # d_hcpcs
        "parent_field_ordinal": str,
        "pharmacy_id": pd.Int64Dtype(),
        "emar_seq": pd.Int64Dtype(),
        "poe_seq": pd.Int64Dtype(),
        "ndc": str,
        "doses_per_24_hrs": pd.Int64Dtype(),
        "drg_code": str,
        "org_itemid": pd.Int64Dtype(),
        "isolate_num": pd.Int64Dtype(),
        "quantity": str,
        "ab_itemid": pd.Int64Dtype(),
        "dilution_text": str,
        "warning": pd.Int64Dtype(),
        "valuenum": float,
    }
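
    # Note (illustrative, beyond the original comment): pd.Int64Dtype() is the
    # pandas nullable integer type. Without it, an integer column containing
    # missing values (e.g. hadm_id on rows without an admission) would be read
    # as float; str dtypes preserve leading zeros in codes such as ndc.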

    row_counts = {name: 0 for name in set(tablenames) | set(_MIMIC_TABLES)}
    with sqlite3.connect(DATABASE_NAME) as connection:
        for f, tablename in zip(data_files, tablenames):
            print("Starting processing {}".format(tablename), end='.. ')
            if os.path.getsize(f) < THRESHOLD_SIZE:
                df = pd.read_csv(f, dtype=mimic_dtypes)
                df = process_dataframe(df, subjects=subjects)
                df.to_sql(tablename, connection, index=False)
                row_counts[tablename] += len(df)
            else:
                # If the file is too large, do the work in chunks, applying the
                # subject_id filter to each chunk as well.
                for chunk in pd.read_csv(f, chunksize=CHUNKSIZE, low_memory=False, dtype=mimic_dtypes):
                    chunk = process_dataframe(chunk, subjects=subjects)
                    chunk.to_sql(tablename, connection, if_exists="append", index=False)
                    row_counts[tablename] += len(chunk)
            print("done!")

    print("Should be all done! Row counts of loaded data:\n")
    print(json.dumps(row_counts, indent=2))


if __name__ == '__main__':
    main()
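
After the import completes, a quick sanity check on the resulting database (a minimal sketch, assuming the script was run as `python import.py --data_dir /path/to/mimic-iv` and wrote mimic4.db to the working directory):

import sqlite3

connection = sqlite3.connect("mimic4.db")
for table in ("patients", "admissions", "icustays"):
    count = connection.execute(f"SELECT COUNT(*) FROM {table}").fetchone()[0]
    print(f"{table}: {count} rows")
connection.close()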