Skip to content

Commit

Permalink
Merge pull request #40 from treeverse/handle_external_paths
Browse files Browse the repository at this point in the history
  • Loading branch information
johnnyaug authored Oct 19, 2023
2 parents 095e5b7 + fc7cbdf commit 6418b3c
Show file tree
Hide file tree
Showing 2 changed files with 50 additions and 4 deletions.
19 changes: 15 additions & 4 deletions src/main/java/io/lakefs/iceberg/LakeFSFileIO.java
Original file line number Diff line number Diff line change
Expand Up @@ -30,28 +30,39 @@ public Map<String, String> properties() {
return wrapped.properties();
}


@Override
public InputFile newInputFile(String path) {
if (!path.startsWith("s3a://")) {
if (!path.matches("^[0-9a-z]*://.*")) {
path = String.format("s3a://%s/%s/%s", lakeFSRepo, lakeFSRef, path);
}
if (!path.startsWith(String.format("s3a://%s/%s/", lakeFSRepo, lakeFSRef))) {
// not a path in the repository, treat as a regular path
return wrapped.newInputFile(path);
}
return HadoopInputFile.fromPath(new LakeFSPath(path), wrapped.conf());
}

@Override
public InputFile newInputFile(String path, long length) {
if (!path.startsWith("s3a://")) {
if (!path.matches("^[0-9a-z]*://.*")) {
path = String.format("s3a://%s/%s/%s", lakeFSRepo, lakeFSRef, path);
}
if (!path.startsWith(String.format("s3a://%s/%s/", lakeFSRepo, lakeFSRef))) {
// not a path in the repository, treat as a regular path
return wrapped.newInputFile(path, length);
}
return HadoopInputFile.fromPath(new LakeFSPath(path), length, wrapped.conf());
}

@Override
public OutputFile newOutputFile(String path) {
if (!path.startsWith("s3a://")) {
if (!path.matches("^[0-9a-z]*://.*")) {
path = String.format("s3a://%s/%s/%s", lakeFSRepo, lakeFSRef, path);
}
if (!path.startsWith(String.format("s3a://%s/%s/", lakeFSRepo, lakeFSRef))) {
// not a path in the repository, treat as a regular path
return wrapped.newOutputFile(path);
}
return HadoopOutputFile.fromPath(new LakeFSPath(path), wrapped.conf());
}

Expand Down
35 changes: 35 additions & 0 deletions src/test/java/io/lakefs/iceberg/TestLakeFSFileIO.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
package io.lakefs.iceberg;

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.hadoop.HadoopFileIO;
import org.apache.iceberg.io.InputFile;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

public class TestLakeFSFileIO {

private LakeFSFileIO lakeFSFileIO;

@Before
public void setUp() {
HadoopFileIO hadoopFileIO = new HadoopFileIO(new Configuration());
String lakeFSRepo = "myLakeFSRepo";
String lakeFSRef = "myLakeFSRef";
lakeFSFileIO = new LakeFSFileIO(hadoopFileIO, lakeFSRepo, lakeFSRef);
}

@Test
public void testNewInputFile() {
// Test the behavior of newInputFile method
String relativePath = "path/in/repo";
String absolutePath = "s3a://myLakeFSRepo/myLakeFSRef/other/path/in/repo";
String externalPath = "s3a://otherBucket/otherPath";
InputFile relativeInputFile = lakeFSFileIO.newInputFile(relativePath);
Assert.assertEquals("path/in/repo", relativeInputFile.location());
InputFile absoluteInputFile = lakeFSFileIO.newInputFile(absolutePath);
Assert.assertEquals("other/path/in/repo", absoluteInputFile.location());
InputFile externalInputFile = lakeFSFileIO.newInputFile(externalPath);
Assert.assertEquals("s3a://otherBucket/otherPath", externalInputFile.location());
}
}

0 comments on commit 6418b3c

Please sign in to comment.