Made combOp of aggregate() work as a reduce instead of an implicit fold #658
base: master
Thank you for your pull request. An admin will review this request soon.
val cleanSeqOp = sc.clean(seqOp)
val cleanCombOp = sc.clean(combOp)
val aggregatePartition = (it: Iterator[T]) => it.aggregate(zeroValue)(cleanSeqOp, cleanCombOp)
val mergeResult = (index: Int, taskResult: U) => jobResult = combOp(jobResult, taskResult)
def optCombOp(a: Option[U], b: Option[U]): Option[U] = for (u <- b) yield a.fold(u)(cleanCombOp(_, _))
I guess
for (u <- b) yield a.fold(u)(combOp(_, _))
should be just as good, maybe better.
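To make the idea concrete, here is a minimal standalone sketch of an Option-based combine step, using plain Scala collections rather than Spark. The names `OptCombSketch` and `reduceResults` are hypothetical, and the body uses `map`/`getOrElse`, which is my own restatement rather than the exact code in the patch.

```scala
object OptCombSketch {
  // Combine the accumulated result with the next task result.
  // When nothing has been accumulated yet (acc is None), the next
  // result passes through unchanged, so no zero element is needed.
  def optCombOp[U](combOp: (U, U) => U)(acc: Option[U], next: Option[U]): Option[U] =
    next.map(u => acc.map(a => combOp(a, u)).getOrElse(u))

  // Reduce a sequence of per-partition results, starting from None
  // instead of from a zero value.
  def reduceResults[U](results: Seq[U])(combOp: (U, U) => U): Option[U] =
    results.foldLeft(Option.empty[U])((acc, r) => optCombOp(combOp)(acc, Some(r)))
}
```

With this shape, `OptCombSketch.reduceResults(Seq(1, 2, 3))(_ + _)` combines the three results without ever introducing a zero, and an empty sequence of results yields `None`.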
- preserve sequential order within partitions - reformat code
- no need to clone None for jobResult - use combOp instead of cleanCombOp
A fun little demo aggregating an
Run the aggregate multiple times, and you'll see different permutations of foo, bar, baz and quux, but always maintaining the ordering within each partition and with only one leading zeroValue ("_") where the old implementation would have two.
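The original demo code is not shown here, so the following is only a rough local approximation with plain Scala collections, not Spark. The two-partition layout and string concatenation for both seqOp and combOp are assumptions; the `"_"` zeroValue and the four words come from the description above. The partition order is fixed in this sketch, whereas a real job may merge task results in any order.

```scala
object AggregateDemo {
  val zeroValue = "_"
  val seqOp: (String, String) => String = _ + _
  val combOp: (String, String) => String = _ + _

  // Each inner Seq stands in for one partition (assumed layout).
  val partitions: Seq[Seq[String]] = Seq(Seq("foo", "bar"), Seq("baz", "quux"))

  // seqOp folds each partition starting from zeroValue.
  val partitionResults: Seq[String] = partitions.map(_.foldLeft(zeroValue)(seqOp))

  // Old behaviour: combOp is an implicit fold that starts from zeroValue again.
  val foldStyle: String = partitionResults.foldLeft(zeroValue)(combOp)

  // Proposed behaviour: combOp is a reduce over the partition results.
  val reduceStyle: String = partitionResults.reduce(combOp)
}
```

Here `foldStyle` is `"__foobar_bazquux"` (two leading underscores) and `reduceStyle` is `"_foobar_bazquux"` (one).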
Mark, this seems inconsistent with Scala Collections' Alternatively, if this does return the
Just to show what I mean:
The zeroValue is not bound into combOp (which may need a different zero than seqOp does), but the zeroElement is still part of the seqOp fold, so there won't be any NoSuchElementException. The results from my aggregate() using your example inputs are exactly the same as for Scala collections. The difference is that the zeroElement doesn't become the initial element of an implicit fold in combOp. Instead, the first call of optCombOp (when jobResult is still None) effectively becomes the identity function, i.e. the same as Option(zeroElementForCombOp combOp u), without us having to know or infer what the correct zero is for combOp. (If U couldn't be an AnyVal, then you could avoid using Option and do something similar by initializing jobResult to null and putting an
For sane uses of aggregate, this shouldn't make any difference; but you can see differences in "creative" uses of aggregate. For example,
But I don't understand why we'd want to support a combOp that's incompatible with the seqOp. Why should the user have to worry about the number of partitions? If you look at Scala's
I think this just means that you're not expected to use
Yes, I'm not sure that there is a legitimate use case for seqOp and combOp that don't share the same zero. If we're willing to say that combOp must be commutative, associative and have the same zero as seqOp, then there is no harm in inserting an extra zeroElement into the combOp reduce, and this pull request is needless complication (although a documentation update would be in order). On the other hand, if we don't want to be that restrictive on combOp and can see a use for incompatible operations when users have taken control of the relevant details of partitioning (e.g. with a custom partitioner, coalesce or one of the operations that supports a numPartitions parameter), then we really should implement combOp as a reduce, as the docs claim. I will agree that Scala does generate some really weird results when incompatible operations are supplied to aggregate over parallel collections. I haven't looked yet at how that aggregate is implemented, and it's not obvious to me what it is doing just from looking at various inputs and results.
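As a rough illustration of why the shared-zero requirement matters, the sketch below simulates partitions with plain Scala collections (not Spark) and uses addition for both seqOp and combOp. The object and method names are hypothetical. When the zero is a true identity (0), the fold-style and reduce-style merges agree regardless of partitioning; with a non-identity zero (10), the result depends on the number of partitions under either strategy.

```scala
object PartitionSensitivity {
  // Per-partition fold with the zero, then an implicit fold from the zero again.
  def foldStyle(parts: Seq[Seq[Int]], zero: Int): Int =
    parts.map(_.foldLeft(zero)(_ + _)).foldLeft(zero)(_ + _)

  // Per-partition fold with the zero, then a plain reduce over the results.
  def reduceStyle(parts: Seq[Seq[Int]], zero: Int): Int =
    parts.map(_.foldLeft(zero)(_ + _)).reduce(_ + _)

  // The same four elements, laid out as one or two partitions.
  val onePartition: Seq[Seq[Int]] = Seq(Seq(1, 2, 3, 4))
  val twoPartitions: Seq[Seq[Int]] = Seq(Seq(1, 2), Seq(3, 4))
}
```

With zero 0 every combination gives 10. With zero 10, `foldStyle` gives 30 for one partition and 40 for two, while `reduceStyle` gives 20 and 30.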
My (intended) real point in the mailing list discussion was that its behaviour is somewhat inconsistent with its documentation. If its behaviour is desired (and that is arguably the case), then perhaps the docs should simply be updated to indicate that the operations should be consistent (as Mark pointed out, both associative and commutative) to avoid strange behaviour. If any user really wants to use two "inconsistent" operations, they do have available mapPartitions and a reduce (or whatever).
Yeah, good point. In that case I'd vote to just update the docs for this but leave it the way it was. One other difference I noticed with fold vs. reduce is that fold will also work on RDDs with zero partitions (which can actually happen sometimes with HDFS files if you give an empty directory), while reduce won't.
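The same fold-versus-reduce asymmetry can be seen on an empty plain Scala collection. This is only an analogy for the zero-partition RDD case, not a reproduction of Spark's behaviour, and `EmptyInput` is a hypothetical name.

```scala
import scala.util.Try

object EmptyInput {
  // Stands in for "no partition results at all".
  val noResults: Seq[Int] = Seq.empty

  // fold has a zero to fall back on, so it succeeds on empty input.
  val folded: Int = noResults.fold(0)(_ + _)

  // reduce has nothing to start from and throws on empty input.
  val reduced: Try[Int] = Try(noResults.reduce(_ + _))

  // reduceOption makes the empty case explicit instead of throwing.
  val reducedOption: Option[Int] = noResults.reduceOption(_ + _)
}
```

An Option-based merge behaves like `reduceOption` here: the caller still has to decide what an empty result should mean.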
I also removed a println that I bumped into. Author: Michael Armbrust <michael@databricks.com> Closes mesos#658 from marmbrus/nullPrimitives and squashes the following commits: a3ec4f3 [Michael Armbrust] Remove println. 695606b [Michael Armbrust] Support for null primatives from using scala and java reflection.