Skip to content

Commit

Permalink
Merge pull request ClickHouse#52174 from arenadata/ADQM-988
Browse files Browse the repository at this point in the history
Added possibility to save logs on crash and options to configure logs buffer
  • Loading branch information
tavplubix committed Jul 28, 2023
2 parents 0160a95 + 112058d commit 3920060
Show file tree
Hide file tree
Showing 40 changed files with 795 additions and 179 deletions.
128 changes: 126 additions & 2 deletions docs/en/operations/server-configuration-parameters/settings.md

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions docs/en/operations/system-tables/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ An example:
<engine>ENGINE = MergeTree PARTITION BY toYYYYMM(event_date) ORDER BY (event_date, event_time) SETTINGS index_granularity = 1024</engine>
-->
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
<max_size_rows>1048576</max_size>
<reserved_size_rows>8192</reserved_size_rows>
<buffer_size_rows_flush_threshold>524288</buffer_size_rows_flush_threshold>
<flush_on_crash>false</flush_on_crash>
</query_log>
</clickhouse>
```
Expand Down
212 changes: 179 additions & 33 deletions docs/ru/operations/server-configuration-parameters/settings.md

Large diffs are not rendered by default.

4 changes: 4 additions & 0 deletions docs/ru/operations/system-tables/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ sidebar_label: "Системные таблицы"
<engine>ENGINE = MergeTree PARTITION BY toYYYYMM(event_date) ORDER BY (event_date, event_time) SETTINGS index_granularity = 1024</engine>
-->
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
<max_size_rows>1048576</max_size_rows>
<reserved_size_rows>8192</reserved_size_rows>
<buffer_size_rows_flush_threshold>524288</buffer_size_rows_flush_threshold>
<flush_on_crash>false</flush_on_crash>
</query_log>
</clickhouse>
```
Expand Down
53 changes: 53 additions & 0 deletions programs/server/config.xml
Original file line number Diff line number Diff line change
Expand Up @@ -1026,6 +1026,14 @@

<!-- Interval of flushing data. -->
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
<!-- Maximal size in lines for the logs. When non-flushed logs amount reaches max_size, logs dumped to the disk. -->
<max_size_rows>1048576</max_size_rows>
<!-- Pre-allocated size in lines for the logs. -->
<reserved_size_rows>8192</reserved_size_rows>
<!-- Lines amount threshold, reaching it launches flushing logs to the disk in background. -->
<buffer_size_rows_flush_threshold>524288</buffer_size_rows_flush_threshold>
<!-- Indication whether logs should be dumped to the disk in case of a crash -->
<flush_on_crash>false</flush_on_crash>

<!-- example of using a different storage policy for a system table -->
<!-- storage_policy>local_ssd</storage_policy -->
Expand All @@ -1039,6 +1047,11 @@

<partition_by>toYYYYMM(event_date)</partition_by>
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
<max_size_rows>1048576</max_size_rows>
<reserved_size_rows>8192</reserved_size_rows>
<buffer_size_rows_flush_threshold>524288</buffer_size_rows_flush_threshold>
<!-- Indication whether logs should be dumped to the disk in case of a crash -->
<flush_on_crash>false</flush_on_crash>
</trace_log>

<!-- Query thread log. Has information about all threads participated in query execution.
Expand All @@ -1048,6 +1061,10 @@
<table>query_thread_log</table>
<partition_by>toYYYYMM(event_date)</partition_by>
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
<max_size_rows>1048576</max_size_rows>
<reserved_size_rows>8192</reserved_size_rows>
<buffer_size_rows_flush_threshold>524288</buffer_size_rows_flush_threshold>
<flush_on_crash>false</flush_on_crash>
</query_thread_log>

<!-- Query views log. Has information about all dependent views associated with a query.
Expand All @@ -1066,6 +1083,10 @@
<table>part_log</table>
<partition_by>toYYYYMM(event_date)</partition_by>
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
<max_size_rows>1048576</max_size_rows>
<reserved_size_rows>8192</reserved_size_rows>
<buffer_size_rows_flush_threshold>524288</buffer_size_rows_flush_threshold>
<flush_on_crash>false</flush_on_crash>
</part_log>

<!-- Uncomment to write text log into table.
Expand All @@ -1075,6 +1096,10 @@
<database>system</database>
<table>text_log</table>
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
<max_size_rows>1048576</max_size_rows>
<reserved_size_rows>8192</reserved_size_rows>
<buffer_size_rows_flush_threshold>524288</buffer_size_rows_flush_threshold>
<flush_on_crash>false</flush_on_crash>
<level></level>
</text_log>
-->
Expand All @@ -1084,7 +1109,11 @@
<database>system</database>
<table>metric_log</table>
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
<max_size_rows>1048576</max_size_rows>
<reserved_size_rows>8192</reserved_size_rows>
<buffer_size_rows_flush_threshold>524288</buffer_size_rows_flush_threshold>
<collect_interval_milliseconds>1000</collect_interval_milliseconds>
<flush_on_crash>false</flush_on_crash>
</metric_log>

