Skip to content

Commit

Permalink
Merge pull request ClickHouse#49160 from CurtizJ/flush-async-insert-q…
Browse files Browse the repository at this point in the history
…ueue

Allow to flush asynchronous insert queue
  • Loading branch information
CurtizJ authored Jul 25, 2023
2 parents e133db7 + 4531b10 commit 073dea6
Show file tree
Hide file tree
Showing 14 changed files with 235 additions and 22 deletions.
8 changes: 5 additions & 3 deletions programs/server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1476,16 +1476,18 @@ try

/// Load global settings from default_profile and system_profile.
global_context->setDefaultProfiles(config());
const Settings & settings = global_context->getSettingsRef();

/// Initialize background executors after we load default_profile config.
/// This is needed to load proper values of background_pool_size etc.
global_context->initializeBackgroundExecutorsIfNeeded();

if (settings.async_insert_threads)
if (server_settings.async_insert_threads)
{
global_context->setAsynchronousInsertQueue(std::make_shared<AsynchronousInsertQueue>(
global_context,
settings.async_insert_threads));
server_settings.async_insert_threads,
server_settings.async_insert_queue_flush_on_shutdown));
}

size_t mark_cache_size = server_settings.mark_cache_size;
String mark_cache_policy = server_settings.mark_cache_policy;
Expand Down
1 change: 1 addition & 0 deletions src/Access/Common/AccessType.h
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,7 @@ enum class AccessType
M(SYSTEM_SYNC_FILE_CACHE, "SYNC FILE CACHE", GLOBAL, SYSTEM) \
M(SYSTEM_FLUSH_DISTRIBUTED, "FLUSH DISTRIBUTED", TABLE, SYSTEM_FLUSH) \
M(SYSTEM_FLUSH_LOGS, "FLUSH LOGS", GLOBAL, SYSTEM_FLUSH) \
M(SYSTEM_FLUSH_ASYNC_INSERT_QUEUE, "FLUSH ASYNC INSERT QUEUE", GLOBAL, SYSTEM_FLUSH) \
M(SYSTEM_FLUSH, "", GROUP, SYSTEM) \
M(SYSTEM_THREAD_FUZZER, "SYSTEM START THREAD FUZZER, SYSTEM STOP THREAD FUZZER, START THREAD FUZZER, STOP THREAD FUZZER", GLOBAL, SYSTEM) \
M(SYSTEM_UNFREEZE, "SYSTEM UNFREEZE", GLOBAL, SYSTEM) \
Expand Down
2 changes: 2 additions & 0 deletions src/Core/ServerSettings.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ namespace DB
M(UInt64, merges_mutations_memory_usage_soft_limit, 0, "Limit on total memory usage for merges and mutations. Zero means Unlimited.", 0) \
M(Double, merges_mutations_memory_usage_to_ram_ratio, 0.5, "Same as merges_mutations_memory_usage_soft_limit but in to ram ratio. Allows to lower memory limit on low-memory systems.", 0) \
M(Bool, allow_use_jemalloc_memory, true, "Allows to use jemalloc memory.", 0) \
M(UInt64, async_insert_threads, 16, "Maximum number of threads to actually parse and insert data in background. Zero means asynchronous mode is disabled", 0) \
M(Bool, async_insert_queue_flush_on_shutdown, true, "If true queue of asynchronous inserts is flushed on graceful shutdown", 0) \
\
M(UInt64, max_concurrent_queries, 0, "Limit on total number of concurrently executed queries. Zero means Unlimited.", 0) \
M(UInt64, max_concurrent_insert_queries, 0, "Limit on total number of concurrently insert queries. Zero means Unlimited.", 0) \
Expand Down
2 changes: 1 addition & 1 deletion src/Core/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -675,7 +675,6 @@ class IColumn;
M(UInt64, merge_tree_min_bytes_per_task_for_remote_reading, 4 * DBMS_DEFAULT_BUFFER_SIZE, "Min bytes to read per task.", 0) \
M(Bool, merge_tree_use_const_size_tasks_for_remote_reading, true, "Whether to use constant size tasks for reading from a remote table.", 0) \
\
M(UInt64, async_insert_threads, 16, "Maximum number of threads to actually parse and insert data in background. Zero means asynchronous mode is disabled", 0) \
M(Bool, async_insert, false, "If true, data from INSERT query is stored in queue and later flushed to table in background. If wait_for_async_insert is false, INSERT query is processed almost instantly, otherwise client will wait until data will be flushed to table", 0) \
M(Bool, wait_for_async_insert, true, "If true wait for processing of asynchronous insertion", 0) \
M(Seconds, wait_for_async_insert_timeout, DBMS_DEFAULT_LOCK_ACQUIRE_TIMEOUT_SEC, "Timeout for waiting for processing asynchronous insertion", 0) \
Expand Down Expand Up @@ -822,6 +821,7 @@ class IColumn;
MAKE_DEPRECATED_BY_SERVER_CONFIG(M, UInt64, background_distributed_schedule_pool_size, 16) \
MAKE_DEPRECATED_BY_SERVER_CONFIG(M, UInt64, max_remote_read_network_bandwidth_for_server, 0) \
MAKE_DEPRECATED_BY_SERVER_CONFIG(M, UInt64, max_remote_write_network_bandwidth_for_server, 0) \
MAKE_DEPRECATED_BY_SERVER_CONFIG(M, UInt64, async_insert_threads, 16) \
MAKE_DEPRECATED_BY_SERVER_CONFIG(M, UInt64, max_replicated_fetches_network_bandwidth_for_server, 0) \
MAKE_DEPRECATED_BY_SERVER_CONFIG(M, UInt64, max_replicated_sends_network_bandwidth_for_server, 0) \
/* ---- */ \
Expand Down
76 changes: 64 additions & 12 deletions src/Interpreters/AsynchronousInsertQueue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,10 @@ void AsynchronousInsertQueue::InsertData::Entry::finish(std::exception_ptr excep
}
}

