Commit
changed database connection method to use SSM
jhou98 committed Jul 23, 2020
1 parent c684e49 commit e7eed6a
Showing 18 changed files with 5,461 additions and 6,330 deletions.
3 changes: 2 additions & 1 deletion .gitignore
@@ -4,4 +4,5 @@ sagemaker_code
output/
split.py
dev/
database.ini
database.ini
out.yaml
28 changes: 22 additions & 6 deletions README.md
@@ -1,19 +1,35 @@
# PHSA MRI CODE

## Preprocessing Data
- [preprocess_data.py](/preprocess/preprocess_data.py): Main python script to preprocess MRI data
## Setup PostgreSQL
- To initialize the database, run the init_db.sql script in the terminal using `psql -U <user> -h <host> -f init_db.sql`. _Note this assumes you have a PostgreSQL database named rules._
- The functions expect the database credentials to be stored in SSM Parameter Store. You can set them either through the AWS Console or with the following AWS CLI commands:
```bash
aws ssm put-parameter --name /mri-phsa/dbserver --value <host> --type SecureString --overwrite
aws ssm put-parameter --name /mri-phsa/dbname --value <database> --type SecureString --overwrite
aws ssm put-parameter --name /mri-phsa/dbuser --value <user> --type SecureString --overwrite
aws ssm put-parameter --name /mri-phsa/dbpwd --value <password> --type SecureString --overwrite
```
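- To confirm the values were stored, you can read one back (a quick check; works for any of the four parameter names):
```bash
aws ssm get-parameter --name /mri-phsa/dbserver --with-decryption --query Parameter.Value --output text
```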

## Preprocessing
- [preprocess_data.py](/preprocess/preprocess_data.py): Main python script to preprocess the requisition data in large batches
- [preprocess_helper.py](/preprocess/preprocess_helper.py): Helper functions
- [sample_output.json](sample_output.json): Sample JSON output for the first 100 rows of sample.csv
- To create sample_output.json, run `python .\preprocess\preprocess_data.py` in the terminal

## Generating P-Value
- [init_db.sql](init_db.sql): Script to re-initialize database (wipes out any previous data!)
## Rule Processing
- [rules.py](/rule_processing/rules.py): Main python script to obtain the priority value
- [update_weights.py](/rule_processing/update_weights.py): Creates weighted tokens in the database for the descriptions found in mri_rules
- [config.py](/rule_processing/config.py): Connects to the Postgres database using a custom database.ini file
- To initialize the database, run the init_db.sql script in the terminal using `psql -U {username} -h {host} -f init_db.sql`. _Note this assumes you have a database named rules._
- To update the weighted rule tokens, run `python update_weights.py` in the terminal
- To apply the rules and obtain a Rule ID + P-Value, run `python .\rule_processing\rules.py` in the terminal
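- To spot-check the weighted rules from the command line, a sketch of a full-text query (this assumes `info_weighted_tk` is the tsvector column behind the GIN index created in init_db.sql; swap in search terms that match your data):
```bash
psql -U {username} -h {host} -d rules -c "SELECT id, body_part, priority FROM mri_rules WHERE info_weighted_tk @@ plainto_tsquery('knee pain') LIMIT 5;"
```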

## Lambdas
- Python code used for the Lambda functions
- To package and deploy the CloudFormation stack, use the following commands:
```bash
sam package --s3-bucket cic-mri-data-bucket-test --output-template-file out.yaml
sam deploy --template-file out.yaml --capabilities CAPABILITY_IAM CAPABILITY_AUTO_EXPAND --stack-name mri-test-stack
```
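- After the deploy completes, one way to confirm the stack went up (assumes your default AWS region and credentials):
```bash
aws cloudformation describe-stacks --stack-name mri-test-stack --query "Stacks[0].StackStatus" --output text
```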

## Rule Database Analysis
- [Sample Result](/csv/mri_sample_results_0.2.xlsx): Form to P-Value Data (most recent)
- [Sample Result](/csv/mri_sample_results_0.3.xlsx): Form to P-Value Data (most recent)
Binary file added csv/mri_sample_results_0.3.xlsx
Binary file not shown.
3,489 changes: 0 additions & 3,489 deletions csv/req_data_june.csv

This file was deleted.

4,235 changes: 3,485 additions & 750 deletions csv/requisition_data.csv

Large diffs are not rendered by default.

Binary file added csv/requisition_data_0.3.xlsx
Binary file not shown.
5 changes: 0 additions & 5 deletions database_sample.ini

This file was deleted.

31 changes: 30 additions & 1 deletion init_db.sql
@@ -4,6 +4,9 @@
DROP TABLE IF EXISTS data_results;
DROP TABLE IF EXISTS mri_rules;
DROP TABLE IF EXISTS word_weights;
DROP TABLE IF EXISTS conjunctions;
DROP TABLE IF EXISTS key_words;
DROP TABLE IF EXISTS spellchecker;

CREATE TABLE IF NOT EXISTS mri_rules (
id SERIAL PRIMARY KEY,
@@ -32,6 +35,20 @@ CREATE TABLE IF NOT EXISTS word_weights (
weight VARCHAR(1)
);

CREATE TABLE IF NOT EXISTS conjunctions (
key VARCHAR(8) PRIMARY KEY,
val VARCHAR(16)
);

CREATE TABLE IF NOT EXISTS key_words (
key VARCHAR(16) PRIMARY KEY,
val VARCHAR(16)
);

CREATE TABLE IF NOT EXISTS spellchecker(
word VARCHAR(32) PRIMARY KEY
);

-- SELECT * FROM mri_rules;

INSERT INTO mri_rules(body_part, info, priority) VALUES
@@ -321,4 +338,16 @@ SET word = TRIM(word);

CREATE INDEX info_weighted_idx
ON mri_rules
USING GIN (info_weighted_tk);
USING GIN (info_weighted_tk);

INSERT INTO conjunctions(key, val) VALUES
('L', 'left'),
('l', 'left'),
('R', 'right'),
('r', 'right');

INSERT INTO key_words(key, val) VALUES
('followup','followup'),
('history','history'),
('hx','medical history'),
('?','query');
84 changes: 59 additions & 25 deletions lambdas/preprocess/index.py
@@ -9,6 +9,8 @@
import uuid
from datetime import datetime, date
from spellchecker import SpellChecker
import psycopg2
from psycopg2 import sql

logger = logging.getLogger()
logger.setLevel(logging.INFO)
@@ -18,7 +20,54 @@

RuleProcessingLambdaName = os.getenv('RULE_PROCESSING_LAMBDA')

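# Fetch every row from a lookup table; sql.Identifier quotes the table name safely.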
def queryTable(conn, table):
cmd = """
SELECT * FROM {}
"""
with conn.cursor() as cur:
cur.execute(sql.SQL(cmd).format(sql.Identifier(table)))
return cur.fetchall()

def connect():
"""
Connect to PostgreSQL
"""
ssm = boto3.client('ssm')
p_dbserver = '/mri-phsa/dbserver'
p_dbname = '/mri-phsa/dbname'
p_dbuser = '/mri-phsa/dbuser'
p_dbpwd = '/mri-phsa/dbpwd'
params = ssm.get_parameters(
Names=[
p_dbserver, p_dbname, p_dbuser, p_dbpwd
],
WithDecryption = True
)
if params['ResponseMetadata']['HTTPStatusCode'] != 200:
print('ParameterStore Error: ', str(params['ResponseMetadata']['HTTPStatusCode']))
sys.exit(1)
for p in params['Parameters']:
if p['Name'] == p_dbserver:
dbserver = p['Value']
elif p['Name'] == p_dbname:
dbname = p['Value']
elif p['Name'] == p_dbuser:
dbuser = p['Value']
elif p['Name'] == p_dbpwd:
dbpwd = p['Value']
logger.info("Trying to connect to postgresql")
conn = psycopg2.connect(host=dbserver, dbname=dbname, user=dbuser, password=dbpwd)
logger.info("Success, connected to PostgreSQL!")
return conn

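# Module-level setup runs once per Lambda container, so the lookup tables and
# extra spellings are loaded a single time and reused across warm invocations.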
spell = SpellChecker()
conn = connect()
conj_list = queryTable(conn, "conjunctions")
keyword_list = queryTable(conn, "key_words")
spelling_list = [x[0] for x in queryTable(conn, 'spellchecker')]
conn.close()
# Add words to spell list
spell.word_frequency.load_words(spelling_list)

def convert2CM(height):
if not isinstance(height, str):
@@ -88,32 +137,17 @@ def anatomySpelling(text: str):
word_list.append(spell.correction(word))
return ' '.join(word_list)

def preProcessAnatomy(text):
dir_list = {
'l': 'left',
'r': 'right',
'L': 'left',
'R': 'right'
}
def preProcessAnatomy(dir_list, text: str):
temp_text = f' {text} '
for direction in dir_list:
if contains_word(direction,text):
temp_text = temp_text.replace(f' {direction} ', f' {dir_list[direction]} ')
if contains_word(direction[0],text):
temp_text = temp_text.replace(f' {direction[0]} ', f' {direction[1]} ')
return temp_text[1:len(temp_text)-1]

def find_additional_info(text:str, info_list):
"""
Will append history and hx separately, need to format it so this will become standardized?
"""
text_list = {
'followup': 'followup',
'history': 'history',
'hx': 'medical history',
'?': 'query',
}
for i in text_list:
if f'{i}' in f'{text}':
info_list.append(text_list[i])
def find_additional_info(key_list, text:str, info_list):
for i in key_list:
if f'{i[0]}' in f'{text}':
info_list.append(i[1])

def find_all_entities(data: str):
if not data:
@@ -266,10 +300,10 @@ def handler(event, context):
other_info = []
# Parse the Exam Requested Column into Comprehend Medical to find Anatomy Entities
anatomy_json = find_all_entities(anatomySpelling(f'{data_df["Exam Requested"]}'))
preprocessed_text = preProcessAnatomy(f'{data_df["Reason for Exam/Relevant Clinical History"]}')
preprocessed_text = preProcessAnatomy(conj_list, f'{data_df["Reason for Exam/Relevant Clinical History"]}')

for obj in list(filter(lambda info_list: info_list['Category'] == 'ANATOMY' or info_list['Category'] == 'TEST_TREATMENT_PROCEDURE', anatomy_json)):
anatomy = obj['Text'].lower()
anatomy = preProcessAnatomy(conj_list, obj['Text'].lower())
anatomy_list.append(anatomy)
# if(contains_word('hip',anatomy) or contains_word('knee', anatomy)):
# # apply comprehend to knee/hip column
@@ -279,7 +313,7 @@
# formatted_df['Spine'][row] = find_entities(f'{data_df["Appropriateness Checklist - Spine"][row]}')
infer_icd10_cm(preprocessed_text, medical_conditions, diagnosis, symptoms)
find_key_phrases(preprocessed_text, key_phrases, medical_conditions+diagnosis+symptoms, anatomy_list)
find_additional_info(preprocessed_text, other_info)
find_additional_info(keyword_list, preprocessed_text, other_info)
formatted_df['anatomy'] = anatomy_list
formatted_df['medical_condition'] = medical_conditions
formatted_df['diagnosis'] = diagnosis
53 changes: 34 additions & 19 deletions lambdas/rule_processing/index.py
@@ -1,20 +1,11 @@
import json
import time
from configparser import ConfigParser
import psycopg2
import logging
import boto3
import sys

logger = logging.getLogger()
logger.setLevel(logging.INFO)

def config():
db = {
"host": "mri-dev.cluster-cfp0qt1gjcdb.ca-central-1.rds.amazonaws.com",
"dbname": "rules",
"user": "postgres",
"password": ".Medicine01"
}
return db

insert_cmd = """
INSERT INTO data_results(id, info, init_priority) VALUES
@@ -64,21 +55,45 @@ def searchText(data, *data_keys):
value_set.add(word)
return (' | ').join(value_set)

""" Connect to the PostgreSQL database server """
try:
# read the connection parameters
params = config()
def connect():
"""
Connect to PostgreSQL
"""
ssm = boto3.client('ssm')
p_dbserver = '/mri-phsa/dbserver'
p_dbname = '/mri-phsa/dbname'
p_dbuser = '/mri-phsa/dbuser'
p_dbpwd = '/mri-phsa/dbpwd'
logger.info("Grabbing Parameters")
params = ssm.get_parameters(
Names=[
p_dbserver, p_dbname, p_dbuser, p_dbpwd
],
WithDecryption = True
)
if params['ResponseMetadata']['HTTPStatusCode'] != 200:
print('ParameterStore Error: ', str(params['ResponseMetadata']['HTTPStatusCode']))
sys.exit(1)
logger.info("Finished Grabbing Parameters")

for p in params['Parameters']:
if p['Name'] == p_dbserver:
dbserver = p['Value']
elif p['Name'] == p_dbname:
dbname = p['Value']
elif p['Name'] == p_dbuser:
dbuser = p['Value']
elif p['Name'] == p_dbpwd:
dbpwd = p['Value']
logger.info("Trying to connect to postgresql")
# avoid logging params here: the response includes the decrypted credentials
# connect to the PostgreSQL server
conn = psycopg2.connect(**params)
conn = psycopg2.connect(host=dbserver, dbname=dbname, user=dbuser, password=dbpwd)
logger.info("Success, connected to PostgreSQL!")
except (Exception, psycopg2.DatabaseError) as error:
logger.info(error)
return conn

def handler(event, context):
logger.info(event)
v = event
conn = connect()
with conn.cursor() as cur:
# insert into data_results one by one
if "anatomy" not in v.keys():
21 changes: 14 additions & 7 deletions preprocess/preprocess_data.py
@@ -3,10 +3,11 @@
from preprocess_helper import *
import string
import time
import uuid

start = time.time()

data_df = pd.read_csv('./csv/req_data_june.csv', skip_blank_lines=True).fillna({'Req # CIO': '-1'})
data_df = pd.read_csv('./csv/requisition_data.csv', skip_blank_lines=True).fillna({'Req # CIO': '-1'})
# Format columns that don't need comprehend medical and preprocess the text
data_df['CIO_ID'] = data_df['Req # CIO']
data_df['age'] = data_df['DOB \r\n(yyyy-mm-dd)'].apply(dob2age)
@@ -30,6 +31,12 @@
formatted_df.loc[:,'phrases'] = ''
formatted_df.loc[:,'other_info'] = ''

# Obtain data from postgres
conn = connect()
conj_list = queryTable(conn, "conjunctions")
keyword_list = queryTable(conn, "key_words")
conn.close()

for row in range(len(formatted_df.index)):
print("row is :", row)
anatomy_list = []
@@ -40,16 +47,16 @@
other_info = []

# Create a UUID for CIO ID if there isn't one already
formatted_df.loc[row,'CIO_ID'] = findId(formatted_df['CIO_ID'][row])
# Change Unidentified priority to code UND
formatted_df.loc[row,'priority'] = findUnidentified(formatted_df['priority'][row])
formatted_df.loc[row,'CIO_ID'] = findId(f'{formatted_df["CIO_ID"][row]}')
# Change Unidentified priority to code U/I
formatted_df.loc[row,'priority'] = findUnidentified(f'{formatted_df["priority"][row]}')

# Parse the Exam Requested Column into Comprehend Medical to find Anatomy Entities
anatomy_json = find_all_entities(anatomySpelling(f'{data_df["Exam Requested"][row]}'))
preprocessed_text = preProcessAnatomy(f'{data_df["Reason for Exam/Relevant Clinical History"][row]}')
preprocessed_text = preProcessAnatomy(conj_list, f'{data_df["Reason for Exam/Relevant Clinical History"][row]}')
for obj in list(filter(lambda info_list: info_list['Category'] == 'ANATOMY' or info_list['Category'] == 'TEST_TREATMENT_PROCEDURE', anatomy_json)):
# print("--Body Part Identified: ", obj['Text'])
anatomy = preProcessAnatomy(obj['Text'].lower())
anatomy = preProcessAnatomy(conj_list, obj['Text'].lower())
anatomy_list.append(anatomy)
# if(contains_word('hip',anatomy) or contains_word('knee', anatomy)):
# # apply comprehend to knee/hip column
@@ -60,7 +67,7 @@

infer_icd10_cm(preprocessed_text, medical_conditions, diagnosis, symptoms)
find_key_phrases(preprocessed_text, key_phrases, medical_conditions+diagnosis+symptoms, anatomy_list)
find_additional_info(preprocessed_text, other_info)
find_additional_info(keyword_list, preprocessed_text, other_info)

formatted_df['anatomy'][row] = anatomy_list
formatted_df['medical_condition'][row] = medical_conditions