Skip to content

Commit

Permalink
Add callback that estimates walltime
Browse files Browse the repository at this point in the history
Increase allocation limits

Bump allocation limits

Finish comment in flame graph

Fix doc string

Improve names, add docs

Improve names

Improve docs and names

Improve docs, qualify Period

Improve names

Add comment for eval function

Maintain log after 50%

Add warning for restarted simulations
  • Loading branch information
charleskawczynski committed Dec 15, 2023
1 parent 06a3da7 commit 5fc53d6
Show file tree
Hide file tree
Showing 6 changed files with 175 additions and 7 deletions.
11 changes: 6 additions & 5 deletions perf/flame.jl
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ simulation = CA.get_simulation(config)

import SciMLBase
SciMLBase.step!(integrator) # compile first
SciMLBase.step!(integrator) # compile print_walltime_estimate, which skips the first step to avoid timing compilation
CA.call_all_callbacks!(integrator) # compile callbacks
import Profile, ProfileCanvas
output_dir = job_id
Expand All @@ -36,17 +37,17 @@ ProfileCanvas.html_file(joinpath(output_dir, "flame.html"), results)
#####

allocs_limit = Dict()
allocs_limit["flame_perf_target"] = 147_520
allocs_limit["flame_perf_target_tracers"] = 179_776
allocs_limit["flame_perf_target"] = 148_256
allocs_limit["flame_perf_target_tracers"] = 180_512
allocs_limit["flame_perf_target_edmfx"] = 7_005_552
allocs_limit["flame_perf_diagnostics"] = 25_356_928
allocs_limit["flame_perf_target_diagnostic_edmfx"] = 1_309_968
allocs_limit["flame_perf_target_diagnostic_edmfx"] = 1_311_040
allocs_limit["flame_sphere_baroclinic_wave_rhoe_equilmoist_expvdiff"] =
4_018_252_656
allocs_limit["flame_perf_target_threaded"] = 1_276_864
allocs_limit["flame_perf_target_callbacks"] = 37_277_112
allocs_limit["flame_perf_gw"] = 3_226_428_736
allocs_limit["flame_perf_target_prognostic_edmfx_aquaplanet"] = 1_257_712
allocs_limit["flame_perf_gw"] = 3_226_429_472
allocs_limit["flame_perf_target_prognostic_edmfx_aquaplanet"] = 1_258_848