AsynchronousInsertQueue::AsynchronousInsertQueue(ContextPtr context_, size_t pool_size_)
AsynchronousInsertQueue::AsynchronousInsertQueue(ContextPtr context_, size_t pool_size_, bool flush_on_shutdown_)
: WithContext(context_)
, pool_size(pool_size_)
, flush_on_shutdown(flush_on_shutdown_)
, queue_shards(pool_size)
, pool(CurrentMetrics::AsynchronousInsertThreads, CurrentMetrics::AsynchronousInsertThreadsActive, pool_size)
{
Expand All @@ -164,8 +165,6 @@ AsynchronousInsertQueue::AsynchronousInsertQueue(ContextPtr context_, size_t poo

AsynchronousInsertQueue::~AsynchronousInsertQueue()
{
/// TODO: add a setting for graceful shutdown.

LOG_TRACE(log, "Shutting down the asynchronous insertion queue");
shutdown = true;

Expand All @@ -177,17 +176,18 @@ AsynchronousInsertQueue::~AsynchronousInsertQueue()
assert(dump_by_first_update_threads[i].joinable());
dump_by_first_update_threads[i].join();

if (flush_on_shutdown)
{
for (auto & [_, elem] : shard.queue)
scheduleDataProcessingJob(elem.key, std::move(elem.data), getContext());
}
else
{
std::lock_guard lock(shard.mutex);

for (auto & [_, elem] : shard.queue)
{
for (const auto & entry : elem.data->entries)
{
entry->finish(std::make_exception_ptr(Exception(
ErrorCodes::TIMEOUT_EXCEEDED, "Wait for async insert timeout exceeded)")));
}
}
}
}

Expand Down Expand Up @@ -232,7 +232,10 @@ AsynchronousInsertQueue::push(ASTPtr query, ContextPtr query_context)
/// to avoid buffering of huge amount of data in memory.

auto read_buf = getReadBufferFromASTInsertQuery(query);
LimitReadBuffer limit_buf(*read_buf, settings.async_insert_max_data_size, /* throw_exception */ false, /* exact_limit */ {});

LimitReadBuffer limit_buf(
*read_buf, settings.async_insert_max_data_size,
/*throw_exception=*/ false, /*exact_limit=*/ {});

WriteBufferFromString write_buf(bytes);
copyData(limit_buf, write_buf);
Expand Down Expand Up @@ -284,18 +287,19 @@ AsynchronousInsertQueue::push(ASTPtr query, ContextPtr query_context)

assert(data);
data->size_in_bytes += entry_data_size;
++data->query_number;
data->entries.emplace_back(entry);
insert_future = entry->getFuture();

LOG_TRACE(log, "Have {} pending inserts with total {} bytes of data for query '{}'",
data->entries.size(), data->size_in_bytes, key.query_str);

bool has_enough_bytes = data->size_in_bytes >= key.settings.async_insert_max_data_size;
bool has_enough_queries = data->entries.size() >= key.settings.async_insert_max_query_number && key.settings.async_insert_deduplicate;

/// Here we check whether we hit the limit on maximum data size in the buffer.
/// And use setting from query context.
/// It works, because queries with the same set of settings are already grouped together.
if (data->size_in_bytes >= key.settings.async_insert_max_data_size
|| (data->query_number >= key.settings.async_insert_max_query_number && key.settings.async_insert_deduplicate))
if (!flush_stopped && (has_enough_bytes || has_enough_queries))
{
data_to_process = std::move(data);
shard.iterators.erase(it);
Expand All @@ -319,6 +323,51 @@ AsynchronousInsertQueue::push(ASTPtr query, ContextPtr query_context)
};
}

/// Forcibly flushes everything that is currently buffered in the queue and
/// blocks until the thread pool has no unfinished jobs left.
///
/// Concurrent calls are serialized by 'flush_mutex'. While a flush is in progress
/// 'flush_stopped' is set, which makes 'push' stop scheduling size-triggered flushes
/// and makes 'processBatchDeadlines' skip its iteration. The flag is cleared on every
/// exit path, including exceptional ones, so that a failed flush cannot leave
/// background flushing disabled.
void AsynchronousInsertQueue::flushAll()
{
    std::lock_guard flush_lock(flush_mutex);

    LOG_DEBUG(log, "Requested to flush asynchronous insert queue");

    /// Clears the flag when the scope is left. It is declared after 'flush_lock',
    /// so it is destroyed first and the flag is reset while 'flush_mutex' is still held.
    struct FlushStoppedReset
    {
        explicit FlushStoppedReset(std::atomic<bool> & flag_) : flag(flag_) {}
        ~FlushStoppedReset() { flag = false; }

        std::atomic<bool> & flag;
    };

    /// Disable background flushes to avoid adding new elements to the queue.
    flush_stopped = true;
    FlushStoppedReset flush_stopped_reset(flush_stopped);

    std::vector<Queue> queues_to_flush(pool_size);

    /// Detach the contents of every shard under its own mutex,
    /// so that jobs are scheduled below without holding any shard lock.
    for (size_t i = 0; i < pool_size; ++i)
    {
        std::lock_guard lock(queue_shards[i].mutex);
        queues_to_flush[i] = std::move(queue_shards[i].queue);
        queue_shards[i].iterators.clear();
    }

    size_t total_queries = 0;
    size_t total_bytes = 0;
    size_t total_entries = 0;

    for (auto & queue : queues_to_flush)
    {
        total_queries += queue.size();
        for (auto & [_, entry] : queue)
        {
            /// Statistics must be read before 'entry.data' is moved into the job.
            total_bytes += entry.data->size_in_bytes;
            total_entries += entry.data->entries.size();
            scheduleDataProcessingJob(entry.key, std::move(entry.data), getContext());
        }
    }

    /// Note that the totals of inserts, bytes and queries cover only the data detached above:
    /// jobs scheduled before the call of 'flushAll' are not counted in them.
    LOG_DEBUG(log,
        "Will wait for finishing of {} flushing jobs (about {} inserts, {} bytes, {} distinct queries)",
        pool.active(), total_entries, total_bytes, total_queries);

    /// Wait until all jobs are finished. That includes also jobs
    /// that were scheduled before the call of 'flushAll'.
    pool.wait();

    LOG_DEBUG(log, "Finished flushing of asynchronous insert queue");
}

void AsynchronousInsertQueue::processBatchDeadlines(size_t shard_num)
{
auto & shard = queue_shards[shard_num];
Expand All @@ -344,6 +393,9 @@ void AsynchronousInsertQueue::processBatchDeadlines(size_t shard_num)
if (shutdown)
return;

if (flush_stopped)
continue;

const auto now = std::chrono::steady_clock::now();

while (true)
Expand Down
12 changes: 9 additions & 3 deletions src/Interpreters/AsynchronousInsertQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ class AsynchronousInsertQueue : public WithContext
public:
using Milliseconds = std::chrono::milliseconds;

AsynchronousInsertQueue(ContextPtr context_, size_t pool_size_);
AsynchronousInsertQueue(ContextPtr context_, size_t pool_size_, bool flush_on_shutdown_);
~AsynchronousInsertQueue();

struct PushResult
Expand All @@ -40,6 +40,8 @@ class AsynchronousInsertQueue : public WithContext
std::unique_ptr<ReadBuffer> insert_data_buffer;
};

/// Force flush the whole queue.
void flushAll();
PushResult push(ASTPtr query, ContextPtr query_context);
size_t getPoolSize() const { return pool_size; }

Expand Down Expand Up @@ -100,9 +102,7 @@ class AsynchronousInsertQueue : public WithContext
using EntryPtr = std::shared_ptr<Entry>;

std::list<EntryPtr> entries;

size_t size_in_bytes = 0;
size_t query_number = 0;
};

