Skip to content

Commit

Permalink
feat: Avoid small batches in Exchange (#12010)
Browse files Browse the repository at this point in the history
Summary:
Pull Request resolved: #12010

Prevent exchange client from unblocking to early. Unblocking to early impedes
effectiveness of page merging. When the cost of creating a vector is high (for
example for data sets with high number of columns) creating small pages can
make queries significantly less efficient.

For example it was observed that when network is congested and Exchange buffers
are not filled up as fast query may experience CPU efficiency drop up to 4x: T211034421

Reviewed By: xiaoxmeng

Differential Revision: D67615570

fbshipit-source-id: a9936d9a95d43045f3bdabc8db5a4638bfef177c
  • Loading branch information
arhimondr authored and facebook-github-bot committed Jan 23, 2025
1 parent 5e238f4 commit 121b230
Show file tree
Hide file tree
Showing 13 changed files with 524 additions and 71 deletions.
14 changes: 14 additions & 0 deletions velox/core/QueryConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,15 @@ class QueryConfig {
static constexpr const char* kMaxMergeExchangeBufferSize =
"merge_exchange.max_buffer_size";

/// The minimum number of bytes to accumulate in the ExchangeQueue
/// before unblocking a consumer. This is used to avoid creating tiny
/// batches which may have a negative impact on performance when the
/// cost of creating vectors is high (for example, when there are many
/// columns). To avoid latency degradation, the exchange client unblocks a
/// consumer when 1% of the data size observed so far is accumulated.
static constexpr const char* kMinExchangeOutputBatchBytes =
"min_exchange_output_batch_bytes";

static constexpr const char* kMaxPartialAggregationMemory =
"max_partial_aggregation_memory";

Expand Down Expand Up @@ -594,6 +603,11 @@ class QueryConfig {
return get<uint64_t>(kMaxMergeExchangeBufferSize, kDefault);
}

uint64_t minExchangeOutputBatchBytes() const {
static constexpr uint64_t kDefault = 2UL << 20;
return get<uint64_t>(kMinExchangeOutputBatchBytes, kDefault);
}

uint64_t preferredOutputBatchBytes() const {
static constexpr uint64_t kDefault = 10UL << 20;
return get<uint64_t>(kPreferredOutputBatchBytes, kDefault);
Expand Down
11 changes: 9 additions & 2 deletions velox/docs/configs.rst
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,13 @@ Generic Configuration
- Size of buffer in the exchange client that holds data fetched from other nodes before it is processed.
A larger buffer can increase network throughput for larger clusters and thus decrease query processing time
at the expense of reducing the amount of memory available for other usage.
* - min_exchange_output_batch_bytes
- integer
- 2MB
- The minimum number of bytes to accumulate in the ExchangeQueue before unblocking a consumer. This is used to avoid
creating tiny batches which may have a negative impact on performance when the cost of creating vectors is high
(for example, when there are many columns). To avoid latency degradation, the exchange client unblocks a consumer
when 1% of the data size observed so far is accumulated.
* - merge_exchange.max_buffer_size
- integer
- 128MB
Expand Down Expand Up @@ -670,13 +677,13 @@ Each query can override the config by setting corresponding query session proper
- Default AWS secret key to use.
* - hive.s3.endpoint
- string
-
-
- The S3 storage endpoint server. This can be used to connect to an S3-compatible storage system instead of AWS.
* - hive.s3.endpoint.region
- string
- us-east-1
- The S3 storage endpoint server region. Default is set by the AWS SDK. If not configured, region will be attempted
to be parsed from the hive.s3.endpoint value.
to be parsed from the hive.s3.endpoint value.
* - hive.s3.path-style-access
- bool
- false
Expand Down
5 changes: 3 additions & 2 deletions velox/exec/Exchange.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ Exchange::Exchange(
operatorCtx_->driverCtx()->queryConfig(),
serdeKind_)},
processSplits_{operatorCtx_->driverCtx()->driverId == 0},
driverId_{driverCtx->driverId},
exchangeClient_{std::move(exchangeClient)} {}

void Exchange::addTaskIds(std::vector<std::string>& taskIds) {
Expand Down Expand Up @@ -111,8 +112,8 @@ BlockingReason Exchange::isBlocked(ContinueFuture* future) {
}

ContinueFuture dataFuture;
currentPages_ =
exchangeClient_->next(preferredOutputBatchBytes_, &atEnd_, &dataFuture);
currentPages_ = exchangeClient_->next(
driverId_, preferredOutputBatchBytes_, &atEnd_, &dataFuture);
if (!currentPages_.empty() || atEnd_) {
if (atEnd_ && noMoreSplits_) {
const auto numSplits = stats_.rlock()->numSplits;
Expand Down
2 changes: 2 additions & 0 deletions velox/exec/Exchange.h
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ class Exchange : public SourceOperator {
/// and passing these to ExchangeClient.
const bool processSplits_;

const int driverId_;

bool noMoreSplits_ = false;

std::shared_ptr<ExchangeClient> exchangeClient_;
Expand Down
14 changes: 11 additions & 3 deletions velox/exec/ExchangeClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -118,10 +118,14 @@ folly::F14FastMap<std::string, RuntimeMetric> ExchangeClient::stats() const {
return stats;
}

std::vector<std::unique_ptr<SerializedPage>>
ExchangeClient::next(uint32_t maxBytes, bool* atEnd, ContinueFuture* future) {
std::vector<std::unique_ptr<SerializedPage>> ExchangeClient::next(
int consumerId,
uint32_t maxBytes,
bool* atEnd,
ContinueFuture* future) {
std::vector<RequestSpec> requestSpecs;
std::vector<std::unique_ptr<SerializedPage>> pages;
ContinuePromise stalePromise = ContinuePromise::makeEmpty();
{
std::lock_guard<std::mutex> l(queue_->mutex());
if (closed_) {
Expand All @@ -130,7 +134,8 @@ ExchangeClient::next(uint32_t maxBytes, bool* atEnd, ContinueFuture* future) {
}

*atEnd = false;
pages = queue_->dequeueLocked(maxBytes, atEnd, future);
pages = queue_->dequeueLocked(
consumerId, maxBytes, atEnd, future, &stalePromise);
if (*atEnd) {
return pages;
}
Expand All @@ -143,6 +148,9 @@ ExchangeClient::next(uint32_t maxBytes, bool* atEnd, ContinueFuture* future) {
}

// Outside of lock
if (stalePromise.valid()) {
stalePromise.setValue();
}
request(std::move(requestSpecs));
return pages;
}
Expand Down
8 changes: 6 additions & 2 deletions velox/exec/ExchangeClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,18 @@ class ExchangeClient : public std::enable_shared_from_this<ExchangeClient> {
std::string taskId,
int destination,
int64_t maxQueuedBytes,
int32_t numberOfConsumers,
uint64_t minOutputBatchBytes,
memory::MemoryPool* pool,
folly::Executor* executor)
: taskId_{std::move(taskId)},
destination_(destination),
maxQueuedBytes_{maxQueuedBytes},
pool_(pool),
executor_(executor),
queue_(std::make_shared<ExchangeQueue>()) {
queue_(std::make_shared<ExchangeQueue>(
numberOfConsumers,
minOutputBatchBytes)) {
VELOX_CHECK_NOT_NULL(pool_);
VELOX_CHECK_NOT_NULL(executor_);
// NOTE: the executor is used to run async response callback from the
Expand Down Expand Up @@ -91,7 +95,7 @@ class ExchangeClient : public std::enable_shared_from_this<ExchangeClient> {
/// The data may be compressed, in which case 'maxBytes' applies to compressed
/// size.
std::vector<std::unique_ptr<SerializedPage>>
next(uint32_t maxBytes, bool* atEnd, ContinueFuture* future);
next(int consumerId, uint32_t maxBytes, bool* atEnd, ContinueFuture* future);

std::string toString() const;

Expand Down
56 changes: 50 additions & 6 deletions velox/exec/ExchangeQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
* limitations under the License.
*/
#include "velox/exec/ExchangeQueue.h"
#include <algorithm>

namespace facebook::velox::exec {

Expand Down Expand Up @@ -64,6 +65,15 @@ void ExchangeQueue::close() {
clearPromises(promises);
}

int64_t ExchangeQueue::minOutputBatchBytesLocked() const {
// always allow to unblock when at end
if (atEnd_) {
return 0;
}
// At most 1% of received bytes so far to minimize latency for small exchanges
return std::min<int64_t>(minOutputBatchBytes_, receivedBytes_ / 100);
}

void ExchangeQueue::enqueueLocked(
std::unique_ptr<SerializedPage>&& page,
std::vector<ContinuePromise>& promises) {
Expand All @@ -86,17 +96,45 @@ void ExchangeQueue::enqueueLocked(
receivedBytes_ += page->size();

queue_.push_back(std::move(page));
if (!promises_.empty()) {
const auto minBatchSize = minOutputBatchBytesLocked();
while (!promises_.empty()) {
VELOX_CHECK_LE(promises_.size(), numberOfConsumers_);
const int32_t unblockedConsumers = numberOfConsumers_ - promises_.size();
const int64_t unasignedBytes =
totalBytes_ - unblockedConsumers * minBatchSize;
if (unasignedBytes < minBatchSize) {
break;
}
// Resume one of the waiting drivers.
promises.push_back(std::move(promises_.back()));
promises_.pop_back();
auto it = promises_.begin();
promises.push_back(std::move(it->second));
promises_.erase(it);
}
}

void ExchangeQueue::addPromiseLocked(
int consumerId,
ContinueFuture* future,
ContinuePromise* stalePromise) {
ContinuePromise promise{"ExchangeQueue::dequeue"};
*future = promise.getSemiFuture();
auto it = promises_.find(consumerId);
if (it != promises_.end()) {
// resolve stale promises outside the lock to avoid broken promises
*stalePromise = std::move(it->second);
it->second = std::move(promise);
} else {
promises_[consumerId] = std::move(promise);
}
VELOX_CHECK_LE(promises_.size(), numberOfConsumers_);
}

std::vector<std::unique_ptr<SerializedPage>> ExchangeQueue::dequeueLocked(
int consumerId,
uint32_t maxBytes,
bool* atEnd,
ContinueFuture* future) {
ContinueFuture* future,
ContinuePromise* stalePromise) {
VELOX_CHECK_NOT_NULL(future);
if (!error_.empty()) {
*atEnd = true;
Expand All @@ -105,15 +143,21 @@ std::vector<std::unique_ptr<SerializedPage>> ExchangeQueue::dequeueLocked(

*atEnd = false;

// If we don't have enough bytes to return, we wait for more data to be
// available
if (totalBytes_ < minOutputBatchBytesLocked()) {
addPromiseLocked(consumerId, future, stalePromise);
return {};
}

std::vector<std::unique_ptr<SerializedPage>> pages;
uint32_t pageBytes = 0;
for (;;) {
if (queue_.empty()) {
if (atEnd_) {
*atEnd = true;
} else if (pages.empty()) {
promises_.emplace_back("ExchangeQueue::dequeue");
*future = promises_.back().getSemiFuture();
addPromiseLocked(consumerId, future, stalePromise);
}
return pages;
}
Expand Down
49 changes: 46 additions & 3 deletions velox/exec/ExchangeQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,18 @@ class SerializedPage {
/// for input.
class ExchangeQueue {
public:
#ifdef VELOX_ENABLE_BACKWARD_COMPATIBILITY
explicit ExchangeQueue() : ExchangeQueue(1, 0) {}
#endif

explicit ExchangeQueue(
int32_t numberOfConsumers,
uint64_t minOutputBatchBytes)
: numberOfConsumers_{numberOfConsumers},
minOutputBatchBytes_{minOutputBatchBytes} {
VELOX_CHECK_GE(numberOfConsumers, 1);
}

~ExchangeQueue() {
clearAllPromises();
}
Expand Down Expand Up @@ -119,8 +131,20 @@ class ExchangeQueue {
///
/// The data may be compressed, in which case 'maxBytes' applies to compressed
/// size.
std::vector<std::unique_ptr<SerializedPage>> dequeueLocked(
int consumerId,
uint32_t maxBytes,
bool* atEnd,
ContinueFuture* future,
ContinuePromise* stalePromise);

#ifdef VELOX_ENABLE_BACKWARD_COMPATIBILITY
std::vector<std::unique_ptr<SerializedPage>>
dequeueLocked(uint32_t maxBytes, bool* atEnd, ContinueFuture* future);
dequeueLocked(uint32_t maxBytes, bool* atEnd, ContinueFuture* future) {
ContinuePromise stalePromise = ContinuePromise::makeEmpty();
return dequeueLocked(0, maxBytes, atEnd, future, &stalePromise);
}
#endif

/// Returns the total bytes held by SerializedPages in 'this'.
int64_t totalBytes() const {
Expand Down Expand Up @@ -166,6 +190,11 @@ class ExchangeQueue {
return {};
}

void addPromiseLocked(
int consumerId,
ContinueFuture* future,
ContinuePromise* stalePromise);

void clearAllPromises() {
std::vector<ContinuePromise> promises;
{
Expand All @@ -176,7 +205,14 @@ class ExchangeQueue {
}

std::vector<ContinuePromise> clearAllPromisesLocked() {
return std::move(promises_);
std::vector<ContinuePromise> promises(promises_.size());
auto it = promises_.begin();
while (it != promises_.end()) {
promises.push_back(std::move(it->second));
it = promises_.erase(it);
}
VELOX_CHECK(promises_.empty());
return promises;
}

static void clearPromises(std::vector<ContinuePromise>& promises) {
Expand All @@ -185,14 +221,21 @@ class ExchangeQueue {
}
}

int64_t minOutputBatchBytesLocked() const;

const int32_t numberOfConsumers_;
const uint64_t minOutputBatchBytes_;

int numCompleted_{0};
int numSources_{0};
bool noMoreSources_{false};
bool atEnd_{false};

std::mutex mutex_;
std::deque<std::unique_ptr<SerializedPage>> queue_;
std::vector<ContinuePromise> promises_;
// The map from consumer id to the waiting promise
folly::F14FastMap<int, ContinuePromise> promises_;

// When set, all promises will be realized and the next dequeue will
// throw an exception with this message.
std::string error_;
Expand Down
5 changes: 4 additions & 1 deletion velox/exec/MergeSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,9 @@ class MergeExchangeSource : public MergeSource {
mergeExchange->taskId(),
destination,
maxQueuedBytes,
1,
// Deliver right away to avoid blocking other sources
0,
pool,
executor)) {
client_->addRemoteTaskId(taskId);
Expand All @@ -146,7 +149,7 @@ class MergeExchangeSource : public MergeSource {
}

if (!currentPage_) {
auto pages = client_->next(1, &atEnd_, future);
auto pages = client_->next(0, 1, &atEnd_, future);
VELOX_CHECK_LE(pages.size(), 1);
currentPage_ = pages.empty() ? nullptr : std::move(pages.front());

Expand Down
8 changes: 6 additions & 2 deletions velox/exec/Task.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -970,7 +970,8 @@ void Task::initializePartitionOutput() {
// exchange client for each merge source to fetch data as we can't mix
// the data from different sources for merging.
if (auto exchangeNodeId = factory->needsExchangeClient()) {
createExchangeClientLocked(pipeline, exchangeNodeId.value());
createExchangeClientLocked(
pipeline, exchangeNodeId.value(), factory->numDrivers);
}
}
}
Expand Down Expand Up @@ -2982,7 +2983,8 @@ bool Task::pauseRequested(ContinueFuture* future) {

void Task::createExchangeClientLocked(
int32_t pipelineId,
const core::PlanNodeId& planNodeId) {
const core::PlanNodeId& planNodeId,
int32_t numberOfConsumers) {
VELOX_CHECK_NULL(
getExchangeClientLocked(pipelineId),
"Exchange client has been created at pipeline: {} for planNode: {}",
Expand All @@ -2998,6 +3000,8 @@ void Task::createExchangeClientLocked(
taskId_,
destination_,
queryCtx()->queryConfig().maxExchangeBufferSize(),
numberOfConsumers,
queryCtx()->queryConfig().minExchangeOutputBatchBytes(),
addExchangeClientPool(planNodeId, pipelineId),
queryCtx()->executor());
exchangeClientByPlanNode_.emplace(planNodeId, exchangeClients_[pipelineId]);
Expand Down
3 changes: 2 additions & 1 deletion velox/exec/Task.h
Original file line number Diff line number Diff line change
Expand Up @@ -1003,7 +1003,8 @@ class Task : public std::enable_shared_from_this<Task> {
// pipeline.
void createExchangeClientLocked(
int32_t pipelineId,
const core::PlanNodeId& planNodeId);
const core::PlanNodeId& planNodeId,
int32_t numberOfConsumers);

// Get a shared reference to the exchange client with the specified exchange
// plan node 'planNodeId'. The function returns null if there is no client
Expand Down
Loading

0 comments on commit 121b230

Please sign in to comment.