Skip to content

Commit

Permalink
PXC-4173: PXC deadlock
Browse files Browse the repository at this point in the history
Initial changes with monitor
  • Loading branch information
venkatesh-prasad-v committed Jun 12, 2024
1 parent bbef137 commit 45b94f6
Show file tree
Hide file tree
Showing 11 changed files with 236 additions and 0 deletions.
1 change: 1 addition & 0 deletions sql/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1137,6 +1137,7 @@ SET (RPL_REPLICA_SRCS
rpl_trx_boundary_parser.cc
udf_service_impl.cc
udf_service_util.cc
wsrep_async_monitor.cc
)
ADD_LIBRARY(rpl_replica STATIC ${RPL_REPLICA_SRCS})
ADD_DEPENDENCIES(rpl_replica GenError)
Expand Down
2 changes: 2 additions & 0 deletions sql/binlog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8941,12 +8941,14 @@ TC_LOG::enum_result MYSQL_BIN_LOG::commit(THD *thd, bool all) {
trans_commit_stmt()) the following call to my_error() will allow
overwriting the error */
my_error(ER_TRANSACTION_ROLLBACK_DURING_COMMIT, MYF(0));
thd_leave_apply_monitor(thd);
return RESULT_ABORTED;
}

int rc = ordered_commit(thd, all, skip_commit);

if (run_wsrep_hooks) {
thd_leave_apply_monitor(thd);
wsrep_after_commit(thd, all);
}

Expand Down
17 changes: 17 additions & 0 deletions sql/log_event.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3216,6 +3216,23 @@ int Log_event::apply_event(Relay_log_info *rli) {
if (error) return -1;
}

