Skip to content


Implement greedy scheduler
Browse files Browse the repository at this point in the history
  • Loading branch information
MasonProtter committed Jan 31, 2024
1 parent d13312e commit e403e99
Show file tree
Hide file tree
Showing 3 changed files with 87 additions and 53 deletions.
7 changes: 7 additions & 0 deletions src/OhMyThreads.jl
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ This data is divided into chunks to be worked on in parallel using [ChunkSplitte
- `schedule::Symbol` (default `:dynamic`), determines how the parallel portions of the calculation are scheduled. Options are one of
- `:dynamic`: generally preferred since it is more flexible and better at load balancing, and won't interfere with other multithreaded functions which may be running on the system.
- `:static`: can sometimes be more performant than `:dynamic` when the time it takes to complete a step of the calculation is highly uniform, and no other parallel functions are running at the same time.
- `:greedy`: best option for load-balancing slower, uneven computations, but does carry some additional overhead. This schedule will read from the contents of `A` in a non-deterministic order, and thus your reducing `op` **must** be [commutative]( in addition to being associative, or you could get incorrect results! This schedule will however work with non-`AbstractArray` iterables. If you use the `:greedy` scheduler, we strongly recommend you provide an `init` keyword argument.
- `:interactive`: like `:dynamic` but runs on the high-priority interactive threadpool. This should be used very carefully since tasks on this threadpool should not be allowed to run for a long time without `yield`ing as it can interfere with [heartbeat]( processes running on the interactive threadpool.
- `outputtype::Type` (default `Any`) will work as the asserted output type of parallel calculations. This is typically only
needed if you are using a `:static` schedule, since the `:dynamic` schedule is uses [StableTasks.jl](, but if you experience problems with type stability, you may be able to recover it with the `outputtype` keyword argument.
Expand Down Expand Up @@ -77,6 +78,7 @@ This data is divided into chunks to be worked on in parallel using [ChunkSplitte
- `schedule::Symbol` (default `:dynamic`), determines how the parallel portions of the calculation are scheduled. Options are one of
- `:dynamic`: generally preferred since it is more flexible and better at load balancing, and won't interfere with other multithreaded functions which may be running on the system.
- `:static`: can sometimes be more performant than `:dynamic` when the time it takes to complete a step of the calculation is highly uniform, and no other parallel functions are running at the same time.
- `:greedy`: best option for load-balancing slower, uneven computations, but does carry some additional overhead. This schedule will read from the contents of `A` in a non-deterministic order, and thus your reducing `op` **must** be [commutative]( in addition to being associative, or you could get incorrect results! This schedule will however work with non-`AbstractArray` iterables. If you use the `:greedy` scheduler, we strongly recommend you provide an `init` keyword argument.
- `:interactive`: like `:dynamic` but runs on the high-priority interactive threadpool. This should be used very carefully since tasks on this threadpool should not be allowed to run for a long time without `yield`ing as it can interfere with [heartbeat]( processes running on the interactive threadpool.
- `outputtype::Type` (default `Any`) will work as the asserted output type of parallel calculations. This is typically only
needed if you are using a `:static` schedule, since the `:dynamic` schedule is uses [StableTasks.jl](, but if you experience problems with type stability, you may be able to recover it with the `outputtype` keyword argument.
Expand Down Expand Up @@ -117,6 +119,7 @@ This data is divided into chunks to be worked on in parallel using [ChunkSplitte
- `schedule::Symbol` (default `:dynamic`), determines how the parallel portions of the calculation are scheduled. Options are one of
- `:dynamic`: generally preferred since it is more flexible and better at load balancing, and won't interfere with other multithreaded functions which may be running on the system.
- `:static`: can sometimes be more performant than `:dynamic` when the time it takes to complete a step of the calculation is highly uniform, and no other parallel functions are running at the same time.
- `:greedy`: best option for load-balancing slower, uneven computations, but does carry some additional overhead. This schedule will read from the contents of `A` in a non-deterministic order, and thus your reducing `op` **must** be [commutative]( in addition to being associative, or you could get incorrect results! This schedule will however work with non-`AbstractArray` iterables. If you use the `:greedy` scheduler, we strongly recommend you provide an `init` keyword argument.
- `:interactive`: like `:dynamic` but runs on the high-priority interactive threadpool. This should be used very carefully since tasks on this threadpool should not be allowed to run for a long time without `yield`ing as it can interfere with [heartbeat]( processes running on the interactive threadpool.
- `outputtype::Type` (default `Any`) will work as the asserted output type of parallel calculations. This is typically only
needed if you are using a `:static` schedule, since the `:dynamic` schedule is uses [StableTasks.jl](, but if you experience problems with type stability, you may be able to recover it with the `outputtype` keyword argument.
Expand All @@ -142,6 +145,7 @@ A multithreaded function like `Base.foreach`. Apply `f` to each element of `A` o
- `schedule::Symbol` (default `:dynamic`), determines how the parallel portions of the calculation are scheduled. Options are one of
- `:dynamic`: generally preferred since it is more flexible and better at load balancing, and won't interfere with other multithreaded functions which may be running on the system.
- `:static`: can sometimes be more performant than `:dynamic` when the time it takes to complete a step of the calculation is highly uniform, and no other parallel functions are running at the same time.
- `:greedy`: best option for load-balancing slower, uneven computations, but does carry some additional overhead.
- `:interactive`: like `:dynamic` but runs on the high-priority interactive threadpool. This should be used very carefully since tasks on this threadpool should not be allowed to run for a long time without `yield`ing as it can interfere with [heartbeat]( processes running on the interactive threadpool.
function tforeach end
Expand All @@ -165,6 +169,7 @@ fewer allocations than the version where `OutputElementType` is not specified.
- `schedule::Symbol` (default `:dynamic`), determines how the parallel portions of the calculation are scheduled. Options are one of
- `:dynamic`: generally preferred since it is more flexible and better at load balancing, and won't interfere with other multithreaded functions which may be running on the system.
- `:static`: can sometimes be more performant than `:dynamic` when the time it takes to complete a step of the calculation is highly uniform, and no other parallel functions are running at the same time.
- `:greedy`: best option for load-balancing slower, uneven computations, but does carry some additional overhead. This schedule only works if the `OutputElementType` argument is provided.
- `:interactive`: like `:dynamic` but runs on the high-priority interactive threadpool. This should be used very carefully since tasks on this threadpool should not be allowed to run for a long time without `yield`ing as it can interfere with [heartbeat]( processes running on the interactive threadpool.
function tmap end
Expand All @@ -185,6 +190,7 @@ of `out[i] = f(A[i])` for each index `i` of `A` and `out`.
- `schedule::Symbol` (default `:dynamic`), determines how the parallel portions of the calculation are scheduled. Options are one of
- `:dynamic`: generally preferred since it is more flexible and better at load balancing, and won't interfere with other multithreaded functions which may be running on the system.
- `:static`: can sometimes be more performant than `:dynamic` when the time it takes to complete a step of the calculation is highly uniform, and no other parallel functions are running at the same time.
- `:greedy`: best option for load-balancing slower, uneven computations, but does carry some additional overhead.
- `:interactive`: like `:dynamic` but runs on the high-priority interactive threadpool. This should be used very carefully since tasks on this threadpool should not be allowed to run for a long time without `yield`ing as it can interfere with [heartbeat]( processes running on the interactive threadpool.
function tmap! end
Expand All @@ -203,6 +209,7 @@ inputs. The optional argument `OutputElementType` will select a specific element
- `schedule::Symbol` (default `:dynamic`), determines how the parallel portions of the calculation are scheduled. Options are one of
- `:dynamic`: generally preferred since it is more flexible and better at load balancing, and won't interfere with other multithreaded functions which may be running on the system.
- `:static`: can sometimes be more performant than `:dynamic` when the time it takes to complete a step of the calculation is highly uniform, and no other parallel functions are running at the same time.
- `:greedy`: best option for load-balancing slower, uneven computations, but does carry some additional overhead. This schedule only works if the `OutputElementType` argument is provided.
- `:interactive`: like `:dynamic` but runs on the high-priority interactive threadpool. This should be used very carefully since tasks on this threadpool should not be allowed to run for a long time without `yield`ing as it can interfere with [heartbeat]( processes running on the interactive threadpool.
function tcollect end
Expand Down
56 changes: 39 additions & 17 deletions src/implementation.jl
Original file line number Diff line number Diff line change
Expand Up @@ -14,41 +14,60 @@ function tmapreduce(f, op, Arrs...;
if schedule === :dynamic
_tmapreduce(f, op, Arrs, outputtype, nchunks, split, :default; kwargs...)
elseif schedule === :interactive
_tmapreduce(f, op, Arrs, outputtype, nchunks, split, :interactive; kwargs...)
_tmapreduce(f, op, Arrs, outputtype, nchunks, split, :default, mapreduce_kwargs)
elseif schedule === :interactive
_tmapreduce(f, op, Arrs, outputtype, nchunks, split, :interactive, mapreduce_kwargs)
elseif schedule === :greedy
_tmapreduce_greedy(f, op, Arrs, outputtype, nchunks, split, mapreduce_kwargs)
elseif schedule === :static
_tmapreduce_static(f, op, Arrs, outputtype, nchunks, split; kwargs...)
_tmapreduce_static(f, op, Arrs, outputtype, nchunks, split, mapreduce_kwargs)
@noinline schedule_err(s) = error(ArgumentError("Invalid schedule option: $s, expected :dynamic or :static."))
@noinline schedule_err(s) = error(ArgumentError("Invalid schedule option: $s, expected :dynamic, :interactive, :greedy, or :static."))

treducemap(op, f, A...; kwargs...) = tmapreduce(f, op, A...; kwargs...)

function _tmapreduce(f, op, Arrs, ::Type{OutputType}, nchunks, split, schedule; kwargs...)::OutputType where {OutputType}
function _tmapreduce(f, op, Arrs, ::Type{OutputType}, nchunks, split, threadpool, mapreduce_kwargs)::OutputType where {OutputType}
tasks = map(chunks(first(Arrs); n=nchunks, split)) do inds
args = map(A -> A[inds], Arrs)
@spawn schedule mapreduce(f, op, args...; kwargs...)
@spawn threadpool mapreduce(f, op, args...; $mapreduce_kwargs...)
mapreduce(fetch, op, tasks)

function _tmapreduce_static(f, op, Arrs, ::Type{OutputType}, nchunks, split; kwargs...) where {OutputType}
nt = nthreads()
if nchunks > nt
# We could implement strategies, like round-robin, in the future
throw(ArgumentError("We currently only support `nchunks <= nthreads()` for static scheduling."))
function _tmapreduce_greedy(f, op, Arrs, ::Type{OutputType}, nchunks, split, mapreduce_kwargs)::OutputType where {OutputType}
if Base.IteratorSize(first(Arrs)) isa Base.SizeUnknown
ntasks = nchunks
nchunks > 0 || throw("Error: nchunks must be a positive integer")
ch = Channel{Tuple{eltype.(Arrs)...}}(0; spawn=true) do ch
for args zip(Arrs...)
put!(ch, args)
tasks = map(1:ntasks) do _
@spawn mapreduce(op, ch; mapreduce_kwargs...) do args
tasks = map(enumerate(chunks(first(Arrs); n=nchunks, split))) do (c, inds)
mapreduce(fetch, op, tasks; mapreduce_kwargs...)

function _tmapreduce_static(f, op, Arrs, ::Type{OutputType}, nchunks, split, mapreduce_kwargs) where {OutputType}
nchunks > 0 || throw("Error: nchunks must be a positive integer")
n = min(nthreads(), nchunks) # We could implement strategies, like round-robin, in the future
tasks = map(enumerate(chunks(first(Arrs); n, split))) do (c, inds)
tid = @inbounds nthtid(c)
args = map(A -> A[inds], Arrs)
@spawnat tid mapreduce(f, op, args...; kwargs...)
@spawnat tid mapreduce(f, op, args...; mapreduce_kwargs...)
mapreduce(fetch, op, tasks)
Expand Down Expand Up @@ -79,10 +98,13 @@ function tmap(f, ::Type{T}, A::AbstractArray, _Arrs::AbstractArray...; kwargs...
tmap!(f, similar(A, T), Arrs...; kwargs...)

function tmap(f, A::AbstractArray, _Arrs::AbstractArray...; nchunks::Int=nthreads(), kwargs...)
function tmap(f, A::AbstractArray, _Arrs::AbstractArray...; nchunks::Int=nthreads(), schedule=:dynamic, kwargs...)
Arrs = (A, _Arrs...)
the_chunks = collect(chunks(A; n=nchunks))
if schedule == :greedy
error("Greedy schedules are not supported with `tmap` unless you provide an `OutputElementType` argument, since the greedy schedule requires a commutative reducing operator.")
# It's vital that we force split=:batch here because we're not doing a commutative operation!
v = tmapreduce(append!!, the_chunks; kwargs..., nchunks, split=:batch) do inds
args = map(A -> @view(A[inds]), Arrs)
Expand Down
77 changes: 41 additions & 36 deletions test/runtests.jl
Original file line number Diff line number Diff line change
@@ -1,43 +1,48 @@
using Test, OhMyThreads

@testset "Basics" begin
for (~, f, op, itrs) [
(isapprox, sin∘*, +, (rand(ComplexF64, 10, 10), rand(-10:10, 10, 10))),
(isapprox, cos, max, (1:100000,)),
(==, round, vcat, (randn(1000),)),
(==, last, *, ([1=>"a", 2=>"b", 3=>"c", 4=>"d", 5=>"e"],))
@testset for schedule (:static, :dynamic, :interactive)
@testset for split (:batch, :scatter)
if split == :scatter # scatter only works for commutative operators
if op (vcat, *)
for nchunks (1, 2, 6, 10)
if schedule == :static && nchunks > Threads.nthreads()
kwargs = (; schedule, split, nchunks)
mapreduce_f_op_itr = mapreduce(f, op, itrs...)
@test tmapreduce(f, op, itrs...; kwargs...) ~ mapreduce_f_op_itr
@test treducemap(op, f, itrs...; kwargs...) ~ mapreduce_f_op_itr
@test treduce(op, f.(itrs...); kwargs...) ~ mapreduce_f_op_itr
sets_to_test = [
(~=isapprox, f=sin∘*, op=+, itrs = (rand(ComplexF64, 10, 10), rand(-10:10, 10, 10)), init=complex(0.0))
(~=isapprox, f=cos, op=max, itrs = (1:100000,), init=0.0)
(~=(==), f=round, op=vcat, itrs = (randn(1000),), init=Float64[])
(~=(==), f=last, op=*, itrs = ([1=>"a", 2=>"b", 3=>"c", 4=>"d", 5=>"e"],), init="")

map_f_itr = map(f, itrs...)
@test all(tmap(f, Any, itrs...; kwargs...) .~ map_f_itr)
@test all(tcollect(Any, (f(x...) for x in collect(zip(itrs...))); kwargs...) .~ map_f_itr)
@test all(tcollect(Any, f.(itrs...); kwargs...) .~ map_f_itr)

@test tmap(f, itrs...; kwargs...) ~ map_f_itr
@test tcollect((f(x...) for x in collect(zip(itrs...))); kwargs...) ~ map_f_itr
@test tcollect(f.(itrs...); kwargs...) ~ map_f_itr

RT = Core.Compiler.return_type(f, Tuple{eltype.(itrs)...})

@test tmap(f, RT, itrs...; kwargs...) ~ map_f_itr
@test tcollect(RT, (f(x...) for x in collect(zip(itrs...))); kwargs...) ~ map_f_itr
@test tcollect(RT, f.(itrs...); kwargs...) ~ map_f_itr
@testset "Basics" begin
for (; ~, f, op, itrs, init) sets_to_test
@testset "f=$f, op=$op, itrs::$(typeof(itrs))" begin
@testset for schedule (:static, :dynamic, :interactive, :greedy)
@testset for split (:batch, :scatter)
for nchunks (1, 2, 6)
rand() < 0.25 && continue # we don't really want full coverage here

kwargs = (; schedule, split, nchunks)
if (split == :scatter || schedule == :greedy) || op (vcat, *)
# scatter and greedy only works for commutative operators!
mapreduce_f_op_itr = mapreduce(f, op, itrs...)
@test tmapreduce(f, op, itrs...; init, kwargs...) ~ mapreduce_f_op_itr
@test treducemap(op, f, itrs...; init, kwargs...) ~ mapreduce_f_op_itr
@test treduce(op, f.(itrs...); init, kwargs...) ~ mapreduce_f_op_itr

map_f_itr = map(f, itrs...)
@test all(tmap(f, Any, itrs...; kwargs...) .~ map_f_itr)
@test all(tcollect(Any, (f(x...) for x in collect(zip(itrs...))); kwargs...) .~ map_f_itr)
@test all(tcollect(Any, f.(itrs...); kwargs...) .~ map_f_itr)

RT = Core.Compiler.return_type(f, Tuple{eltype.(itrs)...})

@test tmap(f, RT, itrs...; kwargs...) ~ map_f_itr
@test tcollect(RT, (f(x...) for x in collect(zip(itrs...))); kwargs...) ~ map_f_itr
@test tcollect(RT, f.(itrs...); kwargs...) ~ map_f_itr

if schedule !== :greedy
@test tmap(f, itrs...; kwargs...) ~ map_f_itr
@test tcollect((f(x...) for x in collect(zip(itrs...))); kwargs...) ~ map_f_itr
@test tcollect(f.(itrs...); kwargs...) ~ map_f_itr
Expand Down

0 comments on commit e403e99

Please sign in to comment.