-
Notifications
You must be signed in to change notification settings - Fork 327
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
unnecessary serialize/deserialize for cases where no conversion was performed #329
base: branch-0.9
Are you sure you want to change the base?
Changes from 2 commits
0b2a7a2
7bfc9e6
eea72cd
6efb4d0
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -33,6 +33,7 @@ import org.apache.hadoop.hive.serde2.`lazy`.LazyStruct | |
import org.apache.hadoop.hive.serde2.objectinspector.{StructObjectInspector, ObjectInspectorConverters} | ||
import org.apache.hadoop.io.Writable | ||
import org.apache.hadoop.mapred.{FileInputFormat, InputFormat, JobConf} | ||
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorConverters.IdentityConverter | ||
|
||
import org.apache.spark.broadcast.Broadcast | ||
import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, RDD, UnionRDD} | ||
|
@@ -194,30 +195,36 @@ class HadoopTableReader(@transient _tableDesc: TableDesc, @transient _localHConf | |
partSerDe.getObjectInspector(), tblConvertedOI) | ||
val rowWithPartArr = new Array[Object](2) | ||
// Map each tuple to a row object | ||
iter.map { value => | ||
val deserializedRow = { | ||
|
||
// If partition schema does not match table schema, update the row to match | ||
val convertedRow = partTblObjectInspectorConverter.convert(partSerDe.deserialize(value)) | ||
|
||
// If conversion was performed, convertedRow will be a standard Object, but if | ||
// conversion wasn't necessary, it will still be lazy. We can't have both across | ||
// partitions, so we serialize and deserialize again to make it lazy. | ||
if (tableSerDe.isInstanceOf[OrcSerde]) { | ||
convertedRow | ||
} else { | ||
|
||
if ((tableSerDe.isInstanceOf[OrcSerde]) || (partTblObjectInspectorConverter.isInstanceOf[IdentityConverter])) { | ||
iter.map { value => | ||
val deserializedRow = partTblObjectInspectorConverter.convert(partSerDe.deserialize(value)) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. If it's the IdentityConverter, we don't need to call the convert method. |
||
rowWithPartArr.update(0, deserializedRow) | ||
rowWithPartArr.update(1, partValues) | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. partValues doesn't change within a partition, so we can move it out of the iter.map; the same applies to the later one. |
||
rowWithPartArr.asInstanceOf[Object] | ||
} | ||
} | ||
else { | ||
iter.map { value => | ||
val deserializedRow = { | ||
|
||
// If partition schema does not match table schema, update the row to match | ||
val convertedRow = partTblObjectInspectorConverter.convert(partSerDe.deserialize(value)) | ||
|
||
// If conversion was performed, convertedRow will be a standard Object, but if | ||
// conversion wasn't necessary, it will still be lazy. We can't have both across | ||
// partitions, so we serialize and deserialize again to make it lazy. | ||
convertedRow match { | ||
case _: LazyStruct => convertedRow | ||
case _: HiveColumnarStruct => convertedRow | ||
case _ => tableSerDe.deserialize( | ||
tableSerDe.asInstanceOf[Serializer].serialize( | ||
convertedRow, tblConvertedOI)) | ||
case _ => | ||
tableSerDe.deserialize( tableSerDe.asInstanceOf[Serializer].serialize( convertedRow, tblConvertedOI)) | ||
} | ||
} | ||
rowWithPartArr.update(0, deserializedRow) | ||
rowWithPartArr.update(1, partValues) | ||
rowWithPartArr.asInstanceOf[Object] | ||
} | ||
rowWithPartArr.update(0, deserializedRow) | ||
rowWithPartArr.update(1, partValues) | ||
rowWithPartArr.asInstanceOf[Object] | ||
} | ||
} | ||
}.toSeq | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The result should be correct; however, the logic is confusing.