fix(bug): crash on takeover and info replication (#3282)

Signed-off-by: adi_holden <adi@dragonflydb.io>
adiholden authored Jul 8, 2024
1 parent fba902d commit 5c7c21b
Showing 3 changed files with 30 additions and 28 deletions.
22 changes: 12 additions & 10 deletions src/server/server_family.cc
@@ -2258,9 +2258,12 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
   }
 
   if (should_enter("REPLICATION")) {
-    ServerState& etl = *ServerState::tlocal();
-
-    if (etl.is_master) {
+    unique_lock lk(replicaof_mu_);
+    // Thread local var is_master is updated under mutex replicaof_mu_ together with replica_,
+    // ensuring eventual consistency of is_master. When determining if the server is a replica and
+    // accessing the replica_ object, we must lock replicaof_mu_. Using is_master alone is
+    // insufficient in this scenario.
+    if (!replica_) {
       append("role", "master");
       append("connected_slaves", m.facade_stats.conn_stats.num_replicas);
       const auto& replicas = m.replication_metrics;
@@ -2274,10 +2277,6 @@ void ServerFamily::Info(CmdArgList args, ConnectionContext* cntx) {
     } else {
       append("role", GetFlag(FLAGS_info_replication_valkey_compatible) ? "slave" : "replica");
 
-      // The replica pointer can still be mutated even while master=true,
-      // we don't want to drop the replica object in this fiber
-      unique_lock lk{replicaof_mu_};
-
       auto replication_info_cb = [&](Replica::Info rinfo) {
         append("master_host", rinfo.host);
         append("master_port", rinfo.port);
@@ -2737,8 +2736,12 @@ void ServerFamily::ReplConf(CmdArgList args, ConnectionContext* cntx) {
 
 void ServerFamily::Role(CmdArgList args, ConnectionContext* cntx) {
   auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
-  ServerState& etl = *ServerState::tlocal();
-  if (etl.is_master) {
+  unique_lock lk(replicaof_mu_);
+  // Thread local var is_master is updated under mutex replicaof_mu_ together with replica_,
+  // ensuring eventual consistency of is_master. When determining if the server is a replica and
+  // accessing the replica_ object, we must lock replicaof_mu_. Using is_master alone is
+  // insufficient in this scenario.
+  if (!replica_) {
     rb->StartArray(2);
     rb->SendBulkString("master");
     auto vec = dfly_cmd_->GetReplicasRoleInfo();
@@ -2751,7 +2754,6 @@
     }
 
   } else {
-    unique_lock lk{replicaof_mu_};
     rb->StartArray(4 + cluster_replicas_.size() * 3);
     rb->SendBulkString(GetFlag(FLAGS_info_replication_valkey_compatible) ? "slave" : "replica");
 
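The pattern this fix converges on is easiest to see outside the diff. Below is a minimal, hand-written C++ sketch, not Dragonfly code: Replica, SetMaster, and PrintRole are hypothetical stand-ins. The takeover path resets replica_ while holding the mutex, and readers such as INFO/ROLE take the same mutex before branching on replica_, rather than branching on the unlocked thread-local is_master and locking only afterwards.

// Minimal sketch of the locking pattern, under the assumptions above.
#include <iostream>
#include <memory>
#include <mutex>
#include <string>

struct Replica {  // stand-in for Dragonfly's Replica
  std::string host = "localhost";
  int port = 6379;
};

class ServerFamily {
 public:
  // Takeover/REPLICAOF path: the role and the replica pointer change
  // together, under the same mutex.
  void SetMaster() {
    std::unique_lock lk(mu_);
    replica_.reset();  // drop the replica object while holding the lock
  }

  // INFO/ROLE path: lock first, then branch on replica_ itself. Branching
  // on an unlocked flag and locking afterwards leaves a window in which
  // SetMaster() can reset replica_ between the check and the use.
  void PrintRole() {
    std::unique_lock lk(mu_);
    if (!replica_) {
      std::cout << "role: master\n";
    } else {
      // Safe: replica_ cannot be dropped while we hold mu_.
      std::cout << "role: replica of " << replica_->host << ":" << replica_->port << "\n";
    }
  }

 private:
  std::mutex mu_;  // plays the part of replicaof_mu_
  std::unique_ptr<Replica> replica_ = std::make_unique<Replica>();
};

int main() {
  ServerFamily sf;
  sf.PrintRole();  // "role: replica of localhost:6379"
  sf.SetMaster();
  sf.PrintRole();  // "role: master"
}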
2 changes: 1 addition & 1 deletion tests/dragonfly/instance.py
@@ -336,7 +336,7 @@ def create(self, existing_port=None, **kwargs) -> DflyInstance:
         args.setdefault("noversion_check", None)
         # MacOs does not set it automatically, so we need to set it manually
         args.setdefault("maxmemory", "8G")
-        vmod = "dragonfly_connection=1,accept_server=1,listener_interface=1,main_service=1,rdb_save=1,replica=1,cluster_family=1"
+        vmod = "dragonfly_connection=1,accept_server=1,listener_interface=1,main_service=1,rdb_save=1,replica=1,cluster_family=1,dflycmd=1"
         args.setdefault("vmodule", vmod)
         args.setdefault("jsonpathv2")
 
34 changes: 17 additions & 17 deletions tests/dragonfly/replication_test.py
@@ -1151,10 +1151,7 @@ async def test_readonly_script(df_factory):
 @pytest.mark.parametrize("master_threads, replica_threads", take_over_cases)
 @pytest.mark.asyncio
 async def test_take_over_counters(df_factory, master_threads, replica_threads):
-    master = df_factory.create(
-        proactor_threads=master_threads,
-        logtostderr=True,
-    )
+    master = df_factory.create(proactor_threads=master_threads)
     replica1 = df_factory.create(proactor_threads=replica_threads)
     replica2 = df_factory.create(proactor_threads=replica_threads)
     replica3 = df_factory.create(proactor_threads=replica_threads)
@@ -1214,11 +1211,7 @@ async def test_take_over_seeder(
     request, df_factory, df_seeder_factory, master_threads, replica_threads
 ):
     tmp_file_name = "".join(random.choices(string.ascii_letters, k=10))
-    master = df_factory.create(
-        proactor_threads=master_threads,
-        dbfilename=f"dump_{tmp_file_name}",
-        logtostderr=True,
-    )
+    master = df_factory.create(proactor_threads=master_threads, dbfilename=f"dump_{tmp_file_name}")
     replica = df_factory.create(proactor_threads=replica_threads)
     df_factory.start_all([master, replica])
 
@@ -1229,17 +1222,27 @@ async def test_take_over_seeder(
     await c_replica.execute_command(f"REPLICAOF localhost {master.port}")
     await wait_available_async(c_replica)
 
-    async def seed():
-        await seeder.run(target_ops=3000)
+    fill_task = asyncio.create_task(seeder.run())
 
-    fill_task = asyncio.create_task(seed())
+    stop_info = False
+
+    async def info_task():
+        my_client = replica.client()
+        while not stop_info:
+            info = await my_client.info("replication")
+            await asyncio.sleep(0.5)
+
+    info_task = asyncio.create_task(info_task())
 
     # Give the seeder a bit of time.
-    await asyncio.sleep(1)
+    await asyncio.sleep(3)
+    logging.debug("running repltakeover")
     await c_replica.execute_command(f"REPLTAKEOVER 5 SAVE")
+    logging.debug("after running repltakeover")
     seeder.stop()
 
     assert await c_replica.execute_command("role") == ["master", []]
+    stop_info = True
 
     # Need to wait a bit to give time to write the shutdown snapshot
     await asyncio.sleep(1)
@@ -1258,10 +1261,7 @@
 @pytest.mark.parametrize("master_threads, replica_threads", [[4, 4]])
 @pytest.mark.asyncio
 async def test_take_over_read_commands(df_factory, master_threads, replica_threads):
-    master = df_factory.create(
-        proactor_threads=master_threads,
-        logtostderr=True,
-    )
+    master = df_factory.create(proactor_threads=master_threads)
     replica = df_factory.create(proactor_threads=replica_threads)
     df_factory.start_all([master, replica])
 
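The new test reproduces the original crash from the client side: a background task polls INFO replication in a loop while the replica executes REPLTAKEOVER, so the role flips and the replica object is dropped mid-poll. A standalone C++ analogue of that interleaving follows; the names here are illustrative, not Dragonfly code, and with the locking pattern shown above the poller can never observe a dangling replica pointer.

// Illustrative stress loop: one thread keeps reading the role (the test's
// info_task) while another performs the takeover that drops the replica.
#include <atomic>
#include <cstdio>
#include <memory>
#include <mutex>
#include <string>
#include <thread>

std::mutex replicaof_mu;  // guards the role and the replica object together
std::shared_ptr<std::string> replica = std::make_shared<std::string>("localhost:6379");

void PrintRole() {
  std::unique_lock lk(replicaof_mu);  // lock before branching, as in the fix
  if (!replica)
    std::puts("master");
  else
    std::printf("replica of %s\n", replica->c_str());  // safe under the lock
}

int main() {
  std::atomic<bool> stop{false};

  std::thread poller([&] {  // stands in for the test's info_task
    while (!stop.load())
      PrintRole();
  });

  {
    std::unique_lock lk(replicaof_mu);  // stands in for REPLTAKEOVER
    replica.reset();                    // drop the replica under the mutex
  }

  stop.store(true);
  poller.join();
}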
