[SPARK-2443][SQL] Fix slow read from partitioned tables #1390

Closed
65 changes: 54 additions & 11 deletions sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -21,9 +21,14 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{Path, PathFilter}
import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants._
import org.apache.hadoop.hive.ql.exec.Utilities
import org.apache.hadoop.hive.ql.io.orc.OrcSerde
import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Table => HiveTable}
import org.apache.hadoop.hive.ql.plan.TableDesc
import org.apache.hadoop.hive.serde2.Deserializer
import org.apache.hadoop.hive.serde2.{Serializer, Deserializer}
import org.apache.hadoop.hive.serde2.columnar.{ColumnarStruct => HiveColumnarStruct}
import org.apache.hadoop.hive.serde2.`lazy`.LazyStruct
import org.apache.hadoop.hive.serde2.objectinspector.{StructObjectInspector, ObjectInspectorConverters}
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.IdentityConverter
import org.apache.hadoop.io.Writable
import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf}

@@ -40,7 +45,6 @@ private[hive] sealed trait TableReader {
def makeRDDForPartitionedTable(partitions: Seq[HivePartition]): RDD[_]
}


/**
* Helper class for scanning tables stored in Hadoop - e.g., to read Hive tables that reside in the
* data warehouse directory.
@@ -157,21 +161,60 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient sc: HiveCon

// Create local references so that the outer object isn't serialized.
val tableDesc = _tableDesc
val tableSerDeClass = tableDesc.getDeserializerClass

val broadcastedHiveConf = _broadcastedHiveConf
val localDeserializer = partDeserializer

val hivePartitionRDD = createHadoopRdd(tableDesc, inputPathStr, ifc)
hivePartitionRDD.mapPartitions { iter =>
hivePartitionRDD.mapPartitions { case iter =>
Contributor: is the pattern matching here necessary?

Contributor (author): I initially thought that in a function context, { case x => ... } would be optimized to { x => ... }. I did a scalac -print on a simple program to confirm that this is not the case.

Contributor: oh really? how does the generated bytecode differ?

Contributor (author): https://gist.github.com/concretevitamin/272fe413dcc06b8cbe9c

It seems the with-case version does have more instructions to execute.
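For readers who want to reproduce the comparison, a minimal sketch (the file and object names are illustrative, not taken from the gist above):

```scala
object LambdaVsCase {
  val xs = List(1, 2, 3)

  // Plain function literal: desugars to a Function1 whose apply
  // evaluates the body directly.
  val plain = xs.map { x => x * 2 }

  // Pattern-matching function literal: desugars to a Function1 whose
  // apply routes the argument through a match, so the emitted code
  // is not identical to the plain version.
  val withCase = xs.map { case x => x * 2 }
}
```

Compiling this with `scalac -print LambdaVsCase.scala` prints the desugared trees for both closures, which is the kind of comparison captured in the gist.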

val hconf = broadcastedHiveConf.value.value
val rowWithPartArr = new Array[Object](2)
// Map each tuple to a row object
iter.map { value =>
val deserializer = localDeserializer.newInstance()
deserializer.initialize(hconf, partProps)
val deserializedRow = deserializer.deserialize(value)
rowWithPartArr.update(0, deserializedRow)
rowWithPartArr.update(1, partValues)
rowWithPartArr.asInstanceOf[Object]

val partSerDe = localDeserializer.newInstance()
val tableSerDe = tableSerDeClass.newInstance()
partSerDe.initialize(hconf, partProps)
tableSerDe.initialize(hconf, tableDesc.getProperties)

val tblConvertedOI = ObjectInspectorConverters.getConvertedOI(
partSerDe.getObjectInspector, tableSerDe.getObjectInspector, true)
.asInstanceOf[StructObjectInspector]
val partTblObjectInspectorConverter = ObjectInspectorConverters.getConverter(
partSerDe.getObjectInspector, tblConvertedOI)

// This is done per partition, and unnecessary to put it in the iterations (in iter.map).
rowWithPartArr.update(1, partValues)

// Map each tuple to a row object.
if (partTblObjectInspectorConverter.isInstanceOf[IdentityConverter]) {
iter.map { case value =>
rowWithPartArr.update(0, partSerDe.deserialize(value))
rowWithPartArr.asInstanceOf[Object]
}
} else {
iter.map { case value =>
val deserializedRow = {
// If partition schema does not match table schema, update the row to match.
val convertedRow =
partTblObjectInspectorConverter.convert(partSerDe.deserialize(value))

// If conversion was performed, convertedRow will be a standard Object, but if
// conversion wasn't necessary, it will still be lazy. We can't have both across
// partitions, so we serialize and deserialize again to make it lazy.
if (tableSerDe.isInstanceOf[OrcSerde]) {
convertedRow
Contributor: Why do we need to take care of ORC separately?

} else {
convertedRow match {
Contributor: I think we need a comment explaining why we do this pattern matching. Also, why do we handle LazyStruct and ColumnarStruct specially? There are similar classes, e.g. LazyBinaryStruct and LazyBinaryColumnarStruct.

Contributor: Yeah, the code is from Shark, and it is a little bit tricky. I think the logic here is:

  • We assume the table object inspector is always a lazy object inspector, and that the deserializer always produces lazy objects.
  • We assume the ObjectInspectorConverter always produces NON-LAZY objects if the object inspectors ARE NOT compatible.
  • If convertedRow is a lazy object, the partition object inspector is compatible with the table object inspector (convertedRow can be retrieved directly); otherwise, the non-lazy convertedRow is not acceptable to the table object inspector (which is a lazy object inspector), hence we need to convert it by serializing and deserializing again.

I don't think we need to maintain this logic here, as we can provide a better solution for partition-based table scanning. All we need to do is convert the raw object into a MutableRow directly, as we did in HiveTableScan in Spark.
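For reference, a condensed, standalone sketch (not part of the patch) of the decision this comment describes, assuming exactly the behaviour stated above: the table object inspector is lazy, and the converter yields non-lazy objects when the inspectors are not compatible. The helper name toTableRow is illustrative:

```scala
import org.apache.hadoop.hive.ql.io.orc.OrcSerde
import org.apache.hadoop.hive.serde2.{Deserializer, Serializer}
import org.apache.hadoop.hive.serde2.`lazy`.LazyStruct
import org.apache.hadoop.hive.serde2.columnar.{ColumnarStruct => HiveColumnarStruct}
import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorConverters, StructObjectInspector}
import org.apache.hadoop.io.Writable

// Illustrative helper: turn a raw partition record into an object that the
// table's (lazy) object inspector can read.
def toTableRow(
    rawValue: Writable,
    partSerDe: Deserializer,
    tableSerDe: Deserializer,
    converter: ObjectInspectorConverters.Converter,
    tblConvertedOI: StructObjectInspector): Object = {
  // Reshape the partition row into the table schema.
  val convertedRow = converter.convert(partSerDe.deserialize(rawValue))
  if (tableSerDe.isInstanceOf[OrcSerde]) {
    // The patch passes ORC rows through unchanged (questioned by reviewers above).
    convertedRow
  } else {
    convertedRow match {
      // Still lazy: the inspectors were compatible, so the table's lazy
      // object inspector can read the row directly.
      case _: LazyStruct | _: HiveColumnarStruct => convertedRow
      // Non-lazy: the lazy table object inspector cannot read a standard
      // object, so round-trip it through the table SerDe to get a lazy
      // object back.
      case other =>
        tableSerDe.deserialize(
          tableSerDe.asInstanceOf[Serializer].serialize(other, tblConvertedOI))
    }
  }
}
```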

case _: LazyStruct => convertedRow
case _: HiveColumnarStruct => convertedRow
case _ => tableSerDe.deserialize(
tableSerDe.asInstanceOf[Serializer].serialize(convertedRow, tblConvertedOI))
Contributor: As mentioned by @chenghao-intel, can we avoid it?

}
}
}
rowWithPartArr.update(0, deserializedRow)
rowWithPartArr.asInstanceOf[Object]
}
}
}
}.toSeq
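Stepping back, the core performance change visible in this diff is that the expensive setup (SerDe instantiation and initialization, plus building the object-inspector converter) now happens once per partition inside mapPartitions instead of once per row inside iter.map. A minimal, generic sketch of that pattern, with illustrative names rather than Spark's actual classes:

```scala
// `Decoder` stands in for an expensive-to-initialize Hive SerDe.
trait Decoder {
  def initialize(props: java.util.Properties): Unit // expensive
  def decode(raw: Array[Byte]): AnyRef
}

// Before: setup cost is paid for every record.
def perRow(records: Iterator[Array[Byte]],
           newDecoder: () => Decoder,
           props: java.util.Properties): Iterator[AnyRef] =
  records.map { raw =>
    val d = newDecoder()
    d.initialize(props) // re-initialized on each row
    d.decode(raw)
  }

// After: setup cost is paid once per partition; the per-row closure
// only decodes.
def perPartition(records: Iterator[Array[Byte]],
                 newDecoder: () => Decoder,
                 props: java.util.Properties): Iterator[AnyRef] = {
  val d = newDecoder()
  d.initialize(props) // initialized once for the whole iterator
  records.map(d.decode)
}
```

In the patch, the perPartition-style setup corresponds to what the body of hivePartitionRDD.mapPartitions does before entering iter.map.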