diff --git a/sql/CMakeLists.txt b/sql/CMakeLists.txt index 428bdd0612c..c4eee7ee56e 100644 --- a/sql/CMakeLists.txt +++ b/sql/CMakeLists.txt @@ -1148,6 +1148,7 @@ SET (RPL_REPLICA_SRCS rpl_trx_boundary_parser.cc udf_service_impl.cc udf_service_util.cc + wsrep_async_monitor.cc ) ADD_STATIC_LIBRARY(rpl_replica ${RPL_REPLICA_SRCS} DEPENDENCIES GenError diff --git a/sql/binlog.cc b/sql/binlog.cc index 0a80cf75bec..c659bf44867 100644 --- a/sql/binlog.cc +++ b/sql/binlog.cc @@ -8942,12 +8942,14 @@ TC_LOG::enum_result MYSQL_BIN_LOG::commit(THD *thd, bool all) { trans_commit_stmt()) the following call to my_error() will allow overwriting the error */ my_error(ER_TRANSACTION_ROLLBACK_DURING_COMMIT, MYF(0)); + thd_leave_async_monitor(thd); return RESULT_ABORTED; } int rc = ordered_commit(thd, all, skip_commit); if (run_wsrep_hooks) { + thd_leave_async_monitor(thd); wsrep_after_commit(thd, all); } diff --git a/sql/log_event.cc b/sql/log_event.cc index 5bb3c149c84..8c25cecdbf2 100644 --- a/sql/log_event.cc +++ b/sql/log_event.cc @@ -2710,6 +2710,14 @@ Slave_worker *Log_event::get_slave_worker(Relay_log_info *rli) { Gtid_log_event *gtid_log_ev = static_cast(this); rli->started_processing(gtid_log_ev); +#ifdef WITH_WSREP + Wsrep_async_monitor *wsrep_async_monitor {rli->get_wsrep_async_monitor()}; + if (wsrep_async_monitor) { + auto seqno = gtid_log_ev->sequence_number; + wsrep_async_monitor->schedule(seqno); + fprintf(stderr, "Scheduled the seqno: %llu in async monitor\n", seqno); + } +#endif /* WITH_WSREP */ } if (schedule_next_event(this, rli)) { @@ -11220,6 +11228,21 @@ static enum_tbl_map_status check_table_map(Relay_log_info const *rli, } } +#ifdef WITH_WSREP + // This transaction is anyways going to be skipped. So skip the transaction + // in the async monitor as well + if (WSREP(rli->info_thd) && res == FILTERED_OUT) { + Slave_worker *sw = static_cast(const_cast(rli)); + Wsrep_async_monitor *wsrep_async_monitor {sw->get_wsrep_async_monitor()}; + if (wsrep_async_monitor) { + auto seqno = sw->sequence_number(); + assert(seqno > 0); + wsrep_async_monitor->skip(seqno); + fprintf(stderr, "Filtered DML seqno: %llu in async monitor\n", seqno); + } + } +#endif /* WITH_WSREP */ + DBUG_PRINT("debug", ("check of table map ended up with: %u", res)); return res; diff --git a/sql/rpl_gtid_execution.cc b/sql/rpl_gtid_execution.cc index 2434f7eef34..31f9994da44 100644 --- a/sql/rpl_gtid_execution.cc +++ b/sql/rpl_gtid_execution.cc @@ -38,6 +38,9 @@ #include "sql/mysqld.h" // connection_events_loop_aborted #include "sql/rpl_gtid.h" #include "sql/rpl_rli.h" // Relay_log_info +#ifdef WITH_WSREP +#include "sql/rpl_rli_pdb.h" // Slave_worker +#endif /* WITH_WSREP */ #include "sql/sql_class.h" // THD #include "sql/sql_lex.h" #include "sql/sql_parse.h" // stmt_causes_implicit_commit @@ -355,6 +358,19 @@ static inline void skip_statement(THD *thd) { to notify that its session ticket was consumed. */ Commit_stage_manager::get_instance().finish_session_ticket(thd); +#ifdef WITH_WSREP + /* Despite the transaction was skipped, it needs to be updated in the Wsrep_async_monitor */ + if (thd->slave_thread) { + Slave_worker *sw = dynamic_cast(thd->rli_slave); + Wsrep_async_monitor *wsrep_async_monitor {sw->get_wsrep_async_monitor()}; + if (wsrep_async_monitor) { + auto seqno = sw->sequence_number(); + assert(seqno > 0); + wsrep_async_monitor->skip(seqno); + fprintf(stderr, "Skipped the seqno: %llu in async monitor\n", seqno); + } + } +#endif /* WITH_WSREP */ #ifndef NDEBUG const Gtid_set *executed_gtids = gtid_state->get_executed_gtids(); diff --git a/sql/rpl_replica.cc b/sql/rpl_replica.cc index 4632d1c7696..7328d4879ea 100644 --- a/sql/rpl_replica.cc +++ b/sql/rpl_replica.cc @@ -172,6 +172,10 @@ #endif #include "scope_guard.h" +#ifdef WITH_WSREP +#include "sql/wsrep_async_monitor.h" +#endif /* WITH_WSREP */ + struct mysql_cond_t; struct mysql_mutex_t; @@ -7225,6 +7229,9 @@ extern "C" void *handle_slave_sql(void *arg) { bool mts_inited = false; Global_THD_manager *thd_manager = Global_THD_manager::get_instance(); Commit_order_manager *commit_order_mngr = nullptr; +#ifdef WITH_WSREP + Wsrep_async_monitor *wsrep_async_monitor = nullptr; +#endif /* WITH_WSREP */ Rpl_applier_reader applier_reader(rli); Relay_log_info::enum_priv_checks_status priv_check_status = Relay_log_info::enum_priv_checks_status::SUCCESS; @@ -7270,6 +7277,16 @@ wsrep_restart_point : rli->set_commit_order_manager(commit_order_mngr); +#ifdef WITH_WSREP + // Restore the last executed seqno + if (WSREP_ON && opt_replica_preserve_commit_order + && !rli->is_parallel_exec() + && rli->opt_replica_parallel_workers > 1) { + wsrep_async_monitor = new Wsrep_async_monitor(); + rli->set_wsrep_async_monitor(wsrep_async_monitor); + } +#endif /* WITH_WSREP */ + if (channel_map.is_group_replication_channel_name(rli->get_channel())) { if (channel_map.is_group_replication_channel_name(rli->get_channel(), true)) { @@ -7703,6 +7720,13 @@ wsrep_restart_point : delete commit_order_mngr; } +#ifdef WITH_WSREP + if (wsrep_async_monitor) { + rli->set_wsrep_async_monitor(nullptr); + delete wsrep_async_monitor; + } +#endif /* WITH_WSREP */ + mysql_mutex_unlock(&rli->info_thd_lock); set_thd_in_use_temporary_tables( rli); // (re)set info_thd in use for saved temp tables diff --git a/sql/rpl_rli.h b/sql/rpl_rli.h index ae2ef3acf23..174d67ed5cb 100644 --- a/sql/rpl_rli.h +++ b/sql/rpl_rli.h @@ -64,6 +64,9 @@ #include "sql/sql_class.h" // THD #include "sql/system_variables.h" #include "sql/table.h" +#ifdef WITH_WSREP +#include "sql/wsrep_async_monitor.h" +#endif /* WITH_WSREP */ class Commit_order_manager; class Master_info; @@ -1794,6 +1797,15 @@ class Relay_log_info : public Rpl_info { commit_order_mngr = mngr; } +#ifdef WITH_WSREP + Wsrep_async_monitor* get_wsrep_async_monitor() { + return wsrep_async_monitor; + } + void set_wsrep_async_monitor(Wsrep_async_monitor *monitor) { + wsrep_async_monitor = monitor; + } +#endif /* WITH_WSREP */ + /* Following set function is required to initialize the 'until_option' during MTS relay log recovery process. @@ -1851,6 +1863,12 @@ class Relay_log_info : public Rpl_info { */ Commit_order_manager *commit_order_mngr; +#ifdef WITH_WSREP + /* + Wsrep_async_monitor orders DMLs and DDls in galera. + */ + Wsrep_async_monitor *wsrep_async_monitor; +#endif /* WITH_WSREP */ /** Delay slave SQL thread by this amount of seconds. The delay is applied per transaction and based on the immediate master's diff --git a/sql/rpl_rli_pdb.cc b/sql/rpl_rli_pdb.cc index 4f82404e45a..e395f7f0a82 100644 --- a/sql/rpl_rli_pdb.cc +++ b/sql/rpl_rli_pdb.cc @@ -327,6 +327,9 @@ int Slave_worker::init_worker(Relay_log_info *rli, ulong i) { this->set_require_row_format(rli->is_row_format_required()); set_commit_order_manager(c_rli->get_commit_order_manager()); +#ifdef WITH_WSREP + set_wsrep_async_monitor(c_rli->get_wsrep_async_monitor()); +#endif /* WITH_WSREP */ if (rli_init_info(false) || DBUG_EVALUATE_IF("inject_init_worker_init_info_fault", true, false)) diff --git a/sql/sql_parse.cc b/sql/sql_parse.cc index 6dfb125e2dc..fa748931258 100644 --- a/sql/sql_parse.cc +++ b/sql/sql_parse.cc @@ -394,6 +394,21 @@ inline bool check_database_filters(THD *thd, const char *db, break; } } + +#ifdef WITH_WSREP + // This transaction is anyways going to be skipped. So skip the transaction + // in the async monitor as well + if (WSREP(thd) && !db_ok) { + Slave_worker *sw = dynamic_cast(thd->rli_slave); + Wsrep_async_monitor *wsrep_async_monitor {sw->get_wsrep_async_monitor()}; + if (wsrep_async_monitor) { + auto seqno = sw->sequence_number(); + assert(seqno > 0); + wsrep_async_monitor->skip(seqno); + fprintf(stderr, "Filtered DDL seqno: %llu in async monitor\n", seqno); + } + } +#endif /* WITH_WSREP */ return db_ok; } diff --git a/sql/wsrep_async_monitor.cc b/sql/wsrep_async_monitor.cc new file mode 100644 index 00000000000..f64faa4fd5a --- /dev/null +++ b/sql/wsrep_async_monitor.cc @@ -0,0 +1,94 @@ +/* Copyright (c) 2021 Percona LLC and/or its affiliates. All rights reserved. + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ + +#ifdef WITH_WSREP +#include "sql/wsrep_async_monitor.h" +#include +#include + +// Method for main thread to add scheduled seqnos +void Wsrep_async_monitor::schedule(seqno_t seqno) { + std::unique_lock lock(m_mutex); + scheduled_seqnos.push(seqno); +} + +// Method for both DDL and DML to enter the monitor +void Wsrep_async_monitor::enter(seqno_t seqno) { + std::unique_lock lock(m_mutex); + + // Wait until this transaction is at the head of the scheduled queue + m_cond.wait(lock, [this, seqno] { + // Remove skipped transactions + while (!scheduled_seqnos.empty() && + skipped_seqnos.count(scheduled_seqnos.front()) > 0) { + scheduled_seqnos.pop(); + } + return !scheduled_seqnos.empty() && scheduled_seqnos.front() == seqno; + }); +} + +// Method to be called after DDL/DML processing is complete +void Wsrep_async_monitor::leave(seqno_t seqno) { + std::unique_lock lock(m_mutex); + + // Check if the sequence number matches the front of the queue. + // In a correctly functioning monitor this should not happen + // as each transaction should exit in the order it was scheduled + // and processed. + if (!scheduled_seqnos.empty() && scheduled_seqnos.front() == seqno) { + // Remove the seqno from the scheduled queue now that it has completed + scheduled_seqnos.pop(); + } else { + // std::cout << "Error: Mismatch in sequence numbers. Expected " + // << (scheduled_seqnos.empty() + // ? "none" + // : std::to_string(scheduled_seqnos.front())) + // << " but got " << seqno << "." << std::endl; + assert(false && "Sequence number mismatch in leave()"); + exit(1); + } + + // Notify waiting threads in case the next scheduled sequence can enter + m_cond.notify_all(); +} + +// Method to skip a transaction that will not call enter() and exit() +void Wsrep_async_monitor::skip(unsigned long seqno) { + std::unique_lock lock(m_mutex); + + // Check if the seqno is already marked as skipped + if (skipped_seqnos.count(seqno) > 0) { + return; // Already skipped, so do nothing + } + + // Mark the seqno as skipped + skipped_seqnos.insert(seqno); + + // Remove it from the scheduled queue if it is at the front + if (!scheduled_seqnos.empty() && scheduled_seqnos.front() == seqno) { + scheduled_seqnos.pop(); + } + + // Notify in case other transactions are waiting to enter + m_cond.notify_all(); +} + +// Method to return if the monitor is empty, used by the unittests +bool Wsrep_async_monitor::is_empty() { + std::unique_lock lock(m_mutex); + return scheduled_seqnos.empty(); +} +#endif /* WITH_WSREP */ diff --git a/sql/wsrep_async_monitor.h b/sql/wsrep_async_monitor.h new file mode 100644 index 00000000000..b149941c069 --- /dev/null +++ b/sql/wsrep_async_monitor.h @@ -0,0 +1,54 @@ +/* Copyright (c) 2024 Percona LLC and/or its affiliates. All rights reserved. + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ + +#ifndef WSREP_ASYNC_MONITOR_H +#define WSREP_ASYNC_MONITOR_H + +#ifdef WITH_WSREP +#include +#include +#include +#include +#include + +class Wsrep_async_monitor { + public: + using seqno_t = unsigned long long; + + // Method for main thread to add scheduled seqnos + void schedule(seqno_t seqno); + + // Method for both DDL and DML to enter the monitor + void enter(seqno_t seqno); + + // Method to be called after DDL/DML processing is complete + void leave(seqno_t seqno); + + // Method to skip a transaction that will not call enter() and exit() + void skip(unsigned long seqno); + + // Method to return if the monitor is empty, used by the unittests + bool is_empty(); + + private: + std::mutex m_mutex; + std::condition_variable m_cond; + std::set skipped_seqnos; // Tracks skipped sequence numbers + std::queue scheduled_seqnos; // Queue to track scheduled seqnos +}; + +#endif /* WITH_WSREP */ +#endif /* WSREP_ASYNC_MONITOR_H */ diff --git a/sql/wsrep_mysqld.cc b/sql/wsrep_mysqld.cc index 1527eee14e9..5cc81513f36 100644 --- a/sql/wsrep_mysqld.cc +++ b/sql/wsrep_mysqld.cc @@ -2870,6 +2870,61 @@ static void wsrep_RSU_end(THD *thd) { thd->disable_binlog_guard.reset(); } +void thd_enter_async_monitor(THD* thd) { + + // Only replica threads are allowed to enter + if (thd->slave_thread) { + + // If the thread is already killed, leave it to the called to handle it. + if (thd->killed != THD::NOT_KILLED) { + return; + } + Slave_worker *sw = dynamic_cast(thd->rli_slave); + Wsrep_async_monitor *wsrep_async_monitor {sw->get_wsrep_async_monitor()}; + if (wsrep_async_monitor) { + auto seqno = sw->sequence_number(); + assert(seqno > 0); + // TODO: If requied, we must set current_mutex and current_cond here + // i.e, thd->enter_cond(); + wsrep_async_monitor->enter(seqno); + } else { + // Error out and exit if we have more than one parallel worker and async monitor + // is not set yet. + if (opt_replica_preserve_commit_order && opt_mts_replica_parallel_workers > 1) { + WSREP_ERROR("Async replica thread %u is not monitored by wsrep async monitor", + thd->thread_id()); + exit(1); + } + } + } +} + +void thd_leave_async_monitor(THD* thd) { + if (thd->slave_thread) { + Slave_worker * sw = dynamic_cast(thd->rli_slave); + Wsrep_async_monitor *wsrep_async_monitor {sw->get_wsrep_async_monitor()}; + if (wsrep_async_monitor) { + auto seqno = sw->sequence_number(); + assert(seqno > 0); + wsrep_async_monitor->leave(seqno); + { + if (thd->wsrep_skip_wsrep_hton == true) + fprintf(stderr, "DDL: thd_leave_apply_monitor: seqno=%llu\n", seqno); + else + fprintf(stderr, "DML: thd_leave_apply_monitor: seqno=%llu\n", seqno); + } + } else { + // Error out and exit if we have more than one parallel worker and async monitor + // is not set yet. + if (opt_replica_preserve_commit_order && opt_mts_replica_parallel_workers > 1) { + WSREP_ERROR("Async replica thread %u is not monitored by wsrep async monitor", + thd->thread_id()); + exit(1); + } + } + } +} + int wsrep_to_isolation_begin(THD *thd, const char *db_, const char *table_, const Table_ref *table_list, dd::Tablespace_table_ref_vec *trefs, @@ -2966,6 +3021,8 @@ int wsrep_to_isolation_begin(THD *thd, const char *db_, const char *table_, thd->variables.auto_increment_increment = 1; } + thd_enter_async_monitor(thd); + DEBUG_SYNC(thd, "wsrep_to_isolation_begin_before_replication"); if (thd->variables.wsrep_on && wsrep_thd_is_local(thd)) { @@ -3039,6 +3096,8 @@ void wsrep_to_isolation_end(THD *thd) { } if (wsrep_emulate_bin_log) wsrep_thd_binlog_trx_reset(thd); + thd_leave_async_monitor(thd); + DEBUG_SYNC(thd, "wsrep_to_isolation_end_before_wsrep_skip_wsrep_hton"); mysql_mutex_lock(&thd->LOCK_thd_data); diff --git a/sql/wsrep_mysqld.h b/sql/wsrep_mysqld.h index f8542f6dd9f..67a7d2c44be 100644 --- a/sql/wsrep_mysqld.h +++ b/sql/wsrep_mysqld.h @@ -443,6 +443,9 @@ extern PSI_thread_key key_THREAD_wsrep_post_rollbacker; extern PSI_file_key key_file_wsrep_gra_log; #endif /* HAVE_PSI_INTERFACE */ +void thd_enter_async_monitor(THD* thd); +void thd_leave_async_monitor(THD* thd); + class Alter_info; int wsrep_to_isolation_begin(THD *thd, const char *db_, const char *table_, const Table_ref *table_list, diff --git a/sql/wsrep_trans_observer.h b/sql/wsrep_trans_observer.h index da2082cfa89..c22340fcb44 100644 --- a/sql/wsrep_trans_observer.h +++ b/sql/wsrep_trans_observer.h @@ -296,6 +296,10 @@ static inline int wsrep_before_commit(THD *thd, bool all) { WSREP_DEBUG("wsrep_before_commit: %d, %lld", wsrep_is_real(thd, all), (long long)wsrep_thd_trx_seqno(thd)); int ret = 0; + + /* Enter the async monitor */ + thd_enter_async_monitor(thd); + assert(wsrep_run_commit_hook(thd, all)); if ((ret = thd->wsrep_cs().before_commit()) == 0) { assert(!thd->wsrep_trx().ws_meta().gtid().is_undefined()); diff --git a/unittest/gunit/CMakeLists.txt b/unittest/gunit/CMakeLists.txt index fb051b94f68..0e6af9b00e0 100644 --- a/unittest/gunit/CMakeLists.txt +++ b/unittest/gunit/CMakeLists.txt @@ -463,6 +463,18 @@ MYSQL_ADD_EXECUTABLE(rpl_event_ctx-t rpl_event_ctx-t.cc LINK_LIBRARIES rpl_event_ctx_lib gunit_small ) +# WITH_WSREP +ADD_LIBRARY(wsrep_async_monitor_lib STATIC + ${CMAKE_SOURCE_DIR}/sql/wsrep_async_monitor.cc +) +TARGET_LINK_LIBRARIES(wsrep_async_monitor_lib) + +MYSQL_ADD_EXECUTABLE(wsrep_async_monitor-t wsrep_async_monitor-t.cc + ENABLE_EXPORTS + ADD_TEST wsrep_async_monitor + LINK_LIBRARIES wsrep_async_monitor_lib gunit_small +) + ADD_SUBDIRECTORY(ddl_rewriter) ADD_SUBDIRECTORY(innodb) ADD_SUBDIRECTORY(keyring) diff --git a/unittest/gunit/wsrep_async_monitor-t.cc b/unittest/gunit/wsrep_async_monitor-t.cc new file mode 100644 index 00000000000..59a735aed33 --- /dev/null +++ b/unittest/gunit/wsrep_async_monitor-t.cc @@ -0,0 +1,186 @@ +/* Copyright (c) 2021 Percona LLC and/or its affiliates. All rights reserved. + + This program is free software; you can redistribute it and/or + modify it under the terms of the GNU General Public License + as published by the Free Software Foundation; version 2 of + the License. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, write to the Free Software + Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA */ +#ifdef WITH_WSREP + +#include +#include +#include +#include "sql/wsrep_async_monitor.h" // Include the header file of your class + +using seqno_t = unsigned long long; + +class WsrepAsyncMonitorTest : public ::testing::Test { + protected: + Wsrep_async_monitor monitor; + + void SetUp() override { + // Any setup can be done here, if needed + } + + void TearDown() override { + // Any teardown can be done here, if needed + } +}; + +// Helper function to simulate processing a transaction +void processThread(Wsrep_async_monitor &monitor, seqno_t seqno) { + monitor.enter(seqno); + // Simulate work by sleeping briefly + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + monitor.leave(seqno); +} + +// Test that the monitor can handle single transactions (DML and DDL) +TEST_F(WsrepAsyncMonitorTest, SingleTransaction) { + monitor.schedule(1); + + std::thread t1(processThread, std::ref(monitor), 1); + t1.join(); + + // Check if the queue is empty after leaving + ASSERT_TRUE(monitor.is_empty()); +} + +// Test that monitor can handle sequence mismatch. +TEST_F(WsrepAsyncMonitorTest, LeaveSequenceNumberMismatch) { + seqno_t seqno1 = 1; + seqno_t seqno2 = 2; + monitor.schedule(seqno1); + monitor.enter(seqno1); + + // Check if the queue is not empty + ASSERT_TRUE(!monitor.is_empty()); + + // Expect an assertion failure or exit due to sequence number mismatch + ASSERT_DEATH(monitor.leave(seqno2), "Sequence number mismatch in leave()"); +} + +// Test that multiple transactions are processed in the correct sequence +TEST_F(WsrepAsyncMonitorTest, MultipleQueriesInSequence) { + monitor.schedule(1); + monitor.schedule(2); + monitor.schedule(3); + monitor.schedule(4); + + std::vector processed_seqnos; + auto transactionWorker = [&](seqno_t seqno, bool isDDL) { + monitor.enter(seqno); + processed_seqnos.push_back(seqno); + if (isDDL) { + // Simulate work by sleeping briefly + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + monitor.leave(seqno); + }; + + std::thread t1(transactionWorker, 1, true); // DML with seqno 1 + std::thread t2(transactionWorker, 2, false); // DDL with seqno 2 + std::thread t3(transactionWorker, 3, true); // DML with seqno 3 + std::thread t4(transactionWorker, 4, false); // DML with seqno 4 + + t1.join(); + t2.join(); + t3.join(); + t4.join(); + + EXPECT_EQ(processed_seqnos, std::vector({1, 2, 3, 4})); +} + +// Test that skipped transactions are not processed but do not block other +// transactions +TEST_F(WsrepAsyncMonitorTest, Skip) { + monitor.schedule(1); + monitor.schedule(2); + monitor.schedule(3); + monitor.schedule(4); + monitor.schedule(5); + monitor.schedule(6); + + std::vector processed_seqnos; + auto transactionWorker = [&](seqno_t seqno, bool isDDL) { + monitor.enter(seqno); + processed_seqnos.push_back(seqno); + if (isDDL) { + // Simulate work by sleeping briefly + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + } + monitor.leave(seqno); + }; + auto transactionSkip = [&](seqno_t seqno) { monitor.skip(seqno); }; + + // Make sure that the logic works for out-of-order enter() calls + std::thread t1(transactionWorker, 3, false); // DML with seqno 3 + std::thread t2(transactionSkip, 2); // Skipped seqno 2 + + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + std::thread t3(transactionWorker, 1, false); // DML with seqno 1 + + t1.join(); + t2.join(); + t3.join(); + + // Make sure that the logic works for out-of-order enter() calls + std::thread t4(transactionSkip, 5); // DML with seqno 3 + std::thread t5(transactionWorker, 6, true); // Skipped seqno 2 + + std::this_thread::sleep_for(std::chrono::milliseconds(500)); + std::thread t6(transactionWorker, 4, false); // DML with seqno 1 + + t4.join(); + t5.join(); + t6.join(); + EXPECT_EQ(processed_seqnos, std::vector({1, 3, 4, 6})); +} + +// Test that transactions can be processed concurrently +TEST_F(WsrepAsyncMonitorTest, Concurrents) { + monitor.schedule(1); + monitor.schedule(2); + monitor.schedule(3); + monitor.schedule(4); + + std::vector processed_seqnos; + std::mutex seqno_mutex; + + auto transactionWorker = [&](seqno_t seqno, bool isDDL) { + monitor.enter(seqno); + { + std::lock_guard lock(seqno_mutex); + processed_seqnos.push_back(seqno); + if (isDDL) { + // Simulate work by sleeping briefly + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + } + } + monitor.leave(seqno); + }; + + std::thread t1(transactionWorker, 4, false); // DML with seqno 4 + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + std::thread t2(transactionWorker, 3, true); // DDL with seqno 3 + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + std::thread t3(transactionWorker, 2, false); // DML with seqno 2 + std::this_thread::sleep_for(std::chrono::milliseconds(50)); + std::thread t4(transactionWorker, 1, true); // DDL with seqno 1 + + t1.join(); + t2.join(); + t3.join(); + t4.join(); + + EXPECT_EQ(processed_seqnos, std::vector({1, 2, 3, 4})); +} +#endif /* WITH_WSREP */