Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PXC-4173: PXC node stalls with parallel replication workers executing DDLs via async node #1927

Open
wants to merge 1 commit into
base: 8.0
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions sql/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1137,6 +1137,7 @@ SET (RPL_REPLICA_SRCS
rpl_trx_boundary_parser.cc
udf_service_impl.cc
udf_service_util.cc
wsrep_async_monitor.cc
)
ADD_LIBRARY(rpl_replica STATIC ${RPL_REPLICA_SRCS})
ADD_DEPENDENCIES(rpl_replica GenError)
Expand Down
2 changes: 2 additions & 0 deletions sql/binlog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8941,12 +8941,14 @@ TC_LOG::enum_result MYSQL_BIN_LOG::commit(THD *thd, bool all) {
trans_commit_stmt()) the following call to my_error() will allow
overwriting the error */
my_error(ER_TRANSACTION_ROLLBACK_DURING_COMMIT, MYF(0));
thd_leave_async_monitor(thd);
return RESULT_ABORTED;
}

int rc = ordered_commit(thd, all, skip_commit);

if (run_wsrep_hooks) {
thd_leave_async_monitor(thd);
wsrep_after_commit(thd, all);
}

Expand Down
23 changes: 23 additions & 0 deletions sql/log_event.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2704,6 +2704,14 @@ Slave_worker *Log_event::get_slave_worker(Relay_log_info *rli) {

Gtid_log_event *gtid_log_ev = static_cast<Gtid_log_event *>(this);
rli->started_processing(gtid_log_ev);
#ifdef WITH_WSREP
Wsrep_async_monitor *wsrep_async_monitor {rli->get_wsrep_async_monitor()};
if (wsrep_async_monitor) {
auto seqno = gtid_log_ev->sequence_number;
wsrep_async_monitor->schedule(seqno);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is my understandig correct, that in this point transactions appear in the order they were committed on the source node?
We create the queue of ordered transactions, and will use AsyncMonitor to enforce this order of commit in Galera, right?

fprintf(stderr, "Scheduled the seqno: %llu in async monitor\n", seqno);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fprintf
multiple occurences

}
#endif /* WITH_WSREP */
}

if (schedule_next_event(this, rli)) {
Expand Down Expand Up @@ -11207,6 +11215,21 @@ static enum_tbl_map_status check_table_map(Relay_log_info const *rli,
}
}

#ifdef WITH_WSREP
// This transaction is anyways going to be skipped. So skip the transaction
// in the async monitor as well
if (WSREP(rli->info_thd) && res == FILTERED_OUT) {
Slave_worker *sw = static_cast<Slave_worker *>(const_cast<Relay_log_info *>(rli));
Wsrep_async_monitor *wsrep_async_monitor {sw->get_wsrep_async_monitor()};
if (wsrep_async_monitor) {
auto seqno = sw->sequence_number();
assert(seqno > 0);
wsrep_async_monitor->skip(seqno);
fprintf(stderr, "Filtered DML seqno: %llu in async monitor\n", seqno);
}
}
#endif /* WITH_WSREP */

DBUG_PRINT("debug", ("check of table map ended up with: %u", res));

return res;
Expand Down
16 changes: 16 additions & 0 deletions sql/rpl_gtid_execution.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,9 @@
#include "sql/mysqld.h" // connection_events_loop_aborted
#include "sql/rpl_gtid.h"
#include "sql/rpl_rli.h" // Relay_log_info
#ifdef WITH_WSREP
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we keep WSREP-dependant includes after the standard ones, like we do in other cases?