#ifdef WITH_WSREP
// Ignore fake rotate events generated by the source server when
// the receiver thread reconnects.
bool real_event = server_id && !is_artificial_event();
if (WSREP_ON && (get_type_code() == binary_log::ROTATE_EVENT) &&
real_event) {
/*
If we are here, it means we received a real rotate event from source.
We need to drain/reset the async monitor to be able to the
"reset" of sequence numbers on source.
*/
Wsrep_async_monitor *async_monitor=rli->get_wsrep_async_monitor();
if (async_monitor) {
async_monitor->reset(0);
}
}
#endif /* WITH_WSREP */
#ifndef NDEBUG
/* all Workers are idle as done through wait_for_workers_to_finish */
for (uint k = 0; k < rli->curr_group_da.size(); k++) {
Expand Down
25 changes: 25 additions & 0 deletions sql/rpl_replica.cc
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,10 @@
#endif
#include "scope_guard.h"

#ifdef WITH_WSREP
#include "sql/wsrep_async_monitor.h"
#endif /* WITH_WSREP */

struct mysql_cond_t;
struct mysql_mutex_t;

Expand Down Expand Up @@ -7224,6 +7228,9 @@ extern "C" void *handle_slave_sql(void *arg) {
bool mts_inited = false;
Global_THD_manager *thd_manager = Global_THD_manager::get_instance();
Commit_order_manager *commit_order_mngr = nullptr;
#ifdef WITH_WSREP
Wsrep_async_monitor *wsrep_async_monitor = nullptr;
#endif /* WITH_WSREP */
Rpl_applier_reader applier_reader(rli);
Relay_log_info::enum_priv_checks_status priv_check_status =
Relay_log_info::enum_priv_checks_status::SUCCESS;
Expand Down Expand Up @@ -7269,6 +7276,16 @@ wsrep_restart_point :

rli->set_commit_order_manager(commit_order_mngr);

#ifdef WITH_WSREP
if (WSREP_ON) {
wsrep_async_monitor = new Wsrep_async_monitor();
rli->set_wsrep_async_monitor(wsrep_async_monitor);
if (rli->wsrep_async_monitor_last_left != 0) {
rli->restore_last_position_for_recovery();
}
}
#endif /* WITH_WSREP */

if (channel_map.is_group_replication_channel_name(rli->get_channel())) {
if (channel_map.is_group_replication_channel_name(rli->get_channel(),
true)) {
Expand Down Expand Up @@ -7702,6 +7719,14 @@ wsrep_restart_point :
delete commit_order_mngr;
}

#ifdef WITH_WSREP
if (wsrep_async_monitor) {
rli->backup_last_position_for_recovery();
rli->set_wsrep_async_monitor(nullptr);
delete wsrep_async_monitor;
}
#endif /* WITH_WSREP */

mysql_mutex_unlock(&rli->info_thd_lock);
set_thd_in_use_temporary_tables(
rli); // (re)set info_thd in use for saved temp tables
Expand Down
33 changes: 33 additions & 0 deletions sql/rpl_rli.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@
#include "sql/sql_class.h" // THD
#include "sql/system_variables.h"
#include "sql/table.h"
#ifdef WITH_WSREP
#include "sql/wsrep_async_monitor.h"
#endif /* WITH_WSREP */

class Commit_order_manager;
class Master_info;
Expand Down Expand Up @@ -1793,6 +1796,21 @@ class Relay_log_info : public Rpl_info {
commit_order_mngr = mngr;
}

#ifdef WITH_WSREP
Wsrep_async_monitor* get_wsrep_async_monitor() {
return wsrep_async_monitor;
}
void set_wsrep_async_monitor(Wsrep_async_monitor *monitor) {
wsrep_async_monitor = monitor;
}
void backup_last_position_for_recovery() {
wsrep_async_monitor_last_left = wsrep_async_monitor->last_left();
}
void restore_last_position_for_recovery() {
wsrep_async_monitor->reset(wsrep_async_monitor_last_left);
}
#endif /* WITH_WSREP */

/*
Following set function is required to initialize the 'until_option' during
MTS relay log recovery process.
Expand Down Expand Up @@ -1839,6 +1857,14 @@ class Relay_log_info : public Rpl_info {
rpl_filter = channel_filter;
}

#ifdef WITH_WSREP
/*
Stores Wsrep_async_monitor's last_left seqno.
Will be used while resetting the last_left seqno during start replica.
*/
unsigned long long wsrep_async_monitor_last_left;
#endif /* WITH_WSREP */

protected:
Format_description_log_event *rli_description_event;

Expand All @@ -1850,6 +1876,13 @@ class Relay_log_info : public Rpl_info {
*/
Commit_order_manager *commit_order_mngr;

#ifdef WITH_WSREP
/*
Wsrep_async_monitor orders commits and TOI replication made
by its workers.
*/
Wsrep_async_monitor *wsrep_async_monitor;
#endif /* WITH_WSREP */
/**
Delay slave SQL thread by this amount of seconds.
The delay is applied per transaction and based on the immediate master's
Expand Down
3 changes: 3 additions & 0 deletions sql/rpl_rli_pdb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,9 @@ int Slave_worker::init_worker(Relay_log_info *rli, ulong i) {
this->set_require_row_format(rli->is_row_format_required());

set_commit_order_manager(c_rli->get_commit_order_manager());
#ifdef WITH_WSREP
set_wsrep_async_monitor(c_rli->get_wsrep_async_monitor());
#endif /* WITH_WSREP */

if (rli_init_info(false) ||
DBUG_EVALUATE_IF("inject_init_worker_init_info_fault", true, false))
Expand Down
51 changes: 51 additions & 0 deletions sql/wsrep_async_monitor.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
#ifdef WITH_WSREP
#include "sql/wsrep_async_monitor.h"
#include <algorithm>
#include <cassert>

void Wsrep_async_monitor::enter(seqno_t seqno) {
std::unique_lock<std::mutex> lock(m_mutex);

// Wait for its turn
m_cond.wait(lock, [this, seqno]() { return seqno == m_last_left + 1; });
m_last_entered = (seqno > m_last_entered) ? seqno : m_last_entered;
fprintf(stderr, "Entered the monitor with seqno: %llu\n", seqno);
}

void Wsrep_async_monitor::leave(seqno_t seqno) {
// Wait for its turn before leaving
std::unique_lock<std::mutex> lock(m_mutex);
m_cond.wait(lock, [this, seqno]() { return seqno == m_last_left + 1; });
m_last_left = seqno;

fprintf(stderr, "Left the monitor seqno: %llu\n", seqno);
// Notify all waiting threads
m_cond.notify_all();
}

void Wsrep_async_monitor::drain(seqno_t seqno) {
// Wait for its turn before resetting it
std::unique_lock<std::mutex> lock(m_mutex);

// No need to wait if the seqno has already left
if (seqno !=0 && seqno < m_last_left) {
return;
}

m_cond.wait(lock, [this, seqno]() { return seqno == m_last_left + 1; });

}

void Wsrep_async_monitor::reset(seqno_t seqno) {
std::unique_lock<std::mutex> lock(m_mutex);

// We can reset only if the last entered and last left
// are same, meaning that there is no one who is inside
// the monitor
assert(m_last_entered == m_last_left);
m_last_entered = seqno;
m_last_left = seqno;
m_cond.notify_all();
}

#endif /* WITH_WSREP */
42 changes: 42 additions & 0 deletions sql/wsrep_async_monitor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#ifndef WSREP_ASYNC_MONITOR_H
#define WSREP_ASYNC_MONITOR_H

#ifdef WITH_WSREP
#include <condition_variable>
#include <mutex>

class Wsrep_async_monitor {
public:
using seqno_t = unsigned long long;

Wsrep_async_monitor() : m_last_entered(0), m_last_left(0) {}

~Wsrep_async_monitor() {
std::unique_lock<std::mutex> lock(m_mutex);
m_last_entered = 0;
m_last_left = 0;
m_cond.notify_all();
}

void enter(seqno_t seqno);
void leave(seqno_t seqno);
void drain(seqno_t seqno);
void reset(seqno_t seqno);

seqno_t last_entered() const { return m_last_entered; }
seqno_t last_left() const { return m_last_left; }

//std::mutex &mutex() { return m_mutex; }
//std::condition_variable &cond() { return m_cond; }

private:
std::mutex m_mutex;
std::condition_variable m_cond;

// TODO: Evaluate if we really need m_last_entered
seqno_t m_last_entered;
seqno_t m_last_left;
};

#endif /* WITH_WSREP */
#endif /* WSREP_ASYNC_MONITOR_H */
55 changes: 55 additions & 0 deletions sql/wsrep_mysqld.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2858,6 +2858,57 @@ static void wsrep_RSU_end(THD *thd) {
thd->variables.wsrep_on = 1;
}

bool thd_enter_apply_monitor(THD* thd) {

if (thd->killed != THD::NOT_KILLED) {
return true;
}
if (thd->slave_thread) {
Slave_worker * sw = dynamic_cast<Slave_worker*>(thd->rli_slave);
Wsrep_async_monitor *wsrep_async_monitor {sw->get_wsrep_async_monitor()};
if (wsrep_async_monitor) {
auto seqno = sw->sequence_number();
assert(seqno > 0);
// Set current_mutex and current_cond here
// thd->enter_cond();
wsrep_async_monitor->enter(seqno);
} else {
// Error if we have more than one parallel worker
if (opt_mts_replica_parallel_workers > 1) {
WSREP_ERROR("Async replica thread %u is not monitored by wsrep async monitor",
thd->thread_id());
exit(1);
}
}
}
return false;
}

void thd_leave_apply_monitor(THD* thd) {
if (thd->slave_thread) {
Slave_worker * sw = dynamic_cast<Slave_worker*>(thd->rli_slave);
Wsrep_async_monitor *wsrep_async_monitor {sw->get_wsrep_async_monitor()};
if (wsrep_async_monitor) {
auto seqno = sw->sequence_number();
assert(seqno > 0);
wsrep_async_monitor->leave(seqno);
{
if (thd->wsrep_skip_wsrep_hton == true)
fprintf(stderr, "DDL: thd_leave_apply_monitor: seqno=%llu\n", seqno);
else
fprintf(stderr, "DML: thd_leave_apply_monitor: seqno=%llu\n", seqno);
}
} else {
// Error if we have more than one parallel worker
if (opt_mts_replica_parallel_workers > 1) {
WSREP_ERROR("Async replica thread %u is not monitored by wsrep async monitor",
thd->thread_id());
exit(1);
}
}
}
}

int wsrep_to_isolation_begin(THD *thd, const char *db_, const char *table_,
const Table_ref *table_list,
dd::Tablespace_table_ref_vec *trefs,
Expand Down Expand Up @@ -2954,6 +3005,8 @@ int wsrep_to_isolation_begin(THD *thd, const char *db_, const char *table_,
thd->variables.auto_increment_increment = 1;
}

thd_enter_apply_monitor(thd);

DEBUG_SYNC(thd, "wsrep_to_isolation_begin_before_replication");

if (thd->variables.wsrep_on && wsrep_thd_is_local(thd)) {
Expand Down Expand Up @@ -3027,6 +3080,8 @@ void wsrep_to_isolation_end(THD *thd) {
}
if (wsrep_emulate_bin_log) wsrep_thd_binlog_trx_reset(thd);

thd_leave_apply_monitor(thd);

DEBUG_SYNC(thd, "wsrep_to_isolation_end_before_wsrep_skip_wsrep_hton");
mysql_mutex_lock(&thd->LOCK_thd_data);

Expand Down
3 changes: 3 additions & 0 deletions sql/wsrep_mysqld.h
Original file line number Diff line number Diff line change
Expand Up @@ -443,6 +443,9 @@ extern PSI_thread_key key_THREAD_wsrep_post_rollbacker;
extern PSI_file_key key_file_wsrep_gra_log;
#endif /* HAVE_PSI_INTERFACE */

bool thd_enter_apply_monitor(THD* thd);
void thd_leave_apply_monitor(THD* thd);

class Alter_info;
int wsrep_to_isolation_begin(THD *thd, const char *db_, const char *table_,
const Table_ref *table_list,
Expand Down
4 changes: 4 additions & 0 deletions sql/wsrep_trans_observer.h
Original file line number Diff line number Diff line change
Expand Up @@ -296,6 +296,10 @@ static inline int wsrep_before_commit(THD *thd, bool all) {
WSREP_DEBUG("wsrep_before_commit: %d, %lld", wsrep_is_real(thd, all),
(long long)wsrep_thd_trx_seqno(thd));
int ret = 0;

/* Take the apply monitor */
thd_enter_apply_monitor(thd);

assert(wsrep_run_commit_hook(thd, all));
if ((ret = thd->wsrep_cs().before_commit()) == 0) {
assert(!thd->wsrep_trx().ws_meta().gtid().is_undefined());
Expand Down

0 comments on commit 45b94f6

Please sign in to comment.