diff --git a/perf/benchmark_netcdf_io.jl b/perf/benchmark_netcdf_io.jl index 05a0a1f4e7..00d8db0448 100644 --- a/perf/benchmark_netcdf_io.jl +++ b/perf/benchmark_netcdf_io.jl @@ -66,6 +66,7 @@ for device in keys(timings) integrator.t, simulation.output_dir, ) + NCDatasets.sync(netcdf_writer.open_files[simulation.output_dir]) # Now, profile @info "Profiling ($device_name)" prof = Profile.@profile CAD.save_diagnostic_to_disk!( diff --git a/src/diagnostics/diagnostic.jl b/src/diagnostics/diagnostic.jl index cf589e3736..39d839f1b3 100644 --- a/src/diagnostics/diagnostic.jl +++ b/src/diagnostics/diagnostic.jl @@ -661,6 +661,18 @@ function accumulate!(diag_accumulator, diag_storage, reduction_time_func) return nothing end +import NVTX +NVTX.@annotate function sync_nc_datasets(diagnostics, output_dir) + any(diag -> diag.output_writer isa NetCDFWriter, diagnostics) || return + s = map(diagnostics) do diag + writer = diag.output_writer + writer isa NetCDFWriter || continue + Threads.@spawn NCDatasets.sync(writer.open_files[output_dir]) + end + return nothing + # Threads.wait(s) # I don't think this is needed +end + """ get_callbacks_from_diagnostics(diagnostics, storage, counters) @@ -699,66 +711,67 @@ function get_callbacks_from_diagnostics( # diagnostics that perform reductions. callback_arrays = map(diagnostics) do diag - variable = diag.variable - compute_callback = - integrator -> begin - variable.compute!( - storage[diag], - integrator.u, - integrator.p, - integrator.t, - ) - - # accumulator[diag] is not defined for non-reductions - diag_accumulator = get(accumulators, diag, nothing) - - accumulate!( - diag_accumulator, - storage[diag], - diag.reduction_time_func, - ) - counters[diag] += 1 - return nothing - end - - output_callback = - integrator -> begin - # Move accumulated value to storage so that we can output it (for - # reductions). This provides a unified interface to pre_output_hook! and - # output, at the cost of an additional copy. If this copy turns out to be - # too expensive, we can move the if statement below. - isnothing(diag.reduction_time_func) || - (storage[diag] .= accumulators[diag]) - - # Any operations we have to perform before writing to output? - # Here is where we would divide by N to obtain an arithmetic average - diag.pre_output_hook!(storage[diag], counters[diag]) - - # Write to disk - write_field!( - diag.output_writer, - storage[diag], - diag, - integrator.u, - integrator.p, - integrator.t, - output_dir, - ) - - # accumulator[diag] is not defined for non-reductions - diag_accumulator = get(accumulators, diag, nothing) - - reset_accumulator!(diag_accumulator, diag.reduction_time_func) - counters[diag] = 0 - return nothing - end + NVTX.@annotate function compute_callback(integrator) + diag.variable.compute!( + storage[diag], + integrator.u, + integrator.p, + integrator.t, + ) + + # accumulator[diag] is not defined for non-reductions + diag_accumulator = get(accumulators, diag, nothing) + + accumulate!( + diag_accumulator, + storage[diag], + diag.reduction_time_func, + ) + counters[diag] += 1 + return nothing + end + + NVTX.@annotate function output_callback(integrator) + # Move accumulated value to storage so that we can output it (for + # reductions). This provides a unified interface to pre_output_hook! and + # output, at the cost of an additional copy. If this copy turns out to be + # too expensive, we can move the if statement below. + isnothing(diag.reduction_time_func) || + (storage[diag] .= accumulators[diag]) + + # Any operations we have to perform before writing to output? + # Here is where we would divide by N to obtain an arithmetic average + diag.pre_output_hook!(storage[diag], counters[diag]) + + # Write to disk + write_field!( + diag.output_writer, + storage[diag], + diag, + integrator.u, + integrator.p, + integrator.t, + output_dir, + ) + + # accumulator[diag] is not defined for non-reductions + diag_accumulator = get(accumulators, diag, nothing) + + reset_accumulator!(diag_accumulator, diag.reduction_time_func) + counters[diag] = 0 + return nothing + end return [ AtmosCallback(compute_callback, EveryNSteps(diag.compute_every)), AtmosCallback(output_callback, EveryNSteps(diag.output_every)), ] end + nc_sync(integrator) = sync_nc_datasets(diagnostics, output_dir) # We need to flatten to tuples - return vcat(callback_arrays...) + return vcat( + callback_arrays..., + AtmosCallback(nc_sync, EveryNSteps(diag.output_every)), + ) end diff --git a/src/diagnostics/netcdf_writer.jl b/src/diagnostics/netcdf_writer.jl index c9f27d1e94..7f2a67d7a2 100644 --- a/src/diagnostics/netcdf_writer.jl +++ b/src/diagnostics/netcdf_writer.jl @@ -738,9 +738,6 @@ function save_diagnostic_to_disk!( elseif length(dim_names) == 1 v[time_index, :] = interpolated_field end - - # Write data to disk - NCDatasets.sync(writer.open_files[output_path]) return nothing end diff --git a/src/solver/type_getters.jl b/src/solver/type_getters.jl index eaa9cc4528..21b59035a6 100644 --- a/src/solver/type_getters.jl +++ b/src/solver/type_getters.jl @@ -886,6 +886,7 @@ function get_simulation(config::AtmosConfig) t_start, output_dir, ) + NCDatasets.sync(diag.output_writer.open_files[output_dir]) else # Add to the accumulator