<!--
Expand All @@ -1095,6 +1124,10 @@
<database>system</database>
<table>asynchronous_metric_log</table>
<flush_interval_milliseconds>7000</flush_interval_milliseconds>
<max_size_rows>1048576</max_size_rows>
<reserved_size_rows>8192</reserved_size_rows>
<buffer_size_rows_flush_threshold>524288</buffer_size_rows_flush_threshold>
<flush_on_crash>false</flush_on_crash>
</asynchronous_metric_log>

<!--
Expand All @@ -1119,6 +1152,10 @@
<database>system</database>
<table>opentelemetry_span_log</table>
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
<max_size_rows>1048576</max_size_rows>
<reserved_size_rows>8192</reserved_size_rows>
<buffer_size_rows_flush_threshold>524288</buffer_size_rows_flush_threshold>
<flush_on_crash>false</flush_on_crash>
</opentelemetry_span_log>


Expand All @@ -1130,6 +1167,10 @@

<partition_by />
<flush_interval_milliseconds>1000</flush_interval_milliseconds>
<max_size_rows>1024</max_size_rows>
<reserved_size_rows>1024</reserved_size_rows>
<buffer_size_rows_flush_threshold>512</buffer_size_rows_flush_threshold>
<flush_on_crash>true</flush_on_crash>
</crash_log>

<!-- Session log. Stores user log in (successful or not) and log out events.
Expand All @@ -1142,6 +1183,10 @@
<partition_by>toYYYYMM(event_date)</partition_by>
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
<max_size_rows>1048576</max_size_rows>
<reserved_size_rows>8192</reserved_size_rows>
<buffer_size_rows_flush_threshold>524288</buffer_size_rows_flush_threshold>
<flush_on_crash>false</flush_on_crash>
</session_log> -->

<!-- Profiling on Processors level. -->
Expand All @@ -1151,6 +1196,10 @@

<partition_by>toYYYYMM(event_date)</partition_by>
<flush_interval_milliseconds>7500</flush_interval_milliseconds>
<max_size_rows>1048576</max_size_rows>
<reserved_size_rows>8192</reserved_size_rows>
<buffer_size_rows_flush_threshold>524288</buffer_size_rows_flush_threshold>
<flush_on_crash>false</flush_on_crash>
</processors_profile_log>

<!-- Log of asynchronous inserts. It allows to check status
Expand All @@ -1161,6 +1210,10 @@
<table>asynchronous_insert_log</table>

<flush_interval_milliseconds>7500</flush_interval_milliseconds>
<max_size_rows>1048576</max_size_rows>
<reserved_size_rows>8192</reserved_size_rows>
<buffer_size_rows_flush_threshold>524288</buffer_size_rows_flush_threshold>
<flush_on_crash>false</flush_on_crash>
<partition_by>event_date</partition_by>
<ttl>event_date + INTERVAL 3 DAY</ttl>
</asynchronous_insert_log>
Expand Down
81 changes: 47 additions & 34 deletions src/Common/SystemLogBase.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,30 +31,25 @@ namespace ErrorCodes
extern const int TIMEOUT_EXCEEDED;
}

namespace
{
constexpr size_t DBMS_SYSTEM_LOG_QUEUE_SIZE = 1048576;
}

ISystemLog::~ISystemLog() = default;


template <typename LogElement>
SystemLogQueue<LogElement>::SystemLogQueue(
const String & table_name_,
size_t flush_interval_milliseconds_,
bool turn_off_logger_)
: log(&Poco::Logger::get("SystemLogQueue (" + table_name_ + ")"))
, flush_interval_milliseconds(flush_interval_milliseconds_)
SystemLogQueue<LogElement>::SystemLogQueue(const SystemLogQueueSettings & settings_)
: log(&Poco::Logger::get("SystemLogQueue (" + settings_.database + "." +settings_.table + ")"))
, settings(settings_)

{
if (turn_off_logger_)
queue.reserve(settings.reserved_size_rows);

if (settings.turn_off_logger)
log->setLevel(0);
}

static thread_local bool recursive_push_call = false;

