Skip to content

Commit

Permalink
[native] Fix unsafe row exchange source with compression support
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaoxmeng committed Dec 21, 2024
1 parent eae4696 commit ea7ceec
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include "presto_cpp/main/common/Configs.h"
#include "presto_cpp/main/operators/UnsafeRowExchangeSource.h"
#include "velox/serializers/RowSerializer.h"

namespace facebook::presto::operators {

Expand All @@ -36,7 +37,7 @@ UnsafeRowExchangeSource::request(
return std::move(shuffle_->next())
.deferValue([this](velox::BufferPtr buffer) {
std::vector<velox::ContinuePromise> promises;
int64_t totalBytes = 0;
int64_t totalBytes{0};

{
std::lock_guard<std::mutex> l(queue_->mutex());
Expand All @@ -45,14 +46,25 @@ UnsafeRowExchangeSource::request(
queue_->enqueueLocked(nullptr, promises);
} else {
totalBytes = buffer->size();
VELOX_CHECK_LE(totalBytes, std::numeric_limits<int32_t>::max());

++numBatches_;

auto ioBuf =
folly::IOBuf::wrapBuffer(buffer->as<char>(), buffer->size());
velox::serializer::detail::RowGroupHeader rowHeader{
.uncompressedSize = static_cast<int32_t>(totalBytes),
.compressedSize = static_cast<int32_t>(totalBytes),
.compressed = false};
auto headBuffer = std::make_shared<std::string>(
velox::serializer::detail::RowGroupHeader::size(), '0');
rowHeader.write(const_cast<char*>(headBuffer->data()));

auto ioBuf = folly::IOBuf::wrapBuffer(
headBuffer->data(), headBuffer->size());
ioBuf->appendToChain(
folly::IOBuf::wrapBuffer(buffer->as<char>(), buffer->size()));
queue_->enqueueLocked(
std::make_unique<velox::exec::SerializedPage>(
std::move(ioBuf), [buffer](auto& /*unused*/) {}),
std::move(ioBuf),
[buffer, headBuffer](auto& /*unused*/) {}),
promises);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -216,7 +216,8 @@ class BroadcastTest : public exec::test::OperatorTestBase {
pool(),
dataType,
velox::getNamedVectorSerde(velox::VectorSerde::Kind::kPresto),
&result);
&result,
nullptr);
return result;
}
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,8 @@ class Cursor {
std::vector<RowVectorPtr> vectors;
while (!input->atEnd()) {
RowVectorPtr vector;
VectorStreamGroup::read(input.get(), pool_, rowType_, serde, &vector);
VectorStreamGroup::read(
input.get(), pool_, rowType_, serde, &vector, nullptr);
vectors.emplace_back(vector);
}
return vectors;
Expand Down
2 changes: 1 addition & 1 deletion presto-native-execution/velox
Submodule velox updated 175 files

0 comments on commit ea7ceec

Please sign in to comment.