From ee0226954daf4f897efd71e0d86f43efad445901 Mon Sep 17 00:00:00 2001 From: Jacob Khaliqi Date: Mon, 15 Jul 2024 10:55:01 -0700 Subject: [PATCH] implement rest endpoint --- presto-function-namespace-managers/pom.xml | 10 +- .../rest/RestComminicationModule.java | 30 ++ .../rest/RestSqlFunctionExecutionModule.java | 29 ++ .../rest/RestSqlFunctionExecutor.java | 194 +++++++++ .../rest/RestSqlFunctionExecutorsModule.java | 71 +++ .../testing/SqlInvokedFunctionTestUtils.java | 43 ++ .../TestSqlFunctionLanguageConfig.java | 12 + .../sql/InterpretedFunctionInvoker.java | 64 +++ .../main/resources/rest_function_server.yaml | 75 +++- .../presto/spi/StandardErrorCode.java | 2 + .../function/FunctionImplementationType.java | 2 +- .../presto/spi/function/SqlFunctionId.java | 9 + presto-tests/pom.xml | 11 + .../tests/TestRestSqlFunctionExecutor.java | 408 ++++++++++++++++++ 14 files changed, 953 insertions(+), 7 deletions(-) create mode 100644 presto-function-namespace-managers/src/main/java/com/facebook/presto/functionNamespace/rest/RestComminicationModule.java create mode 100644 presto-function-namespace-managers/src/main/java/com/facebook/presto/functionNamespace/rest/RestSqlFunctionExecutionModule.java create mode 100644 presto-function-namespace-managers/src/main/java/com/facebook/presto/functionNamespace/rest/RestSqlFunctionExecutor.java create mode 100644 presto-function-namespace-managers/src/main/java/com/facebook/presto/functionNamespace/rest/RestSqlFunctionExecutorsModule.java create mode 100644 presto-tests/src/test/java/com/facebook/presto/tests/TestRestSqlFunctionExecutor.java diff --git a/presto-function-namespace-managers/pom.xml b/presto-function-namespace-managers/pom.xml index fdbee3e1f11c4..b028c400d4512 100644 --- a/presto-function-namespace-managers/pom.xml +++ b/presto-function-namespace-managers/pom.xml @@ -164,6 +164,11 @@ jackson-datatype-jdk8 + + com.facebook.drift + drift-transport-netty + + com.facebook.presto @@ -189,11 +194,6 @@ test - - com.facebook.drift - drift-transport-netty - - org.assertj assertj-core diff --git a/presto-function-namespace-managers/src/main/java/com/facebook/presto/functionNamespace/rest/RestComminicationModule.java b/presto-function-namespace-managers/src/main/java/com/facebook/presto/functionNamespace/rest/RestComminicationModule.java new file mode 100644 index 0000000000000..7ce03ff7e9029 --- /dev/null +++ b/presto-function-namespace-managers/src/main/java/com/facebook/presto/functionNamespace/rest/RestComminicationModule.java @@ -0,0 +1,30 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.functionNamespace.rest; + +import com.facebook.presto.functionNamespace.ForRestServer; +import com.google.inject.Binder; +import com.google.inject.Module; + +import static com.facebook.airlift.http.client.HttpClientBinder.httpClientBinder; + +public class RestComminicationModule + implements Module +{ + @Override + public void configure(Binder binder) + { + httpClientBinder(binder).bindHttpClient("rest", ForRestServer.class); + } +} diff --git a/presto-function-namespace-managers/src/main/java/com/facebook/presto/functionNamespace/rest/RestSqlFunctionExecutionModule.java b/presto-function-namespace-managers/src/main/java/com/facebook/presto/functionNamespace/rest/RestSqlFunctionExecutionModule.java new file mode 100644 index 0000000000000..5b27beac5f724 --- /dev/null +++ b/presto-function-namespace-managers/src/main/java/com/facebook/presto/functionNamespace/rest/RestSqlFunctionExecutionModule.java @@ -0,0 +1,29 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.functionNamespace.rest; + +import com.facebook.presto.functionNamespace.execution.SqlFunctionExecutionModule; +import com.facebook.presto.spi.function.SqlFunctionExecutor; +import com.google.inject.Binder; +import com.google.inject.Scopes; + +public class RestSqlFunctionExecutionModule + extends SqlFunctionExecutionModule +{ + @Override + protected void setup(Binder binder) + { + binder.bind(SqlFunctionExecutor.class).to(RestSqlFunctionExecutor.class).in(Scopes.SINGLETON); + } +} diff --git a/presto-function-namespace-managers/src/main/java/com/facebook/presto/functionNamespace/rest/RestSqlFunctionExecutor.java b/presto-function-namespace-managers/src/main/java/com/facebook/presto/functionNamespace/rest/RestSqlFunctionExecutor.java new file mode 100644 index 0000000000000..27cdb8606f32c --- /dev/null +++ b/presto-function-namespace-managers/src/main/java/com/facebook/presto/functionNamespace/rest/RestSqlFunctionExecutor.java @@ -0,0 +1,194 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.functionNamespace.rest; + +import com.facebook.airlift.http.client.HttpClient; +import com.facebook.airlift.http.client.HttpStatus; +import com.facebook.airlift.http.client.HttpUriBuilder; +import com.facebook.airlift.http.client.Request; +import com.facebook.airlift.http.client.Response; +import com.facebook.airlift.http.client.ResponseHandler; +import com.facebook.presto.common.Page; +import com.facebook.presto.common.block.BlockEncodingSerde; +import com.facebook.presto.common.function.SqlFunctionResult; +import com.facebook.presto.common.type.Type; +import com.facebook.presto.common.type.TypeSignature; +import com.facebook.presto.functionNamespace.ForRestServer; +import com.facebook.presto.spi.NodeManager; +import com.facebook.presto.spi.PrestoException; +import com.facebook.presto.spi.function.FunctionImplementationType; +import com.facebook.presto.spi.function.RemoteScalarFunctionImplementation; +import com.facebook.presto.spi.function.SqlFunctionExecutor; +import com.facebook.presto.spi.function.SqlFunctionHandle; +import com.facebook.presto.spi.function.SqlFunctionId; +import com.facebook.presto.spi.page.PagesSerde; +import com.facebook.presto.spi.page.SerializedPage; +import com.google.common.util.concurrent.FutureCallback; +import com.google.common.util.concurrent.Futures; +import io.airlift.slice.DynamicSliceOutput; +import io.airlift.slice.InputStreamSliceInput; +import io.airlift.slice.SliceInput; + +import javax.inject.Inject; + +import java.io.IOException; +import java.net.URI; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; + +import static com.facebook.airlift.concurrent.MoreFutures.failedFuture; +import static com.facebook.airlift.concurrent.MoreFutures.toCompletableFuture; +import static com.facebook.airlift.http.client.HttpUriBuilder.uriBuilderFrom; +import static com.facebook.airlift.http.client.Request.Builder.preparePost; +import static com.facebook.airlift.http.client.StaticBodyGenerator.createStaticBodyGenerator; +import static com.facebook.presto.spi.StandardErrorCode.FUNCTION_SERVER_FAILURE; +import static com.facebook.presto.spi.StandardErrorCode.NOT_FOUND; +import static com.facebook.presto.spi.function.FunctionImplementationType.REST; +import static com.facebook.presto.spi.page.PagesSerdeUtil.readSerializedPage; +import static com.facebook.presto.spi.page.PagesSerdeUtil.writeSerializedPage; +import static com.google.common.base.Preconditions.checkArgument; +import static com.google.common.base.Preconditions.checkState; +import static com.google.common.collect.ImmutableList.toImmutableList; +import static com.google.common.net.HttpHeaders.ACCEPT; +import static com.google.common.net.HttpHeaders.CONTENT_TYPE; +import static com.google.common.net.MediaType.PLAIN_TEXT_UTF_8; +import static com.google.common.util.concurrent.MoreExecutors.directExecutor; +import static java.util.Objects.requireNonNull; + +public class RestSqlFunctionExecutor + implements SqlFunctionExecutor +{ + private BlockEncodingSerde blockEncodingSerde; + private static PagesSerde pageSerde; + private HttpClient httpClient; + private final NodeManager nodeManager; + private final RestBasedFunctionNamespaceManagerConfig restBasedFunctionNamespaceManagerConfig; + + @Inject + public RestSqlFunctionExecutor(NodeManager nodeManager, RestBasedFunctionNamespaceManagerConfig restBasedFunctionNamespaceManagerConfig, @ForRestServer HttpClient httpClient) + { + this.nodeManager = requireNonNull(nodeManager, "nodeManager is null"); + this.restBasedFunctionNamespaceManagerConfig = requireNonNull(restBasedFunctionNamespaceManagerConfig, "restBasedFunctionNamespaceManagerConfig is null"); + this.httpClient = requireNonNull(httpClient, "httpClient is null"); + } + + @Override + public FunctionImplementationType getImplementationType() + { + return REST; + } + + @Override + public void setBlockEncodingSerde(BlockEncodingSerde blockEncodingSerde) + { + checkState(this.blockEncodingSerde == null, "blockEncodingSerde already set"); + this.blockEncodingSerde = requireNonNull(blockEncodingSerde, "blockEncodingSerde is null"); + this.pageSerde = new PagesSerde(blockEncodingSerde, Optional.empty(), Optional.empty(), Optional.empty()); + } + + @Override + public CompletableFuture executeFunction( + String source, + RemoteScalarFunctionImplementation functionImplementation, + Page input, + List channels, + List argumentTypes, + Type returnType) + { + SqlFunctionHandle functionHandle = functionImplementation.getFunctionHandle(); + SqlFunctionId functionId = functionHandle.getFunctionId(); + String functionVersion = functionHandle.getVersion(); + DynamicSliceOutput sliceOutput = new DynamicSliceOutput((int) input.getRetainedSizeInBytes()); + writeSerializedPage(sliceOutput, pageSerde.serialize(input)); + try { + Request request = preparePost() + .setUri(getExecutionEndpoint(functionId, returnType, functionVersion)) + .setBodyGenerator(createStaticBodyGenerator(sliceOutput.slice().byteArray())) + .setHeader(CONTENT_TYPE, PLAIN_TEXT_UTF_8.toString()) + .setHeader(ACCEPT, PLAIN_TEXT_UTF_8.toString()) + .build(); + HttpClient.HttpResponseFuture future = httpClient.executeAsync(request, new SqlFunctionResultResponseHandler()); + Futures.addCallback(future, new SqlResultFutureCallback(), directExecutor()); + return toCompletableFuture(future); + } + catch (Exception e) { + return failedFuture(new IllegalStateException("Failed to get function definitions for REST server/ Native worker, " + e.getMessage())); + } + } + + private URI getExecutionEndpoint(SqlFunctionId functionId, Type returnType, String functionVersion) + { + List functionArgumentTypes = functionId.getArgumentTypes().stream().map(TypeSignature::toString).collect(toImmutableList()); + if (restBasedFunctionNamespaceManagerConfig.getRestUrl() == null) { + throw new PrestoException(NOT_FOUND, "Failed to find native node !"); + } + + HttpUriBuilder uri = uriBuilderFrom(URI.create(restBasedFunctionNamespaceManagerConfig.getRestUrl())) + .appendPath("/v1/functions/" + + functionId.getFunctionName().getSchemaName() + + "/" + + functionId.getFunctionName().getObjectName() + + "/" + + functionId.getId().replace('(', '[').replace(')', ']') + + "/" + + functionVersion); + return uri.build(); + } + + public static class SqlFunctionResultResponseHandler + implements ResponseHandler + { + @Override + public SqlFunctionResult handleException(Request request, Exception exception) + { + throw new PrestoException(FUNCTION_SERVER_FAILURE, "Failed to get response for rest function call from function server, with exception" + exception.getMessage()); + } + + @Override + public SqlFunctionResult handle(Request request, Response response) + { + if (response.getStatusCode() != 200 || response.getStatusCode() != HttpStatus.OK.code()) { + throw new PrestoException(FUNCTION_SERVER_FAILURE, "Failed to get response for rest function call from function server. Response code: " + response.getStatusCode()); + } + try { + SliceInput input = new InputStreamSliceInput(response.getInputStream()); + SerializedPage serializedPage = readSerializedPage(input); + Page page = pageSerde.deserialize(serializedPage); + checkArgument(page.getChannelCount() == 1, "Expected only one channel in the function output"); + SqlFunctionResult output = new SqlFunctionResult(page.getBlock(0), 1); + return output; + } + catch (IOException e) { + throw new RuntimeException(e); + } + } + } + + public static class SqlResultFutureCallback + implements FutureCallback + { + @Override + public void onSuccess(SqlFunctionResult result) + { + result.getResult(); + } + + @Override + public void onFailure(Throwable t) + { + throw new PrestoException(FUNCTION_SERVER_FAILURE, "Failed with message " + t.getMessage()); + } + } +} diff --git a/presto-function-namespace-managers/src/main/java/com/facebook/presto/functionNamespace/rest/RestSqlFunctionExecutorsModule.java b/presto-function-namespace-managers/src/main/java/com/facebook/presto/functionNamespace/rest/RestSqlFunctionExecutorsModule.java new file mode 100644 index 0000000000000..4c0f660994d05 --- /dev/null +++ b/presto-function-namespace-managers/src/main/java/com/facebook/presto/functionNamespace/rest/RestSqlFunctionExecutorsModule.java @@ -0,0 +1,71 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.functionNamespace.rest; + +import com.facebook.airlift.configuration.AbstractConfigurationAwareModule; +import com.facebook.presto.functionNamespace.SqlInvokedFunctionNamespaceManagerConfig; +import com.facebook.presto.functionNamespace.execution.SqlFunctionExecutionModule; +import com.facebook.presto.functionNamespace.execution.SqlFunctionExecutors; +import com.facebook.presto.functionNamespace.execution.SqlFunctionLanguageConfig; +import com.facebook.presto.spi.function.FunctionImplementationType; +import com.facebook.presto.spi.function.RoutineCharacteristics.Language; +import com.google.common.collect.ImmutableMap; +import com.google.inject.Binder; +import com.google.inject.TypeLiteral; + +import java.util.Map; + +import static com.facebook.airlift.configuration.ConfigBinder.configBinder; +import static com.google.inject.Scopes.SINGLETON; +import static java.util.Objects.requireNonNull; + +public class RestSqlFunctionExecutorsModule + extends AbstractConfigurationAwareModule +{ + private final SqlFunctionExecutionModule sqlFunctionExecutorModule; + + public RestSqlFunctionExecutorsModule() + { + this(new RestSqlFunctionExecutionModule()); + } + + public RestSqlFunctionExecutorsModule(SqlFunctionExecutionModule sqlFunctionExecutorModule) + { + this.sqlFunctionExecutorModule = requireNonNull(sqlFunctionExecutorModule, "sqlFunctionExecutorModule is null"); + } + + @Override + protected void setup(Binder binder) + { + configBinder(binder).bindConfig(RestBasedFunctionNamespaceManagerConfig.class); + binder.bind(RestSqlFunctionExecutor.class).in(SINGLETON); + ImmutableMap.Builder languageImplementationTypeMap = ImmutableMap.builder(); + ImmutableMap.Builder supportedLanguages = ImmutableMap.builder(); + SqlInvokedFunctionNamespaceManagerConfig config = buildConfigObject(SqlInvokedFunctionNamespaceManagerConfig.class); + for (String languageName : config.getSupportedFunctionLanguages()) { + Language language = new Language(languageName); + FunctionImplementationType implementationType = buildConfigObject(SqlFunctionLanguageConfig.class, languageName) + .setFunctionImplementationType("REST") + .getFunctionImplementationType(); + languageImplementationTypeMap.put(language, implementationType); + supportedLanguages.put(languageName, implementationType); + } + // for SqlFunctionExecutor + sqlFunctionExecutorModule.setSupportedLanguages(supportedLanguages.build()); + install(sqlFunctionExecutorModule); + // for SqlFunctionExecutors + binder.bind(SqlFunctionExecutors.class).in(SINGLETON); + binder.bind(new TypeLiteral>() {}).toInstance(languageImplementationTypeMap.build()); + } +} diff --git a/presto-function-namespace-managers/src/main/java/com/facebook/presto/functionNamespace/testing/SqlInvokedFunctionTestUtils.java b/presto-function-namespace-managers/src/main/java/com/facebook/presto/functionNamespace/testing/SqlInvokedFunctionTestUtils.java index fe40283368c22..fa5f3a29084d2 100644 --- a/presto-function-namespace-managers/src/main/java/com/facebook/presto/functionNamespace/testing/SqlInvokedFunctionTestUtils.java +++ b/presto-function-namespace-managers/src/main/java/com/facebook/presto/functionNamespace/testing/SqlInvokedFunctionTestUtils.java @@ -20,8 +20,11 @@ import com.facebook.presto.spi.function.SqlInvokedFunction; import com.google.common.collect.ImmutableList; +import static com.facebook.presto.common.type.StandardTypes.ARRAY; +import static com.facebook.presto.common.type.StandardTypes.BOOLEAN; import static com.facebook.presto.common.type.StandardTypes.DOUBLE; import static com.facebook.presto.common.type.StandardTypes.INTEGER; +import static com.facebook.presto.common.type.StandardTypes.VARCHAR; import static com.facebook.presto.common.type.TypeSignature.parseTypeSignature; import static com.facebook.presto.spi.function.FunctionVersion.notVersioned; import static com.facebook.presto.spi.function.RoutineCharacteristics.Determinism.DETERMINISTIC; @@ -38,6 +41,46 @@ private SqlInvokedFunctionTestUtils() public static final QualifiedObjectName POWER_TOWER = QualifiedObjectName.valueOf(new CatalogSchemaName(TEST_CATALOG, TEST_SCHEMA), "power_tower"); public static final QualifiedObjectName TANGENT = QualifiedObjectName.valueOf(new CatalogSchemaName(TEST_CATALOG, TEST_SCHEMA), "tangent"); + public static final QualifiedObjectName ABS = QualifiedObjectName.valueOf(new CatalogSchemaName(TEST_CATALOG, TEST_SCHEMA), "abs"); + public static final QualifiedObjectName REVERSE = QualifiedObjectName.valueOf(new CatalogSchemaName(TEST_CATALOG, TEST_SCHEMA), "reverse"); + public static final QualifiedObjectName BOOL_AND = QualifiedObjectName.valueOf(new CatalogSchemaName(TEST_CATALOG, TEST_SCHEMA), "bool_and"); + public static final QualifiedObjectName ARRAY_SUM = QualifiedObjectName.valueOf(new CatalogSchemaName(TEST_CATALOG, TEST_SCHEMA), "array_sum"); + + public static final SqlInvokedFunction FUNCTION_ABS_INT = new SqlInvokedFunction( + ABS, + ImmutableList.of(new Parameter("x", parseTypeSignature(INTEGER))), + parseTypeSignature(INTEGER), + "abs", + RoutineCharacteristics.builder().setDeterminism(DETERMINISTIC).setNullCallClause(RETURNS_NULL_ON_NULL_INPUT).build(), + "RETURN abs(x)", + notVersioned()); + + public static final SqlInvokedFunction FUNCTION_REV_STRING = new SqlInvokedFunction( + REVERSE, + ImmutableList.of(new Parameter("x", parseTypeSignature(VARCHAR))), + parseTypeSignature(VARCHAR), + "reverse", + RoutineCharacteristics.builder().setDeterminism(DETERMINISTIC).setNullCallClause(RETURNS_NULL_ON_NULL_INPUT).build(), + "RETURN reverse(x)", + notVersioned()); + + public static final SqlInvokedFunction FUNCTION_BOOL_AND = new SqlInvokedFunction( + BOOL_AND, + ImmutableList.of(new Parameter("x", parseTypeSignature(BOOLEAN))), + parseTypeSignature(BOOLEAN), + "bool_and", + RoutineCharacteristics.builder().setDeterminism(DETERMINISTIC).setNullCallClause(RETURNS_NULL_ON_NULL_INPUT).build(), + "RETURN bool_and(x)", + notVersioned()); + + public static final SqlInvokedFunction FUNCTION_ARRAY_SUM = new SqlInvokedFunction( + ARRAY_SUM, + ImmutableList.of(new Parameter("x", parseTypeSignature(INTEGER))), + parseTypeSignature(ARRAY), + "array_sum", + RoutineCharacteristics.builder().setDeterminism(DETERMINISTIC).setNullCallClause(RETURNS_NULL_ON_NULL_INPUT).build(), + "RETURN array_sum(x)", + notVersioned()); public static final SqlInvokedFunction FUNCTION_POWER_TOWER_DOUBLE = new SqlInvokedFunction( POWER_TOWER, diff --git a/presto-function-namespace-managers/src/test/java/com/facebook/presto/functionNamespace/execution/TestSqlFunctionLanguageConfig.java b/presto-function-namespace-managers/src/test/java/com/facebook/presto/functionNamespace/execution/TestSqlFunctionLanguageConfig.java index e9f8d3f6890b8..72ef38531970a 100644 --- a/presto-function-namespace-managers/src/test/java/com/facebook/presto/functionNamespace/execution/TestSqlFunctionLanguageConfig.java +++ b/presto-function-namespace-managers/src/test/java/com/facebook/presto/functionNamespace/execution/TestSqlFunctionLanguageConfig.java @@ -54,4 +54,16 @@ public void testCPPType() assertFullMapping(properties, expected); } + + @Test + public void testRESTType() + { + Map properties = new ImmutableMap.Builder() + .put("function-implementation-type", "REST") + .build(); + SqlFunctionLanguageConfig expected = new SqlFunctionLanguageConfig() + .setFunctionImplementationType("REST"); + + assertFullMapping(properties, expected); + } } diff --git a/presto-main/src/main/java/com/facebook/presto/sql/InterpretedFunctionInvoker.java b/presto-main/src/main/java/com/facebook/presto/sql/InterpretedFunctionInvoker.java index b6997ee5ae547..df7e0afd358c0 100644 --- a/presto-main/src/main/java/com/facebook/presto/sql/InterpretedFunctionInvoker.java +++ b/presto-main/src/main/java/com/facebook/presto/sql/InterpretedFunctionInvoker.java @@ -15,8 +15,13 @@ import com.facebook.presto.common.InvalidFunctionArgumentException; import com.facebook.presto.common.NotSupportedException; +import com.facebook.presto.common.Page; +import com.facebook.presto.common.PageBuilder; +import com.facebook.presto.common.block.Block; +import com.facebook.presto.common.block.BlockBuilder; import com.facebook.presto.common.function.SqlFunctionProperties; import com.facebook.presto.common.type.TimeZoneNotSupportedException; +import com.facebook.presto.common.type.Type; import com.facebook.presto.metadata.FunctionAndTypeManager; import com.facebook.presto.operator.scalar.BuiltInScalarFunctionImplementation; import com.facebook.presto.operator.scalar.ScalarFunctionImplementationChoice; @@ -25,11 +30,13 @@ import com.facebook.presto.spi.function.FunctionHandle; import com.facebook.presto.spi.function.JavaScalarFunctionImplementation; import com.google.common.base.Defaults; +import io.airlift.slice.Slice; import java.lang.invoke.MethodHandle; import java.util.ArrayList; import java.util.Arrays; import java.util.List; +import java.util.concurrent.ExecutionException; import static com.facebook.presto.operator.scalar.ScalarFunctionImplementationChoice.ArgumentType.VALUE_TYPE; import static com.facebook.presto.operator.scalar.ScalarFunctionImplementationChoice.NullConvention.RETURN_NULL_ON_NULL; @@ -60,6 +67,22 @@ public Object invoke(FunctionHandle functionHandle, SqlFunctionProperties proper return invoke(functionAndTypeManager.getJavaScalarFunctionImplementation(functionHandle), properties, arguments); } + public Object invoke(FunctionHandle functionHandle, List arguments, List types, List channels, Type returnType) + { + PageBuilder pageBuilder = new PageBuilder(types); + pageBuilder.declarePosition(); + for (int i = 0; i < types.size(); i++) { + writeOutput(types.get(i), pageBuilder.getBlockBuilder(i), arguments.get(i)); + } + Page inputPage = pageBuilder.build(); + try { + return convertObject(returnType, functionAndTypeManager.executeFunction("", functionHandle, inputPage, channels).get().getResult()); + } + catch (InterruptedException | ExecutionException e) { + throw new RuntimeException(e); + } + } + /** * Arguments must be the native container type for the corresponding SQL types. *

@@ -146,4 +169,45 @@ private static RuntimeException propagate(Throwable throwable) throwIfUnchecked(throwable); throw new RuntimeException(throwable); } + + public void writeOutput(Type type, BlockBuilder output, Object argument) + { + switch (type.getJavaType().getSimpleName()) { + case "long": + type.writeLong(output, (Long) argument); + break; + case "double": + type.writeDouble(output, (Double) argument); + break; + case "boolean": + type.writeBoolean(output, (Boolean) argument); + break; + case "Slice": + type.writeSlice(output, (Slice) argument); + break; + case "Block": + type.writeObject(output, argument); + break; + default: + throw new IllegalArgumentException("Unexpected type: " + type.getJavaType().getSimpleName()); + } + } + + public Object convertObject(Type returnType, Block result) + { + switch (returnType.getJavaType().getSimpleName()) { + case "long": + return returnType.getLong(result, 0); + case "double": + return returnType.getDouble(result, 0); + case "boolean": + return returnType.getBoolean(result, 0); + case "Slice": + return returnType.getSlice(result, 0); + case "Block": + return returnType.getObject(result, 0); + default: + throw new IllegalArgumentException("Unexpected return type: " + returnType.getJavaType().getSimpleName()); + } + } } diff --git a/presto-openapi/src/main/resources/rest_function_server.yaml b/presto-openapi/src/main/resources/rest_function_server.yaml index 8f64f9469b987..8e80f47a087ba 100644 --- a/presto-openapi/src/main/resources/rest_function_server.yaml +++ b/presto-openapi/src/main/resources/rest_function_server.yaml @@ -167,6 +167,70 @@ paths: description: The function was successfully deleted. The response body is empty. '404': description: The function was not found. + /v1/functions/{schema}/{functionName}/{functionId}/{version}: + post: + summary: Retrieve value from function + parameters: + - name: schema + in: path + required: true + schema: + type: string + description: The schema in which the function is defined. + - name: functionName + in: path + required: true + schema: + type: string + description: The name of the function. + - name: functionId + in: path + required: true + schema: + type: string + description: The ID of the function. + - name: version + in: path + schema: + type: string + required: true + description: The version of the function to execute + - name: returnType + in: query + schema: + $ref: '#/components/schemas/TypeSignature' + required: true + description: The return type of the function + - name: numArgs + in: query + schema: + $ref: '#/components/schemas/SqlFunctionId/properties/size' + required: true + description: The numbers of arguments passed in + - name: argType + in: query + schema: + type: object + additionalProperties: + type: string + pattern: '^[A-Za-z][A-Za-z0-9]*$' + example: + argType1: string + argType2: double + description: The type for each argument passed in + requestBody: + required: true + content: + text/plain; charset=utf-8: + schema: + $ref: '#/components/schemas/SerializedPageBinary' + responses: + '200': + description: function value output + content: + text/plain; charset=utf-8: + schema: + $ref: '#/components/schemas/SerializedPageBinary' components: schemas: UdfSignatureMap: @@ -283,4 +347,13 @@ components: argumentTypes: type: array items: - $ref: '#/components/schemas/TypeSignature' \ No newline at end of file + $ref: '#/components/schemas/TypeSignature' + size: + type: integer + SerializedPageBinary: + type: string + format: binary + description: Match this format https://prestodb.io/docs/current/develop/serialized-page.html + Page: + type: string + format: binary \ No newline at end of file diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/StandardErrorCode.java b/presto-spi/src/main/java/com/facebook/presto/spi/StandardErrorCode.java index 900a72526e542..ddfac852fd36b 100644 --- a/presto-spi/src/main/java/com/facebook/presto/spi/StandardErrorCode.java +++ b/presto-spi/src/main/java/com/facebook/presto/spi/StandardErrorCode.java @@ -16,6 +16,7 @@ import com.facebook.presto.common.ErrorCode; import com.facebook.presto.common.ErrorType; +import static com.facebook.presto.common.ErrorType.EXTERNAL; import static com.facebook.presto.common.ErrorType.INSUFFICIENT_RESOURCES; import static com.facebook.presto.common.ErrorType.INTERNAL_ERROR; import static com.facebook.presto.common.ErrorType.USER_ERROR; @@ -140,6 +141,7 @@ public enum StandardErrorCode EXCEEDED_WRITTEN_INTERMEDIATE_BYTES_LIMIT(0x0002_0012, INSUFFICIENT_RESOURCES), TOO_MANY_SIDECARS(0x0002_0013, INTERNAL_ERROR), NO_CPP_SIDECARS(0x0002_0014, INTERNAL_ERROR), + FUNCTION_SERVER_FAILURE(0X0002_0015, EXTERNAL), /**/; // Error code range 0x0003 is reserved for Presto-on-Spark diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/function/FunctionImplementationType.java b/presto-spi/src/main/java/com/facebook/presto/spi/function/FunctionImplementationType.java index 12bf8c2376f1c..60a51a06fcd98 100644 --- a/presto-spi/src/main/java/com/facebook/presto/spi/function/FunctionImplementationType.java +++ b/presto-spi/src/main/java/com/facebook/presto/spi/function/FunctionImplementationType.java @@ -20,7 +20,7 @@ public enum FunctionImplementationType THRIFT(true, false), GRPC(true, false), CPP(false, false), - REST(false, true); + REST(true, false); private final boolean externalExecution; private final boolean evaluatedInCoordinator; diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/function/SqlFunctionId.java b/presto-spi/src/main/java/com/facebook/presto/spi/function/SqlFunctionId.java index 9e103fd419023..af1a05772b663 100644 --- a/presto-spi/src/main/java/com/facebook/presto/spi/function/SqlFunctionId.java +++ b/presto-spi/src/main/java/com/facebook/presto/spi/function/SqlFunctionId.java @@ -39,11 +39,14 @@ public class SqlFunctionId private final QualifiedObjectName functionName; private final List argumentTypes; + private int size; + @ThriftConstructor public SqlFunctionId(QualifiedObjectName functionName, List argumentTypes) { this.functionName = requireNonNull(functionName, "functionName is null"); this.argumentTypes = requireNonNull(argumentTypes, "argumentTypes is null"); + this.size = argumentTypes.size(); } @ThriftField(1) @@ -116,4 +119,10 @@ else if (parts.length > 1) { throw new AssertionError(format("Invalid serialization: %s", signature)); } } + + public int getSize() + { + size = argumentTypes.size(); + return size; + } } diff --git a/presto-tests/pom.xml b/presto-tests/pom.xml index bb806244f3027..560ab2d3c6234 100644 --- a/presto-tests/pom.xml +++ b/presto-tests/pom.xml @@ -244,6 +244,11 @@ javax.servlet-api + + com.facebook.airlift + jaxrs + + com.facebook.airlift @@ -277,6 +282,12 @@ test + + com.facebook.airlift + jaxrs-testing + test + + org.openjdk.jmh diff --git a/presto-tests/src/test/java/com/facebook/presto/tests/TestRestSqlFunctionExecutor.java b/presto-tests/src/test/java/com/facebook/presto/tests/TestRestSqlFunctionExecutor.java new file mode 100644 index 0000000000000..e0a67120b2a80 --- /dev/null +++ b/presto-tests/src/test/java/com/facebook/presto/tests/TestRestSqlFunctionExecutor.java @@ -0,0 +1,408 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.tests; + +import com.facebook.airlift.bootstrap.Bootstrap; +import com.facebook.airlift.http.client.HttpClient; +import com.facebook.airlift.http.client.testing.TestingHttpClient; +import com.facebook.airlift.jaxrs.JsonMapper; +import com.facebook.airlift.jaxrs.testing.JaxrsTestingHttpProcessor; +import com.facebook.presto.client.NodeVersion; +import com.facebook.presto.common.Page; +import com.facebook.presto.common.PageBuilder; +import com.facebook.presto.common.block.BlockBuilder; +import com.facebook.presto.common.block.BlockEncodingManager; +import com.facebook.presto.common.block.IntArrayBlockBuilder; +import com.facebook.presto.common.function.SqlFunctionResult; +import com.facebook.presto.common.type.ArrayType; +import com.facebook.presto.common.type.BooleanType; +import com.facebook.presto.common.type.Decimals; +import com.facebook.presto.common.type.DoubleType; +import com.facebook.presto.common.type.IntegerType; +import com.facebook.presto.common.type.Type; +import com.facebook.presto.common.type.VarcharType; +import com.facebook.presto.functionNamespace.ForRestServer; +import com.facebook.presto.functionNamespace.rest.RestSqlFunctionExecutor; +import com.facebook.presto.functionNamespace.rest.RestSqlFunctionExecutorsModule; +import com.facebook.presto.metadata.InternalNode; +import com.facebook.presto.spi.NodeManager; +import com.facebook.presto.spi.function.FunctionImplementationType; +import com.facebook.presto.spi.function.RemoteScalarFunctionImplementation; +import com.facebook.presto.spi.function.RoutineCharacteristics; +import com.facebook.presto.spi.function.SqlFunctionHandle; +import com.facebook.presto.spi.function.SqlInvokedFunction; +import com.facebook.presto.spi.page.PagesSerde; +import com.facebook.presto.spi.page.SerializedPage; +import com.facebook.presto.testing.TestingNodeManager; +import com.fasterxml.jackson.annotation.JsonInclude; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.datatype.jdk8.Jdk8Module; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import com.google.inject.Injector; +import io.airlift.slice.BasicSliceInput; +import io.airlift.slice.DynamicSliceOutput; +import io.airlift.slice.Slice; +import io.airlift.slice.Slices; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +import javax.ws.rs.Consumes; +import javax.ws.rs.POST; +import javax.ws.rs.Path; +import javax.ws.rs.PathParam; +import javax.ws.rs.Produces; +import javax.ws.rs.QueryParam; +import javax.ws.rs.core.MediaType; +import javax.ws.rs.core.UriBuilder; + +import java.math.BigDecimal; +import java.net.URI; +import java.util.ArrayList; +import java.util.List; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; + +import static com.facebook.presto.functionNamespace.testing.SqlInvokedFunctionTestUtils.FUNCTION_ABS_INT; +import static com.facebook.presto.functionNamespace.testing.SqlInvokedFunctionTestUtils.FUNCTION_ARRAY_SUM; +import static com.facebook.presto.functionNamespace.testing.SqlInvokedFunctionTestUtils.FUNCTION_BOOL_AND; +import static com.facebook.presto.functionNamespace.testing.SqlInvokedFunctionTestUtils.FUNCTION_POWER_TOWER_DOUBLE_UPDATED; +import static com.facebook.presto.functionNamespace.testing.SqlInvokedFunctionTestUtils.FUNCTION_REV_STRING; +import static com.facebook.presto.spi.page.PagesSerdeUtil.readSerializedPage; +import static com.facebook.presto.spi.page.PagesSerdeUtil.writeSerializedPage; +import static io.airlift.slice.Slices.wrappedBuffer; +import static java.lang.Double.longBitsToDouble; +import static java.util.Objects.requireNonNull; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.assertFalse; +import static org.testng.Assert.assertTrue; + +@Test +public class TestRestSqlFunctionExecutor +{ + @DataProvider(name = "testExecuteFunctionAbs") + public Object[][] valuesForAbs() + { + return new Object[][] {{0L, 0L}, {-3L, 3L}, {3L, 3L}}; + } + @DataProvider(name = "testExecuteFunctionPow") + public Object[][] valuesForPow() + { + return new Object[][] {{3.0, 27.0}, {2.0, 4.0}, {-2.0, 0.25}, {0.0, 1.0}}; + } + @DataProvider(name = "testExecuteFunctionReverseString") + public Object[][] valuesForReverseString() + { + return new Object[][] {{"inputValue", "eulaVtupni"}, {"", ""}, {"abc", "cba"}, {"#$123([]}Aa", "aA}][(321$#"}}; + } + @DataProvider(name = "testExecuteFunctionArraySum") + public Object[][] valuesForArraySum() + { + return new Object[][] {{2, 3, 5L}, {1, 1, 2L}, {-4, 0, -4L}, {-3, -2, -5L}}; + } + public static final URI REST_SERVER_URI = URI.create("http://127.0.0.1:7777"); + private RestSqlFunctionExecutor restSqlFunctionExecutor; + + @BeforeMethod + public void setUp() throws Exception + { + // Set up the mock HTTP endpoint that delegates to the Java base Presto Page + TestingFunctionResource testingFunctionResource = new TestingFunctionResource(createPagesSerde()); + ObjectMapper mapper = new ObjectMapper(); + mapper.registerModule(new Jdk8Module()); + mapper.setSerializationInclusion(JsonInclude.Include.NON_EMPTY); + + JaxrsTestingHttpProcessor jaxrsTestingHttpProcessor = new JaxrsTestingHttpProcessor( + UriBuilder.fromUri(REST_SERVER_URI).path("/").build(), + testingFunctionResource, + new JsonMapper(mapper)); + TestingHttpClient testingHttpClient = new TestingHttpClient(jaxrsTestingHttpProcessor); + + restSqlFunctionExecutor = createRestSqlFunctionExecutor(testingHttpClient); + } + + private RestSqlFunctionExecutor createRestSqlFunctionExecutor(HttpClient testingHttpClient) + { + Bootstrap app = new Bootstrap( + // Specially use a testing HTTP client instead of a real one + binder -> binder.bind(HttpClient.class).annotatedWith(ForRestServer.class).toInstance(testingHttpClient), + binder -> binder.bind(NodeManager.class).toInstance(createNodeManager()), + // Otherwise use the exact same module as the rest sql function executor service + new RestSqlFunctionExecutorsModule()); + + Injector injector = app + .doNotInitializeLogging() + .setRequiredConfigurationProperties( + ImmutableMap.of( + "rest-based-function-manager.rest.url", REST_SERVER_URI.toString()) + ).initialize(); + return injector.getInstance(RestSqlFunctionExecutor.class); + } + + @Path("/v1/functions/{schema}/{functionName}/{functionId}/{version}") + //@PATH("http://127.0.0.1:7777/v1/functions/memory/abs/unittest.memory.abs%5Binteger%5D/123") + public static class TestingFunctionResource + { + private final PagesSerde pagesSerde; + public TestingFunctionResource(PagesSerde pagesSerde) + { + this.pagesSerde = requireNonNull(pagesSerde); + } + + @POST + @Consumes(MediaType.WILDCARD) + @Produces(MediaType.WILDCARD) + public byte[] post( + @PathParam("schema") String schema, + @PathParam("functionName") String functionName, + @PathParam("functionId") String functionId, + @PathParam("version") String version, + byte[] serializedPageByteArray) + { + Slice slice = wrappedBuffer(serializedPageByteArray); + SerializedPage serializedPage = readSerializedPage(new BasicSliceInput((slice))); + Page inputPage = pagesSerde.deserialize(serializedPage); + List types = new ArrayList<>(); + List values = new ArrayList<>(); + switch (functionName) { + case "power_tower": + double inputValue = longBitsToDouble(inputPage.getBlock(0).toLong(0)); + types.add(DoubleType.DOUBLE); + values.add(Math.pow(inputValue, inputValue)); + break; + case "abs": + types.add(IntegerType.INTEGER); + values.add(Math.abs(inputPage.getBlock(0).toLong(0))); + break; + case "bool_and": + types.add(BooleanType.BOOLEAN); + long result = 1; + for (int i = 0; i < inputPage.getChannelCount(); i++) { + result *= inputPage.getBlock(i).toLong(0); + } + values.add(result == 1L); + break; + case "reverse": + types.add(VarcharType.VARCHAR); + int length = inputPage.getBlock(0).getSliceLength(0); + String inputString = inputPage.getBlock(0).getSlice(0, 0, length).toStringUtf8(); + String reverse = ""; + for (int i = inputString.length() - 1; i >= 0; i--) { + reverse += inputString.charAt(i); + } + values.add(reverse); + break; + case "array_sum": + types.add(IntegerType.INTEGER); + long val1 = inputPage.getBlock(0).getBlock(0).getInt(0); + long val2 = inputPage.getBlock(0).getBlock(0).getInt(1); + values.add(val1 + val2); + break; + } + Page outputPage = createPage(types, values); + DynamicSliceOutput sliceOutput = new DynamicSliceOutput((int) outputPage.getRetainedSizeInBytes()); + writeSerializedPage(sliceOutput, pagesSerde.serialize(outputPage)); + return sliceOutput.slice().byteArray(); + } + } + + @Test(dataProvider = "testExecuteFunctionAbs") + public void testRestSqlFunctionExecutor(Object inputValue, Object expected) + throws ExecutionException, InterruptedException + { + restSqlFunctionExecutor.setBlockEncodingSerde(new BlockEncodingManager()); + ArrayList types = new ArrayList<>(); + ArrayList arguments = new ArrayList<>(); + types.add(IntegerType.INTEGER); + arguments.add(inputValue); + Type returnType = IntegerType.INTEGER; + + CompletableFuture output = restSqlFunctionExecutor.executeFunction( + "", + createRemoteScalarFunctionImplementation(FUNCTION_ABS_INT), + createPage(types, arguments), + new ArrayList<>(), + types, + returnType); + assertEquals(returnType.getLong(output.get().getResult(), 0), expected); + } + + @Test(dataProvider = "testExecuteFunctionPow") + public void testRestSqlFunctionExecutorPOW(Object inputValue, Object expectedValue) + throws ExecutionException, InterruptedException + { + restSqlFunctionExecutor.setBlockEncodingSerde(new BlockEncodingManager()); + ArrayList types = new ArrayList<>(); + ArrayList arguments = new ArrayList<>(); + types.add(DoubleType.DOUBLE); + arguments.add(inputValue); + Type returnType = DoubleType.DOUBLE; + + CompletableFuture output = restSqlFunctionExecutor.executeFunction( + "", + createRemoteScalarFunctionImplementation(FUNCTION_POWER_TOWER_DOUBLE_UPDATED), + createPage(types, arguments), + new ArrayList<>(), + types, + returnType); + assertEquals(returnType.getDouble(output.get().getResult(), 0), expectedValue); + } + + @Test + public void testRestSqlFunctionExecutorBoolAnd() + throws ExecutionException, InterruptedException + { + restSqlFunctionExecutor.setBlockEncodingSerde(new BlockEncodingManager()); + ArrayList types = new ArrayList<>(); + ArrayList arguments = new ArrayList<>(); + types.add(BooleanType.BOOLEAN); + arguments.add(true); + Type returnType = BooleanType.BOOLEAN; + + CompletableFuture output = restSqlFunctionExecutor.executeFunction( + "", + createRemoteScalarFunctionImplementation(FUNCTION_BOOL_AND), + createPage(types, arguments), + new ArrayList<>(), + types, + returnType); + + types.add(BooleanType.BOOLEAN); + arguments.add(false); + + CompletableFuture output2 = restSqlFunctionExecutor.executeFunction( + "", + createRemoteScalarFunctionImplementation(FUNCTION_BOOL_AND), + createPage(types, arguments), + new ArrayList<>(), + types, + returnType); + + assertTrue(returnType.getBoolean(output.get().getResult(), 0)); + assertFalse(returnType.getBoolean(output2.get().getResult(), 0)); + } + + @Test(dataProvider = "testExecuteFunctionReverseString") + public void testRestSqlFunctionExecutorReverseString(Object input, Object expected) + throws ExecutionException, InterruptedException + { + restSqlFunctionExecutor.setBlockEncodingSerde(new BlockEncodingManager()); + ArrayList types = new ArrayList<>(); + ArrayList arguments = new ArrayList<>(); + types.add(VarcharType.VARCHAR); + arguments.add(input); + Type returnType = VarcharType.VARCHAR; + + CompletableFuture output = restSqlFunctionExecutor.executeFunction( + "", + createRemoteScalarFunctionImplementation(FUNCTION_REV_STRING), + createPage(types, arguments), + new ArrayList<>(), + types, + returnType); + assertEquals(returnType.getSlice(output.get().getResult(), 0).toStringUtf8(), expected); + } + + @Test(dataProvider = "testExecuteFunctionArraySum") + public void testRestSqlFunctionExecutorArray(Object val1, Object val2, Object expected) + throws ExecutionException, InterruptedException + { + restSqlFunctionExecutor.setBlockEncodingSerde(new BlockEncodingManager()); + IntArrayBlockBuilder intArrayBlockBuilder = new IntArrayBlockBuilder(null, 2); + intArrayBlockBuilder.writeInt((int) val1); + intArrayBlockBuilder.writeInt((int) val2); + ArrayList types = new ArrayList<>(); + ArrayList arguments = new ArrayList<>(); + types.add(new ArrayType(IntegerType.INTEGER)); + arguments.add(intArrayBlockBuilder); + Type returnType = IntegerType.INTEGER; + + CompletableFuture output = restSqlFunctionExecutor.executeFunction( + "", + createRemoteScalarFunctionImplementation(FUNCTION_ARRAY_SUM), + createPage(types, arguments), + new ArrayList<>(), + types, + returnType); + assertEquals(returnType.getLong(output.get().getResult(), 0), expected); + } + + private NodeManager createNodeManager() + { + InternalNode internalNode = new InternalNode("test", REST_SERVER_URI, NodeVersion.UNKNOWN, false, false, false, true); + return new TestingNodeManager("testenv", internalNode, ImmutableSet.of()); + } + + private RemoteScalarFunctionImplementation createRemoteScalarFunctionImplementation(SqlInvokedFunction sqlInvokedFunction) + { + SqlFunctionHandle sqlFunctionHandle = new SqlFunctionHandle(sqlInvokedFunction.getFunctionId(), "123"); + return new RemoteScalarFunctionImplementation(sqlFunctionHandle, RoutineCharacteristics.Language.CPP, FunctionImplementationType.CPP); + } + + private static PagesSerde createPagesSerde() + { + return new PagesSerde(new BlockEncodingManager(), + Optional.empty(), + Optional.empty(), + Optional.empty()); + } + + private static Page createPage(List types, List arguments) + { + PageBuilder pageBuilder = new PageBuilder(types); + pageBuilder.declarePosition(); + for (int i = 0; i < types.size(); i++) { + Type type = types.get(i); + BlockBuilder output = pageBuilder.getBlockBuilder(i); + switch (type.getTypeSignature().getBase()) { + case "integer": + case "bigint": + case "smallint": + case "tinyint": + case "real": + case "interval day to second": + case "interval year to month": + case "timestamp": + case "time": + type.writeLong(output, (Long) arguments.get(i)); + break; + case "double": + type.writeDouble(output, (Double) arguments.get(i)); + break; + case "boolean": + type.writeBoolean(output, (Boolean) arguments.get(i)); + break; + case "varchar": + case "char": + case "varbinary": + case "json": + type.writeSlice(output, Slices.utf8Slice(arguments.get(i).toString())); + break; + case "decimal": + BigDecimal bd = (BigDecimal) arguments.get(i); + type.writeSlice(output, Decimals.encodeScaledValue(bd)); + break; + case "object": + case "array": + case "row": + case "map": + type.writeObject(output, arguments.get(i)); + break; + } + } + return pageBuilder.build(); + } +}