Skip to content

Commit

Permalink
Replace separate JEDI radiance bias correction files with tarball (NO…
Browse files Browse the repository at this point in the history
…AA-EMC#2875)

This PR updates g-w components which handle JEDI radiance bias
correction files. Currently, JEDI radiance bias correction files are
processed at the _satellite_sensor_. This PR replaces these multiple
files with a single radiance bias correction tarball.

Resolves NOAA-EMC#2862
  • Loading branch information
RussTreadon-NOAA authored Sep 30, 2024
1 parent 386aa6d commit e2f56f5
Show file tree
Hide file tree
Showing 9 changed files with 104 additions and 97 deletions.
2 changes: 1 addition & 1 deletion ci/cases/pr/C96C48_ufs_hybatmDA.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ arguments:
expdir: {{ 'RUNTESTS' | getenv }}/EXPDIR
icsdir: {{ 'ICSDIR_ROOT' | getenv }}/C96C48/20240610
idate: 2024022318
edate: 2024022400
edate: 2024022406
nens: 2
gfs_cyc: 1
start: warm
Expand Down
2 changes: 1 addition & 1 deletion env/ORION.env
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ elif [[ "${step}" = "marineanlchkpt" ]]; then

export NTHREADS_OCNANAL=${NTHREADSmax}

export APRUN_MARINEANLCHKPT="${APRUN} --cpus-per-task=${NTHREADS_OCNANAL}"
export APRUN_MARINEANLCHKPT="${APRUN_default} --cpus-per-task=${NTHREADS_OCNANAL}"

elif [[ "${step}" = "ocnanalecen" ]]; then

Expand Down
2 changes: 2 additions & 0 deletions parm/archive/gdas_restarta.yaml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,8 @@ gdas_restarta:
- "{{ COMIN_ATMOS_ANALYSIS | relpath(ROTDIR) }}/{{ head }}abias_int"
- "{{ COMIN_ATMOS_ANALYSIS | relpath(ROTDIR) }}/{{ head }}dtfanl.nc"
- "{{ COMIN_ATMOS_ANALYSIS | relpath(ROTDIR) }}/{{ head }}loginc.txt"
{% else %}
- "{{ COMIN_ATMOS_ANALYSIS | relpath(ROTDIR) }}/{{ head }}rad_varbc_params.tar"
{% endif %}

# Snow surface data
Expand Down
10 changes: 10 additions & 0 deletions parm/config/gfs/config.resources.ORION
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,16 @@ case ${step} in
# Remove this block once the GSI issue is resolved.
export walltime="00:45:00"
;;
"atmanlvar")
# Run on 8 nodes for memory requirement
export tasks_per_node=8
export walltime="00:45:00"
;;
"atmensanlobs")
# Run on 8 nodes for memory requirement
export tasks_per_node=8
export walltime="00:45:00"
;;
*)
;;
esac
9 changes: 1 addition & 8 deletions parm/stage/analysis.yaml.j2
Original file line number Diff line number Diff line change
Expand Up @@ -10,17 +10,10 @@ analysis:
{% for mem in range(first_mem, last_mem + 1) %}
{% set imem = mem - first_mem %}
{% set COMOUT_ATMOS_ANALYSIS_MEM = COMOUT_ATMOS_ANALYSIS_MEM_list[imem] %}
{% for ftype in ["abias", "abias_air", "abias_int", "abias_pc", "atminc.nc", "atmi009.nc", "atmi003.nc", "radstat", "ratminc.nc", "ratmi009.nc", "ratmi003.nc"] %}
{% for ftype in ["abias", "abias_air", "abias_int", "abias_pc", "atminc.nc", "atmi009.nc", "atmi003.nc", "radstat", "ratminc.nc", "ratmi009.nc", "ratmi003.nc", "rad_varbc_params.tar"] %}
{% if path_exists(ICSDIR ~ "/" ~ COMOUT_ATMOS_ANALYSIS_MEM | relpath(ROTDIR) ~ "/" ~ RUN ~ ".t" ~ current_cycle_HH ~ "z." ~ ftype) %}
- ["{{ ICSDIR }}/{{ COMOUT_ATMOS_ANALYSIS_MEM | relpath(ROTDIR) }}/{{ RUN }}.t{{ current_cycle_HH }}z.{{ ftype }}", "{{ COMOUT_ATMOS_ANALYSIS_MEM }}"]
{% endif %}
{% endfor %}
{% if DO_JEDIATMVAR %}
{% for ftype in ["satbias.nc", "satbias_cov.nc", "tlapse.txt"] %}
{% for file in glob(ICSDIR ~ "/" ~ COMOUT_ATMOS_ANALYSIS_MEM | relpath(ROTDIR) ~ "/" ~ RUN ~ ".t" ~ current_cycle_HH ~ "z.atms_*." ~ ftype) %}
- ["{{ file }}", "{{ COMOUT_ATMOS_ANALYSIS_MEM }}"]
{% endfor %}
{% endfor %}
{% endif %}
{% endfor %} # mem loop
{% endif %}
56 changes: 46 additions & 10 deletions ush/python/pygfs/jedi/jedi.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
#!/usr/bin/env python3

