Unify multithreading handling #81

Open
ctrueden opened this issue Mar 20, 2019 · 7 comments

Comments

@ctrueden
Member

Many of the implementations in imglib2-algorithm have an int numThreads parameter for controlling how many threads to spawn. Others take an ExecutorService. Accepting an ExecutorService is more flexible and would mesh better with SciJava Common’s ThreadService and therefore ImageJ Ops. See also imagej/imagej-ops#599. On the other hand, an ExecutorService alone is not enough to identify the intended number of threads to use when partitioning the task.

Note that we currently often infer numTasks from Runtime.availableProcessors() by default. This is not good practice in general: several tasks may be running simultaneously, which can result in more threads than processors.

@tpietzsch points out:

  • In ForkJoinPool there is a "parallelism level" that corresponds to this roughly. We should consider using ForkJoinPool throughout.
  • ForkJoinPool extends ExecutorService, so it would at least be backwards compatible in some ways.
  • ForkJoinPool.getParallelism() could replace / augment numTasks.
  • ForkJoinPool supports work-stealing, which is important if submitted tasks spawn new subtasks and wait for their completion. This allows handing the pool down through algorithms that parallelize in chunks and, for each chunk, call another algorithm that parallelizes internally. (Handing down a plain ExecutorService wouldn't work for that.)
  • Also, there is the common pool, ForkJoinPool.commonPool(), which is used by streams etc. We could fall back to it if there is no user-provided pool.
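A minimal sketch of the points above, assuming nothing beyond the JDK: fall back to the common pool when no pool is given, read the partition count from getParallelism(), and let one submitted task fork subtasks and wait for them. The class and method names (PoolParallelismSketch, poolOrCommon, chunkedSum) are hypothetical and not part of imglib2-algorithm.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ForkJoinTask;

class PoolParallelismSketch {

    /** Returns the user-provided pool, or the common pool if none was given. */
    static ForkJoinPool poolOrCommon(ForkJoinPool userPool) {
        return userPool != null ? userPool : ForkJoinPool.commonPool();
    }

    /** Sums 0..n-1, split into one chunk per unit of pool parallelism. */
    static long chunkedSum(ForkJoinPool pool, int n) {
        final int numTasks = Math.max(1, pool.getParallelism());
        Callable<Long> outer = () -> {
            // This runs inside the pool and forks subtasks into the same pool.
            List<ForkJoinTask<Long>> forked = new ArrayList<>();
            for (int t = 0; t < numTasks; t++) {
                final int start = (int) ((long) n * t / numTasks);
                final int end = (int) ((long) n * (t + 1) / numTasks);
                Callable<Long> chunk = () -> {
                    long sum = 0;
                    for (int i = start; i < end; i++) sum += i;
                    return sum;
                };
                forked.add(ForkJoinTask.adapt(chunk).fork());
            }
            // Waiting with join() lets the worker help run other tasks meanwhile.
            long total = 0;
            for (ForkJoinTask<Long> task : forked) total += task.join();
            return total;
        };
        return pool.invoke(ForkJoinTask.adapt(outer));
    }

    public static void main(String[] args) {
        ForkJoinPool pool = poolOrCommon(null);
        System.out.println(chunkedSum(pool, 1000));
    }
}
```

With a fixed-size ExecutorService, the outer task would occupy a thread while blocked on its subtasks, which is the nesting problem the list refers to.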

Based on a chat in gitter.

@maarzt
Contributor

maarzt commented Apr 12, 2019

I see major disadvantages with the suggested solutions:

  • I tried to use ForkJoinPool; it's very hard to understand how it works. But there's a trick: use parallel streams instead. They use ForkJoinPool automatically.
  • ExecutorService has the disadvantage of not telling you how many threads there are. ExecutorService also blocks on nested parallelization.
  • numThreads has the disadvantage that it doesn't allow you to specify or reuse an ExecutorService or thread pool.
  • There is no single-threaded implementation of ForkJoinPool or ExecutorService, so you always have the overhead of using threads.

My idea is to not use numThreads, ExecutorService or ForkJoinPool directly, but instead use an interface:

public interface MultiThreadSetting
{
	
	boolean useMultiThreading(); // returns true if multi-threading should be used

	int suggestNumberOfTasks(); // returns 1 for single-threading, or 4 * availableProcessors() for multi-threading; you may also manually specify this value

	<T> void forEach( Collection< T > values, Consumer< T > action ); // Executes the action for each value, sequentially for single-threading, in parallel for multi-threading.
}

See PR imglib/imglib2#250

For this interface I have three implementations:

  1. Default implementation that uses the common ForkJoinPool.
  2. An implementation based on ExecutorService.
  3. An implementation that uses no multi-threading. This is single threaded, and completely avoids the overhead that you get from using ForkJoinPool or ExecutorService.
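To make the list concrete, here is a rough sketch of what implementations 1 and 3 could look like. It is not the code from the PR: the interface is copied from the comment above, while the class names SingleThreaded and CommonPoolMultiThreaded are hypothetical. The ExecutorService-based variant is left out for brevity.

```java
import java.util.Collection;
import java.util.function.Consumer;

// Copied from the proposal above so that the sketch is self-contained.
interface MultiThreadSetting {
    boolean useMultiThreading();
    int suggestNumberOfTasks();
    <T> void forEach(Collection<T> values, Consumer<T> action);
}

/** Runs everything in the calling thread: no pool, no thread overhead. */
class SingleThreaded implements MultiThreadSetting {
    public boolean useMultiThreading() { return false; }

    public int suggestNumberOfTasks() { return 1; }

    public <T> void forEach(Collection<T> values, Consumer<T> action) {
        for (T value : values) action.accept(value);
    }
}

/** Delegates to parallel streams, which run on the common ForkJoinPool by default. */
class CommonPoolMultiThreaded implements MultiThreadSetting {
    public boolean useMultiThreading() { return true; }

    public int suggestNumberOfTasks() {
        // The factor 4 follows the comment in the interface above.
        return 4 * Runtime.getRuntime().availableProcessors();
    }

    public <T> void forEach(Collection<T> values, Consumer<T> action) {
        values.parallelStream().forEach(action);
    }
}
```

An algorithm would take a MultiThreadSetting argument and never touch a thread pool directly; the caller picks the implementation.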

@ctrueden
Member Author

ctrueden commented Apr 12, 2019

Thanks a lot @maarzt for digging in to this issue!

I am 100% on board with using our own interface. However, the tricky question is: where shall this interface live? Because I want to use it in both SciJava Common and ImgLib2. Do you have a preference? @tpietzsch what do you think? Maybe a new scijava-threads component?

And @maarzt: have you looked in detail at the SciJava ThreadService? How would you propose we update it to slot in nicely with this new approach?

@tpietzsch
Member

@maarzt some quick comments

  • int suggestNumberOfTasks() is not good enough.
    The decision on how many subtasks to create should be made at the point where MultiThreadSetting is used, taking into account availableProcessors but also other things that might only be available locally. I would rather add a method int getParallelism(), like in ForkJoinPool. (This is more or less a replacement for availableProcessors, which might be too global.)

  • void forEach( Collection< T > values, Consumer< T > action );
    I don't like this either. It makes cases where you care about the result of the action very cumbersome: you have to bake both the input and a holder for the result into T. I also don't get why you would force the same action to be used for each value. What happens if you have different actions? I assume calling forEach(a,b); forEach(c,d); would always block forEach(c,d) until forEach(a,b) has completed (I guess it would have to). So you would also need to multiplex multiple actions into action. I like a Future<V> submit(Callable<V>) interface much better. It would also be what is done everywhere in the JDK...

  • I would not use the name MultiThreadSetting for something that is used to execute stuff. Setting implies (at least to me) a passive, probably immutable object.
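As an illustration of the alternative suggested in these comments, the sketch below pairs getParallelism() with a Future<V> submit(Callable<V>) method. The names ParallelExecutor, ForkJoinParallelExecutor and SubmitSketch are made up for this example. It shows two different actions with different result types running concurrently, which forEach cannot express directly.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;

// Hypothetical interface shaped after the suggestions in the comment above.
interface ParallelExecutor {
    int getParallelism();
    <V> Future<V> submit(Callable<V> task);
}

/** One possible backing: delegate both methods to a ForkJoinPool. */
class ForkJoinParallelExecutor implements ParallelExecutor {
    private final ForkJoinPool pool;

    ForkJoinParallelExecutor(ForkJoinPool pool) { this.pool = pool; }

    public int getParallelism() { return pool.getParallelism(); }

    public <V> Future<V> submit(Callable<V> task) { return pool.submit(task); }
}

class SubmitSketch {
    /** Submits two unrelated actions and combines their results. */
    static String run(ParallelExecutor executor) {
        Future<Integer> length = executor.submit(() -> "imglib2".length());
        Future<String> upper = executor.submit(() -> "imglib2".toUpperCase());
        try {
            return upper.get() + ":" + length.get();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }
}
```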

@tpietzsch
Member

@ctrueden @axtimwalde @hanslovsky @StephanPreibisch
We discussed this and @maarzt came up with a pretty fantastic (IMO) proposal.
You can have a look at it here (multi-threading branch).

The gist of it is that parallelization settings are available as ThreadLocal inside algorithms without passing any ExecutorService or similar argument in. In an algorithm you just call, for example,
Parallelization.forEach( elements, operation );
and it runs parallel or sequential according to context.

This can mean either

  • call operation on each element sequentially in the current thread, or
  • call operation on each element in parallel in the common ForkJoinPool (or whichever pool you are currently in.)

The decision is made by the caller on the top-level, by running
Parallelization.singleThreaded( () -> myAlgorithm() ); or
Parallelization.multiThreaded( () -> myAlgorithm() );
Note that you would (almost?) never call this inside any algorithm. This is done outside by the user.

It ties nicely into existing stuff. For example, if you call
elements.parallelStream().forEach( operation );
then that also triggers parallelisation inside operation.

It supports recursive tasks like ForkJoinPool.
See https://github.com/maarzt/sandbox/blob/multi-threading/src/main/java/sandbox/parallization/SumAlgorithm.java for an example that recursively splits a RAI by dimension.
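The core mechanism can be sketched in a few lines. This is a simplified stand-in, not the code in the linked branch: the class name ParallelizationSketch is hypothetical, and the only state is a ThreadLocal flag that defaults to parallel execution, so worker threads (which do not inherit the flag) also default to parallel.

```java
import java.util.Collection;
import java.util.function.Consumer;
import java.util.function.Supplier;

class ParallelizationSketch {

    // true = run sequentially in the calling thread; the default is parallel.
    private static final ThreadLocal<Boolean> SEQUENTIAL =
            ThreadLocal.withInitial(() -> false);

    /** Called by the top-level user: run the algorithm without extra threads. */
    static <R> R singleThreaded(Supplier<R> algorithm) {
        return runWith(true, algorithm);
    }

    /** Called by the top-level user: allow the algorithm to parallelize. */
    static <R> R multiThreaded(Supplier<R> algorithm) {
        return runWith(false, algorithm);
    }

    private static <R> R runWith(boolean sequential, Supplier<R> algorithm) {
        boolean previous = SEQUENTIAL.get();
        SEQUENTIAL.set(sequential);
        try {
            return algorithm.get();
        } finally {
            SEQUENTIAL.set(previous); // restore the caller's setting
        }
    }

    /** Called inside algorithms: picks the execution mode from the context. */
    static <T> void forEach(Collection<T> elements, Consumer<? super T> operation) {
        if (SEQUENTIAL.get()) {
            elements.forEach(operation);
        } else {
            elements.parallelStream().forEach(operation);
        }
    }
}
```

The algorithm itself only ever calls forEach; whether that means a plain loop or a parallel stream is decided by whoever wrapped the call in singleThreaded or multiThreaded.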

@ctrueden This brings us to a remaining question: the "user" would be, for example, the OpService, CommandService etc. If I call ops.bla(), is that run with the caller's parallelisation settings, or does the OpService have its own threads and parallelisation settings? (For OpService it should probably run in the calling thread, but for other services it might not be so clear.)

Any thoughts, comments?

@acardona
Contributor

acardona commented Apr 17, 2019

Hi @maarzt & @tpietzsch,

First: thanks very much for trying. Getting abstracted parallelization right is perhaps impossible. It has, though, its uses, but for the general case, I have yet to stumble upon a good solution.

The first major issue is that algorithms have different memory requirements and different contention issues. There are of course groups of algorithms that share such properties (e.g. filtering an image), and therefore there can be well-suited solutions for them.

An example: suppose I am running a memory-heavy algorithm that uses e.g. 25% of my desktop's RAM. I can only effectively run 4 threads. But, some steps within each parallel processing branch can be done with more threads. Therefore each would use 1/4th of all available threads. When done with this subprocess, then the main, memory-heavy thread would resume (for example, moving on to the next 3D image to process in a 4D series).

An example of a subtask would be to copy a resulting image into an ArrayImg. That's why e.g. I'd need to specify the number of threads in ImgUtil.copy.

If the abstracted parallelization framework assumes that allocating as many free threads as possible to a task is a good idea, the above will fail, or result in suboptimal performance.

Another example: suppose there are multiple data structures that have to be accessed for every iteration in the loop. If one naively uses all available threads to process the next available loop index, contention is high: all threads try to access all data structures at once. Instead, one can chunk the data structures and have each thread process a chunk that the others never access. Is chunking included in the abstract parallelization? For some cases that would be preferable, because chunking by hand is cumbersome and could be done automatically via e.g. Views.interval. But in many cases it cannot.

All that said: "Those who think something can't be done should not stop those who are doing it from doing it." So carry on and let's see how far this goes. Looking forward to the results.

Specific comments on the code: statically instantiating both an ExecutorService and a ForkJoinExecutorService by default is not a good idea. These should be instantiated only if/when needed.

On the "problem" that an ExecutorService doesn't tell you how many threads it is set up to use: that's only true for the excessively abstract interface ExecutorService. Consider directly using e.g. ThreadPoolExecutor, which is the parent class of even ScheduledThreadPoolExecutor. With ThreadPoolExecutor, you have methods like getPoolSize and others that let you find out how many threads are available, how many are actively running at present, etc.
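For reference, a small sketch of the ThreadPoolExecutor getters mentioned here. The class and method names (PoolInspectionSketch, newFixedPool, describe) are hypothetical. The pool is constructed directly so that its type is known statically.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolInspectionSketch {

    /** Builds a fixed-size pool whose static type exposes the size getters. */
    static ThreadPoolExecutor newFixedPool(int numThreads) {
        return new ThreadPoolExecutor(numThreads, numThreads,
                0L, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>());
    }

    /** Reports configured, currently existing, and (approximately) busy threads. */
    static String describe(ThreadPoolExecutor executor) {
        return "max=" + executor.getMaximumPoolSize()
                + " current=" + executor.getPoolSize()
                + " active=" + executor.getActiveCount();
    }

    public static void main(String[] args) {
        ThreadPoolExecutor executor = newFixedPool(4);
        System.out.println(describe(executor));
        executor.shutdown();
    }
}
```

Note that getPoolSize() counts threads that currently exist, which is 0 for a fresh pool because threads are created on demand; getMaximumPoolSize() is the configured upper bound.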

@ctrueden
Member Author

> @ctrueden This brings us to a remaining question

Let's discuss when I visit in June. Added to my list for that week.

@tpietzsch
Member

@ctrueden

> Let's discuss when I visit in June. Added to my list for that week.

Sounds good.

@acardona
I agree that a general solution is basically impossible. There will always be cases where it is advisable to roll your own multithreading solution. This proposal is more about the 80% of cases where you just want to chunk your image and get on with it... :-)

Specifically, the first scenario you mentioned would be possible to do with a ForkJoinPool (resp. the proposed Parallelization). You could just split the whole task into 4 parts and those parts could then submit many subtasks and wait for completion. It gets ugly if you do not want to simply split into 4 equal parts, for example because it cannot be predicted whether the parts would run for about the same time. Even then, it could probably be shoehorned in somehow, but it would be ugly and a special-case solution would be preferable.
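A rough sketch of that first scenario, using parallel streams on the common pool as a stand-in for the proposed Parallelization: the work is split into exactly 4 outer parts, and each part internally spawns many fine-grained subtasks and waits for them. The names NestedSplitSketch, processPart and processAll are hypothetical, and the summation merely stands in for real image processing.

```java
import java.util.stream.IntStream;
import java.util.stream.LongStream;

class NestedSplitSketch {

    // At most this many memory-heavy parts can be in flight at once.
    static final int OUTER_PARTS = 4;

    /** Stand-in for the inner, finely parallel step (e.g. copying one image). */
    static long processPart(int part, int elementsPerPart) {
        long offset = (long) part * elementsPerPart;
        return LongStream.range(offset, offset + elementsPerPart).parallel().sum();
    }

    /** Splits the whole task into OUTER_PARTS parts that parallelize internally. */
    static long processAll(int elementsPerPart) {
        return IntStream.range(0, OUTER_PARTS).parallel()
                .mapToLong(part -> processPart(part, elementsPerPart))
                .sum();
    }

    public static void main(String[] args) {
        System.out.println(processAll(250));
    }
}
```

The memory bound comes only from the number of outer parts; how the inner subtasks are spread over worker threads is left to the pool.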

Regarding the second scenario, chunking is not included in the abstract parallelization. The idea is that you say Parallelization.getParallelism() and split accordingly into subtasks, then submit those. The problem of contention on "hot" data structures is not solved by the proposed framework. It is only about specifying which task can be run concurrently and which dependencies between tasks exist.
How the tasks are distributed onto threads is not controllable. To some extent this is even true if you implement a special-case solution, unless you have full control over which machine this runs on and what other things are running at the same time. If you create 8 threads you can only hope that they will actually all be running at the same time. Anyway, the proposal offers no solution there. You could do striping of data structures as you proposed, but by task, not by thread. This will work but is probably overly cautious.
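Striping by task could look roughly like the sketch below: the number of subtasks is derived from the pool's parallelism, each subtask fills its own private histogram, and the partial results are merged afterwards, so no subtask ever touches another's data. A plain ForkJoinPool stands in for the proposed Parallelization.getParallelism(), and StripedHistogramSketch is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.Future;

class StripedHistogramSketch {

    /** Counts occurrences of each value in [0, numBins), one stripe per task. */
    static long[] histogram(int[] values, int numBins, ForkJoinPool pool) {
        int numTasks = Math.max(1, Math.min(values.length, pool.getParallelism()));
        List<Callable<long[]>> tasks = new ArrayList<>();
        for (int t = 0; t < numTasks; t++) {
            final int start = (int) ((long) values.length * t / numTasks);
            final int end = (int) ((long) values.length * (t + 1) / numTasks);
            tasks.add(() -> {
                long[] local = new long[numBins]; // private to this task
                for (int i = start; i < end; i++) local[values[i]]++;
                return local;
            });
        }
        long[] merged = new long[numBins];
        try {
            // Merge the per-task stripes sequentially in the calling thread.
            for (Future<long[]> result : pool.invokeAll(tasks)) {
                long[] local = result.get();
                for (int b = 0; b < numBins; b++) merged[b] += local[b];
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
        return merged;
    }
}
```

Because the stripes belong to tasks rather than threads, the result does not depend on which worker thread ends up running which task.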

> Specific comments on the code: statically instantiating both an ExecutorService and a ForkJoinExecutorService by default is not a good idea. These should be instantiated only if/when needed.

Hmm, why? These are two super light-weight objects. Neither the SequentialExecutorService nor the ForkJoinExecutorService create threads or hold references to any data. (SequentialExecutorService just immediately runs everything in the calling thread, ForkJoinExecutorService submits to the common pool or whatever pool the calling thread is running in.)
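To illustrate why such an object is cheap, here is a sketch of an executor that runs every task immediately in the calling thread. It is not the SequentialExecutorService from the sandbox branch; the class name SequentialExecutorSketch is hypothetical and it is built on the JDK's AbstractExecutorService.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.TimeUnit;

/** Holds no threads and no task queue; its only state is a shutdown flag. */
class SequentialExecutorSketch extends AbstractExecutorService {

    private volatile boolean shutdown = false;

    @Override
    public void execute(Runnable command) {
        command.run(); // runs right away, in the caller's thread
    }

    @Override
    public void shutdown() { shutdown = true; }

    @Override
    public List<Runnable> shutdownNow() {
        shutdown = true;
        return Collections.emptyList(); // nothing is ever queued
    }

    @Override
    public boolean isShutdown() { return shutdown; }

    @Override
    public boolean isTerminated() { return shutdown; }

    @Override
    public boolean awaitTermination(long timeout, TimeUnit unit) {
        return shutdown; // no running tasks to wait for
    }
}
```

The inherited submit and invokeAll methods wrap each task in a FutureTask and pass it to execute, so the returned futures are already complete when the call returns.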
