Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add a native function namespace manager #23358

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -955,6 +955,12 @@
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>com.facebook.presto</groupId>
<artifactId>presto-native-sidecar-plugin</artifactId>
<version>${project.version}</version>
</dependency>

<dependency>
<groupId>com.facebook.hive</groupId>
<artifactId>hive-dwrf</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import com.facebook.presto.common.QualifiedObjectName;
import com.facebook.presto.common.block.BlockEncodingSerde;
import com.facebook.presto.common.function.SqlFunctionResult;
import com.facebook.presto.common.type.Type;
import com.facebook.presto.common.type.TypeManager;
import com.facebook.presto.common.type.UserDefinedType;
import com.facebook.presto.functionNamespace.execution.SqlFunctionExecutors;
Expand Down Expand Up @@ -192,7 +193,7 @@ public Optional<UserDefinedType> getUserDefinedType(QualifiedObjectName typeName
}

@Override
public final FunctionHandle getFunctionHandle(Optional<? extends FunctionNamespaceTransactionHandle> transactionHandle, Signature signature)
public FunctionHandle getFunctionHandle(Optional<? extends FunctionNamespaceTransactionHandle> transactionHandle, Signature signature)
{
checkCatalog(signature.getName());
// This is the only assumption in this class that we're dealing with sql-invoked regular function.
Expand Down Expand Up @@ -362,10 +363,13 @@ protected AggregationFunctionImplementation sqlInvokedFunctionToAggregationImple
"Need aggregationMetadata to get aggregation function implementation");

AggregationFunctionMetadata aggregationMetadata = function.getAggregationMetadata().get();
List<Type> parameters = function.getSignature().getArgumentTypes().stream().map(
pdabre12 marked this conversation as resolved.
Show resolved Hide resolved
(typeManager::getType)).collect(toImmutableList());
return new SqlInvokedAggregationFunctionImplementation(
typeManager.getType(aggregationMetadata.getIntermediateType()),
typeManager.getType(function.getSignature().getReturnType()),
aggregationMetadata.isOrderSensitive());
aggregationMetadata.isOrderSensitive(),
parameters);
default:
throw new IllegalStateException(format("Unknown function implementation type: %s", implementationType));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.functionNamespace.json;
package com.facebook.presto.functionNamespace;

import com.facebook.presto.common.type.TypeSignature;
import com.facebook.presto.spi.function.AggregationFunctionMetadata;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.functionNamespace.json;
package com.facebook.presto.functionNamespace;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
*/
package com.facebook.presto.functionNamespace.json;

import com.facebook.presto.functionNamespace.UdfFunctionSignatureMap;

public interface FunctionDefinitionProvider
{
UdfFunctionSignatureMap getUdfDefinition(String filePath);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@
package com.facebook.presto.functionNamespace.json;

import com.facebook.airlift.log.Logger;
import com.facebook.presto.functionNamespace.JsonBasedUdfFunctionMetadata;
import com.facebook.presto.functionNamespace.UdfFunctionSignatureMap;

import java.io.IOException;
import java.nio.file.Files;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,10 @@
import com.facebook.presto.common.type.TypeSignature;
import com.facebook.presto.common.type.UserDefinedType;
import com.facebook.presto.functionNamespace.AbstractSqlInvokedFunctionNamespaceManager;
import com.facebook.presto.functionNamespace.JsonBasedUdfFunctionMetadata;
import com.facebook.presto.functionNamespace.ServingCatalog;
import com.facebook.presto.functionNamespace.SqlInvokedFunctionNamespaceManagerConfig;
import com.facebook.presto.functionNamespace.UdfFunctionSignatureMap;
import com.facebook.presto.functionNamespace.execution.SqlFunctionExecutors;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.function.AggregationFunctionImplementation;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,9 @@ public static TestingPrestoServer createTestingPrestoServer()
functionAndTypeManager.loadFunctionNamespaceManager(
"hive-functions",
"hive",
getNamespaceManagerCreationProperties());
getNamespaceManagerCreationProperties(),
server.getPluginNodeManager(),
false);
server.refreshNodes();
return server;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,9 @@ private static TestingPrestoServer createServer()
functionAndTypeManager.loadFunctionNamespaceManager(
"hive-functions",
"hive",
Collections.emptyMap());
Collections.emptyMap(),
server.getPluginNodeManager(),
false);
server.refreshNodes();
return server;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1343,7 +1343,12 @@ private SpecializedFunctionKey getSpecializedFunctionKey(Signature signature)

private SpecializedFunctionKey doGetSpecializedFunctionKey(Signature signature)
{
Iterable<SqlFunction> candidates = getFunctions(null, signature.getName());
Collection<SqlFunction> candidates = getFunctions(null, signature.getName());
return doGetSpecializedFunctionKey(signature, candidates);
}

