forked from delta-io/delta
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[Kernel] Use `LogStore`s in the `listFrom` implementation of the default `FileSystemClient` (delta-io#2770)
## Description
`LogStore`s in the `storage` module implement the file system operations (needed for reading/writing DeltaLogs) separately for each storage system (e.g. S3, GCS etc.), so that they take the behavior of each storage system into account and implement certain operations efficiently where the storage system supports it (e.g. fast listing in S3). This PR creates `LogStoreProvider` to get the specific implementation of the `LogStore` for a given `scheme`. It also updates `DefaultFileSystemClient.listFrom` to use `LogStore.listFrom`. The majority of the code here is copied from the `delta-spark` and `standalone` projects.
## How was this patch tested?
Unit tests for `LogStoreProvider` and existing integration tests for the `DefaultFileSystemClient.listFrom` changes.
- Loading branch information
1 parent
9e74e56
commit cb77f54
Showing
4 changed files
with
294 additions
and
33 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
28 changes: 28 additions & 0 deletions
28
...el-defaults/src/main/java/io/delta/kernel/defaults/internal/DefaultTableClientErrors.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
/* | ||
* Copyright (2024) The Delta Lake Project Authors. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package io.delta.kernel.defaults.internal; | ||
|
||
import static java.lang.String.format; | ||
|
||
public class DefaultTableClientErrors {

    // TODO update to be table client exception with future exception framework
    //  (see delta-io/delta#2231)
    /**
     * Builds the exception reported when a `LogStore` implementation class cannot be
     * instantiated. The exception is only created here; throwing it is left to the caller.
     *
     * @param logStoreClassName name of the class that could not be instantiated; it is embedded
     *                          verbatim in the exception message.
     * @return a new {@link IllegalArgumentException} carrying the formatted message.
     */
    public static IllegalArgumentException canNotInstantiateLogStore(String logStoreClassName) {
        final String message =
            String.format("Can not instantiate `LogStore` class: %s", logStoreClassName);
        return new IllegalArgumentException(message);
    }
}
118 changes: 118 additions & 0 deletions
118
...l-defaults/src/main/java/io/delta/kernel/defaults/internal/logstore/LogStoreProvider.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,118 @@ | ||
/* | ||
* Copyright (2024) The Delta Lake Project Authors. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package io.delta.kernel.defaults.internal.logstore; | ||
|
||
import java.util.*; | ||
|
||
import io.delta.storage.*; | ||
import org.apache.hadoop.conf.Configuration; | ||
|
||
import io.delta.kernel.defaults.internal.DefaultTableClientErrors; | ||
|
||
/** | ||
* Utility class to provide the correct {@link LogStore} based on the scheme of the path. | ||
*/ | ||
public class LogStoreProvider { | ||
|
||
// Supported schemes per storage system. | ||
private static final Set<String> S3_SCHEMES = unmodifiableSet("s3", "s3a", "s3n"); | ||
private static final Set<String> AZURE_SCHEMES = | ||
unmodifiableSet("abfs", "abfss", "adl", "wasb", "wasbs"); | ||
private static final Set<String> GCS_SCHEMES = unmodifiableSet("gs"); | ||
|
||
/** | ||
* Get the {@link LogStore} instance for the given scheme and configuration. Callers can set | ||
* {@code io.delta.kernel.logStore.<scheme>.impl} to specify the {@link LogStore} | ||
* implementation to use for {@code scheme}. | ||
* <p> | ||
* If not set, the default {@link LogStore} implementation (given below) for the scheme will | ||
* be used. | ||
* <ul> | ||
* <li>{@code s3, s3a, s3n}: {@link S3SingleDriverLogStore}</li> | ||
* <li>{@code abfs, abfss, adl, wasb, wasbs}: {@link AzureLogStore}</li> | ||
* <li>{@code gs}: {@link GCSLogStore}</li> | ||
* <li>{@code hdfs, file}: {@link HDFSLogStore}</li> | ||
* <li>remaining: {@link HDFSLogStore}</li> | ||
* </ul> | ||
* | ||
* @param hadoopConf {@link Configuration} to use for creating the LogStore. | ||
* @param scheme Scheme of the path. | ||
* @return {@link LogStore} instance. | ||
* @throws IllegalArgumentException if the LogStore implementation is not found or can not be | ||
* instantiated. | ||
*/ | ||
public static LogStore getLogStore(Configuration hadoopConf, String scheme) { | ||
String schemeLower = Optional.ofNullable(scheme) | ||
.map(String::toLowerCase).orElse(null); | ||
|
||
// Check if the LogStore implementation is set in the configuration. | ||
String classNameFromConfig = hadoopConf.get(getLogStoreSchemeConfKey(schemeLower)); | ||
if (classNameFromConfig != null) { | ||
try { | ||
return getLogStoreClass(classNameFromConfig) | ||
.getConstructor(Configuration.class) | ||
.newInstance(hadoopConf); | ||
} catch (Exception e) { | ||
throw DefaultTableClientErrors.canNotInstantiateLogStore(classNameFromConfig); | ||
} | ||
} | ||
|
||
// Create default LogStore based on the scheme. | ||
String defaultClassName = HDFSLogStore.class.getName(); | ||
if (S3_SCHEMES.contains(schemeLower)) { | ||
defaultClassName = S3SingleDriverLogStore.class.getName(); | ||
} else if (AZURE_SCHEMES.contains(schemeLower)) { | ||
defaultClassName = AzureLogStore.class.getName(); | ||
} else if (GCS_SCHEMES.contains(schemeLower)) { | ||
defaultClassName = GCSLogStore.class.getName(); | ||
} | ||
|
||
try { | ||
return getLogStoreClass(defaultClassName) | ||
.getConstructor(Configuration.class) | ||
.newInstance(hadoopConf); | ||
} catch (Exception e) { | ||
throw DefaultTableClientErrors.canNotInstantiateLogStore(defaultClassName); | ||
} | ||
} | ||
|
||
/** | ||
* Configuration key for setting the LogStore implementation for a scheme. | ||
* ex: `io.delta.kernel.logStore.s3.impl` -> `io.delta.storage.S3SingleDriverLogStore` | ||
*/ | ||
static String getLogStoreSchemeConfKey(String scheme) { | ||
return "io.delta.kernel.logStore." + scheme + ".impl"; | ||
} | ||
|
||
/** | ||
* Utility method to get the LogStore class from the class name. | ||
*/ | ||
private static Class<? extends LogStore> getLogStoreClass(String logStoreClassName) | ||
throws ClassNotFoundException { | ||
return Class.forName( | ||
logStoreClassName, | ||
true /* initialize */, | ||
Thread.currentThread().getContextClassLoader()) | ||
.asSubclass(LogStore.class); | ||
} | ||
|
||
/** | ||
* Remove this method once we start supporting JDK9+ | ||
*/ | ||
private static Set<String> unmodifiableSet(String... elements) { | ||
return Collections.unmodifiableSet(new HashSet<>(Arrays.asList(elements))); | ||
} | ||
} |
103 changes: 103 additions & 0 deletions
103
...lts/src/test/scala/io/delta/kernel/defaults/internal/logstore/LogStoreProviderSuite.scala
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,103 @@ | ||
/* | ||
* Copyright (2024) The Delta Lake Project Authors. | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
package io.delta.kernel.defaults.internal.logstore | ||
|
||
import io.delta.storage._ | ||
import org.apache.hadoop.conf.Configuration | ||
import org.apache.hadoop.fs.{FileStatus, Path} | ||
import org.scalatest.funsuite.AnyFunSuite | ||
|
||
/**
 * Tests for [[LogStoreProvider.getLogStore]]: default `LogStore` selection per scheme,
 * overriding the implementation through the per-scheme configuration key, and the error raised
 * when the configured class can not be used.
 */
