Skip to content

Commit

Permalink
fix style
Browse files Browse the repository at this point in the history
  • Loading branch information
liuneng1994 committed Sep 4, 2024
1 parent bb8ab2b commit 88c5b47
Show file tree
Hide file tree
Showing 9 changed files with 17 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -323,9 +323,11 @@ class CollectMetricIterator(
private var outputRowCount = 0L
private var outputVectorCount = 0L
private var metricsUpdated = false
// Whether the stage is executed completely using ClickHouse pipeline.
private var wholeStagePipeline = true

override def hasNext: Boolean = {
// The hasNext call is triggered only when there is a fallback.
wholeStagePipeline = false
nativeIterator.hasNext
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -190,26 +190,31 @@ class CHColumnarShuffleWriter[K, V](
}

object CHColumnarShuffleWriter {
private val TOTAL_OUTPUT_ROWS = "total_output_rows"

private val TOTAL_OUTPUT_BATCHES = "total_output_batches"

// Pass the statistics of the last operator before shuffle to CollectMetricIterator.
def setOutputMetrics(splitResult: CHSplitResult): Unit = {
TaskContext
.get()
.getLocalProperties
.setProperty("total_output_rows", splitResult.getTotalRows.toString)
.setProperty(TOTAL_OUTPUT_ROWS, splitResult.getTotalRows.toString)
TaskContext
.get()
.getLocalProperties
.setProperty("total_output_batches", splitResult.getTotalBatches.toString)
.setProperty(TOTAL_OUTPUT_BATCHES, splitResult.getTotalBatches.toString)
}

def getTotalOutputRows(): Long = {
val output_rows = TaskContext.get().getLocalProperty("total_output_rows")
val output_rows = TaskContext.get().getLocalProperty(TOTAL_OUTPUT_ROWS)
var output_rows_value = 0L
if (output_rows != null && output_rows.nonEmpty) output_rows_value = output_rows.toLong
output_rows_value
}

def getTotalOutputBatches(): Long = {
val output_batches = TaskContext.get().getLocalProperty("total_output_batches")
val output_batches = TaskContext.get().getLocalProperty(TOTAL_OUTPUT_BATCHES)
var output_batches_value = 0L
if (output_batches != null) output_batches_value = output_batches.toLong
output_batches_value
Expand Down
5 changes: 2 additions & 3 deletions cpp-ch/local-engine/Parser/LocalExecutor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@
#include <QueryPipeline/printPipeline.h>
#include <Processors/Executors/PipelineExecutor.h>
#include <Core/Settings.h>

#include "SerializedPlanParser.h"
#include <Parser/SerializedPlanParser.h>

using namespace DB;
namespace local_engine
Expand Down Expand Up @@ -169,4 +168,4 @@ std::string LocalExecutor::dumpPipeline() const
DB::printPipeline(processors, out);
return out.str();
}
}
}
5 changes: 2 additions & 3 deletions cpp-ch/local-engine/Parser/SerializedPlanParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1321,18 +1321,17 @@ std::unique_ptr<LocalExecutor> SerializedPlanParser::createExecutor(DB::QueryPla
const Settings & settings = context->getSettingsRef();
auto builder = buildQueryPipeline(*query_plan);

///

assert(s_plan.relations_size() == 1);
const substrait::PlanRel & root_rel = s_plan.relations().at(0);
assert(root_rel.has_root());
if (root_rel.root().input().has_write())
addSinkTransfrom(context, root_rel.root().input().write(), builder);
///

auto * logger = &Poco::Logger::get("SerializedPlanParser");
LOG_INFO(logger, "build pipeline {} ms", stopwatch.elapsedMicroseconds() / 1000.0);
LOG_DEBUG(
logger, "clickhouse plan [optimization={}]:\n{}", settings.query_plan_enable_optimizations, PlanUtil::explainPlan(*query_plan));
// LOG_DEBUG(logger, "clickhouse pipeline:\n{}", QueryPipelineUtil::explainPipeline(pipeline));

auto config = ExecutorConfig::loadFromContext(context);
return std::make_unique<LocalExecutor>(std::move(query_plan), std::move(builder), config.dump_pipeline);
Expand Down
2 changes: 1 addition & 1 deletion cpp-ch/local-engine/Shuffle/SparkExchangeSink.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -386,4 +386,4 @@ std::unordered_map<String, SelectBuilderCreator> SparkExchangeManager::partition
{"single", createSingleSelectorBuilder},
{"range", createRangeSelectorBuilder},
};
}
}
2 changes: 0 additions & 2 deletions cpp-ch/local-engine/Storages/IO/NativeReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,6 @@ DB::Block NativeReader::prepareByFirstBlock()
size_t rows = 0;
readVarUInt(columns, istr);
readVarUInt(rows, istr);
if (columns == 0)
throw Exception(ErrorCodes::LOGICAL_ERROR, "Bug!!! Should not read block with zero columns.");

if (columns > 1'000'000uz)
throw Exception(ErrorCodes::TOO_LARGE_ARRAY_SIZE, "Suspiciously many columns in Native format: {}", columns);
Expand Down
1 change: 0 additions & 1 deletion cpp-ch/local-engine/Storages/SourceFromJavaIter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
#include <Common/CHUtil.h>
#include <Common/Exception.h>
#include <Common/JNIUtils.h>
#include <Common/DebugUtils.h>

namespace DB
{
Expand Down
1 change: 0 additions & 1 deletion cpp-ch/local-engine/local_engine_jni.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@
#include <Common/ErrorCodes.h>
#include <Processors/Executors/PipelineExecutor.h>
#include <Storages/Cache/CacheManager.h>
#include <Common/DebugUtils.h>

#ifdef __cplusplus
namespace DB
Expand Down
1 change: 1 addition & 0 deletions cpp-ch/local-engine/tests/benchmark_local_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
#include <Common/PODArray_fwd.h>
#include <Common/Stopwatch.h>
#include <Common/logger_useful.h>
#include <Parser/LocalExecutor.h>
#include "testConfig.h"

#if defined(__SSE2__)
Expand Down

0 comments on commit 88c5b47

Please sign in to comment.