Skip to content

Commit

Permalink
Merge pull request #59 from openearthplatforminitiative/fix/clean-up-job
Browse files Browse the repository at this point in the history
Fix/clean up job
  • Loading branch information
giltinde authored May 21, 2024
2 parents dea90ef + 5b0ccc7 commit 31aad0d
Show file tree
Hide file tree
Showing 5 changed files with 6 additions and 26 deletions.
20 changes: 1 addition & 19 deletions data_pipelines/assets/flood/discharge.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,8 @@ def raw_discharge(context: AssetExecutionContext, cds_client: CDSClient) -> None

if USE_CONTROL_MEMBER_IN_ENSEMBLE:
product_type = ["control_forecast", "ensemble_perturbed_forecasts"]
print("Retrieving both control and ensemble")
else:
product_type = "ensemble_perturbed_forecasts"
print("Retrieving only ensemble")

for leadtime_hour in discharge_partitions.get_partition_keys():
request_params = {
Expand Down Expand Up @@ -115,11 +113,6 @@ def transformed_discharge(

ds_upstream = restricted_uparea_glofas_v4_0

if USE_CONTROL_MEMBER_IN_ENSEMBLE:
context.log.info("Combining control and ensemble")
else:
context.log.info("Using only ensemble")

ds_upstream = restrict_dataset_area(
ds_upstream,
lat_min,
Expand Down Expand Up @@ -214,7 +207,7 @@ def split_discharge_by_area(

# Determine the unique areas in the transformed discharge data
total_roi = GLOFAS_ROI_CENTRAL_AFRICA
n_subareas = 4
n_subareas = N_SUBAREAS
lat_min = total_roi["lat_min"]
lat_max = total_roi["lat_max"]
lon_min = total_roi["lon_min"]
Expand Down Expand Up @@ -391,9 +384,6 @@ def detailed_forecast_subarea(
f"Saved detailed forecast data to {detailed_forecast_path}"
)

del sub_discharge
del detailed_forecast_df

return None


Expand Down Expand Up @@ -475,14 +465,6 @@ def summary_forecast_subarea(
)
context.log.info(f"Saved summary forecast data to {summary_forecast_path}")

# delete the detailed forecast variable
del detailed_forecast_df
del summary_forecast_df
del peak_timing_df
del tendency_df
del intensity_df
del intermediate_df

return None


Expand Down
1 change: 0 additions & 1 deletion data_pipelines/assets/flood/upstream.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import httpx
import xarray as xr
from dagster import AssetExecutionContext, AssetIn, asset
from upath import UPath

from data_pipelines.resources.io_managers import (
copy_local_file_to_s3,
Expand Down
7 changes: 3 additions & 4 deletions data_pipelines/jobs.py
Original file line number Diff line number Diff line change
@@ -1,16 +1,15 @@
from dagster import ScheduleDefinition, define_asset_job

# build a job that materializes all assets
# build a job that materializes all flood assets
all_flood_assets_job = define_asset_job(
"all_assets_job",
selection="flood/raw_discharge*",
)

# define a schedule that runs the all_assets_job every day at 11:00 UTC
# without using build_schedule_from_partitioned_job
# define a schedule that runs the all_assets_job every day at 09:30 UTC
all_flood_assets_schedule = ScheduleDefinition(
name="all_assets_schedule",
cron_schedule="44 16 * * *",
cron_schedule="30 9 * * *",
job=all_flood_assets_job,
execution_timezone="UTC",
)
2 changes: 1 addition & 1 deletion data_pipelines/utils/flood/config.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
TIMEDELTA = 1
TIMEDELTA = 0
GLOFAS_API_URL = "https://cds.climate.copernicus.eu/api/v2"
UPSTREAM_URL = "https://confluence.ecmwf.int/download/attachments/242067380/uparea_glofas_v4_0.nc?version=2&modificationDate=1668604690076&api=v2"
GLOFAS_RET_PRD_THRESH_VALS = [2, 5, 20]
Expand Down
2 changes: 1 addition & 1 deletion deployment/dagster/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ dagster-user-deployments:
- name: "data-pipelines"
image:
repository: "ghcr.io/openearthplatforminitiative/data-pipelines"
tag: "0.3.39"
tag: "0.3.40"
pullPolicy: Always
dagsterApiGrpcArgs:
- "--module-name"
Expand Down

0 comments on commit 31aad0d

Please sign in to comment.