class LogStoreProviderSuite extends AnyFunSuite {

  private val customLogStoreClassName = classOf[UserDefinedLogStore].getName

  // Shared configuration without any LogStore overrides; tests that set overrides create
  // their own Configuration so that they do not leak settings into other tests.
  val hadoopConf = new Configuration()

  Seq(
    "s3" -> classOf[S3SingleDriverLogStore].getName,
    "s3a" -> classOf[S3SingleDriverLogStore].getName,
    "s3n" -> classOf[S3SingleDriverLogStore].getName,
    "hdfs" -> classOf[HDFSLogStore].getName,
    "file" -> classOf[HDFSLogStore].getName,
    "gs" -> classOf[GCSLogStore].getName,
    "abfss" -> classOf[AzureLogStore].getName,
    "abfs" -> classOf[AzureLogStore].getName,
    "adl" -> classOf[AzureLogStore].getName,
    "wasb" -> classOf[AzureLogStore].getName,
    "wasbs" -> classOf[AzureLogStore].getName
  ).foreach { case (scheme, logStoreClass) =>
    test(s"get the default LogStore for scheme $scheme") {
      val logStore = LogStoreProvider.getLogStore(hadoopConf, scheme)
      assert(logStore.getClass.getName === logStoreClass)
    }
  }

