diff --git a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/client/DefaultFileSystemClient.java b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/client/DefaultFileSystemClient.java
index ded4c0e479a..fc82dd1fa90 100644
--- a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/client/DefaultFileSystemClient.java
+++ b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/client/DefaultFileSystemClient.java
@@ -15,14 +15,9 @@
*/
package io.delta.kernel.defaults.client;
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
-import java.io.FileNotFoundException;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.Iterator;
+import java.io.*;
+import io.delta.storage.LogStore;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -34,41 +29,58 @@
import io.delta.kernel.internal.util.Utils;
+import io.delta.kernel.defaults.internal.logstore.LogStoreProvider;
+
/**
- * Default implementation of {@link FileSystemClient} based on Hadoop APIs.
+ * Default implementation of {@link FileSystemClient} based on Hadoop APIs. It takes a Hadoop
+ * {@link Configuration} object to interact with the file system. The following optional
+ * configurations can be set to customize the behavior of the client:
+ *
+ * - {@code io.delta.kernel.logStore.<scheme>.impl} - The class name of the custom
+ * {@link LogStore} implementation to use for operations on storage systems with the
+ * specified {@code scheme}. For example, to use a custom {@link LogStore} for S3 storage
+ * objects:
+ *
{@code
+ *
+ * io.delta.kernel.logStore.s3.impl
+ * com.example.S3LogStore
+ *
+ * }
+ * If not set, the default LogStore implementation for the scheme will be used.
+ *
+ * - {@code delta.enableFastS3AListFrom} - Set to {@code true} to enable fast listing
+ * functionality when using a {@link LogStore} created for S3 storage objects.
+ *
+ *
+ *
+ * The above list of options is not exhaustive. For a complete list of options, refer to the
+ * specific implementation of {@link FileSystem}.
*/
public class DefaultFileSystemClient
- implements FileSystemClient {
+ implements FileSystemClient {
private final Configuration hadoopConf;
+ /**
+ * Create an instance of the default {@link FileSystemClient} implementation.
+ *
+ * @param hadoopConf Configuration to use. List of options to customize the behavior of
+ * the client can be found in the class documentation.
+ */
public DefaultFileSystemClient(Configuration hadoopConf) {
this.hadoopConf = hadoopConf;
}
@Override
public CloseableIterator<FileStatus> listFrom(String filePath) throws IOException {
- Iterator iter;
-
Path path = new Path(filePath);
- FileSystem fs = path.getFileSystem(hadoopConf);
- if (!fs.exists(path.getParent())) {
- throw new FileNotFoundException(
- String.format("No such file or directory: %s", path.getParent())
- );
- }
- org.apache.hadoop.fs.FileStatus[] files = fs.listStatus(path.getParent());
- iter = Arrays.stream(files)
- .filter(f -> f.getPath().getName().compareTo(path.getName()) >= 0)
- .sorted(Comparator.comparing(o -> o.getPath().getName()))
- .iterator();
+ LogStore logStore = LogStoreProvider.getLogStore(hadoopConf, path.toUri().getScheme());
- return Utils.toCloseableIterator(iter)
- .map(hadoopFileStatus ->
- FileStatus.of(
- hadoopFileStatus.getPath().toString(),
- hadoopFileStatus.getLen(),
- hadoopFileStatus.getModificationTime())
- );
+ return Utils.toCloseableIterator(logStore.listFrom(path, hadoopConf))
+ .map(hadoopFileStatus ->
+ FileStatus.of(
+ hadoopFileStatus.getPath().toString(),
+ hadoopFileStatus.getLen(),
+ hadoopFileStatus.getModificationTime()));
}
@Override
@@ -81,7 +93,7 @@ public String resolvePath(String path) throws IOException {
@Override
public CloseableIterator<ByteArrayInputStream> readFiles(
-            CloseableIterator<FileReadRequest> readRequests) {
+            CloseableIterator<FileReadRequest> readRequests) {
return readRequests.map(elem ->
getStream(elem.getPath(), elem.getStartOffset(), elem.getReadLength()));
}
@@ -97,12 +109,12 @@ private ByteArrayInputStream getStream(String filePath, int offset, int size) {
return new ByteArrayInputStream(buff);
} catch (IOException ex) {
throw new RuntimeException(String.format(
- "IOException reading from file %s at offset %s size %s",
- filePath, offset, size), ex);
+ "IOException reading from file %s at offset %s size %s",
+ filePath, offset, size), ex);
}
} catch (IOException ex) {
throw new RuntimeException(String.format(
- "Could not resolve the FileSystem for path %s", filePath), ex);
+ "Could not resolve the FileSystem for path %s", filePath), ex);
}
}
}
diff --git a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/DefaultTableClientErrors.java b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/DefaultTableClientErrors.java
new file mode 100644
index 00000000000..0f8bc1e419e
--- /dev/null
+++ b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/DefaultTableClientErrors.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright (2024) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.delta.kernel.defaults.internal;
+
+import static java.lang.String.format;
+
+public class DefaultTableClientErrors {
+
+ // TODO update to be table client exception with future exception framework
+ // (see delta-io/delta#2231)
+ public static IllegalArgumentException canNotInstantiateLogStore(String logStoreClassName) {
+ return new IllegalArgumentException(
+ format("Can not instantiate `LogStore` class: %s", logStoreClassName));
+ }
+}
diff --git a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/logstore/LogStoreProvider.java b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/logstore/LogStoreProvider.java
new file mode 100644
index 00000000000..812b5750d80
--- /dev/null
+++ b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/logstore/LogStoreProvider.java
@@ -0,0 +1,118 @@
+/*
+ * Copyright (2024) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.delta.kernel.defaults.internal.logstore;
+
+import java.util.*;
+
+import io.delta.storage.*;
+import org.apache.hadoop.conf.Configuration;
+
+import io.delta.kernel.defaults.internal.DefaultTableClientErrors;
+
+/**
+ * Utility class to provide the correct {@link LogStore} based on the scheme of the path.
+ */
+public class LogStoreProvider {
+
+ // Supported schemes per storage system.
+ private static final Set<String> S3_SCHEMES = unmodifiableSet("s3", "s3a", "s3n");
+ private static final Set<String> AZURE_SCHEMES =
+ unmodifiableSet("abfs", "abfss", "adl", "wasb", "wasbs");
+ private static final Set<String> GCS_SCHEMES = unmodifiableSet("gs");
+
+ /**
+ * Get the {@link LogStore} instance for the given scheme and configuration. Callers can set
+ * {@code io.delta.kernel.logStore.<scheme>.impl} to specify the {@link LogStore}
+ * implementation to use for {@code scheme}.
+ *
+ * If not set, the default {@link LogStore} implementation (given below) for the scheme will
+ * be used.
+ *
+ * - {@code s3, s3a, s3n}: {@link S3SingleDriverLogStore}
+ * - {@code abfs, abfss, adl, wasb, wasbs}: {@link AzureLogStore}
+ * - {@code gs}: {@link GCSLogStore}
+ * - {@code hdfs, file}: {@link HDFSLogStore}
+ * - remaining: {@link HDFSLogStore}
+ *
+ *
+ * @param hadoopConf {@link Configuration} to use for creating the LogStore.
+ * @param scheme Scheme of the path.
+ * @return {@link LogStore} instance.
+ * @throws IllegalArgumentException if the LogStore implementation is not found or can not be
+ * instantiated.
+ */
+ public static LogStore getLogStore(Configuration hadoopConf, String scheme) {
+ String schemeLower = Optional.ofNullable(scheme)
+ .map(String::toLowerCase).orElse(null);
+
+ // Check if the LogStore implementation is set in the configuration.
+ String classNameFromConfig = hadoopConf.get(getLogStoreSchemeConfKey(schemeLower));
+ if (classNameFromConfig != null) {
+ try {
+ return getLogStoreClass(classNameFromConfig)
+ .getConstructor(Configuration.class)
+ .newInstance(hadoopConf);
+ } catch (Exception e) {
+ throw DefaultTableClientErrors.canNotInstantiateLogStore(classNameFromConfig);
+ }
+ }
+
+ // Create default LogStore based on the scheme.
+ String defaultClassName = HDFSLogStore.class.getName();
+ if (S3_SCHEMES.contains(schemeLower)) {
+ defaultClassName = S3SingleDriverLogStore.class.getName();
+ } else if (AZURE_SCHEMES.contains(schemeLower)) {
+ defaultClassName = AzureLogStore.class.getName();
+ } else if (GCS_SCHEMES.contains(schemeLower)) {
+ defaultClassName = GCSLogStore.class.getName();
+ }
+
+ try {
+ return getLogStoreClass(defaultClassName)
+ .getConstructor(Configuration.class)
+ .newInstance(hadoopConf);
+ } catch (Exception e) {
+ throw DefaultTableClientErrors.canNotInstantiateLogStore(defaultClassName);
+ }
+ }
+
+ /**
+ * Configuration key for setting the LogStore implementation for a scheme.
+ * ex: `io.delta.kernel.logStore.s3.impl` -> `io.delta.storage.S3SingleDriverLogStore`
+ */
+ static String getLogStoreSchemeConfKey(String scheme) {
+ return "io.delta.kernel.logStore." + scheme + ".impl";
+ }
+
+ /**
+ * Utility method to get the LogStore class from the class name.
+ */
+ private static Class<? extends LogStore> getLogStoreClass(String logStoreClassName)
+ throws ClassNotFoundException {
+ return Class.forName(
+ logStoreClassName,
+ true /* initialize */,
+ Thread.currentThread().getContextClassLoader())
+ .asSubclass(LogStore.class);
+ }
+
+ /**
+ * Remove this method once we start supporting JDK9+
+ */
+ private static Set<String> unmodifiableSet(String... elements) {
+ return Collections.unmodifiableSet(new HashSet<>(Arrays.asList(elements)));
+ }
+}
diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/logstore/LogStoreProviderSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/logstore/LogStoreProviderSuite.scala
new file mode 100644
index 00000000000..62441ea16de
--- /dev/null
+++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/logstore/LogStoreProviderSuite.scala
@@ -0,0 +1,103 @@
+/*
+ * Copyright (2024) The Delta Lake Project Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.delta.kernel.defaults.internal.logstore
+
+import io.delta.storage._
+import org.apache.hadoop.conf.Configuration
+import org.apache.hadoop.fs.{FileStatus, Path}
+import org.scalatest.funsuite.AnyFunSuite
+
+class LogStoreProviderSuite extends AnyFunSuite {
+
+ private val customLogStoreClassName = classOf[UserDefinedLogStore].getName
+
+ val hadoopConf = new Configuration()
+ Seq(
+ "s3" -> classOf[S3SingleDriverLogStore].getName,
+ "s3a" -> classOf[S3SingleDriverLogStore].getName,
+ "s3n" -> classOf[S3SingleDriverLogStore].getName,
+ "hdfs" -> classOf[HDFSLogStore].getName,
+ "file" -> classOf[HDFSLogStore].getName,
+ "gs" -> classOf[GCSLogStore].getName,
+ "abfss" -> classOf[AzureLogStore].getName,
+ "abfs" -> classOf[AzureLogStore].getName,
+ "adl" -> classOf[AzureLogStore].getName,
+ "wasb" -> classOf[AzureLogStore].getName,
+ "wasbs" -> classOf[AzureLogStore].getName
+ ).foreach { case (scheme, logStoreClass) =>
+ test(s"get the default LogStore for scheme $scheme") {
+ val logStore = LogStoreProvider.getLogStore(hadoopConf, scheme)
+ assert(logStore.getClass.getName === logStoreClass)
+ }
+ }
+
+ test("override the default LogStore for a schema") {
+ val hadoopConf = new Configuration()
+ hadoopConf.set(LogStoreProvider.getLogStoreSchemeConfKey("s3"), customLogStoreClassName)
+ val logStore = LogStoreProvider.getLogStore(hadoopConf, "s3")
+ assert(logStore.getClass.getName === customLogStoreClassName)
+ }
+
+ test("set LogStore config for a custom scheme") {
+ val hadoopConf = new Configuration()
+ hadoopConf.set(LogStoreProvider.getLogStoreSchemeConfKey("fake"), customLogStoreClassName)
+ val logStore = LogStoreProvider.getLogStore(hadoopConf, "fake")
+ assert(logStore.getClass.getName === customLogStoreClassName)
+ }
+
+ test("set LogStore config to a class that doesn't extend LogStore") {
+ val hadoopConf = new Configuration()
+ hadoopConf.set(LogStoreProvider.getLogStoreSchemeConfKey("fake"), "java.lang.String")
+ val e = intercept[IllegalArgumentException](
+ LogStoreProvider.getLogStore(hadoopConf, "fake")
+ )
+ assert(e.getMessage.contains(
+ "Can not instantiate `LogStore` class: %s".format("java.lang.String")))
+ }
+}
+
+/**
+ * Sample user-defined log store implementing [[LogStore]].
+ */
+class UserDefinedLogStore(override val initHadoopConf: Configuration)
+ extends LogStore(initHadoopConf) {
+
+ private val logStoreInternal = new HDFSLogStore(initHadoopConf)
+
+ override def read(path: Path, hadoopConf: Configuration): CloseableIterator[String] = {
+ logStoreInternal.read(path, hadoopConf)
+ }
+
+ override def write(
+ path: Path,
+ actions: java.util.Iterator[String],
+ overwrite: java.lang.Boolean,
+ hadoopConf: Configuration): Unit = {
+ logStoreInternal.write(path, actions, overwrite, hadoopConf)
+ }
+
+ override def listFrom(path: Path, hadoopConf: Configuration): java.util.Iterator[FileStatus] = {
+ logStoreInternal.listFrom(path, hadoopConf)
+ }
+
+ override def resolvePathOnPhysicalStorage(path: Path, hadoopConf: Configuration): Path = {
+ logStoreInternal.resolvePathOnPhysicalStorage(path, hadoopConf)
+ }
+
+ override def isPartialWriteVisible(path: Path, hadoopConf: Configuration): java.lang.Boolean = {
+ false
+ }
+}