Skip to content

Commit

Permalink
Added reference REST function server
Browse files Browse the repository at this point in the history
  • Loading branch information
abevk2023 committed Jan 6, 2025
1 parent 1312c87 commit 9e96957
Show file tree
Hide file tree
Showing 17 changed files with 694 additions and 121 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -92,16 +92,19 @@ public JsonBasedUdfFunctionMetadata(
this.version = requireNonNull(version, "version is null");
}

@JsonProperty
public String getDocString()
{
return docString;
}

@JsonProperty
public FunctionKind getFunctionKind()
{
return functionKind;
}

@JsonProperty
public TypeSignature getOutputType()
{
return outputType;
Expand All @@ -113,31 +116,37 @@ public List<String> getParamNames()
return IntStream.range(0, paramTypes.size()).boxed().map(idx -> "input" + idx).collect(toImmutableList());
}

@JsonProperty
public List<TypeSignature> getParamTypes()
{
return paramTypes;
}

@JsonProperty
public String getSchema()
{
return schema;
}

@JsonProperty
public RoutineCharacteristics getRoutineCharacteristics()
{
return routineCharacteristics;
}

@JsonProperty
public Optional<AggregationFunctionMetadata> getAggregateMetadata()
{
return aggregateMetadata;
}

@JsonProperty
public Optional<SqlFunctionId> getFunctionId()
{
return functionId;
}