import os
import tarfile
from logging import getLogger
from typing import List, Dict, Any, Optional
from jcb import render
from wxflow import (AttrDict,
FileHandler,
chdir, rm_p,
parse_j2yaml,
logit,
Expand Down Expand Up @@ -188,19 +190,19 @@ def get_obs_dict(self, task_config: AttrDict) -> Dict[str, Any]:
return obs_dict

@logit(logger)
def get_bias_dict(self, task_config: AttrDict) -> Dict[str, Any]:
def get_bias_dict(self, task_config: AttrDict, bias_file) -> Dict[str, Any]:
"""Compile a dictionary of observation files to copy
This method extracts 'observers' from the JEDI yaml and from that list, extracts a list of
observation bias correction files that are to be copied to the run directory
This method extracts 'observers' from the JEDI yaml and determines from that list
if bias correction tar files are to be copied to the run directory
from the component directory.
TODO: COM_ATMOS_ANALYSIS_PREV is hardwired here and this method is not appropriate in
`analysis.py` and should be implemented in the component where this is applicable.
Parameters
----------
task_config: AttrDict
Attribute-dictionary of all configuration variables associated with a GDAS task.
bias_file
name of bias correction tar file
Returns
----------
Expand All @@ -216,18 +218,52 @@ def get_bias_dict(self, task_config: AttrDict) -> Dict[str, Any]:
obfile = ob['obs bias']['input file']
obdir = os.path.dirname(obfile)
basename = os.path.basename(obfile)
prefix = '.'.join(basename.split('.')[:-2])
for file in ['satbias.nc', 'satbias_cov.nc', 'tlapse.txt']:
bfile = f"{prefix}.{file}"
copylist.append([os.path.join(task_config.COM_ATMOS_ANALYSIS_PREV, bfile), os.path.join(obdir, bfile)])
# TODO: Why is this specific to ATMOS?
prefix = '.'.join(basename.split('.')[:-3])
bfile = f"{prefix}.{bias_file}"
tar_file = os.path.join(obdir, bfile)
copylist.append([os.path.join(task_config.VarBcDir, bfile), tar_file])
break

bias_dict = {
'mkdir': [os.path.join(task_config.DATA, 'bc')],
'copy': copylist
}

return bias_dict

@staticmethod
@logit(logger)
def extract_tar(tar_file: str) -> None:
"""Extract files from a tarball
This method extract files from a tarball
Parameters
----------
tar_file
path/name of tarball
Returns
----------
None
"""

# extract files from tar file
tar_path = os.path.dirname(tar_file)
try:
with tarfile.open(tar_file, "r") as tarball:
tarball.extractall(path=tar_path)
logger.info(f"Extract {tarball.getnames()}")
except tarfile.ReadError as err:
if tarfile.is_tarfile(tar_file):
logger.error(f"FATAL ERROR: {tar_file} could not be read")
raise tarfile.ReadError(f"FATAL ERROR: unable to read {tar_file}")
else:
logger.info()
except tarfile.ExtractError as err:
logger.exception(f"FATAL ERROR: unable to extract from {tar_file}")
raise tarfile.ExtractError("FATAL ERROR: unable to extract from {tar_file}")


@logit(logger)
def find_value_in_nested_dict(nested_dict: Dict, target_key: str) -> Any:
Expand Down
45 changes: 0 additions & 45 deletions ush/python/pygfs/task/analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,6 @@ def initialize(self) -> None:
obs_dict = self.get_obs_dict()
FileHandler(obs_dict).sync()

# some analyses need to stage bias corrections
bias_dict = self.get_bias_dict()
FileHandler(bias_dict).sync()

# link jedi executable to run directory
self.link_jediexe()

Expand Down Expand Up @@ -127,47 +123,6 @@ def get_obs_dict(self) -> Dict[str, Any]:
}
return obs_dict

@logit(logger)
def get_bias_dict(self) -> Dict[str, Any]:
"""Compile a dictionary of observation files to copy
This method extracts 'observers' from the JEDI yaml and from that list, extracts a list of
observation bias correction files that are to be copied to the run directory
from the component directory.
TODO: COM_ATMOS_ANALYSIS_PREV is hardwired here and this method is not appropriate in
`analysis.py` and should be implemented in the component where this is applicable.
Parameters
----------
Returns
----------
bias_dict: Dict
a dictionary containing the list of observation bias files to copy for FileHandler
"""

logger.info(f"Extracting a list of bias correction files from Jedi config file")
observations = find_value_in_nested_dict(self.task_config.jedi_config, 'observations')
logger.debug(f"observations:\n{pformat(observations)}")

copylist = []
for ob in observations['observers']:
if 'obs bias' in ob.keys():
obfile = ob['obs bias']['input file']
obdir = os.path.dirname(obfile)
basename = os.path.basename(obfile)
prefix = '.'.join(basename.split('.')[:-2])
for file in ['satbias.nc', 'satbias_cov.nc', 'tlapse.txt']:
bfile = f"{prefix}.{file}"
copylist.append([os.path.join(self.task_config.COM_ATMOS_ANALYSIS_PREV, bfile), os.path.join(obdir, bfile)])
# TODO: Why is this specific to ATMOS?

bias_dict = {
'mkdir': [os.path.join(self.task_config.DATA, 'bc')],
'copy': copylist
}
return bias_dict

@logit(logger)
def add_fv3_increments(self, inc_file_tmpl: str, bkg_file_tmpl: str, incvars: List) -> None:
"""Add cubed-sphere increments to cubed-sphere backgrounds
Expand Down
66 changes: 35 additions & 31 deletions ush/python/pygfs/task/atm_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,17 @@ def initialize_analysis(self) -> None:

# stage bias corrections
logger.info(f"Staging list of bias correction files generated from JEDI config")
bias_dict = self.jedi.get_bias_dict(self.task_config)
self.task_config.VarBcDir = f"{self.task_config.COM_ATMOS_ANALYSIS_PREV}"
bias_file = f"rad_varbc_params.tar"
bias_dict = self.jedi.get_bias_dict(self.task_config, bias_file)
FileHandler(bias_dict).sync()
logger.debug(f"Bias correction files:\n{pformat(bias_dict)}")

# extract bias corrections
tar_file = os.path.join(self.task_config.DATA, 'obs', f"{self.task_config.GPREFIX}{bias_file}")
logger.info(f"Extract bias correction files from {tar_file}")
self.jedi.extract_tar(tar_file)

# stage CRTM fix files
logger.info(f"Staging CRTM fix files from {self.task_config.CRTM_FIX_YAML}")
crtm_fix_dict = parse_j2yaml(self.task_config.CRTM_FIX_YAML, self.task_config)
Expand Down Expand Up @@ -265,37 +272,34 @@ def finalize(self) -> None:
}
FileHandler(yaml_copy).sync()

