From 4ac5a0d13cab664594155065655e45b1f5fbf9cc Mon Sep 17 00:00:00 2001 From: AminTorabi-NOAA Date: Wed, 31 Jul 2024 12:39:08 -0400 Subject: [PATCH 1/3] speeding up Parquet_output function by vectorizing --- src/troute-nwm/src/nwm_routing/output.py | 61 +++++++++++++++++------- 1 file changed, 43 insertions(+), 18 deletions(-) diff --git a/src/troute-nwm/src/nwm_routing/output.py b/src/troute-nwm/src/nwm_routing/output.py index 38a6e6547..de8747daf 100644 --- a/src/troute-nwm/src/nwm_routing/output.py +++ b/src/troute-nwm/src/nwm_routing/output.py @@ -64,27 +64,52 @@ def _parquet_output_format_converter(df, start_datetime, dt, configuration, pref -------- - timeseries_df (DataFrame): Converted timeseries data frame ''' + variable_to_name_map = {"q": "streamflow", "d": "depth", "v": "velocity"} + variable_to_units_map = {"streamflow": "m3/s", "velocity": "m/s", "depth": "m"} + # Prepare the location_id with prefix df.index.name = 'location_id' df.reset_index(inplace=True) - timeseries_df = df.melt(id_vars=['location_id'], var_name='var') - timeseries_df['var'] = timeseries_df['var'].astype('string') - timeseries_df[['timestep', 'variable']] = timeseries_df['var'].str.strip("()").str.split(",", n=1, expand=True) - timeseries_df['variable'] = timeseries_df['variable'].str.strip().str.replace("'", "") - timeseries_df['timestep'] = timeseries_df['timestep'].astype('int') - timeseries_df['value_time'] = (start_datetime + pd.to_timedelta(timeseries_df['timestep'] * dt, unit='s')) - variable_to_name_map = {"q": "streamflow", "d": "depth", "v": "velocity"} - timeseries_df["variable_name"] = timeseries_df["variable"].map(variable_to_name_map) - timeseries_df.drop(['var', 'timestep', 'variable'], axis=1, inplace=True) - timeseries_df['configuration'] = configuration - variable_to_units_map = {"streamflow": "m3/s", "velocity": "m/s", "depth": "m"} - timeseries_df['units'] = timeseries_df['variable_name'].map(variable_to_units_map) - timeseries_df['reference_time'] = start_datetime.date() - timeseries_df['location_id'] = timeseries_df['location_id'].astype('string') - timeseries_df['location_id'] = prefix_ids + '-' + timeseries_df['location_id'] - timeseries_df['value'] = timeseries_df['value'].astype('double') - timeseries_df['reference_time'] = timeseries_df['reference_time'].astype('datetime64[us]') - timeseries_df['value_time'] = timeseries_df['value_time'].astype('datetime64[us]') + location_ids = prefix_ids + '-' + df['location_id'].astype(str) + + # Flatten the dataframe using NumPy + num_locations = df.shape[0] + num_time_variables = df.shape[1] - 1 + num_records = num_locations * num_time_variables + + # Prepare timestep and variable arrays + times = df.columns[1:] + timesteps = np.array([t[0] for t in times], dtype=int) + variables = np.array([t[1] for t in times]) + + # Preallocate arrays + location_ids_repeated = np.repeat(location_ids, num_time_variables) + value_time = np.empty(num_records, dtype='datetime64[us]') + variable_names = np.empty(num_records, dtype=object) + units = np.empty(num_records, dtype=object) + values = np.empty(num_records, dtype=float) + + # Calculate value_time, variable_names, units, and values in a vectorized manner + for i in range(num_time_variables): + start_idx = i * num_locations + end_idx = start_idx + num_locations + value_time[start_idx:end_idx] = start_datetime + pd.to_timedelta(timesteps[i] * dt, unit='s') + variable_name = variable_to_name_map[variables[i]] + unit = variable_to_units_map[variable_name] + variable_names[start_idx:end_idx] = variable_name + units[start_idx:end_idx] = unit + values[start_idx:end_idx] = df.iloc[:, i + 1].values + + # Create the resulting DataFrame + timeseries_df = pd.DataFrame({ + 'location_id': location_ids_repeated, + 'value': values, + 'value_time': value_time, + 'variable_name': variable_names, + 'units': units, + 'reference_time': start_datetime.date(), + 'configuration': configuration + }) return timeseries_df From c55975e40a111fd8c8b687262863d1f1a8dfb2a6 Mon Sep 17 00:00:00 2001 From: AminTorabi-NOAA Date: Wed, 31 Jul 2024 12:52:15 -0400 Subject: [PATCH 2/3] updating the index --- src/troute-nwm/src/nwm_routing/output.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/troute-nwm/src/nwm_routing/output.py b/src/troute-nwm/src/nwm_routing/output.py index de8747daf..0d95a4ff9 100644 --- a/src/troute-nwm/src/nwm_routing/output.py +++ b/src/troute-nwm/src/nwm_routing/output.py @@ -83,7 +83,7 @@ def _parquet_output_format_converter(df, start_datetime, dt, configuration, pref variables = np.array([t[1] for t in times]) # Preallocate arrays - location_ids_repeated = np.repeat(location_ids, num_time_variables) + location_ids_repeated = np.tile(location_ids, num_time_variables) value_time = np.empty(num_records, dtype='datetime64[us]') variable_names = np.empty(num_records, dtype=object) units = np.empty(num_records, dtype=object) @@ -110,7 +110,7 @@ def _parquet_output_format_converter(df, start_datetime, dt, configuration, pref 'reference_time': start_datetime.date(), 'configuration': configuration }) - + import pdb;pdb.set_trace() return timeseries_df From 8c0f8757f8fcb92a426e97bc441adcfd0b75a251 Mon Sep 17 00:00:00 2001 From: AminTorabi-NOAA Date: Wed, 31 Jul 2024 12:53:46 -0400 Subject: [PATCH 3/3] delete pdb --- src/troute-nwm/src/nwm_routing/output.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/troute-nwm/src/nwm_routing/output.py b/src/troute-nwm/src/nwm_routing/output.py index 0d95a4ff9..893825015 100644 --- a/src/troute-nwm/src/nwm_routing/output.py +++ b/src/troute-nwm/src/nwm_routing/output.py @@ -110,7 +110,7 @@ def _parquet_output_format_converter(df, start_datetime, dt, configuration, pref 'reference_time': start_datetime.date(), 'configuration': configuration }) - import pdb;pdb.set_trace() + return timeseries_df