Fix the geoparquet footer reader.
Imbruced committed Jan 8, 2025
1 parent 7439505 commit 0006471
Showing 3 changed files with 20 additions and 47 deletions.
File 1 of 3

@@ -19,7 +19,7 @@
 package org.apache.spark.sql.execution.datasources.v2.geoparquet.metadata
 
 import org.apache.hadoop.conf.Configuration
-import org.apache.hadoop.fs.Path
+import org.apache.parquet.ParquetReadOptions
 import org.apache.parquet.hadoop.ParquetFileReader
 import org.apache.parquet.hadoop.util.HadoopInputFile
 import org.apache.spark.broadcast.Broadcast
@@ -66,23 +66,14 @@ object GeoParquetMetadataPartitionReaderFactory {
       configuration: Configuration,
       partitionedFile: PartitionedFile,
       readDataSchema: StructType): Iterator[InternalRow] = {
-    val reader = ParquetFileReader
-      .open(HadoopInputFile.fromPath(partitionedFile.toPath, configuration))
+    val inputFile = HadoopInputFile.fromPath(partitionedFile.toPath, configuration)
+    val inputStream = inputFile.newStream()
 
-    try {
-      readFile(configuration, partitionedFile, readDataSchema, reader)
-    } finally {
-      reader.close()
-    }
-  }
+    val footer = ParquetFileReader
+      .readFooter(inputFile, ParquetReadOptions.builder().build(), inputStream)
 
-  private def readFile(
-      configuration: Configuration,
-      partitionedFile: PartitionedFile,
-      readDataSchema: StructType,
-      reader: ParquetFileReader): Iterator[InternalRow] = {
-    val filePath = partitionedFile.filePath
-    val metadata = reader.getFooter.getFileMetaData.getKeyValueMetaData
+    val filePath = partitionedFile.toPath.toString
+    val metadata = footer.getFileMetaData.getKeyValueMetaData
     val row = GeoParquetMetaData.parseKeyValueMetaData(metadata) match {
       case Some(geo) =>
         val geoColumnsMap = geo.columns.map { case (columnName, columnMetadata) =>
File 2 of 3

@@ -19,6 +19,7 @@
 package org.apache.spark.sql.execution.datasources.v2.geoparquet.metadata
 
 import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.ParquetReadOptions
 import org.apache.parquet.hadoop.ParquetFileReader
 import org.apache.parquet.hadoop.util.HadoopInputFile
 import org.apache.spark.broadcast.Broadcast
@@ -62,28 +63,18 @@ case class GeoParquetMetadataPartitionReaderFactory(
 }
 
 object GeoParquetMetadataPartitionReaderFactory {
-
   private def readFile(
       configuration: Configuration,
       partitionedFile: PartitionedFile,
       readDataSchema: StructType): Iterator[InternalRow] = {
-    val reader = ParquetFileReader
-      .open(HadoopInputFile.fromPath(partitionedFile.toPath, configuration))
+    val inputFile = HadoopInputFile.fromPath(partitionedFile.toPath, configuration)
+    val inputStream = inputFile.newStream()
 
-    try {
-      readFile(configuration, partitionedFile, readDataSchema, reader)
-    } finally {
-      reader.close()
-    }
-  }
+    val footer = ParquetFileReader
+      .readFooter(inputFile, ParquetReadOptions.builder().build(), inputStream)
 
-  private def readFile(
-      configuration: Configuration,
-      partitionedFile: PartitionedFile,
-      readDataSchema: StructType,
-      reader: ParquetFileReader): Iterator[InternalRow] = {
     val filePath = partitionedFile.toPath.toString
-    val metadata = reader.getFooter.getFileMetaData.getKeyValueMetaData
+    val metadata = footer.getFileMetaData.getKeyValueMetaData
     val row = GeoParquetMetaData.parseKeyValueMetaData(metadata) match {
       case Some(geo) =>
         val geoColumnsMap = geo.columns.map { case (columnName, columnMetadata) =>
File 3 of 3

@@ -19,6 +19,7 @@
 package org.apache.spark.sql.execution.datasources.v2.geoparquet.metadata
 
 import org.apache.hadoop.conf.Configuration
+import org.apache.parquet.ParquetReadOptions
 import org.apache.parquet.hadoop.ParquetFileReader
 import org.apache.parquet.hadoop.util.HadoopInputFile
 import org.apache.spark.broadcast.Broadcast
@@ -62,29 +63,19 @@ case class GeoParquetMetadataPartitionReaderFactory(
 }
 
 object GeoParquetMetadataPartitionReaderFactory {
-
   private def readFile(
       configuration: Configuration,
       partitionedFile: PartitionedFile,
       readDataSchema: StructType): Iterator[InternalRow] = {
-    val reader = ParquetFileReader
-      .open(HadoopInputFile.fromPath(partitionedFile.toPath, configuration))
-
-    try {
-      readFile(configuration, partitionedFile, readDataSchema, reader)
-    } finally {
-      reader.close()
-    }
-  }
+    val inputFile = HadoopInputFile.fromPath(partitionedFile.toPath, configuration)
+    val inputStream = inputFile.newStream()
 
-  private def readFile(
-      configuration: Configuration,
-      partitionedFile: PartitionedFile,
-      readDataSchema: StructType,
-      reader: ParquetFileReader): Iterator[InternalRow] = {
-    val filePath = partitionedFile.toPath.toString
+    val footer = ParquetFileReader
+      .readFooter(inputFile, ParquetReadOptions.builder().build(), inputStream)
 
-    val metadata = reader.getFooter.getFileMetaData.getKeyValueMetaData
+    val filePath = partitionedFile.toPath.toString
+    val metadata = footer.getFileMetaData.getKeyValueMetaData
     val row = GeoParquetMetaData.parseKeyValueMetaData(metadata) match {
       case Some(geo) =>
         val geoColumnsMap = geo.columns.map { case (columnName, columnMetadata) =>
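The same change is applied in all three copies of GeoParquetMetadataPartitionReaderFactory: instead of constructing a full ParquetFileReader via ParquetFileReader.open and closing it in a second readFile overload, the code now fetches only the footer with the static ParquetFileReader.readFooter(inputFile, options, inputStream) and pulls the key-value metadata from it. A minimal standalone sketch of the new pattern follows; the readKeyValueMetadata wrapper and the explicit try/finally close of the input stream are illustrative additions (the visible hunks do not show the stream being closed), not part of the commit:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.ParquetReadOptions
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile

object FooterMetadataSketch {
  // Read only the footer of a Parquet file and return its key-value
  // metadata (where GeoParquet stores its "geo" JSON blob), without
  // opening a full ParquetFileReader over the row groups.
  def readKeyValueMetadata(path: Path, conf: Configuration): java.util.Map[String, String] = {
    val inputFile = HadoopInputFile.fromPath(path, conf)
    val inputStream = inputFile.newStream()
    try {
      val footer = ParquetFileReader
        .readFooter(inputFile, ParquetReadOptions.builder().build(), inputStream)
      footer.getFileMetaData.getKeyValueMetaData
    } finally {
      // Illustrative addition: release the stream once the footer is parsed.
      inputStream.close()
    }
  }
}

Reading just the footer avoids materializing reader state for row groups this code path never touches: the key-value map is handed straight to GeoParquetMetaData.parseKeyValueMetaData, and the reader goes no deeper into the file.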
