
Commit

Merge branch 'master' of github.com:amplab/shark into buffer-resizing
harshars committed Aug 24, 2013
2 parents 0a8bf47 + 97b63be commit c911808
Showing 21 changed files with 643 additions and 412 deletions.
71 changes: 35 additions & 36 deletions bin/dev/run-tests-from-scratch
@@ -17,9 +17,8 @@ SHARK_MASTER_MEM_DEFAULT=4g
SPARK_KV_JAVA_OPTS_DEFAULT=("-Dspark.local.dir=/tmp " "-Dspark.kryoserializer.buffer.mb=10 ")
SPARK_GIT_URL_DEFAULT="https://github.com/mesos/spark.git"
HIVE_GIT_URL_DEFAULT="https://github.com/amplab/hive.git -b shark-0.9"
HADOOP_VERSION_DEFAULT="0.20.205.0"
HADOOP_MAJOR_VERSION_DEFAULT=1

SPARK_HADOOP_VERSION_DEFAULT="1.0.4"
SPARK_WITH_YARN_DEFAULT=false

# Setup the Shark project directory.
if [ "x$SHARK_PROJ_DIR" == "x" ] ; then
@@ -76,14 +75,14 @@ if [ "x$HIVE_GIT_URL" == "x" ] ; then
fi

# We will download this version of Hadoop. To do so, we assume it is available at
# http://archive.apache.org/dist/hadoop/core/hadoop-${HADOOP_VERSION}/hadoop-${HADOOP_VERSION}.tar.gz
if [ "x$HADOOP_VERSION" == "x" ] ; then
HADOOP_VERSION=$HADOOP_VERSION_DEFAULT
# http://archive.apache.org/dist/hadoop/core/hadoop-${SPARK_HADOOP_VERSION}/hadoop-${SPARK_HADOOP_VERSION}.tar.gz
# TODO(andyk): This assumption is not true for many versions the user might want to pass in here!
if [ "x$SPARK_HADOOP_VERSION" == "x" ] ; then
SPARK_HADOOP_VERSION=$SPARK_HADOOP_VERSION_DEFAULT
fi

# This should be set according to your choice of SPARK_HADOOP_VERSION above.
if [ "x$HADOOP_MAJOR_VERSION" == "x" ] ; then
HADOOP_MAJOR_VERSION=$HADOOP_MAJOR_VERSION_DEFAULT
if [ "x$SPARK_WITH_YARN" == "x" ] ; then
SPARK_WITH_YARN=$SPARK_WITH_YARN_DEFAULT
fi

usage()
@@ -123,8 +122,8 @@ Optional configuration environment variables:
SPARK_JAVA_OPTS (default: "${SPARK_KV_JAVA_OPTS_DEFAULT[@]}")
SPARK_GIT_URL (default: "$SPARK_GIT_URL_DEFAULT")
HIVE_GIT_URL (default: "$HIVE_GIT_URL_DEFAULT")
HADOOP_VERSION (default: $HADOOP_VERSION_DEFAULT)
HADOOP_MAJOR_VERSION (default: $HADOOP_MAJOR_VERSION_DEFAULT)
SPARK_HADOOP_VERSION (default: $SPARK_HADOOP_VERSION_DEFAULT)
SPARK_WITH_YARN (default: $SPARK_WITH_YARN_DEFAULT)
EOF
exit
}
@@ -179,21 +178,22 @@ export HIVE_HOME="$HIVE_DEV_HOME/build/dist"
#####################################################################

# Make sure they are running a new enough JDK.
JAVA_VERSION=`${JAVA_HOME}/bin/java -version 2>&1 |awk -F\" '/version/ { print $2 }'`
JAVA_MAJOR_VERSION=`echo $JAVA_VERSION | awk -F\_ '{print $1}' | awk -F\. '{print $2}'`
JAVA_MINOR_VERSION=`echo $JAVA_VERSION | awk -F\_ '{print $1}' | awk -F\. '{print $3}'`

set +e
${JAVA_HOME}/bin/java -version 2>&1 | grep -i hotspot
JVM_IS_HOTSPOT=$?
set -e

JAVA_SUB_VERSION=`echo $JAVA_VERSION | awk -F\_ '{print $2}'`
if (($JAVA_MAJOR_VERSION < 6)) ||
((($JAVA_MAJOR_VERSION == 6)) && ((($JAVA_MINOR_VERSION < 23)) || (($JVM_IS_HOTSPOT==0)) )); then
echo "You are running Java version ${JAVA_VERSION}, please run Java 7 or HotSpot JDK 6u23 or newer."
exit -1
fi
# JAVA_VERSION=`${JAVA_HOME}/bin/java -version 2>&1 |awk -F\" '/version/ { print $2 }'`
# JAVA_MAJOR_VERSION=`echo $JAVA_VERSION | awk -F\_ '{print $1}' | awk -F\. '{print $2}'`
# JAVA_MINOR_VERSION=`echo $JAVA_VERSION | awk -F\_ '{print $1}' | awk -F\. '{print $3}'`

# set +e
# ${JAVA_HOME}/bin/java -version 2>&1 | grep -i hotspot
# JVM_IS_HOTSPOT=$?
# set -e

# echo "JAVA_MINOR_VERSION $JAVA_MINOR_VERSION"
# JAVA_SUB_VERSION=`echo $JAVA_VERSION | awk -F\_ '{print $2}'`
# if (($JAVA_MAJOR_VERSION < 6)) ||
# ((($JAVA_MAJOR_VERSION == 6)) && ((($JAVA_MINOR_VERSION < 23)) || (($JVM_IS_HOTSPOT==0)) )); then
# echo "You are running Java version ${JAVA_VERSION}, please run Java 7 or HotSpot JDK 6u23 or newer."
# exit -1
# fi

if [ "x$SCALA_HOME" != "x" ] ; then
# User already specified SCALA_HOME. Make sure the correct version of Scala installed.
@@ -248,28 +248,27 @@ else
# Download and build Spark.
git clone $SPARK_GIT_URL
pushd spark
# Replace Hadoop1 settings in build file, which are the default, with Hadoop2 settings.
sed -i.backup "s/val HADOOP_VERSION = \"1\.0\.4\"/val HADOOP_VERSION = \"$HADOOP_VERSION\"/" project/SparkBuild.scala
sed -i.backup "s/val HADOOP_MAJOR_VERSION = \"1\"/val HADOOP_MAJOR_VERSION = \"$HADOOP_MAJOR_VERSION\"/" project/SparkBuild.scala
export SPARK_HADOOP_VERSION=$SPARK_HADOOP_VERSION
export SPARK_WITH_YARN=$SPARK_WITH_YARN
# Build spark and push the jars to local Ivy/Maven caches.
sbt/sbt clean publish-local
popd
fi
export SPARK_HOME="$WORKSPACE/spark"

if $SKIP_HADOOP ; then
if [ ! -e "hadoop-${HADOOP_VERSION}" ] ; then
echo "hadoop-${HADOOP_VERSION} must exist when skipping Hadoop download and build stage."
if [ ! -e "hadoop-${SPARK_HADOOP_VERSION}" ] ; then
echo "hadoop-${SPARK_HADOOP_VERSION} must exist when skipping Hadoop download and build stage."
exit -1
fi
else
rm -rf hadoop-${HADOOP_VERSION}.tar.gz
rm -rf hadoop-${HADOOP_VERSION}
rm -rf hadoop-${SPARK_HADOOP_VERSION}.tar.gz
rm -rf hadoop-${SPARK_HADOOP_VERSION}
# Download and unpack Hadoop and set env variable.
wget http://archive.apache.org/dist/hadoop/core/hadoop-${HADOOP_VERSION}/hadoop-${HADOOP_VERSION}.tar.gz
tar xvfz hadoop-${HADOOP_VERSION}.tar.gz
wget http://archive.apache.org/dist/hadoop/core/hadoop-${SPARK_HADOOP_VERSION}/hadoop-${SPARK_HADOOP_VERSION}.tar.gz
tar xvfz hadoop-${SPARK_HADOOP_VERSION}.tar.gz
fi
export HADOOP_HOME="$WORKSPACE/hadoop-${HADOOP_VERSION}"
export HADOOP_HOME="$WORKSPACE/hadoop-${SPARK_HADOOP_VERSION}"

#####################################################################
# Download and build Hive.
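In the Spark build step above, the script now exports SPARK_HADOOP_VERSION and SPARK_WITH_YARN before running sbt/sbt clean publish-local, instead of sed-patching constants in project/SparkBuild.scala; the exported variables are inherited by the JVM that runs the build. A minimal sketch of how a build definition can pick such variables up from the environment (illustrative only; the exact logic in Spark's SparkBuild.scala may differ):

// Minimal sketch: resolve build settings from the environment, with defaults.
// The variable names match the exports above; how Spark's real SparkBuild.scala
// reads them is assumed here, not copied from it.
object BuildEnvSketch {
  val hadoopVersion: String =
    sys.env.getOrElse("SPARK_HADOOP_VERSION", "1.0.4")

  val withYarn: Boolean =
    sys.env.get("SPARK_WITH_YARN").exists(_.equalsIgnoreCase("true"))

  def main(args: Array[String]): Unit =
    println(s"hadoop-client version: $hadoopVersion, YARN build: $withYarn")
}
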
10 changes: 9 additions & 1 deletion project/SharkBuild.scala
@@ -93,11 +93,19 @@ object SharkBuild extends Build {
"org.spark-project" %% "spark-core" % SPARK_VERSION,
"org.spark-project" %% "spark-repl" % SPARK_VERSION,
"com.google.guava" % "guava" % "14.0.1",
"org.apache.hadoop" % "hadoop-core" % HADOOP_VERSION,
"org.apache.hadoop" % "hadoop-client" % HADOOP_VERSION,
// See https://code.google.com/p/guava-libraries/issues/detail?id=1095
"com.google.code.findbugs" % "jsr305" % "1.3.+",

// Hive unit test requirements. These are used by Hadoop to run the tests, but not necessary
// in usual Shark runs.
"commons-io" % "commons-io" % "2.1",
"commons-httpclient" % "commons-httpclient" % "3.1" % "test",

// Test infrastructure
"org.scalatest" %% "scalatest" % "1.9.1" % "test",
"junit" % "junit" % "4.10" % "test",
"net.java.dev.jets3t" % "jets3t" % "0.9.0",
"com.novocode" % "junit-interface" % "0.8" % "test") ++
(if (TACHYON_ENABLED) Some("org.tachyonproject" % "tachyon" % "0.3.0-SNAPSHOT" excludeAll(excludeKyro, excludeHadoop) ) else None).toSeq
)
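The last dependency line above appends an optional Tachyon dependency with the (if (flag) Some(dep) else None).toSeq idiom. A self-contained sketch of that idiom, using placeholder dependency strings rather than Shark's real sbt ModuleIDs:

// When the flag is false, Option.toSeq is empty and the list is unchanged;
// when true, exactly one extra element is appended.
object OptionalDepSketch {
  val TACHYON_ENABLED = false

  val baseDeps: Seq[String] = Seq(
    "com.google.guava:guava:14.0.1",
    "org.scalatest:scalatest:1.9.1")

  val allDeps: Seq[String] =
    baseDeps ++ (if (TACHYON_ENABLED) Some("org.tachyonproject:tachyon:0.3.0-SNAPSHOT") else None).toSeq

  def main(args: Array[String]): Unit = allDeps.foreach(println)
}
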
2 changes: 1 addition & 1 deletion src/main/scala/shark/execution/CoGroupedRDD.scala
@@ -120,7 +120,7 @@ class CoGroupedRDD[K](@transient var rdds: Seq[RDD[(_, _)]], part: Partitioner)
// Read map outputs of shuffle
def mergePair(pair: (K, Any)) { getSeq(pair._1)(depNum) += pair._2 }
val fetcher = SparkEnv.get.shuffleFetcher
fetcher.fetch[K, Seq[Any]](shuffleId, split.index, context.taskMetrics, serializer)
fetcher.fetch[(K, Seq[Any])](shuffleId, split.index, context.taskMetrics, serializer)
.foreach(mergePair)
}
}
4 changes: 2 additions & 2 deletions src/main/scala/shark/execution/FileSinkOperator.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.hive.ql.exec.{FileSinkOperator => HiveFileSinkOperator}
import org.apache.hadoop.hive.ql.exec.JobCloseFeedBack
import org.apache.hadoop.mapred.TaskID
import org.apache.hadoop.mapred.TaskAttemptID
import org.apache.hadoop.mapred.HadoopWriter
import org.apache.hadoop.mapred.SparkHadoopWriter

import shark.execution.serialization.OperatorSerializationWrapper

@@ -52,7 +52,7 @@ class FileSinkOperator extends TerminalOperator with Serializable {
def setConfParams(conf: HiveConf, context: TaskContext) {
val jobID = context.stageId
val splitID = context.splitId
val jID = HadoopWriter.createJobID(now, jobID)
val jID = SparkHadoopWriter.createJobID(now, jobID)
val taID = new TaskAttemptID(new TaskID(jID, true, splitID), 0)
conf.set("mapred.job.id", jID.toString)
conf.set("mapred.tip.id", taID.getTaskID.toString)
src/main/scala/shark/execution/GroupByPostShuffleOperator.scala
@@ -198,7 +198,7 @@ class GroupByPostShuffleOperator extends GroupByPreShuffleOperator with HiveTopO
if (numReduceTasks < 1 || conf.getKeys.size == 0) numReduceTasks = 1
val partitioner = new ReduceKeyPartitioner(numReduceTasks)

val repartitionedRDD = new ShuffledRDD[Any, Any](inputRdd, partitioner)
val repartitionedRDD = new ShuffledRDD[Any, Any, (Any, Any)](inputRdd, partitioner)
.setSerializer(SharkEnv.shuffleSerializerName)

if (distinctKeyAggrs.size > 0) {
146 changes: 56 additions & 90 deletions src/main/scala/shark/execution/MapSplitPruning.scala
@@ -23,10 +23,12 @@ import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPOr
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPAnd
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqual
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBaseCompare
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFBetween
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPGreaterThan
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPLessThan
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrGreaterThan
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFOPEqualOrLessThan
import org.apache.hadoop.hive.ql.udf.generic.GenericUDFIn
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector
import org.apache.hadoop.io.Text

@@ -54,7 +56,14 @@ object MapSplitPruning {
case e: ExprNodeGenericFuncEvaluator => {
e.genericUDF match {
case _: GenericUDFOPAnd => test(s, e.children(0)) && test(s, e.children(1))
case _: GenericUDFOPOr => test(s, e.children(0)) || test(s, e.children(1))
case _: GenericUDFOPOr => test(s, e.children(0)) || test(s, e.children(1))
case _: GenericUDFBetween =>
testBetweenPredicate(s, e.children(0).asInstanceOf[ExprNodeConstantEvaluator],
e.children(1).asInstanceOf[ExprNodeColumnEvaluator],
e.children(2).asInstanceOf[ExprNodeConstantEvaluator],
e.children(3).asInstanceOf[ExprNodeConstantEvaluator])
case _: GenericUDFIn =>
testInPredicate(s, e.children(0).asInstanceOf[ExprNodeColumnEvaluator], e.children.drop(1))
case udf: GenericUDFBaseCompare =>
testComparisonPredicate(s, udf, e.children(0), e.children(1))
case _ => true
@@ -65,6 +74,47 @@
}
}

def testInPredicate(
s: TablePartitionStats,
columnEval: ExprNodeColumnEvaluator,
expEvals: Array[ExprNodeEvaluator]): Boolean = {

val field = columnEval.field.asInstanceOf[IDStructField]
val columnStats = s.stats(field.fieldID)

if (columnStats != null) {
expEvals.exists {
e =>
val constEval = e.asInstanceOf[ExprNodeConstantEvaluator]
columnStats := constEval.expr.getValue()
}
} else {
// If there is no stats on the column, don't prune.
true
}
}

def testBetweenPredicate(
s: TablePartitionStats,
invertEval: ExprNodeConstantEvaluator,
columnEval: ExprNodeColumnEvaluator,
leftEval: ExprNodeConstantEvaluator,
rightEval: ExprNodeConstantEvaluator): Boolean = {

val field = columnEval.field.asInstanceOf[IDStructField]
val columnStats = s.stats(field.fieldID)
val leftValue: Object = leftEval.expr.getValue
val rightValue: Object = rightEval.expr.getValue
val invertValue: Boolean = invertEval.expr.getValue.asInstanceOf[Boolean]

if (columnStats != null) {
val exists = (columnStats :>< (leftValue , rightValue))
if (invertValue) !exists else exists
} else {
// If there is no stats on the column, don't prune.
true
}
}
/**
* Test whether we should keep the split as a candidate given the comparison
* predicate. Return true if the split should be kept as a candidate, false if
@@ -100,14 +150,12 @@
val columnStats = s.stats(field.fieldID)

if (columnStats != null) {
val min = columnStats.min
val max = columnStats.max
udf match {
case _: GenericUDFOPEqual => testEqual(min, max, value)
case _: GenericUDFOPEqualOrGreaterThan => testEqualOrGreaterThan(min, max, value)
case _: GenericUDFOPEqualOrLessThan => testEqualOrLessThan(min, max, value)
case _: GenericUDFOPGreaterThan => testGreaterThan(min, max, value)
case _: GenericUDFOPLessThan => testLessThan(min, max, value)
case _: GenericUDFOPEqual => columnStats := value
case _: GenericUDFOPEqualOrGreaterThan => columnStats :>= value
case _: GenericUDFOPEqualOrLessThan => columnStats :<= value
case _: GenericUDFOPGreaterThan => columnStats :> value
case _: GenericUDFOPLessThan => columnStats :< value
case _ => true
}
} else {
@@ -119,86 +167,4 @@
true
}
}

def testEqual(min: Any, max: Any, value: Any): Boolean = {
// Assume min and max have the same type.
val c = tryCompare(min, value)
tryCompare(min, value) match {
case Some(c) => c <= 0 && tryCompare(max, value).get >= 0
case None => true
}
}

def testEqualOrGreaterThan(min: Any, max: Any, value: Any): Boolean = {
// Assume min and max have the same type.
tryCompare(max, value) match {
case Some(c) => c >= 0
case None => true
}
}

def testEqualOrLessThan(min: Any, max: Any, value: Any): Boolean = {
// Assume min and max have the same type.
tryCompare(min, value) match {
case Some(c) => c <= 0
case None => true
}
}

def testGreaterThan(min: Any, max: Any, value: Any): Boolean = {
// Assume min and max have the same type.
tryCompare(max, value) match {
case Some(c) => c > 0
case None => true
}
}

def testLessThan(min: Any, max: Any, value: Any): Boolean = {
// Assume min and max have the same type.
tryCompare(min, value) match {
case Some(c) => c < 0
case None => true
}
}

def testNotEqual(min: Any, max: Any, value: Any): Boolean = {
// Assume min and max have the same type.
tryCompare(min, value) match {
case Some(c) => c != 0 || (tryCompare(max, value).get != 0)
case None => true
}
}

/**
* Try to compare value a and b.
* If a is greater than b, return 1.
* If a equals b, return 0.
* If a is less than b, return -1.
* If a and b are not comparable, return None.
*/
def tryCompare(a: Any, b: Any): Option[Int] = a match {
case a: Number => b match {
case b: Number => Some((a.longValue - b.longValue).toInt)
case _ => None
}
case a: Boolean => b match {
case b: Boolean => Some(if (a && !b) 1 else if (!a && b) -1 else 0)
case _ => None
}
case a: Text => b match {
case b: Text => Some(a.compareTo(b))
case b: String => Some(a.compareTo(new Text(b)))
case _=> None
}
case a: String => b match {
case b: Text => Some((new Text(a)).compareTo(b))
case b: String => Some(a.compareTo(b))
case _ => None
}
case a: Timestamp => b match {
case b: Timestamp => Some(a.compareTo(b))
case _ => None
}
case _ => None
}
}
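
The rewritten tests above delegate to comparison operators defined on the per-partition column statistics (:=, :>, :<, :>=, :<=, :><), each of which answers whether the predicate could match anything inside the split's recorded [min, max] range. A simplified, self-contained sketch of that idea; RangeStats below is an illustrative stand-in, not Shark's real ColumnStats API:

// Rough sketch of min/max split pruning over Int columns. The operator names mirror
// the ones used in MapSplitPruning, but the class and its semantics are assumptions
// made for illustration.
case class RangeStats(min: Int, max: Int) {
  def :=(v: Int): Boolean  = min <= v && v <= max    // some row in the split could equal v
  def :>(v: Int): Boolean  = max > v                 // some row could be greater than v
  def :<(v: Int): Boolean  = min < v                 // some row could be less than v
  def :>=(v: Int): Boolean = max >= v
  def :<=(v: Int): Boolean = min <= v
  def :><(lo: Int, hi: Int): Boolean = lo <= max && hi >= min   // BETWEEN: ranges overlap
}

object PruningSketch {
  def main(args: Array[String]): Unit = {
    val stats = RangeStats(min = 5, max = 10)

    // IN-list pruning: keep the split if any constant could fall inside [min, max].
    val inList = Seq(3, 7, 42)
    println(inList.exists(v => stats := v))    // true: 7 lies in [5, 10]

    // BETWEEN pruning: keep the split only if [lo, hi] overlaps [min, max];
    // a NOT BETWEEN predicate inverts the result, as testBetweenPredicate does.
    println(stats :>< (12, 20))                // false: no overlap with [5, 10]
  }
}

When no statistics exist for a column, both new tests return true, so the split is conservatively kept rather than pruned.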
5 changes: 3 additions & 2 deletions src/main/scala/shark/execution/RDDUtils.scala
@@ -55,7 +55,7 @@ object RDDUtils {
def repartition[K: ClassManifest, V: ClassManifest](rdd: RDD[(K, V)], part: Partitioner)
: RDD[(K, V)] =
{
new ShuffledRDD[K, V](rdd, part).setSerializer(SharkEnv.shuffleSerializerName)
new ShuffledRDD[K, V, (K, V)](rdd, part).setSerializer(SharkEnv.shuffleSerializerName)
}

/**
@@ -66,7 +66,8 @@
: RDD[(K, V)] =
{
val part = new RangePartitioner(rdd.partitions.length, rdd)
val shuffled = new ShuffledRDD[K, V](rdd, part).setSerializer(SharkEnv.shuffleSerializerName)
val shuffled = new ShuffledRDD[K, V, (K, V)](rdd, part)
.setSerializer(SharkEnv.shuffleSerializerName)
shuffled.mapPartitions(iter => {
val buf = iter.toArray
buf.sortWith((x, y) => x._1.compareTo(y._1) < 0).iterator
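The second hunk above range-partitions the RDD and then sorts each partition locally inside mapPartitions; the result is totally ordered because a range partitioner already routes smaller keys to lower-numbered partitions. A plain-collections sketch of why the partition-local sort is enough (no Spark involved; the two fixed buckets stand in for RangePartitioner):

// "Range partition" a key-value list into ordered buckets, sort within each bucket,
// and concatenate: the keys come out globally sorted.
object SortWithinPartitionsSketch {
  def main(args: Array[String]): Unit = {
    val records = Seq(42 -> "a", 7 -> "b", 19 -> "c", 3 -> "d", 88 -> "e")

    // Keys below 20 go to bucket 0, the rest to bucket 1 (the "range partitioning").
    val partitions: Map[Int, Seq[(Int, String)]] =
      records.groupBy { case (k, _) => if (k < 20) 0 else 1 }

    // Partition-local sort, mirroring the mapPartitions + sortWith step above.
    val sortedWithin: Seq[Seq[(Int, String)]] =
      partitions.toSeq.sortBy(_._1).map { case (_, part) => part.sortWith((x, y) => x._1 < y._1) }

    // Concatenating the buckets in order yields every key in ascending order.
    println(sortedWithin.flatten.map(_._1).toList)   // List(3, 7, 19, 42, 88)
  }
}
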
(diffs for the remaining changed files are not shown)
