From c9cdec11668eedda55dd1f46b9a6566ebf113b09 Mon Sep 17 00:00:00 2001 From: Joannas Yeow Jie Lin Date: Mon, 20 Mar 2023 18:35:15 +0800 Subject: [PATCH 1/3] joannas/get_agg_measurements --- src/get_agg_measurements.py | 113 ++++++++++++++++++++++++++++++++++++ 1 file changed, 113 insertions(+) create mode 100644 src/get_agg_measurements.py diff --git a/src/get_agg_measurements.py b/src/get_agg_measurements.py new file mode 100644 index 0000000..da7c11a --- /dev/null +++ b/src/get_agg_measurements.py @@ -0,0 +1,113 @@ +import neuroblu_postgres as nb +import pandas as pd +import warnings + +def get_aggregated_measurement(scale, version, patients=None, cohort=None, failed_encounter=True, dbname=None, + agg='mode_median'): + + """ + This function returns a filtered summary/total score for a given measurement for a set of patients based on criteria set. + 20/03/2023: this function only supports CGI-S, CGI-I and GAF + + Parameters + ---------- + version: `string` + The version of the schema as defined in OMOP CDM + patients : `integer, array-like of integers` + Patient IDs of interest specified as integer or array-like + collection of integers. + cohort : pandas DataFrame containing ``person_id``, ``start_date``, ``end_date``, and ``cohort_id`` + dbname: `string`, *optional* + The name of the database as defined in ``db.json`` file. + If None, the defaultDB is used. + failed_encounter: `bool` + If False, filters out visits that are 'failed encounters'. + By default True. + agg: 'string' + How the scales should be aggregated. Options available are ``mean``, ``median``,``max``,``min``,``mode_median``,``mode_mean``,``mode_max``,``mode_min`` + ``mode_median``: aggregate using mode first, then median if multiple modes + + Returns + ------- + pandas.DataFrame + Returns a data frame containing the ``person_id`` ,``agg_value`` + + Raises + ------ + TypeError + if ``patients`` is not one of the following - + int, list of int, tuple, numpy array, pandas series of int. + Also if `person_id` in ``cohort`` is not type int. + ValueError + if both ``patients`` and ``cohort`` are None or not None together. + TypeError + if ``cohort`` is supplied and is not a dataframe. + TypeError + if `start_date` and `end_date` in ``cohort`` are not datetime. + ValueError + if ``cohort`` is supplied and does not have the required columns + TypeError + if `failed_encounter` is not type bool + UserWarning + if the input is an empty list of patient IDs + UserWarning + if the scale input does not exist in this dataset. + UserWarning + if the CGIS or CGII value contains any 0s + + """ + # validate aggregation + if agg not in ['mean','median','mode','max','min','mode_median','mode_mean','mode_min','mode_max']: + warnings.warn("Aggregation method not found.") + return None + + # call CGIS, CGII or GAF + + # error: can't call a nb internal function ): + # df = nb._get_measurement_summary(scale=scale, patients=patients, cohort=cohort, failed_encounter=failed_encounter, dbname=dbname, version=version) + + if scale == "CGI-S": + df = nb.get_CGIS(version=version, patients=patients, cohort=cohort, failed_encounter=failed_encounter, dbname=dbname) + + if scale == "CGI-I": + df = nb.get_CGII(version=version, patients=patients, cohort=cohort, failed_encounter=failed_encounter, dbname=dbname) + + if scale == "GAF": + df = nb.get_GAF(version=version, patients=patients, cohort=cohort, failed_encounter=failed_encounter, dbname=dbname) + + + # aggregate the scales (!!! TO-DO - code can be cleaner & more straightforward !!!) + if agg == 'mean': + agg_df = round(df.groupby('person_id')['value'].mean().rename('agg_value',axis=0),0).astype(int).to_frame().reset_index() + + if agg == 'median': + agg_df = round(df.groupby('person_id')['value'].median().rename('agg_value',axis=0),0).astype(int).to_frame().reset_index() + + if agg == 'max': + agg_df = round(df.groupby('person_id')['value'].max().rename('agg_value',axis=0),0).astype(int).to_frame().reset_index() + + if agg == 'min': + agg_df = round(df.groupby('person_id')['value'].min().rename('agg_value',axis=0),0).astype(int).to_frame().reset_index() + + #aggregate using mode first, then median if multiple modes + if agg == 'mode_median': + agg_df = round(df.groupby('person_id')['value'].agg(pd.Series.mode).groupby('person_id').median().rename('agg_value',axis=0),0).astype(int).to_frame().reset_index() + + #aggregate using mode first, then mean if multiple modes + if agg == 'mode_median': + agg_df = round(df.groupby('person_id')['value'].agg(pd.Series.mode).groupby('person_id').mean().rename('agg_value',axis=0),0).astype(int).to_frame().reset_index() + + #aggregate using mode first, then min if multiple modes + if agg == 'mode_min': + agg_df = round(df.groupby('person_id')['value'].agg(pd.Series.mode).groupby('person_id').min().rename('agg_value',axis=0),0).astype(int).to_frame().reset_index() + + #aggregate using mode first, then max if multiple modes + if agg == 'mode_max': + agg_df = round(df.groupby('person_id')['value'].agg(pd.Series.mode).groupby('person_id').max().rename('agg_value',axis=0),0).astype(int).to_frame().reset_index() + + assert len(agg_df) == df.person_id.nunique(), "Aggregation error. There shoule be an aggregated value per unique person id." + + # (!!! TO-DO - obtain the first/last measurement date of the aggregated value !!!) + # (!!! TO-DO - further extension: obtain patients with increase/decrease in scale by X within Y period !!!) + + return agg_df \ No newline at end of file From 1795b23f2626c4148e9bbd8b18c05a2d59b7ab2f Mon Sep 17 00:00:00 2001 From: Joannas Yeow Jie Lin Date: Thu, 13 Apr 2023 17:07:41 +0800 Subject: [PATCH 2/3] joannas/get_agg_measurements --- src/get_agg_measurements.py | 101 ++++++++++++++++++++++-------------- 1 file changed, 63 insertions(+), 38 deletions(-) diff --git a/src/get_agg_measurements.py b/src/get_agg_measurements.py index da7c11a..f708ab3 100644 --- a/src/get_agg_measurements.py +++ b/src/get_agg_measurements.py @@ -1,36 +1,47 @@ import neuroblu_postgres as nb +import numpy as np +from datetime import timedelta import pandas as pd import warnings def get_aggregated_measurement(scale, version, patients=None, cohort=None, failed_encounter=True, dbname=None, - agg='mode_median'): + agg='mode_median',change=False): """ - This function returns a filtered summary/total score for a given measurement for a set of patients based on criteria set. + This function returns an aggregated summary/total score for a given measurement for a set of patients based on parameters set. 20/03/2023: this function only supports CGI-S, CGI-I and GAF Parameters ---------- - version: `string` + scale : `str` + The name of the measurement scale being retrieved. + version : `string` The version of the schema as defined in OMOP CDM patients : `integer, array-like of integers` Patient IDs of interest specified as integer or array-like collection of integers. cohort : pandas DataFrame containing ``person_id``, ``start_date``, ``end_date``, and ``cohort_id`` - dbname: `string`, *optional* + dbname : `string`, *optional* The name of the database as defined in ``db.json`` file. If None, the defaultDB is used. - failed_encounter: `bool` + failed_encounter : `bool` If False, filters out visits that are 'failed encounters'. By default True. - agg: 'string' + agg : 'string' How the scales should be aggregated. Options available are ``mean``, ``median``,``max``,``min``,``mode_median``,``mode_mean``,``mode_max``,``mode_min`` ``mode_median``: aggregate using mode first, then median if multiple modes + change : 'boolean' + If True, scale measurements will be aggregated per person_id per measurement day using the selected agg method. + Outputs the change in measurements from the previous available measurement and the number of days between the change. Returns ------- pandas.DataFrame Returns a data frame containing the ``person_id`` ,``agg_value`` + If change = True, returns *patients with at least 2 measurements* and a data frame containing the + ``person_id`` ,``prev_measurement_date``,``prev_value``, + ``measurement_date``,``value``, + ``days_between_change``,``value_change`` Raises ------ @@ -68,46 +79,60 @@ def get_aggregated_measurement(scale, version, patients=None, cohort=None, faile if scale == "CGI-S": df = nb.get_CGIS(version=version, patients=patients, cohort=cohort, failed_encounter=failed_encounter, dbname=dbname) - if scale == "CGI-I": df = nb.get_CGII(version=version, patients=patients, cohort=cohort, failed_encounter=failed_encounter, dbname=dbname) - if scale == "GAF": df = nb.get_GAF(version=version, patients=patients, cohort=cohort, failed_encounter=failed_encounter, dbname=dbname) + # groupby person_id, groupby person_id then measurement date if looking for change in scales. + if change is True: + groupby = ['person_id','measurement_date'] + else: + groupby = 'person_id' + df_groupby = df.groupby(groupby)['value'] - # aggregate the scales (!!! TO-DO - code can be cleaner & more straightforward !!!) + # aggregate the scales if agg == 'mean': - agg_df = round(df.groupby('person_id')['value'].mean().rename('agg_value',axis=0),0).astype(int).to_frame().reset_index() - + agg_df = df_groupby.mean() if agg == 'median': - agg_df = round(df.groupby('person_id')['value'].median().rename('agg_value',axis=0),0).astype(int).to_frame().reset_index() - + agg_df = df_groupby.median() if agg == 'max': - agg_df = round(df.groupby('person_id')['value'].max().rename('agg_value',axis=0),0).astype(int).to_frame().reset_index() - + agg_df = df_groupby.max() if agg == 'min': - agg_df = round(df.groupby('person_id')['value'].min().rename('agg_value',axis=0),0).astype(int).to_frame().reset_index() - - #aggregate using mode first, then median if multiple modes - if agg == 'mode_median': - agg_df = round(df.groupby('person_id')['value'].agg(pd.Series.mode).groupby('person_id').median().rename('agg_value',axis=0),0).astype(int).to_frame().reset_index() - - #aggregate using mode first, then mean if multiple modes - if agg == 'mode_median': - agg_df = round(df.groupby('person_id')['value'].agg(pd.Series.mode).groupby('person_id').mean().rename('agg_value',axis=0),0).astype(int).to_frame().reset_index() - - #aggregate using mode first, then min if multiple modes - if agg == 'mode_min': - agg_df = round(df.groupby('person_id')['value'].agg(pd.Series.mode).groupby('person_id').min().rename('agg_value',axis=0),0).astype(int).to_frame().reset_index() + agg_df = df_groupby.min() - #aggregate using mode first, then max if multiple modes - if agg == 'mode_max': - agg_df = round(df.groupby('person_id')['value'].agg(pd.Series.mode).groupby('person_id').max().rename('agg_value',axis=0),0).astype(int).to_frame().reset_index() - - assert len(agg_df) == df.person_id.nunique(), "Aggregation error. There shoule be an aggregated value per unique person id." - - # (!!! TO-DO - obtain the first/last measurement date of the aggregated value !!!) - # (!!! TO-DO - further extension: obtain patients with increase/decrease in scale by X within Y period !!!) - - return agg_df \ No newline at end of file + # aggregate using mode first, + df_groupby_mode = df_groupby.apply(pd.Series.mode).groupby(groupby) + + # then median, mean, min or max if multiple modes + if agg == 'mode_median': + agg_df = df_groupby_mode.median() + if agg == 'mode_median': + agg_df = df_groupby_mode.mean() + if agg == 'mode_min': + agg_df = df_groupby_mode.min() + if agg == 'mode_max': + agg_df = df_groupby_mode.max() + + clean_agg_df = round(agg_df,0).astype(int).to_frame().reset_index() + + if change is False: + assert len(clean_agg_df) == df.person_id.nunique(), "Aggregation error. There shoule be an aggregated value per unique person id." + else: + # obtain patients with increase/decrease in scale + clean_agg_df["prev_person_id"] = clean_agg_df['person_id'].shift(1) + clean_agg_df["prev_measurement_date"] = clean_agg_df['measurement_date'].shift(1) + clean_agg_df["prev_value"] = clean_agg_df['value'].shift(1) + + + clean_agg_df["measurement_date"] = pd.to_datetime(clean_agg_df["measurement_date"]) + clean_agg_df["prev_measurement_date"] = pd.to_datetime(clean_agg_df["prev_measurement_date"]) + + # days between the change + clean_agg_df["days_between_change"] = np.where(clean_agg_df["person_id"] == clean_agg_df["prev_person_id"], (clean_agg_df['measurement_date']-clean_agg_df["prev_measurement_date"]).dt.days,pd.Timedelta("nan")) + # change in value + clean_agg_df["value_change"] = np.where(clean_agg_df["person_id"] == clean_agg_df["prev_person_id"], (clean_agg_df['value']-clean_agg_df["prev_value"]),np.nan) + #filter for patients with at least 2 measurements + clean_agg_df = clean_agg_df[["person_id","prev_measurement_date","prev_value","measurement_date","value","days_between_change","value_change"]].dropna(subset=['value_change']) + + return clean_agg_df \ No newline at end of file From 29ef2bb404b6199bcd30663fa6cac8002d743211 Mon Sep 17 00:00:00 2001 From: Joannas Yeow Jie Lin Date: Tue, 30 May 2023 11:49:16 +0800 Subject: [PATCH 3/3] bug fix --- src/get_agg_measurements.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/get_agg_measurements.py b/src/get_agg_measurements.py index f708ab3..a8da02f 100644 --- a/src/get_agg_measurements.py +++ b/src/get_agg_measurements.py @@ -32,7 +32,7 @@ def get_aggregated_measurement(scale, version, patients=None, cohort=None, faile ``mode_median``: aggregate using mode first, then median if multiple modes change : 'boolean' If True, scale measurements will be aggregated per person_id per measurement day using the selected agg method. - Outputs the change in measurements from the previous available measurement and the number of days between the change. + Outputs the change in measurements from the previous available measurement and the number of days between the change. Returns ------- @@ -107,7 +107,7 @@ def get_aggregated_measurement(scale, version, patients=None, cohort=None, faile # then median, mean, min or max if multiple modes if agg == 'mode_median': agg_df = df_groupby_mode.median() - if agg == 'mode_median': + if agg == 'mode_mean': agg_df = df_groupby_mode.mean() if agg == 'mode_min': agg_df = df_groupby_mode.min()