public SpecializedFunctionKey doGetSpecializedFunctionKey(Signature signature, Collection<SqlFunction> candidates)
{
// search for exact match
Type returnType = functionAndTypeManager.getType(signature.getReturnType());
List<TypeSignatureProvider> argumentTypeSignatureProviders = fromTypeSignatures(signature.getArgumentTypes());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.facebook.presto.common.type.TypeWithName;
import com.facebook.presto.common.type.UserDefinedType;
import com.facebook.presto.operator.window.WindowFunctionSupplier;
import com.facebook.presto.spi.NodeManager;
import com.facebook.presto.spi.PrestoException;
import com.facebook.presto.spi.function.AggregationFunctionImplementation;
import com.facebook.presto.spi.function.AlterRoutineCharacteristics;
Expand All @@ -49,6 +50,7 @@
import com.facebook.presto.spi.function.Signature;
import com.facebook.presto.spi.function.SqlFunction;
import com.facebook.presto.spi.function.SqlFunctionId;
import com.facebook.presto.spi.function.SqlFunctionSupplier;
import com.facebook.presto.spi.function.SqlInvokedFunction;
import com.facebook.presto.sql.analyzer.FeaturesConfig;
import com.facebook.presto.sql.analyzer.FunctionAndTypeResolver;
Expand Down Expand Up @@ -129,6 +131,7 @@ public class FunctionAndTypeManager
private final LoadingCache<FunctionResolutionCacheKey, FunctionHandle> functionCache;
private final CacheStatsMBean cacheStatsMBean;
private final boolean nativeExecution;
private CatalogSchemaName currentDefaultNamespace = DEFAULT_NAMESPACE;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this class isn't really thread safe anymore because this field is mutable, and there are no guards on how it is read/written. It might be better to set the default namespace in FunctionsConfig instead of in the config for any particular function namespace. Then we also wouldn't need to check that there is only one.
Downside is that we will only fail later (at runtime?) if the default namespace doesn't exist (never gets loaded).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

also, let's change this name to defaultNamespace, as this isn't something that we expect to be updated. and change the static variable DEFAULT_NAMESPACE to something that's clearer now, e.g. JAVA_BUILTIN_NAMESPACE.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Last comment about default_namespace, but have you checked all the things that are using DEFAULT_NAMESPACE right now? Is it still correct for them to do that?


@Inject
public FunctionAndTypeManager(
Expand Down Expand Up @@ -220,6 +223,12 @@ public FunctionMetadata getFunctionMetadata(FunctionHandle functionHandle)
return FunctionAndTypeManager.this.getFunctionMetadata(functionHandle);
}

@Override
public SqlFunctionSupplier getSpecializedFunctionKey(Signature signature)
{
return FunctionAndTypeManager.this.getSpecializedFunctionKey(signature);
}

@Override
public Collection<SqlFunction> listBuiltInFunctions()
{
Expand All @@ -246,9 +255,12 @@ public FunctionHandle lookupCast(String castType, Type fromType, Type toType)

public QualifiedObjectName qualifyObjectName(QualifiedName name)
{
if (!name.getPrefix().isPresent()) {
if (name.getSuffix().startsWith("$internal")) {
return QualifiedObjectName.valueOf(DEFAULT_NAMESPACE, name.getSuffix());
}
if (!name.getPrefix().isPresent()) {
return QualifiedObjectName.valueOf(currentDefaultNamespace, name.getSuffix());
}
if (name.getOriginalParts().size() != 3) {
throw new PrestoException(FUNCTION_NOT_FOUND, format("Functions that are not temporary or builtin must be referenced by 'catalog.schema.function_name', found: %s", name));
}
Expand All @@ -267,12 +279,17 @@ public CacheStatsMBean getFunctionResolutionCacheStats()
public void loadFunctionNamespaceManager(
String functionNamespaceManagerName,
String catalogName,
Map<String, String> properties)
Map<String, String> properties,
NodeManager nodeManager,
boolean isDefaultNamespace)
{
requireNonNull(functionNamespaceManagerName, "functionNamespaceManagerName is null");
FunctionNamespaceManagerFactory factory = functionNamespaceManagerFactories.get(functionNamespaceManagerName);
checkState(factory != null, "No factory for function namespace manager %s", functionNamespaceManagerName);
FunctionNamespaceManager<?> functionNamespaceManager = factory.create(catalogName, properties, new FunctionNamespaceManagerContext(this));
if (isDefaultNamespace) {
this.currentDefaultNamespace = new CatalogSchemaName(catalogName, "default");
}
FunctionNamespaceManager<?> functionNamespaceManager = factory.create(catalogName, properties, new FunctionNamespaceManagerContext(this, nodeManager, this));
functionNamespaceManager.setBlockEncodingSerde(blockEncodingSerde);

transactionManager.registerFunctionNamespaceManager(catalogName, functionNamespaceManager);
Expand Down Expand Up @@ -363,8 +380,9 @@ public List<SqlFunction> listFunctions(Session session, Optional<String> likePat
ImmutableList.Builder<SqlFunction> functions = new ImmutableList.Builder<>();
if (!isListBuiltInFunctionsOnly(session)) {
functions.addAll(SessionFunctionUtils.listFunctions(session.getSessionFunctions()));
functions.addAll(functionNamespaceManagers.values().stream()
.flatMap(manager -> manager.listFunctions(likePattern, escape).stream())
functions.addAll(functionNamespaceManagers.entrySet().stream()
.flatMap(manager -> manager.getValue().listFunctions(likePattern, escape).stream()
.filter((functionName) -> functionName.getSignature().getName().getCatalogSchemaName().equals(currentDefaultNamespace)))
.collect(toImmutableList()));
}
else {
Expand Down Expand Up @@ -589,26 +607,33 @@ public boolean nullIfSpecialFormEnabled()
return !nativeExecution;
}

public FunctionHandle lookupFunction(String name, List<TypeSignatureProvider> parameterTypes)
{
QualifiedObjectName functionName = getFunctionAndTypeResolver().qualifyObjectName(QualifiedName.of(name));
return lookupFunction(functionName, parameterTypes);
}

/**
* Lookup up a function with name and fully bound types. This can only be used for builtin functions. {@link #resolveFunction(Optional, Optional, QualifiedObjectName, List)}
* should be used for dynamically registered functions.
*
* @throws PrestoException if function could not be found
*/
public FunctionHandle lookupFunction(String name, List<TypeSignatureProvider> parameterTypes)
public FunctionHandle lookupFunction(QualifiedObjectName functionName, List<TypeSignatureProvider> parameterTypes)
{
QualifiedObjectName functionName = getFunctionAndTypeResolver().qualifyObjectName(QualifiedName.of(name));
Optional<FunctionNamespaceManager<?>> functionNamespaceManager = getServingFunctionNamespaceManager(functionName.getCatalogSchemaName());
checkArgument(functionNamespaceManager.isPresent(), "Cannot find function namespace for function '%s'", functionName);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i would change this to throw some kind of Presto user error if the functionNamespaceManager isn't present (maybe FUNCTION_NOT_FOUND error code)

if (parameterTypes.stream().noneMatch(TypeSignatureProvider::hasDependency)) {
return lookupCachedFunction(functionName, parameterTypes);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there's too many assumptions going on here now. this will throw an exception later if the fucntionNamepsaceManager here is not the DEFAULT or currentDefault.

}

Collection<? extends SqlFunction> candidates = builtInTypeAndFunctionNamespaceManager.getFunctions(Optional.empty(), functionName);
Collection<? extends SqlFunction> candidates = functionNamespaceManager.get().getFunctions(Optional.empty(), functionName);
Optional<Signature> match = functionSignatureMatcher.match(candidates, parameterTypes, false);
if (!match.isPresent()) {
throw new PrestoException(FUNCTION_NOT_FOUND, constructFunctionNotFoundErrorMessage(functionName, parameterTypes, candidates));
}

return builtInTypeAndFunctionNamespaceManager.getFunctionHandle(Optional.empty(), match.get());
return functionNamespaceManager.get().getFunctionHandle(Optional.empty(), match.get());
}

public FunctionHandle lookupCast(CastType castType, Type fromType, Type toType)
Expand All @@ -631,6 +656,11 @@ public FunctionHandle lookupCast(CastType castType, Type fromType, Type toType)
return builtInTypeAndFunctionNamespaceManager.getFunctionHandle(Optional.empty(), signature);
}

public CatalogSchemaName getCurrentDefaultNamespace()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

getDefaultNamespace()

{
return currentDefaultNamespace;
}

protected Type getType(UserDefinedType userDefinedType)
{
// Distinct type
Expand Down Expand Up @@ -698,7 +728,8 @@ private FunctionHandle resolveFunctionInternal(Optional<TransactionId> transacti

private FunctionHandle resolveBuiltInFunction(QualifiedObjectName functionName, List<TypeSignatureProvider> parameterTypes)
{
checkArgument(functionName.getCatalogSchemaName().equals(DEFAULT_NAMESPACE), "Expect built-in functions");
checkArgument(functionName.getCatalogSchemaName().equals(currentDefaultNamespace) ||
Copy link
Contributor

@rschlussel rschlussel Dec 19, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do we use both DEFAULT_NAMESPACE and currentDefaultNamespace here?

functionName.getCatalogSchemaName().equals(DEFAULT_NAMESPACE), "Expect built-in/default namespace functions");
checkArgument(parameterTypes.stream().noneMatch(TypeSignatureProvider::hasDependency), "Expect parameter types not to have dependency");
return resolveFunctionInternal(Optional.empty(), functionName, parameterTypes);
}
Expand Down Expand Up @@ -726,6 +757,17 @@ private Optional<FunctionNamespaceManager<? extends SqlFunction>> getServingFunc
return Optional.ofNullable(functionNamespaceManagers.get(typeSignatureBase.getTypeName().getCatalogName()));
}

@Override
@SuppressWarnings("unchecked")
public SpecializedFunctionKey getSpecializedFunctionKey(Signature signature)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

calling out to BuiltInTypeAndFunctionNamepsaceManager to use that functionality for other namespace managers seems to violate some of the boundaries of the abstraction. Maybe we should extract that specialized function cache logic out of BuiltInTypeAndFunctionNamespaceManager if it needs to be common to other function namespace managers.

{
QualifiedObjectName functionName = signature.getName();
Optional<FunctionNamespaceManager<?>> functionNamespaceManager = getServingFunctionNamespaceManager(functionName.getCatalogSchemaName());
checkArgument(functionNamespaceManager.isPresent(), "Cannot find function namespace for signature '%s'", functionName);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same comment about throwing a better error here instead of checkArgument.

Collection<SqlFunction> candidates = (Collection<SqlFunction>) functionNamespaceManager.get().getFunctions(Optional.empty(), functionName);
return builtInTypeAndFunctionNamespaceManager.doGetSpecializedFunctionKey(signature, candidates);
}

private static class FunctionResolutionCacheKey
{
private final QualifiedObjectName functionName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@

import com.facebook.presto.spi.function.Signature;
import com.facebook.presto.spi.function.SqlFunction;
import com.facebook.presto.spi.function.SqlFunctionSupplier;

import java.util.Objects;

import static com.facebook.presto.metadata.SignatureBinder.applyBoundVariables;
import static java.util.Objects.requireNonNull;

public class SpecializedFunctionKey
implements SqlFunctionSupplier
{
private final SqlFunction function;
private final BoundVariables boundVariables;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
package com.facebook.presto.metadata;

import com.facebook.airlift.log.Logger;
import com.facebook.presto.spi.NodeManager;
import com.google.common.collect.ImmutableList;

import javax.inject.Inject;
Expand All @@ -28,20 +29,26 @@
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.base.Strings.isNullOrEmpty;
import static com.google.common.io.Files.getNameWithoutExtension;
import static java.lang.Boolean.parseBoolean;
import static java.lang.String.format;

public class StaticFunctionNamespaceStore
{
private static final Logger log = Logger.get(StaticFunctionNamespaceStore.class);
private static final String FUNCTION_NAMESPACE_MANAGER_NAME = "function-namespace-manager.name";
private static final String DEFAULT_NAMESPACE = "default.namespace";

private final FunctionAndTypeManager functionAndTypeManager;
private final NodeManager nodeManager;
private final File configDir;
private final AtomicBoolean functionNamespaceLoading = new AtomicBoolean();
private boolean isDefaultNamespace;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Call this isDefaultNamespaceSet


@Inject
public StaticFunctionNamespaceStore(FunctionAndTypeManager functionAndTypeManager, StaticFunctionNamespaceStoreConfig config)
public StaticFunctionNamespaceStore(FunctionAndTypeManager functionAndTypeManager, NodeManager nodeManager, StaticFunctionNamespaceStoreConfig config)
{
this.functionAndTypeManager = functionAndTypeManager;
this.nodeManager = nodeManager;
this.configDir = config.getFunctionNamespaceConfigurationDir();
}

Expand Down Expand Up @@ -77,8 +84,15 @@ private void loadFunctionNamespaceManager(String catalogName, Map<String, String
properties = new HashMap<>(properties);
String functionNamespaceManagerName = properties.remove(FUNCTION_NAMESPACE_MANAGER_NAME);
checkState(!isNullOrEmpty(functionNamespaceManagerName), "%s property must be present", FUNCTION_NAMESPACE_MANAGER_NAME);

functionAndTypeManager.loadFunctionNamespaceManager(functionNamespaceManagerName, catalogName, properties);
boolean defaultNamespace = parseBoolean(properties.remove(DEFAULT_NAMESPACE));
if (defaultNamespace) {
if (isDefaultNamespace) {
throw new IllegalStateException(
format("Only one function namespace manager can have the %s property set.", DEFAULT_NAMESPACE));
}
isDefaultNamespace = true;
}
functionAndTypeManager.loadFunctionNamespaceManager(functionNamespaceManagerName, catalogName, properties, nodeManager, defaultNamespace);
log.info("-- Added function namespace manager [%s] --", catalogName);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ else if (method.isAnnotationPresent(SqlParameters.class)) {
functionDescription,
routineCharacteristics,
body,
false,
notVersioned(),
SCALAR,
Optional.empty()))
Expand Down
Loading