diff --git a/Cargo.lock b/Cargo.lock index b84372f..13a3b64 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -158,15 +158,6 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2da6da31387c7e4ef160ffab6d5e7f00c42626fe39aea70a7b0f1773f7dd6c1b" -[[package]] -name = "concurrent-queue" -version = "2.2.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "62ec6771ecfa0762d24683ee5a32ad78487a3d3afdc0fb8cae19d2c5deb50b7c" -dependencies = [ - "crossbeam-utils", -] - [[package]] name = "console" version = "0.15.7" @@ -215,20 +206,6 @@ dependencies = [ "itertools 0.10.5", ] -[[package]] -name = "crossbeam" -version = "0.8.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2801af0d36612ae591caa9568261fddce32ce6e08a7275ea334a06a4ad021a2c" -dependencies = [ - "cfg-if", - "crossbeam-channel", - "crossbeam-deque", - "crossbeam-epoch", - "crossbeam-queue", - "crossbeam-utils", -] - [[package]] name = "crossbeam-channel" version = "0.5.8" @@ -263,16 +240,6 @@ dependencies = [ "scopeguard", ] -[[package]] -name = "crossbeam-queue" -version = "0.3.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d1cfb3ea8a53f37c40dea2c7bedcbd88bdfae54f5e2175d6ecaff1c988353add" -dependencies = [ - "cfg-if", - "crossbeam-utils", -] - [[package]] name = "crossbeam-utils" version = "0.8.16" @@ -311,7 +278,7 @@ checksum = "eecf8589574ce9b895052fa12d69af7a233f99e6107f5cb8dd1044f2a17bfdcb" dependencies = [ "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.32", ] [[package]] @@ -540,25 +507,6 @@ dependencies = [ "tracing-subscriber", ] -[[package]] -name = "mephisto-raftstore" -version = "0.1.0" -dependencies = [ - "anyhow", - "bytes", - "crossbeam", - "mephisto-raft", - "polling", - "prost", - "prost-build", - "protobuf-src", - "rayon", - "scopeguard", - "tracing", - "tracing-subscriber", - "uuid", -] - [[package]] name = "multimap" version = "0.8.3" @@ -656,22 +604,6 @@ dependencies = [ "plotters-backend", ] -[[package]] -name = "polling" -version = "2.8.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4b2d323e8ca7996b3e23126511a523f7e62924d93ecd5ae73b333815b0eb3dce" -dependencies = [ - "autocfg", - "bitflags 1.3.2", - "cfg-if", - "concurrent-queue", - "libc", - "log", - "pin-project-lite", - "windows-sys 0.48.0", -] - [[package]] name = "ppv-lite86" version = "0.2.17" @@ -680,12 +612,12 @@ checksum = "5b40af805b3121feab8a3c29f04d8ad262fa8e0561883e7653e024ae4479e6de" [[package]] name = "prettyplease" -version = "0.1.25" +version = "0.2.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6c8646e95016a7a6c4adea95bafa8a16baab64b583356217f2c85db4a39d9a86" +checksum = "ae005bd773ab59b4725093fd7df83fd7892f7d8eafb48dbd7de6e024e4215f9d" dependencies = [ "proc-macro2", - "syn 1.0.109", + "syn 2.0.32", ] [[package]] @@ -723,9 +655,9 @@ dependencies = [ [[package]] name = "prost" -version = "0.11.9" +version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "0b82eaa1d779e9a4bc1c3217db8ffbeabaae1dca241bf70183242128d48681cd" +checksum = "f4fdd22f3b9c31b53c060df4a0613a1c7f062d4115a2b984dd15b1858f7e340d" dependencies = [ "bytes", "prost-derive", @@ -733,44 +665,44 @@ dependencies = [ [[package]] name = "prost-build" -version = "0.11.9" +version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "119533552c9a7ffacc21e099c24a0ac8bb19c2a2a3f363de84cd9b844feab270" +checksum = "8bdf592881d821b83d471f8af290226c8d51402259e9bb5be7f9f8bdebbb11ac" dependencies = [ "bytes", "heck", - "itertools 0.10.5", - "lazy_static", + "itertools 0.11.0", "log", "multimap", + "once_cell", "petgraph", "prettyplease", "prost", "prost-types", "regex", - "syn 1.0.109", + "syn 2.0.32", "tempfile", "which", ] [[package]] name = "prost-derive" -version = "0.11.9" +version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e5d2d8d10f3c6ded6da8b05b5fb3b8a5082514344d56c9f871412d29b4e075b4" +checksum = "265baba7fabd416cf5078179f7d2cbeca4ce7a9041111900675ea7c4cb8a4c32" dependencies = [ "anyhow", - "itertools 0.10.5", + "itertools 0.11.0", "proc-macro2", "quote", - "syn 1.0.109", + "syn 2.0.32", ] [[package]] name = "prost-types" -version = "0.11.9" +version = "0.12.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "213622a1460818959ac1181aaeb2dc9c7f63df720db7d788b3e24eacd1983e13" +checksum = "e081b29f63d83a4bc75cfc9f3fe424f9156cf92d8a4f0c9407cce9a1b67327cf" dependencies = [ "prost", ] @@ -940,7 +872,7 @@ checksum = "aafe972d60b0b9bee71a91b92fee2d4fb3c9d7e8f6b179aa99f27203d99a4816" dependencies = [ "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.32", ] [[package]] @@ -1002,9 +934,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.29" +version = "2.0.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c324c494eba9d92503e6f1ef2e6df781e78f6a7705a0202d9801b198807d518a" +checksum = "239814284fd6f1a4ffe4ca893952cdd93c224b6a1571c9a9eadd670295c0c9e2" dependencies = [ "proc-macro2", "quote", @@ -1041,7 +973,7 @@ checksum = "6bb623b56e39ab7dcd4b1b98bb6c8f8d907ed255b18de254088016b27a8ee19b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.32", ] [[package]] @@ -1084,7 +1016,7 @@ checksum = "5f4f31f56159e98206da9efd823404b79b6ef3143b4a7ab76e67b1751b25a4ab" dependencies = [ "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.32", ] [[package]] @@ -1134,15 +1066,6 @@ version = "1.10.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1dd624098567895118886609431a7c3b8f516e41d30e0643f03d94592a147e36" -[[package]] -name = "uuid" -version = "1.4.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "79daa5ed5740825c40b389c5e50312b9c86df53fccd33f281df655642b43869d" -dependencies = [ - "getrandom", -] - [[package]] name = "valuable" version = "0.1.0" @@ -1192,7 +1115,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.32", "wasm-bindgen-shared", ] @@ -1214,7 +1137,7 @@ checksum = "54681b18a46765f095758388f2d0cf16eb8d4169b639ab575a8f5693af210c7b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.29", + "syn 2.0.32", "wasm-bindgen-backend", "wasm-bindgen-shared", ] diff --git a/Cargo.toml b/Cargo.toml index 39abbed..ab5354e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,7 +15,6 @@ [workspace] members = [ 'src/raft', - 'src/raftstore', ] resolver = "2" diff --git a/src/raft/Cargo.toml b/src/raft/Cargo.toml index c87cdd2..55d7d80 100644 --- a/src/raft/Cargo.toml +++ b/src/raft/Cargo.toml @@ -31,7 +31,7 @@ enum-iterator = "1.4.1" fxhash = "0.2.1" fail = "0.5.1" getset = "0.1.1" -prost = "0.11.9" +prost = "0.12.1" thiserror = "1.0" rand = "0.8.5" tracing = "0.1.37" @@ -45,5 +45,5 @@ regex = "1.8.4" tracing-subscriber = "0.3.17" [build-dependencies] -prost-build = "0.11.9" +prost-build = "0.12.1" protobuf-src = "1.1.0" diff --git a/src/raftstore/Cargo.toml b/src/raftstore/Cargo.toml deleted file mode 100644 index 783c0b9..0000000 --- a/src/raftstore/Cargo.toml +++ /dev/null @@ -1,43 +0,0 @@ -# Copyright 2023 tison -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -[package] -name = "mephisto-raftstore" -version.workspace = true -edition.workspace = true -authors.workspace = true -readme.workspace = true -license.workspace = true -keywords.workspace = true -repository.workspace = true - -[dependencies] -anyhow = "1.0.72" -bytes = "1.4.0" -crossbeam = "0.8.2" -mephisto-raft = { path = "../raft" } -polling = "2.8.0" -prost = "0.11.9" -scopeguard = "1.2.0" -tracing = "0.1.37" -uuid = { version = "1.4.1", features = ["v4"] } - -[dev-dependencies] -rayon = "1.7.0" -tracing-subscriber = "0.3.17" - - -[build-dependencies] -prost-build = "0.11.9" -protobuf-src = "1.1.0" diff --git a/src/raftstore/build.rs b/src/raftstore/build.rs deleted file mode 100644 index 419cebc..0000000 --- a/src/raftstore/build.rs +++ /dev/null @@ -1,25 +0,0 @@ -// Copyright 2023 tison -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -fn main() -> std::io::Result<()> { - std::env::set_var("PROTOC", protobuf_src::protoc()); - - let mut config = prost_build::Config::new(); - config.bytes(["data"]); - config.compile_protos(&["datatree.proto"], &["proto"])?; - - println!("cargo:rerun-if-changed=build.rs"); - println!("cargo:rerun-if-changed=datatree.proto"); - Ok(()) -} diff --git a/src/raftstore/datatree.proto b/src/raftstore/datatree.proto deleted file mode 100644 index b790177..0000000 --- a/src/raftstore/datatree.proto +++ /dev/null @@ -1,51 +0,0 @@ -/* - * Copyright 2023 tison - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -syntax = "proto3"; -package datatree; - -enum RequestType { - ReqCreate = 0; - ReqGetData = 1; -} - -message RequestHeader { - uint64 req_id = 1; - RequestType req_type = 2; -} - -message ReplyHeader { - uint64 req_id = 1; - uint64 txn_id = 2; - int32 err = 3; -} - -message CreateRequest { - string path = 1; - bytes data = 2; -} - -message CreateReply { - string path = 1; -} - -message GetDataRequest { - string path = 1; -} - -message GetDataReply { - bytes data = 1; -} diff --git a/src/raftstore/examples/raftstore.rs b/src/raftstore/examples/raftstore.rs deleted file mode 100644 index 42ab906..0000000 --- a/src/raftstore/examples/raftstore.rs +++ /dev/null @@ -1,141 +0,0 @@ -// Copyright 2023 tison -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use crossbeam::{channel::Receiver, sync::WaitGroup}; -use mephisto_raft::Peer; -use mephisto_raftstore::{ - node::RaftNode, - proto::{ - datatree::{CreateRequest, DataTreeRequest, GetDataRequest, RequestHeader, RequestType}, - FatReply, FatRequest, ReqId, - }, -}; -use tracing::{info, Level}; -use tracing_subscriber::{filter::FilterFn, layer::SubscriberExt, util::SubscriberInitExt, Layer}; - -fn make_create_req(req_id: ReqId, path: String, data: String) -> (FatRequest, Receiver) { - let (tx, rx) = crossbeam::channel::bounded(1); - let req = FatRequest { - req_id, - header: RequestHeader { - req_id: req_id.seq_id, - req_type: RequestType::ReqCreate as i32, - }, - request: DataTreeRequest::Create(CreateRequest { - path, - data: data.into(), - }), - reply: tx, - }; - (req, rx) -} - -fn make_get_data_req(req_id: ReqId, path: String) -> (FatRequest, Receiver) { - let (tx, rx) = crossbeam::channel::bounded(1); - let req = FatRequest { - req_id, - header: RequestHeader { - req_id: req_id.seq_id, - req_type: RequestType::ReqGetData as i32, - }, - request: DataTreeRequest::GetData(GetDataRequest { path }), - reply: tx, - }; - (req, rx) -} - -fn main() -> anyhow::Result<()> { - let filter = FilterFn::new(|meta| { - // filter out logs of the "polling" crate - !meta.target().starts_with("polling") - }) - .with_max_level_hint(Level::INFO); - let layer = tracing_subscriber::fmt::layer(); - - tracing_subscriber::registry() - .with(layer.with_filter(filter)) - .init(); - - let peers = vec![ - Peer { - id: 1, - address: "127.0.0.1:9746".to_string(), - }, - Peer { - id: 2, - address: "127.0.0.1:9846".to_string(), - }, - Peer { - id: 3, - address: "127.0.0.1:9946".to_string(), - }, - ]; - - let mut shutdown_nodes = vec![]; - let shutdown_waiters = WaitGroup::new(); - - let mut requesters = vec![]; - - for peer in peers.iter() { - let node = RaftNode::new(peer.clone(), peers.clone()); - requesters.push(node.tx_request()); - shutdown_nodes.push(node.shutdown()); - let wg = shutdown_waiters.clone(); - let peer_id = peer.id; - node.do_main(peer_id, wg); - } - - let client_ids = [ - uuid::Uuid::new_v4(), - uuid::Uuid::new_v4(), - uuid::Uuid::new_v4(), - ]; - let mut results = vec![]; - let mut seq_id = 1; - for i in 0..20 { - let (req, rx) = make_create_req( - ReqId { - client_id: client_ids[i % 3], - seq_id, - }, - format!("mephisto-{}", i / 2), - format!("value-{}", i), - ); - requesters[i % 3].send(req).unwrap(); - results.push(rx); - seq_id += 1; - - let (req, rx) = make_get_data_req( - ReqId { - client_id: client_ids[i % 3], - seq_id, - }, - format!("mephisto-{}", i / 2), - ); - requesters[i % 3].send(req).unwrap(); - results.push(rx); - seq_id += 1; - } - - for (i, result) in results.iter().enumerate() { - info!("rx.recv()[{i}]={:?}", result.recv()); - } - - for shutdown in shutdown_nodes { - shutdown.shutdown(); - } - shutdown_waiters.wait(); - - Ok(()) -} diff --git a/src/raftstore/src/datatree.rs b/src/raftstore/src/datatree.rs deleted file mode 100644 index 2e466be..0000000 --- a/src/raftstore/src/datatree.rs +++ /dev/null @@ -1,42 +0,0 @@ -// Copyright 2023 tison -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::collections::HashMap; - -use bytes::Bytes; - -use crate::proto::datatree::{ - CreateReply, CreateRequest, DataTreeReply, GetDataReply, GetDataRequest, -}; - -#[derive(Debug, Default)] -pub struct DataTree { - nodes: HashMap, -} - -impl DataTree { - pub fn create(&mut self, req: CreateRequest) -> DataTreeReply { - self.nodes.insert(req.path.clone(), req.data); - DataTreeReply::Create(CreateReply { path: req.path }) - } - - pub fn get_data(&self, req: GetDataRequest) -> DataTreeReply { - let data = self - .nodes - .get(req.path.as_str()) - .cloned() - .unwrap_or_default(); - DataTreeReply::GetData(GetDataReply { data }) - } -} diff --git a/src/raftstore/src/lib.rs b/src/raftstore/src/lib.rs deleted file mode 100644 index ae3ddbe..0000000 --- a/src/raftstore/src/lib.rs +++ /dev/null @@ -1,26 +0,0 @@ -// Copyright 2023 tison -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#![feature(io_error_other)] -#![feature(assert_matches)] - -pub mod datatree; -pub mod node; -pub mod proto; -pub mod router; -pub mod service; -pub mod shutdown; -pub mod transport; - -pub type RaftMessage = mephisto_raft::eraftpb::Message; diff --git a/src/raftstore/src/node.rs b/src/raftstore/src/node.rs deleted file mode 100644 index c4a4cf6..0000000 --- a/src/raftstore/src/node.rs +++ /dev/null @@ -1,438 +0,0 @@ -// Copyright 2023 tison -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::{ - collections::{BTreeMap, VecDeque}, - time::{Duration, Instant}, -}; - -use bytes::Bytes; -use crossbeam::{ - channel::{Receiver, Select, Sender, TryRecvError}, - sync::WaitGroup, -}; -use mephisto_raft::{ - eraftpb, - eraftpb::{Entry, EntryType}, - storage::MemStorage, - Config, Peer, RawNode, SoftState, INVALID_ID, -}; -use prost::Message; -use tracing::{error, error_span, info, trace}; - -use crate::{ - datatree::DataTree, - proto::{ - datatree::{CreateRequest, DataTreeRequest, ReplyHeader, RequestHeader, RequestType}, - FatReply, FatRequest, ReqId, - }, - router::RaftRouter, - service::RaftService, - RaftMessage, -}; - -#[derive(Default)] -struct RaftState { - state: SoftState, - applied_index: u64, -} - -pub struct ShutdownNode { - tx_shutdown: Sender<()>, -} - -impl ShutdownNode { - pub fn shutdown(self) { - if let Err(err) = self.tx_shutdown.send(()) { - // receiver closed - already shutdown - trace!(?err, "shutdown node cannot send signal"); - } - } -} - -#[derive(Debug, Default)] -struct PendingReads { - head_write_seq: u64, - seen_write_seq: u64, - reads: VecDeque, -} - -#[derive(Debug)] -struct PendingRead { - read_index: u64, - wait_write_seq: u64, - request: FatRequest, -} - -pub struct RaftNode { - this: Peer, - peers: Vec, - node: RawNode, - state: RaftState, - - // mock datatree - datatree: DataTree, - - sending_requests: VecDeque, - pending_requests: BTreeMap, - pending_reads: BTreeMap, - - tx_shutdown: Sender<()>, - rx_shutdown: Receiver<()>, - tx_request: Sender, - rx_request: Receiver, - tx_message: Sender, - rx_message: Receiver, - tick: Receiver, -} - -impl RaftNode { - pub fn new(this: Peer, peers: Vec) -> RaftNode { - let id = this.id; - - let config = { - let mut config = Config::new(id); - config.pre_vote = true; - config.priority = (1 << id) as i64; - config.election_tick = 10; - config.heartbeat_tick = 1; - config.max_size_per_msg = 1024 * 1024 * 1024; - config.validate().unwrap(); - config - }; - - let voters = peers.iter().map(|p| p.id).collect::>(); - let storage = MemStorage::new_with_conf_state((voters, vec![])); - storage.wl().mut_hard_state().term = 1; - let node = RawNode::new(&config, storage).unwrap(); - - let (tx_message, rx_message) = crossbeam::channel::unbounded(); - let (tx_request, rx_request) = crossbeam::channel::unbounded(); - let (tx_shutdown, rx_shutdown) = crossbeam::channel::bounded(1); - let tick = crossbeam::channel::tick(Duration::from_millis(100)); - - RaftNode { - this, - peers, - node, - state: RaftState::default(), - datatree: Default::default(), - sending_requests: VecDeque::new(), - pending_requests: BTreeMap::new(), - pending_reads: BTreeMap::new(), - tx_shutdown, - rx_shutdown, - tx_request, - rx_request, - tx_message, - rx_message, - tick, - } - } - - pub fn tx_request(&self) -> Sender { - self.tx_request.clone() - } - - pub fn shutdown(&self) -> ShutdownNode { - ShutdownNode { - tx_shutdown: self.tx_shutdown.clone(), - } - } - - pub fn do_main(self, id: u64, wg: WaitGroup) { - std::thread::spawn(move || { - error_span!("node", id).in_scope(|| match self.internal_do_main() { - Ok(()) => info!("node shutdown"), - Err(err) => error!(?err, "node crashed"), - }); - drop(wg); - }); - } - - pub fn internal_do_main(mut self) -> anyhow::Result<()> { - let service = RaftService::new(self.this.clone(), self.tx_message.clone())?; - let shutdown_service = service.shutdown(); - let shutdown_waiters = WaitGroup::new(); - service.do_main(self.this.id, shutdown_waiters.clone()); - - let (router, shutdown_router) = RaftRouter::new(self.this.clone(), self.peers.clone()); - let _guard = scopeguard::guard((), move |_| { - shutdown_router.shutdown(); - shutdown_service.shutdown(); - shutdown_waiters.wait(); - }); - - loop { - if self.process_inbound()? { - return Ok(()); - } - - self.process_ready(&router)?; - } - } - - // return true if shutdown - fn process_inbound(&mut self) -> anyhow::Result { - let mut select = Select::new(); - select.recv(&self.rx_shutdown); - select.recv(&self.rx_message); - select.recv(&self.rx_request); - select.recv(&self.tick); - select.ready(); - - if !matches!(self.rx_shutdown.try_recv(), Err(TryRecvError::Empty)) { - return Ok(true); - } - - for _ in self.tick.try_iter() { - if self.node.tick() { - return Ok(false); - } - } - - for req in self.rx_request.try_iter() { - self.sending_requests.push_back(req); - } - - if self.state.state.leader_id != INVALID_ID { - for request in self.sending_requests.drain(..) { - match request.request { - DataTreeRequest::Create(ref req) => { - let mut data = request.header.encode_length_delimited_to_vec(); - data.append(&mut req.encode_length_delimited_to_vec()); - self.node.propose(request.req_id.serialize(), data)?; - let entry = self - .pending_reads - .entry(request.req_id.client_id) - .or_default(); - entry.head_write_seq = request.req_id.seq_id; - self.pending_requests.insert(request.req_id, request); - } - DataTreeRequest::GetData(_) => { - self.node.read_index(request.req_id.serialize()); - let entry = self - .pending_reads - .entry(request.req_id.client_id) - .or_default(); - entry.reads.push_back(PendingRead { - read_index: u64::MAX, - wait_write_seq: entry.head_write_seq, - request, - }); - } - } - } - } - - for reads in self.pending_reads.values_mut() { - for read in reads.reads.iter_mut() { - if read.read_index == u64::MAX { - self.node.read_index(read.request.req_id.serialize()); - } - } - } - - for msg in self.rx_message.try_iter() { - self.node.step(msg)?; - } - - Ok(false) - } - - fn process_ready(&mut self, router: &RaftRouter) -> anyhow::Result<()> { - if !self.node.has_ready() { - return Ok(()); - } - - let mut ready = self.node.ready(); - - if let Some(ss) = ready.ss() { - if ss.raft_state != self.state.state.raft_state { - info!( - "changing raft node role from {:?} to {:?}", - self.state.state.raft_state, ss.raft_state - ); - } - self.state.state = *ss; - } - - for msg in ready.take_messages() { - self.process_message(msg, router); - } - - if let Some(hs) = ready.hs() { - self.node.store().wl().set_hard_state(hs.clone()); - } - - if !ready.entries().is_empty() { - self.node.store().wl().append(ready.entries())?; - } - - for msg in ready.take_persisted_messages() { - self.process_message(msg, router); - } - - for read_state in ready.take_read_states() { - let req_id = ReqId::deserialize(read_state.request_ctx.as_slice()); - if let Some(req) = self.pending_reads.get_mut(&req_id.client_id) { - for read in req.reads.iter_mut() { - let ReqId { seq_id, client_id } = read.request.req_id; - debug_assert_eq!(req_id.client_id, client_id); - if seq_id > req_id.seq_id { - break; - } - read.read_index = read.read_index.min(read_state.index); - } - } - } - - self.process_pending_reads(None); - - for entry in ready.take_committed_entries() { - self.process_committed_entry(entry)?; - self.process_pending_reads(None); - } - - self.node.advance(ready); - Ok(()) - } - - fn process_message(&self, msg: RaftMessage, router: &RaftRouter) { - match msg.msg_type() { - eraftpb::MessageType::MsgAppend - | eraftpb::MessageType::MsgRequestPreVote - | eraftpb::MessageType::MsgAppendResponse - | eraftpb::MessageType::MsgRequestPreVoteResponse - | eraftpb::MessageType::MsgRequestVote - | eraftpb::MessageType::MsgRequestVoteResponse - | eraftpb::MessageType::MsgHeartbeat - | eraftpb::MessageType::MsgHeartbeatResponse - | eraftpb::MessageType::MsgPropose - | eraftpb::MessageType::MsgReadIndex - | eraftpb::MessageType::MsgReadIndexResp => { - if msg.to != self.this.id { - router.send(msg.to, msg); - } else { - unreachable!("outbound message send to self {msg:?}") - } - } - _ => unimplemented!("unimplemented {msg:?}"), - } - } - - // process pending reads if applied_index >= read_index - fn process_pending_reads(&mut self, req_id: Option) { - let reads = match req_id { - None => self.pending_reads.values_mut().collect(), - Some(ReqId { client_id, .. }) => { - if let Some(reads) = self.pending_reads.get_mut(&client_id) { - vec![reads] - } else { - vec![] - } - } - }; - - for reads in reads { - let seen_write_seq = reads.seen_write_seq; - let seen_apply_index = self.state.applied_index; - while let Some(read) = reads.reads.front() { - if read.wait_write_seq <= seen_write_seq && read.read_index <= seen_apply_index { - let read = reads.reads.pop_front().unwrap(); - match read.request.request { - DataTreeRequest::GetData(request) => { - let reply = self.datatree.get_data(request); - let reply = FatReply { - header: ReplyHeader { - req_id: read.request.req_id.seq_id, - txn_id: self.state.applied_index, - err: 0, - }, - reply, - }; - read.request.reply.send(reply).unwrap(); - } - r => unreachable!("illegal request {:?}", r), - } - } else { - break; - } - } - } - } - - fn process_committed_entry(&mut self, entry: Entry) -> anyhow::Result<()> { - let applied_index = self.state.applied_index.max(entry.index); - match entry.entry_type() { - EntryType::EntryNormal => { - if entry.data.is_empty() { - // empty entry indicate a becoming leader event - self.state.applied_index = applied_index; - return Ok(()); - } - - let req_id = ReqId::deserialize(entry.context.as_slice()); - let (hdr, request) = { - let mut payload = Bytes::from(entry.data); - let hdr = RequestHeader::decode_length_delimited(&mut payload)?; - let request = match hdr.req_type() { - RequestType::ReqCreate => DataTreeRequest::Create( - CreateRequest::decode_length_delimited(&mut payload)?, - ), - r => unreachable!("illegal request type {:?}", r), - }; - (hdr, request) - }; - - match request { - DataTreeRequest::Create(request) => { - let reply = self.datatree.create(request); - if let Some(reads) = self.pending_reads.get_mut(&req_id.client_id) { - for read in reads.reads.iter_mut() { - if read.wait_write_seq < req_id.seq_id { - // seen latter write, must flush - read.read_index = 0; - } else { - break; - } - } - self.process_pending_reads(Some(req_id)); - } - if let Some(sending_reply) = self.pending_requests.remove(&req_id) { - let reply = FatReply { - header: ReplyHeader { - req_id: req_id.seq_id, - txn_id: applied_index, - err: 0, - }, - reply, - }; - sending_reply.reply.send(reply).unwrap(); - } - } - _ => unreachable!("illegal request type {:?}", hdr.req_type()), - } - - if let Some(reads) = self.pending_reads.get_mut(&req_id.client_id) { - reads.seen_write_seq = req_id.seq_id; - } - } - EntryType::EntryConfChange => unimplemented!("EntryConfChange"), - } - - self.state.applied_index = applied_index; - Ok(()) - } -} diff --git a/src/raftstore/src/proto.rs b/src/raftstore/src/proto.rs deleted file mode 100644 index 6426560..0000000 --- a/src/raftstore/src/proto.rs +++ /dev/null @@ -1,76 +0,0 @@ -// Copyright 2023 tison -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::mem::size_of; - -use bytes::{Buf, BufMut, BytesMut}; -use crossbeam::channel::Sender; - -use crate::proto::datatree::{DataTreeReply, DataTreeRequest, ReplyHeader, RequestHeader}; - -pub mod datatree { - include!(concat!(env!("OUT_DIR"), "/datatree.rs")); - - #[derive(Debug, Clone)] - pub enum DataTreeRequest { - Create(CreateRequest), - GetData(GetDataRequest), - } - - #[derive(Debug, Clone)] - pub enum DataTreeReply { - Error(i32), - Create(CreateReply), - GetData(GetDataReply), - } -} - -#[derive(Debug, Copy, Clone, Hash, Ord, PartialOrd, Eq, PartialEq)] -pub struct ReqId { - pub client_id: uuid::Uuid, - pub seq_id: u64, -} - -impl ReqId { - pub fn serialize(&self) -> Vec { - let mut data = BytesMut::with_capacity(3 * size_of::()); - let (msb, lsb) = self.client_id.as_u64_pair(); - data.put_u64(msb); - data.put_u64(lsb); - data.put_u64(self.seq_id); - data.to_vec() - } - - pub fn deserialize(mut data: B) -> ReqId { - let msb = data.get_u64(); - let lsb = data.get_u64(); - let client_id = uuid::Uuid::from_u64_pair(msb, lsb); - let seq_id = data.get_u64(); - ReqId { client_id, seq_id } - } -} - -#[derive(Debug)] -pub struct FatRequest { - pub req_id: ReqId, - pub header: RequestHeader, - pub request: DataTreeRequest, - pub reply: Sender, -} - -#[derive(Debug)] -pub struct FatReply { - pub header: ReplyHeader, - pub reply: DataTreeReply, -} diff --git a/src/raftstore/src/router.rs b/src/raftstore/src/router.rs deleted file mode 100644 index 6700d36..0000000 --- a/src/raftstore/src/router.rs +++ /dev/null @@ -1,170 +0,0 @@ -// Copyright 2023 tison -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::{collections::HashMap, io, io::Write, net::TcpStream}; - -use bytes::BufMut; -use crossbeam::{ - channel::{Receiver, Select, Sender, TryRecvError}, - sync::WaitGroup, -}; -use mephisto_raft::Peer; -use prost::Message; -use tracing::{error, error_span, info, trace}; - -use crate::{transport::Transport, RaftMessage}; - -pub struct ShutdownRouter { - shutdown_signals: Vec>, - shutdown_waiters: WaitGroup, -} - -impl ShutdownRouter { - pub fn shutdown(self) { - for shutdown in self.shutdown_signals { - if let Err(err) = shutdown.send(()) { - // receiver closed - already shutdown - trace!(?err, "shutdown router connection cannot send signal"); - } - } - self.shutdown_waiters.wait(); - } -} - -pub struct RaftRouter { - this: Peer, - peers: HashMap>, -} - -impl RaftRouter { - pub fn new(this: Peer, peer_list: Vec) -> (RaftRouter, ShutdownRouter) { - let mut shutdown_signals = vec![]; - let shutdown_waiters = WaitGroup::new(); - - let mut peers = HashMap::new(); - for peer in peer_list { - let peer_id = peer.id; - if this.id == peer_id { - continue; - } - - let (tx_message, rx_message) = crossbeam::channel::unbounded(); - let (tx_shutdown, rx_shutdown) = crossbeam::channel::bounded(1); - peers.insert(peer.id, tx_message); - shutdown_signals.push(tx_shutdown); - - let c = Connection { - peer, - rx_message, - rx_shutdown, - }; - c.do_main(this.id, peer_id, shutdown_waiters.clone()); - } - - let router = RaftRouter { this, peers }; - let shutdown = ShutdownRouter { - shutdown_signals, - shutdown_waiters, - }; - (router, shutdown) - } - - pub fn send(&self, to: u64, msg: RaftMessage) { - debug_assert_ne!(to, self.this.id); - let peer = self.peers.get(&to).unwrap(); - peer.send(msg).unwrap(); - } -} - -struct Connection { - peer: Peer, - rx_message: Receiver, - rx_shutdown: Receiver<()>, -} - -// tolerate maybe server not yet started -#[derive(Debug, Default)] -struct RetryRefused { - times: i32, -} - -impl RetryRefused { - fn retry(&mut self, err: &io::Error) -> bool { - self.times += 1; - matches!(err.kind(), io::ErrorKind::ConnectionRefused) && self.times <= 10 - } - - fn reset(&mut self) { - self.times = 0; - } -} - -impl Connection { - fn do_main(self, this: u64, peer: u64, wg: WaitGroup) { - std::thread::spawn(move || { - error_span!("router", this, peer).in_scope(|| match self.internal_do_main() { - Ok(()) => info!("router closed"), - Err(err) => error!(?err, "router failed"), - }); - drop(wg); - }); - } - - fn internal_do_main(self) -> io::Result<()> { - let mut retry_refused = RetryRefused::default(); - let mut transport = Transport::default(); - let mut socket = None; - - loop { - if transport.write_context().buffer().is_empty() { - let mut select = Select::new(); - select.recv(&self.rx_shutdown); - select.recv(&self.rx_message); - select.ready(); - } - - if !matches!(self.rx_shutdown.try_recv(), Err(TryRecvError::Empty)) { - return Ok(()); - } - - for msg in self.rx_message.try_iter() { - let buf = msg.encode_to_vec(); - { - let n = buf.len(); - let buf = u32::to_be_bytes(n as u32); - transport.write_context().buffer().put_slice(buf.as_slice()); - } - transport.write_context().buffer().put_slice(buf.as_slice()); - } - - let stream = match socket { - None => match TcpStream::connect(self.peer.address.clone()) { - Ok(stream) => { - retry_refused.reset(); - stream.set_nonblocking(true)?; - stream.set_nodelay(true)?; - socket = Some(stream); - socket.as_mut().unwrap() - } - Err(ref err) if retry_refused.retry(err) => continue, - Err(err) => return Err(err), - }, - Some(ref mut stream) => stream, - }; - - transport.write_bytes(stream)?; - stream.flush()?; - } - } -} diff --git a/src/raftstore/src/service.rs b/src/raftstore/src/service.rs deleted file mode 100644 index 232d578..0000000 --- a/src/raftstore/src/service.rs +++ /dev/null @@ -1,209 +0,0 @@ -// Copyright 2023 tison -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::{ - io, - mem::size_of, - net::{TcpListener, TcpStream}, - ops::DerefMut, - sync::{ - atomic::{AtomicBool, Ordering}, - Arc, - }, -}; - -use bytes::Buf; -use crossbeam::{channel::Sender, sync::WaitGroup}; -use mephisto_raft::Peer; -use polling::{Event, Poller}; -use tracing::{debug, error, error_span, info, trace}; - -use crate::{ - shutdown::ShutdownIO, - transport::{ReadContext, Transport}, - RaftMessage, -}; - -fn interrupted(err: &io::Error) -> bool { - matches!(err.kind(), io::ErrorKind::Interrupted) -} - -fn would_block(err: &io::Error) -> bool { - matches!(err.kind(), io::ErrorKind::WouldBlock) -} - -pub struct RaftService { - this: Peer, - tx_message: Sender, - poller: Arc, - is_shutdown: Arc, -} - -impl RaftService { - pub fn shutdown(&self) -> ShutdownIO { - ShutdownIO::new(self.poller.clone(), self.is_shutdown.clone()) - } - - pub fn new(this: Peer, tx_message: Sender) -> io::Result { - Ok(RaftService { - this, - tx_message, - poller: Arc::new(Poller::new()?), - is_shutdown: Arc::new(AtomicBool::new(false)), - }) - } - - pub fn do_main(self, id: u64, wg: WaitGroup) { - std::thread::spawn(move || { - error_span!("service", id).in_scope(|| match self.internal_do_main() { - Ok(()) => info!("raft service closed"), - Err(err) => error!(?err, "raft service failed"), - }); - drop(wg); - }); - } - - fn internal_do_main(self) -> io::Result<()> { - info!( - address = self.this.address, - "raft service is serving requests" - ); - - let listener = TcpListener::bind(self.this.address)?; - listener.set_nonblocking(true)?; - self.poller.add(&listener, Event::readable(1))?; - - let mut guard = scopeguard::guard( - (vec![], WaitGroup::new()), - |(signals, waiters): (Vec, WaitGroup)| { - for shutdown in signals { - shutdown.shutdown(); - } - waiters.wait(); - }, - ); - - let (shutdown_signals, shutdown_waiters) = guard.deref_mut(); - - loop { - let mut events = vec![]; - self.poller.wait(&mut events, None)?; - - if self.is_shutdown.load(Ordering::SeqCst) { - return Ok(()); - } - - while let Some(socket) = match listener.incoming().next() { - None => None, - Some(Err(ref err)) if would_block(err) => None, - Some(Err(ref err)) if interrupted(err) => None, - Some(Err(err)) => return Err(err), - Some(Ok(socket)) => Some(socket), - } { - let address = socket.peer_addr()?; - trace!("accepted connection to {}", address); - let c = Connection::new(socket, self.tx_message.clone())?; - shutdown_signals.push(c.shutdown()); - c.do_main(self.this.id, format!("{address}"), shutdown_waiters.clone()); - } - - self.poller.modify(&listener, Event::readable(1))?; - } - } -} - -struct Connection { - socket: TcpStream, - transport: Transport, - tx_message: Sender, - poller: Arc, - is_shutdown: Arc, -} - -impl Connection { - fn new(socket: TcpStream, tx_message: Sender) -> io::Result { - Ok(Connection { - socket, - tx_message, - transport: Transport::default(), - poller: Arc::new(Poller::new()?), - is_shutdown: Arc::new(AtomicBool::new(false)), - }) - } - - fn shutdown(&self) -> ShutdownIO { - ShutdownIO::new(self.poller.clone(), self.is_shutdown.clone()) - } - - fn do_main(self, id: u64, address: String, wg: WaitGroup) { - std::thread::spawn(move || { - error_span!("srvconn", id, address).in_scope(|| match self.internal_do_main() { - Ok(()) => info!("srvconn closed"), - Err(err) => error!(?err, "srvconn failed"), - }); - drop(wg); - }); - } - - fn internal_do_main(mut self) -> io::Result<()> { - self.socket.set_nodelay(true)?; - self.socket.set_nonblocking(true)?; - self.poller.add(&self.socket, Event::readable(1))?; - - loop { - let mut events = vec![]; - self.poller.wait(&mut events, None)?; - if self.is_shutdown.load(Ordering::SeqCst) { - return Ok(()); - } - - let mut read_ctx = self.transport.read_bytes(&mut self.socket)?; - - while let Some(req) = try_read_one_message(&mut read_ctx)? { - debug!(?req, "receive service message"); - self.tx_message.send(req).unwrap(); - } - - if read_ctx.is_closed() { - return Ok(()); - } - - self.poller.modify(&self.socket, Event::readable(1))?; - } - } -} - -fn try_read_one_message(read_ctx: &mut ReadContext) -> io::Result> { - use prost::Message; - - let read_buf = read_ctx.buffer(); - - let mut peek_buf = read_buf.chunk(); - if peek_buf.remaining() < size_of::() { - return Ok(None); - } - let n = peek_buf.get_u32() as usize; - if read_buf.remaining() < size_of::() + n { - return Ok(None); - } - - read_buf.advance(size_of::()); - match RaftMessage::decode(&read_buf.chunk()[..n]) { - Ok(msg) => { - read_buf.advance(n); - Ok(Some(msg)) - } - Err(err) => Err(io::Error::other(err)), - } -} diff --git a/src/raftstore/src/shutdown.rs b/src/raftstore/src/shutdown.rs deleted file mode 100644 index 89b525e..0000000 --- a/src/raftstore/src/shutdown.rs +++ /dev/null @@ -1,39 +0,0 @@ -// Copyright 2023 tison -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::sync::{ - atomic::{AtomicBool, Ordering}, - Arc, -}; - -use polling::Poller; -use tracing::trace; - -pub struct ShutdownIO { - poller: Arc, - shutdown: Arc, -} - -impl ShutdownIO { - pub fn new(poller: Arc, shutdown: Arc) -> ShutdownIO { - ShutdownIO { poller, shutdown } - } - - pub fn shutdown(&self) { - self.shutdown.store(true, Ordering::SeqCst); - if let Err(err) = self.poller.notify() { - trace!(?err, "shutdown IO cannot notify the poller"); - } - } -} diff --git a/src/raftstore/src/transport.rs b/src/raftstore/src/transport.rs deleted file mode 100644 index 6c82a6f..0000000 --- a/src/raftstore/src/transport.rs +++ /dev/null @@ -1,128 +0,0 @@ -// Copyright 2023 tison -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::{ - io, - io::{Read, Write}, - slice, -}; - -use bytes::{Buf, BufMut, BytesMut}; - -fn interrupted(err: &io::Error) -> bool { - matches!(err.kind(), io::ErrorKind::Interrupted) -} - -fn would_block(err: &io::Error) -> bool { - matches!(err.kind(), io::ErrorKind::WouldBlock) -} - -#[derive(Debug)] -pub struct ReadContext<'a> { - buf: &'a mut BytesMut, - closed: bool, -} - -impl<'a> ReadContext<'a> { - pub fn buffer(&mut self) -> &mut impl Buf { - self.buf - } - - pub fn is_closed(&self) -> bool { - self.closed - } -} - -#[derive(Debug)] -pub struct WriteContext<'a> { - buf: &'a mut BytesMut, -} - -impl<'a> WriteContext<'a> { - pub fn buffer(&mut self) -> &mut BytesMut { - self.buf - } -} - -#[derive(Debug, Default)] -pub struct Transport { - read_buf: BytesMut, - write_buf: BytesMut, - written: usize, -} - -impl Transport { - pub fn write_context(&mut self) -> WriteContext { - WriteContext { - buf: &mut self.write_buf, - } - } - - pub fn read_bytes(&mut self, socket: &mut R) -> io::Result { - const READ_BYTES_LEN: usize = 4096; - - let mut connection_closed = false; - loop { - self.read_buf.reserve(READ_BYTES_LEN); - let buf = unsafe { - let chunk_mut_ptr = self.read_buf.chunk_mut().as_mut_ptr(); - slice::from_raw_parts_mut(chunk_mut_ptr, READ_BYTES_LEN) - }; - match socket.read(buf) { - Ok(0) => { - // Reading 0 bytes means the other side has closed the - // connection or is done writing, then so are we. - connection_closed = true; - break; - } - Ok(n) => unsafe { - self.read_buf.advance_mut(n); - }, - // Would block "errors" are the OS's way of saying that the - // connection is not actually ready to perform this I/O operation. - Err(ref err) if would_block(err) => break, - Err(ref err) if interrupted(err) => continue, - // Other errors we'll consider fatal. - Err(err) => return Err(err), - } - } - - Ok(ReadContext { - buf: &mut self.read_buf, - closed: connection_closed, - }) - } - - pub fn write_bytes(&mut self, socket: &mut W) -> io::Result<()> { - loop { - if self.write_buf.is_empty() { - return Ok(()); - } - - match socket.write(&self.write_buf[self.written..]) { - Ok(n) => { - // consume - self.written += n; - if self.written >= self.write_buf.len() { - self.write_buf.clear(); - self.written = 0; - } - } - Err(ref err) if would_block(err) => return Ok(()), - Err(ref err) if interrupted(err) => continue, - Err(err) => return Err(err), - } - } - } -}