template <typename LogElement>
void SystemLogQueue<LogElement>::push(const LogElement & element)
void SystemLogQueue<LogElement>::push(LogElement&& element)
{
/// It is possible that the method will be called recursively.
/// Better to drop these events to avoid complications.
Expand All @@ -70,17 +65,17 @@ void SystemLogQueue<LogElement>::push(const LogElement & element)
MemoryTrackerBlockerInThread temporarily_disable_memory_tracker;

/// Should not log messages under mutex.
bool queue_is_half_full = false;
bool buffer_size_rows_flush_threshold_exceeded = false;

{
std::unique_lock lock(mutex);

if (is_shutdown)
return;

if (queue.size() == DBMS_SYSTEM_LOG_QUEUE_SIZE / 2)
if (queue.size() == settings.buffer_size_rows_flush_threshold)
{
queue_is_half_full = true;
buffer_size_rows_flush_threshold_exceeded = true;

// The queue more than half full, time to flush.
// We only check for strict equality, because messages are added one
Expand All @@ -94,7 +89,7 @@ void SystemLogQueue<LogElement>::push(const LogElement & element)
flush_event.notify_all();
}

if (queue.size() >= DBMS_SYSTEM_LOG_QUEUE_SIZE)
if (queue.size() >= settings.max_size_rows)
{
// Ignore all further entries until the queue is flushed.
// Log a message about that. Don't spam it -- this might be especially
Expand All @@ -108,27 +103,28 @@ void SystemLogQueue<LogElement>::push(const LogElement & element)
// TextLog sets its logger level to 0, so this log is a noop and
// there is no recursive logging.
lock.unlock();
LOG_ERROR(log, "Queue is full for system log '{}' at {}", demangle(typeid(*this).name()), queue_front_index);
LOG_ERROR(log, "Queue is full for system log '{}' at {}. max_size_rows {}",
demangle(typeid(*this).name()),
queue_front_index,
settings.max_size_rows);
}

return;
}

queue.push_back(element);
queue.push_back(std::move(element));
}

if (queue_is_half_full)
LOG_INFO(log, "Queue is half full for system log '{}'.", demangle(typeid(*this).name()));
if (buffer_size_rows_flush_threshold_exceeded)
LOG_INFO(log, "Queue is half full for system log '{}'. buffer_size_rows_flush_threshold {}",
demangle(typeid(*this).name()), settings.buffer_size_rows_flush_threshold);
}

template <typename LogElement>
void SystemLogBase<LogElement>::flush(bool force)
void SystemLogQueue<LogElement>::handleCrash()
{
uint64_t this_thread_requested_offset = queue->notifyFlush(force);
if (this_thread_requested_offset == uint64_t(-1))
return;

queue->waitFlush(this_thread_requested_offset);
if (settings.notify_flush_on_crash)
notifyFlush(/* force */ true);
}

template <typename LogElement>
Expand Down Expand Up @@ -185,11 +181,13 @@ void SystemLogQueue<LogElement>::confirm(uint64_t to_flush_end)
}

template <typename LogElement>
typename SystemLogQueue<LogElement>::Index SystemLogQueue<LogElement>::pop(std::vector<LogElement>& output, bool& should_prepare_tables_anyway, bool& exit_this_thread)
typename SystemLogQueue<LogElement>::Index SystemLogQueue<LogElement>::pop(std::vector<LogElement> & output,
bool & should_prepare_tables_anyway,
bool & exit_this_thread)
{
std::unique_lock lock(mutex);
flush_event.wait_for(lock,
std::chrono::milliseconds(flush_interval_milliseconds),
std::chrono::milliseconds(settings.flush_interval_milliseconds),
[&] ()
{
return requested_flush_up_to > flushed_up_to || is_shutdown || is_force_prepare_tables;
Expand Down Expand Up @@ -219,11 +217,26 @@ void SystemLogQueue<LogElement>::shutdown()

template <typename LogElement>
SystemLogBase<LogElement>::SystemLogBase(
const String& table_name_,
size_t flush_interval_milliseconds_,
const SystemLogQueueSettings & settings_,
std::shared_ptr<SystemLogQueue<LogElement>> queue_)
: queue(queue_ ? queue_ : std::make_shared<SystemLogQueue<LogElement>>(table_name_, flush_interval_milliseconds_))
: queue(queue_ ? queue_ : std::make_shared<SystemLogQueue<LogElement>>(settings_))
{
}

template <typename LogElement>
void SystemLogBase<LogElement>::flush(bool force)
{
uint64_t this_thread_requested_offset = queue->notifyFlush(force);
if (this_thread_requested_offset == uint64_t(-1))
return;

queue->waitFlush(this_thread_requested_offset);
}

template <typename LogElement>
void SystemLogBase<LogElement>::handleCrash()
{
queue->handleCrash();
}

template <typename LogElement>
Expand All @@ -234,9 +247,9 @@ void SystemLogBase<LogElement>::startup()
}

template <typename LogElement>
void SystemLogBase<LogElement>::add(const LogElement & element)
void SystemLogBase<LogElement>::add(LogElement element)
{
queue->push(element);
queue->push(std::move(element));
}

template <typename LogElement>
Expand Down
Loading

0 comments on commit 3920060

Please sign in to comment.