using InsertDataPtr = std::unique_ptr<InsertData>;
Expand Down Expand Up @@ -130,6 +130,8 @@ class AsynchronousInsertQueue : public WithContext
};

const size_t pool_size;
const bool flush_on_shutdown;

std::vector<QueueShard> queue_shards;

/// Logic and events behind queue are as follows:
Expand All @@ -141,6 +143,10 @@ class AsynchronousInsertQueue : public WithContext
/// (async_insert_max_data_size setting). If so, then again we dump the data.

std::atomic<bool> shutdown{false};
std::atomic<bool> flush_stopped{false};

/// A mutex that prevents concurrent forced flushes of queue.
mutable std::mutex flush_mutex;

/// Dump the data only inside this pool.
ThreadPool pool;
Expand Down
17 changes: 17 additions & 0 deletions src/Interpreters/InterpreterSystemQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
#include <Interpreters/AsynchronousInsertLog.h>
#include <Interpreters/JIT/CompiledExpressionCache.h>
#include <Interpreters/TransactionLog.h>
#include <Interpreters/AsynchronousInsertQueue.h>
#include <BridgeHelper/CatBoostLibraryBridgeHelper.h>
#include <Access/AccessControl.h>
#include <Access/ContextAccess.h>
Expand Down Expand Up @@ -555,6 +556,17 @@ BlockIO InterpreterSystemQuery::execute()
);
break;
}
case Type::FLUSH_ASYNC_INSERT_QUEUE:
{
getContext()->checkAccess(AccessType::SYSTEM_FLUSH_ASYNC_INSERT_QUEUE);
auto * queue = getContext()->getAsynchronousInsertQueue();
if (!queue)
throw Exception(ErrorCodes::BAD_ARGUMENTS,
"Cannot flush asynchronous insert queue because it is not initialized");

queue->flushAll();
break;
}
case Type::STOP_LISTEN_QUERIES:
case Type::START_LISTEN_QUERIES:
throw Exception(ErrorCodes::NOT_IMPLEMENTED, "{} is not supported yet", query.type);
Expand Down Expand Up @@ -1149,6 +1161,11 @@ AccessRightsElements InterpreterSystemQuery::getRequiredAccessForDDLOnCluster()
required_access.emplace_back(AccessType::SYSTEM_FLUSH_LOGS);
break;
}
case Type::FLUSH_ASYNC_INSERT_QUEUE:
{
required_access.emplace_back(AccessType::SYSTEM_FLUSH_ASYNC_INSERT_QUEUE);
break;
}
case Type::RESTART_DISK:
{
required_access.emplace_back(AccessType::SYSTEM_RESTART_DISK);
Expand Down
1 change: 1 addition & 0 deletions src/Parsers/ASTSystemQuery.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@ class ASTSystemQuery : public IAST, public ASTQueryWithOnCluster
START_REPLICATION_QUEUES,
FLUSH_LOGS,
FLUSH_DISTRIBUTED,
FLUSH_ASYNC_INSERT_QUEUE,
STOP_DISTRIBUTED_SENDS,
START_DISTRIBUTED_SENDS,
START_THREAD_FUZZER,
Expand Down
1 change: 1 addition & 0 deletions tests/queries/0_stateless/01271_show_privileges.reference
Original file line number Diff line number Diff line change
Expand Up @@ -133,6 +133,7 @@ SYSTEM SYNC TRANSACTION LOG ['SYNC TRANSACTION LOG'] GLOBAL SYSTEM
SYSTEM SYNC FILE CACHE ['SYNC FILE CACHE'] GLOBAL SYSTEM
SYSTEM FLUSH DISTRIBUTED ['FLUSH DISTRIBUTED'] TABLE SYSTEM FLUSH
SYSTEM FLUSH LOGS ['FLUSH LOGS'] GLOBAL SYSTEM FLUSH
SYSTEM FLUSH ASYNC INSERT QUEUE ['FLUSH ASYNC INSERT QUEUE'] GLOBAL SYSTEM FLUSH
SYSTEM FLUSH [] \N SYSTEM
SYSTEM THREAD FUZZER ['SYSTEM START THREAD FUZZER','SYSTEM STOP THREAD FUZZER','START THREAD FUZZER','STOP THREAD FUZZER'] GLOBAL SYSTEM
SYSTEM UNFREEZE ['SYSTEM UNFREEZE'] GLOBAL SYSTEM
Expand Down
Loading

0 comments on commit 073dea6

Please sign in to comment.