Commit

Merge pull request #45 from yut23/kronos_parallel
Use GNU parallel in Kronos archiving script
zingale authored Dec 6, 2024
2 parents 0d4f6df + 3b1f326 commit 39eb118
Showing 1 changed file with 85 additions and 95 deletions.
180 changes: 85 additions & 95 deletions job_scripts/hpss/kronos_process.sh
@@ -12,7 +12,7 @@ set -e
 jobidfile=process.jobid


-# set the prefix of the plotfiles and checkpoint files (passed to find(1) -name)
+# set the prefix of the plotfiles and checkpoint files (a fnmatch(3) pattern)
 plt_prefix='*plt'
 chk_prefix='*chk'

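Note that the prefix is now consumed in two places: find(1) still expands it as a -name pattern, and the new process_single_file function below matches it with bash's [[ ... == pattern ]] test, which uses the same fnmatch(3) syntax. A minimal sketch of the two uses (the directory name is hypothetical):

    plt_prefix='*plt'

    # find(1) expands the pattern itself, so it is quoted to survive the shell
    find . -maxdepth 1 -type d -name "${plt_prefix}"'?????'

    # bash [[ ]] treats an unquoted right-hand side as a pattern
    dir=subch_plt00100
    if [[ $dir == ${plt_prefix}* ]]; then
        echo "$dir matches"
    fi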
@@ -53,14 +53,19 @@ echo "$SLURM_JOB_ID" > "$jobidfile"

 # if our process is killed, remove the lock file first
 function cleanup() {
-    echo "process: received signal; removing $jobidfile"
+    echo "process: removing $jobidfile"
     command rm -f "$jobidfile"
+    # remove the EXIT handler, since we only want to do this once
+    trap - EXIT
+    # don't exit, so we can finish the current operation:
+    # $jobidfile is checked at the start of each loop iteration in process_files()
 }
-trap cleanup EXIT HUP INT QUIT TERM XCPU
+function cleanup_killed() {
+    echo "process: received signal; stopping"
+    cleanup
+}
+trap cleanup_killed HUP INT QUIT TERM XCPU
+trap cleanup EXIT

 # Number of seconds to sleep before checking again.
 N=60
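The split into two handlers separates signal handling from lock-file cleanup: cleanup_killed runs only on a signal, cleanup runs on every exit, and the trap - EXIT inside cleanup unregisters the EXIT handler so it cannot fire a second time after a signal. A standalone sketch of the same pattern (the lock-file name is made up, and XCPU is omitted for brevity):

    #!/bin/bash
    lockfile=example.lock
    touch "$lockfile"

    function cleanup() {
        echo "removing $lockfile"
        rm -f "$lockfile"
        # unregister the EXIT handler, since we only want to run once
        trap - EXIT
    }
    function cleanup_killed() {
        echo "received signal; cleaning up"
        cleanup
        # no exit here: control returns so in-flight work can finish
    }
    trap cleanup_killed HUP INT QUIT TERM
    trap cleanup EXIT

    sleep 30  # Ctrl-C runs cleanup_killed; a normal exit runs cleanup once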
@@ -100,78 +105,95 @@ fi
 # Subsequent invocations of this routine will skip over any plotfiles or
 # checkpoint files that have a corresponding .processed file.

-
-function process_files
+# this function does all the actual data transfer, and is run in parallel
+function process_single_file
 {
-    if [ ! -f "$jobidfile" ]; then
-        echo "process: $jobidfile has been removed, exiting"
+    local dir=$1
+    local job_slot=$2
+
+    local done_dir
+    # right-hand side is not quoted, as we want it to be treated as a pattern
+    if [[ $dir == ${plt_prefix}* ]]; then
+        done_dir=plotfiles
+    elif [[ $dir == ${chk_prefix}* ]]; then
+        done_dir=checkfiles
+    fi
+
+    if ! [[ -f "$jobidfile" ]]; then
+        echo "$job_slot | process: $jobidfile has been removed, exiting"
         exit
     fi
+    if [[ -d "${dir}" ]]; then

-    # plotfiles
+        # only work on the file if there is not a .processed file in the
+        # main directory or the plotfiles/ directory
+        if ! [[ -f "${dir}.processed" ]] && ! [[ -f "${done_dir}/${dir}.processed" ]]; then

-    # Take all but the final plt file -- we want to ensure they're completely
-    # written to disk. Strip out any tar files that are lying around as well
-    # as pltXXXXX.processed files. We restrict the find command to a depth of
-    # 1 to avoid catching any already-processed files in the plotfiles/
-    # directory
-    mapfile -t pltlist < <(
-        find . -maxdepth 1 -type d -name "${plt_prefix}"'?????' -print | sort
-        find . -maxdepth 1 -type d -name "${plt_prefix}"'??????' -print | sort
-        find . -maxdepth 1 -type d -name "${plt_prefix}"'???????' -print | sort
-    )
+            # do processing
+            printf '%2d | archiving %s to Kronos\n' "$job_slot" "$dir"

-    if (( ${#pltlist[@]} > 1 )); then
-        # Don't process the final plt file
-        unset 'pltlist[-1]'
+            # store the file on Kronos
+            if tar -cvf "${KRONOS_DIR}/${dir}.tar" "${dir}" > "${dir}.log"; then

-        for dir in "${pltlist[@]}"
-        do
-            if ! [[ -f "$jobidfile" ]]; then
-                echo "process: $jobidfile has been removed, exiting"
-                exit
-            fi
-            if [[ -d "${dir}" ]]; then
+                # mark this file as processed so we skip it next time
+                date > "${dir}.processed"

-                # only work on the file if there is not a .processed file in the
-                # main directory or the plotfiles/ directory
-                if ! [[ -f "${dir}.processed" ]] && ! [[ -f "plotfiles/${dir}.processed" ]]; then
+                if [[ $done_dir == plotfiles ]]; then
+                    # output the plotfile name and simulation time to ftime.out
+                    # TODO: we should update this file in diag_files_${datestr}.tar
+                    if command -v "${FTIME_EXE}" > /dev/null; then
+                        "${FTIME_EXE}" "${dir}" >> ftime.out
+                    fi
+                fi

-                    # do processing
-                    echo "archiving ${dir} to Kronos"
+                # store the log file along with the archive
+                mv "${dir}.log" "${KRONOS_DIR}"

-                    # store the file on Kronos
-                    if tar -cvf "${KRONOS_DIR}/${dir}.tar" "${dir}" > "${dir}.log"; then
+                # move the file into the transferred directory
+                mv "${dir}" "$done_dir"

-                        # mark this file as processed so we skip it next time
-                        date > "${dir}.processed"
+                # ..and the corresponding .processed file too.
+                mv "${dir}.processed" "$done_dir"

-                        # output the plotfile name and simulation time to ftime.out
-                        # TODO: we should update this file in diag_files_${datestr}.tar
-                        if command -v "${FTIME_EXE}" > /dev/null; then
-                            "${FTIME_EXE}" "${dir}" >> ftime.out
-                        fi
+                #if [[ $done_dir == plotfiles ]]; then
+                #    # and visualize it
+                #    runtimevis.py "${done_dir}/${dir}"
+                #fi

-                        # store the log file along with the archive
-                        mv "${dir}.log" "${KRONOS_DIR}"
+            fi

-                        # move the plotfile into the plotfiles directory
-                        mv "${dir}" plotfiles/
+            printf '%2d | done with %s\n' "$job_slot" "$dir"

-                        # ..and the corresponding .processed file too.
-                        mv "${dir}.processed" plotfiles/
+        fi  # end test of whether file already processed

-                        # and visualize it
-                        #runtimevis.py "plotfiles/${dir}"
+    fi  # end test of whether file is a directory (as it should be)
+}

-                    fi
+# these are needed for GNU parallel
+export jobidfile plt_prefix chk_prefix FTIME_EXE KRONOS_DIR
+export -f process_single_file

-                fi  # end test of whether plotfile already processed
+function process_files
+{
+    if [ ! -f "$jobidfile" ]; then
+        echo "process: $jobidfile has been removed, exiting"
+        exit
+    fi

-            fi  # end test of whether plotfile is a directory (as it should be)
+    # plotfiles

-        done
-    fi
+    # Take all but the final plt file -- we want to ensure they're completely
+    # written to disk. Strip out any tar files that are lying around as well
+    # as pltXXXXX.processed files. We restrict the find command to a depth of
+    # 1 to avoid catching any already-processed files in the plotfiles/
+    # directory
+    mapfile -t pltlist < <(
+        {
+            find . -maxdepth 1 -type d -name "${plt_prefix}"'?????' -print | sort;
+            find . -maxdepth 1 -type d -name "${plt_prefix}"'??????' -print | sort;
+            find . -maxdepth 1 -type d -name "${plt_prefix}"'???????' -print | sort;
+        } | head -n-1  # don't process the final plotfile
+    )


     # checkpoint files
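The grouped find commands now feed a single stream through head -n-1 (GNU head, where -n-1 drops the final line), so the newest, possibly still-being-written file never reaches the array; this replaces the old "unset 'pltlist[-1]'" bookkeeping. The same idea in isolation, with made-up names:

    # collect sorted candidates, then drop the final entry in the pipeline
    mapfile -t items < <(
        {
            printf '%s\n' plt00100 plt00200 | sort
            printf '%s\n' plt000300 | sort
        } | head -n-1   # the newest entry may still be mid-write
    )
    printf 'kept: %s\n' "${items[@]}"   # prints plt00100 and plt00200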
@@ -182,49 +204,17 @@ function process_files
     # 1 to avoid catching any already-processed files in the checkfiles/
     # directory
     mapfile -t chklist < <(
-        find . -maxdepth 2 -type f -path "${chk_prefix}"'?[05]000/Header' -printf '%h\n' | sort
-        find . -maxdepth 2 -type f -path "${chk_prefix}"'??[05]000/Header' -printf '%h\n' | sort
-        find . -maxdepth 2 -type f -path "${chk_prefix}"'???[05]000/Header' -printf '%h\n' | sort
+        {
+            find . -maxdepth 2 -type f -path "${chk_prefix}"'??000/Header' -printf '%h\n' | sort
+            find . -maxdepth 2 -type f -path "${chk_prefix}"'???000/Header' -printf '%h\n' | sort
+            find . -maxdepth 2 -type f -path "${chk_prefix}"'????000/Header' -printf '%h\n' | sort
+        } | head -n-1  # don't process the final checkpoint file
     )

-    if (( ${#chklist[@]} > 1 )); then
-        # Don't process the final chk file
-        unset 'chklist[-1]'
-
-        for dir in "${chklist[@]}"
-        do
-            if ! [[ -f "$jobidfile" ]]; then
-                echo "process: $jobidfile has been removed, exiting"
-                exit
-            fi
-            if [[ -d "${dir}" ]]; then
-
-                if ! [[ -f "${dir}.processed" ]] && ! [[ -f "checkfiles/${dir}.processed" ]]; then
-
-                    echo "archiving ${dir} to Kronos"
-
-                    # store the file on Kronos
-                    if tar -cvf "${KRONOS_DIR}/${dir}.tar" "${dir}" > "${dir}.log"; then
-
-                        # mark this file as processed so we skip it next time
-                        date > "${dir}.processed"
-
-                        # store the log file along with the archive
-                        mv "${dir}.log" "${KRONOS_DIR}"
-
-                        # move the checkpoint file into the checkfiles directory
-                        mv "${dir}" checkfiles/
-
-                        # ..and the corresponding .processed file too.
-                        mv "${dir}.processed" checkfiles/
-
-                    fi
-
-                fi
-
-            fi
-        done
-    fi
+    # do the archiving in parallel
+    # use --line-buffer so the start and finish lines are interleaved properly
+    parallel --line-buffer -j 32 process_single_file '{}' '{%}' ::: "${pltlist[@]}" "${chklist[@]}"

 }
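The parallel invocation is what replaces both serial for loops: {} receives one directory name per job, {%} is GNU parallel's job-slot number (1 through 32 here), which process_single_file prints to tag its log lines, and --line-buffer flushes output line by line so the interleaved start/finish messages stay intact. A minimal reproduction of the pattern with a stand-in work function:

    #!/bin/bash
    # stand-in for process_single_file: echo instead of tar
    function do_one() {
        local dir=$1
        local job_slot=$2
        printf '%2d | archiving %s\n' "$job_slot" "$dir"
        sleep 1
        printf '%2d | done with %s\n' "$job_slot" "$dir"
    }
    # exported functions are visible to the bash instances parallel spawns
    export -f do_one

    parallel --line-buffer -j 4 do_one '{}' '{%}' ::: plt00100 plt00200 chk00100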