# Ideally, we would like to track all the allocations, but this becomes too
# expensive there is too many of them. Here, we set the default sample rate to
Expand Down
12 changes: 11 additions & 1 deletion src/cache/cache.jl
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
struct AtmosCache{
FT <: AbstractFloat,
FTE,
WTE,
SD,
AM,
NUM,
Expand Down Expand Up @@ -28,6 +30,12 @@ struct AtmosCache{
"""Timestep of the simulation (in seconds). This is also used by callbacks and tendencies"""
dt::FT

"""End time of the simulation (in seconds). This used by callbacks"""
t_end::FTE

"""Walltime estimate"""
walltime_estimate::WTE

"""Start date (used for insolation)."""
start_date::SD

Expand Down Expand Up @@ -93,7 +101,7 @@ end

# The model also depends on f_plane_coriolis_frequency(params)
# This is a constant Coriolis frequency that is only used if space is flat
function build_cache(Y, atmos, params, surface_setup, dt, start_date)
function build_cache(Y, atmos, params, surface_setup, dt, t_end, start_date)
FT = eltype(params)

ᶜcoord = Fields.local_geometry_field(Y.c).coordinates
Expand Down Expand Up @@ -184,6 +192,8 @@ function build_cache(Y, atmos, params, surface_setup, dt, start_date)

args = (
dt,
t_end,
WallTimeEstimate(),
start_date,
atmos,
numerics,
Expand Down
88 changes: 88 additions & 0 deletions src/callbacks/callbacks.jl
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,94 @@ function save_restart_func(integrator, output_dir)
return nothing
end

Base.@kwdef mutable struct WallTimeEstimate
"""Number of calls to the callback"""
n_calls::Int = 0
"""Int indicating next time the callback will print to the log"""
n_next::Int = 1
"""Wall time of previous call to update `WallTimeEstimate`"""
t_wall_last::Float64 = -1
"""Sum of elapsed walltime over calls to `step!`"""
∑Δt_wall::Float64 = 0
"""Fixed increment to increase n_next by after 5% completion"""
n_fixed_increment::Float64 = -1
end
import Dates
function print_walltime_estimate(integrator)
(; walltime_estimate, dt, t_end) = integrator.p
wte = walltime_estimate

# Notes on `ready_to_report`
# - The very first call (when `n_calls == 0`), there's no elapsed
# times to report (and this is called during initialization,
# before `step!` has been called).
# - The second call (`n_calls == 1`) is after `step!` is called
# for the first time, but we don't want to report this since it
# includes compilation time.
# - Calls after that (`n_calls > 1`) exclude compilation and provide
# the best wall time estimates

ready_to_report = wte.n_calls > 1
if ready_to_report
# We need to account for skipping cost of `Δt_wall` when `n_calls == 1`:
factor = wte.n_calls == 2 ? 2 : 1
Δt_wall = factor * (time() - wte.t_wall_last)
else
wte.n_calls == 1 && @info "Progress: Completed first step"
Δt_wall = Float64(0)
wte.n_next = wte.n_calls + 1
end
wte.∑Δt_wall += Δt_wall
wte.t_wall_last = time()

if wte.n_calls == wte.n_next && ready_to_report
t = integrator.t
n_steps_total = ceil(Int, t_end / dt)
n_steps = ceil(Int, t / dt)
wall_time_ave_per_step = wte.∑Δt_wall / n_steps
wall_time_ave_per_step_str = time_and_units_str(wall_time_ave_per_step)
percent_complete = round(t / t_end * 100; digits = 1)
n_steps_remaining = n_steps_total - n_steps
wall_time_remaining = wall_time_ave_per_step * n_steps_remaining
wall_time_remaining_str = time_and_units_str(wall_time_remaining)
wall_time_total =
time_and_units_str(wall_time_ave_per_step * n_steps_total)
wall_time_spent = time_and_units_str(wte.∑Δt_wall)
simulation_time = time_and_units_str(Float64(t))
sypd = round(
simulated_years_per_day(
EfficiencyStats((zero(t), t), wte.∑Δt_wall),
);
digits = 3,
)
estimated_finish_date =
Dates.now() + compound_period(wall_time_remaining, Dates.Second)
@info "Progress" simulation_time = simulation_time n_steps_completed =
n_steps wall_time_per_step = wall_time_ave_per_step_str wall_time_total =
wall_time_total wall_time_remaining = wall_time_remaining_str wall_time_spent =
wall_time_spent percent_complete = "$percent_complete%" sypd = sypd date_now =
Dates.now() estimated_finish_date = estimated_finish_date

# the first fixed increment is equivalent to
# doubling (which puts us at 10%), so we check
# if we're below 5%.
if percent_complete < 5
# doubling factor (to reduce log noise)
wte.n_next *= 2
else
if wte.n_fixed_increment == -1
wte.n_fixed_increment = wte.n_next
end
# increase by fixed increment after 10%
# completion to maintain logs after 50%.
wte.n_next += wte.n_fixed_increment
end
end
wte.n_calls += 1

return nothing
end

function gc_func(integrator)
num_pre = Base.gc_num()
alloc_since_last = (num_pre.allocd + num_pre.deferred_alloc) / 2^20
Expand Down
9 changes: 8 additions & 1 deletion src/solver/type_getters.jl
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,12 @@ function get_callbacks(parsed_args, sim_info, atmos, params, comms_ctx)
FT = eltype(params)
(; dt, output_dir) = sim_info

callbacks = ()
callbacks = (
call_every_n_steps(
(integrator) -> print_walltime_estimate(integrator);
skip_first = true,
),
)
dt_save_to_disk = time_to_seconds(parsed_args["dt_save_to_disk"])
if !(dt_save_to_disk == Inf)
callbacks = (
Expand Down Expand Up @@ -759,6 +764,7 @@ function get_simulation(config::AtmosConfig)
if sim_info.restart
(Y, t_start) = get_state_restart(config.comms_ctx)
spaces = get_spaces_restart(Y)
@warn "Progress estimates do not support restarted simulations"
else
spaces = get_spaces(config.parsed_args, params, config.comms_ctx)
Y = ICs.atmos_state(
Expand All @@ -779,6 +785,7 @@ function get_simulation(config::AtmosConfig)
params,
surface_setup,
sim_info.dt,
sim_info.t_end,
sim_info.start_date,
)
end
Expand Down
60 changes: 60 additions & 0 deletions src/utils/utilities.jl
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,66 @@ function prettytime(t)
return @sprintf("%.3f %s", value, units)
end

import Dates

time_per_time(::Type{P}, ::Type{P}) where {P} = 1
#=
define_time_per_times(periods)
Evals `time_per_time(::Type{<:Dates.Period},::Type{<:Dates.Period})`
for `Nanosecond, Microsecond, Millisecond, Second, Minute, Hour, Day, Week`
in a triangular fashion-- `time_per_time` provides the conversion factor
(e.g., `Nanosecond`s per `Second`) and all larger periods (but not smaller ones).
=#
function define_time_per_times(periods)
for i in eachindex(periods)
T, n = periods[i]
N = Int64(1)
for j in (i - 1):-1:firstindex(periods) # less-precise periods
Tc, nc = periods[j]
N *= nc
@eval time_per_time(::Type{Dates.$T}, ::Type{Dates.$Tc}) = $N
end
end
end

# From Dates
define_time_per_times([
(:Week, 7),
(:Day, 24),
(:Hour, 60),
(:Minute, 60),
(:Second, 1000),
(:Millisecond, 1000),
(:Microsecond, 1000),
(:Nanosecond, 1),
])

"""
time_and_units_str(x::Real)
Returns a truncated string of time and units,
given a time `x` in Seconds.
"""
time_and_units_str(x::Real) =
trunc_time(string(compound_period(x, Dates.Second)))

"""
compound_period(x::Real, ::Type{T}) where {T <: Dates.Period}
A canonicalized `Dates.CompoundPeriod` given a real value
`x`, and its units via the period type `T`.
"""
function compound_period(x::Real, ::Type{T}) where {T <: Dates.Period}
nf = time_per_time(Dates.Nanosecond, T)
return Dates.canonicalize(
Dates.CompoundPeriod(Dates.Nanosecond(ceil(x * nf))),
)
end

trunc_time(s::String) = count(',', s) > 1 ? join(split(s, ",")[1:2], ",") : s


function prettymemory(b)
if b < 1024
return string(b, " bytes")
Expand Down
2 changes: 2 additions & 0 deletions test/coupler_compatibility.jl
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ const T2 = 290
@. sfc_setup = (surface_state,)
p_overwritten = CA.AtmosCache(
p.dt,
simulation.t_end,
CA.WallTimeEstimate(),
p.start_date,
p.atmos,
p.numerics,
Expand Down

0 comments on commit 5fc53d6

Please sign in to comment.