Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add callback that estimates walltime #2428

Merged
merged 1 commit into from
Dec 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions perf/flame.jl
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ simulation = CA.get_simulation(config)

import SciMLBase
SciMLBase.step!(integrator) # compile first
SciMLBase.step!(integrator) # compile print_walltime_estimate, which skips the first step to avoid timing compilation
CA.call_all_callbacks!(integrator) # compile callbacks
import Profile, ProfileCanvas
output_dir = job_id
Expand All @@ -36,17 +37,17 @@ ProfileCanvas.html_file(joinpath(output_dir, "flame.html"), results)
#####

allocs_limit = Dict()
allocs_limit["flame_perf_target"] = 147_520
allocs_limit["flame_perf_target_tracers"] = 179_776
allocs_limit["flame_perf_target"] = 148_256
allocs_limit["flame_perf_target_tracers"] = 180_512
allocs_limit["flame_perf_target_edmfx"] = 7_005_552
allocs_limit["flame_perf_diagnostics"] = 25_356_928
allocs_limit["flame_perf_target_diagnostic_edmfx"] = 1_309_968
allocs_limit["flame_perf_target_diagnostic_edmfx"] = 1_311_040
allocs_limit["flame_sphere_baroclinic_wave_rhoe_equilmoist_expvdiff"] =
4_018_252_656
allocs_limit["flame_perf_target_threaded"] = 1_276_864
allocs_limit["flame_perf_target_callbacks"] = 37_277_112
allocs_limit["flame_perf_gw"] = 3_226_428_736
allocs_limit["flame_perf_target_prognostic_edmfx_aquaplanet"] = 1_257_712
allocs_limit["flame_perf_gw"] = 3_226_429_472
allocs_limit["flame_perf_target_prognostic_edmfx_aquaplanet"] = 1_258_848

