Skip to content

Commit

Permalink
[native] Add native endpoint for Velox plan conversion
Browse files Browse the repository at this point in the history
This adds an endpoint to the native Presto server that will convert
a Presto plan fragment to Velox. If the conversion is successful,
the server will send an ok response. If it fails, the server will
send an error response with a 422 status code as unprocessable.
The error message will contain a PlanConversionFailureInfo struct
with error type, code and message.

See also #23649
RFC: https://github.com/prestodb/rfcs/blob/main/RFC-0008-plan-checker.md
  • Loading branch information
BryanCutler committed Nov 26, 2024
1 parent 49c9b7e commit 78df0d9
Show file tree
Hide file tree
Showing 21 changed files with 325 additions and 43 deletions.
1 change: 1 addition & 0 deletions presto-native-execution/presto_cpp/main/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ target_link_libraries(
presto_function_metadata
presto_http
presto_operators
presto_velox_conversion
velox_aggregates
velox_caching
velox_common_base
Expand Down
23 changes: 23 additions & 0 deletions presto-native-execution/presto_cpp/main/PrestoServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include "presto_cpp/main/operators/UnsafeRowExchangeSource.h"
#include "presto_cpp/main/types/FunctionMetadata.h"
#include "presto_cpp/main/types/PrestoToVeloxQueryPlan.h"
#include "presto_cpp/main/types/VeloxPlanConversion.h"
#include "velox/common/base/Counters.h"
#include "velox/common/base/StatsReporter.h"
#include "velox/common/caching/CacheTTLController.h"
Expand Down Expand Up @@ -473,6 +474,9 @@ void PrestoServer::run() {

pool_ =
velox::memory::MemoryManager::getInstance()->addLeafPool("PrestoServer");
nativeWorkerPool_ = velox::memory::MemoryManager::getInstance()->addLeafPool(
"PrestoNativeWorker");

taskManager_ = std::make_unique<TaskManager>(
driverExecutor_.get(), httpSrvCpuExecutor_.get(), spillerExecutor_.get());

Expand Down Expand Up @@ -1454,6 +1458,25 @@ void PrestoServer::registerSidecarEndpoints() {
proxygen::ResponseHandler* downstream) {
http::sendOkResponse(downstream, getFunctionsMetadata());
});
httpServer_->registerPost(
"/v1/velox/plan",
[server = this](
proxygen::HTTPMessage* message,
const std::vector<std::unique_ptr<folly::IOBuf>>& body,
proxygen::ResponseHandler* downstream) {
std::string planFragmentJson = util::extractMessageBody(body);
protocol::PlanConversionResponse response =
prestoToVeloxPlanConversion(
planFragmentJson,
server->nativeWorkerPool_.get(),
server->getVeloxPlanValidator());
if (response.failures.empty()) {
http::sendOkResponse(downstream, json(response));
} else {
http::sendResponse(
downstream, json(response), http::kHttpUnprocessableContent);
}
});
}

