Skip to content

Commit

Permalink
Merge pull request apache#57 from rayortigas/s3
Browse files Browse the repository at this point in the history
S3 support.
  • Loading branch information
sahilTakiar committed Mar 26, 2015
2 parents 4a9ecca + bc03f34 commit 2e7edf7
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@

import gobblin.source.extractor.filebased.FileBasedHelper;
import gobblin.source.extractor.filebased.FileBasedHelperException;
import gobblin.util.HadoopUtils;

import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
Expand All @@ -29,6 +31,8 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.io.compress.CompressionCodecFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -39,10 +43,16 @@
public class HadoopFsHelper implements FileBasedHelper {
private static Logger log = LoggerFactory.getLogger(HadoopFsHelper.class);
private State state;
private final Configuration configuration;
private FileSystem fs;

/**
 * Creates a helper backed by a default Hadoop {@link Configuration} built by
 * {@code HadoopUtils.newConfiguration()}, which also picks up AWS credentials
 * from the {@code AWS_ACCESS_KEY_ID}/{@code AWS_SECRET_ACCESS_KEY} environment
 * variables so s3/s3n URIs can be accessed.
 *
 * @param state source state carrying the filesystem URI and other properties
 */
public HadoopFsHelper(State state) {
this(state, HadoopUtils.newConfiguration());
}

/**
 * Creates a helper that will connect using the supplied Hadoop {@link Configuration}.
 * Allows callers (e.g. tests) to inject a specific configuration instead of the
 * environment-derived default.
 *
 * @param state source state carrying the filesystem URI and other properties
 * @param configuration Hadoop configuration used when obtaining the {@link FileSystem}
 */
public HadoopFsHelper(State state, Configuration configuration) {
this.state = state;
this.configuration = configuration;
}

public FileSystem getFileSystem() {
Expand All @@ -55,7 +65,7 @@ public void connect()
URI uri = null;
try {
uri = new URI(state.getProp(ConfigurationKeys.SOURCE_FILEBASED_FS_URI));
this.fs = FileSystem.get(uri, new Configuration());
this.fs = FileSystem.get(uri, configuration);
} catch (IOException e) {
throw new FileBasedHelperException("Cannot connect to given URI " + uri + " due to " + e.getMessage(), e);
} catch (URISyntaxException e) {
Expand Down Expand Up @@ -111,7 +121,13 @@ public void lsr(Path p, List<String> results)
public InputStream getFileStream(String path)
throws FileBasedHelperException {
try {
return this.fs.open(new Path(path));
Path p = new Path(path);
InputStream in = this.fs.open(p);
// Account for compressed files (e.g. gzip).
// https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/input/WholeTextFileRecordReader.scala
CompressionCodecFactory factory = new CompressionCodecFactory(this.fs.getConf());
CompressionCodec codec = factory.getCodec(p);
return (codec == null) ? in : codec.createInputStream(in);
} catch (IOException e) {
throw new FileBasedHelperException("Cannot do open file " + path + " due to " + e.getMessage(), e);
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
package gobblin.source.extractor.hadoop;

import java.io.IOException;
import java.io.InputStream;
import java.net.URL;

import gobblin.configuration.ConfigurationKeys;
import gobblin.configuration.SourceState;
import gobblin.source.extractor.filebased.FileBasedHelperException;

import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.testng.Assert;
import org.testng.annotations.Test;

/** Tests for {@code HadoopFsHelper}, covering S3 credential validation and (de)compressed file reads. */
public class HadoopFsHelperTest {

  @Test(expectedExceptions = IllegalArgumentException.class)
  public void testConnectFailsWithS3URLWithoutAWSCredentials() throws FileBasedHelperException {
    Configuration conf = new Configuration(); // plain conf, no S3 credentials
    SourceState sourceState = new SourceState();
    sourceState.setProp(ConfigurationKeys.SOURCE_FILEBASED_FS_URI, "s3://support.elasticmapreduce/spark/install-spark/");
    HadoopFsHelper fsHelper = new HadoopFsHelper(sourceState, conf);
    fsHelper.connect();
  }

  @Test
  public void testGetFileStreamSucceedsWithUncompressedFile() throws FileBasedHelperException, IOException {
    Assert.assertEquals(readViaHelper("/source/simple.tsv"), "A\t1\nB\t2\n");
  }

  @Test
  public void testGetFileStreamSucceedsWithGZIPFile() throws FileBasedHelperException, IOException {
    // getFileStream should decompress transparently based on the .gz extension.
    Assert.assertEquals(readViaHelper("/source/simple.tsv.gz"), "A\t1\nB\t2\n");
  }

  /**
   * Connects a HadoopFsHelper rooted at the test-resources directory and reads
   * the given classpath resource through it, closing the stream afterwards.
   *
   * @param resource classpath resource path (e.g. "/source/simple.tsv")
   * @return the full contents of the resource decoded as UTF-8
   */
  private String readViaHelper(String resource) throws FileBasedHelperException, IOException {
    SourceState sourceState = new SourceState();
    URL rootUrl = getClass().getResource("/source/");
    sourceState.setProp(ConfigurationKeys.SOURCE_FILEBASED_FS_URI, rootUrl.toString());
    HadoopFsHelper fsHelper = new HadoopFsHelper(sourceState);
    fsHelper.connect();

    URL url = getClass().getResource(resource);
    // Close the stream to avoid leaking file handles (the original tests did not).
    InputStream in = fsHelper.getFileStream(url.toString());
    try {
      return IOUtils.toString(in, "UTF-8");
    } finally {
      in.close();
    }
  }
}
2 changes: 2 additions & 0 deletions gobblin-core/src/test/resources/source/simple.tsv
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
A 1
B 2
Binary file not shown.
22 changes: 22 additions & 0 deletions gobblin-utility/src/main/java/gobblin/util/HadoopUtils.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package gobblin.util;

import org.apache.hadoop.conf.Configuration;

/** Utility methods for constructing Hadoop {@link Configuration} objects. */
public class HadoopUtils {
  /**
   * Builds a new Hadoop Configuration, wiring in AWS credentials from the
   * environment so that Hadoop can access s3 and s3n URLs.
   *
   * <p>Credentials are applied only when both {@code AWS_ACCESS_KEY_ID} and
   * {@code AWS_SECRET_ACCESS_KEY} are set.
   * h/t https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/deploy/SparkHadoopUtil.scala
   *
   * @return a freshly created Configuration, with S3 credentials set when available
   */
  public static Configuration newConfiguration() {
    Configuration conf = new Configuration();

    String accessKeyId = System.getenv("AWS_ACCESS_KEY_ID");
    String secretAccessKey = System.getenv("AWS_SECRET_ACCESS_KEY");
    if (accessKeyId != null && secretAccessKey != null) {
      // Same credentials apply to both the s3 and s3n filesystem schemes.
      for (String prefix : new String[] {"fs.s3", "fs.s3n"}) {
        conf.set(prefix + ".awsAccessKeyId", accessKeyId);
        conf.set(prefix + ".awsSecretAccessKey", secretAccessKey);
      }
    }

    return conf;
  }
}

0 comments on commit 2e7edf7

Please sign in to comment.