diff --git a/src/OhMyThreads.jl b/src/OhMyThreads.jl index 7db5788b..473d65b1 100644 --- a/src/OhMyThreads.jl +++ b/src/OhMyThreads.jl @@ -38,6 +38,7 @@ This data is divided into chunks to be worked on in parallel using [ChunkSplitte - `schedule::Symbol` (default `:dynamic`), determines how the parallel portions of the calculation are scheduled. Options are one of - `:dynamic`: generally preferred since it is more flexible and better at load balancing, and won't interfere with other multithreaded functions which may be running on the system. - `:static`: can sometimes be more performant than `:dynamic` when the time it takes to complete a step of the calculation is highly uniform, and no other parallel functions are running at the same time. + - `:greedy`: best option for load-balancing slower, uneven computations, but does carry some additional overhead. This schedule will read from the contents of `A` in a non-deterministic order, and thus your reducing `op` **must** be [commutative](https://en.wikipedia.org/wiki/Commutative_property) in addition to being associative, or you could get incorrect results! This schedule will however work with non-`AbstractArray` iterables. If you use the `:greedy` scheduler, we strongly recommend you provide an `init` keyword argument. - `:interactive`: like `:dynamic` but runs on the high-priority interactive threadpool. This should be used very carefully since tasks on this threadpool should not be allowed to run for a long time without `yield`ing as it can interfere with [heartbeat](https://en.wikipedia.org/wiki/Heartbeat_(computing)) processes running on the interactive threadpool. - `outputtype::Type` (default `Any`) will work as the asserted output type of parallel calculations. 
This is typically only needed if you are using a `:static` schedule, since the `:dynamic` schedule is uses [StableTasks.jl](https://github.com/MasonProtter/StableTasks.jl), but if you experience problems with type stability, you may be able to recover it with the `outputtype` keyword argument. @@ -77,6 +78,7 @@ This data is divided into chunks to be worked on in parallel using [ChunkSplitte - `schedule::Symbol` (default `:dynamic`), determines how the parallel portions of the calculation are scheduled. Options are one of - `:dynamic`: generally preferred since it is more flexible and better at load balancing, and won't interfere with other multithreaded functions which may be running on the system. - `:static`: can sometimes be more performant than `:dynamic` when the time it takes to complete a step of the calculation is highly uniform, and no other parallel functions are running at the same time. + - `:greedy`: best option for load-balancing slower, uneven computations, but does carry some additional overhead. This schedule will read from the contents of `A` in a non-deterministic order, and thus your reducing `op` **must** be [commutative](https://en.wikipedia.org/wiki/Commutative_property) in addition to being associative, or you could get incorrect results! This schedule will however work with non-`AbstractArray` iterables. If you use the `:greedy` scheduler, we strongly recommend you provide an `init` keyword argument. - `:interactive`: like `:dynamic` but runs on the high-priority interactive threadpool. This should be used very carefully since tasks on this threadpool should not be allowed to run for a long time without `yield`ing as it can interfere with [heartbeat](https://en.wikipedia.org/wiki/Heartbeat_(computing)) processes running on the interactive threadpool. - `outputtype::Type` (default `Any`) will work as the asserted output type of parallel calculations. 
This is typically only needed if you are using a `:static` schedule, since the `:dynamic` schedule is uses [StableTasks.jl](https://github.com/MasonProtter/StableTasks.jl), but if you experience problems with type stability, you may be able to recover it with the `outputtype` keyword argument. @@ -117,6 +119,7 @@ This data is divided into chunks to be worked on in parallel using [ChunkSplitte - `schedule::Symbol` (default `:dynamic`), determines how the parallel portions of the calculation are scheduled. Options are one of - `:dynamic`: generally preferred since it is more flexible and better at load balancing, and won't interfere with other multithreaded functions which may be running on the system. - `:static`: can sometimes be more performant than `:dynamic` when the time it takes to complete a step of the calculation is highly uniform, and no other parallel functions are running at the same time. + - `:greedy`: best option for load-balancing slower, uneven computations, but does carry some additional overhead. This schedule will read from the contents of `A` in a non-deterministic order, and thus your reducing `op` **must** be [commutative](https://en.wikipedia.org/wiki/Commutative_property) in addition to being associative, or you could get incorrect results! This schedule will however work with non-`AbstractArray` iterables. If you use the `:greedy` scheduler, we strongly recommend you provide an `init` keyword argument. - `:interactive`: like `:dynamic` but runs on the high-priority interactive threadpool. This should be used very carefully since tasks on this threadpool should not be allowed to run for a long time without `yield`ing as it can interfere with [heartbeat](https://en.wikipedia.org/wiki/Heartbeat_(computing)) processes running on the interactive threadpool. - `outputtype::Type` (default `Any`) will work as the asserted output type of parallel calculations. 
This is typically only needed if you are using a `:static` schedule, since the `:dynamic` schedule is uses [StableTasks.jl](https://github.com/MasonProtter/StableTasks.jl), but if you experience problems with type stability, you may be able to recover it with the `outputtype` keyword argument. @@ -142,6 +145,7 @@ A multithreaded function like `Base.foreach`. Apply `f` to each element of `A` o - `schedule::Symbol` (default `:dynamic`), determines how the parallel portions of the calculation are scheduled. Options are one of - `:dynamic`: generally preferred since it is more flexible and better at load balancing, and won't interfere with other multithreaded functions which may be running on the system. - `:static`: can sometimes be more performant than `:dynamic` when the time it takes to complete a step of the calculation is highly uniform, and no other parallel functions are running at the same time. + - `:greedy`: best option for load-balancing slower, uneven computations, but does carry some additional overhead. - `:interactive`: like `:dynamic` but runs on the high-priority interactive threadpool. This should be used very carefully since tasks on this threadpool should not be allowed to run for a long time without `yield`ing as it can interfere with [heartbeat](https://en.wikipedia.org/wiki/Heartbeat_(computing)) processes running on the interactive threadpool. """ function tforeach end @@ -165,6 +169,7 @@ fewer allocations than the version where `OutputElementType` is not specified. - `schedule::Symbol` (default `:dynamic`), determines how the parallel portions of the calculation are scheduled. Options are one of - `:dynamic`: generally preferred since it is more flexible and better at load balancing, and won't interfere with other multithreaded functions which may be running on the system. 
- `:static`: can sometimes be more performant than `:dynamic` when the time it takes to complete a step of the calculation is highly uniform, and no other parallel functions are running at the same time. + - `:greedy`: best option for load-balancing slower, uneven computations, but does carry some additional overhead. This schedule only works if the `OutputElementType` argument is provided. - `:interactive`: like `:dynamic` but runs on the high-priority interactive threadpool. This should be used very carefully since tasks on this threadpool should not be allowed to run for a long time without `yield`ing as it can interfere with [heartbeat](https://en.wikipedia.org/wiki/Heartbeat_(computing)) processes running on the interactive threadpool. """ function tmap end @@ -185,6 +190,7 @@ of `out[i] = f(A[i])` for each index `i` of `A` and `out`. - `schedule::Symbol` (default `:dynamic`), determines how the parallel portions of the calculation are scheduled. Options are one of - `:dynamic`: generally preferred since it is more flexible and better at load balancing, and won't interfere with other multithreaded functions which may be running on the system. - `:static`: can sometimes be more performant than `:dynamic` when the time it takes to complete a step of the calculation is highly uniform, and no other parallel functions are running at the same time. + - `:greedy`: best option for load-balancing slower, uneven computations, but does carry some additional overhead. - `:interactive`: like `:dynamic` but runs on the high-priority interactive threadpool. This should be used very carefully since tasks on this threadpool should not be allowed to run for a long time without `yield`ing as it can interfere with [heartbeat](https://en.wikipedia.org/wiki/Heartbeat_(computing)) processes running on the interactive threadpool. """ function tmap! end @@ -203,6 +209,7 @@ inputs. 
The optional argument `OutputElementType` will select a specific element - `schedule::Symbol` (default `:dynamic`), determines how the parallel portions of the calculation are scheduled. Options are one of - `:dynamic`: generally preferred since it is more flexible and better at load balancing, and won't interfere with other multithreaded functions which may be running on the system. - `:static`: can sometimes be more performant than `:dynamic` when the time it takes to complete a step of the calculation is highly uniform, and no other parallel functions are running at the same time. + - `:greedy`: best option for load-balancing slower, uneven computations, but does carry some additional overhead. This schedule only works if the `OutputElementType` argument is provided. - `:interactive`: like `:dynamic` but runs on the high-priority interactive threadpool. This should be used very carefully since tasks on this threadpool should not be allowed to run for a long time without `yield`ing as it can interfere with [heartbeat](https://en.wikipedia.org/wiki/Heartbeat_(computing)) processes running on the interactive threadpool. """ function tcollect end diff --git a/src/implementation.jl b/src/implementation.jl index a2a6ee0c..69846c52 100644 --- a/src/implementation.jl +++ b/src/implementation.jl @@ -14,41 +14,60 @@ function tmapreduce(f, op, Arrs...; split::Symbol=:batch, schedule::Symbol=:dynamic, outputtype::Type=Any, - kwargs...) + mapreduce_kwargs...) if schedule === :dynamic - _tmapreduce(f, op, Arrs, outputtype, nchunks, split, :default; kwargs...) - elseif schedule === :interactive - _tmapreduce(f, op, Arrs, outputtype, nchunks, split, :interactive; kwargs...) 
+ _tmapreduce(f, op, Arrs, outputtype, nchunks, split, :default, mapreduce_kwargs) + elseif schedule === :interactive + _tmapreduce(f, op, Arrs, outputtype, nchunks, split, :interactive, mapreduce_kwargs) + elseif schedule === :greedy + _tmapreduce_greedy(f, op, Arrs, outputtype, nchunks, split, mapreduce_kwargs) elseif schedule === :static - _tmapreduce_static(f, op, Arrs, outputtype, nchunks, split; kwargs...) + _tmapreduce_static(f, op, Arrs, outputtype, nchunks, split, mapreduce_kwargs) else schedule_err(schedule) end end -@noinline schedule_err(s) = error(ArgumentError("Invalid schedule option: $s, expected :dynamic or :static.")) +@noinline schedule_err(s) = error(ArgumentError("Invalid schedule option: $s, expected :dynamic, :interactive, :greedy, or :static.")) treducemap(op, f, A...; kwargs...) = tmapreduce(f, op, A...; kwargs...) -function _tmapreduce(f, op, Arrs, ::Type{OutputType}, nchunks, split, schedule; kwargs...)::OutputType where {OutputType} +function _tmapreduce(f, op, Arrs, ::Type{OutputType}, nchunks, split, threadpool, mapreduce_kwargs)::OutputType where {OutputType} check_all_have_same_indices(Arrs) tasks = map(chunks(first(Arrs); n=nchunks, split)) do inds args = map(A -> A[inds], Arrs) - @spawn schedule mapreduce(f, op, args...; kwargs...) + @spawn threadpool mapreduce(f, op, args...; $mapreduce_kwargs...) end mapreduce(fetch, op, tasks) end -function _tmapreduce_static(f, op, Arrs, ::Type{OutputType}, nchunks, split; kwargs...) 
where {OutputType} - nt = nthreads() - check_all_have_same_indices(Arrs) - if nchunks > nt - # We could implement strategies, like round-robin, in the future - throw(ArgumentError("We currently only support `nchunks <= nthreads()` for static scheduling.")) +function _tmapreduce_greedy(f, op, Arrs, ::Type{OutputType}, nchunks, split, mapreduce_kwargs)::OutputType where {OutputType} + nchunks > 0 || throw("Error: nchunks must be a positive integer") + ntasks = nchunks # default; also used for iterators whose length is unknown + if !(Base.IteratorSize(first(Arrs)) isa Base.SizeUnknown) + check_all_have_same_indices(Arrs) + ntasks = min(length(first(Arrs)), nchunks) # never spawn more tasks than there are elements + end + ch = Channel{Tuple{eltype.(Arrs)...}}(0; spawn=true) do ch + for args ∈ zip(Arrs...) + put!(ch, args) + end + end + tasks = map(1:ntasks) do _ + @spawn mapreduce(op, ch; mapreduce_kwargs...) do args + f(args...) + end end - tasks = map(enumerate(chunks(first(Arrs); n=nchunks, split))) do (c, inds) + mapreduce(fetch, op, tasks; mapreduce_kwargs...) +end + +function _tmapreduce_static(f, op, Arrs, ::Type{OutputType}, nchunks, split, mapreduce_kwargs) where {OutputType} + check_all_have_same_indices(Arrs) + nchunks > 0 || throw("Error: nchunks must be a positive integer") + n = min(nthreads(), nchunks) # We could implement strategies, like round-robin, in the future + tasks = map(enumerate(chunks(first(Arrs); n, split))) do (c, inds) tid = @inbounds nthtid(c) args = map(A -> A[inds], Arrs) - @spawnat tid mapreduce(f, op, args...; kwargs...) + @spawnat tid mapreduce(f, op, args...; mapreduce_kwargs...) end mapreduce(fetch, op, tasks) end @@ -79,10 +98,13 @@ function tmap(f, ::Type{T}, A::AbstractArray, _Arrs::AbstractArray...; kwargs... tmap!(f, similar(A, T), Arrs...; kwargs...) end -function tmap(f, A::AbstractArray, _Arrs::AbstractArray...; nchunks::Int=nthreads(), kwargs...) +function tmap(f, A::AbstractArray, _Arrs::AbstractArray...; nchunks::Int=nthreads(), schedule=:dynamic, kwargs...) Arrs = (A, _Arrs...) 
check_all_have_same_indices(Arrs) the_chunks = collect(chunks(A; n=nchunks)) + if schedule == :greedy + error("Greedy schedules are not supported with `tmap` unless you provide an `OutputElementType` argument, since the greedy schedule requires a commutative reducing operator.") + end # It's vital that we force split=:batch here because we're not doing a commutative operation! v = tmapreduce(append!!, the_chunks; kwargs..., nchunks, split=:batch) do inds args = map(A -> @view(A[inds]), Arrs) diff --git a/test/runtests.jl b/test/runtests.jl index f82c2140..244e11c1 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -1,43 +1,48 @@ using Test, OhMyThreads -@testset "Basics" begin - for (~, f, op, itrs) ∈ [ - (isapprox, sin∘*, +, (rand(ComplexF64, 10, 10), rand(-10:10, 10, 10))), - (isapprox, cos, max, (1:100000,)), - (==, round, vcat, (randn(1000),)), - (==, last, *, ([1=>"a", 2=>"b", 3=>"c", 4=>"d", 5=>"e"],)) - ] - @testset for schedule ∈ (:static, :dynamic, :interactive) - @testset for split ∈ (:batch, :scatter) - if split == :scatter # scatter only works for commutative operators - if op ∈ (vcat, *) - continue - end - end - for nchunks ∈ (1, 2, 6, 10) - if schedule == :static && nchunks > Threads.nthreads() - continue - end - kwargs = (; schedule, split, nchunks) - mapreduce_f_op_itr = mapreduce(f, op, itrs...) - @test tmapreduce(f, op, itrs...; kwargs...) ~ mapreduce_f_op_itr - @test treducemap(op, f, itrs...; kwargs...) ~ mapreduce_f_op_itr - @test treduce(op, f.(itrs...); kwargs...) ~ mapreduce_f_op_itr +sets_to_test = [ + (~=isapprox, f=sin∘*, op=+, itrs = (rand(ComplexF64, 10, 10), rand(-10:10, 10, 10)), init=complex(0.0)) + (~=isapprox, f=cos, op=max, itrs = (1:100000,), init=0.0) + (~=(==), f=round, op=vcat, itrs = (randn(1000),), init=Float64[]) + (~=(==), f=last, op=*, itrs = ([1=>"a", 2=>"b", 3=>"c", 4=>"d", 5=>"e"],), init="") +] - map_f_itr = map(f, itrs...) - @test all(tmap(f, Any, itrs...; kwargs...) 
.~ map_f_itr) - @test all(tcollect(Any, (f(x...) for x in collect(zip(itrs...))); kwargs...) .~ map_f_itr) - @test all(tcollect(Any, f.(itrs...); kwargs...) .~ map_f_itr) - @test tmap(f, itrs...; kwargs...) ~ map_f_itr - @test tcollect((f(x...) for x in collect(zip(itrs...))); kwargs...) ~ map_f_itr - @test tcollect(f.(itrs...); kwargs...) ~ map_f_itr - - RT = Core.Compiler.return_type(f, Tuple{eltype.(itrs)...}) - - @test tmap(f, RT, itrs...; kwargs...) ~ map_f_itr - @test tcollect(RT, (f(x...) for x in collect(zip(itrs...))); kwargs...) ~ map_f_itr - @test tcollect(RT, f.(itrs...); kwargs...) ~ map_f_itr +@testset "Basics" begin + for (; ~, f, op, itrs, init) ∈ sets_to_test + @testset "f=$f, op=$op, itrs::$(typeof(itrs))" begin + @testset for schedule ∈ (:static, :dynamic, :interactive, :greedy) + @testset for split ∈ (:batch, :scatter) + for nchunks ∈ (1, 2, 6) + rand() < 0.25 && continue # we don't really want full coverage here + + kwargs = (; schedule, split, nchunks) + if (split == :scatter || schedule == :greedy) && op ∈ (vcat, *) + # scatter and greedy only works for commutative operators! + else + mapreduce_f_op_itr = mapreduce(f, op, itrs...) + @test tmapreduce(f, op, itrs...; init, kwargs...) ~ mapreduce_f_op_itr + @test treducemap(op, f, itrs...; init, kwargs...) ~ mapreduce_f_op_itr + @test treduce(op, f.(itrs...); init, kwargs...) ~ mapreduce_f_op_itr + end + + map_f_itr = map(f, itrs...) + @test all(tmap(f, Any, itrs...; kwargs...) .~ map_f_itr) + @test all(tcollect(Any, (f(x...) for x in collect(zip(itrs...))); kwargs...) .~ map_f_itr) + @test all(tcollect(Any, f.(itrs...); kwargs...) .~ map_f_itr) + + RT = Core.Compiler.return_type(f, Tuple{eltype.(itrs)...}) + + @test tmap(f, RT, itrs...; kwargs...) ~ map_f_itr + @test tcollect(RT, (f(x...) for x in collect(zip(itrs...))); kwargs...) ~ map_f_itr + @test tcollect(RT, f.(itrs...); kwargs...) ~ map_f_itr + + if schedule !== :greedy + @test tmap(f, itrs...; kwargs...) 
~ map_f_itr + @test tcollect((f(x...) for x in collect(zip(itrs...))); kwargs...) ~ map_f_itr + @test tcollect(f.(itrs...); kwargs...) ~ map_f_itr + end + end end end end