@JsonProperty
public Optional<String> getVersion()
{
return version;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import com.facebook.presto.spi.PrestoException;
import com.google.common.collect.ImmutableMap;
import com.google.inject.Inject;
import com.google.inject.name.Named;

import java.net.URI;
import java.util.List;
Expand All @@ -31,33 +32,31 @@
import static com.facebook.airlift.http.client.HttpUriBuilder.uriBuilderFrom;
import static com.facebook.airlift.http.client.JsonResponseHandler.createJsonResponseHandler;
import static com.facebook.airlift.http.client.StatusResponseHandler.createStatusResponseHandler;
import static com.facebook.presto.functionNamespace.rest.RestErrorCode.REST_SERVER_FUNCTION_FETCH_ERROR;
import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static java.util.Objects.requireNonNull;

public class RestBasedFunctionApis
{
public static final String ALL_FUNCTIONS_ENDPOINT = "/v1/functions";

private final HttpClient httpClient;

private final JsonCodec<Map<String, List<JsonBasedUdfFunctionMetadata>>> functionSignatureMapJsonCodec;

private final RestBasedFunctionNamespaceManagerConfig managerConfig;
private final String restUrl;

@Inject
public RestBasedFunctionApis(
JsonCodec<Map<String, List<JsonBasedUdfFunctionMetadata>>> nativeFunctionSignatureMapJsonCodec,
@ForRestServer HttpClient httpClient,
RestBasedFunctionNamespaceManagerConfig managerConfig)
@Named("restUrl") String restUrl)
{
this.functionSignatureMapJsonCodec = requireNonNull(nativeFunctionSignatureMapJsonCodec, "nativeFunctionSignatureMapJsonCodec is null");
this.httpClient = requireNonNull(httpClient, "httpClient is null");
this.managerConfig = requireNonNull(managerConfig, "httpClient is null");
this.restUrl = requireNonNull(restUrl, "restUrl is null");
}

public String getFunctionsETag()
{
URI uri = uriBuilderFrom(URI.create(managerConfig.getRestUrl()))
URI uri = uriBuilderFrom(URI.create(restUrl))
.appendPath(ALL_FUNCTIONS_ENDPOINT)
.build();
Request request = Request.builder()
Expand Down Expand Up @@ -89,21 +88,11 @@ public String addFunction(String schema, String functionName, JsonBasedUdfFuncti
throw new PrestoException(NOT_SUPPORTED, "Add Function is yet to be added");
}

public String updateFunction(String schema, String functionName, String functionId, JsonBasedUdfFunctionMetadata metadata)
{
throw new PrestoException(NOT_SUPPORTED, "Update Function is yet to be added");
}

public String deleteFunction(String schema, String functionName, String functionId)
{
throw new PrestoException(NOT_SUPPORTED, "Delete Function is yet to be added");
}

private UdfFunctionSignatureMap getFunctionsAt(String endpoint)
throws IllegalStateException
{
try {
URI uri = uriBuilderFrom(URI.create(managerConfig.getRestUrl()))
URI uri = uriBuilderFrom(URI.create(restUrl))
.appendPath(endpoint)
.build();
Request request = Request.builder()
Expand All @@ -115,7 +104,7 @@ private UdfFunctionSignatureMap getFunctionsAt(String endpoint)
return new UdfFunctionSignatureMap(ImmutableMap.copyOf(nativeFunctionSignatureMap));
}
catch (Exception e) {
throw new IllegalStateException("Failed to get function definitions from REST server, " + e.getMessage());
throw new PrestoException(REST_SERVER_FUNCTION_FETCH_ERROR, "Failed to fetch function definitions from REST server: " + e.getMessage(), e);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,12 @@

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

import static com.facebook.presto.spi.StandardErrorCode.NOT_SUPPORTED;
import static com.facebook.presto.spi.function.RoutineCharacteristics.Language.REST;
import static com.google.common.base.Preconditions.checkState;
import static com.google.common.collect.ImmutableList.toImmutableList;
import static java.util.Collections.emptyList;
import static java.util.Objects.requireNonNull;
Expand Down Expand Up @@ -99,7 +96,8 @@ private List<SqlInvokedFunction> getLatestFunctions()
return ImmutableList.of();
}

createSqlInvokedFunctions(udfFunctionSignatureMap, latestFunctions);
List<SqlInvokedFunction> newFunctions = createSqlInvokedFunctions(udfFunctionSignatureMap);
latestFunctions.addAll(newFunctions);

if (newETag != null) {
cachedETag.set(Optional.of(newETag));
Expand All @@ -108,18 +106,17 @@ private List<SqlInvokedFunction> getLatestFunctions()
return latestFunctions;
}

private void createSqlInvokedFunctions(UdfFunctionSignatureMap udfFunctionSignatureMap, List<SqlInvokedFunction> functionList)
private List<SqlInvokedFunction> createSqlInvokedFunctions(UdfFunctionSignatureMap udfFunctionSignatureMap)
{
Map<String, List<JsonBasedUdfFunctionMetadata>> udfSignatureMap = udfFunctionSignatureMap.getUDFSignatureMap();
udfSignatureMap.forEach((name, metaInfoList) -> {
List<SqlInvokedFunction> functions = metaInfoList.stream().map(metaInfo -> createSqlInvokedFunction(name, metaInfo)).collect(toImmutableList());
functionList.addAll(functions);
});
return udfSignatureMap.entrySet().stream()
.flatMap(entry -> entry.getValue().stream()
.map(metaInfo -> createSqlInvokedFunction(entry.getKey(), metaInfo)))
.collect(toImmutableList());
}

private SqlInvokedFunction createSqlInvokedFunction(String functionName, JsonBasedUdfFunctionMetadata jsonBasedUdfFunctionMetaData)
{
checkState(jsonBasedUdfFunctionMetaData.getRoutineCharacteristics().getLanguage().equals(REST), "RestBasedFunctionNamespaceManager only supports REST UDF");
QualifiedObjectName qualifiedFunctionName = QualifiedObjectName.valueOf(new CatalogSchemaName(getCatalogName(), jsonBasedUdfFunctionMetaData.getSchema()), functionName);
List<String> parameterNameList = jsonBasedUdfFunctionMetaData.getParamNames();
List<TypeSignature> parameterTypeList = jsonBasedUdfFunctionMetaData.getParamTypes();
Expand All @@ -130,7 +127,7 @@ private SqlInvokedFunction createSqlInvokedFunction(String functionName, JsonBas
}

FunctionVersion functionVersion = new FunctionVersion(jsonBasedUdfFunctionMetaData.getVersion());
SqlFunctionId functionId = jsonBasedUdfFunctionMetaData.getFunctionId().isPresent() ? jsonBasedUdfFunctionMetaData.getFunctionId().get() : null;
SqlFunctionId functionId = jsonBasedUdfFunctionMetaData.getFunctionId().orElse(null);
return new SqlInvokedFunction(
qualifiedFunctionName,
parameterBuilder.build(),
Expand Down Expand Up @@ -158,11 +155,10 @@ protected Collection<SqlInvokedFunction> fetchFunctionsDirect(QualifiedObjectNam
{
UdfFunctionSignatureMap udfFunctionSignatureMap = restApis.getFunctions(functionName.getSchemaName(), functionName.getObjectName());
if (udfFunctionSignatureMap == null || udfFunctionSignatureMap.isEmpty()) {
return Collections.emptyList();
return ImmutableList.of();
}

List<SqlInvokedFunction> functions = new ArrayList<>();
createSqlInvokedFunctions(udfFunctionSignatureMap, functions);
List<SqlInvokedFunction> functions = createSqlInvokedFunctions(udfFunctionSignatureMap);
return functions;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@

import com.facebook.airlift.bootstrap.Bootstrap;
import com.facebook.presto.functionNamespace.FunctionNamespaceManagerPlugin;
import com.facebook.presto.functionNamespace.execution.NoopSqlFunctionExecutorsModule;
import com.facebook.presto.spi.function.FunctionHandleResolver;
import com.facebook.presto.spi.function.FunctionNamespaceManager;
import com.facebook.presto.spi.function.FunctionNamespaceManagerContext;
Expand Down Expand Up @@ -57,7 +56,7 @@ public FunctionNamespaceManager<?> create(String catalogName, Map<String, String
Bootstrap app = new Bootstrap(
new RestBasedCommunicationModule(),
new RestBasedFunctionNamespaceManagerModule(catalogName),
new NoopSqlFunctionExecutorsModule());
new RestSqlFunctionExecutorsModule());

Injector injector = app
.doNotInitializeLogging()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@
import com.facebook.presto.functionNamespace.execution.SqlFunctionLanguageConfig;
import com.google.inject.Binder;
import com.google.inject.Module;
import com.google.inject.Provides;
import com.google.inject.TypeLiteral;
import com.google.inject.name.Named;

import java.util.List;
import java.util.Map;
Expand Down Expand Up @@ -52,4 +54,11 @@ public void configure(Binder binder)
binder.bind(new TypeLiteral<JsonCodec<Map<String, List<JsonBasedUdfFunctionMetadata>>>>() {})
.toInstance(new JsonCodecFactory().mapJsonCodec(String.class, listJsonCodec(JsonBasedUdfFunctionMetadata.class)));
}

@Provides
@Named("restUrl")
public String provideRestUrl(RestBasedFunctionNamespaceManagerConfig config)
{
return config.getRestUrl();
}
}

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.facebook.presto.functionNamespace.rest;

import com.facebook.presto.common.ErrorCode;
import com.facebook.presto.common.ErrorType;
import com.facebook.presto.spi.ErrorCodeSupplier;

import static com.facebook.presto.common.ErrorType.EXTERNAL;

public enum RestErrorCode
implements ErrorCodeSupplier
{
REST_SERVER_NOT_FOUND(0, EXTERNAL),
REST_SERVER_ERROR(1, EXTERNAL),
REST_SERVER_TIMEOUT(2, EXTERNAL),
REST_SERVER_CONNECT_ERROR(3, EXTERNAL),
REST_SERVER_BAD_RESPONSE(4, EXTERNAL),
REST_SERVER_IO_ERROR(5, EXTERNAL),
REST_SERVER_FUNCTION_FETCH_ERROR(6, EXTERNAL);

private final ErrorCode errorCode;

public static final int ERROR_CODE_MASK = 0x0002_1000;

RestErrorCode(int code, ErrorType type)
{
errorCode = new ErrorCode(code + ERROR_CODE_MASK, name(), type);
}

@Override
public ErrorCode toErrorCode()
{
return errorCode;
}
}
Loading

0 comments on commit 9e96957

Please sign in to comment.