PXC-4173: PXC node stalls with parallel replication workers executing DDLs via async node

https://perconadev.atlassian.net/browse/PXC-4173

Problem
=======
A PXC replica could deadlock under multi-threaded replication when
--replica-preserve-commit-order is enabled.

The deadlock is more likely when the source server's workload consists of
concurrent DMLs and DDLs. In such a scenario, the replica tries to commit
transactions in the same order as the source, which conflicts with galera's
ordering and results in a deadlock.

MySQL ordering: Commits are serialized as per their commit order on source server.
Galera ordering: Commits are serialized in a FIFO manner.

Scenario
========
Assume the relay log contains the following two transactions with the same
last_committed.

T1: DML, seqno=10
T2: DDL, seqno=11

+-----------+--------------+-------------------------------------------------------+
| Timestamp |    Thread    |                        Activity                       |
+-----------+--------------+-------------------------------------------------------+
|     t0    | Co-ordinator | Assigns T1 to Applier-1                               |
+-----------+--------------+-------------------------------------------------------+
|     t1    | Co-ordinator | Assigns T2 to Applier-2                               |
+-----------+--------------+-------------------------------------------------------+
|     t2    |   Applier-1  | DML starts with seqno = 10                            |
+-----------+--------------+-------------------------------------------------------+
|     t3    |   Applier-2  | DDL starts with seqno = 11                            |
+-----------+--------------+-------------------------------------------------------+
|     t4    |   Applier-2  | DDL calls TOI_begin(), acquires commit monitor        |
+-----------+--------------+-------------------------------------------------------+
|     t5    |   Applier-2  | DDL executes                                          |
+-----------+--------------+-------------------------------------------------------+
|     t5    |   Applier-1  | DML executes                                          |
+-----------+--------------+-------------------------------------------------------+
|     t6    |   Applier-2  | DDL reaches commit_order_manager, finds that          |
|           |              | it needs to wait until Applier-1 is committed         |
+-----------+--------------+-------------------------------------------------------+
|     t7    |   Applier-1  | DML reaches commit, calls commit_order_enter_local()  |
|           |              | and will wait for Applier-2 to release commit monitor |
+-----------+--------------+-------------------------------------------------------+

In the end, the two appliers wait on each other:
Applier-1: DML executing Xid_log_event::commit, waiting for the DDL to release the commit monitor
Applier-2: DDL executing ALTER TABLE, waiting for Applier-1 to commit first
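
The circular wait at t7 can be illustrated with a tiny wait-for graph. This is
only a sketch of the reasoning, not code from the server: the type
WaitForGraph and the functions in_wait_cycle() and scenario_at_t7() are
hypothetical names introduced for this example.

```cpp
#include <cassert>
#include <map>
#include <set>
#include <string>

// Hypothetical model: each blocked thread maps to the thread it waits on.
// Thread names are labels taken from the table, not server identifiers.
using WaitForGraph = std::map<std::string, std::string>;

// Returns true if following wait-for edges from `start` leads back to `start`.
bool in_wait_cycle(const WaitForGraph &g, const std::string &start) {
  std::set<std::string> seen;
  std::string cur = start;
  while (true) {
    auto it = g.find(cur);
    if (it == g.end()) return false;  // cur is not blocked, so the chain can progress
    cur = it->second;
    if (cur == start) return true;  // came back to the start: circular wait
    if (!seen.insert(cur).second) return false;  // a cycle that does not include start
  }
}

// State of the two appliers at t7 in the table above.
WaitForGraph scenario_at_t7() {
  return {
      // DML waits for the galera commit monitor held by the DDL.
      {"Applier-1", "Applier-2"},
      // DDL waits in the commit order manager for the DML to commit.
      {"Applier-2", "Applier-1"},
  };
}
```

Calling in_wait_cycle(scenario_at_t7(), "Applier-1") reports a cycle, while
removing either edge breaks it, which is what the fix aims for by making both
layers agree on one order.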

Solution
========
This commit introduces an "Async Monitor" at the server layer to ensure that
transactions are ordered in galera in the same order as in the relay log.

The Async Monitor is similar to the Commit Monitor used in galera: it
internally keeps track of the last_left seqno.

A transaction with seqno > last_left waits until the condition
`seqno = last_left + 1` is met, meaning it is its turn to enter the Commit
Monitor in galera. This ensures that transactions are registered in galera in
the same sequence as on the source.
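
The waiting rule can be sketched in isolation as follows. This is a simplified
illustration, not the class added by this commit: Seqno_monitor_sketch and
run_in_monitor() are hypothetical names, and leave() here only publishes the
new last_left, whereas the committed Wsrep_async_monitor::leave() also waits
for its turn first.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical, simplified monitor: admits seqnos strictly in order.
class Seqno_monitor_sketch {
 public:
  using seqno_t = unsigned long long;
  explicit Seqno_monitor_sketch(seqno_t last_left) : m_last_left(last_left) {}

  // Blocks until seqno == last_left + 1, i.e. until it is this seqno's turn.
  void enter(seqno_t seqno) {
    std::unique_lock<std::mutex> lock(m_mutex);
    m_cond.wait(lock, [this, seqno] { return seqno == m_last_left + 1; });
  }

  // Records that seqno has left and wakes the waiters so the next one can enter.
  void leave(seqno_t seqno) {
    {
      std::lock_guard<std::mutex> lock(m_mutex);
      m_last_left = seqno;
    }
    m_cond.notify_all();
  }

 private:
  std::mutex m_mutex;
  std::condition_variable m_cond;
  seqno_t m_last_left;
};

// Starts one thread per seqno in the given start order and returns the order
// in which the threads actually passed through the monitor.
std::vector<unsigned long long> run_in_monitor(
    unsigned long long last_left,
    const std::vector<unsigned long long> &start_order) {
  Seqno_monitor_sketch monitor(last_left);
  std::vector<unsigned long long> entered;  // touched only between enter() and leave()
  std::vector<std::thread> workers;
  for (auto seqno : start_order) {
    workers.emplace_back([&monitor, &entered, seqno] {
      monitor.enter(seqno);
      entered.push_back(seqno);  // at most one thread is inside at a time
      monitor.leave(seqno);
    });
  }
  for (auto &w : workers) w.join();
  return entered;
}
```

With last_left = 9, threads started for seqnos 11, 10 and 12 pass through in
the order 10, 11, 12 regardless of scheduling, so the DDL with seqno 11 from
the scenario cannot get ahead of the DML with seqno 10.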

Lifetime of the Async Monitor:
The Async Monitor is created on START REPLICA and is destroyed on STOP REPLICA
and whenever the applier thread exits. Before it is destroyed, the value of
last_left is stored in the Relay_log_info object, and the value is restored on
the subsequent START REPLICA.
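
The save and restore of last_left across a stop and start can be sketched as
below. Monitor_sketch and Rli_sketch are hypothetical stand-ins for
Wsrep_async_monitor and Relay_log_info; the real objects carry far more state
and the real lifecycle is driven by the applier thread.

```cpp
#include <cassert>
#include <memory>

// Hypothetical stand-in for the monitor: only the last_left seqno is modelled.
struct Monitor_sketch {
  unsigned long long last_left = 0;
  void reset(unsigned long long seqno) { last_left = seqno; }
};

// Hypothetical stand-in for the relay log info object that outlives the monitor.
struct Rli_sketch {
  std::unique_ptr<Monitor_sketch> monitor;  // exists only while the applier runs
  unsigned long long saved_last_left = 0;   // survives across stop/start

  // Models START REPLICA: create the monitor and restore the saved position.
  void start_replica() {
    monitor = std::make_unique<Monitor_sketch>();
    if (saved_last_left != 0) monitor->reset(saved_last_left);
  }

  // Models STOP REPLICA or applier exit: save the position, then destroy.
  void stop_replica() {
    if (!monitor) return;
    saved_last_left = monitor->last_left;
    monitor.reset();  // std::unique_ptr::reset() destroys the monitor
  }
};
```

The position is kept outside the monitor so that a restarted applier resumes
waiting at last_left + 1 instead of starting again from zero.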
venkatesh-prasad-v committed Sep 19, 2024
1 parent 9a4609a commit 25c09e8
Showing 11 changed files with 241 additions and 0 deletions.
1 change: 1 addition & 0 deletions sql/CMakeLists.txt
@@ -1148,6 +1148,7 @@ SET (RPL_REPLICA_SRCS
rpl_trx_boundary_parser.cc
udf_service_impl.cc
udf_service_util.cc
wsrep_async_monitor.cc
)
ADD_STATIC_LIBRARY(rpl_replica ${RPL_REPLICA_SRCS}
DEPENDENCIES GenError
2 changes: 2 additions & 0 deletions sql/binlog.cc
@@ -8942,12 +8942,14 @@ TC_LOG::enum_result MYSQL_BIN_LOG::commit(THD *thd, bool all) {
trans_commit_stmt()) the following call to my_error() will allow
overwriting the error */
my_error(ER_TRANSACTION_ROLLBACK_DURING_COMMIT, MYF(0));
thd_leave_apply_monitor(thd);
return RESULT_ABORTED;
}

int rc = ordered_commit(thd, all, skip_commit);

if (run_wsrep_hooks) {
thd_leave_apply_monitor(thd);
wsrep_after_commit(thd, all);
}

17 changes: 17 additions & 0 deletions sql/log_event.cc
@@ -3222,6 +3222,23 @@ int Log_event::apply_event(Relay_log_info *rli) {
if (error) return -1;
}

#ifdef WITH_WSREP
// Ignore fake rotate events generated by the source server when
// the receiver thread reconnects.
bool real_event = server_id && !is_artificial_event();
if (WSREP_ON && (get_type_code() == binary_log::ROTATE_EVENT) &&
real_event) {
/*
If we are here, it means we received a real rotate event from source.
We need to drain/reset the async monitor to be able to handle the
"reset" of sequence numbers on source.
*/
Wsrep_async_monitor *async_monitor=rli->get_wsrep_async_monitor();
if (async_monitor) {
async_monitor->reset(0);
}
}
#endif /* WITH_WSREP */
#ifndef NDEBUG
/* all Workers are idle as done through wait_for_workers_to_finish */
for (uint k = 0; k < rli->curr_group_da.size(); k++) {
47 changes: 47 additions & 0 deletions sql/rpl_replica.cc
@@ -172,6 +172,10 @@
#endif
#include "scope_guard.h"

#ifdef WITH_WSREP
#include "sql/wsrep_async_monitor.h"
#endif /* WITH_WSREP */

struct mysql_cond_t;
struct mysql_mutex_t;

@@ -5083,6 +5087,25 @@ static int exec_relay_log_event(THD *thd, Relay_log_info *rli,
WSREP_INFO("Wsrep before statement error");
return 1;
}

if (WSREP_ON && (ev->get_type_code() == binary_log::GTID_LOG_EVENT ||
ev->get_type_code() == binary_log::ANONYMOUS_GTID_LOG_EVENT)) {

static bool async_monitor_init_done = false;

// Set the last_entered_ if we are starting applier thread post server
// shutdown.
if (!async_monitor_init_done) {
Wsrep_async_monitor *async_monitor=rli->get_wsrep_async_monitor();
if (async_monitor && async_monitor->last_left() == 0) {
const Gtid_log_event* gtid_ev = static_cast<Gtid_log_event *>(ev);
unsigned long long seqno = gtid_ev->sequence_number;
assert (seqno != 0);
async_monitor->reset(seqno - 1);
async_monitor_init_done = true;
}
}
}
#endif /* WITH_WSREP */

enum enum_slave_apply_event_and_update_pos_retval exec_res;
@@ -7225,6 +7248,9 @@ extern "C" void *handle_slave_sql(void *arg) {
bool mts_inited = false;
Global_THD_manager *thd_manager = Global_THD_manager::get_instance();
Commit_order_manager *commit_order_mngr = nullptr;
#ifdef WITH_WSREP
Wsrep_async_monitor *wsrep_async_monitor = nullptr;
#endif /* WITH_WSREP */
Rpl_applier_reader applier_reader(rli);
Relay_log_info::enum_priv_checks_status priv_check_status =
Relay_log_info::enum_priv_checks_status::SUCCESS;
@@ -7270,6 +7296,19 @@ wsrep_restart_point :

rli->set_commit_order_manager(commit_order_mngr);

#ifdef WITH_WSREP
// Restore the last executed seqno
if (WSREP_ON && opt_replica_preserve_commit_order
&& !rli->is_parallel_exec()
&& rli->opt_replica_parallel_workers > 1) {
wsrep_async_monitor = new Wsrep_async_monitor();
rli->set_wsrep_async_monitor(wsrep_async_monitor);
if (rli->wsrep_async_monitor_last_left != 0) {
rli->restore_last_position_for_recovery();
}
}
#endif /* WITH_WSREP */

if (channel_map.is_group_replication_channel_name(rli->get_channel())) {
if (channel_map.is_group_replication_channel_name(rli->get_channel(),
true)) {
@@ -7703,6 +7742,14 @@ wsrep_restart_point :
delete commit_order_mngr;
}

#ifdef WITH_WSREP
if (wsrep_async_monitor) {
rli->backup_last_position_for_recovery();
rli->set_wsrep_async_monitor(nullptr);
delete wsrep_async_monitor;
}
#endif /* WITH_WSREP */

mysql_mutex_unlock(&rli->info_thd_lock);
set_thd_in_use_temporary_tables(
rli); // (re)set info_thd in use for saved temp tables
32 changes: 32 additions & 0 deletions sql/rpl_rli.h
@@ -64,6 +64,9 @@
#include "sql/sql_class.h" // THD
#include "sql/system_variables.h"
#include "sql/table.h"
#ifdef WITH_WSREP
#include "sql/wsrep_async_monitor.h"
#endif /* WITH_WSREP */

class Commit_order_manager;
class Master_info;
@@ -1794,6 +1797,21 @@ class Relay_log_info : public Rpl_info {
commit_order_mngr = mngr;
}

#ifdef WITH_WSREP
Wsrep_async_monitor* get_wsrep_async_monitor() {
return wsrep_async_monitor;
}
void set_wsrep_async_monitor(Wsrep_async_monitor *monitor) {
wsrep_async_monitor = monitor;
}
void backup_last_position_for_recovery() {
wsrep_async_monitor_last_left = wsrep_async_monitor->last_left();
}
void restore_last_position_for_recovery() {
wsrep_async_monitor->reset(wsrep_async_monitor_last_left);
}
#endif /* WITH_WSREP */

/*
Following set function is required to initialize the 'until_option' during
MTS relay log recovery process.
@@ -1840,6 +1858,14 @@ class Relay_log_info : public Rpl_info {
rpl_filter = channel_filter;
}

#ifdef WITH_WSREP
/*
Stores Wsrep_async_monitor's last_left seqno.
Will be used while resetting the last_left seqno during start replica.
*/
unsigned long long wsrep_async_monitor_last_left;
#endif /* WITH_WSREP */

protected:
Format_description_log_event *rli_description_event;

@@ -1851,6 +1877,12 @@ class Relay_log_info : public Rpl_info {
*/
Commit_order_manager *commit_order_mngr;

#ifdef WITH_WSREP
/*
Wsrep_async_monitor orders DMLs and DDLs in galera.
*/
Wsrep_async_monitor *wsrep_async_monitor;
#endif /* WITH_WSREP */
/**
Delay slave SQL thread by this amount of seconds.
The delay is applied per transaction and based on the immediate master's
3 changes: 3 additions & 0 deletions sql/rpl_rli_pdb.cc
@@ -327,6 +327,9 @@ int Slave_worker::init_worker(Relay_log_info *rli, ulong i) {
this->set_require_row_format(rli->is_row_format_required());

set_commit_order_manager(c_rli->get_commit_order_manager());
#ifdef WITH_WSREP
set_wsrep_async_monitor(c_rli->get_wsrep_async_monitor());
#endif /* WITH_WSREP */

if (rli_init_info(false) ||
DBUG_EVALUATE_IF("inject_init_worker_init_info_fault", true, false))
38 changes: 38 additions & 0 deletions sql/wsrep_async_monitor.cc
@@ -0,0 +1,38 @@
#ifdef WITH_WSREP
#include "sql/wsrep_async_monitor.h"
#include <algorithm>
#include <cassert>

void Wsrep_async_monitor::enter(seqno_t seqno) {
std::unique_lock<std::mutex> lock(m_mutex);

// Wait for its turn before entering
m_cond.wait(lock, [this, seqno]() { return seqno == m_last_left + 1; });
m_last_entered = (seqno > m_last_entered) ? seqno : m_last_entered;
fprintf(stderr, "Entered the monitor with seqno: %llu\n", seqno);
}

void Wsrep_async_monitor::leave(seqno_t seqno) {
// Wait for its turn before leaving
std::unique_lock<std::mutex> lock(m_mutex);
m_cond.wait(lock, [this, seqno]() { return seqno == m_last_left + 1; });
m_last_left = seqno;

fprintf(stderr, "Left the monitor seqno: %llu\n", seqno);
// Notify all waiting threads
m_cond.notify_all();
}

void Wsrep_async_monitor::reset(seqno_t seqno) {
std::unique_lock<std::mutex> lock(m_mutex);

// We can reset only if the last entered and last left
// are same, meaning that there is no one who is inside
// the monitor
assert(m_last_entered == m_last_left);
m_last_entered = seqno;
m_last_left = seqno;
m_cond.notify_all();
}

#endif /* WITH_WSREP */
39 changes: 39 additions & 0 deletions sql/wsrep_async_monitor.h
@@ -0,0 +1,39 @@
#ifndef WSREP_ASYNC_MONITOR_H
#define WSREP_ASYNC_MONITOR_H

#ifdef WITH_WSREP
#include <iostream>
#include <condition_variable>
#include <mutex>

class Wsrep_async_monitor {
public:
using seqno_t = unsigned long long;

Wsrep_async_monitor() : m_last_entered(0), m_last_left(0) {}

~Wsrep_async_monitor() {
std::unique_lock<std::mutex> lock(m_mutex);
m_last_entered = 0;
m_last_left = 0;
m_cond.notify_all();
}

void enter(seqno_t seqno);
void leave(seqno_t seqno);
void reset(seqno_t seqno);

seqno_t last_entered() const { return m_last_entered; }
seqno_t last_left() const { return m_last_left; }

private:
std::mutex m_mutex;
std::condition_variable m_cond;

// TODO: Evaluate if we really need m_last_entered
seqno_t m_last_entered;
seqno_t m_last_left;
};

#endif /* WITH_WSREP */
#endif /* WSREP_ASYNC_MONITOR_H */
55 changes: 55 additions & 0 deletions sql/wsrep_mysqld.cc
@@ -2870,6 +2870,57 @@ static void wsrep_RSU_end(THD *thd) {
thd->disable_binlog_guard.reset();
}

bool thd_enter_apply_monitor(THD* thd) {

if (thd->killed != THD::NOT_KILLED) {
return true;
}
if (thd->slave_thread) {
Slave_worker * sw = dynamic_cast<Slave_worker*>(thd->rli_slave);
Wsrep_async_monitor *wsrep_async_monitor {sw->get_wsrep_async_monitor()};
if (wsrep_async_monitor) {
auto seqno = sw->sequence_number();
assert(seqno > 0);
// Set current_mutex and current_cond here
// thd->enter_cond();
wsrep_async_monitor->enter(seqno);
} else {
// Error if we have more than one parallel worker
if (opt_replica_preserve_commit_order && opt_mts_replica_parallel_workers > 1) {
WSREP_ERROR("Async replica thread %u is not monitored by wsrep async monitor",
thd->thread_id());
exit(1);
}
}
}
return false;
}

void thd_leave_apply_monitor(THD* thd) {
if (thd->slave_thread) {
Slave_worker * sw = dynamic_cast<Slave_worker*>(thd->rli_slave);
Wsrep_async_monitor *wsrep_async_monitor {sw->get_wsrep_async_monitor()};
if (wsrep_async_monitor) {
auto seqno = sw->sequence_number();
assert(seqno > 0);
wsrep_async_monitor->leave(seqno);
{
if (thd->wsrep_skip_wsrep_hton == true)
fprintf(stderr, "DDL: thd_leave_apply_monitor: seqno=%llu\n", seqno);
else
fprintf(stderr, "DML: thd_leave_apply_monitor: seqno=%llu\n", seqno);
}
} else {
// Error if we have more than one parallel worker
if (opt_replica_preserve_commit_order && opt_mts_replica_parallel_workers > 1) {
WSREP_ERROR("Async replica thread %u is not monitored by wsrep async monitor",
thd->thread_id());
exit(1);
}
}
}
}

int wsrep_to_isolation_begin(THD *thd, const char *db_, const char *table_,
const Table_ref *table_list,
dd::Tablespace_table_ref_vec *trefs,
@@ -2966,6 +3017,8 @@ int wsrep_to_isolation_begin(THD *thd, const char *db_, const char *table_,
thd->variables.auto_increment_increment = 1;
}

thd_enter_apply_monitor(thd);

DEBUG_SYNC(thd, "wsrep_to_isolation_begin_before_replication");

if (thd->variables.wsrep_on && wsrep_thd_is_local(thd)) {
@@ -3039,6 +3092,8 @@ void wsrep_to_isolation_end(THD *thd) {
}
if (wsrep_emulate_bin_log) wsrep_thd_binlog_trx_reset(thd);

thd_leave_apply_monitor(thd);

DEBUG_SYNC(thd, "wsrep_to_isolation_end_before_wsrep_skip_wsrep_hton");
mysql_mutex_lock(&thd->LOCK_thd_data);

3 changes: 3 additions & 0 deletions sql/wsrep_mysqld.h
@@ -443,6 +443,9 @@ extern PSI_thread_key key_THREAD_wsrep_post_rollbacker;
extern PSI_file_key key_file_wsrep_gra_log;
#endif /* HAVE_PSI_INTERFACE */

bool thd_enter_apply_monitor(THD* thd);
void thd_leave_apply_monitor(THD* thd);

class Alter_info;
int wsrep_to_isolation_begin(THD *thd, const char *db_, const char *table_,
const Table_ref *table_list,
4 changes: 4 additions & 0 deletions sql/wsrep_trans_observer.h
@@ -296,6 +296,10 @@ static inline int wsrep_before_commit(THD *thd, bool all) {
WSREP_DEBUG("wsrep_before_commit: %d, %lld", wsrep_is_real(thd, all),
(long long)wsrep_thd_trx_seqno(thd));
int ret = 0;

/* Enter the apply monitor */
thd_enter_apply_monitor(thd);

assert(wsrep_run_commit_hook(thd, all));
if ((ret = thd->wsrep_cs().before_commit()) == 0) {
assert(!thd->wsrep_trx().ws_meta().gtid().is_undefined());