# Ideally, we would like to track all the allocations, but this becomes too
# expensive there is too many of them. Here, we set the default sample rate to
Expand Down
12 changes: 11 additions & 1 deletion src/cache/cache.jl
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
struct AtmosCache{
FT <: AbstractFloat,
FTE,
WTE,
SD,
AM,
NUM,
Expand Down Expand Up @@ -28,6 +30,12 @@ struct AtmosCache{
"""Timestep of the simulation (in seconds). This is also used by callbacks and tendencies"""
dt::FT

"""End time of the simulation (in seconds). This used by callbacks"""
t_end::FTE

"""Walltime estimate"""
walltime_estimate::WTE

"""Start date (used for insolation)."""
start_date::SD

Expand Down Expand Up @@ -93,7 +101,7 @@ end

# The model also depends on f_plane_coriolis_frequency(params)
# This is a constant Coriolis frequency that is only used if space is flat
function build_cache(Y, atmos, params, surface_setup, dt, start_date)
function build_cache(Y, atmos, params, surface_setup, dt, t_end, start_date)
FT = eltype(params)

ᶜcoord = Fields.local_geometry_field(Y.c).coordinates
Expand Down Expand Up @@ -184,6 +192,8 @@ function build_cache(Y, atmos, params, surface_setup, dt, start_date)

args = (
dt,
t_end,
WallTimeEstimate(),
start_date,
atmos,
numerics,
Expand Down
88 changes: 88 additions & 0 deletions src/callbacks/callbacks.jl
Original file line number Diff line number Diff line change
Expand Up @@ -434,6 +434,94 @@ function save_restart_func(integrator, output_dir)
return nothing
end

Base.@kwdef mutable struct WallTimeEstimate
"""Number of calls to the callback"""
n_calls::Int = 0
"""Int indicating next time the callback will print to the log"""
n_next::Int = 1
"""Wall time of previous call to update `WallTimeEstimate`"""
t_wall_last::Float64 = -1
"""Sum of elapsed walltime over calls to `step!`"""
∑Δt_wall::Float64 = 0
"""Fixed increment to increase n_next by after 5% completion"""
n_fixed_increment::Float64 = -1
end
import Dates
function print_walltime_estimate(integrator)
(; walltime_estimate, dt, t_end) = integrator.p
wte = walltime_estimate

# Notes on `ready_to_report`
# - The very first call (when `n_calls == 0`), there's no elapsed
# times to report (and this is called during initialization,
# before `step!` has been called).
# - The second call (`n_calls == 1`) is after `step!` is called
# for the first time, but we don't want to report this since it
# includes compilation time.
# - Calls after that (`n_calls > 1`) exclude compilation and provide
# the best wall time estimates

ready_to_report = wte.n_calls > 1
if ready_to_report
# We need to account for skipping cost of `Δt_wall` when `n_calls == 1`:
factor = wte.n_calls == 2 ? 2 : 1
Δt_wall = factor * (time() - wte.t_wall_last)
else
wte.n_calls == 1 && @info "Progress: Completed first step"
Δt_wall = Float64(0)
wte.n_next = wte.n_calls + 1
end
wte.∑Δt_wall += Δt_wall
wte.t_wall_last = time()

if wte.n_calls == wte.n_next && ready_to_report
t = integrator.t
n_steps_total = ceil(Int, t_end / dt)
Sbozzolo marked this conversation as resolved.
Show resolved Hide resolved
n_steps = ceil(Int, t / dt)
wall_time_ave_per_step = wte.∑Δt_wall / n_steps
wall_time_ave_per_step_str = time_and_units_str(wall_time_ave_per_step)
percent_complete = round(t / t_end * 100; digits = 1)
n_steps_remaining = n_steps_total - n_steps
wall_time_remaining = wall_time_ave_per_step * n_steps_remaining
wall_time_remaining_str = time_and_units_str(wall_time_remaining)
wall_time_total =
time_and_units_str(wall_time_ave_per_step * n_steps_total)
wall_time_spent = time_and_units_str(wte.∑Δt_wall)
simulation_time = time_and_units_str(Float64(t))
sypd = round(
simulated_years_per_day(
EfficiencyStats((zero(t), t), wte.∑Δt_wall),
);
digits = 3,
)
estimated_finish_date =
Dates.now() + compound_period(wall_time_remaining, Dates.Second)
@info "Progress" simulation_time = simulation_time n_steps_completed =
n_steps wall_time_per_step = wall_time_ave_per_step_str wall_time_total =
wall_time_total wall_time_remaining = wall_time_remaining_str wall_time_spent =
wall_time_spent percent_complete = "$percent_complete%" sypd = sypd date_now =
Dates.now() estimated_finish_date = estimated_finish_date

# the first fixed increment is equivalent to
# doubling (which puts us at 10%), so we check
# if we're below 5%.
if percent_complete < 5
# doubling factor (to reduce log noise)
wte.n_next *= 2
else
if wte.n_fixed_increment == -1
wte.n_fixed_increment = wte.n_next
end
# increase by fixed increment after 10%
# completion to maintain logs after 50%.
wte.n_next += wte.n_fixed_increment
end
end
wte.n_calls += 1

return nothing
end

function gc_func(integrator)
num_pre = Base.gc_num()
alloc_since_last = (num_pre.allocd + num_pre.deferred_alloc) / 2^20
Expand Down
9 changes: 8 additions & 1 deletion src/solver/type_getters.jl
Original file line number Diff line number Diff line change
Expand Up @@ -455,7 +455,12 @@ function get_callbacks(parsed_args, sim_info, atmos, params, comms_ctx)
FT = eltype(params)
(; dt, output_dir) = sim_info

callbacks = ()
callbacks = (
call_every_n_steps(
(integrator) -> print_walltime_estimate(integrator);
skip_first = true,
),
)
dt_save_to_disk = time_to_seconds(parsed_args["dt_save_to_disk"])
if !(dt_save_to_disk == Inf)
callbacks = (
Expand Down Expand Up @@ -759,6 +764,7 @@ function get_simulation(config::AtmosConfig)
if sim_info.restart
(Y, t_start) = get_state_restart(config.comms_ctx)
spaces = get_spaces_restart(Y)
@warn "Progress estimates do not support restarted simulations"
else
spaces = get_spaces(config.parsed_args, params, config.comms_ctx)
Y = ICs.atmos_state(
Expand All @@ -779,6 +785,7 @@ function get_simulation(config::AtmosConfig)
params,
surface_setup,
sim_info.dt,
sim_info.t_end,
sim_info.start_date,
)
end
Expand Down
60 changes: 60 additions & 0 deletions src/utils/utilities.jl
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,66 @@ function prettytime(t)
return @sprintf("%.3f %s", value, units)
end

import Dates

time_per_time(::Type{P}, ::Type{P}) where {P} = 1
#=
define_time_per_times(periods)

Evals `time_per_time(::Type{<:Dates.Period},::Type{<:Dates.Period})`
for `Nanosecond, Microsecond, Millisecond, Second, Minute, Hour, Day, Week`
in a triangular fashion-- `time_per_time` provides the conversion factor
(e.g., `Nanosecond`s per `Second`) and all larger periods (but not smaller ones).
=#
function define_time_per_times(periods)
for i in eachindex(periods)
T, n = periods[i]
N = Int64(1)
for j in (i - 1):-1:firstindex(periods) # less-precise periods
Tc, nc = periods[j]
N *= nc
@eval time_per_time(::Type{Dates.$T}, ::Type{Dates.$Tc}) = $N
end
end
end

# From Dates
define_time_per_times([
(:Week, 7),
(:Day, 24),
(:Hour, 60),
(:Minute, 60),
(:Second, 1000),
(:Millisecond, 1000),
(:Microsecond, 1000),
(:Nanosecond, 1),
])

"""
time_and_units_str(x::Real)

Returns a truncated string of time and units,
given a time `x` in Seconds.
"""
time_and_units_str(x::Real) =
trunc_time(string(compound_period(x, Dates.Second)))

"""
compound_period(x::Real, ::Type{T}) where {T <: Dates.Period}

A canonicalized `Dates.CompoundPeriod` given a real value
`x`, and its units via the period type `T`.
"""
function compound_period(x::Real, ::Type{T}) where {T <: Dates.Period}
nf = time_per_time(Dates.Nanosecond, T)
return Dates.canonicalize(
Dates.CompoundPeriod(Dates.Nanosecond(ceil(x * nf))),
)
end

trunc_time(s::String) = count(',', s) > 1 ? join(split(s, ",")[1:2], ",") : s


function prettymemory(b)
if b < 1024
return string(b, " bytes")
Expand Down
2 changes: 2 additions & 0 deletions test/coupler_compatibility.jl
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ const T2 = 290
@. sfc_setup = (surface_state,)
p_overwritten = CA.AtmosCache(
p.dt,
simulation.t_end,
CA.WallTimeEstimate(),
p.start_date,
p.atmos,
p.numerics,
Expand Down
Loading