Skip to content
This repository has been archived by the owner on Nov 23, 2023. It is now read-only.

Commit

Permalink
fix: retry read-index to overcome read-index can lose
Browse files Browse the repository at this point in the history
Signed-off-by: tison <wander4096@gmail.com>
  • Loading branch information
tisonkun committed Aug 25, 2023
1 parent 5aeeb5f commit 721981a
Show file tree
Hide file tree
Showing 2 changed files with 43 additions and 40 deletions.
39 changes: 20 additions & 19 deletions src/raftstore/examples/raftstore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,33 +96,34 @@ fn main() -> anyhow::Result<()> {
node.do_main(peer_id, wg);
}

let client_id = uuid::Uuid::new_v4();
let client_ids = [
uuid::Uuid::new_v4(),
uuid::Uuid::new_v4(),
uuid::Uuid::new_v4(),
];
let mut results = vec![];
let mut seq_id = 0;
for i in 0..10 {
let mut seq_id = 1;
for i in 0..20 {
let (req, rx) = make_create_req(
ReqId { client_id, seq_id },
ReqId {
client_id: client_ids[i % 3],
seq_id,
},
format!("mephisto-{}", i / 2),
format!("value-{}", i),
);
requesters[0].send(req).unwrap();
requesters[i % 3].send(req).unwrap();
results.push(rx);
seq_id += 1;

if i % 2 == 0 {
let (req, rx) = make_create_req(
ReqId { client_id, seq_id },
format!("mephisto-{}", i / 2),
format!("value-{}", i + 1002),
);
requesters[0].send(req).unwrap();
results.push(rx);
seq_id += 1;
}

let (req, rx) =
make_get_data_req(ReqId { client_id, seq_id }, format!("mephisto-{}", i / 2));
requesters[0].send(req).unwrap();
let (req, rx) = make_get_data_req(
ReqId {
client_id: client_ids[i % 3],
seq_id,
},
format!("mephisto-{}", i / 2),
);
requesters[i % 3].send(req).unwrap();
results.push(rx);
seq_id += 1;
}
Expand Down
44 changes: 23 additions & 21 deletions src/raftstore/src/node.rs
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,14 @@ impl RaftNode {
}
}

for reads in self.pending_reads.values_mut() {
for read in reads.reads.iter_mut() {
if read.read_index == u64::MAX {
self.node.read_index(read.request.req_id.serialize());
}
}
}

for msg in self.rx_message.try_iter() {
self.node.step(msg)?;
}
Expand Down Expand Up @@ -326,23 +334,30 @@ impl RaftNode {

// process pending reads if applied_index >= read_index
fn process_pending_reads(&mut self, req_id: Option<ReqId>) {
fn process_pending_reads_one(
reads: &mut PendingReads,
applied_index: u64,
datatree: &DataTree,
) {
let reads = match req_id {
None => self.pending_reads.values_mut().collect(),
Some(ReqId { client_id, .. }) => {
if let Some(reads) = self.pending_reads.get_mut(&client_id) {
vec![reads]
} else {
vec![]
}
}
};

for reads in reads {
let seen_write_seq = reads.seen_write_seq;
let seen_apply_index = applied_index;
let seen_apply_index = self.state.applied_index;
while let Some(read) = reads.reads.front() {
if read.wait_write_seq <= seen_write_seq && read.read_index <= seen_apply_index {
let read = reads.reads.pop_front().unwrap();
match read.request.request {
DataTreeRequest::GetData(request) => {
let reply = datatree.get_data(request);
let reply = self.datatree.get_data(request);
let reply = FatReply {
header: ReplyHeader {
req_id: read.request.req_id.seq_id,
txn_id: applied_index,
txn_id: self.state.applied_index,
err: 0,
},
reply,
Expand All @@ -356,19 +371,6 @@ impl RaftNode {
}
}
}

match req_id {
None => {
for reads in self.pending_reads.values_mut() {
process_pending_reads_one(reads, self.state.applied_index, &self.datatree);
}
}
Some(ReqId { client_id, .. }) => {
if let Some(reads) = self.pending_reads.get_mut(&client_id) {
process_pending_reads_one(reads, self.state.applied_index, &self.datatree);
}
}
}
}

fn process_committed_entry(&mut self, entry: Entry) -> anyhow::Result<()> {
Expand Down

0 comments on commit 721981a

Please sign in to comment.