From c2cd1b4f61773942f80f04b860d38406345fe4fc Mon Sep 17 00:00:00 2001 From: Amin Torabi <140189926+AminTorabi-NOAA@users.noreply.github.com> Date: Thu, 1 Aug 2024 18:19:11 -0400 Subject: [PATCH] speeding up Parquet_output function by vectorizing (#816) * speeding up Parquet_output function by vectorizing * updating the index * delete pdb --- src/troute-nwm/src/nwm_routing/output.py | 63 +++++++++++++++++------- 1 file changed, 44 insertions(+), 19 deletions(-) diff --git a/src/troute-nwm/src/nwm_routing/output.py b/src/troute-nwm/src/nwm_routing/output.py index 38a6e6547..893825015 100644 --- a/src/troute-nwm/src/nwm_routing/output.py +++ b/src/troute-nwm/src/nwm_routing/output.py @@ -64,28 +64,53 @@ def _parquet_output_format_converter(df, start_datetime, dt, configuration, pref -------- - timeseries_df (DataFrame): Converted timeseries data frame ''' - - df.index.name = 'location_id' - df.reset_index(inplace=True) - timeseries_df = df.melt(id_vars=['location_id'], var_name='var') - timeseries_df['var'] = timeseries_df['var'].astype('string') - timeseries_df[['timestep', 'variable']] = timeseries_df['var'].str.strip("()").str.split(",", n=1, expand=True) - timeseries_df['variable'] = timeseries_df['variable'].str.strip().str.replace("'", "") - timeseries_df['timestep'] = timeseries_df['timestep'].astype('int') - timeseries_df['value_time'] = (start_datetime + pd.to_timedelta(timeseries_df['timestep'] * dt, unit='s')) variable_to_name_map = {"q": "streamflow", "d": "depth", "v": "velocity"} - timeseries_df["variable_name"] = timeseries_df["variable"].map(variable_to_name_map) - timeseries_df.drop(['var', 'timestep', 'variable'], axis=1, inplace=True) - timeseries_df['configuration'] = configuration variable_to_units_map = {"streamflow": "m3/s", "velocity": "m/s", "depth": "m"} - timeseries_df['units'] = timeseries_df['variable_name'].map(variable_to_units_map) - timeseries_df['reference_time'] = start_datetime.date() - timeseries_df['location_id'] = timeseries_df['location_id'].astype('string') - timeseries_df['location_id'] = prefix_ids + '-' + timeseries_df['location_id'] - timeseries_df['value'] = timeseries_df['value'].astype('double') - timeseries_df['reference_time'] = timeseries_df['reference_time'].astype('datetime64[us]') - timeseries_df['value_time'] = timeseries_df['value_time'].astype('datetime64[us]') + # Prepare the location_id with prefix + df.index.name = 'location_id' + df.reset_index(inplace=True) + location_ids = prefix_ids + '-' + df['location_id'].astype(str) + + # Flatten the dataframe using NumPy + num_locations = df.shape[0] + num_time_variables = df.shape[1] - 1 + num_records = num_locations * num_time_variables + + # Prepare timestep and variable arrays + times = df.columns[1:] + timesteps = np.array([t[0] for t in times], dtype=int) + variables = np.array([t[1] for t in times]) + + # Preallocate arrays + location_ids_repeated = np.tile(location_ids, num_time_variables) + value_time = np.empty(num_records, dtype='datetime64[us]') + variable_names = np.empty(num_records, dtype=object) + units = np.empty(num_records, dtype=object) + values = np.empty(num_records, dtype=float) + + # Calculate value_time, variable_names, units, and values in a vectorized manner + for i in range(num_time_variables): + start_idx = i * num_locations + end_idx = start_idx + num_locations + value_time[start_idx:end_idx] = start_datetime + pd.to_timedelta(timesteps[i] * dt, unit='s') + variable_name = variable_to_name_map[variables[i]] + unit = variable_to_units_map[variable_name] + variable_names[start_idx:end_idx] = variable_name + units[start_idx:end_idx] = unit + values[start_idx:end_idx] = df.iloc[:, i + 1].values + + # Create the resulting DataFrame + timeseries_df = pd.DataFrame({ + 'location_id': location_ids_repeated, + 'value': values, + 'value_time': value_time, + 'variable_name': variable_names, + 'units': units, + 'reference_time': start_datetime.date(), + 'configuration': configuration + }) + return timeseries_df