Skip to content

Commit

Permalink
Added utils for converting File and Path to URI (deephaven#5247)
Browse files Browse the repository at this point in the history
  • Loading branch information
malhotrashivam authored Mar 14, 2024
1 parent 7c7d49f commit 45121bf
Show file tree
Hide file tree
Showing 8 changed files with 155 additions and 50 deletions.
71 changes: 71 additions & 0 deletions Base/src/main/java/io/deephaven/base/FileUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@
import org.jetbrains.annotations.Nullable;

import java.io.*;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.file.Path;
import java.util.ArrayList;

public class FileUtils {
Expand Down Expand Up @@ -249,4 +252,72 @@ public boolean accept(File pathname) {
|| (pathname.isFile() && (normalFileFilter == null || normalFileFilter.accept(pathname)));
}
}

/**
* Take the file source path or URI string and convert it to a URI object.
*
* @param source The file source path or URI
* @param isDirectory Whether the source is a directory; only used when the source is treated as a file path
* @return The URI object
*/
public static URI convertToURI(final String source, final boolean isDirectory) {
    if (source.isEmpty()) {
        throw new IllegalArgumentException("Cannot convert empty source to URI");
    }
    // Try to interpret the source as a URI first; a syntax failure just means "not a URI".
    URI parsed;
    try {
        parsed = new URI(source);
    } catch (final URISyntaxException ignored) {
        // Not parseable as a URI, so fall through and treat the source as a file path
        parsed = null;
    }
    // Only a successfully parsed URI that carries an explicit scheme is returned unchanged. Note that
    // isDirectory is not consulted in that case; it only matters for the "file" URI fallback.
    final boolean hasScheme = parsed != null && parsed.getScheme() != null;
    return hasScheme ? parsed : convertToURI(new File(source), isDirectory);
}

/**
* Takes a file and converts it to a URI object with {@code "file"} scheme. This method is preferred over
* {@link File#toURI()} because {@link File#toURI()} internally calls {@link File#isDirectory()}, which typically
* invokes the {@code stat} system call, resulting in filesystem metadata access.
*
* @param file The file
* @param isDirectory Whether the source file is a directory
* @return The URI object
*/
public static URI convertToURI(final File file, final boolean isDirectory) {
    // URI paths always use '/', so translate the platform separator when it differs.
    final String rawAbsolute = file.getAbsolutePath();
    String uriPath = File.separatorChar == '/'
            ? rawAbsolute
            : rawAbsolute.replace(File.separatorChar, '/');
    // Guarantee a leading slash (e.g. for paths that begin with something other than '/').
    if (uriPath.charAt(0) != '/') {
        uriPath = "/" + uriPath;
    }
    // The caller tells us whether this is a directory, so no filesystem lookup is needed to decide
    // on the trailing slash.
    if (isDirectory && !uriPath.endsWith("/")) {
        uriPath = uriPath + "/";
    }
    // A path starting with a double slash gets two more slashes prepended
    // (presumably to keep UNC-style paths intact in the URI path component -- confirm).
    if (uriPath.startsWith("//")) {
        uriPath = "//" + uriPath;
    }
    try {
        // The multi-argument constructor quotes characters that are illegal in a URI path.
        return new URI("file", null, uriPath, null);
    } catch (final URISyntaxException e) {
        throw new IllegalStateException("Failed to convert file to URI: " + file, e);
    }
}

/**
* Takes a path and converts it to a URI object with {@code "file"} scheme. This method is preferred over
* {@link Path#toUri()} because {@link Path#toUri()} internally invokes the {@code stat} system call, resulting in
* filesystem metadata access.
*
* @param path The path
* @param isDirectory Whether the file is a directory
* @return The URI object
*/
public static URI convertToURI(final Path path, final boolean isDirectory) {
    // Reuse the File-based conversion so both overloads build the URI the same way.
    final File asFile = path.toFile();
    return convertToURI(asFile, isDirectory);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -6,43 +6,22 @@
import io.deephaven.util.SafeCloseable;
import org.jetbrains.annotations.NotNull;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.net.URISyntaxException;
import java.nio.channels.SeekableByteChannel;
import java.nio.file.Path;
import java.nio.file.Paths;

public interface SeekableChannelsProvider extends SafeCloseable {
import static io.deephaven.base.FileUtils.convertToURI;

/**
* Take the file source path or URI and convert it to a URI object.
*
* @param source The file source path or URI
* @return The URI object
*/
static URI convertToURI(final String source) {
    // Attempt to parse the source as a URI; a syntax failure means it is handled as a file path.
    URI parsed;
    try {
        parsed = new URI(source);
    } catch (final URISyntaxException ignored) {
        parsed = null;
    }
    // Unparseable sources and scheme-less URIs are both converted to a "file" URI.
    if (parsed == null || parsed.getScheme() == null) {
        return new File(source).toURI();
    }
    return parsed;
}
public interface SeekableChannelsProvider extends SafeCloseable {

/**
* Wraps {@link SeekableChannelsProvider#getInputStream(SeekableByteChannel)} to ensure the channel's position is
* incremented the exact amount that has been consumed from the resulting input stream. To remain valid, the caller
* must ensure that the resulting input stream isn't re-wrapped by any downstream code in a way that would adversely
* effect the position (such as re-wrapping the resulting input stream with buffering).
* affect the position (such as re-wrapping the resulting input stream with buffering).
*
* <p>
* Equivalent to {@code ChannelPositionInputStream.of(ch, provider.getInputStream(ch))}.
Expand Down Expand Up @@ -79,7 +58,7 @@ default SeekableChannelContext makeSingleUseContext() {

default SeekableByteChannel getReadChannel(@NotNull SeekableChannelContext channelContext, @NotNull String uriStr)
throws IOException {
return getReadChannel(channelContext, convertToURI(uriStr));
return getReadChannel(channelContext, convertToURI(uriStr, false));
}

SeekableByteChannel getReadChannel(@NotNull SeekableChannelContext channelContext, @NotNull URI uri)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.NoSuchElementException;
import java.util.function.Function;

import static io.deephaven.base.FileUtils.convertToURI;
import static io.deephaven.parquet.base.ParquetFileReader.FILE_URI_SCHEME;
import static org.apache.parquet.format.Encoding.PLAIN_DICTIONARY;
import static org.apache.parquet.format.Encoding.RLE_DICTIONARY;
Expand Down Expand Up @@ -120,7 +121,7 @@ private URI getURI() {
return uri;
}
if (columnChunk.isSetFile_path() && FILE_URI_SCHEME.equals(rootURI.getScheme())) {
return uri = Path.of(rootURI).resolve(columnChunk.getFile_path()).toUri();
return uri = convertToURI(Path.of(rootURI).resolve(columnChunk.getFile_path()), false);
} else {
// TODO(deephaven-core#5066): Add support for reading metadata files from non-file URIs
return uri = rootURI;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,15 @@
import org.apache.parquet.format.Type;
import org.apache.parquet.schema.*;

import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
import java.nio.channels.SeekableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import java.util.*;

import static io.deephaven.util.channel.SeekableChannelsProvider.convertToURI;
import static io.deephaven.base.FileUtils.convertToURI;

/**
* Top level accessor for a parquet file which can read both from a file path string or a CLI style file URI,
Expand All @@ -39,17 +39,29 @@ public class ParquetFileReader {
private final URI rootURI;
private final MessageType type;

/**
* Create a new ParquetFileReader for the provided source.
*
* @param source The source path or URI for the parquet file or the parquet metadata file
* @param channelsProvider The {@link SeekableChannelsProvider} to use for reading the file
*/
public ParquetFileReader(final String source, final SeekableChannelsProvider channelsProvider)
throws IOException {
this(convertToURI(source), channelsProvider);
this(convertToURI(source, false), channelsProvider);
}

/**
* Create a new ParquetFileReader for the provided source.
*
* @param parquetFileURI The URI for the parquet file or the parquet metadata file
* @param channelsProvider The {@link SeekableChannelsProvider} to use for reading the file
*/
public ParquetFileReader(final URI parquetFileURI, final SeekableChannelsProvider channelsProvider)
throws IOException {
this.channelsProvider = channelsProvider;
if (!parquetFileURI.getRawPath().endsWith(".parquet") && FILE_URI_SCHEME.equals(parquetFileURI.getScheme())) {
// Construct a new file URI for the parent directory
rootURI = Path.of(parquetFileURI).getParent().toUri();
rootURI = convertToURI(new File(parquetFileURI).getParentFile(), true);
} else {
// TODO(deephaven-core#5066): Add support for reading metadata files from non-file URIs
rootURI = parquetFileURI;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@
import java.nio.file.Paths;
import java.util.*;

import static io.deephaven.util.channel.SeekableChannelsProvider.convertToURI;
import static io.deephaven.base.FileUtils.convertToURI;

/**
* API for writing DH tables in parquet format
Expand Down Expand Up @@ -314,7 +314,7 @@ private static ParquetFileWriter getParquetFileWriter(
final Map<String, String> extraMetaData = new HashMap<>(tableMeta);
extraMetaData.put(METADATA_KEY, tableInfoBuilder.build().serializeToJSON());
return new ParquetFileWriter(path,
SeekableChannelsProviderLoader.getInstance().fromServiceLoader(convertToURI(path), null),
SeekableChannelsProviderLoader.getInstance().fromServiceLoader(convertToURI(path, false), null),
writeInstructions.getTargetPageSize(),
new HeapByteBufferAllocator(), mappedSchema.getParquetSchema(),
writeInstructions.getCompressionCodecName(), extraMetaData);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@
import java.nio.file.attribute.BasicFileAttributes;
import java.util.*;

import static io.deephaven.base.FileUtils.convertToURI;
import static io.deephaven.parquet.base.ParquetFileReader.FILE_URI_SCHEME;
import static io.deephaven.util.channel.SeekableChannelsProvider.convertToURI;
import static io.deephaven.parquet.table.ParquetTableWriter.PARQUET_FILE_EXTENSION;
import static io.deephaven.util.type.TypeUtils.getUnboxedTypeIfBoxed;

Expand Down Expand Up @@ -97,7 +97,7 @@ private ParquetTools() {}
* @see ParquetFlatPartitionedLayout
*/
public static Table readTable(@NotNull final String source) {
return readTableInternal(convertToURI(source), ParquetInstructions.EMPTY);
return readTableInternal(convertParquetSourceToURI(source), ParquetInstructions.EMPTY);
}

/**
Expand Down Expand Up @@ -128,7 +128,7 @@ public static Table readTable(@NotNull final String source) {
public static Table readTable(
@NotNull final String source,
@NotNull final ParquetInstructions readInstructions) {
return readTableInternal(convertToURI(source), readInstructions);
return readTableInternal(convertParquetSourceToURI(source), readInstructions);
}

/**
Expand Down Expand Up @@ -186,6 +186,19 @@ public static Table readTable(
return readTableInternal(sourceFile, readInstructions);
}

/**
* Convert a parquet source to a URI.
*
* @param source The path or URI of parquet file or directory to examine
* @return The URI
*/
private static URI convertParquetSourceToURI(@NotNull final String source) {
    // A source ending in ".parquet" is treated as a single file; anything else is treated as a directory.
    final boolean isDirectory = !source.endsWith(".parquet");
    return convertToURI(source, isDirectory);
}

/**
* Write a table to a file.
*
Expand Down Expand Up @@ -961,7 +974,7 @@ public static Table readFlatPartitionedTable(
public static Table readSingleFileTable(
@NotNull final File file,
@NotNull final ParquetInstructions readInstructions) {
return readSingleFileTable(file.toURI(), readInstructions);
return readSingleFileTable(convertToURI(file, false), readInstructions);
}

/**
Expand All @@ -980,7 +993,7 @@ public static Table readSingleFileTable(
public static Table readSingleFileTable(
@NotNull final String source,
@NotNull final ParquetInstructions readInstructions) {
return readSingleFileTable(convertToURI(source), readInstructions);
return readSingleFileTable(convertToURI(source, false), readInstructions);
}

private static Table readSingleFileTable(
Expand All @@ -1007,7 +1020,7 @@ public static Table readSingleFileTable(
@NotNull final File file,
@NotNull final ParquetInstructions readInstructions,
@NotNull final TableDefinition tableDefinition) {
return readSingleFileTable(file.toURI(), readInstructions, tableDefinition);
return readSingleFileTable(convertToURI(file, false), readInstructions, tableDefinition);
}

/**
Expand All @@ -1025,7 +1038,7 @@ public static Table readSingleFileTable(
@NotNull final String source,
@NotNull final ParquetInstructions readInstructions,
@NotNull final TableDefinition tableDefinition) {
return readSingleFileTable(convertToURI(source), readInstructions, tableDefinition);
return readSingleFileTable(convertToURI(source, false), readInstructions, tableDefinition);
}

private static Table readSingleFileTable(
Expand Down Expand Up @@ -1105,7 +1118,7 @@ private static ParquetSchemaReader.ColumnDefinitionConsumer makeSchemaReaderCons
* Make a {@link ParquetFileReader} for the supplied {@link File}. Wraps {@link IOException} as
* {@link TableDataException}.
*
* @param parquetFile The {@link File} to read
* @param parquetFile The parquet file or the parquet metadata file
* @param readInstructions the instructions for customizations while reading
* @return The new {@link ParquetFileReader}
*/
Expand All @@ -1122,7 +1135,7 @@ public static ParquetFileReader getParquetFileReader(@NotNull final File parquet
* Make a {@link ParquetFileReader} for the supplied {@link URI}. Wraps {@link IOException} as
* {@link TableDataException}.
*
* @param parquetFileURI The {@link URI} to read
* @param parquetFileURI The URI for the parquet file or the parquet metadata file
* @param readInstructions the instructions for customizations while reading
* @return The new {@link ParquetFileReader}
*/
Expand All @@ -1138,20 +1151,20 @@ public static ParquetFileReader getParquetFileReader(@NotNull final URI parquetF
/**
* Make a {@link ParquetFileReader} for the supplied {@link File}.
*
* @param parquetFile The {@link File} to read
* @param parquetFile The parquet file or the parquet metadata file
* @return The new {@link ParquetFileReader}
* @throws IOException if an IO exception occurs
*/
public static ParquetFileReader getParquetFileReaderChecked(
@NotNull final File parquetFile,
@NotNull final ParquetInstructions readInstructions) throws IOException {
return getParquetFileReaderChecked(parquetFile.toURI(), readInstructions);
return getParquetFileReaderChecked(convertToURI(parquetFile, false), readInstructions);
}

/**
* Make a {@link ParquetFileReader} for the supplied {@link URI}.
*
* @param parquetFileURI The {@link URI} to read
* @param parquetFileURI The URI for the parquet file or the parquet metadata file
* @return The new {@link ParquetFileReader}
* @throws IOException if an IO exception occurs
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import java.util.Map;
import java.util.stream.IntStream;

import static io.deephaven.base.FileUtils.convertToURI;

/**
* {@link TableLocationKey} implementation for use with data stored in the parquet format.
*/
Expand Down Expand Up @@ -70,7 +72,7 @@ public ParquetTableLocationKey(@NotNull final URI parquetFileUri, final int orde
}

private static URI validateParquetFile(@NotNull final File file) {
return validateParquetFile(file.toURI());
return validateParquetFile(convertToURI(file, false));
}

private static URI validateParquetFile(@NotNull final URI parquetFileUri) {
Expand Down Expand Up @@ -189,7 +191,7 @@ public synchronized int[] getRowGroupIndices() {
// we're not expecting that in this code path. To support it, discovery tools should figure out
// the row groups for a partition themselves and call setRowGroupReaders.
final String filePath = rowGroups.get(rgi).getColumns().get(0).getFile_path();
return filePath == null || new File(filePath).getAbsoluteFile().toURI().equals(uri);
return filePath == null || convertToURI(filePath, false).equals(uri);
}).toArray();
}

Expand Down
Loading

0 comments on commit 45121bf

Please sign in to comment.