From cb77f540b9a09f003d27606a3dfb5e592d5e6d18 Mon Sep 17 00:00:00 2001 From: Venki Korukanti Date: Fri, 22 Mar 2024 09:33:52 -0700 Subject: [PATCH] [Kernel] Use `LogStore`s in `listFrom` implementation in default `FileSystemClient` (#2770) ## Description `LogStore`s in `storage` module have file system operations (needed for reading/writing DeltaLogs) implemented for each storage (e.g. s3, GCS etc.) to take into account the behavior of the storage and also efficiently implement certain operations depending upon the storage system support (e.g. fast listing in S3). This PR creates `LogStoreProvider` to get the specific implementation of the `LogStore` for given `scheme`. Also updates the `DefaultFileSystemClient.listFrom` to use the `LogStore.listFrom`. The majority of the code here is copied from the `delta-spark` and `standalone` projects. ## How was this patch tested? Unit tests for `LogStoreProvider` and existing integration tests for `DefaultFileSystemClient.listFrom` changes. --- .../client/DefaultFileSystemClient.java | 78 +++++++----- .../internal/DefaultTableClientErrors.java | 28 +++++ .../internal/logstore/LogStoreProvider.java | 118 ++++++++++++++++++ .../logstore/LogStoreProviderSuite.scala | 103 +++++++++++++++ 4 files changed, 294 insertions(+), 33 deletions(-) create mode 100644 kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/DefaultTableClientErrors.java create mode 100644 kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/logstore/LogStoreProvider.java create mode 100644 kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/logstore/LogStoreProviderSuite.scala diff --git a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/client/DefaultFileSystemClient.java b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/client/DefaultFileSystemClient.java index ded4c0e479a..fc82dd1fa90 100644 ---
a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/client/DefaultFileSystemClient.java +++ b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/client/DefaultFileSystemClient.java @@ -15,14 +15,9 @@ */ package io.delta.kernel.defaults.client; -import java.io.ByteArrayInputStream; -import java.io.DataInputStream; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.util.Arrays; -import java.util.Comparator; -import java.util.Iterator; +import java.io.*; +import io.delta.storage.LogStore; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -34,41 +29,58 @@ import io.delta.kernel.internal.util.Utils; +import io.delta.kernel.defaults.internal.logstore.LogStoreProvider; + /** - * Default implementation of {@link FileSystemClient} based on Hadoop APIs. + * Default implementation of {@link FileSystemClient} based on Hadoop APIs. It takes a Hadoop + * {@link Configuration} object to interact with the file system. The following optional + * configurations can be set to customize the behavior of the client: + * + * + * The above list of options is not exhaustive. For a complete list of options, refer to the + * specific implementation of {@link FileSystem}. */ public class DefaultFileSystemClient - implements FileSystemClient { + implements FileSystemClient { private final Configuration hadoopConf; + /** + * Create an instance of the default {@link FileSystemClient} implementation. + * + * @param hadoopConf Configuration to use. List of options to customize the behavior of + * the client can be found in the class documentation. 
+ */ public DefaultFileSystemClient(Configuration hadoopConf) { this.hadoopConf = hadoopConf; } @Override public CloseableIterator listFrom(String filePath) throws IOException { - Iterator iter; - Path path = new Path(filePath); - FileSystem fs = path.getFileSystem(hadoopConf); - if (!fs.exists(path.getParent())) { - throw new FileNotFoundException( - String.format("No such file or directory: %s", path.getParent()) - ); - } - org.apache.hadoop.fs.FileStatus[] files = fs.listStatus(path.getParent()); - iter = Arrays.stream(files) - .filter(f -> f.getPath().getName().compareTo(path.getName()) >= 0) - .sorted(Comparator.comparing(o -> o.getPath().getName())) - .iterator(); + LogStore logStore = LogStoreProvider.getLogStore(hadoopConf, path.toUri().getScheme()); - return Utils.toCloseableIterator(iter) - .map(hadoopFileStatus -> - FileStatus.of( - hadoopFileStatus.getPath().toString(), - hadoopFileStatus.getLen(), - hadoopFileStatus.getModificationTime()) - ); + return Utils.toCloseableIterator(logStore.listFrom(path, hadoopConf)) + .map(hadoopFileStatus -> + FileStatus.of( + hadoopFileStatus.getPath().toString(), + hadoopFileStatus.getLen(), + hadoopFileStatus.getModificationTime())); } @Override @@ -81,7 +93,7 @@ public String resolvePath(String path) throws IOException { @Override public CloseableIterator readFiles( - CloseableIterator readRequests) { + CloseableIterator readRequests) { return readRequests.map(elem -> getStream(elem.getPath(), elem.getStartOffset(), elem.getReadLength())); } @@ -97,12 +109,12 @@ private ByteArrayInputStream getStream(String filePath, int offset, int size) { return new ByteArrayInputStream(buff); } catch (IOException ex) { throw new RuntimeException(String.format( - "IOException reading from file %s at offset %s size %s", - filePath, offset, size), ex); + "IOException reading from file %s at offset %s size %s", + filePath, offset, size), ex); } } catch (IOException ex) { throw new RuntimeException(String.format( - "Could not resolve 
the FileSystem for path %s", filePath), ex); + "Could not resolve the FileSystem for path %s", filePath), ex); } } } diff --git a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/DefaultTableClientErrors.java b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/DefaultTableClientErrors.java new file mode 100644 index 00000000000..0f8bc1e419e --- /dev/null +++ b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/DefaultTableClientErrors.java @@ -0,0 +1,28 @@ +/* + * Copyright (2024) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.delta.kernel.defaults.internal; + +import static java.lang.String.format; + +public class DefaultTableClientErrors { + + // TODO update to be table client exception with future exception framework + // (see delta-io/delta#2231) + public static IllegalArgumentException canNotInstantiateLogStore(String logStoreClassName) { + return new IllegalArgumentException( + format("Can not instantiate `LogStore` class: %s", logStoreClassName)); + } +} diff --git a/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/logstore/LogStoreProvider.java b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/logstore/LogStoreProvider.java new file mode 100644 index 00000000000..812b5750d80 --- /dev/null +++ b/kernel/kernel-defaults/src/main/java/io/delta/kernel/defaults/internal/logstore/LogStoreProvider.java @@ -0,0 +1,118 @@ +/* + * Copyright (2024) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package io.delta.kernel.defaults.internal.logstore; + +import java.util.*; + +import io.delta.storage.*; +import org.apache.hadoop.conf.Configuration; + +import io.delta.kernel.defaults.internal.DefaultTableClientErrors; + +/** + * Utility class to provide the correct {@link LogStore} based on the scheme of the path. + */ +public class LogStoreProvider { + + // Supported schemes per storage system. 
+ private static final Set S3_SCHEMES = unmodifiableSet("s3", "s3a", "s3n"); + private static final Set AZURE_SCHEMES = + unmodifiableSet("abfs", "abfss", "adl", "wasb", "wasbs"); + private static final Set GCS_SCHEMES = unmodifiableSet("gs"); + + /** + * Get the {@link LogStore} instance for the given scheme and configuration. Callers can set + * {@code io.delta.kernel.logStore..impl} to specify the {@link LogStore} + * implementation to use for {@code scheme}. + *

+ * If not set, the default {@link LogStore} implementation (given below) for the scheme will
+ * be used.
+ * <ul>
+ *     <li>{@code s3, s3a, s3n}: {@link S3SingleDriverLogStore}</li>
+ *     <li>{@code abfs, abfss, adl, wasb, wasbs}: {@link AzureLogStore}</li>
+ *     <li>{@code gs}: {@link GCSLogStore}</li>
+ *     <li>{@code hdfs, file}: {@link HDFSLogStore}</li>
+ *     <li>remaining: {@link HDFSLogStore}</li>
+ * </ul>
+ * + * @param hadoopConf {@link Configuration} to use for creating the LogStore. + * @param scheme Scheme of the path. + * @return {@link LogStore} instance. + * @throws IllegalArgumentException if the LogStore implementation is not found or can not be + * instantiated. + */ + public static LogStore getLogStore(Configuration hadoopConf, String scheme) { + String schemeLower = Optional.ofNullable(scheme) + .map(String::toLowerCase).orElse(null); + + // Check if the LogStore implementation is set in the configuration. + String classNameFromConfig = hadoopConf.get(getLogStoreSchemeConfKey(schemeLower)); + if (classNameFromConfig != null) { + try { + return getLogStoreClass(classNameFromConfig) + .getConstructor(Configuration.class) + .newInstance(hadoopConf); + } catch (Exception e) { + throw DefaultTableClientErrors.canNotInstantiateLogStore(classNameFromConfig); + } + } + + // Create default LogStore based on the scheme. + String defaultClassName = HDFSLogStore.class.getName(); + if (S3_SCHEMES.contains(schemeLower)) { + defaultClassName = S3SingleDriverLogStore.class.getName(); + } else if (AZURE_SCHEMES.contains(schemeLower)) { + defaultClassName = AzureLogStore.class.getName(); + } else if (GCS_SCHEMES.contains(schemeLower)) { + defaultClassName = GCSLogStore.class.getName(); + } + + try { + return getLogStoreClass(defaultClassName) + .getConstructor(Configuration.class) + .newInstance(hadoopConf); + } catch (Exception e) { + throw DefaultTableClientErrors.canNotInstantiateLogStore(defaultClassName); + } + } + + /** + * Configuration key for setting the LogStore implementation for a scheme. + * ex: `io.delta.kernel.logStore.s3.impl` -> `io.delta.storage.S3SingleDriverLogStore` + */ + static String getLogStoreSchemeConfKey(String scheme) { + return "io.delta.kernel.logStore." + scheme + ".impl"; + } + + /** + * Utility method to get the LogStore class from the class name. 
+ */ + private static Class getLogStoreClass(String logStoreClassName) + throws ClassNotFoundException { + return Class.forName( + logStoreClassName, + true /* initialize */, + Thread.currentThread().getContextClassLoader()) + .asSubclass(LogStore.class); + } + + /** + * Remove this method once we start supporting JDK9+ + */ + private static Set unmodifiableSet(String... elements) { + return Collections.unmodifiableSet(new HashSet<>(Arrays.asList(elements))); + } +} diff --git a/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/logstore/LogStoreProviderSuite.scala b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/logstore/LogStoreProviderSuite.scala new file mode 100644 index 00000000000..62441ea16de --- /dev/null +++ b/kernel/kernel-defaults/src/test/scala/io/delta/kernel/defaults/internal/logstore/LogStoreProviderSuite.scala @@ -0,0 +1,103 @@ +/* + * Copyright (2024) The Delta Lake Project Authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package io.delta.kernel.defaults.internal.logstore + +import io.delta.storage._ +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, Path} +import org.scalatest.funsuite.AnyFunSuite + +class LogStoreProviderSuite extends AnyFunSuite { + + private val customLogStoreClassName = classOf[UserDefinedLogStore].getName + + val hadoopConf = new Configuration() + Seq( + "s3" -> classOf[S3SingleDriverLogStore].getName, + "s3a" -> classOf[S3SingleDriverLogStore].getName, + "s3n" -> classOf[S3SingleDriverLogStore].getName, + "hdfs" -> classOf[HDFSLogStore].getName, + "file" -> classOf[HDFSLogStore].getName, + "gs" -> classOf[GCSLogStore].getName, + "abfss" -> classOf[AzureLogStore].getName, + "abfs" -> classOf[AzureLogStore].getName, + "adl" -> classOf[AzureLogStore].getName, + "wasb" -> classOf[AzureLogStore].getName, + "wasbs" -> classOf[AzureLogStore].getName + ).foreach { case (scheme, logStoreClass) => + test(s"get the default LogStore for scheme $scheme") { + val logStore = LogStoreProvider.getLogStore(hadoopConf, scheme) + assert(logStore.getClass.getName === logStoreClass) + } + } + + test("override the default LogStore for a schema") { + val hadoopConf = new Configuration() + hadoopConf.set(LogStoreProvider.getLogStoreSchemeConfKey("s3"), customLogStoreClassName) + val logStore = LogStoreProvider.getLogStore(hadoopConf, "s3") + assert(logStore.getClass.getName === customLogStoreClassName) + } + + test("set LogStore config for a custom scheme") { + val hadoopConf = new Configuration() + hadoopConf.set(LogStoreProvider.getLogStoreSchemeConfKey("fake"), customLogStoreClassName) + val logStore = LogStoreProvider.getLogStore(hadoopConf, "fake") + assert(logStore.getClass.getName === customLogStoreClassName) + } + + test("set LogStore config to a class that doesn't extend LogStore") { + val hadoopConf = new Configuration() + hadoopConf.set(LogStoreProvider.getLogStoreSchemeConfKey("fake"), "java.lang.String") + val e = 
intercept[IllegalArgumentException]( + LogStoreProvider.getLogStore(hadoopConf, "fake") + ) + assert(e.getMessage.contains( + "Can not instantiate `LogStore` class: %s".format("java.lang.String"))) + } +} + +/** + * Sample user-defined log store implementing [[LogStore]]. + */ +class UserDefinedLogStore(override val initHadoopConf: Configuration) + extends LogStore(initHadoopConf) { + + private val logStoreInternal = new HDFSLogStore(initHadoopConf) + + override def read(path: Path, hadoopConf: Configuration): CloseableIterator[String] = { + logStoreInternal.read(path, hadoopConf) + } + + override def write( + path: Path, + actions: java.util.Iterator[String], + overwrite: java.lang.Boolean, + hadoopConf: Configuration): Unit = { + logStoreInternal.write(path, actions, overwrite, hadoopConf) + } + + override def listFrom(path: Path, hadoopConf: Configuration): java.util.Iterator[FileStatus] = { + logStoreInternal.listFrom(path, hadoopConf) + } + + override def resolvePathOnPhysicalStorage(path: Path, hadoopConf: Configuration): Path = { + logStoreInternal.resolvePathOnPhysicalStorage(path, hadoopConf) + } + + override def isPartialWriteVisible(path: Path, hadoopConf: Configuration): java.lang.Boolean = { + false + } +}