PXC-4173: PXC node stalls with parallel replication workers executing DDLs via async node

https://perconadev.atlassian.net/browse/PXC-4173

Problem
=======
A PXC replica could deadlock under multi-threaded replication when
--replica-preserve-commit-order is enabled.

The deadlock is more likely when the source server's workload consists of
concurrent DMLs and DDLs. In that scenario, the replica tries to commit them
in the same order as the source, which conflicts with Galera's ordering and
results in a deadlock.

MySQL ordering: Commits are serialized as per their commit order on source server.
Galera ordering: Commits are serialized in a FIFO manner.

Scenario
========
Let's assume we have the following two transactions in the relay log with the
same last_committed.

T1: DML, seqno=10
T2: DDL, seqno=11

+-----------+--------------+-------------------------------------------------------+
| Timestamp |    Thread    |                        Activity                       |
+-----------+--------------+-------------------------------------------------------+
|     t0    | Co-ordinator | Assigns T1 to Applier-1                               |
+-----------+--------------+-------------------------------------------------------+
|     t1    | Co-ordinator | Assigns T2 to Applier-2                               |
+-----------+--------------+-------------------------------------------------------+
|     t2    |   Applier-1  | DML starts with seqno = 10                            |
+-----------+--------------+-------------------------------------------------------+
|     t3    |   Applier-2  | DDL starts with seqno = 11                            |
+-----------+--------------+-------------------------------------------------------+
|     t4    |   Applier-2  | DDL calls TOI_begin(), acquires commit monitor        |
+-----------+--------------+-------------------------------------------------------+
|     t5    |   Applier-2  | DDL executes                                          |
+-----------+--------------+-------------------------------------------------------+
|     t5    |   Applier-1  | DML executes                                          |
+-----------+--------------+-------------------------------------------------------+
|     t6    |   Applier-2  | DDL reaches commit_order_manager, finds that          |
|           |              | it needs to wait until Applier-1 is committed         |
+-----------+--------------+-------------------------------------------------------+
|     t7    |   Applier-1  | DML reaches commit, calls commit_order_enter_local()  |
|           |              | and will wait for Applier-2 to release commit monitor |
+-----------+--------------+-------------------------------------------------------+

The end result is a circular wait:
Applier-1: DML executing Xid_log_event::commit, waiting for the DDL to release the commit monitor
Applier-2: DDL executing ALTER TABLE, waiting for Applier-1 to commit first
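
The circular wait above can be modelled as a small wait-for graph. The
following is an illustrative sketch only and is not code from this commit; the
names WaitFor, scenario_waits and in_wait_cycle are hypothetical.

```cpp
#include <cassert>
#include <map>
#include <set>
#include <string>

// Hypothetical model: each blocked thread waits for exactly one other thread.
using WaitFor = std::map<std::string, std::string>;

// Builds the wait-for edges that exist after steps t6 and t7 of the table.
WaitFor scenario_waits() {
  WaitFor waits;
  // t6: the DDL holds the Galera commit monitor and waits in the commit
  // order manager until the DML (seqno 10) has committed.
  waits["Applier-2"] = "Applier-1";
  // t7: the DML waits for the commit monitor, which the DDL still holds.
  waits["Applier-1"] = "Applier-2";
  return waits;
}

// Returns true if following the wait-for edges from `start` leads back to it.
bool in_wait_cycle(const WaitFor &waits, const std::string &start) {
  std::set<std::string> seen;
  std::string cur = start;
  while (true) {
    auto it = waits.find(cur);
    if (it == waits.end()) return false;  // cur is not blocked: no cycle
    cur = it->second;
    if (cur == start) return true;               // came back to the start
    if (!seen.insert(cur).second) return false;  // a cycle that excludes start
  }
}
```

Calling in_wait_cycle(scenario_waits(), "Applier-1") reports a cycle, which is
the deadlock: neither applier can make progress until the other does.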

Solution
========
This commit introduces an "Async Monitor" at the server layer to ensure that
transactions are ordered in Galera in the same order as they appear in the
relay log.

The Async Monitor is similar to the Commit Monitor used in Galera, but differs
in how seqnos are tracked within the monitor.

The Async Monitor has the following methods.
1. schedule() - Schedules the transactions as per their order in the
                relay log.
2. enter() - Ensures that threads are serialized as per their order.
3. leave() - Ensures that threads are removed from the monitor in the serialized order.
4. skip()  - Provides some flexibility to the monitor to skip transactions. This
             is required for handling skipped and filtered transactions.
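
The ordering guarantee of schedule(), enter() and leave() can be sketched with
a simplified stand-in for the monitor. This is a minimal sketch under stated
assumptions, not the class added by this commit: Sketch_async_monitor and
run_appliers_in_reverse are hypothetical names, and skip() is left out for
brevity.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>

// Hypothetical, simplified stand-in for the monitor described above.
class Sketch_async_monitor {
 public:
  using seqno_t = unsigned long long;

  // Coordinator: record the relay-log order.
  void schedule(seqno_t seqno) {
    std::lock_guard<std::mutex> lock(m_mutex);
    m_scheduled.push(seqno);
  }

  // Applier: block until seqno is at the head of the queue.
  void enter(seqno_t seqno) {
    std::unique_lock<std::mutex> lock(m_mutex);
    m_cond.wait(lock, [this, seqno] {
      return !m_scheduled.empty() && m_scheduled.front() == seqno;
    });
  }

  // Applier: remove the head so that the next seqno can enter.
  void leave(seqno_t seqno) {
    {
      std::lock_guard<std::mutex> lock(m_mutex);
      assert(!m_scheduled.empty() && m_scheduled.front() == seqno);
      m_scheduled.pop();
    }
    m_cond.notify_all();
  }

 private:
  std::mutex m_mutex;
  std::condition_variable m_cond;
  std::queue<seqno_t> m_scheduled;
};

// Starts the appliers in reverse order and returns the order in which they
// were admitted to the monitor.
std::vector<Sketch_async_monitor::seqno_t> run_appliers_in_reverse() {
  Sketch_async_monitor monitor;
  std::vector<Sketch_async_monitor::seqno_t> entered;

  monitor.schedule(10);  // T1: DML
  monitor.schedule(11);  // T2: DDL

  auto applier = [&](Sketch_async_monitor::seqno_t seqno) {
    monitor.enter(seqno);
    entered.push_back(seqno);  // only the thread inside the monitor writes
    monitor.leave(seqno);
  };

  std::thread ddl(applier, 11ULL);  // started first, but must wait for 10
  std::thread dml(applier, 10ULL);
  ddl.join();
  dml.join();
  return entered;
}
```

Even though the DDL thread is started first, it is admitted only after seqno 10
has left, so the recorded order follows the relay log.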

Lifetime of the Async Monitor:
The Async Monitor is created on START REPLICA and destroyed on STOP REPLICA, or
whenever the applier thread exits.
venkatesh-prasad-v committed Nov 18, 2024
1 parent bbef137 commit 0972579
Showing 15 changed files with 514 additions and 0 deletions.
1 change: 1 addition & 0 deletions sql/CMakeLists.txt
@@ -1137,6 +1137,7 @@ SET (RPL_REPLICA_SRCS
rpl_trx_boundary_parser.cc
udf_service_impl.cc
udf_service_util.cc
wsrep_async_monitor.cc
)
ADD_LIBRARY(rpl_replica STATIC ${RPL_REPLICA_SRCS})
ADD_DEPENDENCIES(rpl_replica GenError)
2 changes: 2 additions & 0 deletions sql/binlog.cc
@@ -8941,12 +8941,14 @@ TC_LOG::enum_result MYSQL_BIN_LOG::commit(THD *thd, bool all) {
trans_commit_stmt()) the following call to my_error() will allow
overwriting the error */
my_error(ER_TRANSACTION_ROLLBACK_DURING_COMMIT, MYF(0));
thd_leave_async_monitor(thd);
return RESULT_ABORTED;
}

int rc = ordered_commit(thd, all, skip_commit);

if (run_wsrep_hooks) {
thd_leave_async_monitor(thd);
wsrep_after_commit(thd, all);
}

23 changes: 23 additions & 0 deletions sql/log_event.cc
@@ -2704,6 +2704,14 @@ Slave_worker *Log_event::get_slave_worker(Relay_log_info *rli) {

Gtid_log_event *gtid_log_ev = static_cast<Gtid_log_event *>(this);
rli->started_processing(gtid_log_ev);
#ifdef WITH_WSREP
Wsrep_async_monitor *wsrep_async_monitor {rli->get_wsrep_async_monitor()};
if (wsrep_async_monitor) {
auto seqno = gtid_log_ev->sequence_number;
wsrep_async_monitor->schedule(seqno);
fprintf(stderr, "Scheduled the seqno: %llu in async monitor\n", seqno);
}
#endif /* WITH_WSREP */
}

if (schedule_next_event(this, rli)) {
@@ -11207,6 +11215,21 @@ static enum_tbl_map_status check_table_map(Relay_log_info const *rli,
}
}

#ifdef WITH_WSREP
// This transaction is going to be skipped anyway, so skip it in the
// async monitor as well
if (WSREP(rli->info_thd) && res == FILTERED_OUT) {
Slave_worker *sw = static_cast<Slave_worker *>(const_cast<Relay_log_info *>(rli));
Wsrep_async_monitor *wsrep_async_monitor {sw->get_wsrep_async_monitor()};
if (wsrep_async_monitor) {
auto seqno = sw->sequence_number();
assert(seqno > 0);
wsrep_async_monitor->skip(seqno);
fprintf(stderr, "Filtered DML seqno: %llu in async monitor\n", seqno);
}
}
#endif /* WITH_WSREP */

DBUG_PRINT("debug", ("check of table map ended up with: %u", res));

return res;
16 changes: 16 additions & 0 deletions sql/rpl_gtid_execution.cc
@@ -37,6 +37,9 @@
#include "sql/mysqld.h" // connection_events_loop_aborted
#include "sql/rpl_gtid.h"
#include "sql/rpl_rli.h" // Relay_log_info
#ifdef WITH_WSREP
#include "sql/rpl_rli_pdb.h" // Slave_worker
#endif /* WITH_WSREP */
#include "sql/sql_class.h" // THD
#include "sql/sql_lex.h"
#include "sql/sql_parse.h" // stmt_causes_implicit_commit
@@ -354,6 +357,19 @@ static inline void skip_statement(THD *thd) {
to notify that its session ticket was consumed.
*/
Commit_stage_manager::get_instance().finish_session_ticket(thd);
#ifdef WITH_WSREP
/* Although the transaction was skipped, it still needs to be marked as skipped in the Wsrep_async_monitor */
if (thd->slave_thread) {
Slave_worker *sw = dynamic_cast<Slave_worker*>(thd->rli_slave);
Wsrep_async_monitor *wsrep_async_monitor {sw->get_wsrep_async_monitor()};
if (wsrep_async_monitor) {
auto seqno = sw->sequence_number();
assert(seqno > 0);
wsrep_async_monitor->skip(seqno);
fprintf(stderr, "Skipped the seqno: %llu in async monitor\n", seqno);
}
}
#endif /* WITH_WSREP */

#ifndef NDEBUG
const Gtid_set *executed_gtids = gtid_state->get_executed_gtids();
24 changes: 24 additions & 0 deletions sql/rpl_replica.cc
@@ -171,6 +171,10 @@
#endif
#include "scope_guard.h"

#ifdef WITH_WSREP
#include "sql/wsrep_async_monitor.h"
#endif /* WITH_WSREP */

struct mysql_cond_t;
struct mysql_mutex_t;

@@ -7224,6 +7228,9 @@ extern "C" void *handle_slave_sql(void *arg) {
bool mts_inited = false;
Global_THD_manager *thd_manager = Global_THD_manager::get_instance();
Commit_order_manager *commit_order_mngr = nullptr;
#ifdef WITH_WSREP
Wsrep_async_monitor *wsrep_async_monitor = nullptr;
#endif /* WITH_WSREP */
Rpl_applier_reader applier_reader(rli);
Relay_log_info::enum_priv_checks_status priv_check_status =
Relay_log_info::enum_priv_checks_status::SUCCESS;
@@ -7269,6 +7276,16 @@ wsrep_restart_point :

rli->set_commit_order_manager(commit_order_mngr);

#ifdef WITH_WSREP
// Create the async monitor for this applier
if (WSREP_ON && opt_replica_preserve_commit_order
&& !rli->is_parallel_exec()
&& rli->opt_replica_parallel_workers > 1) {
wsrep_async_monitor = new Wsrep_async_monitor();
rli->set_wsrep_async_monitor(wsrep_async_monitor);
}
#endif /* WITH_WSREP */

if (channel_map.is_group_replication_channel_name(rli->get_channel())) {
if (channel_map.is_group_replication_channel_name(rli->get_channel(),
true)) {
@@ -7702,6 +7719,13 @@ wsrep_restart_point :
delete commit_order_mngr;
}

#ifdef WITH_WSREP
if (wsrep_async_monitor) {
rli->set_wsrep_async_monitor(nullptr);
delete wsrep_async_monitor;
}
#endif /* WITH_WSREP */

mysql_mutex_unlock(&rli->info_thd_lock);
set_thd_in_use_temporary_tables(
rli); // (re)set info_thd in use for saved temp tables
18 changes: 18 additions & 0 deletions sql/rpl_rli.h
@@ -63,6 +63,9 @@
#include "sql/sql_class.h" // THD
#include "sql/system_variables.h"
#include "sql/table.h"
#ifdef WITH_WSREP
#include "sql/wsrep_async_monitor.h"
#endif /* WITH_WSREP */

class Commit_order_manager;
class Master_info;
@@ -1793,6 +1796,15 @@ class Relay_log_info : public Rpl_info {
commit_order_mngr = mngr;
}

#ifdef WITH_WSREP
Wsrep_async_monitor* get_wsrep_async_monitor() {
return wsrep_async_monitor;
}
void set_wsrep_async_monitor(Wsrep_async_monitor *monitor) {
wsrep_async_monitor = monitor;
}
#endif /* WITH_WSREP */

/*
Following set function is required to initialize the 'until_option' during
MTS relay log recovery process.
@@ -1850,6 +1862,12 @@ class Relay_log_info : public Rpl_info {
*/
Commit_order_manager *commit_order_mngr;

#ifdef WITH_WSREP
/*
Wsrep_async_monitor orders DMLs and DDLs in Galera.
*/
Wsrep_async_monitor *wsrep_async_monitor;
#endif /* WITH_WSREP */
/**
Delay slave SQL thread by this amount of seconds.
The delay is applied per transaction and based on the immediate master's
3 changes: 3 additions & 0 deletions sql/rpl_rli_pdb.cc
@@ -326,6 +326,9 @@ int Slave_worker::init_worker(Relay_log_info *rli, ulong i) {
this->set_require_row_format(rli->is_row_format_required());

set_commit_order_manager(c_rli->get_commit_order_manager());
#ifdef WITH_WSREP
set_wsrep_async_monitor(c_rli->get_wsrep_async_monitor());
#endif /* WITH_WSREP */

if (rli_init_info(false) ||
DBUG_EVALUATE_IF("inject_init_worker_init_info_fault", true, false))
15 changes: 15 additions & 0 deletions sql/sql_parse.cc
@@ -393,6 +393,21 @@ inline bool check_database_filters(THD *thd, const char *db,
break;
}
}

#ifdef WITH_WSREP
// This transaction is going to be skipped anyway, so skip it in the
// async monitor as well
if (WSREP(thd) && !db_ok) {
Slave_worker *sw = dynamic_cast<Slave_worker*>(thd->rli_slave);
Wsrep_async_monitor *wsrep_async_monitor {sw->get_wsrep_async_monitor()};
if (wsrep_async_monitor) {
auto seqno = sw->sequence_number();
assert(seqno > 0);
wsrep_async_monitor->skip(seqno);
fprintf(stderr, "Filtered DDL seqno: %llu in async monitor\n", seqno);
}
}
#endif /* WITH_WSREP */
return db_ok;
}

94 changes: 94 additions & 0 deletions sql/wsrep_async_monitor.cc
@@ -0,0 +1,94 @@
/* Copyright (c) 2021 Percona LLC and/or its affiliates. All rights reserved.

This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */

#ifdef WITH_WSREP
#include "sql/wsrep_async_monitor.h"
#include <algorithm>
#include <cassert>

// Method for main thread to add scheduled seqnos
void Wsrep_async_monitor::schedule(seqno_t seqno) {
  std::unique_lock<std::mutex> lock(m_mutex);
  scheduled_seqnos.push(seqno);
}

// Method for both DDL and DML to enter the monitor
void Wsrep_async_monitor::enter(seqno_t seqno) {
  std::unique_lock<std::mutex> lock(m_mutex);

  // Wait until this transaction is at the head of the scheduled queue
  m_cond.wait(lock, [this, seqno] {
    // Remove skipped transactions
    while (!scheduled_seqnos.empty() &&
           skipped_seqnos.count(scheduled_seqnos.front()) > 0) {
      scheduled_seqnos.pop();
    }
    return !scheduled_seqnos.empty() && scheduled_seqnos.front() == seqno;
  });
}

// Method to be called after DDL/DML processing is complete
void Wsrep_async_monitor::leave(seqno_t seqno) {
  std::unique_lock<std::mutex> lock(m_mutex);

  // The seqno must match the front of the queue. In a correctly
  // functioning monitor a mismatch should not happen, as each
  // transaction should leave in the order it was scheduled
  // and processed.
  if (!scheduled_seqnos.empty() && scheduled_seqnos.front() == seqno) {
    // Remove the seqno from the scheduled queue now that it has completed
    scheduled_seqnos.pop();
  } else {
    // std::cout << "Error: Mismatch in sequence numbers. Expected "
    //           << (scheduled_seqnos.empty()
    //                   ? "none"
    //                   : std::to_string(scheduled_seqnos.front()))
    //           << " but got " << seqno << "." << std::endl;
    assert(false && "Sequence number mismatch in leave()");
    exit(1);
  }

  // Notify waiting threads in case the next scheduled sequence can enter
  m_cond.notify_all();
}

// Method to skip a transaction that will not call enter() and leave()
void Wsrep_async_monitor::skip(unsigned long seqno) {
  std::unique_lock<std::mutex> lock(m_mutex);

  // Check if the seqno is already marked as skipped
  if (skipped_seqnos.count(seqno) > 0) {
    return;  // Already skipped, so do nothing
  }

  // Mark the seqno as skipped
  skipped_seqnos.insert(seqno);

  // Remove it from the scheduled queue if it is at the front
  if (!scheduled_seqnos.empty() && scheduled_seqnos.front() == seqno) {
    scheduled_seqnos.pop();
  }

  // Notify in case other transactions are waiting to enter
  m_cond.notify_all();
}

// Method to return if the monitor is empty, used by the unittests
bool Wsrep_async_monitor::is_empty() {
  std::unique_lock<std::mutex> lock(m_mutex);
  return scheduled_seqnos.empty();
}
#endif /* WITH_WSREP */
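
A seqno that is skipped while it is not at the head of the queue stays queued
and is only dropped later, when the predicate in enter() reaches it. The
single-threaded sketch below models just that bookkeeping, without the mutex
and condition variable. It is an illustration, not code from this commit:
Monitor_state, next_to_enter and drain are hypothetical names, and 0 is used
as "nothing can enter" on the assumption that real seqnos are greater than 0.

```cpp
#include <cassert>
#include <queue>
#include <set>
#include <vector>

using seqno_t = unsigned long long;

// Hypothetical single-threaded model of the monitor's two containers.
struct Monitor_state {
  std::queue<seqno_t> scheduled;
  std::set<seqno_t> skipped;
};

// Mirrors skip(): remember the seqno, and pop it immediately only if it is
// currently at the head of the queue.
void skip(Monitor_state &s, seqno_t seqno) {
  if (!s.skipped.insert(seqno).second) return;  // already skipped
  if (!s.scheduled.empty() && s.scheduled.front() == seqno) s.scheduled.pop();
}

// Mirrors the predicate in enter(): drop skipped seqnos at the head, then
// report which seqno may enter next (0 if the queue is empty).
seqno_t next_to_enter(Monitor_state &s) {
  while (!s.scheduled.empty() && s.skipped.count(s.scheduled.front()) > 0) {
    s.scheduled.pop();
  }
  return s.scheduled.empty() ? 0 : s.scheduled.front();
}

// Mirrors leave(): the leaving seqno must be the head of the queue.
void leave(Monitor_state &s, seqno_t seqno) {
  assert(!s.scheduled.empty() && s.scheduled.front() == seqno);
  s.scheduled.pop();
}

// Admits transactions one by one and returns the order of admission.
std::vector<seqno_t> drain(Monitor_state &s) {
  std::vector<seqno_t> order;
  for (seqno_t next = next_to_enter(s); next != 0; next = next_to_enter(s)) {
    order.push_back(next);
    leave(s, next);
  }
  return order;
}
```

With seqnos 1, 2 and 3 scheduled and skip(s, 2) called while 1 is still at the
head, drain(s) admits 1 and then 3; seqno 2 is discarded once it reaches the
head.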
54 changes: 54 additions & 0 deletions sql/wsrep_async_monitor.h
@@ -0,0 +1,54 @@
/* Copyright (c) 2024 Percona LLC and/or its affiliates. All rights reserved.

This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */

#ifndef WSREP_ASYNC_MONITOR_H
#define WSREP_ASYNC_MONITOR_H

#ifdef WITH_WSREP
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <queue>
#include <set>

class Wsrep_async_monitor {
 public:
  using seqno_t = unsigned long long;

  // Method for main thread to add scheduled seqnos
  void schedule(seqno_t seqno);

  // Method for both DDL and DML to enter the monitor
  void enter(seqno_t seqno);

  // Method to be called after DDL/DML processing is complete
  void leave(seqno_t seqno);

  // Method to skip a transaction that will not call enter() and leave()
  void skip(unsigned long seqno);

  // Method to return if the monitor is empty, used by the unittests
  bool is_empty();

 private:
  std::mutex m_mutex;
  std::condition_variable m_cond;
  std::set<seqno_t> skipped_seqnos;      // Tracks skipped sequence numbers
  std::queue<seqno_t> scheduled_seqnos;  // Queue to track scheduled seqnos
};

#endif /* WITH_WSREP */
#endif /* WSREP_ASYNC_MONITOR_H */