Skip to content

Commit

Permalink
- Addressed review comments from Kamil
Browse files Browse the repository at this point in the history
- Handled the relay log recovery.
- Changed thd_enter_apply_monitor() to thd_enter_async_monitor().
- Changed thd_leave_apply_monitor() to thd_leave_async_monitor()
- Don't track the last_left and last_entered
- The new logic now uses the scheduled_seqnos queue to track the lowest
  seqno the server is executing.
- The new logic also uses the skipped_seqno to track the skipped seqnos.
- Added unittests
- Added copyright to the newly introduced files.
  • Loading branch information
venkatesh-prasad-v committed Nov 18, 2024
1 parent 25c09e8 commit 30112dc
Show file tree
Hide file tree
Showing 12 changed files with 348 additions and 105 deletions.
4 changes: 2 additions & 2 deletions sql/binlog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -8942,14 +8942,14 @@ TC_LOG::enum_result MYSQL_BIN_LOG::commit(THD *thd, bool all) {
trans_commit_stmt()) the following call to my_error() will allow
overwriting the error */
my_error(ER_TRANSACTION_ROLLBACK_DURING_COMMIT, MYF(0));
thd_leave_apply_monitor(thd);
thd_leave_async_monitor(thd);
return RESULT_ABORTED;
}

int rc = ordered_commit(thd, all, skip_commit);

if (run_wsrep_hooks) {
thd_leave_apply_monitor(thd);
thd_leave_async_monitor(thd);
wsrep_after_commit(thd, all);
}

Expand Down
25 changes: 8 additions & 17 deletions sql/log_event.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2710,6 +2710,14 @@ Slave_worker *Log_event::get_slave_worker(Relay_log_info *rli) {

Gtid_log_event *gtid_log_ev = static_cast<Gtid_log_event *>(this);
rli->started_processing(gtid_log_ev);
#ifdef WITH_WSREP
Wsrep_async_monitor *wsrep_async_monitor {rli->get_wsrep_async_monitor()};
if (wsrep_async_monitor) {
auto seqno = gtid_log_ev->sequence_number;
wsrep_async_monitor->schedule(seqno);
fprintf(stderr, "Scheduled the seqno: %llu in async monitor\n", seqno);
}
#endif /* WITH_WSREP */
}