protocol::NodeStatus PrestoServer::fetchNodeStatus() {
Expand Down
1 change: 1 addition & 0 deletions presto-native-execution/presto_cpp/main/PrestoServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,7 @@ class PrestoServer {
std::unique_ptr<Announcer> announcer_;
std::unique_ptr<PeriodicHeartbeatManager> heartbeatManager_;
std::shared_ptr<velox::memory::MemoryPool> pool_;
std::shared_ptr<velox::memory::MemoryPool> nativeWorkerPool_;
std::unique_ptr<TaskManager> taskManager_;
std::unique_ptr<TaskResource> taskResource_;
std::atomic<NodeState> nodeState_{NodeState::kActive};
Expand Down
8 changes: 1 addition & 7 deletions presto-native-execution/presto_cpp/main/TaskResource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -221,13 +221,7 @@ proxygen::RequestHandler* TaskResource::createOrUpdateTaskImpl(
httpSrvCpuExecutor_,
[this, &body, taskId, createOrUpdateFunc]() {
const auto startProcessCpuTimeNs = util::getProcessCpuTimeNs();

// TODO Avoid copy
std::ostringstream oss;
for (auto& buf : body) {
oss << std::string((const char*)buf->data(), buf->length());
}
std::string updateJson = oss.str();
std::string updateJson = util::extractMessageBody(body);

std::unique_ptr<protocol::TaskInfo> taskInfo;
try {
Expand Down
9 changes: 9 additions & 0 deletions presto-native-execution/presto_cpp/main/common/Utils.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,13 @@ void installSignalHandler() {
#endif // __APPLE__
}

std::string extractMessageBody(
const std::vector<std::unique_ptr<folly::IOBuf>>& body) {
// TODO Avoid copy
std::ostringstream oss;
for (auto& buf : body) {
oss << std::string((const char*)buf->data(), buf->length());
}
return oss.str();
}
} // namespace facebook::presto::util
3 changes: 3 additions & 0 deletions presto-native-execution/presto_cpp/main/common/Utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,7 @@ long getProcessCpuTimeNs();
/// context such as the queryId.
void installSignalHandler();

std::string extractMessageBody(
const std::vector<std::unique_ptr<folly::IOBuf>>& body);

} // namespace facebook::presto::util
7 changes: 2 additions & 5 deletions presto-native-execution/presto_cpp/main/http/HttpClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <folly/synchronization/Latch.h>
#include <velox/common/base/Exceptions.h>
#include "presto_cpp/main/common/Configs.h"
#include "presto_cpp/main/common/Utils.h"
#include "presto_cpp/main/http/HttpClient.h"

namespace facebook::presto::http {
Expand Down Expand Up @@ -169,11 +170,7 @@ HttpResponse::nextAllocationSize(uint64_t dataLength) const {
std::string HttpResponse::dumpBodyChain() const {
std::string responseBody;
if (!bodyChain_.empty()) {
std::ostringstream oss;
for (const auto& buf : bodyChain_) {
oss << std::string((const char*)buf->data(), buf->length());
}
responseBody = oss.str();
responseBody = util::extractMessageBody(bodyChain_);
}
return responseBody;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ const uint16_t kHttpAccepted = 202;
const uint16_t kHttpNoContent = 204;
const uint16_t kHttpUnauthorized = 401;
const uint16_t kHttpNotFound = 404;
const uint16_t kHttpUnprocessableContent = 422;
const uint16_t kHttpInternalServerError = 500;

const char kMimeTypeApplicationJson[] = "application/json";
Expand Down
44 changes: 28 additions & 16 deletions presto-native-execution/presto_cpp/main/http/HttpServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,22 +25,7 @@ void sendOkResponse(proxygen::ResponseHandler* downstream) {
}

void sendOkResponse(proxygen::ResponseHandler* downstream, const json& body) {
// nlohmann::json throws when it finds invalid UTF-8 characters. In that case
// the server will crash. We handle such situation here and generate body
// replacing the faulty UTF-8 sequences.
std::string messageBody;
try {
messageBody = body.dump();
} catch (const std::exception& e) {
messageBody =
body.dump(-1, ' ', false, nlohmann::detail::error_handler_t::replace);
LOG(WARNING) << "Failed to serialize json to string. "
"Will retry with 'replace' option. "
"Json Dump:\n"
<< messageBody;
}

sendOkResponse(downstream, messageBody);
sendResponse(downstream, body, http::kHttpOk);
}

void sendOkResponse(
Expand Down Expand Up @@ -75,6 +60,33 @@ void sendErrorResponse(
.sendWithEOM();
}

void sendResponse(
proxygen::ResponseHandler* downstream,
const json& body,
uint16_t status) {
// nlohmann::json throws when it finds invalid UTF-8 characters. In that case
// the server will crash. We handle such situation here and generate body
// replacing the faulty UTF-8 sequences.
std::string messageBody;
try {
messageBody = body.dump();
} catch (const std::exception& e) {
messageBody =
body.dump(-1, ' ', false, nlohmann::detail::error_handler_t::replace);
LOG(WARNING) << "Failed to serialize json to string. "
"Will retry with 'replace' option. "
"Json Dump:\n"
<< messageBody;
}

proxygen::ResponseBuilder(downstream)
.status(status, "")
.header(
proxygen::HTTP_HEADER_CONTENT_TYPE, http::kMimeTypeApplicationJson)
.body(messageBody)
.sendWithEOM();
}

HttpConfig::HttpConfig(const folly::SocketAddress& address, bool reusePort)
: address_(address), reusePort_(reusePort) {}

Expand Down
5 changes: 5 additions & 0 deletions presto-native-execution/presto_cpp/main/http/HttpServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,11 @@ void sendErrorResponse(
const std::string& error = "",
uint16_t status = http::kHttpInternalServerError);

void sendResponse(
proxygen::ResponseHandler* downstream,
const json& body,
uint16_t status);

class AbstractRequestHandler : public proxygen::RequestHandler {
public:
void onRequest(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <velox/common/base/tests/GTestUtils.h>
#include <velox/common/memory/Memory.h>
#include "presto_cpp/main/common/Configs.h"
#include "presto_cpp/main/common/Utils.h"
#include "presto_cpp/main/http/HttpClient.h"
#include "presto_cpp/main/http/HttpServer.h"
#include "velox/common/base/StatsReporter.h"
Expand Down Expand Up @@ -157,14 +158,6 @@ std::string bodyAsString(
return oss.str();
}

std::string toString(std::vector<std::unique_ptr<folly::IOBuf>>& bufs) {
std::ostringstream oss;
for (auto& buf : bufs) {
oss << std::string((const char*)buf->data(), buf->length());
}
return oss.str();
}

void echo(
proxygen::HTTPMessage* message,
std::vector<std::unique_ptr<folly::IOBuf>>& body,
Expand All @@ -181,7 +174,7 @@ void echo(
proxygen::ResponseBuilder(downstream)
.status(facebook::presto::http::kHttpOk, "")
.header(proxygen::HTTP_HEADER_CONTENT_TYPE, "text/plain")
.body(toString(body))
.body(facebook::presto::util::extractMessageBody(body))
.sendWithEOM();
}

Expand Down
4 changes: 4 additions & 0 deletions presto-native-execution/presto_cpp/main/types/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,10 @@ add_library(presto_function_metadata OBJECT FunctionMetadata.cpp)

target_link_libraries(presto_function_metadata velox_function_registry)

add_library(presto_velox_conversion OBJECT VeloxPlanConversion.cpp)

target_link_libraries(presto_velox_conversion velox_type)

if(PRESTO_ENABLE_TESTING)
add_subdirectory(tests)
endif()
Original file line number Diff line number Diff line change
Expand Up @@ -1846,7 +1846,9 @@ core::PlanFragment VeloxQueryPlanConverterBase::toVeloxQueryPlan(
case protocol::SystemPartitioning::FIXED: {
switch (systemPartitioningHandle->function) {
case protocol::SystemPartitionFunction::ROUND_ROBIN: {
auto numPartitions = partitioningScheme.bucketToPartition->size();
auto numPartitions = partitioningScheme.bucketToPartition
? partitioningScheme.bucketToPartition->size()
: 1;

if (numPartitions == 1) {
planFragment.planNode = core::PartitionedOutputNode::single(
Expand All @@ -1870,7 +1872,9 @@ core::PlanFragment VeloxQueryPlanConverterBase::toVeloxQueryPlan(
return planFragment;
}
case protocol::SystemPartitionFunction::HASH: {
auto numPartitions = partitioningScheme.bucketToPartition->size();
auto numPartitions = partitioningScheme.bucketToPartition
? partitioningScheme.bucketToPartition->size()
: 1;

if (numPartitions == 1) {
planFragment.planNode = core::PartitionedOutputNode::single(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#include "presto_cpp/main/types/VeloxPlanConversion.h"
#include "presto_cpp/main/common/Exception.h"
#include "presto_cpp/main/types/PrestoToVeloxQueryPlan.h"
#include "velox/core/QueryCtx.h"

using namespace facebook::velox;

namespace {

facebook::presto::protocol::PlanConversionFailureInfo copyFailureInfo(
const facebook::presto::protocol::ExecutionFailureInfo& failure) {
facebook::presto::protocol::PlanConversionFailureInfo failureCopy;
failureCopy.type = failure.type;
failureCopy.message = failure.message;
failureCopy.stack = failure.stack;
failureCopy.errorCode = failure.errorCode;
return failureCopy;
}
} // namespace

namespace facebook::presto {

protocol::PlanConversionResponse prestoToVeloxPlanConversion(
const std::string& planFragmentJson,
memory::MemoryPool* pool,
const VeloxPlanValidator* planValidator) {
protocol::PlanConversionResponse response;

try {
protocol::PlanFragment planFragment = json::parse(planFragmentJson);

auto queryCtx = core::QueryCtx::create();
VeloxInteractiveQueryPlanConverter converter(queryCtx.get(), pool);

// Create a taskId and empty TableWriteInfo needed for plan conversion.
protocol::TaskId taskId = "velox-plan-conversion.0.0.0.0";
auto tableWriteInfo = std::make_shared<protocol::TableWriteInfo>();

// Attempt to convert the plan fragment to a Velox plan.
if (auto writeNode =
std::dynamic_pointer_cast<const protocol::TableWriterNode>(
planFragment.root)) {
// TableWriteInfo is not yet built at the planning stage, so we can not
// fully convert a TableWriteNode and skip that node of the fragment.
auto writeSourceNode =
converter.toVeloxQueryPlan(writeNode->source, tableWriteInfo, taskId);
planValidator->validatePlanFragment(core::PlanFragment(writeSourceNode));
} else {
auto veloxPlan =
converter.toVeloxQueryPlan(planFragment, tableWriteInfo, taskId);
planValidator->validatePlanFragment(veloxPlan);
}
} catch (const VeloxException& e) {
response.failures.emplace_back(
copyFailureInfo(VeloxToPrestoExceptionTranslator::translate(e)));
} catch (const std::exception& e) {
response.failures.emplace_back(
copyFailureInfo(VeloxToPrestoExceptionTranslator::translate(e)));
}

return response;
}

} // namespace facebook::presto
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
#pragma once

#include "presto_cpp/main/types/VeloxPlanValidator.h"
#include "presto_cpp/presto_protocol/core/presto_protocol_core.h"
#include "velox/common/memory/MemoryPool.h"

namespace facebook::presto {

/// Convert a Presto plan fragment to a Velox Plan. If the conversion fails,
/// the failure info will be included in the response.
/// @param planFragmentJson string that will parse as protocol::PlanFragment.
/// @param pool MemoryPool used during conversion.
/// @param planValidator VeloxPlanValidator to validate the converted plan.
protocol::PlanConversionResponse prestoToVeloxPlanConversion(
const std::string& planFragmentJson,
velox::memory::MemoryPool* pool,
const VeloxPlanValidator* planValidator);

} // namespace facebook::presto
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ bool planHasNestedJoinLoop(const velox::core::PlanNodePtr planNode) {
}

void VeloxPlanValidator::validatePlanFragment(
const velox::core::PlanFragment& fragment) {
const velox::core::PlanFragment& fragment) const {
const auto failOnNestedLoopJoin =
SystemConfig::instance()
->optionalProperty<bool>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@
namespace facebook::presto {
class VeloxPlanValidator {
public:
virtual void validatePlanFragment(const velox::core::PlanFragment& fragment);
virtual void validatePlanFragment(
const velox::core::PlanFragment& fragment) const;
virtual ~VeloxPlanValidator() = default;
};
} // namespace facebook::presto
Loading

0 comments on commit 78df0d9

Please sign in to comment.