diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
index 8cfde46186ca4..bb2240b335d4f 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/TableReader.scala
@@ -21,9 +21,14 @@ import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.{Path, PathFilter}
 import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants._
 import org.apache.hadoop.hive.ql.exec.Utilities
+import org.apache.hadoop.hive.ql.io.orc.OrcSerde
 import org.apache.hadoop.hive.ql.metadata.{Partition => HivePartition, Table => HiveTable}
 import org.apache.hadoop.hive.ql.plan.TableDesc
-import org.apache.hadoop.hive.serde2.Deserializer
+import org.apache.hadoop.hive.serde2.{Serializer, Deserializer}
+import org.apache.hadoop.hive.serde2.columnar.{ColumnarStruct => HiveColumnarStruct}
+import org.apache.hadoop.hive.serde2.`lazy`.LazyStruct
+import org.apache.hadoop.hive.serde2.objectinspector.{StructObjectInspector, ObjectInspectorConverters}
+import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.IdentityConverter
 import org.apache.hadoop.io.Writable
 import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf}
 
@@ -40,7 +45,6 @@ private[hive] sealed trait TableReader {
   def makeRDDForPartitionedTable(partitions: Seq[HivePartition]): RDD[_]
 }
 
-
 /**
  * Helper class for scanning tables stored in Hadoop - e.g., to read Hive tables that reside in the
  * data warehouse directory.
@@ -157,21 +161,60 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient sc: HiveCon
 
       // Create local references so that the outer object isn't serialized.
       val tableDesc = _tableDesc
+      val tableSerDeClass = tableDesc.getDeserializerClass
+
       val broadcastedHiveConf = _broadcastedHiveConf
       val localDeserializer = partDeserializer
 
       val hivePartitionRDD = createHadoopRdd(tableDesc, inputPathStr, ifc)
-      hivePartitionRDD.mapPartitions { iter =>
+      hivePartitionRDD.mapPartitions { case iter =>
         val hconf = broadcastedHiveConf.value.value
         val rowWithPartArr = new Array[Object](2)
-        // Map each tuple to a row object
-        iter.map { value =>
-          val deserializer = localDeserializer.newInstance()
-          deserializer.initialize(hconf, partProps)
-          val deserializedRow = deserializer.deserialize(value)
-          rowWithPartArr.update(0, deserializedRow)
-          rowWithPartArr.update(1, partValues)
-          rowWithPartArr.asInstanceOf[Object]
+
+        val partSerDe = localDeserializer.newInstance()
+        val tableSerDe = tableSerDeClass.newInstance()
+        partSerDe.initialize(hconf, partProps)
+        tableSerDe.initialize(hconf, tableDesc.getProperties)
+
+        val tblConvertedOI = ObjectInspectorConverters.getConvertedOI(
+          partSerDe.getObjectInspector, tableSerDe.getObjectInspector, true)
+          .asInstanceOf[StructObjectInspector]
+        val partTblObjectInspectorConverter = ObjectInspectorConverters.getConverter(
+          partSerDe.getObjectInspector, tblConvertedOI)
+
+        // This is done once per partition, so there is no need to repeat it inside iter.map.
+        rowWithPartArr.update(1, partValues)
+
+        // Map each tuple to a row object.
+        if (partTblObjectInspectorConverter.isInstanceOf[IdentityConverter]) {
+          iter.map { case value =>
+            rowWithPartArr.update(0, partSerDe.deserialize(value))
+            rowWithPartArr.asInstanceOf[Object]
+          }
+        } else {
+          iter.map { case value =>
+            val deserializedRow = {
+              // If the partition schema does not match the table schema, update the row to match.
+              val convertedRow =
+                partTblObjectInspectorConverter.convert(partSerDe.deserialize(value))
+
+              // If conversion was performed, convertedRow will be a standard Object, but if
+              // conversion wasn't necessary, it will still be lazy. We can't have both across
+              // partitions, so we serialize and deserialize again to make it lazy.
+              if (tableSerDe.isInstanceOf[OrcSerde]) {
+                convertedRow
+              } else {
+                convertedRow match {
+                  case _: LazyStruct => convertedRow
+                  case _: HiveColumnarStruct => convertedRow
+                  case _ => tableSerDe.deserialize(
+                    tableSerDe.asInstanceOf[Serializer].serialize(convertedRow, tblConvertedOI))
+                }
+              }
+            }
+            rowWithPartArr.update(0, deserializedRow)
+            rowWithPartArr.asInstanceOf[Object]
+          }
         }
       }
     }.toSeq
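For context (not part of the patch): the change hinges on Hive's `ObjectInspectorConverters`. `getConvertedOI` builds a struct inspector shaped like the table schema, and `getConverter` returns an `IdentityConverter` when the partition and table inspectors already agree, so the per-row `convert` call is only paid when the schemas actually diverge. Below is a minimal, self-contained sketch of that pattern against the serde2 API the patch uses; the `ConverterSketch` object, the column name `c`, the int-to-bigint mismatch, and the use of standard in-memory object inspectors instead of real SerDes are illustrative assumptions, not taken from the patch.

```scala
import java.util.Arrays

import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspector, ObjectInspectorConverters, ObjectInspectorFactory}
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.IdentityConverter
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory

object ConverterSketch {
  def main(args: Array[String]): Unit = {
    // Hypothetical mismatch: the partition was written when column `c` was an int,
    // but the table schema has since been altered so that `c` is a bigint.
    val partOI = ObjectInspectorFactory.getStandardStructObjectInspector(
      Arrays.asList("c"),
      Arrays.asList[ObjectInspector](PrimitiveObjectInspectorFactory.javaIntObjectInspector))
    val tableOI = ObjectInspectorFactory.getStandardStructObjectInspector(
      Arrays.asList("c"),
      Arrays.asList[ObjectInspector](PrimitiveObjectInspectorFactory.javaLongObjectInspector))

    // The same calls the patch makes with the partition and table SerDe inspectors.
    val convertedOI = ObjectInspectorConverters.getConvertedOI(partOI, tableOI, true)
    val converter = ObjectInspectorConverters.getConverter(partOI, convertedOI)

    // Because the schemas differ this is not an IdentityConverter, so every row has to
    // go through convert() -- the branch the patch adds for mismatched partitions.
    println(converter.isInstanceOf[IdentityConverter])      // expected: false
    println(converter.convert(Arrays.asList(Int.box(42))))  // expected: [42], with the field now a java.lang.Long
  }
}
```

If the two inspectors were identical, `getConverter` would hand back an `IdentityConverter`, which is exactly the fast path the patched `mapPartitions` body checks for before falling back to per-row conversion.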