# copy bias correction files to ROTDIR
logger.info("Copy bias correction files from DATA/ to COM/")
biasdir = os.path.join(self.task_config.DATA, 'bc')
biasls = os.listdir(biasdir)
biaslist = []
for bfile in biasls:
src = os.path.join(biasdir, bfile)
dest = os.path.join(self.task_config.COM_ATMOS_ANALYSIS, bfile)
biaslist.append([src, dest])

gprefix = f"{self.task_config.GPREFIX}"
gsuffix = f"{to_YMDH(self.task_config.previous_cycle)}" + ".txt"
aprefix = f"{self.task_config.APREFIX}"
asuffix = f"{to_YMDH(self.task_config.current_cycle)}" + ".txt"

logger.info(f"Copying {gprefix}*{gsuffix} from DATA/ to COM/ as {aprefix}*{asuffix}")
obsdir = os.path.join(self.task_config.DATA, 'obs')
obsls = os.listdir(obsdir)
for ofile in obsls:
if ofile.endswith(".txt"):
src = os.path.join(obsdir, ofile)
tfile = ofile.replace(gprefix, aprefix)
tfile = tfile.replace(gsuffix, asuffix)
dest = os.path.join(self.task_config.COM_ATMOS_ANALYSIS, tfile)
biaslist.append([src, dest])