if (schedule_next_event(this, rli)) {
Expand Down Expand Up @@ -3222,23 +3230,6 @@ int Log_event::apply_event(Relay_log_info *rli) {
if (error) return -1;
}

#ifdef WITH_WSREP
// Ignore fake rotate events generated by the source server when
// the receiver thread reconnects.
bool real_event = server_id && !is_artificial_event();
if (WSREP_ON && (get_type_code() == binary_log::ROTATE_EVENT) &&
real_event) {
/*
If we are here, it means we received a real rotate event from source.
We need to drain/reset the async monitor to be able to the
"reset" of sequence numbers on source.
*/
Wsrep_async_monitor *async_monitor=rli->get_wsrep_async_monitor();
if (async_monitor) {
async_monitor->reset(0);
}
}
#endif /* WITH_WSREP */
#ifndef NDEBUG
/* all Workers are idle as done through wait_for_workers_to_finish */
for (uint k = 0; k < rli->curr_group_da.size(); k++) {
Expand Down
16 changes: 16 additions & 0 deletions sql/rpl_gtid_execution.cc
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,9 @@
#include "sql/mysqld.h" // connection_events_loop_aborted
#include "sql/rpl_gtid.h"
#include "sql/rpl_rli.h" // Relay_log_info
#ifdef WITH_WSREP
#include "sql/rpl_rli_pdb.h" // Slave_worker
#endif /* WITH_WSREP */
#include "sql/sql_class.h" // THD
#include "sql/sql_lex.h"
#include "sql/sql_parse.h" // stmt_causes_implicit_commit
Expand Down Expand Up @@ -355,6 +358,19 @@ static inline void skip_statement(THD *thd) {
to notify that its session ticket was consumed.
*/
Commit_stage_manager::get_instance().finish_session_ticket(thd);
#ifdef WITH_WSREP
/* Despite the transaction was skipped, it needs to be updated in the Wsrep_async_monitor */
if (thd->slave_thread) {
Slave_worker *sw = dynamic_cast<Slave_worker*>(thd->rli_slave);
Wsrep_async_monitor *wsrep_async_monitor {sw->get_wsrep_async_monitor()};
if (wsrep_async_monitor) {
auto seqno = sw->sequence_number();
assert(seqno > 0);
wsrep_async_monitor->skip(seqno);
fprintf(stderr, "Skipped the seqno: %llu in async monitor\n", seqno);
}
}
#endif /* WITH_WSREP */

#ifndef NDEBUG
const Gtid_set *executed_gtids = gtid_state->get_executed_gtids();
Expand Down
23 changes: 0 additions & 23 deletions sql/rpl_replica.cc
Original file line number Diff line number Diff line change
Expand Up @@ -5087,25 +5087,6 @@ static int exec_relay_log_event(THD *thd, Relay_log_info *rli,
WSREP_INFO("Wsrep before statement error");
return 1;
}

if (WSREP_ON && (ev->get_type_code() == binary_log::GTID_LOG_EVENT ||
ev->get_type_code() == binary_log::ANONYMOUS_GTID_LOG_EVENT)) {

static bool async_monitor_init_done = false;

// Set the last_entered_ if we are starting applier thread post server
// shutdown.
if (!async_monitor_init_done) {
Wsrep_async_monitor *async_monitor=rli->get_wsrep_async_monitor();
if (async_monitor && async_monitor->last_left() == 0) {
const Gtid_log_event* gtid_ev = static_cast<Gtid_log_event *>(ev);
unsigned long long seqno = gtid_ev->sequence_number;
assert (seqno != 0);
async_monitor->reset(seqno - 1);
async_monitor_init_done = true;
}
}
}
#endif /* WITH_WSREP */

enum enum_slave_apply_event_and_update_pos_retval exec_res;
Expand Down Expand Up @@ -7303,9 +7284,6 @@ wsrep_restart_point :
&& rli->opt_replica_parallel_workers > 1) {
wsrep_async_monitor = new Wsrep_async_monitor();
rli->set_wsrep_async_monitor(wsrep_async_monitor);
if (rli->wsrep_async_monitor_last_left != 0) {
rli->restore_last_position_for_recovery();
}
}
#endif /* WITH_WSREP */

Expand Down Expand Up @@ -7744,7 +7722,6 @@ wsrep_restart_point :

#ifdef WITH_WSREP
if (wsrep_async_monitor) {
rli->backup_last_position_for_recovery();
rli->set_wsrep_async_monitor(nullptr);
delete wsrep_async_monitor;
}
Expand Down
14 changes: 0 additions & 14 deletions sql/rpl_rli.h
Original file line number Diff line number Diff line change
Expand Up @@ -1804,12 +1804,6 @@ class Relay_log_info : public Rpl_info {
void set_wsrep_async_monitor(Wsrep_async_monitor *monitor) {
wsrep_async_monitor = monitor;
}
void backup_last_position_for_recovery() {
wsrep_async_monitor_last_left = wsrep_async_monitor->last_left();
}
void restore_last_position_for_recovery() {
wsrep_async_monitor->reset(wsrep_async_monitor_last_left);
}
#endif /* WITH_WSREP */

/*
Expand Down Expand Up @@ -1858,14 +1852,6 @@ class Relay_log_info : public Rpl_info {
rpl_filter = channel_filter;
}

#ifdef WITH_WSREP
/*
Stores Wsrep_async_monitor's last_left seqno.
Will be used while resetting the last_left seqno during start replica.
*/
unsigned long long wsrep_async_monitor_last_left;
#endif /* WITH_WSREP */

protected:
Format_description_log_event *rli_description_event;

Expand Down
88 changes: 72 additions & 16 deletions sql/wsrep_async_monitor.cc
Original file line number Diff line number Diff line change
@@ -1,38 +1,94 @@
/* Copyright (c) 2021 Percona LLC and/or its affiliates. All rights reserved.

This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */

#ifdef WITH_WSREP
#include "sql/wsrep_async_monitor.h"
#include <algorithm>
#include <cassert>

// Registers a seqno with the monitor by appending it to the tail of the
// scheduled queue. enter() only admits the seqno that sits at the front of
// that queue, so the order of schedule() calls determines the order in
// which transactions are let into the monitor.
//
// The queue is modified while holding m_mutex; no waiter is notified here.
void Wsrep_async_monitor::schedule(seqno_t seqno) {
  const std::lock_guard<std::mutex> guard(m_mutex);
  scheduled_seqnos.push(seqno);
}

// Method for both DDL and DML to enter the monitor
void Wsrep_async_monitor::enter(seqno_t seqno) {
std::unique_lock<std::mutex> lock(m_mutex);

// Wait for its turn before entering
m_cond.wait(lock, [this, seqno]() { return seqno == m_last_left + 1; });
m_last_entered = (seqno > m_last_entered) ? seqno : m_last_entered;
fprintf(stderr, "Entered the monitor with seqno: %llu\n", seqno);
// Wait until this transaction is at the head of the scheduled queue
m_cond.wait(lock, [this, seqno] {
// Remove skipped transactions
while (!scheduled_seqnos.empty() &&
skipped_seqnos.count(scheduled_seqnos.front()) > 0) {
scheduled_seqnos.pop();
}
return !scheduled_seqnos.empty() && scheduled_seqnos.front() == seqno;
});
}

// Method to be called after DDL/DML processing is complete
void Wsrep_async_monitor::leave(seqno_t seqno) {
// Wait for its turn before leaving
std::unique_lock<std::mutex> lock(m_mutex);
m_cond.wait(lock, [this, seqno]() { return seqno == m_last_left + 1; });
m_last_left = seqno;

fprintf(stderr, "Left the monitor seqno: %llu\n", seqno);
// Notify all waiting threads
// Check if the sequence number matches the front of the queue.
// In a correctly functioning monitor this should not happen
// as each transaction should exit in the order it was scheduled
// and processed.
if (!scheduled_seqnos.empty() && scheduled_seqnos.front() == seqno) {
// Remove the seqno from the scheduled queue now that it has completed
scheduled_seqnos.pop();
} else {
// std::cout << "Error: Mismatch in sequence numbers. Expected "
// << (scheduled_seqnos.empty()
// ? "none"
// : std::to_string(scheduled_seqnos.front()))
// << " but got " << seqno << "." << std::endl;
assert(false && "Sequence number mismatch in leave()");
exit(1);
}

// Notify waiting threads in case the next scheduled sequence can enter
m_cond.notify_all();
}

void Wsrep_async_monitor::reset(seqno_t seqno) {
// Method to skip a transaction that will not call enter() and leave()
void Wsrep_async_monitor::skip(unsigned long seqno) {
std::unique_lock<std::mutex> lock(m_mutex);

// We can reset only if the last entered and last left
// are same, meaning that there is no one who is inside
// the monitor
assert(m_last_entered == m_last_left);
m_last_entered = seqno;
m_last_left = seqno;
// Check if the seqno is already marked as skipped
if (skipped_seqnos.count(seqno) > 0) {
return; // Already skipped, so do nothing
}

// Mark the seqno as skipped
skipped_seqnos.insert(seqno);

// Remove it from the scheduled queue if it is at the front
if (!scheduled_seqnos.empty() && scheduled_seqnos.front() == seqno) {
scheduled_seqnos.pop();
}

// Notify in case other transactions are waiting to enter
m_cond.notify_all();
}

// Reports whether the scheduled queue currently holds no seqnos.
// Used by the unittests. The queue is inspected while holding m_mutex, so
// the answer is a snapshot that may be stale as soon as the lock is dropped.
bool Wsrep_async_monitor::is_empty() {
  const std::lock_guard<std::mutex> guard(m_mutex);
  const bool nothing_scheduled = scheduled_seqnos.empty();
  return nothing_scheduled;
}
#endif /* WITH_WSREP */
47 changes: 31 additions & 16 deletions sql/wsrep_async_monitor.h
Original file line number Diff line number Diff line change
@@ -1,38 +1,53 @@
/* Copyright (c) 2024 Percona LLC and/or its affiliates. All rights reserved.

This program is free software; you can redistribute it and/or
modify it under the terms of the GNU General Public License
as published by the Free Software Foundation; version 2 of
the License.

This program is distributed in the hope that it will be useful,
but WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
GNU General Public License for more details.

You should have received a copy of the GNU General Public License
along with this program; if not, write to the Free Software
Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */

#ifndef WSREP_ASYNC_MONITOR_H
#define WSREP_ASYNC_MONITOR_H

#ifdef WITH_WSREP
#include <iostream>
#include <condition_variable>
#include <iostream>
#include <mutex>
#include <queue>
#include <set>

class Wsrep_async_monitor {
public:
using seqno_t = unsigned long long;

Wsrep_async_monitor() : m_last_entered(0), m_last_left(0) {}

~Wsrep_async_monitor() {
std::unique_lock<std::mutex> lock(m_mutex);
m_last_entered = 0;
m_last_left = 0;
m_cond.notify_all();
}
// Method for main thread to add scheduled seqnos
void schedule(seqno_t seqno);

// Method for both DDL and DML to enter the monitor
void enter(seqno_t seqno);

// Method to be called after DDL/DML processing is complete
void leave(seqno_t seqno);
void reset(seqno_t seqno);

seqno_t last_entered() const { return m_last_entered; }
seqno_t last_left() const { return m_last_left; }
// Method to skip a transaction that will not call enter() and leave()
void skip(unsigned long seqno);

// Method to return if the monitor is empty, used by the unittests
bool is_empty();

private:
std::mutex m_mutex;
std::condition_variable m_cond;

// TODO: Evaluate if we really need m_last_entered
seqno_t m_last_entered;
seqno_t m_last_left;
std::set<seqno_t> skipped_seqnos; // Tracks skipped sequence numbers
std::queue<seqno_t> scheduled_seqnos; // Queue to track scheduled seqnos
};

#endif /* WITH_WSREP */
Expand Down
Loading

0 comments on commit 30112dc

Please sign in to comment.