Skip to content

Commit

Permalink
Merge pull request #38 from openearthplatforminitiative/fix/use-dask-…
Browse files Browse the repository at this point in the history
…client-explicitly

fix: explicitly use the dask client
  • Loading branch information
giltinde authored May 8, 2024
2 parents 09c6033 + f3703f1 commit 50d33da
Showing 1 changed file with 8 additions and 4 deletions.
12 changes: 8 additions & 4 deletions data_pipelines/assets/flood/discharge.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,7 @@ def detailed_forecast(
transformed_discharge: dd.DataFrame,
rp_combined_thresh_pq: dd.DataFrame,
) -> dd.DataFrame:
client = dask_resource._client
forecast_df = transformed_discharge

# Perform operations on the columns
Expand Down Expand Up @@ -197,10 +198,8 @@ def detailed_forecast(
detailed_forecast_df["issued_on"] = detailed_forecast_df["issued_on"].astype(str)
detailed_forecast_df["valid_for"] = detailed_forecast_df["valid_for"].astype(str)

context.log.info(f"Starting computing detailed forecast")

detailed_forecast_df.compute()

context.log.info(f"Started computing detailed forecast")
detailed_forecast_df = client.compute(detailed_forecast_df).result()
context.log.info(f"Finished computing detailed forecast")

return detailed_forecast_df
Expand All @@ -219,6 +218,7 @@ def summary_forecast(
dask_resource: DaskResource,
detailed_forecast: dd.DataFrame,
) -> dd.DataFrame:
client = dask_resource._client
detailed_forecast_df = detailed_forecast.drop(columns=["wkt"])

peak_timing_df = compute_flood_peak_timing(detailed_forecast_df)
Expand All @@ -237,4 +237,8 @@ def summary_forecast(
summary_forecast_df, GLOFAS_RESOLUTION / 2, GLOFAS_PRECISION
)

context.log.info(f"Started computing summary forecast")
summary_forecast_df = client.compute(summary_forecast_df).result()
context.log.info(f"Finished computing summary forecast")

return summary_forecast_df

0 comments on commit 50d33da

Please sign in to comment.