Skip to content

Commit

Permalink
Add Rest based remote functions
Browse files Browse the repository at this point in the history
  • Loading branch information
Joe-Abraham committed Nov 19, 2024
1 parent 084603c commit c79c741
Show file tree
Hide file tree
Showing 23 changed files with 369 additions and 43 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -66,3 +66,4 @@ presto-native-execution/deps-install
# Compiled executables used for docker build
/docker/presto-cli-*-executable.jar
/docker/presto-server-*.tar.gz
/docker/presto-remote-function-server-executable.jar
4 changes: 4 additions & 0 deletions docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,15 @@ FROM quay.io/centos/centos:stream9
ARG PRESTO_VERSION
ARG PRESTO_PKG=presto-server-$PRESTO_VERSION.tar.gz
ARG PRESTO_CLI_JAR=presto-cli-$PRESTO_VERSION-executable.jar
ARG PRESTO_REMOTE_SERVER_JAR=presto-remote-function-server-executable.jar
ARG JMX_PROMETHEUS_JAVAAGENT_VERSION=0.20.0

ENV PRESTO_HOME="/opt/presto-server"

COPY $PRESTO_PKG .
COPY $PRESTO_CLI_JAR /opt/presto-cli
COPY $PRESTO_REMOTE_SERVER_JAR /opt/presto-remote-server


RUN dnf install -y java-11-openjdk less procps python3 \
&& ln -s $(which python3) /usr/bin/python \
Expand All @@ -19,6 +22,7 @@ RUN dnf install -y java-11-openjdk less procps python3 \
&& rm -rf ./presto-server-$PRESTO_VERSION \
&& chmod +x /opt/presto-cli \
&& ln -s /opt/presto-cli /usr/local/bin/ \
&& chmod +x /opt/presto-remote-server \
# clean cache jobs
&& mv /etc/yum/protected.d/systemd.conf /etc/yum/protected.d/systemd.conf.bak \
&& dnf clean all \
Expand Down
2 changes: 2 additions & 0 deletions docker/entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,6 @@

set -e

java -Dconfig=/opt/function-server/etc/config.properties -jar /opt/presto-remote-server >> log1.txt 2>&1

$PRESTO_HOME/bin/launcher run
20 changes: 20 additions & 0 deletions presto-main/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,26 @@
</ignorePackages>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<finalName>presto-remote-function-server-executable</finalName>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<mainClass>com.facebook.presto.server.FunctionServer</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-dependency-plugin</artifactId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,6 @@ public void run()
verifyJvmRequirements();
verifySystemTimeIsReasonable();

Logger log = Logger.get(FunctionServer.class);

List<Module> modules = ImmutableList.of(
new FunctionServerModule(),
new HttpServerModule(),
Expand Down
2 changes: 1 addition & 1 deletion presto-native-execution/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ option(PRESTO_ENABLE_ABFS "Build ABFS support" OFF)
option(PRESTO_ENABLE_PARQUET "Enable Parquet support" OFF)

# Forwards user input to VELOX_ENABLE_REMOTE_FUNCTIONS.
option(PRESTO_ENABLE_REMOTE_FUNCTIONS "Enable remote function support" OFF)
option(PRESTO_ENABLE_REMOTE_FUNCTIONS "Enable remote function support" ON)

option(PRESTO_ENABLE_TESTING "Enable tests" ON)

Expand Down
4 changes: 2 additions & 2 deletions presto-native-execution/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -73,8 +73,8 @@ clean: #: Delete all build artifacts
rm -rf $(BUILD_BASE_DIR)

velox-submodule: #: Check out code for velox submodule
git submodule sync --recursive
git submodule update --init --recursive
# git submodule sync --recursive
# git submodule update --init --recursive

submodules: velox-submodule

Expand Down
26 changes: 7 additions & 19 deletions presto-native-execution/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,12 @@
<include>presto-cli-*-executable.jar</include>
</includes>
</resource>
<resource>
<directory>${project.parent.basedir}/presto-main/target</directory>
<includes>
<include>presto-remote-function-server-executable.jar</include>
</includes>
</resource>
<resource>
<directory>${project.parent.basedir}/presto-server/target</directory>
<includes>
Expand Down Expand Up @@ -399,24 +405,6 @@
</images>
</configuration>
</execution>
<execution>
<id>dependency</id>
<phase>install</phase>
<goals>
<goal>build</goal>
</goals>
<configuration>
<images>
<image>
<name>presto-native-dependency:latest</name>
<build>
<dockerFile>${project.basedir}/scripts/dockerfiles/ubuntu-22.04-dependency.dockerfile</dockerFile>
<contextDir>${project.basedir}</contextDir>
</build>
</image>
</images>
</configuration>
</execution>
<execution>
<id>worker</id>
<phase>install</phase>
Expand All @@ -433,7 +421,7 @@
<args>
<BUILD_TYPE>Release</BUILD_TYPE>
<DEPENDENCY_IMAGE>presto-native-dependency:latest</DEPENDENCY_IMAGE>
<EXTRA_CMAKE_FLAGS>-DPRESTO_ENABLE_TESTING=OFF</EXTRA_CMAKE_FLAGS>
<EXTRA_CMAKE_FLAGS>-DPRESTO_ENABLE_REMOTE_FUNCTIONS=ON</EXTRA_CMAKE_FLAGS>
<NUM_THREADS>2</NUM_THREADS>
<BASE_IMAGE>ubuntu:22.04</BASE_IMAGE>
<OSNAME>ubuntu</OSNAME>
Expand Down
5 changes: 3 additions & 2 deletions presto-native-execution/presto_cpp/main/PrestoServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1268,10 +1268,11 @@ void PrestoServer::registerRemoteFunctions() {
} else {
VELOX_FAIL(
"To register remote functions using a json file path you need to "
"specify the remote server location using '{}', '{}' or '{}'.",
"specify the remote server location using '{}', '{}' or '{}' or {}.",
SystemConfig::kRemoteFunctionServerThriftAddress,
SystemConfig::kRemoteFunctionServerThriftPort,
SystemConfig::kRemoteFunctionServerThriftUdsPath);
SystemConfig::kRemoteFunctionServerThriftUdsPath,
SystemConfig::kRemoteFunctionServerRestURL);
}
}
#endif
Expand Down
4 changes: 4 additions & 0 deletions presto-native-execution/presto_cpp/main/common/Configs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,10 @@ std::string SystemConfig::remoteFunctionServerSerde() const {
return optionalProperty(kRemoteFunctionServerSerde).value();
}

