diff --git a/src/raftstore/examples/raftstore.rs b/src/raftstore/examples/raftstore.rs index 4679dd3..42ab906 100644 --- a/src/raftstore/examples/raftstore.rs +++ b/src/raftstore/examples/raftstore.rs @@ -96,33 +96,34 @@ fn main() -> anyhow::Result<()> { node.do_main(peer_id, wg); } - let client_id = uuid::Uuid::new_v4(); + let client_ids = [ + uuid::Uuid::new_v4(), + uuid::Uuid::new_v4(), + uuid::Uuid::new_v4(), + ]; let mut results = vec![]; - let mut seq_id = 0; - for i in 0..10 { + let mut seq_id = 1; + for i in 0..20 { let (req, rx) = make_create_req( - ReqId { client_id, seq_id }, + ReqId { + client_id: client_ids[i % 3], + seq_id, + }, format!("mephisto-{}", i / 2), format!("value-{}", i), ); - requesters[0].send(req).unwrap(); + requesters[i % 3].send(req).unwrap(); results.push(rx); seq_id += 1; - if i % 2 == 0 { - let (req, rx) = make_create_req( - ReqId { client_id, seq_id }, - format!("mephisto-{}", i / 2), - format!("value-{}", i + 1002), - ); - requesters[0].send(req).unwrap(); - results.push(rx); - seq_id += 1; - } - - let (req, rx) = - make_get_data_req(ReqId { client_id, seq_id }, format!("mephisto-{}", i / 2)); - requesters[0].send(req).unwrap(); + let (req, rx) = make_get_data_req( + ReqId { + client_id: client_ids[i % 3], + seq_id, + }, + format!("mephisto-{}", i / 2), + ); + requesters[i % 3].send(req).unwrap(); results.push(rx); seq_id += 1; } diff --git a/src/raftstore/src/node.rs b/src/raftstore/src/node.rs index 1cebfc8..c4a4cf6 100644 --- a/src/raftstore/src/node.rs +++ b/src/raftstore/src/node.rs @@ -236,6 +236,14 @@ impl RaftNode { } } + for reads in self.pending_reads.values_mut() { + for read in reads.reads.iter_mut() { + if read.read_index == u64::MAX { + self.node.read_index(read.request.req_id.serialize()); + } + } + } + for msg in self.rx_message.try_iter() { self.node.step(msg)?; } @@ -326,23 +334,30 @@ impl RaftNode { // process pending reads if applied_index >= read_index fn process_pending_reads(&mut self, req_id: Option) { - fn process_pending_reads_one( - reads: &mut PendingReads, - applied_index: u64, - datatree: &DataTree, - ) { + let reads = match req_id { + None => self.pending_reads.values_mut().collect(), + Some(ReqId { client_id, .. }) => { + if let Some(reads) = self.pending_reads.get_mut(&client_id) { + vec![reads] + } else { + vec![] + } + } + }; + + for reads in reads { let seen_write_seq = reads.seen_write_seq; - let seen_apply_index = applied_index; + let seen_apply_index = self.state.applied_index; while let Some(read) = reads.reads.front() { if read.wait_write_seq <= seen_write_seq && read.read_index <= seen_apply_index { let read = reads.reads.pop_front().unwrap(); match read.request.request { DataTreeRequest::GetData(request) => { - let reply = datatree.get_data(request); + let reply = self.datatree.get_data(request); let reply = FatReply { header: ReplyHeader { req_id: read.request.req_id.seq_id, - txn_id: applied_index, + txn_id: self.state.applied_index, err: 0, }, reply, @@ -356,19 +371,6 @@ impl RaftNode { } } } - - match req_id { - None => { - for reads in self.pending_reads.values_mut() { - process_pending_reads_one(reads, self.state.applied_index, &self.datatree); - } - } - Some(ReqId { client_id, .. }) => { - if let Some(reads) = self.pending_reads.get_mut(&client_id) { - process_pending_reads_one(reads, self.state.applied_index, &self.datatree); - } - } - } } fn process_committed_entry(&mut self, entry: Entry) -> anyhow::Result<()> {