From c79c741468309eb5aa3187e620326e4770759ab4 Mon Sep 17 00:00:00 2001 From: Joe Abraham Date: Mon, 7 Oct 2024 17:42:06 +0530 Subject: [PATCH] Add Rest based remote functions --- .gitignore | 1 + docker/Dockerfile | 4 + docker/entrypoint.sh | 2 + presto-main/pom.xml | 20 +++++ .../presto/server/FunctionServer.java | 2 - presto-native-execution/CMakeLists.txt | 2 +- presto-native-execution/Makefile | 4 +- presto-native-execution/pom.xml | 26 ++---- .../presto_cpp/main/PrestoServer.cpp | 5 +- .../presto_cpp/main/common/Configs.cpp | 4 + .../presto_cpp/main/common/Configs.h | 6 ++ .../presto_cpp/main/types/CMakeLists.txt | 8 ++ .../main/types/PrestoToVeloxExpr.cpp | 79 +++++++++++++++++ .../main/types/tests/CMakeLists.txt | 10 +++ .../core/presto_protocol_core.cpp | 85 +++++++++++++++++++ .../core/presto_protocol_core.h | 21 ++++- .../core/presto_protocol_core.yml | 5 +- .../presto_protocol/presto_protocol.yml | 2 +- .../nativeworker/ContainerQueryRunner.java | 21 +++-- .../ContainerQueryRunnerUtils.java | 38 ++++++++- .../TestPrestoContainerRemoteFunction.java | 58 +++++++++++++ presto-native-execution/velox | 2 +- .../presto/spi/function/Signature.java | 7 ++ 23 files changed, 369 insertions(+), 43 deletions(-) create mode 100644 presto-native-execution/src/test/java/com/facebook/presto/nativeworker/TestPrestoContainerRemoteFunction.java diff --git a/.gitignore b/.gitignore index a4512f9f794d9..47c44171076d9 100644 --- a/.gitignore +++ b/.gitignore @@ -66,3 +66,4 @@ presto-native-execution/deps-install # Compiled executables used for docker build /docker/presto-cli-*-executable.jar /docker/presto-server-*.tar.gz +/docker/presto-remote-function-server-executable.jar diff --git a/docker/Dockerfile b/docker/Dockerfile index 684ce522c9f73..4255e382ad64b 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -3,12 +3,15 @@ FROM quay.io/centos/centos:stream9 ARG PRESTO_VERSION ARG PRESTO_PKG=presto-server-$PRESTO_VERSION.tar.gz ARG PRESTO_CLI_JAR=presto-cli-$PRESTO_VERSION-executable.jar +ARG PRESTO_REMOTE_SERVER_JAR=presto-remote-function-server-executable.jar ARG JMX_PROMETHEUS_JAVAAGENT_VERSION=0.20.0 ENV PRESTO_HOME="/opt/presto-server" COPY $PRESTO_PKG . COPY $PRESTO_CLI_JAR /opt/presto-cli +COPY $PRESTO_REMOTE_SERVER_JAR /opt/presto-remote-server + RUN dnf install -y java-11-openjdk less procps python3 \ && ln -s $(which python3) /usr/bin/python \ @@ -19,6 +22,7 @@ RUN dnf install -y java-11-openjdk less procps python3 \ && rm -rf ./presto-server-$PRESTO_VERSION \ && chmod +x /opt/presto-cli \ && ln -s /opt/presto-cli /usr/local/bin/ \ + && chmod +x /opt/presto-remote-server \ # clean cache jobs && mv /etc/yum/protected.d/systemd.conf /etc/yum/protected.d/systemd.conf.bak \ && dnf clean all \ diff --git a/docker/entrypoint.sh b/docker/entrypoint.sh index d4df39e601fb9..72dc38e6bbfae 100755 --- a/docker/entrypoint.sh +++ b/docker/entrypoint.sh @@ -2,4 +2,6 @@ set -e +java -Dconfig=/opt/function-server/etc/config.properties -jar /opt/presto-remote-server >> log1.txt 2>&1 + $PRESTO_HOME/bin/launcher run diff --git a/presto-main/pom.xml b/presto-main/pom.xml index dd0633a2b45e1..640f5fd8f8517 100644 --- a/presto-main/pom.xml +++ b/presto-main/pom.xml @@ -563,6 +563,26 @@ + + org.apache.maven.plugins + maven-shade-plugin + + + package + + shade + + + presto-remote-function-server-executable + + + com.facebook.presto.server.FunctionServer + + + + + + org.apache.maven.plugins maven-dependency-plugin diff --git a/presto-main/src/main/java/com/facebook/presto/server/FunctionServer.java b/presto-main/src/main/java/com/facebook/presto/server/FunctionServer.java index 19237b4c377d2..c13e38930ee08 100644 --- a/presto-main/src/main/java/com/facebook/presto/server/FunctionServer.java +++ b/presto-main/src/main/java/com/facebook/presto/server/FunctionServer.java @@ -47,8 +47,6 @@ public void run() verifyJvmRequirements(); verifySystemTimeIsReasonable(); - Logger log = Logger.get(FunctionServer.class); - List modules = ImmutableList.of( new FunctionServerModule(), new HttpServerModule(), diff --git a/presto-native-execution/CMakeLists.txt b/presto-native-execution/CMakeLists.txt index d5001dde70a74..1556e97c0fc45 100644 --- a/presto-native-execution/CMakeLists.txt +++ b/presto-native-execution/CMakeLists.txt @@ -57,7 +57,7 @@ option(PRESTO_ENABLE_ABFS "Build ABFS support" OFF) option(PRESTO_ENABLE_PARQUET "Enable Parquet support" OFF) # Forwards user input to VELOX_ENABLE_REMOTE_FUNCTIONS. -option(PRESTO_ENABLE_REMOTE_FUNCTIONS "Enable remote function support" OFF) +option(PRESTO_ENABLE_REMOTE_FUNCTIONS "Enable remote function support" ON) option(PRESTO_ENABLE_TESTING "Enable tests" ON) diff --git a/presto-native-execution/Makefile b/presto-native-execution/Makefile index f3fb5f709f4d5..3781db67af00c 100644 --- a/presto-native-execution/Makefile +++ b/presto-native-execution/Makefile @@ -73,8 +73,8 @@ clean: #: Delete all build artifacts rm -rf $(BUILD_BASE_DIR) velox-submodule: #: Check out code for velox submodule - git submodule sync --recursive - git submodule update --init --recursive + # git submodule sync --recursive + # git submodule update --init --recursive submodules: velox-submodule diff --git a/presto-native-execution/pom.xml b/presto-native-execution/pom.xml index 3dcfe4a02a1d2..908ea7bd766f1 100644 --- a/presto-native-execution/pom.xml +++ b/presto-native-execution/pom.xml @@ -362,6 +362,12 @@ presto-cli-*-executable.jar + + ${project.parent.basedir}/presto-main/target + + presto-remote-function-server-executable.jar + + ${project.parent.basedir}/presto-server/target @@ -399,24 +405,6 @@ - - dependency - install - - build - - - - - presto-native-dependency:latest - - ${project.basedir}/scripts/dockerfiles/ubuntu-22.04-dependency.dockerfile - ${project.basedir} - - - - - worker install @@ -433,7 +421,7 @@ Release presto-native-dependency:latest - -DPRESTO_ENABLE_TESTING=OFF + -DPRESTO_ENABLE_REMOTE_FUNCTIONS=ON 2 ubuntu:22.04 ubuntu diff --git a/presto-native-execution/presto_cpp/main/PrestoServer.cpp b/presto-native-execution/presto_cpp/main/PrestoServer.cpp index 770c0dfcd5472..28002196c4996 100644 --- a/presto-native-execution/presto_cpp/main/PrestoServer.cpp +++ b/presto-native-execution/presto_cpp/main/PrestoServer.cpp @@ -1268,10 +1268,11 @@ void PrestoServer::registerRemoteFunctions() { } else { VELOX_FAIL( "To register remote functions using a json file path you need to " - "specify the remote server location using '{}', '{}' or '{}'.", + "specify the remote server location using '{}', '{}' or '{}' or {}.", SystemConfig::kRemoteFunctionServerThriftAddress, SystemConfig::kRemoteFunctionServerThriftPort, - SystemConfig::kRemoteFunctionServerThriftUdsPath); + SystemConfig::kRemoteFunctionServerThriftUdsPath, + SystemConfig::kRemoteFunctionServerRestURL); } } #endif diff --git a/presto-native-execution/presto_cpp/main/common/Configs.cpp b/presto-native-execution/presto_cpp/main/common/Configs.cpp index 49246c7fb4a90..113555e4de648 100644 --- a/presto-native-execution/presto_cpp/main/common/Configs.cpp +++ b/presto-native-execution/presto_cpp/main/common/Configs.cpp @@ -339,6 +339,10 @@ std::string SystemConfig::remoteFunctionServerSerde() const { return optionalProperty(kRemoteFunctionServerSerde).value(); } +std::string SystemConfig::remoteFunctionRestUrl() const { + return optionalProperty(kRemoteFunctionServerRestURL).value(); +} + int32_t SystemConfig::maxDriversPerTask() const { return optionalProperty(kMaxDriversPerTask).value(); } diff --git a/presto-native-execution/presto_cpp/main/common/Configs.h b/presto-native-execution/presto_cpp/main/common/Configs.h index b1c1b1d1382d6..7f8d320fde1f5 100644 --- a/presto-native-execution/presto_cpp/main/common/Configs.h +++ b/presto-native-execution/presto_cpp/main/common/Configs.h @@ -603,6 +603,10 @@ class SystemConfig : public ConfigBase { static constexpr std::string_view kRemoteFunctionServerThriftUdsPath{ "remote-function-server.thrift.uds-path"}; + /// HTTP URL used by the remote function rest server. + static constexpr std::string_view kRemoteFunctionServerRestURL{ + "remote-function-server.rest.url"}; + /// Path where json files containing signatures for remote functions can be /// found. static constexpr std::string_view @@ -696,6 +700,8 @@ class SystemConfig : public ConfigBase { std::string remoteFunctionServerSerde() const; + std::string remoteFunctionRestUrl() const; + int32_t maxDriversPerTask() const; folly::Optional taskWriterCount() const; diff --git a/presto-native-execution/presto_cpp/main/types/CMakeLists.txt b/presto-native-execution/presto_cpp/main/types/CMakeLists.txt index e37c99c432521..34dfaa75b35e9 100644 --- a/presto-native-execution/presto_cpp/main/types/CMakeLists.txt +++ b/presto-native-execution/presto_cpp/main/types/CMakeLists.txt @@ -18,12 +18,20 @@ add_library( presto_types OBJECT PrestoToVeloxQueryPlan.cpp PrestoToVeloxExpr.cpp VeloxPlanValidator.cpp PrestoToVeloxSplit.cpp PrestoToVeloxConnector.cpp) + add_dependencies(presto_types presto_operators presto_type_converter velox_type velox_type_fbhive velox_dwio_dwrf_proto) target_link_libraries(presto_types presto_type_converter velox_type_fbhive velox_hive_partition_function velox_tpch_gen) +if(PRESTO_ENABLE_REMOTE_FUNCTIONS) + add_dependencies(presto_types velox_expression presto_server_remote_function + velox_functions_remote) + target_link_libraries(presto_types presto_server_remote_function + velox_functions_remote) +endif() + set_property(TARGET presto_types PROPERTY JOB_POOL_LINK presto_link_job_pool) add_library(presto_function_metadata OBJECT FunctionMetadata.cpp) diff --git a/presto-native-execution/presto_cpp/main/types/PrestoToVeloxExpr.cpp b/presto-native-execution/presto_cpp/main/types/PrestoToVeloxExpr.cpp index 2c2b2a3c5ea00..b9b68c76faffc 100644 --- a/presto-native-execution/presto_cpp/main/types/PrestoToVeloxExpr.cpp +++ b/presto-native-execution/presto_cpp/main/types/PrestoToVeloxExpr.cpp @@ -14,14 +14,23 @@ #include "presto_cpp/main/types/PrestoToVeloxExpr.h" #include +#include "presto_cpp/main/common/Configs.h" #include "presto_cpp/presto_protocol/Base64Util.h" #include "velox/common/base/Exceptions.h" #include "velox/functions/prestosql/types/JsonType.h" #include "velox/vector/ComplexVector.h" #include "velox/vector/ConstantVector.h" #include "velox/vector/FlatVector.h" +#ifdef PRESTO_ENABLE_REMOTE_FUNCTIONS +#include "presto_cpp/main/JsonSignatureParser.h" +#include "velox/expression/FunctionSignature.h" +#include "velox/functions/remote/client/Remote.h" +#endif using namespace facebook::velox::core; +#ifdef PRESTO_ENABLE_REMOTE_FUNCTIONS +using facebook::velox::functions::remote::PageFormat; +#endif using facebook::velox::TypeKind; namespace facebook::presto { @@ -412,6 +421,18 @@ std::optional VeloxExprConverter::tryConvertLike( returnType, args, getFunctionName(signature)); } +#ifdef PRESTO_ENABLE_REMOTE_FUNCTIONS +PageFormat fromSerdeString(const std::string_view& serdeName) { + if (serdeName == "presto_page") { + return PageFormat::PRESTO_PAGE; + } else { + VELOX_FAIL( + "presto_page serde is expected by remote function server but got : '{}'", + serdeName); + } +} +#endif + TypedExprPtr VeloxExprConverter::toVeloxExpr( const protocol::CallExpression& pexpr) const { if (auto builtin = std::dynamic_pointer_cast( @@ -458,10 +479,68 @@ TypedExprPtr VeloxExprConverter::toVeloxExpr( pexpr.functionHandle)) { auto args = toVeloxExpr(pexpr.arguments); auto returnType = typeParser_->parse(pexpr.returnType); + return std::make_shared( returnType, args, getFunctionName(sqlFunctionHandle->functionId)); } +#ifdef PRESTO_ENABLE_REMOTE_FUNCTIONS + else if ( + auto restFunctionHandle = + std::dynamic_pointer_cast( + pexpr.functionHandle)) { + + auto args = toVeloxExpr(pexpr.arguments); + auto returnType = typeParser_->parse(pexpr.returnType); + + const auto* systemConfig = SystemConfig::instance(); + + velox::functions::RemoteVectorFunctionMetadata metadata; + metadata.serdeFormat = + fromSerdeString(systemConfig->remoteFunctionServerSerde()); + metadata.location = systemConfig->remoteFunctionRestUrl(); + metadata.functionId = restFunctionHandle->functionId; + metadata.version = restFunctionHandle->version; + + const auto& prestoSignature = restFunctionHandle->signature; + // parseTypeSignature + velox::exec::FunctionSignatureBuilder signatureBuilder; + // Handle type variable constraints + for (const auto& typeVar : prestoSignature.typeVariableConstraints) { + signatureBuilder.typeVariable(typeVar.name); + } + // Handle long variable constraints (for integer variables) + for (const auto& longVar : prestoSignature.longVariableConstraints) { + signatureBuilder.integerVariable(longVar.name); + } + + // Handle return type + signatureBuilder.returnType(prestoSignature.returnType); + + // Handle argument types + for (const auto& argType : prestoSignature.argumentTypes) { + signatureBuilder.argumentType(argType); + } + + // Handle variable arity + if (prestoSignature.variableArity) { + signatureBuilder.variableArity(); + } + + auto signature = signatureBuilder.build(); + std::vector veloxSignatures = { + signature}; + + velox::functions::registerRemoteFunction( + getFunctionName(restFunctionHandle->functionId), + veloxSignatures, + metadata, + false); + + return std::make_shared( + returnType, args, getFunctionName(restFunctionHandle->functionId)); + } +#endif VELOX_FAIL("Unsupported function handle: {}", pexpr.functionHandle->_type); } diff --git a/presto-native-execution/presto_cpp/main/types/tests/CMakeLists.txt b/presto-native-execution/presto_cpp/main/types/tests/CMakeLists.txt index 7a735501dc725..5762a3ea24533 100644 --- a/presto-native-execution/presto_cpp/main/types/tests/CMakeLists.txt +++ b/presto-native-execution/presto_cpp/main/types/tests/CMakeLists.txt @@ -72,6 +72,16 @@ target_link_libraries( ${GFLAGS_LIBRARIES} pthread) +if(PRESTO_ENABLE_REMOTE_FUNCTIONS) + add_dependencies(presto_expressions_test presto_server_remote_function + velox_expression velox_functions_remote) + + target_link_libraries( + presto_expressions_test GTest::gmock GTest::gmock_main + presto_server_remote_function velox_expression velox_functions_remote) + +endif() + set_property(TARGET presto_expressions_test PROPERTY JOB_POOL_LINK presto_link_job_pool) diff --git a/presto-native-execution/presto_cpp/presto_protocol/core/presto_protocol_core.cpp b/presto-native-execution/presto_cpp/presto_protocol/core/presto_protocol_core.cpp index 763bb62e39426..a3c67abdc31e5 100644 --- a/presto-native-execution/presto_cpp/presto_protocol/core/presto_protocol_core.cpp +++ b/presto-native-execution/presto_cpp/presto_protocol/core/presto_protocol_core.cpp @@ -112,6 +112,10 @@ void to_json(json& j, const std::shared_ptr& p) { j = *std::static_pointer_cast(p); return; } + if (type == "rest") { + j = *std::static_pointer_cast(p); + return; + } throw TypeError(type + " no abstract type FunctionHandle "); } @@ -138,6 +142,13 @@ void from_json(const json& j, std::shared_ptr& p) { p = std::static_pointer_cast(k); return; } + if (type == "rest") { + std::shared_ptr k = + std::make_shared(); + j.get_to(*k); + p = std::static_pointer_cast(k); + return; + } throw TypeError(type + " no abstract type FunctionHandle "); } @@ -5849,6 +5860,20 @@ void to_json(json& j, const JsonBasedUdfFunctionMetadata& p) { "JsonBasedUdfFunctionMetadata", "AggregationFunctionMetadata", "aggregateMetadata"); + to_json_key( + j, + "functionId", + p.functionId, + "JsonBasedUdfFunctionMetadata", + "SqlFunctionId", + "functionId"); + to_json_key( + j, + "version", + p.version, + "JsonBasedUdfFunctionMetadata", + "String", + "version"); } void from_json(const json& j, JsonBasedUdfFunctionMetadata& p) { @@ -5901,6 +5926,20 @@ void from_json(const json& j, JsonBasedUdfFunctionMetadata& p) { "JsonBasedUdfFunctionMetadata", "AggregationFunctionMetadata", "aggregateMetadata"); + from_json_key( + j, + "functionId", + p.functionId, + "JsonBasedUdfFunctionMetadata", + "SqlFunctionId", + "functionId"); + from_json_key( + j, + "version", + p.version, + "JsonBasedUdfFunctionMetadata", + "String", + "version"); } } // namespace facebook::presto::protocol /* @@ -8156,6 +8195,52 @@ void from_json(const json& j, RemoteTransactionHandle& p) { } } // namespace facebook::presto::protocol namespace facebook::presto::protocol { +RestFunctionHandle::RestFunctionHandle() noexcept { + _type = "rest"; +} + +void to_json(json& j, const RestFunctionHandle& p) { + j = json::object(); + j["@type"] = "rest"; + to_json_key( + j, + "functionId", + p.functionId, + "RestFunctionHandle", + "SqlFunctionId", + "functionId"); + to_json_key( + j, "version", p.version, "RestFunctionHandle", "String", "version"); + to_json_key( + j, + "signature", + p.signature, + "RestFunctionHandle", + "Signature", + "signature"); +} + +void from_json(const json& j, RestFunctionHandle& p) { + p._type = j["@type"]; + from_json_key( + j, + "functionId", + p.functionId, + "RestFunctionHandle", + "SqlFunctionId", + "functionId"); + from_json_key( + j, "version", p.version, "RestFunctionHandle", "String", "version"); + from_json_key( + j, + "signature", + p.signature, + "RestFunctionHandle", + "Signature", + "signature"); +} +} // namespace facebook::presto::protocol +namespace facebook::presto::protocol { RowNumberNode::RowNumberNode() noexcept { _type = "com.facebook.presto.sql.planner.plan.RowNumberNode"; } diff --git a/presto-native-execution/presto_cpp/presto_protocol/core/presto_protocol_core.h b/presto-native-execution/presto_cpp/presto_protocol/core/presto_protocol_core.h index cbce83539ca17..0faf18dfe9ec0 100644 --- a/presto-native-execution/presto_cpp/presto_protocol/core/presto_protocol_core.h +++ b/presto-native-execution/presto_cpp/presto_protocol/core/presto_protocol_core.h @@ -67,21 +67,21 @@ extern const char* const PRESTO_ABORT_TASK_URL_PARAM; class Exception : public std::runtime_error { public: explicit Exception(const std::string& message) - : std::runtime_error(message){}; + : std::runtime_error(message) {}; }; class TypeError : public Exception { public: - explicit TypeError(const std::string& message) : Exception(message){}; + explicit TypeError(const std::string& message) : Exception(message) {}; }; class OutOfRange : public Exception { public: - explicit OutOfRange(const std::string& message) : Exception(message){}; + explicit OutOfRange(const std::string& message) : Exception(message) {}; }; class ParseError : public Exception { public: - explicit ParseError(const std::string& message) : Exception(message){}; + explicit ParseError(const std::string& message) : Exception(message) {}; }; using String = std::string; @@ -1508,6 +1508,8 @@ struct JsonBasedUdfFunctionMetadata { String schema = {}; RoutineCharacteristics routineCharacteristics = {}; std::shared_ptr aggregateMetadata = {}; + std::shared_ptr functionId = {}; + std::shared_ptr version = {}; }; void to_json(json& j, const JsonBasedUdfFunctionMetadata& p); void from_json(const json& j, JsonBasedUdfFunctionMetadata& p); @@ -1922,6 +1924,17 @@ void to_json(json& j, const RemoteTransactionHandle& p); void from_json(const json& j, RemoteTransactionHandle& p); } // namespace facebook::presto::protocol namespace facebook::presto::protocol { +struct RestFunctionHandle : public FunctionHandle { + SqlFunctionId functionId = {}; + String version = {}; + Signature signature = {}; + + RestFunctionHandle() noexcept; +}; +void to_json(json& j, const RestFunctionHandle& p); +void from_json(const json& j, RestFunctionHandle& p); +} // namespace facebook::presto::protocol +namespace facebook::presto::protocol { struct RowNumberNode : public PlanNode { std::shared_ptr source = {}; List partitionBy = {}; diff --git a/presto-native-execution/presto_cpp/presto_protocol/core/presto_protocol_core.yml b/presto-native-execution/presto_cpp/presto_protocol/core/presto_protocol_core.yml index 2099bf0845414..7b40930a4dbcd 100644 --- a/presto-native-execution/presto_cpp/presto_protocol/core/presto_protocol_core.yml +++ b/presto-native-execution/presto_cpp/presto_protocol/core/presto_protocol_core.yml @@ -175,7 +175,7 @@ AbstractClasses: subclasses: - { name: BuiltInFunctionHandle, key: $static } - { name: SqlFunctionHandle, key: json_file } - + - { name: RestFunctionHandle, key: rest } JavaClasses: - presto-spi/src/main/java/com/facebook/presto/spi/ErrorCause.java @@ -191,6 +191,7 @@ JavaClasses: - presto-main/src/main/java/com/facebook/presto/execution/buffer/BufferState.java - presto-main/src/main/java/com/facebook/presto/metadata/BuiltInFunctionHandle.java - presto-spi/src/main/java/com/facebook/presto/spi/function/SqlFunctionHandle.java + - presto-spi/src/main/java/com/facebook/presto/spi/function/RestFunctionHandle.java - presto-hdfs-core/src/main/java/com/facebook/presto/hive/CacheQuotaRequirement.java - presto-hdfs-core/src/main/java/com/facebook/presto/hive/CacheQuotaScope.java - presto-spi/src/main/java/com/facebook/presto/spi/relation/CallExpression.java @@ -314,5 +315,5 @@ JavaClasses: - presto-main/src/main/java/com/facebook/presto/connector/system/SystemTableLayoutHandle.java - presto-main/src/main/java/com/facebook/presto/connector/system/SystemTransactionHandle.java - presto-spi/src/main/java/com/facebook/presto/spi/function/AggregationFunctionMetadata.java - - presto-function-namespace-managers/src/main/java/com/facebook/presto/functionNamespace/json/JsonBasedUdfFunctionMetadata.java + - presto-function-namespace-managers/src/main/java/com/facebook/presto/functionNamespace/JsonBasedUdfFunctionMetadata.java - presto-spi/src/main/java/com/facebook/presto/spi/plan/ExchangeEncoding.java diff --git a/presto-native-execution/presto_cpp/presto_protocol/presto_protocol.yml b/presto-native-execution/presto_cpp/presto_protocol/presto_protocol.yml index 4c68f6e8e0736..ce7b2f22d5ffd 100644 --- a/presto-native-execution/presto_cpp/presto_protocol/presto_protocol.yml +++ b/presto-native-execution/presto_cpp/presto_protocol/presto_protocol.yml @@ -364,4 +364,4 @@ JavaClasses: - presto-main/src/main/java/com/facebook/presto/connector/system/SystemTableLayoutHandle.java - presto-main/src/main/java/com/facebook/presto/connector/system/SystemTransactionHandle.java - presto-spi/src/main/java/com/facebook/presto/spi/function/AggregationFunctionMetadata.java - - presto-function-namespace-managers/src/main/java/com/facebook/presto/functionNamespace/json/JsonBasedUdfFunctionMetadata.java + - presto-function-namespace-managers/src/main/java/com/facebook/presto/functionNamespace/JsonBasedUdfFunctionMetadata.java diff --git a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/ContainerQueryRunner.java b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/ContainerQueryRunner.java index d32eea55c3929..3d77d5b5d54a0 100644 --- a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/ContainerQueryRunner.java +++ b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/ContainerQueryRunner.java @@ -38,6 +38,7 @@ import java.io.IOException; import java.sql.Connection; import java.sql.DriverManager; +import java.sql.ResultSet; import java.sql.SQLException; import java.sql.Statement; import java.time.Duration; @@ -61,6 +62,7 @@ public class ContainerQueryRunner private static final String CLUSTER_SHUTDOWN_TIMEOUT = System.getProperty("clusterShutDownTimeout", "10"); private static final String BASE_DIR = System.getProperty("user.dir"); private static final int DEFAULT_COORDINATOR_PORT = 8080; + private static final int DEFAULT_FUNCTION_SERVER_PORT = 1122; private static final String TPCH_CATALOG = "tpch"; private static final String TINY_SCHEMA = "tiny"; private static final int DEFAULT_NUMBER_OF_WORKERS = 4; @@ -68,25 +70,24 @@ public class ContainerQueryRunner private final GenericContainer coordinator; private final List> workers = new ArrayList<>(); private final int coordinatorPort; + private final int functionServerPort; private final String catalog; private final String schema; - private final int numberOfWorkers; - private Connection connection; private Statement statement; public ContainerQueryRunner() throws InterruptedException, IOException { - this(DEFAULT_COORDINATOR_PORT, TPCH_CATALOG, TINY_SCHEMA, DEFAULT_NUMBER_OF_WORKERS); + this(DEFAULT_COORDINATOR_PORT, DEFAULT_FUNCTION_SERVER_PORT, TPCH_CATALOG, TINY_SCHEMA, DEFAULT_NUMBER_OF_WORKERS); } - public ContainerQueryRunner(int coordinatorPort, String catalog, String schema, int numberOfWorkers) + public ContainerQueryRunner(int coordinatorPort, int functionServerPort, String catalog, String schema, int numberOfWorkers) throws InterruptedException, IOException { this.coordinatorPort = coordinatorPort; + this.functionServerPort = functionServerPort; this.catalog = catalog; this.schema = schema; - this.numberOfWorkers = numberOfWorkers; // The container details can be added as properties in VM options for testing in IntelliJ. coordinator = createCoordinator(); @@ -110,6 +111,7 @@ public ContainerQueryRunner(int coordinatorPort, String catalog, String schema, try { Connection connection = DriverManager.getConnection(url, "test", null); statement = connection.createStatement(); + statement.execute("set session remote_functions_enabled=true"); } catch (SQLException e) { throw new RuntimeException(e); @@ -131,11 +133,14 @@ private GenericContainer createCoordinator() ContainerQueryRunnerUtils.createCoordinatorLogProperties(); ContainerQueryRunnerUtils.createCoordinatorNodeProperties(); ContainerQueryRunnerUtils.createCoordinatorEntryPointScript(); + ContainerQueryRunnerUtils.createFunctionNamespaceRemoteProperties(functionServerPort); + ContainerQueryRunnerUtils.createFunctionServerConfigProperties(functionServerPort); return new GenericContainer<>(PRESTO_COORDINATOR_IMAGE) .withExposedPorts(coordinatorPort) .withNetwork(network).withNetworkAliases("presto-coordinator") .withFileSystemBind(BASE_DIR + "/testcontainers/coordinator/etc", "/opt/presto-server/etc", BindMode.READ_WRITE) + .withFileSystemBind(BASE_DIR + "/testcontainers/coordinator/etc/function-server", "/opt/function-server/etc", BindMode.READ_ONLY) .withFileSystemBind(BASE_DIR + "/testcontainers/coordinator/entrypoint.sh", "/opt/entrypoint.sh", BindMode.READ_ONLY) .waitingFor(Wait.forLogMessage(".*======== SERVER STARTED ========.*", 1)) .withStartupTimeout(Duration.ofSeconds(Long.parseLong(CONTAINER_TIMEOUT))); @@ -144,7 +149,7 @@ private GenericContainer createCoordinator() private GenericContainer createNativeWorker(int port, String nodeId) throws IOException { - ContainerQueryRunnerUtils.createNativeWorkerConfigProperties(coordinatorPort, nodeId); + ContainerQueryRunnerUtils.createNativeWorkerConfigProperties(coordinatorPort, functionServerPort, nodeId); ContainerQueryRunnerUtils.createNativeWorkerTpchProperties(nodeId); ContainerQueryRunnerUtils.createNativeWorkerEntryPointScript(nodeId); ContainerQueryRunnerUtils.createNativeWorkerNodeProperties(nodeId); @@ -297,9 +302,9 @@ public Session getDefaultSession() public MaterializedResult execute(Session session, String sql) { try { + ResultSet resultSet = statement.executeQuery(sql); return ContainerQueryRunnerUtils - .toMaterializedResult( - statement.executeQuery(sql)); + .toMaterializedResult(resultSet); } catch (SQLException e) { throw new RuntimeException(e); diff --git a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/ContainerQueryRunnerUtils.java b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/ContainerQueryRunnerUtils.java index 6f29f57b3da9c..7553814823e35 100644 --- a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/ContainerQueryRunnerUtils.java +++ b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/ContainerQueryRunnerUtils.java @@ -71,7 +71,7 @@ public static void createNativeWorkerTpchProperties(String nodeId) createPropertiesFile("testcontainers/" + nodeId + "/etc/catalog/tpch.properties", properties); } - public static void createNativeWorkerConfigProperties(int coordinatorPort, String nodeId) + public static void createNativeWorkerConfigProperties(int coordinatorPort, int functionServerPort, String nodeId) throws IOException { Properties properties = new Properties(); @@ -80,6 +80,7 @@ public static void createNativeWorkerConfigProperties(int coordinatorPort, Strin properties.setProperty("discovery.uri", "http://presto-coordinator:" + coordinatorPort); properties.setProperty("system-memory-gb", "2"); properties.setProperty("native.sidecar", "false"); + properties.setProperty("remote-function-server.rest.url", "http://presto-coordinator:" + functionServerPort); createPropertiesFile("testcontainers/" + nodeId + "/etc/config.properties", properties); } @@ -93,6 +94,8 @@ public static void createCoordinatorConfigProperties(int port) properties.setProperty("http-server.http.port", Integer.toString(port)); properties.setProperty("discovery-server.enabled", "true"); properties.setProperty("discovery.uri", "http://presto-coordinator:" + port); + properties.setProperty("list-built-in-functions-only", "false"); + properties.setProperty("native-execution-enabled", "false"); // Get native worker system properties and add them to the coordinator properties Map nativeWorkerProperties = NativeQueryRunnerUtils.getNativeWorkerSystemProperties(); @@ -103,6 +106,34 @@ public static void createCoordinatorConfigProperties(int port) createPropertiesFile("testcontainers/coordinator/etc/config.properties", properties); } + public static void createFunctionNamespaceRemoteProperties(int functionServerPort) + throws IOException + { + Properties properties = new Properties(); + properties.setProperty("function-namespace-manager.name", "rest"); + properties.setProperty("supported-function-languages", "Java"); + properties.setProperty("function-implementation-type", "REST"); + properties.setProperty("rest-based-function-manager.rest.url", "http://localhost:" + functionServerPort); + + String directoryPath = "testcontainers/function-namespace"; + File directory = new File(directoryPath); + if (!directory.exists()) { + directory.mkdirs(); + } + + createPropertiesFile("testcontainers/coordinator/etc/function-namespace/remote.properties", properties); + } + + public static void createFunctionServerConfigProperties(int functionServerPort) + throws IOException { + Properties properties = new Properties(); + properties.setProperty("http-server.http.port", String.valueOf(functionServerPort)); + properties.setProperty("regex-library", "RE2J"); + properties.setProperty("parse-decimal-literals-as-double", "true"); + + createPropertiesFile("testcontainers/coordinator/etc/function-server/config.properties", properties); + } + public static void createCoordinatorJvmConfig() throws IOException @@ -161,6 +192,11 @@ public static void createCoordinatorEntryPointScript() { String scriptContent = "#!/bin/sh\n" + "set -e\n" + + "java " + +// "-Dplugin.dir=/opt/presto-remote-server/function-server-plugin " + +// "-Dconfig=/opt/presto-remote-server/function-server-etc/config.properties " + +// "-jar /opt/presto-remote-server >> log1.txt 2>&1 & \n" + + "-Dconfig=/opt/function-server/etc/config.properties -jar /opt/presto-remote-server >> log1.txt 2>&1 & \n" + "$PRESTO_HOME/bin/launcher run\n"; createScriptFile("testcontainers/coordinator/entrypoint.sh", scriptContent); } diff --git a/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/TestPrestoContainerRemoteFunction.java b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/TestPrestoContainerRemoteFunction.java new file mode 100644 index 0000000000000..9d2a0e0e4f2aa --- /dev/null +++ b/presto-native-execution/src/test/java/com/facebook/presto/nativeworker/TestPrestoContainerRemoteFunction.java @@ -0,0 +1,58 @@ +/* + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.facebook.presto.nativeworker; + +import com.facebook.presto.tests.AbstractTestQueryFramework; +import org.testng.annotations.Test; + +import static org.testng.Assert.assertEquals; + +public class TestPrestoContainerRemoteFunction + extends AbstractTestQueryFramework +{ + private ContainerQueryRunner queryRunner; + + @Override + protected ContainerQueryRunner createQueryRunner() + throws Exception + { + queryRunner = new ContainerQueryRunner(); + return queryRunner; + } + + @Test + public void testPresenceAndBasicFunctionality() + { + assertEquals( + computeActual("select remote.default.abs(-10)") + .getMaterializedRows().get(0).getField(0).toString(), + "10"); + assertEquals( + computeActual("select remote.default.abs(-1230)") + .getMaterializedRows().get(0).getField(0).toString(), + "1230"); + assertEquals( + computeActual("select remote.default.second(CAST('2001-01-02 03:04:05' as timestamp))") + .getMaterializedRows().get(0).getField(0).toString(), + "5"); + assertEquals( + computeActual("select remote.default.length(CAST('AB' AS VARBINARY))") + .getMaterializedRows().get(0).getField(0).toString(), + "2"); + assertEquals( + computeActual("select remote.default.floor(100000.99)") + .getMaterializedRows().get(0).getField(0).toString(), + "100000.0"); + } +} diff --git a/presto-native-execution/velox b/presto-native-execution/velox index 3eb2fa4e7f74b..55cfb8811e616 160000 --- a/presto-native-execution/velox +++ b/presto-native-execution/velox @@ -1 +1 @@ -Subproject commit 3eb2fa4e7f74b5f60bbbec96242c1fd64706f274 +Subproject commit 55cfb8811e6161744cd4ab4bc5afbe4b26a3a9eb diff --git a/presto-spi/src/main/java/com/facebook/presto/spi/function/Signature.java b/presto-spi/src/main/java/com/facebook/presto/spi/function/Signature.java index 024ba43035bd0..bf949e40da0d7 100644 --- a/presto-spi/src/main/java/com/facebook/presto/spi/function/Signature.java +++ b/presto-spi/src/main/java/com/facebook/presto/spi/function/Signature.java @@ -35,12 +35,19 @@ @ThriftStruct public final class Signature { + @JsonProperty("name") private final QualifiedObjectName name; + @JsonProperty("kind") private final FunctionKind kind; + @JsonProperty("typeVariableConstraints") private final List typeVariableConstraints; + @JsonProperty("longVariableConstraints") private final List longVariableConstraints; + @JsonProperty("returnType") private final TypeSignature returnType; + @JsonProperty("argumentTypes") private final List argumentTypes; + @JsonProperty("variableArity") private final boolean variableArity; @ThriftConstructor