From 23962b679e6b27780f8bf8b61cd420918ec0afee Mon Sep 17 00:00:00 2001 From: Cheng Lian Date: Mon, 5 May 2014 19:29:27 +0800 Subject: [PATCH 1/8] Bug fix: RLDecoder.hasNext may lose duplicated values --- .../scala/shark/memstore2/column/CompressedColumnIterator.scala | 2 +- .../shark/memstore2/column/CompressedColumnIteratorSuite.scala | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/scala/shark/memstore2/column/CompressedColumnIterator.scala b/src/main/scala/shark/memstore2/column/CompressedColumnIterator.scala index 28856826..be79c993 100644 --- a/src/main/scala/shark/memstore2/column/CompressedColumnIterator.scala +++ b/src/main/scala/shark/memstore2/column/CompressedColumnIterator.scala @@ -88,7 +88,7 @@ class RLDecoder[V](buffer: ByteBuffer, columnType: ColumnType[_, V]) extends Ite private var _count: Int = 0 private val _current: V = columnType.newWritable() - override def hasNext = buffer.hasRemaining() + override def hasNext = _count < _run || buffer.hasRemaining() override def next(): V = { if (_count == _run) { diff --git a/src/test/scala/shark/memstore2/column/CompressedColumnIteratorSuite.scala b/src/test/scala/shark/memstore2/column/CompressedColumnIteratorSuite.scala index 7f209d23..bcf1cf08 100644 --- a/src/test/scala/shark/memstore2/column/CompressedColumnIteratorSuite.scala +++ b/src/test/scala/shark/memstore2/column/CompressedColumnIteratorSuite.scala @@ -78,6 +78,7 @@ class CompressedColumnIteratorSuite extends FunSuite { } l.foreach { x => + assert(iter.hasNext) iter.next() assert(t.get(iter.current, oi) === x) } From 93096c327415a5bbc438161ac5bf7efd8d1c1011 Mon Sep 17 00:00:00 2001 From: Yin Huai Date: Sun, 11 May 2014 19:46:07 -0400 Subject: [PATCH 2/8] Initial commit of bringing back map join hints. --- .../optimizer/SharkMapJoinProcessor.scala | 60 +++++++++++++++++++ .../{ => optimizer}/SharkOptimizer.scala | 16 +++-- .../shark/parse/SharkSemanticAnalyzer.scala | 3 +- 3 files changed, 74 insertions(+), 5 deletions(-) create mode 100644 src/main/scala/shark/optimizer/SharkMapJoinProcessor.scala rename src/main/scala/shark/{ => optimizer}/SharkOptimizer.scala (75%) diff --git a/src/main/scala/shark/optimizer/SharkMapJoinProcessor.scala b/src/main/scala/shark/optimizer/SharkMapJoinProcessor.scala new file mode 100644 index 00000000..72aab228 --- /dev/null +++ b/src/main/scala/shark/optimizer/SharkMapJoinProcessor.scala @@ -0,0 +1,60 @@ +/* + * Copyright (C) 2012 The Regents of The University California. + * All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package shark.optimizer + +import java.util.{LinkedHashMap => JavaLinkedHashMap} + +import org.apache.hadoop.hive.ql.exec.{MapJoinOperator, JoinOperator, Operator} +import org.apache.hadoop.hive.ql.optimizer.MapJoinProcessor +import org.apache.hadoop.hive.ql.parse.{ParseContext, QBJoinTree, OpParseContext} +import org.apache.hadoop.hive.ql.plan.OperatorDesc +import org.apache.hadoop.hive.conf.HiveConf + +class SharkMapJoinProcessor extends MapJoinProcessor { + + /** + * Override generateMapJoinOperator to bypass the step of validating Map Join hints int Hive. + */ + override def generateMapJoinOperator( + pctx: ParseContext, + op: JoinOperator, + joinTree: QBJoinTree, + mapJoinPos: Int): MapJoinOperator = { + val hiveConf: HiveConf = pctx.getConf + val noCheckOuterJoin: Boolean = + HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTSORTMERGEBUCKETMAPJOIN) && + HiveConf.getBoolVar(hiveConf, HiveConf.ConfVars.HIVEOPTBUCKETMAPJOIN) + + val opParseCtxMap: JavaLinkedHashMap[Operator[_ <: OperatorDesc], OpParseContext] = + pctx.getOpParseCtx + + // Explicitly set validateMapJoinTree to false to bypass the step of validating + // Map Join hints in Hive. + val validateMapJoinTree = false + val mapJoinOp: MapJoinOperator = + MapJoinProcessor.convertMapJoin( + opParseCtxMap, op, joinTree, mapJoinPos, noCheckOuterJoin, validateMapJoinTree) + + // Hive originally uses genSelectPlan to insert an dummy select after the MapJoinOperator. + // We should not need this step. + // create a dummy select to select all columns + // MapJoinProcessor.genSelectPlan(pctx, mapJoinOp) + + return mapJoinOp + } +} diff --git a/src/main/scala/shark/SharkOptimizer.scala b/src/main/scala/shark/optimizer/SharkOptimizer.scala similarity index 75% rename from src/main/scala/shark/SharkOptimizer.scala rename to src/main/scala/shark/optimizer/SharkOptimizer.scala index e36184fd..08f649d1 100644 --- a/src/main/scala/shark/SharkOptimizer.scala +++ b/src/main/scala/shark/optimizer/SharkOptimizer.scala @@ -1,5 +1,5 @@ /* - * Copyright (C) 2012 The Regents of The University California. + * Copyright (C) 2012 The Regents of The University California. * All rights reserved. * * Licensed under the Apache License, Version 2.0 (the "License"); @@ -15,14 +15,15 @@ * limitations under the License. */ -package shark +package shark.optimizer import java.util.{List => JavaList} import org.apache.hadoop.hive.ql.optimizer.JoinReorder import org.apache.hadoop.hive.ql.optimizer.{Optimizer => HiveOptimizer, - SimpleFetchOptimizer, Transform} -import org.apache.hadoop.hive.ql.parse.{ParseContext} + SimpleFetchOptimizer, Transform, MapJoinProcessor => HiveMapJoinProcessor} +import org.apache.hadoop.hive.ql.parse.ParseContext +import shark.LogHelper class SharkOptimizer extends HiveOptimizer with LogHelper { @@ -49,6 +50,13 @@ class SharkOptimizer extends HiveOptimizer with LogHelper { transformation match { case _: SimpleFetchOptimizer => {} case _: JoinReorder => {} + case _: HiveMapJoinProcessor => { + // Use SharkMapJoinProcessor to bypass the step of validating Map Join hints + // in Hive. So, we can use hints to mark tables that will be considered as small + // tables (like Hive 0.9). 
+ val sharkMapJoinProcessor = new SharkMapJoinProcessor + pctx = sharkMapJoinProcessor.transform(pctx) + } case _ => { pctx = transformation.transform(pctx) } diff --git a/src/main/scala/shark/parse/SharkSemanticAnalyzer.scala b/src/main/scala/shark/parse/SharkSemanticAnalyzer.scala index 7f3a3fd2..677da200 100755 --- a/src/main/scala/shark/parse/SharkSemanticAnalyzer.scala +++ b/src/main/scala/shark/parse/SharkSemanticAnalyzer.scala @@ -38,11 +38,12 @@ import org.apache.hadoop.hive.ql.parse._ import org.apache.hadoop.hive.ql.plan._ import org.apache.hadoop.hive.ql.session.SessionState -import shark.{LogHelper, SharkConfVars, SharkOptimizer} +import shark.{LogHelper, SharkConfVars} import shark.execution.{HiveDesc, Operator, OperatorFactory, ReduceSinkOperator} import shark.execution.{SharkDDLWork, SparkLoadWork, SparkWork, TerminalOperator} import shark.memstore2.{CacheType, LazySimpleSerDeWrapper, MemoryMetadataManager} import shark.memstore2.SharkTblProperties +import shark.optimizer.SharkOptimizer /** From a41966e991d4cb827a2eb1bc4791d7dcacab2044 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Tue, 13 May 2014 00:22:29 -0700 Subject: [PATCH 3/8] Fix the crash for map pruning when the column value is not a constant. --- src/main/scala/shark/execution/MapSplitPruning.scala | 9 ++++++--- src/test/scala/shark/SQLSuite.scala | 5 +++++ 2 files changed, 11 insertions(+), 3 deletions(-) diff --git a/src/main/scala/shark/execution/MapSplitPruning.scala b/src/main/scala/shark/execution/MapSplitPruning.scala index 976ebf50..874165b1 100644 --- a/src/main/scala/shark/execution/MapSplitPruning.scala +++ b/src/main/scala/shark/execution/MapSplitPruning.scala @@ -67,7 +67,7 @@ object MapSplitPruning { true } - case _: GenericUDFIn => + case _: GenericUDFIn if e.children(0).isInstanceOf[ExprNodeColumnEvaluator] => testInPredicate( s, e.children(0).asInstanceOf[ExprNodeColumnEvaluator], @@ -91,10 +91,13 @@ object MapSplitPruning { val columnStats = s.stats(field.fieldID) if (columnStats != null) { - expEvals.exists { - e => + expEvals.exists { e => + if (e.isInstanceOf[ExprNodeConstantEvaluator]) { val constEval = e.asInstanceOf[ExprNodeConstantEvaluator] columnStats := constEval.expr.getValue() + } else { + true + } } } else { // If there is no stats on the column, don't prune. diff --git a/src/test/scala/shark/SQLSuite.scala b/src/test/scala/shark/SQLSuite.scala index 3855ca21..94bebe17 100644 --- a/src/test/scala/shark/SQLSuite.scala +++ b/src/test/scala/shark/SQLSuite.scala @@ -718,6 +718,11 @@ class SQLSuite extends FunSuite { where year(from_unixtime(k)) between "2013" and "2014" """, Array[String]("0")) } + test("map pruning with functions in in clause") { + expectSql("""select count(*) from mapsplitfunc_cached + where year(from_unixtime(k)) in ("2013", concat("201", "4")) """, Array[String]("0")) + } + ////////////////////////////////////////////////////////////////////////////// // SharkContext APIs (e.g. sql2rdd, sql) ////////////////////////////////////////////////////////////////////////////// From 00fe1ff61246fb2e1d51f16c98f72252e6672df2 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Wed, 14 May 2014 17:48:56 -0700 Subject: [PATCH 4/8] Avoid starting SparkContext in remote mode. 
--- src/main/scala/shark/SharkCliDriver.scala | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/src/main/scala/shark/SharkCliDriver.scala b/src/main/scala/shark/SharkCliDriver.scala index b2cc2e4f..204364be 100755 --- a/src/main/scala/shark/SharkCliDriver.scala +++ b/src/main/scala/shark/SharkCliDriver.scala @@ -162,7 +162,9 @@ object SharkCliDriver { val cli = new SharkCliDriver(reloadRdds) cli.setHiveVariables(oproc.getHiveVariables()) - SharkEnv.fixUncompatibleConf(conf) + if (!ss.isRemoteMode) { + SharkEnv.fixUncompatibleConf(conf) + } // Execute -i init files (always in silent mode) cli.processInitFiles(ss) From 5228d5a6d825d5af9f4e3f0747d38c45ae1b3469 Mon Sep 17 00:00:00 2001 From: Cheng Hao Date: Mon, 26 May 2014 15:14:43 +0800 Subject: [PATCH 5/8] Fix the rdd recovery of table from the non-default database --- src/main/scala/shark/memstore2/TableRecovery.scala | 1 + 1 file changed, 1 insertion(+) diff --git a/src/main/scala/shark/memstore2/TableRecovery.scala b/src/main/scala/shark/memstore2/TableRecovery.scala index adf61061..6e92152b 100644 --- a/src/main/scala/shark/memstore2/TableRecovery.scala +++ b/src/main/scala/shark/memstore2/TableRecovery.scala @@ -58,6 +58,7 @@ object TableRecovery extends LogHelper { logInfo(logMessage) } val cmd = QueryRewriteUtils.cacheToAlterTable("CACHE %s".format(tableName)) + cmdRunner(s"use $databaseName") cmdRunner(cmd) } } From b9e6ee9b693a2c94043d5f7ca36e99af0bca7da9 Mon Sep 17 00:00:00 2001 From: Reynold Xin Date: Mon, 26 May 2014 23:33:42 -0700 Subject: [PATCH 6/8] Bump Shark version to 0.9.2. --- project/SharkBuild.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/project/SharkBuild.scala b/project/SharkBuild.scala index fe9dcfbc..45a35042 100755 --- a/project/SharkBuild.scala +++ b/project/SharkBuild.scala @@ -29,7 +29,7 @@ import sbtassembly.Plugin.AssemblyKeys._ object SharkBuild extends Build { // Shark version - val SHARK_VERSION = "0.9.1" + val SHARK_VERSION = "0.9.2" val SHARK_ORGANIZATION = "edu.berkeley.cs.shark" From fac8c67115947393281e76d852853099f3ec0b97 Mon Sep 17 00:00:00 2001 From: Kay Ousterhout Date: Tue, 27 May 2014 20:50:46 +0000 Subject: [PATCH 7/8] Add ClassTags to types that depend on Spark's Serializer. This change is necessary for compatibility with Spark-1.0: commit 7eefc9d2b3f6ebc0ecb5562da7323f1e06afbb35 in Spark added ClassTags on Serializer and all dependent types. 
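For reference, the pattern applied throughout this patch is the Scala context bound
`T: ClassTag`, which asks the compiler to supply the erased runtime class of T as an
implicit value; that implicit is what Spark 1.0's Serializer methods now expect. A
minimal, self-contained illustration (not part of this patch; the object and method
names are for demonstration only):

    import scala.reflect.ClassTag

    object ClassTagDemo {
      // The context bound `T: ClassTag` is sugar for an extra implicit
      // ClassTag[T] parameter, resolved at the call site from the static type.
      def describe[T: ClassTag](value: T): String =
        s"static type erases to ${implicitly[ClassTag[T]].runtimeClass.getName}"

      def main(args: Array[String]): Unit = {
        println(describe("hello"))        // java.lang.String
        println(describe(Array(1, 2, 3))) // [I (a primitive Int array)
      }
    }

Methods such as serialize[T: ClassTag] in the diff below simply thread this implicit
through to Spark's SerializerInstance.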
--- .../shark/execution/LateralViewJoinOperator.scala | 6 +++--- .../execution/serialization/JavaSerializer.scala | 6 ++++-- .../serialization/KryoSerializationWrapper.scala | 6 ++++-- .../execution/serialization/KryoSerializer.scala | 6 ++++-- .../execution/serialization/ShuffleSerializer.scala | 12 +++++++----- 5 files changed, 22 insertions(+), 14 deletions(-) diff --git a/src/main/scala/shark/execution/LateralViewJoinOperator.scala b/src/main/scala/shark/execution/LateralViewJoinOperator.scala index c30005d4..ca908fdb 100755 --- a/src/main/scala/shark/execution/LateralViewJoinOperator.scala +++ b/src/main/scala/shark/execution/LateralViewJoinOperator.scala @@ -21,7 +21,7 @@ import java.nio.ByteBuffer import scala.collection.mutable.ArrayBuffer import scala.collection.JavaConversions._ -import scala.reflect.BeanProperty +import scala.reflect.{BeanProperty, ClassTag} import org.apache.commons.codec.binary.Base64 import org.apache.hadoop.hive.ql.exec.{ExprNodeEvaluator, ExprNodeEvaluatorFactory} @@ -174,12 +174,12 @@ object KryoSerializerToString { @transient val kryoSer = new SparkKryoSerializer(SparkEnv.get.conf) - def serialize[T](o: T): String = { + def serialize[T: ClassTag](o: T): String = { val bytes = kryoSer.newInstance().serialize(o).array() new String(Base64.encodeBase64(bytes)) } - def deserialize[T](byteString: String): T = { + def deserialize[T: ClassTag](byteString: String): T = { val bytes = Base64.decodeBase64(byteString.getBytes()) kryoSer.newInstance().deserialize[T](ByteBuffer.wrap(bytes)) } diff --git a/src/main/scala/shark/execution/serialization/JavaSerializer.scala b/src/main/scala/shark/execution/serialization/JavaSerializer.scala index df6ab31d..17537f2b 100644 --- a/src/main/scala/shark/execution/serialization/JavaSerializer.scala +++ b/src/main/scala/shark/execution/serialization/JavaSerializer.scala @@ -19,6 +19,8 @@ package shark.execution.serialization import java.nio.ByteBuffer +import scala.reflect.ClassTag + import org.apache.spark.SparkEnv import org.apache.spark.serializer.{JavaSerializer => SparkJavaSerializer} @@ -26,11 +28,11 @@ import org.apache.spark.serializer.{JavaSerializer => SparkJavaSerializer} object JavaSerializer { @transient val ser = new SparkJavaSerializer(SparkEnv.get.conf) - def serialize[T](o: T): Array[Byte] = { + def serialize[T: ClassTag](o: T): Array[Byte] = { ser.newInstance().serialize(o).array() } - def deserialize[T](bytes: Array[Byte]): T = { + def deserialize[T: ClassTag](bytes: Array[Byte]): T = { ser.newInstance().deserialize[T](ByteBuffer.wrap(bytes)) } } diff --git a/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala b/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala index 9f227295..57c3f1da 100644 --- a/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala +++ b/src/main/scala/shark/execution/serialization/KryoSerializationWrapper.scala @@ -17,13 +17,15 @@ package shark.execution.serialization +import scala.reflect.ClassTag + /** * A wrapper around some unserializable objects that make them both Java * serializable. Internally, Kryo is used for serialization. * * Use KryoSerializationWrapper(value) to create a wrapper. 
*/ -class KryoSerializationWrapper[T] extends Serializable { +class KryoSerializationWrapper[T: ClassTag] extends Serializable { @transient var value: T = _ @@ -54,7 +56,7 @@ class KryoSerializationWrapper[T] extends Serializable { object KryoSerializationWrapper { - def apply[T](value: T): KryoSerializationWrapper[T] = { + def apply[T: ClassTag](value: T): KryoSerializationWrapper[T] = { val wrapper = new KryoSerializationWrapper[T] wrapper.value = value wrapper diff --git a/src/main/scala/shark/execution/serialization/KryoSerializer.scala b/src/main/scala/shark/execution/serialization/KryoSerializer.scala index 0532fbcc..e733367f 100644 --- a/src/main/scala/shark/execution/serialization/KryoSerializer.scala +++ b/src/main/scala/shark/execution/serialization/KryoSerializer.scala @@ -19,6 +19,8 @@ package shark.execution.serialization import java.nio.ByteBuffer +import scala.reflect.ClassTag + import org.apache.spark.{SparkConf, SparkEnv} import org.apache.spark.serializer.{KryoSerializer => SparkKryoSerializer} @@ -36,11 +38,11 @@ object KryoSerializer { new SparkKryoSerializer(sparkConf) } - def serialize[T](o: T): Array[Byte] = { + def serialize[T: ClassTag](o: T): Array[Byte] = { ser.newInstance().serialize(o).array() } - def deserialize[T](bytes: Array[Byte]): T = { + def deserialize[T: ClassTag](bytes: Array[Byte]): T = { ser.newInstance().deserialize[T](ByteBuffer.wrap(bytes)) } } diff --git a/src/main/scala/shark/execution/serialization/ShuffleSerializer.scala b/src/main/scala/shark/execution/serialization/ShuffleSerializer.scala index d6179ecd..2aa94d93 100644 --- a/src/main/scala/shark/execution/serialization/ShuffleSerializer.scala +++ b/src/main/scala/shark/execution/serialization/ShuffleSerializer.scala @@ -20,6 +20,8 @@ package shark.execution.serialization import java.io.{InputStream, OutputStream} import java.nio.ByteBuffer +import scala.reflect.ClassTag + import org.apache.hadoop.io.BytesWritable import org.apache.spark.SparkConf @@ -60,11 +62,11 @@ class ShuffleSerializer(conf: SparkConf) extends Serializer with Serializable { class ShuffleSerializerInstance extends SerializerInstance with Serializable { - override def serialize[T](t: T): ByteBuffer = throw new UnsupportedOperationException + override def serialize[T: ClassTag](t: T): ByteBuffer = throw new UnsupportedOperationException - override def deserialize[T](bytes: ByteBuffer): T = throw new UnsupportedOperationException + override def deserialize[T: ClassTag](bytes: ByteBuffer): T = throw new UnsupportedOperationException - override def deserialize[T](bytes: ByteBuffer, loader: ClassLoader): T = + override def deserialize[T: ClassTag](bytes: ByteBuffer, loader: ClassLoader): T = throw new UnsupportedOperationException override def serializeStream(s: OutputStream): SerializationStream = { @@ -79,7 +81,7 @@ class ShuffleSerializerInstance extends SerializerInstance with Serializable { class ShuffleSerializationStream(stream: OutputStream) extends SerializationStream with Serializable { - override def writeObject[T](t: T): SerializationStream = { + override def writeObject[T: ClassTag](t: T): SerializationStream = { // On the write-side, the ReduceKey should be of type ReduceKeyMapSide. 
val (key, value) = t.asInstanceOf[(ReduceKey, BytesWritable)] writeUnsignedVarInt(key.length) @@ -110,7 +112,7 @@ class ShuffleSerializationStream(stream: OutputStream) extends SerializationStre class ShuffleDeserializationStream(stream: InputStream) extends DeserializationStream with Serializable { - override def readObject[T](): T = { + override def readObject[T: ClassTag](): T = { // Return type is (ReduceKeyReduceSide, Array[Byte]) val keyLen = readUnsignedVarInt() if (keyLen < 0) { From bd24c7e605eebe86e7a281cd3933284bfab8d30d Mon Sep 17 00:00:00 2001 From: Kay Ousterhout Date: Wed, 28 May 2014 04:51:23 +0000 Subject: [PATCH 8/8] Explicitly add javax.servlet to resolve run-time classpath issues. --- project/SharkBuild.scala | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/project/SharkBuild.scala b/project/SharkBuild.scala index 123d4604..e8b259c2 100755 --- a/project/SharkBuild.scala +++ b/project/SharkBuild.scala @@ -203,6 +203,12 @@ object SharkBuild extends Build { // See https://code.google.com/p/guava-libraries/issues/detail?id=1095 "com.google.code.findbugs" % "jsr305" % "1.3.+", + // sbt fails down download the javax.servlet artifacts from jetty 8.1: + // http://mvnrepository.com/artifact/org.eclipse.jetty.orbit/javax.servlet/3.0.0.v201112011016 + // which may be due to the use of the orbit extension. So, we manually include servlet api + // from a separate source. + "org.mortbay.jetty" % "servlet-api" % "3.0.20100224", + // Hive unit test requirements. These are used by Hadoop to run the tests, but not necessary // in usual Shark runs. "commons-io" % "commons-io" % "2.1",