From 7db9de709e049adce06634b5e8a4ae28232a5300 Mon Sep 17 00:00:00 2001 From: Gyubong Date: Wed, 23 Aug 2023 18:38:54 +0900 Subject: [PATCH 1/5] Attempt to fix logging race --- Cargo.lock | 233 ++++++++++++++++++++++++++++++++- Cargo.toml | 5 +- requirements.txt | 2 +- rraft.pyi | 18 ++- src/bindings/global.rs | 2 + src/external_bindings/mod.rs | 1 + src/external_bindings/mutex.rs | 77 +++++++++++ src/external_bindings/slog.rs | 92 +++++++++++-- src/lib.rs | 1 + 9 files changed, 414 insertions(+), 17 deletions(-) create mode 100644 src/external_bindings/mutex.rs diff --git a/Cargo.lock b/Cargo.lock index 98ded35..6655234 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,6 +2,21 @@ # It is not intended for manual editing. version = 3 +[[package]] +name = "addr2line" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a76fd60b23679b7d19bd066031410fb7e458ccc5e958eb5c325888ce4baedc97" +dependencies = [ + "gimli", +] + +[[package]] +name = "adler" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" + [[package]] name = "aho-corasick" version = "0.7.20" @@ -49,6 +64,21 @@ dependencies = [ "cc", ] +[[package]] +name = "backtrace" +version = "0.3.67" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "233d376d6d185f2a3093e58f283f60f880315b6c60075b01f36b3b85154564ca" +dependencies = [ + "addr2line", + "cc", + "cfg-if", + "libc", + "miniz_oxide", + "object", + "rustc-demangle", +] + [[package]] name = "bincode" version = "1.3.3" @@ -180,6 +210,95 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" +[[package]] +name = "futures" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "23342abe12aba583913b2e62f22225ff9c950774065e4bfb61a19cd9770fec40" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "955518d47e09b25bbebc7a18df10b81f0c766eaf4c4f1cccef2fca5f2a4fb5f2" +dependencies = [ + "futures-core", + "futures-sink", +] + +[[package]] +name = "futures-core" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4bca583b7e26f571124fe5b7561d49cb2868d79116cfa0eefce955557c6fee8c" + +[[package]] +name = "futures-executor" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ccecee823288125bd88b4d7f565c9e58e41858e47ab72e8ea2d64e93624386e0" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964" + +[[package]] +name = "futures-macro" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.11", +] + +[[package]] +name = "futures-sink" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f43be4fe21a13b9781a69afa4985b0f6ee0e1afab2c6f454a8cf30e2b2237b6e" + +[[package]] +name = "futures-task" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "76d3d132be6c0e6aa1534069c705a74a5997a356c0dc2f86a47765e5617c5b65" + +[[package]] +name = "futures-util" +version = "0.3.28" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26b01e40b772d54cf6c6d721c1d1abd0647a0106a12ecaa1c186273392a69533" +dependencies = [ + "futures-channel", + "futures-core", + "futures-io", + "futures-macro", + "futures-sink", + "futures-task", + "memchr", + "pin-project-lite", + "pin-utils", + "slab", +] + [[package]] name = "fxhash" version = "0.2.1" @@ -223,6 +342,12 @@ dependencies = [ "syn 2.0.11", ] +[[package]] +name = "gimli" +version = "0.27.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b6c80984affa11d98d1b88b66ac8853f143217b399d3c74116778ff8fdb4ed2e" + [[package]] name = "hashbrown" version = "0.12.3" @@ -363,12 +488,31 @@ dependencies = [ "autocfg", ] +[[package]] +name = "miniz_oxide" +version = "0.6.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b275950c28b37e794e8c55d88aeb5e139d0ce23fdbbeda68f8d7174abdf9e8fa" +dependencies = [ + "adler", +] + [[package]] name = "multimap" version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" +[[package]] +name = "num_cpus" +version = "1.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" +dependencies = [ + "hermit-abi 0.3.1", + "libc", +] + [[package]] name = "num_threads" version = "0.1.6" @@ -378,6 +522,15 @@ dependencies = [ "libc", ] +[[package]] +name = "object" +version = "0.30.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "03b4680b86d9cfafba8fc491dc9b6df26b68cf40e9e6cd73909194759a63c385" +dependencies = [ + "memchr", +] + [[package]] name = "once_cell" version = "1.17.1" @@ -417,6 +570,18 @@ dependencies = [ "indexmap", ] +[[package]] +name = "pin-project-lite" +version = "0.2.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "12cc1b0bf1727a77a54b6654e7b5f1af8604923edc8b81885f8ec92f9e3f0a05" + +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + [[package]] name = "ppv-lite86" version = "0.2.17" @@ -568,6 +733,31 @@ dependencies = [ "unindent", ] +[[package]] +name = "pyo3-asyncio" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a2cc34c1f907ca090d7add03dc523acdd91f3a4dab12286604951e2f5152edad" +dependencies = [ + "futures", + "once_cell", + "pin-project-lite", + "pyo3", + "pyo3-asyncio-macros", + "tokio", +] + +[[package]] +name = "pyo3-asyncio-macros" +version = "0.19.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e4045f06429547179e4596f5c0b13c82efc8b04296016780133653ed69ce26b3" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "pyo3-build-config" version = "0.19.2" @@ -728,22 +918,31 @@ checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" [[package]] name = "rraft-py" -version = "0.2.0" +version = "0.2.9" dependencies = [ "bincode", "bytes", "fxhash", + "once_cell", "prost", "protobuf", "pyo3", + "pyo3-asyncio", "raft", "slog", "slog-async", "slog-envlogger", "slog-stdlog", "slog-term", + "tokio", ] +[[package]] +name = "rustc-demangle" +version = "0.1.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" + [[package]] name = "rustix" version = "0.37.5" @@ -776,6 +975,15 @@ version = "1.0.159" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3c04e8343c3daeec41f58990b9d77068df31209f2af111e059e9fe9646693065" +[[package]] +name = "slab" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f92a496fb766b417c996b9c5e57daf2f7ad3b0bebe1ccfca4856390e3d3bb67" +dependencies = [ + "autocfg", +] + [[package]] name = "slog" version = "2.7.0" @@ -967,6 +1175,29 @@ dependencies = [ "time-core", ] +[[package]] +name = "tokio" +version = "1.32.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "17ed6077ed6cd6c74735e21f37eb16dc3935f96878b1fe961074089cc80893f9" +dependencies = [ + "backtrace", + "num_cpus", + "pin-project-lite", + "tokio-macros", +] + +[[package]] +name = "tokio-macros" +version = "2.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.11", +] + [[package]] name = "unicode-ident" version = "1.0.8" diff --git a/Cargo.toml b/Cargo.toml index e29ce0d..91d3482 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "rraft-py" -version = "0.2.0" +version = "0.2.9" authors = ["Lablup Inc."] license = "Apache-2.0" repository = "https://github.com/lablup/rraft-py" @@ -17,6 +17,7 @@ edition = "2021" prost = "0.11" protobuf = "2" pyo3 = { version = "0.19.2", features = ["extension-module", "multiple-pymethods"] } +pyo3-asyncio = { version = "0.19", features = ["attributes", "tokio-runtime"] } raft = { git = "https://github.com/jopemachine/raft-rs.git", branch = "feat/add-custom-deserializer", features = ["prost-codec", "default-logger"], default-features = false } slog = { version = "2.2", features = ["max_level_debug", "release_max_level_debug"] } slog-envlogger = "2.1.0" @@ -26,6 +27,8 @@ slog-async = "2.7.0" fxhash = "0.2.1" bincode = "1.3.3" bytes = "1.0" +tokio = { version = "1.32", features = ["sync", "macros"] } +once_cell = "1.7" [lib] name = "rraft" diff --git a/requirements.txt b/requirements.txt index 2850225..3087cca 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,3 +1,3 @@ -maturin==1.2.2 +maturin==1.2.3 pytest==7.3.1 pytest-benchmark==4.0.0 \ No newline at end of file diff --git a/rraft.pyi b/rraft.pyi index 71133a3..fbb9237 100644 --- a/rraft.pyi +++ b/rraft.pyi @@ -218,6 +218,8 @@ class EntryType: def from_int(v: int) -> "EntryType": ... class __API_Logger: + mutex: "Mutex" + def info(self, s: str) -> None: """ Log info level record @@ -228,25 +230,25 @@ class __API_Logger: """ Log debug level record - See `log` for documentation. + See `slog_debug` for documentation. """ def trace(self, s: str) -> None: """ Log trace level record - See `log` for documentation. + See `slog_trace` for documentation. """ def crit(self, s: str) -> None: """ Log crit level record - See `log` for documentation. + See `slog_crit` for documentation. """ def error(self, s: str) -> None: """ Log error level record - See `log` for documentation. + See `slog_error` for documentation. """ class Logger(__API_Logger): @@ -3345,3 +3347,11 @@ class OtherError(RaftStorageError): """ Some other error occurred. """ + +class Mutex: + def incr(self): + """ """ + def decr(self): + """ """ + def acquire_lock_and(self, cb: Any): + """ """ diff --git a/src/bindings/global.rs b/src/bindings/global.rs index 7163443..c4dd9f0 100644 --- a/src/bindings/global.rs +++ b/src/bindings/global.rs @@ -1,5 +1,6 @@ use pyo3::prelude::*; +use crate::external_bindings::mutex::PyMutex; use crate::raftpb_bindings::conf_change::new_conf_change_single as _new_conf_change_single; use crate::raftpb_bindings::conf_change_single::PyConfChangeSingle; use crate::raftpb_bindings::conf_change_type::PyConfChangeType; @@ -28,6 +29,7 @@ pub fn majority(total: usize) -> usize { pub fn default_logger() -> PyLogger { PyLogger { inner: RefMutOwner::new(_default_logger()), + mutex: PyMutex::new(), } } diff --git a/src/external_bindings/mod.rs b/src/external_bindings/mod.rs index 5358d06..c281adf 100644 --- a/src/external_bindings/mod.rs +++ b/src/external_bindings/mod.rs @@ -1 +1,2 @@ +pub mod mutex; pub mod slog; diff --git a/src/external_bindings/mutex.rs b/src/external_bindings/mutex.rs new file mode 100644 index 0000000..85887ff --- /dev/null +++ b/src/external_bindings/mutex.rs @@ -0,0 +1,77 @@ +use pyo3::prelude::*; + +use once_cell::sync::Lazy; +use std::sync::Arc; +use tokio::sync::Mutex; + +static RT: Lazy = Lazy::new(|| { + tokio::runtime::Builder::new_current_thread() + .build() + .unwrap() +}); + +#[derive(Clone)] +#[pyclass(name = "Mutex")] +pub struct PyMutex { + pub inner: Arc>, +} + +impl PyMutex { + #[tokio::main] + pub async fn acquire_lock_and(&self, cb: impl FnOnce() -> PyResult) -> PyResult { + let mut guard = self.inner.lock().await; + + // Wait until the guard's int value becomes 0. + while *guard != 0 { + tokio::task::yield_now().await; + guard = self.inner.lock().await; + } + + // The guard will be dropped when after cb executed. + cb() + } + +} + +#[pymethods] +impl PyMutex { + #[new] + pub fn new() -> Self { + Self { + inner: Arc::new(Mutex::new(0)), + } + } + + pub fn clone(&self) -> Self { + Self { + inner: Arc::clone(&self.inner), + } + } + + pub fn incr(&self) -> PyResult<()> { + let inner = self.inner.clone(); + + RT.block_on(async { + let mut guard = inner.lock().await; + *guard += 1; + }); + + Ok(()) + } + + pub fn decr(&self) -> PyResult<()> { + let inner = self.inner.clone(); + + RT.block_on(async { + let mut guard = inner.lock().await; + *guard -= 1; + }); + + Ok(()) + } + + #[pyo3(name = "acquire_lock_and")] + pub fn py_acquire_lock_and(&self, cb: PyObject, py: Python) -> PyResult { + self.acquire_lock_and(|| cb.call(py, (), None)) + } +} diff --git a/src/external_bindings/slog.rs b/src/external_bindings/slog.rs index 0c28157..7d966c0 100644 --- a/src/external_bindings/slog.rs +++ b/src/external_bindings/slog.rs @@ -1,7 +1,10 @@ +use std::sync::Arc; + use pyo3::{intern, prelude::*, types::PyString}; use slog::*; use slog_async::OverflowStrategy; +use super::mutex::PyMutex; use crate::implement_type_conversion; use crate::utils::reference::{RefMutContainer, RefMutOwner}; @@ -54,12 +57,16 @@ impl PyOverflowStrategy { #[pyclass(name = "Logger")] pub struct PyLogger { pub inner: RefMutOwner, + #[pyo3(get)] + pub mutex: PyMutex, } #[derive(Clone)] #[pyclass(name = "LoggerRef")] pub struct PyLoggerRef { pub inner: RefMutContainer, + #[pyo3(get)] + pub mutex: PyMutex, } #[derive(FromPyObject)] @@ -70,12 +77,65 @@ pub enum PyLoggerMut<'p> { implement_type_conversion!(Logger, PyLoggerMut); +struct CallbackDrain { + inner: D, + before_hook: Box, + after_hook: Box, +} + +impl CallbackDrain { + fn new( + inner: D, + before_hook: impl Fn() + Send + Sync + 'static, + after_hook: impl Fn() + Send + Sync + 'static, + ) -> Self { + CallbackDrain { + inner, + before_hook: Box::new(before_hook), + after_hook: Box::new(after_hook), + } + } +} + +impl Drain for CallbackDrain { + type Ok = D::Ok; + type Err = D::Err; + + fn log( + &self, + record: &Record, + values: &OwnedKVList, + ) -> std::result::Result { + // Call the callback before logging + (self.before_hook)(); + let res = self.inner.log(record, values); + (self.after_hook)(); + res + } +} + #[pymethods] impl PyLogger { #[new] pub fn new(chan_size: usize, overflow_strategy: &PyOverflowStrategy) -> Self { + let mutex = PyMutex::new(); let decorator = slog_term::TermDecorator::new().build(); let drain = slog_term::FullFormat::new(decorator).build().fuse(); + + let mutex_clone_before = mutex.clone(); + let mutex_clone_after = mutex.clone(); + + let drain = CallbackDrain::new( + drain, + move || { + mutex_clone_before.incr().unwrap(); + }, + move || { + mutex_clone_after.decr().unwrap(); + }, + ) + .fuse(); + let drain = slog_async::Async::new(drain) .chan_size(chan_size) .overflow_strategy(overflow_strategy.0) @@ -86,12 +146,14 @@ impl PyLogger { PyLogger { inner: RefMutOwner::new(logger), + mutex: mutex, } } pub fn make_ref(&mut self) -> PyLoggerRef { PyLoggerRef { inner: RefMutContainer::new(&mut self.inner), + mutex: self.mutex.clone(), } } @@ -104,27 +166,37 @@ impl PyLogger { #[pymethods] impl PyLoggerRef { pub fn info(&mut self, s: &PyString) -> PyResult<()> { - self.inner - .map_as_ref(|inner| info!(inner, "{}", format!("{}", s))) + self.mutex.acquire_lock_and(|| { + self.inner + .map_as_ref(|inner| info!(inner, "{}", format!("{}", s))) + }) } pub fn debug(&mut self, s: &PyString) -> PyResult<()> { - self.inner - .map_as_ref(|inner| debug!(inner, "{}", format!("{}", s))) + self.mutex.acquire_lock_and(|| { + self.inner + .map_as_ref(|inner| debug!(inner, "{}", format!("{}", s))) + }) } pub fn trace(&mut self, s: &PyString) -> PyResult<()> { - self.inner - .map_as_ref(|inner| trace!(inner, "{}", format!("{}", s))) + self.mutex.acquire_lock_and(|| { + self.inner + .map_as_ref(|inner| trace!(inner, "{}", format!("{}", s))) + }) } pub fn error(&mut self, s: &PyString) -> PyResult<()> { - self.inner - .map_as_ref(|inner| error!(inner, "{}", format!("{}", s))) + self.mutex.acquire_lock_and(|| { + self.inner + .map_as_ref(|inner| error!(inner, "{}", format!("{}", s))) + }) } pub fn crit(&mut self, s: &PyString) -> PyResult<()> { - self.inner - .map_as_ref(|inner| crit!(inner, "{}", format!("{}", s))) + self.mutex.acquire_lock_and(|| { + self.inner + .map_as_ref(|inner| crit!(inner, "{}", format!("{}", s))) + }) } } diff --git a/src/lib.rs b/src/lib.rs index 728dcb2..dfb6af6 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -96,6 +96,7 @@ fn rraft(py: Python, m: &PyModule) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; + m.add_class::()?; m.add_class::()?; m.add_class::()?; From 503a5a1f3dd3b59c1b56c4762383eee96acc0459 Mon Sep 17 00:00:00 2001 From: Gyubong Date: Thu, 24 Aug 2023 11:02:58 +0900 Subject: [PATCH 2/5] Make deserializer logic correctly --- src/utils/deserializer.rs | 114 +++++++++++++++++++++++++------------- 1 file changed, 75 insertions(+), 39 deletions(-) diff --git a/src/utils/deserializer.rs b/src/utils/deserializer.rs index 9f9e214..af1ccb2 100644 --- a/src/utils/deserializer.rs +++ b/src/utils/deserializer.rs @@ -1,31 +1,61 @@ -use pyo3::{types::PyBytes, IntoPy, Python}; - -#[macro_export] -macro_rules! deserialize_bytes { - ($inner:ident, $deserializer_name: literal, $attr:ident, $py:ident) => {{ - if let Some(deserializer) = $py.eval($deserializer_name, None, None).ok() { - deserializer - .call((PyBytes::new($py, $inner.$attr.as_slice()),), None) - .unwrap() - .into_py($py) - .to_string() - } else { - format!("{:?}", $inner.$attr) - } - }}; -} +use std::sync::Mutex; -// IMPORTANT: UNCOMMENT ALL THE BELOW CODES when you want to use raft-rs's upstream repository. +use ::once_cell::sync::Lazy; +use pyo3::*; +use pyo3::{types::PyBytes, IntoPy, PyObject, Python}; use raft::derializer::{Bytes, CustomDeserializer}; pub struct MyDeserializer; // TODO: Refactor below codes to reduce code redundancy. +static ENTRY_CONTEXT_DESERIALIZE_CB: Lazy>> = Lazy::new(|| Mutex::new(None)); +static ENTRY_DATA_DESERIALIZE_CB: Lazy>> = Lazy::new(|| Mutex::new(None)); +static CONFCHANGEV2_CONTEXT_DESERIALIZE_CB: Lazy>> = + Lazy::new(|| Mutex::new(None)); +static CONFCHANGE_CONTEXT_DESERIALIZE_CB: Lazy>> = + Lazy::new(|| Mutex::new(None)); +static MESSAGE_CONTEXT_DESERIALIZER_CB: Lazy>> = + Lazy::new(|| Mutex::new(None)); +static SNAPSHOT_DATA_DESERIALIZER_CB: Lazy>> = + Lazy::new(|| Mutex::new(None)); + +#[pyfunction] +pub fn set_entry_context_deserializer(cb: PyObject) { + *ENTRY_CONTEXT_DESERIALIZE_CB.lock().unwrap() = Some(cb); +} + +#[pyfunction] +pub fn set_entry_data_deserializer(cb: PyObject) { + *ENTRY_DATA_DESERIALIZE_CB.lock().unwrap() = Some(cb); +} + +#[pyfunction] +pub fn set_confchangev2_context_deserializer(cb: PyObject) { + *CONFCHANGEV2_CONTEXT_DESERIALIZE_CB.lock().unwrap() = Some(cb); +} + +#[pyfunction] +pub fn set_confchange_context_deserializer(cb: PyObject) { + *CONFCHANGE_CONTEXT_DESERIALIZE_CB.lock().unwrap() = Some(cb); +} + +#[pyfunction] +pub fn set_message_context_deserializer(cb: PyObject) { + *MESSAGE_CONTEXT_DESERIALIZER_CB.lock().unwrap() = Some(cb); +} + +#[pyfunction] +pub fn set_snapshot_data_deserializer(cb: PyObject) { + *SNAPSHOT_DATA_DESERIALIZER_CB.lock().unwrap() = Some(cb); +} + impl CustomDeserializer for MyDeserializer { fn entry_context_deserialize(&self, v: &Bytes) -> String { fn deserialize(py: Python, data: &[u8]) -> String { - if let Some(deserializer) = py.eval("entry_context_deserializer", None, None).ok() { - deserializer - .call((PyBytes::new(py, data),), None) + let callback_lock = ENTRY_CONTEXT_DESERIALIZE_CB.lock().unwrap(); + + if let Some(callback) = &*callback_lock { + callback + .call(py, (PyBytes::new(py, data),), None) .unwrap() .into_py(py) .to_string() @@ -42,9 +72,11 @@ impl CustomDeserializer for MyDeserializer { fn entry_data_deserialize(&self, v: &Bytes) -> String { fn deserialize(py: Python, data: &[u8]) -> String { - if let Some(deserializer) = py.eval("entry_data_deserializer", None, None).ok() { - deserializer - .call((PyBytes::new(py, data),), None) + let callback_lock = ENTRY_DATA_DESERIALIZE_CB.lock().unwrap(); + + if let Some(callback) = &*callback_lock { + callback + .call(py, (PyBytes::new(py, data),), None) .unwrap() .into_py(py) .to_string() @@ -61,12 +93,11 @@ impl CustomDeserializer for MyDeserializer { fn confchangev2_context_deserialize(&self, v: &Bytes) -> String { fn deserialize(py: Python, data: &[u8]) -> String { - if let Some(deserializer) = py - .eval("confchangev2_context_deserializer", None, None) - .ok() - { - deserializer - .call((PyBytes::new(py, data),), None) + let callback_lock = CONFCHANGEV2_CONTEXT_DESERIALIZE_CB.lock().unwrap(); + + if let Some(callback) = &*callback_lock { + callback + .call(py, (PyBytes::new(py, data),), None) .unwrap() .into_py(py) .to_string() @@ -83,10 +114,11 @@ impl CustomDeserializer for MyDeserializer { fn confchange_context_deserialize(&self, v: &Bytes) -> String { fn deserialize(py: Python, data: &[u8]) -> String { - if let Some(deserializer) = py.eval("confchange_context_deserializer", None, None).ok() - { - deserializer - .call((PyBytes::new(py, data),), None) + let callback_lock = CONFCHANGE_CONTEXT_DESERIALIZE_CB.lock().unwrap(); + + if let Some(callback) = &*callback_lock { + callback + .call(py, (PyBytes::new(py, data),), None) .unwrap() .into_py(py) .to_string() @@ -103,9 +135,11 @@ impl CustomDeserializer for MyDeserializer { fn message_context_deserializer(&self, v: &Bytes) -> String { fn deserialize(py: Python, data: &[u8]) -> String { - if let Some(deserializer) = py.eval("message_context_deserializer", None, None).ok() { - deserializer - .call((PyBytes::new(py, data),), None) + let callback_lock = MESSAGE_CONTEXT_DESERIALIZER_CB.lock().unwrap(); + + if let Some(callback) = &*callback_lock { + callback + .call(py, (PyBytes::new(py, data),), None) .unwrap() .into_py(py) .to_string() @@ -122,9 +156,11 @@ impl CustomDeserializer for MyDeserializer { fn snapshot_data_deserializer(&self, v: &Bytes) -> String { fn deserialize(py: Python, data: &[u8]) -> String { - if let Some(deserializer) = py.eval("snapshot_data_deserializer", None, None).ok() { - deserializer - .call((PyBytes::new(py, data),), None) + let callback_lock = SNAPSHOT_DATA_DESERIALIZER_CB.lock().unwrap(); + + if let Some(callback) = &*callback_lock { + callback + .call(py, (PyBytes::new(py, data),), None) .unwrap() .into_py(py) .to_string() From e68bbd2e78d5328b42d54c64030af9b59627cee0 Mon Sep 17 00:00:00 2001 From: Gyubong Date: Thu, 24 Aug 2023 11:04:04 +0900 Subject: [PATCH 3/5] Make deserializer logic correctly --- rraft.pyi | 18 ++++ src/external_bindings/slog.rs | 2 - src/lib.rs | 30 +++++++ src/raftpb_bindings/conf_change.rs | 20 ++--- src/raftpb_bindings/conf_change_v2.rs | 20 ++--- src/raftpb_bindings/entry.rs | 22 ++--- src/raftpb_bindings/message.rs | 35 ++------ src/raftpb_bindings/snapshot.rs | 18 ++-- src/utils/deserializer.rs | 114 +++++++++++++++++--------- 9 files changed, 148 insertions(+), 131 deletions(-) diff --git a/rraft.pyi b/rraft.pyi index fbb9237..680cd32 100644 --- a/rraft.pyi +++ b/rraft.pyi @@ -66,6 +66,24 @@ class __Decoder: def decode(v: bytes) -> Any: """ """ +def set_snapshot_data_deserializer(cb: Any) -> None: + """ """ + +def set_message_context_deserializer(cb: Any) -> None: + """ """ + +def set_confchange_context_deserializer(cb: Any) -> None: + """ """ + +def set_confchangev2_context_deserializer(cb: Any) -> None: + """ """ + +def set_entry_data_deserializer(cb: Any) -> None: + """ """ + +def set_entry_context_deserializer(cb: Any) -> None: + """ """ + class OverflowStrategy: """ """ diff --git a/src/external_bindings/slog.rs b/src/external_bindings/slog.rs index 7d966c0..21a4013 100644 --- a/src/external_bindings/slog.rs +++ b/src/external_bindings/slog.rs @@ -1,5 +1,3 @@ -use std::sync::Arc; - use pyo3::{intern, prelude::*, types::PyString}; use slog::*; use slog_async::OverflowStrategy; diff --git a/src/lib.rs b/src/lib.rs index dfb6af6..58dc4be 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -111,6 +111,36 @@ fn rraft(py: Python, m: &PyModule) -> PyResult<()> { m )?)?; + m.add_function(wrap_pyfunction!( + utils::deserializer::set_confchange_context_deserializer, + m + )?)?; + + m.add_function(wrap_pyfunction!( + utils::deserializer::set_confchangev2_context_deserializer, + m + )?)?; + + m.add_function(wrap_pyfunction!( + utils::deserializer::set_entry_context_deserializer, + m + )?)?; + + m.add_function(wrap_pyfunction!( + utils::deserializer::set_entry_data_deserializer, + m + )?)?; + + m.add_function(wrap_pyfunction!( + utils::deserializer::set_message_context_deserializer, + m + )?)?; + + m.add_function(wrap_pyfunction!( + utils::deserializer::set_snapshot_data_deserializer, + m + )?)?; + m.add( "DestroyedRefUsedError", py.get_type::(), diff --git a/src/raftpb_bindings/conf_change.rs b/src/raftpb_bindings/conf_change.rs index 06d357b..51e7b81 100644 --- a/src/raftpb_bindings/conf_change.rs +++ b/src/raftpb_bindings/conf_change.rs @@ -1,8 +1,8 @@ +use crate::implement_type_conversion; use crate::utils::{ errors::to_pyresult, reference::{RefMutContainer, RefMutOwner}, }; -use crate::{deserialize_bytes, implement_type_conversion}; use prost::Message as ProstMessage; use protobuf::Message as PbMessage; use pyo3::{intern, prelude::*, pyclass::CompareOp, types::PyBytes}; @@ -34,16 +34,6 @@ pub enum PyConfChangeMut<'p> { implement_type_conversion!(ConfChange, PyConfChangeMut); -pub fn format_confchange(cc: &ConfChange, py: Python) -> String { - format!( - "ConfChange {{ change_type: {change_type:?}, node_id: {node_id:?}, context: {context:?}, id: {id:?} }}", - change_type = cc.get_change_type(), - node_id = cc.get_node_id(), - id = cc.get_id(), - context = deserialize_bytes!(cc, "confchange_context_deserializer", context, py) - ) -} - #[pymethods] impl PyConfChange { #[new] @@ -73,8 +63,8 @@ impl PyConfChange { } } - pub fn __repr__(&self, py: Python) -> String { - format_confchange(&self.inner.inner, py) + pub fn __repr__(&self) -> String { + format!("{:?}", self.inner.inner) } pub fn __richcmp__(&self, py: Python, rhs: PyConfChangeMut, op: CompareOp) -> PyObject { @@ -95,8 +85,8 @@ impl PyConfChange { #[pymethods] impl PyConfChangeRef { - pub fn __repr__(&self, py: Python) -> PyResult { - self.inner.map_as_ref(|inner| format_confchange(inner, py)) + pub fn __repr__(&self) -> PyResult { + self.inner.map_as_ref(|inner| format!("{:?}", inner)) } pub fn __richcmp__( diff --git a/src/raftpb_bindings/conf_change_v2.rs b/src/raftpb_bindings/conf_change_v2.rs index 061a446..0ae8624 100644 --- a/src/raftpb_bindings/conf_change_v2.rs +++ b/src/raftpb_bindings/conf_change_v2.rs @@ -1,9 +1,9 @@ +use crate::implement_type_conversion; use crate::utils::{ errors::to_pyresult, reference::{RefMutContainer, RefMutOwner}, unsafe_cast::make_mut, }; -use crate::{deserialize_bytes, implement_type_conversion}; use prost::Message as ProstMessage; use protobuf::Message as PbMessage; use pyo3::{ @@ -20,15 +20,6 @@ use super::{ conf_change_transition::PyConfChangeTransition, }; -pub fn format_confchangev2(cc: &ConfChangeV2, py: Python) -> String { - format!( - "ConfChangeV2 {{ transition: {transition:?}, changes: {changes:?}, context: {context:?} }}", - transition = cc.transition(), - changes = cc.changes, - context = deserialize_bytes!(cc, "confchangev2_context_deserializer", context, py) - ) -} - #[derive(Clone)] #[pyclass(name = "ConfChangeV2")] pub struct PyConfChangeV2 { @@ -78,8 +69,8 @@ impl PyConfChangeV2 { } } - pub fn __repr__(&self, py: Python) -> String { - format_confchangev2(&self.inner.inner, py) + pub fn __repr__(&self) -> String { + format!("{:?}", self.inner.inner) } pub fn __richcmp__(&self, py: Python, rhs: PyConfChangeV2Mut, op: CompareOp) -> PyObject { @@ -106,9 +97,8 @@ impl Default for PyConfChangeV2 { #[pymethods] impl PyConfChangeV2Ref { - pub fn __repr__(&self, py: Python) -> PyResult { - self.inner - .map_as_ref(|inner| format_confchangev2(inner, py)) + pub fn __repr__(&self) -> PyResult { + self.inner.map_as_ref(|inner| format!("{:?}", inner)) } pub fn __richcmp__( diff --git a/src/raftpb_bindings/entry.rs b/src/raftpb_bindings/entry.rs index 5b6c9ac..7293889 100644 --- a/src/raftpb_bindings/entry.rs +++ b/src/raftpb_bindings/entry.rs @@ -1,6 +1,6 @@ +use crate::implement_type_conversion; use crate::utils::errors::to_pyresult; use crate::utils::reference::{RefMutContainer, RefMutOwner}; -use crate::{deserialize_bytes, implement_type_conversion}; use prost::Message as ProstMessage; use protobuf::Message as PbMessage; use pyo3::pyclass::CompareOp; @@ -30,18 +30,6 @@ pub enum PyEntryMut<'p> { implement_type_conversion!(Entry, PyEntryMut); -pub fn format_entry(entry: &Entry, py: Python) -> String { - format!( - "Entry {{ context: {context:}, data: {data:}, entry_type: {entry_type:?}, index: {index:}, sync_log: {sync_log:}, term: {term:} }}", - data=deserialize_bytes!(entry, "entry_data_deserializer", data, py), - context=deserialize_bytes!(entry, "entry_context_deserializer", context, py), - entry_type=entry.get_entry_type(), - index=entry.get_index(), - sync_log=entry.get_sync_log(), - term=entry.get_term(), - ) -} - #[pymethods] impl PyEntry { #[new] @@ -71,8 +59,8 @@ impl PyEntry { } } - pub fn __repr__(&self, py: Python) -> String { - format_entry(&self.inner.inner, py) + pub fn __repr__(&self) -> String { + format!("{:?}", self.inner.inner) } pub fn __richcmp__(&self, py: Python, rhs: PyEntryMut, op: CompareOp) -> PyObject { @@ -93,8 +81,8 @@ impl PyEntry { #[pymethods] impl PyEntryRef { - pub fn __repr__(&self, py: Python) -> PyResult { - self.inner.map_as_ref(|inner| format_entry(inner, py)) + pub fn __repr__(&self) -> PyResult { + self.inner.map_as_ref(|inner| format!("{:?}", inner)) } pub fn __richcmp__(&self, py: Python, rhs: PyEntryMut, op: CompareOp) -> PyResult { diff --git a/src/raftpb_bindings/message.rs b/src/raftpb_bindings/message.rs index 786affa..c61c1ff 100644 --- a/src/raftpb_bindings/message.rs +++ b/src/raftpb_bindings/message.rs @@ -7,15 +7,12 @@ use pyo3::{ types::{PyBytes, PyList}, }; +use crate::implement_type_conversion; use crate::utils::{ errors::to_pyresult, reference::{RefMutContainer, RefMutOwner}, unsafe_cast::make_mut, }; -use crate::{ - deserialize_bytes, implement_type_conversion, raftpb_bindings::entry::format_entry, - raftpb_bindings::snapshot::format_snapshot, -}; use raft::eraftpb::Message; use super::{ @@ -44,28 +41,6 @@ pub enum PyMessageMut<'p> { implement_type_conversion!(Message, PyMessageMut); -pub fn format_message(msg: &Message, py: Python) -> String { - format!( - "Message {{ msg_type: {msg_type:?}, to: {to:?}, from: {from:?}, term: {term:?}, log_term: {log_term:?}, index: {index:?}, entries: [{entries:?}], commit: {commit:?}, commit_term: {commit_term:?}, snapshot: {snapshot:?}, request_snapshot: {request_snapshot:?}, reject: {reject:?}, reject_hint: {reject_hint:?}, context: {context:?}, deprecated_priority: {deprecated_priority:?}, priority: {priority:?} }}", - msg_type=msg.get_msg_type(), - to=msg.get_to(), - from=msg.get_from(), - term=msg.get_term(), - log_term=msg.get_log_term(), - index=msg.get_index(), - entries=msg.get_entries().iter().map(|e| format_entry(e, py)).collect::>().join(", "), - commit=msg.get_commit(), - commit_term=msg.get_commit_term(), - snapshot=format_snapshot(msg.get_snapshot(), py), - request_snapshot=msg.get_request_snapshot(), - reject=msg.get_reject(), - reject_hint=msg.get_reject_hint(), - context=deserialize_bytes!(msg, "message_context_deserializer", context, py), - deprecated_priority=msg.get_deprecated_priority(), - priority=msg.get_priority(), - ) -} - #[pymethods] impl PyMessage { #[new] @@ -95,8 +70,8 @@ impl PyMessage { } } - pub fn __repr__(&self, py: Python) -> String { - format_message(&self.inner.inner, py) + pub fn __repr__(&self) -> String { + format!("{:?}", self.inner.inner) } pub fn __richcmp__(&self, py: Python, rhs: PyMessageMut, op: CompareOp) -> PyObject { @@ -117,8 +92,8 @@ impl PyMessage { #[pymethods] impl PyMessageRef { - pub fn __repr__(&self, py: Python) -> PyResult { - self.inner.map_as_ref(|inner| format_message(inner, py)) + pub fn __repr__(&self) -> PyResult { + self.inner.map_as_ref(|inner| format!("{:?}", inner)) } pub fn __richcmp__(&self, py: Python, rhs: PyMessageMut, op: CompareOp) -> PyResult { diff --git a/src/raftpb_bindings/snapshot.rs b/src/raftpb_bindings/snapshot.rs index d9a45f1..c61b5b7 100644 --- a/src/raftpb_bindings/snapshot.rs +++ b/src/raftpb_bindings/snapshot.rs @@ -2,11 +2,11 @@ use prost::Message as ProstMessage; use protobuf::Message as PbMessage; use pyo3::{intern, prelude::*, pyclass::CompareOp, types::PyBytes}; +use crate::implement_type_conversion; use crate::utils::{ errors::to_pyresult, reference::{RefMutContainer, RefMutOwner}, }; -use crate::{deserialize_bytes, implement_type_conversion}; use raft::eraftpb::Snapshot; use super::snapshot_metadata::{PySnapshotMetadataMut, PySnapshotMetadataRef}; @@ -31,14 +31,6 @@ pub enum PySnapshotMut<'p> { implement_type_conversion!(Snapshot, PySnapshotMut); -pub fn format_snapshot(snapshot: &Snapshot, py: Python) -> String { - format!( - "Snapshot {{ data: {data:?}, metadata: {metadata:?} }}", - data = deserialize_bytes!(snapshot, "snapshot_data_deserializer", data, py), - metadata = snapshot.metadata, - ) -} - #[pymethods] impl PySnapshot { #[new] @@ -68,8 +60,8 @@ impl PySnapshot { } } - pub fn __repr__(&self, py: Python) -> String { - format_snapshot(&self.inner, py) + pub fn __repr__(&self) -> String { + format!("{:?}", self.inner.inner) } pub fn __bool__(&self) -> bool { @@ -94,8 +86,8 @@ impl PySnapshot { #[pymethods] impl PySnapshotRef { - pub fn __repr__(&self, py: Python) -> PyResult { - self.inner.map_as_ref(|inner| format_snapshot(inner, py)) + pub fn __repr__(&self) -> PyResult { + self.inner.map_as_ref(|inner| format!("{:?}", inner)) } pub fn __richcmp__(&self, py: Python, rhs: PySnapshotMut, op: CompareOp) -> PyResult { diff --git a/src/utils/deserializer.rs b/src/utils/deserializer.rs index 9f9e214..af1ccb2 100644 --- a/src/utils/deserializer.rs +++ b/src/utils/deserializer.rs @@ -1,31 +1,61 @@ -use pyo3::{types::PyBytes, IntoPy, Python}; - -#[macro_export] -macro_rules! deserialize_bytes { - ($inner:ident, $deserializer_name: literal, $attr:ident, $py:ident) => {{ - if let Some(deserializer) = $py.eval($deserializer_name, None, None).ok() { - deserializer - .call((PyBytes::new($py, $inner.$attr.as_slice()),), None) - .unwrap() - .into_py($py) - .to_string() - } else { - format!("{:?}", $inner.$attr) - } - }}; -} +use std::sync::Mutex; -// IMPORTANT: UNCOMMENT ALL THE BELOW CODES when you want to use raft-rs's upstream repository. +use ::once_cell::sync::Lazy; +use pyo3::*; +use pyo3::{types::PyBytes, IntoPy, PyObject, Python}; use raft::derializer::{Bytes, CustomDeserializer}; pub struct MyDeserializer; // TODO: Refactor below codes to reduce code redundancy. +static ENTRY_CONTEXT_DESERIALIZE_CB: Lazy>> = Lazy::new(|| Mutex::new(None)); +static ENTRY_DATA_DESERIALIZE_CB: Lazy>> = Lazy::new(|| Mutex::new(None)); +static CONFCHANGEV2_CONTEXT_DESERIALIZE_CB: Lazy>> = + Lazy::new(|| Mutex::new(None)); +static CONFCHANGE_CONTEXT_DESERIALIZE_CB: Lazy>> = + Lazy::new(|| Mutex::new(None)); +static MESSAGE_CONTEXT_DESERIALIZER_CB: Lazy>> = + Lazy::new(|| Mutex::new(None)); +static SNAPSHOT_DATA_DESERIALIZER_CB: Lazy>> = + Lazy::new(|| Mutex::new(None)); + +#[pyfunction] +pub fn set_entry_context_deserializer(cb: PyObject) { + *ENTRY_CONTEXT_DESERIALIZE_CB.lock().unwrap() = Some(cb); +} + +#[pyfunction] +pub fn set_entry_data_deserializer(cb: PyObject) { + *ENTRY_DATA_DESERIALIZE_CB.lock().unwrap() = Some(cb); +} + +#[pyfunction] +pub fn set_confchangev2_context_deserializer(cb: PyObject) { + *CONFCHANGEV2_CONTEXT_DESERIALIZE_CB.lock().unwrap() = Some(cb); +} + +#[pyfunction] +pub fn set_confchange_context_deserializer(cb: PyObject) { + *CONFCHANGE_CONTEXT_DESERIALIZE_CB.lock().unwrap() = Some(cb); +} + +#[pyfunction] +pub fn set_message_context_deserializer(cb: PyObject) { + *MESSAGE_CONTEXT_DESERIALIZER_CB.lock().unwrap() = Some(cb); +} + +#[pyfunction] +pub fn set_snapshot_data_deserializer(cb: PyObject) { + *SNAPSHOT_DATA_DESERIALIZER_CB.lock().unwrap() = Some(cb); +} + impl CustomDeserializer for MyDeserializer { fn entry_context_deserialize(&self, v: &Bytes) -> String { fn deserialize(py: Python, data: &[u8]) -> String { - if let Some(deserializer) = py.eval("entry_context_deserializer", None, None).ok() { - deserializer - .call((PyBytes::new(py, data),), None) + let callback_lock = ENTRY_CONTEXT_DESERIALIZE_CB.lock().unwrap(); + + if let Some(callback) = &*callback_lock { + callback + .call(py, (PyBytes::new(py, data),), None) .unwrap() .into_py(py) .to_string() @@ -42,9 +72,11 @@ impl CustomDeserializer for MyDeserializer { fn entry_data_deserialize(&self, v: &Bytes) -> String { fn deserialize(py: Python, data: &[u8]) -> String { - if let Some(deserializer) = py.eval("entry_data_deserializer", None, None).ok() { - deserializer - .call((PyBytes::new(py, data),), None) + let callback_lock = ENTRY_DATA_DESERIALIZE_CB.lock().unwrap(); + + if let Some(callback) = &*callback_lock { + callback + .call(py, (PyBytes::new(py, data),), None) .unwrap() .into_py(py) .to_string() @@ -61,12 +93,11 @@ impl CustomDeserializer for MyDeserializer { fn confchangev2_context_deserialize(&self, v: &Bytes) -> String { fn deserialize(py: Python, data: &[u8]) -> String { - if let Some(deserializer) = py - .eval("confchangev2_context_deserializer", None, None) - .ok() - { - deserializer - .call((PyBytes::new(py, data),), None) + let callback_lock = CONFCHANGEV2_CONTEXT_DESERIALIZE_CB.lock().unwrap(); + + if let Some(callback) = &*callback_lock { + callback + .call(py, (PyBytes::new(py, data),), None) .unwrap() .into_py(py) .to_string() @@ -83,10 +114,11 @@ impl CustomDeserializer for MyDeserializer { fn confchange_context_deserialize(&self, v: &Bytes) -> String { fn deserialize(py: Python, data: &[u8]) -> String { - if let Some(deserializer) = py.eval("confchange_context_deserializer", None, None).ok() - { - deserializer - .call((PyBytes::new(py, data),), None) + let callback_lock = CONFCHANGE_CONTEXT_DESERIALIZE_CB.lock().unwrap(); + + if let Some(callback) = &*callback_lock { + callback + .call(py, (PyBytes::new(py, data),), None) .unwrap() .into_py(py) .to_string() @@ -103,9 +135,11 @@ impl CustomDeserializer for MyDeserializer { fn message_context_deserializer(&self, v: &Bytes) -> String { fn deserialize(py: Python, data: &[u8]) -> String { - if let Some(deserializer) = py.eval("message_context_deserializer", None, None).ok() { - deserializer - .call((PyBytes::new(py, data),), None) + let callback_lock = MESSAGE_CONTEXT_DESERIALIZER_CB.lock().unwrap(); + + if let Some(callback) = &*callback_lock { + callback + .call(py, (PyBytes::new(py, data),), None) .unwrap() .into_py(py) .to_string() @@ -122,9 +156,11 @@ impl CustomDeserializer for MyDeserializer { fn snapshot_data_deserializer(&self, v: &Bytes) -> String { fn deserialize(py: Python, data: &[u8]) -> String { - if let Some(deserializer) = py.eval("snapshot_data_deserializer", None, None).ok() { - deserializer - .call((PyBytes::new(py, data),), None) + let callback_lock = SNAPSHOT_DATA_DESERIALIZER_CB.lock().unwrap(); + + if let Some(callback) = &*callback_lock { + callback + .call(py, (PyBytes::new(py, data),), None) .unwrap() .into_py(py) .to_string() From 9b97d85874a1a19ad2f30be9fe18ee49533edae4 Mon Sep 17 00:00:00 2001 From: Gyubong Date: Thu, 24 Aug 2023 13:17:41 +0900 Subject: [PATCH 4/5] Try to fix Mutex --- example/single_mem_node/use_threading.py | 2 ++ src/external_bindings/mutex.rs | 38 +++++++----------------- 2 files changed, 13 insertions(+), 27 deletions(-) diff --git a/example/single_mem_node/use_threading.py b/example/single_mem_node/use_threading.py index 434c0ef..adbc905 100644 --- a/example/single_mem_node/use_threading.py +++ b/example/single_mem_node/use_threading.py @@ -164,6 +164,8 @@ def handle_committed_entries(committed_entries: List[EntryRef]): # Use another thread to propose a Raft request. send_propose(logger) + logger.mutex.acquire_lock_and(lambda: print('start!!!')) + t = now() timeout = 100 # Use a HashMap to hold the `propose` callbacks. diff --git a/src/external_bindings/mutex.rs b/src/external_bindings/mutex.rs index 85887ff..54ef15f 100644 --- a/src/external_bindings/mutex.rs +++ b/src/external_bindings/mutex.rs @@ -1,14 +1,6 @@ use pyo3::prelude::*; -use once_cell::sync::Lazy; -use std::sync::Arc; -use tokio::sync::Mutex; - -static RT: Lazy = Lazy::new(|| { - tokio::runtime::Builder::new_current_thread() - .build() - .unwrap() -}); +use std::sync::{Arc, Mutex}; #[derive(Clone)] #[pyclass(name = "Mutex")] @@ -19,18 +11,20 @@ pub struct PyMutex { impl PyMutex { #[tokio::main] pub async fn acquire_lock_and(&self, cb: impl FnOnce() -> PyResult) -> PyResult { - let mut guard = self.inner.lock().await; + let mut guard = self.inner.lock().unwrap(); // Wait until the guard's int value becomes 0. while *guard != 0 { tokio::task::yield_now().await; - guard = self.inner.lock().await; + guard = self.inner.lock().unwrap(); } + *guard += 1; // The guard will be dropped when after cb executed. - cb() + let res = cb(); + *guard -= 1; + res } - } #[pymethods] @@ -49,24 +43,14 @@ impl PyMutex { } pub fn incr(&self) -> PyResult<()> { - let inner = self.inner.clone(); - - RT.block_on(async { - let mut guard = inner.lock().await; - *guard += 1; - }); - + let mut guard = self.inner.lock().unwrap(); + *guard += 1; Ok(()) } pub fn decr(&self) -> PyResult<()> { - let inner = self.inner.clone(); - - RT.block_on(async { - let mut guard = inner.lock().await; - *guard -= 1; - }); - + let mut guard = self.inner.lock().unwrap(); + *guard -= 1; Ok(()) } From 5753e9722a01f35cbcd74ce07c45342726fdaeec Mon Sep 17 00:00:00 2001 From: Gyubong Date: Fri, 25 Aug 2023 12:06:30 +0900 Subject: [PATCH 5/5] Add slogger's FileMode --- Cargo.lock | 232 +-------------------------------- rraft.pyi | 10 +- src/bindings/global.rs | 8 +- src/external_bindings/mod.rs | 1 - src/external_bindings/mutex.rs | 61 --------- src/external_bindings/slog.rs | 182 +++++++++++++++----------- src/lib.rs | 1 - 7 files changed, 114 insertions(+), 381 deletions(-) delete mode 100644 src/external_bindings/mutex.rs diff --git a/Cargo.lock b/Cargo.lock index 6655234..3407bf9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2,21 +2,6 @@ # It is not intended for manual editing. version = 3 -[[package]] -name = "addr2line" -version = "0.19.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a76fd60b23679b7d19bd066031410fb7e458ccc5e958eb5c325888ce4baedc97" -dependencies = [ - "gimli", -] - -[[package]] -name = "adler" -version = "1.0.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f26201604c87b1e01bd3d98f8d5d9a8fcbb815e8cedb41ffccbeb4bf593a35fe" - [[package]] name = "aho-corasick" version = "0.7.20" @@ -64,21 +49,6 @@ dependencies = [ "cc", ] -[[package]] -name = "backtrace" -version = "0.3.67" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "233d376d6d185f2a3093e58f283f60f880315b6c60075b01f36b3b85154564ca" -dependencies = [ - "addr2line", - "cc", - "cfg-if", - "libc", - "miniz_oxide", - "object", - "rustc-demangle", -] - [[package]] name = "bincode" version = "1.3.3" @@ -210,95 +180,6 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0ce7134b9999ecaf8bcd65542e436736ef32ddca1b3e06094cb6ec5755203b80" -[[package]] -name = "futures" -version = "0.3.28" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "23342abe12aba583913b2e62f22225ff9c950774065e4bfb61a19cd9770fec40" -dependencies = [ - "futures-channel", - "futures-core", - "futures-executor", - "futures-io", - "futures-sink", - "futures-task", - "futures-util", -] - -[[package]] -name = "futures-channel" -version = "0.3.28" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "955518d47e09b25bbebc7a18df10b81f0c766eaf4c4f1cccef2fca5f2a4fb5f2" -dependencies = [ - "futures-core", - "futures-sink", -] - -[[package]] -name = "futures-core" -version = "0.3.28" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4bca583b7e26f571124fe5b7561d49cb2868d79116cfa0eefce955557c6fee8c" - -[[package]] -name = "futures-executor" -version = "0.3.28" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ccecee823288125bd88b4d7f565c9e58e41858e47ab72e8ea2d64e93624386e0" -dependencies = [ - "futures-core", - "futures-task", - "futures-util", -] - -[[package]] -name = "futures-io" -version = "0.3.28" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964" - -[[package]] -name = "futures-macro" -version = "0.3.28" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "89ca545a94061b6365f2c7355b4b32bd20df3ff95f02da9329b34ccc3bd6ee72" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.11", -] - -[[package]] -name = "futures-sink" -version = "0.3.28" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f43be4fe21a13b9781a69afa4985b0f6ee0e1afab2c6f454a8cf30e2b2237b6e" - -[[package]] -name = "futures-task" -version = "0.3.28" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "76d3d132be6c0e6aa1534069c705a74a5997a356c0dc2f86a47765e5617c5b65" - -[[package]] -name = "futures-util" -version = "0.3.28" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "26b01e40b772d54cf6c6d721c1d1abd0647a0106a12ecaa1c186273392a69533" -dependencies = [ - "futures-channel", - "futures-core", - "futures-io", - "futures-macro", - "futures-sink", - "futures-task", - "memchr", - "pin-project-lite", - "pin-utils", - "slab", -] - [[package]] name = "fxhash" version = "0.2.1" @@ -342,12 +223,6 @@ dependencies = [ "syn 2.0.11", ] -[[package]] -name = "gimli" -version = "0.27.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b6c80984affa11d98d1b88b66ac8853f143217b399d3c74116778ff8fdb4ed2e" - [[package]] name = "hashbrown" version = "0.12.3" @@ -488,31 +363,12 @@ dependencies = [ "autocfg", ] -[[package]] -name = "miniz_oxide" -version = "0.6.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b275950c28b37e794e8c55d88aeb5e139d0ce23fdbbeda68f8d7174abdf9e8fa" -dependencies = [ - "adler", -] - [[package]] name = "multimap" version = "0.8.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "e5ce46fe64a9d73be07dcbe690a38ce1b293be448fd8ce1e6c1b8062c9f72c6a" -[[package]] -name = "num_cpus" -version = "1.16.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4161fcb6d602d4d2081af7c3a45852d875a03dd337a6bfdd6e06407b61342a43" -dependencies = [ - "hermit-abi 0.3.1", - "libc", -] - [[package]] name = "num_threads" version = "0.1.6" @@ -522,15 +378,6 @@ dependencies = [ "libc", ] -[[package]] -name = "object" -version = "0.30.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "03b4680b86d9cfafba8fc491dc9b6df26b68cf40e9e6cd73909194759a63c385" -dependencies = [ - "memchr", -] - [[package]] name = "once_cell" version = "1.17.1" @@ -570,18 +417,6 @@ dependencies = [ "indexmap", ] -[[package]] -name = "pin-project-lite" -version = "0.2.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "12cc1b0bf1727a77a54b6654e7b5f1af8604923edc8b81885f8ec92f9e3f0a05" - -[[package]] -name = "pin-utils" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" - [[package]] name = "ppv-lite86" version = "0.2.17" @@ -733,31 +568,6 @@ dependencies = [ "unindent", ] -[[package]] -name = "pyo3-asyncio" -version = "0.19.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a2cc34c1f907ca090d7add03dc523acdd91f3a4dab12286604951e2f5152edad" -dependencies = [ - "futures", - "once_cell", - "pin-project-lite", - "pyo3", - "pyo3-asyncio-macros", - "tokio", -] - -[[package]] -name = "pyo3-asyncio-macros" -version = "0.19.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e4045f06429547179e4596f5c0b13c82efc8b04296016780133653ed69ce26b3" -dependencies = [ - "proc-macro2", - "quote", - "syn 1.0.109", -] - [[package]] name = "pyo3-build-config" version = "0.19.2" @@ -918,7 +728,7 @@ checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1" [[package]] name = "rraft-py" -version = "0.2.9" +version = "0.2.13" dependencies = [ "bincode", "bytes", @@ -927,22 +737,14 @@ dependencies = [ "prost", "protobuf", "pyo3", - "pyo3-asyncio", "raft", "slog", "slog-async", "slog-envlogger", "slog-stdlog", "slog-term", - "tokio", ] -[[package]] -name = "rustc-demangle" -version = "0.1.23" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" - [[package]] name = "rustix" version = "0.37.5" @@ -975,15 +777,6 @@ version = "1.0.159" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3c04e8343c3daeec41f58990b9d77068df31209f2af111e059e9fe9646693065" -[[package]] -name = "slab" -version = "0.4.9" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f92a496fb766b417c996b9c5e57daf2f7ad3b0bebe1ccfca4856390e3d3bb67" -dependencies = [ - "autocfg", -] - [[package]] name = "slog" version = "2.7.0" @@ -1175,29 +968,6 @@ dependencies = [ "time-core", ] -[[package]] -name = "tokio" -version = "1.32.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "17ed6077ed6cd6c74735e21f37eb16dc3935f96878b1fe961074089cc80893f9" -dependencies = [ - "backtrace", - "num_cpus", - "pin-project-lite", - "tokio-macros", -] - -[[package]] -name = "tokio-macros" -version = "2.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "630bdcf245f78637c13ec01ffae6187cca34625e8c63150d424b59e55af2675e" -dependencies = [ - "proc-macro2", - "quote", - "syn 2.0.11", -] - [[package]] name = "unicode-ident" version = "1.0.8" diff --git a/rraft.pyi b/rraft.pyi index 680cd32..bd15ea3 100644 --- a/rraft.pyi +++ b/rraft.pyi @@ -275,6 +275,8 @@ class Logger(__API_Logger): def __init__( self, chan_size: int, overflow_strategy: "OverflowStrategy" ) -> None: ... + @staticmethod + def new_file_logger(log_path: str): ... def make_ref(self) -> "LoggerRef": ... class LoggerRef(__API_Logger): @@ -3365,11 +3367,3 @@ class OtherError(RaftStorageError): """ Some other error occurred. """ - -class Mutex: - def incr(self): - """ """ - def decr(self): - """ """ - def acquire_lock_and(self, cb: Any): - """ """ diff --git a/src/bindings/global.rs b/src/bindings/global.rs index c4dd9f0..ce3431b 100644 --- a/src/bindings/global.rs +++ b/src/bindings/global.rs @@ -1,6 +1,7 @@ +use std::sync::{Arc, Mutex}; + use pyo3::prelude::*; -use crate::external_bindings::mutex::PyMutex; use crate::raftpb_bindings::conf_change::new_conf_change_single as _new_conf_change_single; use crate::raftpb_bindings::conf_change_single::PyConfChangeSingle; use crate::raftpb_bindings::conf_change_type::PyConfChangeType; @@ -15,7 +16,7 @@ use raft::{ NO_LIMIT, }; -use crate::external_bindings::slog::PyLogger; +use crate::external_bindings::slog::{LoggerMode, PyLogger}; use crate::raftpb_bindings::message_type::PyMessageType; use crate::utils::reference::RefMutOwner; @@ -29,7 +30,8 @@ pub fn majority(total: usize) -> usize { pub fn default_logger() -> PyLogger { PyLogger { inner: RefMutOwner::new(_default_logger()), - mutex: PyMutex::new(), + mutex: Arc::new(Mutex::new(())), + mode: LoggerMode::Stdout, } } diff --git a/src/external_bindings/mod.rs b/src/external_bindings/mod.rs index c281adf..5358d06 100644 --- a/src/external_bindings/mod.rs +++ b/src/external_bindings/mod.rs @@ -1,2 +1 @@ -pub mod mutex; pub mod slog; diff --git a/src/external_bindings/mutex.rs b/src/external_bindings/mutex.rs deleted file mode 100644 index 54ef15f..0000000 --- a/src/external_bindings/mutex.rs +++ /dev/null @@ -1,61 +0,0 @@ -use pyo3::prelude::*; - -use std::sync::{Arc, Mutex}; - -#[derive(Clone)] -#[pyclass(name = "Mutex")] -pub struct PyMutex { - pub inner: Arc>, -} - -impl PyMutex { - #[tokio::main] - pub async fn acquire_lock_and(&self, cb: impl FnOnce() -> PyResult) -> PyResult { - let mut guard = self.inner.lock().unwrap(); - - // Wait until the guard's int value becomes 0. - while *guard != 0 { - tokio::task::yield_now().await; - guard = self.inner.lock().unwrap(); - } - - *guard += 1; - // The guard will be dropped when after cb executed. - let res = cb(); - *guard -= 1; - res - } -} - -#[pymethods] -impl PyMutex { - #[new] - pub fn new() -> Self { - Self { - inner: Arc::new(Mutex::new(0)), - } - } - - pub fn clone(&self) -> Self { - Self { - inner: Arc::clone(&self.inner), - } - } - - pub fn incr(&self) -> PyResult<()> { - let mut guard = self.inner.lock().unwrap(); - *guard += 1; - Ok(()) - } - - pub fn decr(&self) -> PyResult<()> { - let mut guard = self.inner.lock().unwrap(); - *guard -= 1; - Ok(()) - } - - #[pyo3(name = "acquire_lock_and")] - pub fn py_acquire_lock_and(&self, cb: PyObject, py: Python) -> PyResult { - self.acquire_lock_and(|| cb.call(py, (), None)) - } -} diff --git a/src/external_bindings/slog.rs b/src/external_bindings/slog.rs index 21a4013..588779c 100644 --- a/src/external_bindings/slog.rs +++ b/src/external_bindings/slog.rs @@ -1,10 +1,11 @@ -use pyo3::{intern, prelude::*, types::PyString}; -use slog::*; -use slog_async::OverflowStrategy; +use std::fs::OpenOptions; +use std::sync::{Arc, Mutex}; -use super::mutex::PyMutex; use crate::implement_type_conversion; use crate::utils::reference::{RefMutContainer, RefMutOwner}; +use pyo3::{intern, prelude::*, types::PyString}; +use slog::*; +use slog_async::OverflowStrategy; #[pyclass(name = "OverflowStrategy")] pub struct PyOverflowStrategy(pub OverflowStrategy); @@ -51,20 +52,26 @@ impl PyOverflowStrategy { } } +#[derive(Clone)] +pub enum LoggerMode { + File, + Stdout, +} + #[derive(Clone)] #[pyclass(name = "Logger")] pub struct PyLogger { pub inner: RefMutOwner, - #[pyo3(get)] - pub mutex: PyMutex, + pub mutex: Arc>, + pub mode: LoggerMode, } #[derive(Clone)] #[pyclass(name = "LoggerRef")] pub struct PyLoggerRef { pub inner: RefMutContainer, - #[pyo3(get)] - pub mutex: PyMutex, + pub mutex: Arc>, + pub mode: LoggerMode, } #[derive(FromPyObject)] @@ -75,65 +82,13 @@ pub enum PyLoggerMut<'p> { implement_type_conversion!(Logger, PyLoggerMut); -struct CallbackDrain { - inner: D, - before_hook: Box, - after_hook: Box, -} - -impl CallbackDrain { - fn new( - inner: D, - before_hook: impl Fn() + Send + Sync + 'static, - after_hook: impl Fn() + Send + Sync + 'static, - ) -> Self { - CallbackDrain { - inner, - before_hook: Box::new(before_hook), - after_hook: Box::new(after_hook), - } - } -} - -impl Drain for CallbackDrain { - type Ok = D::Ok; - type Err = D::Err; - - fn log( - &self, - record: &Record, - values: &OwnedKVList, - ) -> std::result::Result { - // Call the callback before logging - (self.before_hook)(); - let res = self.inner.log(record, values); - (self.after_hook)(); - res - } -} - #[pymethods] impl PyLogger { #[new] pub fn new(chan_size: usize, overflow_strategy: &PyOverflowStrategy) -> Self { - let mutex = PyMutex::new(); let decorator = slog_term::TermDecorator::new().build(); let drain = slog_term::FullFormat::new(decorator).build().fuse(); - let mutex_clone_before = mutex.clone(); - let mutex_clone_after = mutex.clone(); - - let drain = CallbackDrain::new( - drain, - move || { - mutex_clone_before.incr().unwrap(); - }, - move || { - mutex_clone_after.decr().unwrap(); - }, - ) - .fuse(); - let drain = slog_async::Async::new(drain) .chan_size(chan_size) .overflow_strategy(overflow_strategy.0) @@ -144,7 +99,31 @@ impl PyLogger { PyLogger { inner: RefMutOwner::new(logger), - mutex: mutex, + mutex: Arc::new(Mutex::new(())), + mode: LoggerMode::Stdout, + } + } + + #[staticmethod] + pub fn new_file_logger(log_path: &PyString) -> Self { + let log_path = log_path.to_str().unwrap(); + let file = OpenOptions::new() + .create(true) + .write(true) + .truncate(true) + .open(log_path) + .unwrap(); + + let decorator = slog_term::PlainDecorator::new(file); + let drain = slog_term::FullFormat::new(decorator).build().fuse(); + let drain = slog_async::Async::new(drain).build().fuse(); + + let logger = slog::Logger::root(drain, o!()); + + PyLogger { + inner: RefMutOwner::new(logger), + mutex: Arc::new(Mutex::new(())), + mode: LoggerMode::File, } } @@ -152,6 +131,7 @@ impl PyLogger { PyLoggerRef { inner: RefMutContainer::new(&mut self.inner), mutex: self.mutex.clone(), + mode: self.mode.clone(), } } @@ -164,37 +144,87 @@ impl PyLogger { #[pymethods] impl PyLoggerRef { pub fn info(&mut self, s: &PyString) -> PyResult<()> { - self.mutex.acquire_lock_and(|| { + let print = || { self.inner - .map_as_ref(|inner| info!(inner, "{}", format!("{}", s))) - }) + .map_as_ref(|inner| info!(inner, "{}", format!("{}", s))) + }; + + match self.mode { + LoggerMode::Stdout => { + let _guard = self.mutex.lock().unwrap(); + print() + } + LoggerMode::File => { + print() + } + } } pub fn debug(&mut self, s: &PyString) -> PyResult<()> { - self.mutex.acquire_lock_and(|| { + let print = || { self.inner - .map_as_ref(|inner| debug!(inner, "{}", format!("{}", s))) - }) + .map_as_ref(|inner| debug!(inner, "{}", format!("{}", s))) + }; + + match self.mode { + LoggerMode::Stdout => { + let _guard = self.mutex.lock().unwrap(); + print() + } + LoggerMode::File => { + print() + } + } } pub fn trace(&mut self, s: &PyString) -> PyResult<()> { - self.mutex.acquire_lock_and(|| { + let print = || { self.inner - .map_as_ref(|inner| trace!(inner, "{}", format!("{}", s))) - }) + .map_as_ref(|inner| trace!(inner, "{}", format!("{}", s))) + }; + + match self.mode { + LoggerMode::Stdout => { + let _guard = self.mutex.lock().unwrap(); + print() + } + LoggerMode::File => { + print() + } + } } pub fn error(&mut self, s: &PyString) -> PyResult<()> { - self.mutex.acquire_lock_and(|| { + let print = || { self.inner - .map_as_ref(|inner| error!(inner, "{}", format!("{}", s))) - }) + .map_as_ref(|inner| error!(inner, "{}", format!("{}", s))) + }; + + match self.mode { + LoggerMode::Stdout => { + let _guard = self.mutex.lock().unwrap(); + print() + } + LoggerMode::File => { + print() + } + } } pub fn crit(&mut self, s: &PyString) -> PyResult<()> { - self.mutex.acquire_lock_and(|| { + let print = || { self.inner - .map_as_ref(|inner| crit!(inner, "{}", format!("{}", s))) - }) + .map_as_ref(|inner: &Logger| crit!(inner, "{}", format!("{}", s))) + }; + + match self.mode { + LoggerMode::Stdout => { + let _guard = self.mutex.lock().unwrap(); + print() + } + LoggerMode::File => { + print() + } + } } } diff --git a/src/lib.rs b/src/lib.rs index 58dc4be..5e89153 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -96,7 +96,6 @@ fn rraft(py: Python, m: &PyModule) -> PyResult<()> { m.add_class::()?; m.add_class::()?; m.add_class::()?; - m.add_class::()?; m.add_class::()?; m.add_class::()?;