diff --git a/gobblin-core/src/main/java/gobblin/source/extractor/hadoop/HadoopFsHelper.java b/gobblin-core/src/main/java/gobblin/source/extractor/hadoop/HadoopFsHelper.java index 74cb9cb3b6f..9807739f6ec 100644 --- a/gobblin-core/src/main/java/gobblin/source/extractor/hadoop/HadoopFsHelper.java +++ b/gobblin-core/src/main/java/gobblin/source/extractor/hadoop/HadoopFsHelper.java @@ -13,6 +13,7 @@ import gobblin.source.extractor.filebased.FileBasedHelper; import gobblin.source.extractor.filebased.FileBasedHelperException; + import java.io.IOException; import java.io.InputStream; import java.net.URI; @@ -29,6 +30,8 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.compress.CompressionCodec; +import org.apache.hadoop.io.compress.CompressionCodecFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -117,7 +120,13 @@ public void lsr(Path p, List results) public InputStream getFileStream(String path) throws FileBasedHelperException { try { - return this.fs.open(new Path(path)); + Path p = new Path(path); + InputStream in = this.fs.open(p); + // Account for compressed files (e.g. gzip). + // https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala + CompressionCodecFactory factory = new CompressionCodecFactory(this.fs.getConf()); + CompressionCodec codec = factory.getCodec(p); + return (codec == null) ? in : codec.createInputStream(in); } catch (IOException e) { throw new FileBasedHelperException("Cannot do open file " + path + " due to " + e.getMessage(), e); }