Skip to content

Commit

Permalink
speeding up Parquet_output function by vectorizing (#816)
Browse files Browse the repository at this point in the history
* speeding up Parquet_output function by vectorizing

* updating the index

* delete pdb
  • Loading branch information
AminTorabi-NOAA authored Aug 1, 2024
1 parent a7260b7 commit c2cd1b4
Showing 1 changed file with 44 additions and 19 deletions.
63 changes: 44 additions & 19 deletions src/troute-nwm/src/nwm_routing/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,28 +64,53 @@ def _parquet_output_format_converter(df, start_datetime, dt, configuration, pref
--------
- timeseries_df (DataFrame): Converted timeseries data frame
'''

df.index.name = 'location_id'
df.reset_index(inplace=True)
timeseries_df = df.melt(id_vars=['location_id'], var_name='var')
timeseries_df['var'] = timeseries_df['var'].astype('string')
timeseries_df[['timestep', 'variable']] = timeseries_df['var'].str.strip("()").str.split(",", n=1, expand=True)
timeseries_df['variable'] = timeseries_df['variable'].str.strip().str.replace("'", "")
timeseries_df['timestep'] = timeseries_df['timestep'].astype('int')
timeseries_df['value_time'] = (start_datetime + pd.to_timedelta(timeseries_df['timestep'] * dt, unit='s'))
variable_to_name_map = {"q": "streamflow", "d": "depth", "v": "velocity"}
timeseries_df["variable_name"] = timeseries_df["variable"].map(variable_to_name_map)
timeseries_df.drop(['var', 'timestep', 'variable'], axis=1, inplace=True)
timeseries_df['configuration'] = configuration
variable_to_units_map = {"streamflow": "m3/s", "velocity": "m/s", "depth": "m"}
timeseries_df['units'] = timeseries_df['variable_name'].map(variable_to_units_map)
timeseries_df['reference_time'] = start_datetime.date()
timeseries_df['location_id'] = timeseries_df['location_id'].astype('string')
timeseries_df['location_id'] = prefix_ids + '-' + timeseries_df['location_id']
timeseries_df['value'] = timeseries_df['value'].astype('double')
timeseries_df['reference_time'] = timeseries_df['reference_time'].astype('datetime64[us]')
timeseries_df['value_time'] = timeseries_df['value_time'].astype('datetime64[us]')

# Prepare the location_id with prefix
df.index.name = 'location_id'
df.reset_index(inplace=True)
location_ids = prefix_ids + '-' + df['location_id'].astype(str)

# Flatten the dataframe using NumPy
num_locations = df.shape[0]
num_time_variables = df.shape[1] - 1
num_records = num_locations * num_time_variables

# Prepare timestep and variable arrays
times = df.columns[1:]
timesteps = np.array([t[0] for t in times], dtype=int)
variables = np.array([t[1] for t in times])

# Preallocate arrays
location_ids_repeated = np.tile(location_ids, num_time_variables)
value_time = np.empty(num_records, dtype='datetime64[us]')
variable_names = np.empty(num_records, dtype=object)
units = np.empty(num_records, dtype=object)
values = np.empty(num_records, dtype=float)

# Calculate value_time, variable_names, units, and values in a vectorized manner
for i in range(num_time_variables):
start_idx = i * num_locations
end_idx = start_idx + num_locations
value_time[start_idx:end_idx] = start_datetime + pd.to_timedelta(timesteps[i] * dt, unit='s')
variable_name = variable_to_name_map[variables[i]]
unit = variable_to_units_map[variable_name]
variable_names[start_idx:end_idx] = variable_name
units[start_idx:end_idx] = unit
values[start_idx:end_idx] = df.iloc[:, i + 1].values

# Create the resulting DataFrame
timeseries_df = pd.DataFrame({
'location_id': location_ids_repeated,
'value': values,
'value_time': value_time,
'variable_name': variable_names,
'units': units,
'reference_time': start_datetime.date(),
'configuration': configuration
})

return timeseries_df


Expand Down

0 comments on commit c2cd1b4

Please sign in to comment.