  test("default LogStore lookup ignores the case of the scheme") {
    val logStore = LogStoreProvider.getLogStore(hadoopConf, "S3A")
    assert(logStore.getClass.getName === classOf[S3SingleDriverLogStore].getName)
  }

  test("fall back to HDFSLogStore for an unknown scheme without config") {
    val logStore = LogStoreProvider.getLogStore(hadoopConf, "fake")
    assert(logStore.getClass.getName === classOf[HDFSLogStore].getName)
  }

  test("override the default LogStore for a scheme") {
    val conf = new Configuration()
    conf.set(LogStoreProvider.getLogStoreSchemeConfKey("s3"), customLogStoreClassName)
    val logStore = LogStoreProvider.getLogStore(conf, "s3")
    assert(logStore.getClass.getName === customLogStoreClassName)
  }

  test("set LogStore config for a custom scheme") {
    val conf = new Configuration()
    conf.set(LogStoreProvider.getLogStoreSchemeConfKey("fake"), customLogStoreClassName)
    val logStore = LogStoreProvider.getLogStore(conf, "fake")
    assert(logStore.getClass.getName === customLogStoreClassName)
  }

  test("set LogStore config to a class that doesn't extend LogStore") {
    val conf = new Configuration()
    conf.set(LogStoreProvider.getLogStoreSchemeConfKey("fake"), "java.lang.String")
    val e = intercept[IllegalArgumentException](
      LogStoreProvider.getLogStore(conf, "fake")
    )
    assert(e.getMessage.contains(
      "Can not instantiate `LogStore` class: %s".format("java.lang.String")))
  }

  test("set LogStore config to a class that doesn't exist") {
    val missingClassName = "io.delta.kernel.defaults.internal.logstore.DoesNotExistLogStore"
    val conf = new Configuration()
    conf.set(LogStoreProvider.getLogStoreSchemeConfKey("fake"), missingClassName)
    val e = intercept[IllegalArgumentException](
      LogStoreProvider.getLogStore(conf, "fake")
    )
    assert(e.getMessage.contains(
      "Can not instantiate `LogStore` class: %s".format(missingClassName)))
  }
}
|
||
/**
 * Minimal custom [[LogStore]] used by the tests as a user-supplied implementation.
 * Every operation except `isPartialWriteVisible` is forwarded to an internal [[HDFSLogStore]]
 * created from the same configuration.
 */
class UserDefinedLogStore(override val initHadoopConf: Configuration)
  extends LogStore(initHadoopConf) {

  // Real implementation that the forwarding methods below delegate to.
  private val delegate = new HDFSLogStore(initHadoopConf)

  override def read(path: Path, hadoopConf: Configuration): CloseableIterator[String] =
    delegate.read(path, hadoopConf)

  override def write(
      path: Path,
      actions: java.util.Iterator[String],
      overwrite: java.lang.Boolean,
      hadoopConf: Configuration): Unit =
    delegate.write(path, actions, overwrite, hadoopConf)

  override def listFrom(path: Path, hadoopConf: Configuration): java.util.Iterator[FileStatus] =
    delegate.listFrom(path, hadoopConf)

  override def resolvePathOnPhysicalStorage(path: Path, hadoopConf: Configuration): Path =
    delegate.resolvePathOnPhysicalStorage(path, hadoopConf)

  // Not delegated: this store always reports that partial writes are not visible.
  override def isPartialWriteVisible(path: Path, hadoopConf: Configuration): java.lang.Boolean =
    false
}