From fc8d5758e1c9d9eaf9647f9fbbb84d32f629db87 Mon Sep 17 00:00:00 2001 From: "P. Oscar Boykin" Date: Fri, 30 Jan 2015 13:54:49 -1000 Subject: [PATCH 1/6] Add applicative for Execution --- .../src/main/scala/com/twitter/scalding/Execution.scala | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/scalding-core/src/main/scala/com/twitter/scalding/Execution.scala b/scalding-core/src/main/scala/com/twitter/scalding/Execution.scala index 6b6586f2e3..d5b64f44a6 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/Execution.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/Execution.scala @@ -12,7 +12,7 @@ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License. -*/ + */ package com.twitter.scalding import com.twitter.algebird.monad.Reader @@ -193,6 +193,7 @@ object Execution { override def apply[T](t: T): Execution[T] = Execution.from(t) override def map[T, U](e: Execution[T])(fn: T => U): Execution[U] = e.map(fn) override def flatMap[T, U](e: Execution[T])(fn: T => Execution[U]): Execution[U] = e.flatMap(fn) + override def join[T, U](t: Execution[T], u: Execution[U]): Execution[(T, U)] = t.zip(u) } trait EvalCache { self => From 521b46ee8dbfd5d835427a4644f76e6e41182dc5 Mon Sep 17 00:00:00 2001 From: Vineeth Varghese Date: Tue, 3 Feb 2015 11:32:12 +1100 Subject: [PATCH 2/6] Using a ConcurrentHashMap instead of a WeakHashMap to make the Stats behave in a correct manner --- scalding-core/src/main/scala/com/twitter/scalding/Stats.scala | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/scalding-core/src/main/scala/com/twitter/scalding/Stats.scala b/scalding-core/src/main/scala/com/twitter/scalding/Stats.scala index 65187ce09f..ce55eb5ff1 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/Stats.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/Stats.scala @@ -2,7 +2,7 @@ package com.twitter.scalding import cascading.flow.{ FlowDef, FlowProcess } import cascading.stats.CascadingStats -import java.util.{ Collections, WeakHashMap } +import java.util.concurrent.ConcurrentHashMap import org.slf4j.{ Logger, LoggerFactory } import scala.collection.JavaConversions._ import scala.collection.JavaConverters._ @@ -109,7 +109,7 @@ object RuntimeStats extends java.io.Serializable { @transient private lazy val logger: Logger = LoggerFactory.getLogger(this.getClass) private val flowMappingStore: mutable.Map[String, WeakReference[FlowProcess[_]]] = - Collections.synchronizedMap(new WeakHashMap[String, WeakReference[FlowProcess[_]]]) + new ConcurrentHashMap[String, WeakReference[FlowProcess[_]]] def getFlowProcessForUniqueId(uniqueId: UniqueID): FlowProcess[_] = { (for { From d32804556c8ed74dcf784459e74caef70d2611fd Mon Sep 17 00:00:00 2001 From: "P. Oscar Boykin" Date: Tue, 3 Feb 2015 08:54:38 -1000 Subject: [PATCH 3/6] Add Execution.failed --- .../src/main/scala/com/twitter/scalding/Execution.scala | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/scalding-core/src/main/scala/com/twitter/scalding/Execution.scala b/scalding-core/src/main/scala/com/twitter/scalding/Execution.scala index 6b6586f2e3..9457a2a171 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/Execution.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/Execution.scala @@ -399,6 +399,13 @@ object Execution { case Success(s) => Future.successful(s) case Failure(err) => Future.failed(err) } + + /** + * This creates a definitely failed Execution. + */ + def failed(t: Throwable): Execution[Nothing] = + fromFuture(_ => Future.failed(t)) + /** * This makes a constant execution that runs no job. * Note this is a lazy parameter that is evaluated every From 3ea6bd676d4a23be6dc9d2acc83dca990afe6292 Mon Sep 17 00:00:00 2001 From: "P. Oscar Boykin" Date: Tue, 3 Feb 2015 11:54:01 -1000 Subject: [PATCH 4/6] Use java.util.Random instead of scala.util.Random --- .../src/main/scala/com/twitter/scalding/JoinAlgorithms.scala | 2 +- .../src/main/scala/com/twitter/scalding/Operations.scala | 2 +- .../src/main/scala/com/twitter/scalding/RichPipe.scala | 2 +- .../main/scala/com/twitter/scalding/mathematics/Poisson.scala | 2 +- .../com/twitter/scalding/mathematics/TypedSimilarity.scala | 4 ++-- .../src/main/scala/com/twitter/scalding/typed/Sketched.scala | 2 +- .../src/main/scala/com/twitter/scalding/typed/TypedPipe.scala | 4 ++-- 7 files changed, 9 insertions(+), 9 deletions(-) diff --git a/scalding-core/src/main/scala/com/twitter/scalding/JoinAlgorithms.scala b/scalding-core/src/main/scala/com/twitter/scalding/JoinAlgorithms.scala index c2ec84c64d..19d60874f6 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/JoinAlgorithms.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/JoinAlgorithms.scala @@ -27,7 +27,7 @@ import cascading.operation.filter._ import cascading.tuple._ import cascading.cascade._ -import scala.util.Random +import java.util.Random // this one is serializable, scala.util.Random is not import scala.collection.JavaConverters._ object JoinAlgorithms extends java.io.Serializable { diff --git a/scalding-core/src/main/scala/com/twitter/scalding/Operations.scala b/scalding-core/src/main/scala/com/twitter/scalding/Operations.scala index 7a753b92e9..69917de209 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/Operations.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/Operations.scala @@ -466,7 +466,7 @@ package com.twitter.scalding { } } - class SampleWithReplacement(frac: Double, val seed: Int = new scala.util.Random().nextInt) extends BaseOperation[Poisson]() + class SampleWithReplacement(frac: Double, val seed: Int = new java.util.Random().nextInt) extends BaseOperation[Poisson]() with Function[Poisson] with ScaldingPrepare[Poisson] { override def prepare(flowProcess: FlowProcess[_], operationCall: OperationCall[Poisson]) { super.prepare(flowProcess, operationCall) diff --git a/scalding-core/src/main/scala/com/twitter/scalding/RichPipe.scala b/scalding-core/src/main/scala/com/twitter/scalding/RichPipe.scala index a867cad1e2..86e42ede58 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/RichPipe.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/RichPipe.scala @@ -28,7 +28,7 @@ import cascading.tuple._ import cascading.cascade._ import cascading.operation.Debug.Output -import scala.util.Random +import java.util.Random import java.util.concurrent.atomic.AtomicInteger diff --git a/scalding-core/src/main/scala/com/twitter/scalding/mathematics/Poisson.scala b/scalding-core/src/main/scala/com/twitter/scalding/mathematics/Poisson.scala index e9112f983f..1fb833a08d 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/mathematics/Poisson.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/mathematics/Poisson.scala @@ -1,6 +1,6 @@ package com.twitter.scalding.mathematics -import scala.util.Random +import java.util.Random /** * Generating Poisson-distributed random variables diff --git a/scalding-core/src/main/scala/com/twitter/scalding/mathematics/TypedSimilarity.scala b/scalding-core/src/main/scala/com/twitter/scalding/mathematics/TypedSimilarity.scala index 3377ede124..16ad6de377 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/mathematics/TypedSimilarity.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/mathematics/TypedSimilarity.scala @@ -150,7 +150,7 @@ object TypedSimilarity extends Serializable { bigG: Grouped[N, (N, Int)], oversample: Double): TypedPipe[Edge[N, Double]] = { // 1) make rnd lazy due to serialization, // 2) fix seed so that map-reduce speculative execution does not give inconsistent results. - lazy val rnd = new scala.util.Random(1024) + lazy val rnd = new java.util.Random(1024) maybeWithReducers(smallG.cogroup(bigG) { (n: N, leftit: Iterator[(N, Int)], rightit: Iterable[(N, Int)]) => // Use a co-group to ensure this happens in the reducer: leftit.flatMap { @@ -185,7 +185,7 @@ object TypedSimilarity extends Serializable { */ def dimsumCosineSimilarity[N: Ordering](smallG: Grouped[N, (N, Double, Double)], bigG: Grouped[N, (N, Double, Double)], oversample: Double): TypedPipe[Edge[N, Double]] = { - lazy val rnd = new scala.util.Random(1024) + lazy val rnd = new java.util.Random(1024) maybeWithReducers(smallG.cogroup(bigG) { (n: N, leftit: Iterator[(N, Double, Double)], rightit: Iterable[(N, Double, Double)]) => // Use a co-group to ensure this happens in the reducer: leftit.flatMap { diff --git a/scalding-core/src/main/scala/com/twitter/scalding/typed/Sketched.scala b/scalding-core/src/main/scala/com/twitter/scalding/typed/Sketched.scala index 47783a71b6..93497c0ede 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/typed/Sketched.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/typed/Sketched.scala @@ -111,7 +111,7 @@ case class SketchJoined[K: Ordering, V, V2, R](left: Sketched[K, V], } lazy val toTypedPipe: TypedPipe[(K, R)] = { - lazy val rand = new scala.util.Random(left.seed) + lazy val rand = new java.util.Random(left.seed) val lhs = flatMapWithReplicas(left.pipe){ n => Some(rand.nextInt(n) + 1) } val rhs = flatMapWithReplicas(right){ n => 1.to(n) } diff --git a/scalding-core/src/main/scala/com/twitter/scalding/typed/TypedPipe.scala b/scalding-core/src/main/scala/com/twitter/scalding/typed/TypedPipe.scala index 48b6b0eb19..0b2f918dc0 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/typed/TypedPipe.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/typed/TypedPipe.scala @@ -28,7 +28,7 @@ import cascading.flow.FlowDef import cascading.pipe.{ Each, Pipe } import cascading.tap.Tap import cascading.tuple.{ Fields, Tuple => CTuple, TupleEntry } -import util.Random +import java.util.Random // prefer to scala.util.Random as this is serializable import scala.concurrent.Future @@ -400,7 +400,7 @@ trait TypedPipe[+T] extends Serializable { */ def sample(percent: Double, seed: Long): TypedPipe[T] = { // Make sure to fix the seed, otherwise restarts cause subtle errors - val rand = new Random(seed) + lazy val rand = new Random(seed) filter(_ => rand.nextDouble < percent) } From 2c88827679c6ef1ff10d7ec6b161aaa831043de2 Mon Sep 17 00:00:00 2001 From: "P. Oscar Boykin" Date: Tue, 3 Feb 2015 15:35:12 -1000 Subject: [PATCH 5/6] Back out 4 changes to be binary compatible! --- .../src/main/scala/com/twitter/scalding/RichPipe.scala | 2 +- .../main/scala/com/twitter/scalding/mathematics/Poisson.scala | 2 +- .../com/twitter/scalding/mathematics/TypedSimilarity.scala | 4 ++-- .../src/main/scala/com/twitter/scalding/typed/Sketched.scala | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/scalding-core/src/main/scala/com/twitter/scalding/RichPipe.scala b/scalding-core/src/main/scala/com/twitter/scalding/RichPipe.scala index 86e42ede58..a867cad1e2 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/RichPipe.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/RichPipe.scala @@ -28,7 +28,7 @@ import cascading.tuple._ import cascading.cascade._ import cascading.operation.Debug.Output -import java.util.Random +import scala.util.Random import java.util.concurrent.atomic.AtomicInteger diff --git a/scalding-core/src/main/scala/com/twitter/scalding/mathematics/Poisson.scala b/scalding-core/src/main/scala/com/twitter/scalding/mathematics/Poisson.scala index 1fb833a08d..e9112f983f 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/mathematics/Poisson.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/mathematics/Poisson.scala @@ -1,6 +1,6 @@ package com.twitter.scalding.mathematics -import java.util.Random +import scala.util.Random /** * Generating Poisson-distributed random variables diff --git a/scalding-core/src/main/scala/com/twitter/scalding/mathematics/TypedSimilarity.scala b/scalding-core/src/main/scala/com/twitter/scalding/mathematics/TypedSimilarity.scala index 16ad6de377..3377ede124 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/mathematics/TypedSimilarity.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/mathematics/TypedSimilarity.scala @@ -150,7 +150,7 @@ object TypedSimilarity extends Serializable { bigG: Grouped[N, (N, Int)], oversample: Double): TypedPipe[Edge[N, Double]] = { // 1) make rnd lazy due to serialization, // 2) fix seed so that map-reduce speculative execution does not give inconsistent results. - lazy val rnd = new java.util.Random(1024) + lazy val rnd = new scala.util.Random(1024) maybeWithReducers(smallG.cogroup(bigG) { (n: N, leftit: Iterator[(N, Int)], rightit: Iterable[(N, Int)]) => // Use a co-group to ensure this happens in the reducer: leftit.flatMap { @@ -185,7 +185,7 @@ object TypedSimilarity extends Serializable { */ def dimsumCosineSimilarity[N: Ordering](smallG: Grouped[N, (N, Double, Double)], bigG: Grouped[N, (N, Double, Double)], oversample: Double): TypedPipe[Edge[N, Double]] = { - lazy val rnd = new java.util.Random(1024) + lazy val rnd = new scala.util.Random(1024) maybeWithReducers(smallG.cogroup(bigG) { (n: N, leftit: Iterator[(N, Double, Double)], rightit: Iterable[(N, Double, Double)]) => // Use a co-group to ensure this happens in the reducer: leftit.flatMap { diff --git a/scalding-core/src/main/scala/com/twitter/scalding/typed/Sketched.scala b/scalding-core/src/main/scala/com/twitter/scalding/typed/Sketched.scala index 93497c0ede..47783a71b6 100644 --- a/scalding-core/src/main/scala/com/twitter/scalding/typed/Sketched.scala +++ b/scalding-core/src/main/scala/com/twitter/scalding/typed/Sketched.scala @@ -111,7 +111,7 @@ case class SketchJoined[K: Ordering, V, V2, R](left: Sketched[K, V], } lazy val toTypedPipe: TypedPipe[(K, R)] = { - lazy val rand = new java.util.Random(left.seed) + lazy val rand = new scala.util.Random(left.seed) val lhs = flatMapWithReplicas(left.pipe){ n => Some(rand.nextInt(n) + 1) } val rhs = flatMapWithReplicas(right){ n => 1.to(n) } From 1261a10737ad5c6211822cc03acbe59ae5812858 Mon Sep 17 00:00:00 2001 From: Alex Levenson Date: Tue, 3 Feb 2015 20:23:50 -0800 Subject: [PATCH 6/6] Prepare for release of 0.13.1 --- CHANGES.md | 7 +++++++ README.md | 2 +- scalding-core/src/main/scala/com/twitter/package.scala | 3 ++- version.sbt | 2 +- 4 files changed, 11 insertions(+), 3 deletions(-) diff --git a/CHANGES.md b/CHANGES.md index 1e84e51bd5..a6bdf9a5d0 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,5 +1,12 @@ # Scalding # +### Version 0.13.1 ### +* Back out 4 changes to be binary compatible: https://github.com/twitter/scalding/pull/1187 +* Use java.util.Random instead of scala.util.Random: https://github.com/twitter/scalding/pull/1186 +* Add Execution.failed: https://github.com/twitter/scalding/pull/1185 +* Using a ConcurrentHashMap instead of a WeakHashMap to make the Stats behave in a correct manner: https://github.com/twitter/scalding/pull/1184 +* Add applicative for Execution: https://github.com/twitter/scalding/pull/1181 + ### Version 0.13.0 ### * Covert LzoTextDelimited to Cascading scheme.: https://github.com/twitter/scalding/pull/1179 * Make TraceUtil support versions of cascading older than 2.6: https://github.com/twitter/scalding/pull/1180 diff --git a/README.md b/README.md index 25e4308b44..7f86144067 100644 --- a/README.md +++ b/README.md @@ -4,7 +4,7 @@ Scalding is a Scala library that makes it easy to specify Hadoop MapReduce jobs. ![Scalding Logo](https://raw.github.com/twitter/scalding/develop/logo/scalding.png) -Current version: `0.13.0` +Current version: `0.13.1` ## Word Count diff --git a/scalding-core/src/main/scala/com/twitter/package.scala b/scalding-core/src/main/scala/com/twitter/package.scala index 027aa6acc1..f712d2d353 100644 --- a/scalding-core/src/main/scala/com/twitter/package.scala +++ b/scalding-core/src/main/scala/com/twitter/package.scala @@ -30,10 +30,11 @@ package object scalding { type KeyedList[K, +V] = com.twitter.scalding.typed.KeyedList[K, V] type ValuePipe[+T] = com.twitter.scalding.typed.ValuePipe[T] type Grouped[K, +V] = com.twitter.scalding.typed.Grouped[K, V] + /** * Make sure this is in sync with version.sbt */ - val scaldingVersion: String = "0.13.0" + val scaldingVersion: String = "0.13.1" object RichPathFilter { implicit def toRichPathFilter(f: PathFilter) = new RichPathFilter(f) diff --git a/version.sbt b/version.sbt index 0d39db4919..beb9953b43 100644 --- a/version.sbt +++ b/version.sbt @@ -1 +1 @@ -version in ThisBuild := "0.13.0" +version in ThisBuild := "0.13.1"