bias_copy = {
'mkdir': [self.task_config.COM_ATMOS_ANALYSIS],
'copy': biaslist,
# path of output radiance bias correction tarfile
bfile = f"{self.task_config.APREFIX}rad_varbc_params.tar"
radtar = os.path.join(self.task_config.COM_ATMOS_ANALYSIS, bfile)

# rename and copy tlapse radiance bias correction files from obs to bc
tlapobs = glob.glob(os.path.join(self.task_config.DATA, 'obs', '*tlapse.txt'))
copylist = []
for tlapfile in tlapobs:
obsfile = os.path.basename(tlapfile).split('.', 2)
newfile = f"{self.task_config.APREFIX}{obsfile[2]}"
copylist.append([tlapfile, os.path.join(self.task_config.DATA, 'bc', newfile)])
tlapse_dict = {
'copy': copylist
}
FileHandler(bias_copy).sync()
FileHandler(tlapse_dict).sync()

# get lists of radiance bias correction files to add to tarball
satlist = glob.glob(os.path.join(self.task_config.DATA, 'bc', '*satbias*nc'))
tlaplist = glob.glob(os.path.join(self.task_config.DATA, 'bc', '*tlapse.txt'))

# tar radiance bias correction files to ROTDIR
logger.info(f"Creating radiance bias correction tar file {radtar}")
with tarfile.open(radtar, 'w') as radbcor:
for satfile in satlist:
radbcor.add(satfile, arcname=os.path.basename(satfile))
for tlapfile in tlaplist:
radbcor.add(tlapfile, arcname=os.path.basename(tlapfile))
logger.info(f"Add {radbcor.getnames()}")

# Copy FV3 atm increment to comrot directory
logger.info("Copy UFS model readable atm increment file")
Expand Down
9 changes: 8 additions & 1 deletion ush/python/pygfs/task/atmens_analysis.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,17 @@ def initialize_analysis(self) -> None:

# stage bias corrections
logger.info(f"Staging list of bias correction files generated from JEDI config")
bias_dict = self.jedi.get_bias_dict(self.task_config)
self.task_config.VarBcDir = f"{self.task_config.COM_ATMOS_ANALYSIS_PREV}"
bias_file = f"rad_varbc_params.tar"
bias_dict = self.jedi.get_bias_dict(self.task_config, bias_file)
FileHandler(bias_dict).sync()
logger.debug(f"Bias correction files:\n{pformat(bias_dict)}")

# extract bias corrections
tar_file = os.path.join(self.task_config.DATA, 'obs', f"{self.task_config.GPREFIX}{bias_file}")
logger.info(f"Extract bias correction files from {tar_file}")
self.jedi.extract_tar(tar_file)

# stage CRTM fix files
logger.info(f"Staging CRTM fix files from {self.task_config.CRTM_FIX_YAML}")
crtm_fix_dict = parse_j2yaml(self.task_config.CRTM_FIX_YAML, self.task_config)
Expand Down

0 comments on commit e2f56f5

Please sign in to comment.