std::string SystemConfig::remoteFunctionRestUrl() const {
return optionalProperty(kRemoteFunctionServerRestURL).value();
}

int32_t SystemConfig::maxDriversPerTask() const {
return optionalProperty<int32_t>(kMaxDriversPerTask).value();
}
Expand Down
6 changes: 6 additions & 0 deletions presto-native-execution/presto_cpp/main/common/Configs.h
Original file line number Diff line number Diff line change
Expand Up @@ -603,6 +603,10 @@ class SystemConfig : public ConfigBase {
static constexpr std::string_view kRemoteFunctionServerThriftUdsPath{
"remote-function-server.thrift.uds-path"};

/// HTTP URL used by the remote function rest server.
static constexpr std::string_view kRemoteFunctionServerRestURL{
"remote-function-server.rest.url"};

/// Path where json files containing signatures for remote functions can be
/// found.
static constexpr std::string_view
Expand Down Expand Up @@ -696,6 +700,8 @@ class SystemConfig : public ConfigBase {

std::string remoteFunctionServerSerde() const;

std::string remoteFunctionRestUrl() const;

int32_t maxDriversPerTask() const;

folly::Optional<int32_t> taskWriterCount() const;
Expand Down
8 changes: 8 additions & 0 deletions presto-native-execution/presto_cpp/main/types/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,20 @@ add_library(
presto_types OBJECT
PrestoToVeloxQueryPlan.cpp PrestoToVeloxExpr.cpp VeloxPlanValidator.cpp
PrestoToVeloxSplit.cpp PrestoToVeloxConnector.cpp)

add_dependencies(presto_types presto_operators presto_type_converter velox_type
velox_type_fbhive velox_dwio_dwrf_proto)

target_link_libraries(presto_types presto_type_converter velox_type_fbhive
velox_hive_partition_function velox_tpch_gen)

if(PRESTO_ENABLE_REMOTE_FUNCTIONS)
add_dependencies(presto_types velox_expression presto_server_remote_function
velox_functions_remote)
target_link_libraries(presto_types presto_server_remote_function
velox_functions_remote)
endif()

set_property(TARGET presto_types PROPERTY JOB_POOL_LINK presto_link_job_pool)

add_library(presto_function_metadata OBJECT FunctionMetadata.cpp)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,23 @@

#include "presto_cpp/main/types/PrestoToVeloxExpr.h"
#include <boost/algorithm/string/case_conv.hpp>
#include "presto_cpp/main/common/Configs.h"
#include "presto_cpp/presto_protocol/Base64Util.h"
#include "velox/common/base/Exceptions.h"
#include "velox/functions/prestosql/types/JsonType.h"
#include "velox/vector/ComplexVector.h"
#include "velox/vector/ConstantVector.h"
#include "velox/vector/FlatVector.h"
#ifdef PRESTO_ENABLE_REMOTE_FUNCTIONS
#include "presto_cpp/main/JsonSignatureParser.h"
#include "velox/expression/FunctionSignature.h"
#include "velox/functions/remote/client/Remote.h"
#endif

using namespace facebook::velox::core;
#ifdef PRESTO_ENABLE_REMOTE_FUNCTIONS
using facebook::velox::functions::remote::PageFormat;
#endif
using facebook::velox::TypeKind;

namespace facebook::presto {
Expand Down Expand Up @@ -412,6 +421,18 @@ std::optional<TypedExprPtr> VeloxExprConverter::tryConvertLike(
returnType, args, getFunctionName(signature));
}

#ifdef PRESTO_ENABLE_REMOTE_FUNCTIONS
PageFormat fromSerdeString(const std::string_view& serdeName) {
if (serdeName == "presto_page") {
return PageFormat::PRESTO_PAGE;
} else {
VELOX_FAIL(
"presto_page serde is expected by remote function server but got : '{}'",
serdeName);
}
}
#endif

TypedExprPtr VeloxExprConverter::toVeloxExpr(
const protocol::CallExpression& pexpr) const {
if (auto builtin = std::dynamic_pointer_cast<protocol::BuiltInFunctionHandle>(
Expand Down Expand Up @@ -458,10 +479,68 @@ TypedExprPtr VeloxExprConverter::toVeloxExpr(
pexpr.functionHandle)) {
auto args = toVeloxExpr(pexpr.arguments);
auto returnType = typeParser_->parse(pexpr.returnType);

return std::make_shared<CallTypedExpr>(
returnType, args, getFunctionName(sqlFunctionHandle->functionId));
}
#ifdef PRESTO_ENABLE_REMOTE_FUNCTIONS
else if (
auto restFunctionHandle =
std::dynamic_pointer_cast<protocol::RestFunctionHandle>(
pexpr.functionHandle)) {

auto args = toVeloxExpr(pexpr.arguments);
auto returnType = typeParser_->parse(pexpr.returnType);

const auto* systemConfig = SystemConfig::instance();

velox::functions::RemoteVectorFunctionMetadata metadata;
metadata.serdeFormat =
fromSerdeString(systemConfig->remoteFunctionServerSerde());
metadata.location = systemConfig->remoteFunctionRestUrl();
metadata.functionId = restFunctionHandle->functionId;
metadata.version = restFunctionHandle->version;

const auto& prestoSignature = restFunctionHandle->signature;
// parseTypeSignature
velox::exec::FunctionSignatureBuilder signatureBuilder;
// Handle type variable constraints
for (const auto& typeVar : prestoSignature.typeVariableConstraints) {
signatureBuilder.typeVariable(typeVar.name);
}

// Handle long variable constraints (for integer variables)
for (const auto& longVar : prestoSignature.longVariableConstraints) {
signatureBuilder.integerVariable(longVar.name);
}

// Handle return type
signatureBuilder.returnType(prestoSignature.returnType);

// Handle argument types
for (const auto& argType : prestoSignature.argumentTypes) {
signatureBuilder.argumentType(argType);
}

// Handle variable arity
if (prestoSignature.variableArity) {
signatureBuilder.variableArity();
}

auto signature = signatureBuilder.build();
std::vector<velox::exec::FunctionSignaturePtr> veloxSignatures = {
signature};

velox::functions::registerRemoteFunction(
getFunctionName(restFunctionHandle->functionId),
veloxSignatures,
metadata,
false);

return std::make_shared<CallTypedExpr>(
returnType, args, getFunctionName(restFunctionHandle->functionId));
}
#endif
VELOX_FAIL("Unsupported function handle: {}", pexpr.functionHandle->_type);
}

Expand Down
10 changes: 10 additions & 0 deletions presto-native-execution/presto_cpp/main/types/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,16 @@ target_link_libraries(
${GFLAGS_LIBRARIES}
pthread)

if(PRESTO_ENABLE_REMOTE_FUNCTIONS)
add_dependencies(presto_expressions_test presto_server_remote_function
velox_expression velox_functions_remote)

target_link_libraries(
presto_expressions_test GTest::gmock GTest::gmock_main
presto_server_remote_function velox_expression velox_functions_remote)

endif()

set_property(TARGET presto_expressions_test PROPERTY JOB_POOL_LINK
presto_link_job_pool)

Expand Down
Loading

0 comments on commit c79c741

Please sign in to comment.