Skip to content

Commit

Permalink
raftstore: do not update pending_cross_snap when peer is applying sna…
Browse files Browse the repository at this point in the history
…pshot (tikv#3873) (tikv#4004)

Signed-off-by: Connor1996 <zbk602423539@gmail.com>
  • Loading branch information
Connor1996 authored and overvenus committed Jan 2, 2019
1 parent f697493 commit 3464d2c
Show file tree
Hide file tree
Showing 5 changed files with 157 additions and 8 deletions.
4 changes: 3 additions & 1 deletion src/pd/pd.rs
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,9 @@ impl Display for Task {
Task::ReadStats { ref read_stats } => {
write!(f, "get the read statistics {:?}", read_stats)
}
Task::DestroyPeer { ref region_id } => write!(f, "destroy peer {}", region_id),
Task::DestroyPeer { ref region_id } => {
write!(f, "destroy peer of region {}", region_id)
}
}
}
}
Expand Down
24 changes: 20 additions & 4 deletions src/raftstore/store/store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1070,6 +1070,11 @@ impl<T: Transport, C: PdClient> Store<T, C> {
let merge_target = msg.get_merge_target();
let target_region_id = merge_target.get_id();

// When receiving message that has a merge target, it indicates that the source peer
// on this store is stale, the peers on other stores are already merged. The epoch
// in merge target is the state of target peer at the time when source peer is merged.
// So here we need to check the target peer on this store to decide whether the source
// to destory or wait target peer to catch up logs.
if let Some(epoch) = self.pending_cross_snap.get(&target_region_id).or_else(|| {
self.region_peers
.get(&target_region_id)
Expand All @@ -1082,11 +1087,12 @@ impl<T: Transport, C: PdClient> Store<T, C> {
epoch,
merge_target.get_region_epoch(),
);
// So the target peer has moved on, we should let it go.
// The target peer will move on, namely, it will apply a snapshot generated after merge,
// so destroy source peer.
if epoch.get_version() > merge_target.get_region_epoch().get_version() {
return Ok(true);
}
// Wait till it catching up logs.
// Wait till the target peer has catched up logs and source peer will be destroyed at that time.
return Ok(false);
}

Expand Down Expand Up @@ -1208,8 +1214,15 @@ impl<T: Transport, C: PdClient> Store<T, C> {
.map(|r| r.to_owned());
if let Some(exist_region) = r {
info!("region overlapped {:?}, {:?}", exist_region, snap_region);
self.pending_cross_snap
.insert(region_id, snap_region.get_region_epoch().to_owned());
let peer = &self.region_peers[&region_id];
// In some extreme case, it may happen that a new snapshot is received whereas a snapshot is still in applying
// if the snapshot under applying is generated before merge and the new snapshot is generated after merge,
// update `pending_cross_snap` here may cause source peer destroys itself improperly. So don't update
// `pending_cross_snap` here if peer is applying snapshot.
if !peer.is_applying_snapshot() && !peer.has_pending_snapshot() {
self.pending_cross_snap
.insert(region_id, snap_region.get_region_epoch().to_owned());
}
self.raft_metrics.message_dropped.region_overlap += 1;
return Ok(Some(key));
}
Expand Down Expand Up @@ -1247,6 +1260,9 @@ impl<T: Transport, C: PdClient> Store<T, C> {
}

fn on_raft_ready(&mut self) {
// Only enable the fail point when the store id is equal to 3, which is
// the id of slow store in tests.
fail_point!("on_raft_ready", self.store.get_id() == 3, |_| {});
let t = SlowTimer::new();
let pending_count = self.pending_raft_groups.len();
let previous_ready_metrics = self.raft_metrics.ready.clone();
Expand Down
124 changes: 124 additions & 0 deletions tests/failpoints_cases/test_merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,16 +12,23 @@
// limitations under the License.

use std::sync::Arc;
use std::sync::atomic::AtomicBool;
use std::thread;
use std::time::*;

use fail;
use futures::Future;
use kvproto::raft_serverpb::{PeerState, RegionLocalState};
use raft::eraftpb::MessageType;

use tikv::pd::PdClient;
use tikv::raftstore::store::Peekable;
use tikv::raftstore::store::keys;
use tikv::storage::CF_RAFT;
use tikv::util::HandyRwLock;
use tikv::util::config::*;

use raftstore::cluster::Simulator;
use raftstore::node::new_node_cluster;
use raftstore::transport_simulate::*;
use raftstore::util;
Expand Down Expand Up @@ -257,3 +264,120 @@ fn test_node_merge_recover_snapshot() {
cluster.must_transfer_leader(1, new_peer(3, 3));
cluster.must_put(b"k40", b"v5");
}

// Test if a merge handled properly when there are two different snapshots of one region arrive
// in one raftstore tick.
#[test]
fn test_node_merge_multiple_snapshots_together() {
test_node_merge_multiple_snapshots(true)
}

// To be fixed
//
// Test if a merge handled properly when there are two different snapshots of one region arrive
// in different raftstore tick.
// #[test]
// fn test_node_merge_multiple_snapshots_not_together() {
// test_node_merge_multiple_snapshots(false)
// }

fn test_node_merge_multiple_snapshots(together: bool) {
let mut cluster = new_node_cluster(0, 3);
configure_for_merge(&mut cluster);
let pd_client = Arc::clone(&cluster.pd_client);
pd_client.disable_default_operator();
// make it gc quickly to trigger snapshot easily
cluster.cfg.raft_store.raft_log_gc_tick_interval = ReadableDuration::millis(20);
cluster.cfg.raft_store.raft_base_tick_interval = ReadableDuration::millis(10);
cluster.cfg.raft_store.raft_log_gc_count_limit = 10;
cluster.cfg.raft_store.merge_max_log_gap = 9;
cluster.run();

cluster.must_put(b"k1", b"v1");
cluster.must_put(b"k3", b"v3");

let region = pd_client.get_region(b"k1").unwrap();
cluster.must_split(&region, b"k2");
let left = pd_client.get_region(b"k1").unwrap();
let right = pd_client.get_region(b"k3").unwrap();

let target_leader = right
.get_peers()
.iter()
.find(|p| p.get_store_id() == 1)
.unwrap()
.clone();
cluster.must_transfer_leader(right.get_id(), target_leader);
let target_leader = left.get_peers()
.iter()
.find(|p| p.get_store_id() == 2)
.unwrap()
.clone();
cluster.must_transfer_leader(left.get_id(), target_leader);
must_get_equal(&cluster.get_engine(1), b"k3", b"v3");

// So cluster becomes:
// left region: 1 2(leader) I 3
// right region: 1(leader) 2 I 3
// I means isolation.(here just means 3 can not receive append log)
cluster.add_send_filter(CloneFilterFactory(
RegionPacketFilter::new(right.get_id(), 3)
.direction(Direction::Recv)
.msg_type(MessageType::MsgAppend),
));
cluster.add_send_filter(CloneFilterFactory(
RegionPacketFilter::new(left.get_id(), 3)
.direction(Direction::Recv)
.msg_type(MessageType::MsgAppend),
));

// Add a collect snapshot filter, it will delay snapshots until have collected multiple snapshots from different peers
cluster.sim.wl().add_recv_filter(
3,
box LeadingDuplicatedSnapshotFilter::new(Arc::new(AtomicBool::new(false)), together),
);
// Write some data to trigger a snapshot of right region.
for i in 200..210 {
let key = format!("k{}", i);
let value = format!("v{}", i);
cluster.must_put(key.as_bytes(), value.as_bytes());
}
// Wait for snapshot to generate and send
thread::sleep(Duration::from_millis(100));

// Merge left and right region, due to isolation, the regions on store 3 are not merged yet.
pd_client.must_merge(left.get_id(), right.get_id());
thread::sleep(Duration::from_millis(200));

// Let peer of right region on store 3 to make append response to trigger a new snapshot
// one is snapshot before merge, the other is snapshot after merge.
// Then the old and new snapshot messages are received and handled in one tick,
// so `pending_cross_snap` may updated improperly and make merge source peer destory itself.
// Here blocks raftstore for a while to make it not to apply snapshot and receive new log now.
fail::cfg("on_raft_ready", "sleep(100)").unwrap();
cluster.clear_send_filters();
thread::sleep(Duration::from_millis(200));
// Filter message again to make sure peer on store 3 can not catch up CommitMerge log
cluster.add_send_filter(CloneFilterFactory(
RegionPacketFilter::new(left.get_id(), 3)
.direction(Direction::Recv)
.msg_type(MessageType::MsgAppend),
));
cluster.add_send_filter(CloneFilterFactory(
RegionPacketFilter::new(right.get_id(), 3)
.direction(Direction::Recv)
.msg_type(MessageType::MsgAppend),
));
// Cause filter is added again, no need to block raftstore anymore
fail::cfg("on_raft_ready", "off").unwrap();

// Wait some time to let already merged peer on store 1 or store 2 to notify
// the peer of left region on store 3 is stale, and then the peer will check
// `pending_cross_snap`
thread::sleep(Duration::from_millis(300));

cluster.must_put(b"k9", b"v9");
// let follower can reach the new log, then commit merge
cluster.clear_send_filters();
must_get_equal(&cluster.get_engine(3), b"k9", b"v9");
}
11 changes: 9 additions & 2 deletions tests/raftstore/transport_simulate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -526,14 +526,17 @@ pub struct LeadingDuplicatedSnapshotFilter {
dropped: AtomicBool,
stale: Arc<AtomicBool>,
last_msg: Mutex<Option<RaftMessage>>,
// whether the two different snapshots will send together
together: bool,
}

impl LeadingDuplicatedSnapshotFilter {
pub fn new(stale: Arc<AtomicBool>) -> LeadingDuplicatedSnapshotFilter {
pub fn new(stale: Arc<AtomicBool>, together: bool) -> LeadingDuplicatedSnapshotFilter {
LeadingDuplicatedSnapshotFilter {
dropped: AtomicBool::new(false),
stale,
last_msg: Mutex::new(None),
together,
}
}
}
Expand All @@ -555,7 +558,11 @@ impl Filter<StoreMsg> for LeadingDuplicatedSnapshotFilter {
if msg.get_message().get_msg_type() == MessageType::MsgSnapshot && !stale {
if last_msg.as_ref().map_or(false, |l| l != &msg) {
to_send.push(StoreMsg::RaftMessage(last_msg.take().unwrap()));
*last_msg = Some(msg);
if self.together {
to_send.push(StoreMsg::RaftMessage(msg));
} else {
*last_msg = Some(msg);
}
stale = true;
} else {
self.dropped.store(true, Ordering::Relaxed);
Expand Down
2 changes: 1 addition & 1 deletion tests/raftstore_cases/test_snap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ fn test_huge_snapshot<T: Simulator>(cluster: &mut Cluster<T>) {
let stale = Arc::new(AtomicBool::new(false));
cluster.sim.wl().add_recv_filter(
3,
box LeadingDuplicatedSnapshotFilter::new(Arc::clone(&stale)),
box LeadingDuplicatedSnapshotFilter::new(Arc::clone(&stale), false),
);
pd_client.must_add_peer(r1, new_peer(3, 3));
let mut i = 2 * 1024;
Expand Down

0 comments on commit 3464d2c

Please sign in to comment.