Commit

When getting a file's input stream, also handle compressed (e.g. gzip) files.
rayortigas committed Mar 19, 2015
1 parent 3e01a31 commit e8aea3c
Showing 1 changed file with 10 additions and 1 deletion.
@@ -13,6 +13,7 @@
 
 import gobblin.source.extractor.filebased.FileBasedHelper;
 import gobblin.source.extractor.filebased.FileBasedHelperException;
+
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
@@ -29,6 +30,8 @@
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.compress.CompressionCodec;
+import org.apache.hadoop.io.compress.CompressionCodecFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -117,7 +120,13 @@ public void lsr(Path p, List<String> results)
   public InputStream getFileStream(String path)
       throws FileBasedHelperException {
     try {
-      return this.fs.open(new Path(path));
+      Path p = new Path(path);
+      InputStream in = this.fs.open(p);
+      // Account for compressed files (e.g. gzip).
+      // https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala
+      CompressionCodecFactory factory = new CompressionCodecFactory(this.fs.getConf());
+      CompressionCodec codec = factory.getCodec(p);
+      return (codec == null) ? in : codec.createInputStream(in);
     } catch (IOException e) {
       throw new FileBasedHelperException("Cannot do open file " + path + " due to " + e.getMessage(), e);
     }
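Note on the change: CompressionCodecFactory resolves a codec from the path's file extension (e.g. .gz maps to GzipCodec) using the codecs registered in the Hadoop Configuration, and codec.createInputStream() wraps the raw stream in a decompressing one; when no codec matches, the raw stream is returned unchanged, so uncompressed files behave as before. Below is a minimal standalone sketch of the same pattern; the CodecProbe class, the local filesystem, and the command-line path are illustrative assumptions, not part of this commit.

import java.io.BufferedReader;
import java.io.InputStream;
import java.io.InputStreamReader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;

public class CodecProbe {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Illustrative: use the local filesystem; the commit operates on whatever this.fs points to.
    FileSystem fs = FileSystem.getLocal(conf);
    Path p = new Path(args[0]); // e.g. a .gz, .bz2, or plain text file

    // Pick a codec based on the file extension; null means no known compression suffix.
    CompressionCodecFactory factory = new CompressionCodecFactory(conf);
    CompressionCodec codec = factory.getCodec(p);

    InputStream raw = fs.open(p);
    InputStream in = (codec == null) ? raw : codec.createInputStream(raw);

    try (BufferedReader reader = new BufferedReader(new InputStreamReader(in))) {
      // Prints the first line, decompressed transparently when a codec was found.
      System.out.println(reader.readLine());
    }
  }
}

Because the raw stream is handed back when getCodec() returns null, callers of getFileStream() need no changes to keep reading uncompressed files.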
