Skip to content

Commit

Permalink
Do NCDatasets.sync asynch if we have threads
Browse files Browse the repository at this point in the history
  • Loading branch information
charleskawczynski committed Mar 22, 2024
1 parent 595366c commit c9df27e
Show file tree
Hide file tree
Showing 4 changed files with 69 additions and 57 deletions.
1 change: 1 addition & 0 deletions perf/benchmark_netcdf_io.jl
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ for device in keys(timings)
integrator.t,
simulation.output_dir,
)
NCDatasets.sync(netcdf_writer.open_files[simulation.output_dir])
# Now, profile
@info "Profiling ($device_name)"
prof = Profile.@profile CAD.save_diagnostic_to_disk!(
Expand Down
121 changes: 67 additions & 54 deletions src/diagnostics/diagnostic.jl
Original file line number Diff line number Diff line change
Expand Up @@ -661,6 +661,18 @@ function accumulate!(diag_accumulator, diag_storage, reduction_time_func)
return nothing
end

import NVTX
NVTX.@annotate function sync_nc_datasets(diagnostics, output_dir)
any(diag -> diag.output_writer isa NetCDFWriter, diagnostics) || return
s = map(diagnostics) do diag
writer = diag.output_writer
writer isa NetCDFWriter || continue
Threads.@spawn NCDatasets.sync(writer.open_files[output_dir])
end
return nothing
# Threads.wait(s) # I don't think this is needed
end

"""
get_callbacks_from_diagnostics(diagnostics, storage, counters)
Expand Down Expand Up @@ -699,66 +711,67 @@ function get_callbacks_from_diagnostics(
# diagnostics that perform reductions.

callback_arrays = map(diagnostics) do diag
variable = diag.variable
compute_callback =
integrator -> begin
variable.compute!(
storage[diag],
integrator.u,
integrator.p,
integrator.t,
)

# accumulator[diag] is not defined for non-reductions
diag_accumulator = get(accumulators, diag, nothing)

accumulate!(
diag_accumulator,
storage[diag],
diag.reduction_time_func,
)
counters[diag] += 1
return nothing
end

output_callback =
integrator -> begin
# Move accumulated value to storage so that we can output it (for
# reductions). This provides a unified interface to pre_output_hook! and
# output, at the cost of an additional copy. If this copy turns out to be
# too expensive, we can move the if statement below.
isnothing(diag.reduction_time_func) ||
(storage[diag] .= accumulators[diag])

# Any operations we have to perform before writing to output?
# Here is where we would divide by N to obtain an arithmetic average
diag.pre_output_hook!(storage[diag], counters[diag])

# Write to disk
write_field!(
diag.output_writer,
storage[diag],
diag,
integrator.u,
integrator.p,
integrator.t,
output_dir,
)

# accumulator[diag] is not defined for non-reductions
diag_accumulator = get(accumulators, diag, nothing)

reset_accumulator!(diag_accumulator, diag.reduction_time_func)
counters[diag] = 0
return nothing
end
NVTX.@annotate function compute_callback(integrator)
diag.variable.compute!(
storage[diag],
integrator.u,
integrator.p,
integrator.t,
)

# accumulator[diag] is not defined for non-reductions
diag_accumulator = get(accumulators, diag, nothing)

accumulate!(
diag_accumulator,
storage[diag],
diag.reduction_time_func,
)
counters[diag] += 1
return nothing
end

NVTX.@annotate function output_callback(integrator)
# Move accumulated value to storage so that we can output it (for
# reductions). This provides a unified interface to pre_output_hook! and
# output, at the cost of an additional copy. If this copy turns out to be
# too expensive, we can move the if statement below.
isnothing(diag.reduction_time_func) ||
(storage[diag] .= accumulators[diag])

# Any operations we have to perform before writing to output?
# Here is where we would divide by N to obtain an arithmetic average
diag.pre_output_hook!(storage[diag], counters[diag])

# Write to disk
write_field!(
diag.output_writer,
storage[diag],
diag,
integrator.u,
integrator.p,
integrator.t,
output_dir,
)

# accumulator[diag] is not defined for non-reductions
diag_accumulator = get(accumulators, diag, nothing)

reset_accumulator!(diag_accumulator, diag.reduction_time_func)
counters[diag] = 0
return nothing
end

return [
AtmosCallback(compute_callback, EveryNSteps(diag.compute_every)),
AtmosCallback(output_callback, EveryNSteps(diag.output_every)),
]
end
nc_sync(integrator) = sync_nc_datasets(diagnostics, output_dir)

# We need to flatten to tuples
return vcat(callback_arrays...)
return vcat(
callback_arrays...,
AtmosCallback(nc_sync, EveryNSteps(diag.output_every)),
)
end
3 changes: 0 additions & 3 deletions src/diagnostics/netcdf_writer.jl
Original file line number Diff line number Diff line change
Expand Up @@ -738,9 +738,6 @@ function save_diagnostic_to_disk!(
elseif length(dim_names) == 1
v[time_index, :] = interpolated_field
end

# Write data to disk
NCDatasets.sync(writer.open_files[output_path])
return nothing
end

Expand Down
1 change: 1 addition & 0 deletions src/solver/type_getters.jl
Original file line number Diff line number Diff line change
Expand Up @@ -886,6 +886,7 @@ function get_simulation(config::AtmosConfig)
t_start,
output_dir,
)
NCDatasets.sync(diag.output_writer.open_files[output_dir])
else
# Add to the accumulator

Expand Down

0 comments on commit c9df27e

Please sign in to comment.