#include "sql/rpl_rli_pdb.h" // Slave_worker
#endif /* WITH_WSREP */
#include "sql/sql_class.h" // THD
#include "sql/sql_lex.h"
#include "sql/sql_parse.h" // stmt_causes_implicit_commit
Expand Down Expand Up @@ -354,6 +357,19 @@ static inline void skip_statement(THD *thd) {
to notify that its session ticket was consumed.
*/
Commit_stage_manager::get_instance().finish_session_ticket(thd);
#ifdef WITH_WSREP
/* Despite the transaction was skipped, it needs to be updated in the Wsrep_async_monitor */
if (thd->slave_thread) {
Slave_worker *sw = dynamic_cast<Slave_worker*>(thd->rli_slave);
Wsrep_async_monitor *wsrep_async_monitor {sw->get_wsrep_async_monitor()};
if (wsrep_async_monitor) {
auto seqno = sw->sequence_number();
assert(seqno > 0);
wsrep_async_monitor->skip(seqno);
fprintf(stderr, "Skipped the seqno: %llu in async monitor\n", seqno);
}
}
#endif /* WITH_WSREP */

#ifndef NDEBUG
const Gtid_set *executed_gtids = gtid_state->get_executed_gtids();
Expand Down
24 changes: 24 additions & 0 deletions sql/rpl_replica.cc
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,10 @@
#endif
#include "scope_guard.h"

#ifdef WITH_WSREP
#include "sql/wsrep_async_monitor.h"
#endif /* WITH_WSREP */

struct mysql_cond_t;
struct mysql_mutex_t;

Expand Down Expand Up @@ -7224,6 +7228,9 @@ extern "C" void *handle_slave_sql(void *arg) {
bool mts_inited = false;
Global_THD_manager *thd_manager = Global_THD_manager::get_instance();
Commit_order_manager *commit_order_mngr = nullptr;
#ifdef WITH_WSREP
Wsrep_async_monitor *wsrep_async_monitor = nullptr;
#endif /* WITH_WSREP */
Rpl_applier_reader applier_reader(rli);
Relay_log_info::enum_priv_checks_status priv_check_status =
Relay_log_info::enum_priv_checks_status::SUCCESS;
Expand Down Expand Up @@ -7269,6 +7276,16 @@ wsrep_restart_point :

rli->set_commit_order_manager(commit_order_mngr);

#ifdef WITH_WSREP
// Restore the last executed seqno
if (WSREP_ON && opt_replica_preserve_commit_order
&& !rli->is_parallel_exec()
&& rli->opt_replica_parallel_workers > 1) {
wsrep_async_monitor = new Wsrep_async_monitor();
rli->set_wsrep_async_monitor(wsrep_async_monitor);
}
#endif /* WITH_WSREP */

if (channel_map.is_group_replication_channel_name(rli->get_channel())) {
if (channel_map.is_group_replication_channel_name(rli->get_channel(),
true)) {
Expand Down Expand Up @@ -7702,6 +7719,13 @@ wsrep_restart_point :
delete commit_order_mngr;
}

#ifdef WITH_WSREP
if (wsrep_async_monitor) {
rli->set_wsrep_async_monitor(nullptr);
delete wsrep_async_monitor;
}
#endif /* WITH_WSREP */

mysql_mutex_unlock(&rli->info_thd_lock);
set_thd_in_use_temporary_tables(
rli); // (re)set info_thd in use for saved temp tables
Expand Down
18 changes: 18 additions & 0 deletions sql/rpl_rli.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@
#include "sql/sql_class.h" // THD
#include "sql/system_variables.h"
#include "sql/table.h"
#ifdef WITH_WSREP
#include "sql/wsrep_async_monitor.h"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like forward type declaration is enough, and the header can be included in *.cc file

#endif /* WITH_WSREP */

class Commit_order_manager;
class Master_info;
Expand Down Expand Up @@ -1793,6 +1796,15 @@ class Relay_log_info : public Rpl_info {
commit_order_mngr = mngr;
}

#ifdef WITH_WSREP
Wsrep_async_monitor* get_wsrep_async_monitor() {
return wsrep_async_monitor;
}
void set_wsrep_async_monitor(Wsrep_async_monitor *monitor) {
wsrep_async_monitor = monitor;
}
#endif /* WITH_WSREP */

/*
Following set function is required to initialize the 'until_option' during
MTS relay log recovery process.
Expand Down Expand Up @@ -1850,6 +1862,12 @@ class Relay_log_info : public Rpl_info {
*/
Commit_order_manager *commit_order_mngr;

#ifdef WITH_WSREP
/*
Wsrep_async_monitor orders DMLs and DDls in galera.
*/
Wsrep_async_monitor *wsrep_async_monitor;
#endif /* WITH_WSREP */
/**
Delay slave SQL thread by this amount of seconds.
The delay is applied per transaction and based on the immediate master's
Expand Down
3 changes: 3 additions & 0 deletions sql/rpl_rli_pdb.cc
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,9 @@ int Slave_worker::init_worker(Relay_log_info *rli, ulong i) {
this->set_require_row_format(rli->is_row_format_required());

set_commit_order_manager(c_rli->get_commit_order_manager());
#ifdef WITH_WSREP
set_wsrep_async_monitor(c_rli->get_wsrep_async_monitor());
#endif /* WITH_WSREP */

if (rli_init_info(false) ||
DBUG_EVALUATE_IF("inject_init_worker_init_info_fault", true, false))
Expand Down
15 changes: 15 additions & 0 deletions sql/sql_parse.cc
Original file line number Diff line number Diff line change
Expand Up @@ -393,6 +393,21 @@ inline bool check_database_filters(THD *thd, const char *db,
break;
}
}

#ifdef WITH_WSREP
// This transaction is anyways going to be skipped. So skip the transaction
// in the async monitor as well
if (WSREP(thd) && !db_ok) {
Slave_worker *sw = dynamic_cast<Slave_worker*>(thd->rli_slave);
Wsrep_async_monitor *wsrep_async_monitor {sw->get_wsrep_async_monitor()};
if (wsrep_async_monitor) {
auto seqno = sw->sequence_number();
assert(seqno > 0);
wsrep_async_monitor->skip(seqno);
fprintf(stderr, "Filtered DDL seqno: %llu in async monitor\n", seqno);
}
}
#endif /* WITH_WSREP */
return db_ok;
}

Expand Down
94 changes: 94 additions & 0 deletions sql/wsrep_async_monitor.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/* Copyright (c) 2024 Percona LLC and/or its affiliates. All rights reserved.

This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */

#ifdef WITH_WSREP
#include "sql/wsrep_async_monitor.h"
#include <algorithm>
#include <cassert>

// Method for main thread to add scheduled seqnos
void Wsrep_async_monitor::schedule(seqno_t seqno) {
std::unique_lock<std::mutex> lock(m_mutex);
scheduled_seqnos.push(seqno);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seqons should be growing monolithically, right?
Can we add an assert like seqno = prev+1?
If not monolithically growing (why?), at least let's add an assert seqno > prev.
A good code comment would be appreciated

}

// Method for both DDL and DML to enter the monitor
void Wsrep_async_monitor::enter(seqno_t seqno) {
std::unique_lock<std::mutex> lock(m_mutex);

// Wait until this transaction is at the head of the scheduled queue
m_cond.wait(lock, [this, seqno] {
// Remove skipped transactions
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When this can happen? Could you add a code comment?
I suspect this is the case when A, B, C have been scheduled, then B skipped, but skip() removes only from the front of scheduled_seqno queue

while (!scheduled_seqnos.empty() &&
skipped_seqnos.count(scheduled_seqnos.front()) > 0) {
scheduled_seqnos.pop();
}
return !scheduled_seqnos.empty() && scheduled_seqnos.front() == seqno;
});
}

// Method to be called after DDL/DML processing is complete
void Wsrep_async_monitor::leave(seqno_t seqno) {
std::unique_lock<std::mutex> lock(m_mutex);

// Check if the sequence number matches the front of the queue.
// In a correctly functioning monitor this should not happen
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In a correctly functioning monitor this should not happen ->
In a correctly functioning monitor this should always be true

// as each transaction should exit in the order it was scheduled
// and processed.
if (!scheduled_seqnos.empty() && scheduled_seqnos.front() == seqno) {
// Remove the seqno from the scheduled queue now that it has completed
scheduled_seqnos.pop();
} else {
// std::cout << "Error: Mismatch in sequence numbers. Expected "
// << (scheduled_seqnos.empty()
// ? "none"
// : std::to_string(scheduled_seqnos.front()))
// << " but got " << seqno << "." << std::endl;
assert(false && "Sequence number mismatch in leave()");
exit(1);
}

// Notify waiting threads in case the next scheduled sequence can enter
m_cond.notify_all();
}

// Method to skip a transaction that will not call enter() and exit()
void Wsrep_async_monitor::skip(unsigned long seqno) {
std::unique_lock<std::mutex> lock(m_mutex);

// Check if the seqno is already marked as skipped
if (skipped_seqnos.count(seqno) > 0) {
return; // Already skipped, so do nothing
}

// Mark the seqno as skipped
skipped_seqnos.insert(seqno);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I can't see any place where seqnos are removed from skipped_seqnos


// Remove it from the scheduled queue if it is at the front
if (!scheduled_seqnos.empty() && scheduled_seqnos.front() == seqno) {
scheduled_seqnos.pop();
}

// Notify in case other transactions are waiting to enter
m_cond.notify_all();
}

// Method to return if the monitor is empty, used by the unittests
bool Wsrep_async_monitor::is_empty() {
std::unique_lock<std::mutex> lock(m_mutex);
return scheduled_seqnos.empty();
}
#endif /* WITH_WSREP */
54 changes: 54 additions & 0 deletions sql/wsrep_async_monitor.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/* Copyright (c) 2024 Percona LLC and/or its affiliates. All rights reserved.

This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */

#ifndef WSREP_ASYNC_MONITOR_H
#define WSREP_ASYNC_MONITOR_H

#ifdef WITH_WSREP
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <queue>
#include <set>

class Wsrep_async_monitor {
public:
using seqno_t = unsigned long long;

// Method for main thread to add scheduled seqnos
void schedule(seqno_t seqno);

// Method for both DDL and DML to enter the monitor
void enter(seqno_t seqno);

// Method to be called after DDL/DML processing is complete
void leave(seqno_t seqno);

// Method to skip a transaction that will not call enter() and exit()
void skip(unsigned long seqno);

// Method to return if the monitor is empty, used by the unittests
bool is_empty();

private:
std::mutex m_mutex;
std::condition_variable m_cond;
std::set<seqno_t> skipped_seqnos; // Tracks skipped sequence numbers
std::queue<seqno_t> scheduled_seqnos; // Queue to track scheduled seqnos
};

#endif /* WITH_WSREP */
#endif /* WSREP_ASYNC_MONITOR_H */
Loading
Loading