# Adding `customChunks` to `future.apply` functions #83
Thanks for this. I've thought a little bit more about your use case that you mentioned on Twitter. I agree, those needs require something that's orthogonal to controlling the processing order of elements. Here's how I understand the problem and your ask: Let's assume we've got a list `X` with six elements whose processing times are 6, 3, 3, 2, 2, and 2 time units. If we process them one by one, the total processing time is 18 time units. Now, let's assume we have 3 workers. If we use the default chunking, the elements are split into equally sized chunks, e.g. `list(1:2, 3:4, 5:6)`, whose processing times are 9, 5, and 4 time units, so everything completes after 9 time units. What you're asking for is a way for the 3 workers to instead process the elements in 3 differently sized chunks so that it will complete sooner. In this case, it's more efficient if the chunking would be `list(1L, 2:3, 4:6)`, because then each chunk takes 6 time units to process. One can of course also consider the case with, say, 2 or 4 workers. For example, with 2 workers, one might want to chunk it up as `list(1:2, 3:6)`, so that each chunk takes 9 time units. Did I understand it correctly? If so, I'll follow up with more comments.
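To make the arithmetic concrete, here is a minimal R sketch (my addition; `times` and `completion_time` are illustrative names) that computes when each chunking finishes, where the finish time is the weight of the heaviest chunk since chunks run in parallel:

```r
## Per-element processing times from the example above (time units)
times <- c(6, 3, 3, 2, 2, 2)

## Finish time = weight of the heaviest chunk, as chunks run in parallel
completion_time <- function(chunks, times) {
  max(sapply(chunks, function(idxs) sum(times[idxs])))
}

completion_time(list(1:2, 3:4, 5:6), times)  ## default-style chunking: 9
completion_time(list(1L, 2:3, 4:6), times)   ## balanced chunking:      6
```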
@HenrikBengtsson Yes, that summarizes it. It's my understanding that dynamic balancing could produce the efficient distribution you mentioned, but it would be strictly less efficient than the static approach, since balancing at run time adds dispatch overhead that pre-computed chunks avoid.
Building off a point you raised earlier, and continuing with the case of wishing to process a 6-element list, I see two ways in which users could incorporate information about the complexity of each element:

1. Pre-chunk the input themselves, i.e., build a list whose elements are balanced groupings of the original elements and pass that list for processing.
2. Supply per-element complexity information and let `future.apply` construct balanced chunks from it.

The first approach requires the user to not just have information about the complexity of each element but also to use that information to generate balanced groupings of elements and then build a list reflecting those groupings (sketched below). The second approach simply requires users to know something about the complexity of each element. A new, internal optimization function could then use that information to try to find chunks generating better balance. Of course, the status quo and/or dynamic balancing might be more efficient than finding something like an optimum, and I imagine someone more well-versed in algorithms could state the precise tradeoff!
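To illustrate the first approach, here is a rough sketch (my own, with placeholder data and a placeholder `my_fcn`) of pre-chunking the input and flattening the result afterwards:

```r
library(future.apply)
plan(multisession, workers = 3)

X <- as.list(1:6)            ## placeholder 6-element input
my_fcn <- function(x) x^2    ## placeholder per-element function

## Build balanced groupings by hand, mirroring the chunking above ...
X_chunked <- list(X[1], X[2:3], X[4:6])

## ... process one grouping per future ...
y_chunked <- future_lapply(X_chunked, FUN = function(chunk) lapply(chunk, my_fcn))

## ... and flatten back to the shape of a plain future_lapply(X, my_fcn)
y <- unlist(y_chunked, recursive = FALSE)
```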
Great. So, one approach would then be to make it possible to specify the relative compute load of each element. For simplicity, let's say compute load and processing time are synonyms. With the above example, we would then have compute loads, or "compute weights", that are:

```r
weights <- c(6, 3, 3, 2, 2, 2)
```

Together with the information on the number of workers available, this would allow us to identify the chunks:

```r
chunks <- list(1L, 2:3, 4:6)
```

where the chunk weights are:

```r
chunk_weights <- sapply(chunks, FUN = function(idxs) sum(weights[idxs]))
print(chunk_weights)
## [1] 6 6 6
```

The advantage of having an algorithm that identifies chunks based on element "weights" is that it avoids having to know what the number of workers is. I think we want to stay away from the code having to be aware of the number of workers, or even the concept of a worker. That's part of the core design philosophy of the Future API. Identifying the optimal chunk configuration given element weights is probably a computationally expensive task, but my gut feeling is that a good-enough algorithm can do it pretty fast. So, what do you think about being able to do something like:

```r
y <- future_lapply(
  X,
  FUN = my_fcn,
  future.scheduling = structure(1.0, weights = c(6, 3, 3, 2, 2, 2))
)
```

? Obviously, it's not hard to imagine something like:

```r
y <- future_lapply(
  X,
  FUN = my_fcn,
  future.scheduling = structure(1.0, weights = estimate_compute_load(X))
)
```

PS. Thinking ahead. Another advantage of the abstraction of element-wise "weights" and compute times is that it fits nicely with plans on being able to specify resource requirements for futures. If we specify the compute time for a future, then the parallel backend can make decisions on how and where it wants to process such a future. That is something HPC schedulers, for instance, are good at, so it's not hard to imagine that the above weights could be passed in some form to an HPC scheduler with the future.batchtools backend, if that's being used.
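For what it's worth, one candidate for such a good-enough algorithm is the greedy "longest processing time first" heuristic. The sketch below is my own illustration rather than anything in future.apply, and it takes the number of chunks as an explicit argument, which `makeChunks()` knows internally even though user code never needs to:

```r
## My illustration (not future.apply code): greedy "longest processing
## time first" chunking. Assign each element, heaviest first, to the
## currently lightest chunk; a classic approximation for minimizing
## the maximum chunk weight.
balance_chunks <- function(weights, nbr_of_chunks) {
  chunks <- vector("list", length = nbr_of_chunks)
  loads  <- numeric(nbr_of_chunks)
  for (idx in order(weights, decreasing = TRUE)) {
    lightest <- which.min(loads)
    chunks[[lightest]] <- c(chunks[[lightest]], idx)
    loads[lightest] <- loads[lightest] + weights[idx]
  }
  chunks
}

balance_chunks(c(6, 3, 3, 2, 2, 2), nbr_of_chunks = 3L)
## Gives chunks with weights 6, 7, and 5 here -- good, though not the
## optimal 6, 6, 6, illustrating the "good enough, fast" tradeoff.
```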
A few thoughts: note that the primary efficiency gains arise from setting the number of chunks equal to the number of workers, so that each pre-balanced chunk is processed by exactly one worker.
Yes, `future.apply` knows about it internally, but there is no need for developers/users to know about it. Part of the design of the future framework is that code that makes use of it should be agnostic to whether it runs sequentially or in parallel, and if in parallel, how and where. As a developer you should not make assumptions about the number of workers in your code.

Another reason why I'm very conservative when it comes to adding/opening up for custom features is that it risks limiting what the future framework can evolve into later on. I look at every feature request and ask myself several questions: "How will this work when run sequentially, or with another parallel backend?", "Will this limit or prevent us from implementing what's on the roadmap?", "What about the things we still don't know should be on the roadmap, but that someone eventually will ask about?", and so on. In this case, what happens if the developer defines chunks based on the assumption that the code is parallelizing on the local machine, but the user chooses to run on some other backend, e.g. a mix of local and remote workers, or on a job scheduler? I don't know the full answer, which is why I'm very careful not to open up low-level control that will be hard to undo. You'll see me take the same approach regarding other requests of this kind.
## Chunking in `future.apply`
`future.apply` currently relies on the internal `makeChunks.R` function to partition elements for processing into "chunks" that are sent to workers for processing. `makeChunks` outputs a list of integer vectors, where each vector is a "chunk" and its elements are the indices of the elements to be processed in the input object (often a list).

Users have some control over the generation of chunks via the `future.apply` arguments `future.chunk.size` (which specifies the average number of elements per chunk a user prefers) and `future.scheduling` (which specifies the average number of chunks per worker). Furthermore, they can control the processing order of chunks with the `ordering` attribute of `future.chunk.size` or `future.scheduling`.

Nevertheless, this control is limited, and even the sensible defaults of `makeChunks` can produce substantial load imbalance across workers and resulting inefficiency. Some of this inefficiency could be reduced if users were able to better control chunk generation. While some of this inefficiency may be averted by the dynamic balancing of `future.apply`, the costs of dynamic balancing itself can be non-trivial.
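For reference, a small sketch of these existing controls (my example; `future.chunk.size` and the `ordering` attribute are used as documented in future.apply):

```r
library(future.apply)
plan(multisession, workers = 3)

X <- as.list(1:6)

## Prefer ~2 elements per chunk on average
y1 <- future_lapply(X, FUN = identity, future.chunk.size = 2)

## Scheduling factor 1.0 (about one chunk per worker), with a
## caller-supplied processing order via the "ordering" attribute
y2 <- future_lapply(X, FUN = identity,
                    future.scheduling = structure(1.0, ordering = 6:1))
```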
## The Purpose of `makeChunks`

Currently, `makeChunks` accomplishes two tasks:

1. grouping the elements of the input object into chunks, and
2. determining the order in which the chunks are processed.

Ideally, the former is redundant, as the elements of the object users pass to `future.apply` would themselves be the chunks they want processed (with `nbrOfElements == nbrOfWorkers`), and the latter is redundant, as chunks would be generated such that they are indexed in the order in which they should be processed. This allows for efficient static load balancing, with chunks already balanced and one chunk per worker. However, users often pass objects where the ordering is ad hoc and the chunking not planned.
## Adding `customChunks`

I envision two approaches to improving the flexibility of chunking in `future.apply`:

**1. Add a `customChunks` argument to `future.apply` functions**

Users could pass a list to `customChunks`, and `future.apply` would use this list instead of the list that `makeChunks` returns. If `is.null(customChunks) == TRUE`, then the status-quo internal `makeChunks` function is used. If `is.null(customChunks) == FALSE`, `makeChunks` is not executed and all other chunk-related arguments are ignored.

The primary motivation for this is that users may wish to (a) have complete control over the chunking and ordering of chunks, (b) do so without modifying their input to `future.apply` (i.e., avoid creating deeper objects or repeatedly rearranging the elements of their object just for processing), and (c) write more interpretable code that distinguishes between the input object, the plan for processing, and the processing itself. This also helps decouple functions for working in parallel from functions for serial pre-processing.
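A hypothetical sketch of what approach 1 could look like in use; the `customChunks` argument does not exist today, so this only illustrates the proposed call:

```r
## Hypothetical API: customChunks is the proposed argument, and the
## indices refer to elements of X, as in the weights example above.
my_chunks <- list(1L, 2:3, 4:6)

y <- future_lapply(
  X,
  FUN = my_fcn,
  customChunks = my_chunks  ## would replace the output of makeChunks()
)
```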
**2. Add a `customChunks` argument to `future.apply` functions and export `makeChunks`**

Users could pass their object to `makeChunks` and pass the result to the `customChunks` argument of `future.apply`. In the event that `customChunks == NULL`, `future.apply` would call `makeChunks` as usual. This would allow users to generate chunks with `makeChunks` either inside or outside of `future.apply`. The upside to this is that users can directly observe and edit the output of `makeChunks` (see the sketch below).

I could submit a pull request implementing this, but I'm not sure when/how `makeChunks` is called internally; I don't see it in makefiles or the definitions of the `future.apply` functions.
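And a hypothetical sketch of approach 2; `makeChunks` is currently internal, so both the export and the argument names shown here (`nbrOfElements`, `nbrOfWorkers`) are assumptions:

```r
## Hypothetical: makeChunks() is internal today; accessing it via :::
## and the argument names below are assumptions for illustration only.
chunks <- future.apply:::makeChunks(nbrOfElements = length(X),
                                    nbrOfWorkers = 3L)

str(chunks)  ## inspect, and possibly edit, the chunks ...

## ... then hand them to the proposed customChunks argument
y <- future_lapply(X, FUN = my_fcn, customChunks = chunks)
```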