Skip to content

Commit

Permalink
Attempt to fix logging race (#45)
Browse files Browse the repository at this point in the history
  • Loading branch information
jopemachine authored Aug 25, 2023
1 parent 20d3471 commit e48dc7b
Show file tree
Hide file tree
Showing 14 changed files with 283 additions and 150 deletions.
3 changes: 2 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 4 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "rraft-py"
version = "0.2.0"
version = "0.2.9"
authors = ["Lablup Inc."]
license = "Apache-2.0"
repository = "https://github.com/lablup/rraft-py"
Expand All @@ -17,6 +17,7 @@ edition = "2021"
prost = "0.11"
protobuf = "2"
pyo3 = { version = "0.19.2", features = ["extension-module", "multiple-pymethods"] }
pyo3-asyncio = { version = "0.19", features = ["attributes", "tokio-runtime"] }
raft = { git = "https://github.com/jopemachine/raft-rs.git", branch = "feat/add-custom-deserializer", features = ["prost-codec", "default-logger"], default-features = false }
slog = { version = "2.2", features = ["max_level_debug", "release_max_level_debug"] }
slog-envlogger = "2.1.0"
Expand All @@ -26,6 +27,8 @@ slog-async = "2.7.0"
fxhash = "0.2.1"
bincode = "1.3.3"
bytes = "1.0"
tokio = { version = "1.32", features = ["sync", "macros"] }
once_cell = "1.7"

[lib]
name = "rraft"
Expand Down
2 changes: 2 additions & 0 deletions example/single_mem_node/use_threading.py
Original file line number Diff line number Diff line change
Expand Up @@ -164,6 +164,8 @@ def handle_committed_entries(committed_entries: List[EntryRef]):
# Use another thread to propose a Raft request.
send_propose(logger)

logger.mutex.acquire_lock_and(lambda: print('start!!!'))

t = now()
timeout = 100
# Use a HashMap to hold the `propose` callbacks.
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
maturin==1.2.2
maturin==1.2.3
pytest==7.3.1
pytest-benchmark==4.0.0
30 changes: 26 additions & 4 deletions rraft.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,24 @@ class __Decoder:
def decode(v: bytes) -> Any:
""" """

def set_snapshot_data_deserializer(cb: Any) -> None:
""" """

def set_message_context_deserializer(cb: Any) -> None:
""" """

def set_confchange_context_deserializer(cb: Any) -> None:
""" """

def set_confchangev2_context_deserializer(cb: Any) -> None:
""" """

def set_entry_data_deserializer(cb: Any) -> None:
""" """

def set_entry_context_deserializer(cb: Any) -> None:
""" """

class OverflowStrategy:
""" """

Expand Down Expand Up @@ -218,6 +236,8 @@ class EntryType:
def from_int(v: int) -> "EntryType": ...

class __API_Logger:
mutex: "Mutex"

def info(self, s: str) -> None:
"""
Log info level record
Expand All @@ -228,25 +248,25 @@ class __API_Logger:
"""
Log debug level record
See `log` for documentation.
See `slog_debug` for documentation.
"""
def trace(self, s: str) -> None:
"""
Log trace level record
See `log` for documentation.
See `slog_trace` for documentation.
"""
def crit(self, s: str) -> None:
"""
Log crit level record
See `log` for documentation.
See `slog_crit` for documentation.
"""
def error(self, s: str) -> None:
"""
Log error level record
See `log` for documentation.
See `slog_error` for documentation.
"""

class Logger(__API_Logger):
Expand All @@ -255,6 +275,8 @@ class Logger(__API_Logger):
def __init__(
self, chan_size: int, overflow_strategy: "OverflowStrategy"
) -> None: ...
@staticmethod
def new_file_logger(log_path: str): ...
def make_ref(self) -> "LoggerRef": ...

class LoggerRef(__API_Logger):
Expand Down
6 changes: 5 additions & 1 deletion src/bindings/global.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
use std::sync::{Arc, Mutex};

use pyo3::prelude::*;

use crate::raftpb_bindings::conf_change::new_conf_change_single as _new_conf_change_single;
Expand All @@ -14,7 +16,7 @@ use raft::{
NO_LIMIT,
};

use crate::external_bindings::slog::PyLogger;
use crate::external_bindings::slog::{LoggerMode, PyLogger};
use crate::raftpb_bindings::message_type::PyMessageType;
use crate::utils::reference::RefMutOwner;

Expand All @@ -28,6 +30,8 @@ pub fn majority(total: usize) -> usize {
pub fn default_logger() -> PyLogger {
PyLogger {
inner: RefMutOwner::new(_default_logger()),
mutex: Arc::new(Mutex::new(())),
mode: LoggerMode::Stdout,
}
}

Expand Down
126 changes: 113 additions & 13 deletions src/external_bindings/slog.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
use pyo3::{intern, prelude::*, types::PyString};
use slog::*;
use slog_async::OverflowStrategy;
use std::fs::OpenOptions;
use std::sync::{Arc, Mutex};

use crate::implement_type_conversion;
use crate::utils::reference::{RefMutContainer, RefMutOwner};
use pyo3::{intern, prelude::*, types::PyString};
use slog::*;
use slog_async::OverflowStrategy;

#[pyclass(name = "OverflowStrategy")]
pub struct PyOverflowStrategy(pub OverflowStrategy);
Expand Down Expand Up @@ -50,16 +52,26 @@ impl PyOverflowStrategy {
}
}

#[derive(Clone)]
pub enum LoggerMode {
File,
Stdout,
}

#[derive(Clone)]
#[pyclass(name = "Logger")]
pub struct PyLogger {
pub inner: RefMutOwner<Logger>,
pub mutex: Arc<Mutex<()>>,
pub mode: LoggerMode,
}

#[derive(Clone)]
#[pyclass(name = "LoggerRef")]
pub struct PyLoggerRef {
pub inner: RefMutContainer<Logger>,
pub mutex: Arc<Mutex<()>>,
pub mode: LoggerMode,
}

#[derive(FromPyObject)]
Expand All @@ -76,6 +88,7 @@ impl PyLogger {
pub fn new(chan_size: usize, overflow_strategy: &PyOverflowStrategy) -> Self {
let decorator = slog_term::TermDecorator::new().build();
let drain = slog_term::FullFormat::new(decorator).build().fuse();

let drain = slog_async::Async::new(drain)
.chan_size(chan_size)
.overflow_strategy(overflow_strategy.0)
Expand All @@ -86,12 +99,39 @@ impl PyLogger {

PyLogger {
inner: RefMutOwner::new(logger),
mutex: Arc::new(Mutex::new(())),
mode: LoggerMode::Stdout,
}
}

#[staticmethod]
pub fn new_file_logger(log_path: &PyString) -> Self {
let log_path = log_path.to_str().unwrap();
let file = OpenOptions::new()
.create(true)
.write(true)
.truncate(true)
.open(log_path)
.unwrap();

let decorator = slog_term::PlainDecorator::new(file);
let drain = slog_term::FullFormat::new(decorator).build().fuse();
let drain = slog_async::Async::new(drain).build().fuse();

let logger = slog::Logger::root(drain, o!());

PyLogger {
inner: RefMutOwner::new(logger),
mutex: Arc::new(Mutex::new(())),
mode: LoggerMode::File,
}
}

pub fn make_ref(&mut self) -> PyLoggerRef {
PyLoggerRef {
inner: RefMutContainer::new(&mut self.inner),
mutex: self.mutex.clone(),
mode: self.mode.clone(),
}
}

Expand All @@ -104,27 +144,87 @@ impl PyLogger {
#[pymethods]
impl PyLoggerRef {
pub fn info(&mut self, s: &PyString) -> PyResult<()> {
self.inner
.map_as_ref(|inner| info!(inner, "{}", format!("{}", s)))
let print = || {
self.inner
.map_as_ref(|inner| info!(inner, "{}", format!("{}", s)))
};

match self.mode {
LoggerMode::Stdout => {
let _guard = self.mutex.lock().unwrap();
print()
}
LoggerMode::File => {
print()
}
}
}

pub fn debug(&mut self, s: &PyString) -> PyResult<()> {
self.inner
.map_as_ref(|inner| debug!(inner, "{}", format!("{}", s)))
let print = || {
self.inner
.map_as_ref(|inner| debug!(inner, "{}", format!("{}", s)))
};

match self.mode {
LoggerMode::Stdout => {
let _guard = self.mutex.lock().unwrap();
print()
}
LoggerMode::File => {
print()
}
}
}

pub fn trace(&mut self, s: &PyString) -> PyResult<()> {
self.inner
.map_as_ref(|inner| trace!(inner, "{}", format!("{}", s)))
let print = || {
self.inner
.map_as_ref(|inner| trace!(inner, "{}", format!("{}", s)))
};

match self.mode {
LoggerMode::Stdout => {
let _guard = self.mutex.lock().unwrap();
print()
}
LoggerMode::File => {
print()
}
}
}

pub fn error(&mut self, s: &PyString) -> PyResult<()> {
self.inner
.map_as_ref(|inner| error!(inner, "{}", format!("{}", s)))
let print = || {
self.inner
.map_as_ref(|inner| error!(inner, "{}", format!("{}", s)))
};

match self.mode {
LoggerMode::Stdout => {
let _guard = self.mutex.lock().unwrap();
print()
}
LoggerMode::File => {
print()
}
}
}

pub fn crit(&mut self, s: &PyString) -> PyResult<()> {
self.inner
.map_as_ref(|inner| crit!(inner, "{}", format!("{}", s)))
let print = || {
self.inner
.map_as_ref(|inner: &Logger| crit!(inner, "{}", format!("{}", s)))
};

match self.mode {
LoggerMode::Stdout => {
let _guard = self.mutex.lock().unwrap();
print()
}
LoggerMode::File => {
print()
}
}
}
}
30 changes: 30 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,36 @@ fn rraft(py: Python, m: &PyModule) -> PyResult<()> {
m
)?)?;

m.add_function(wrap_pyfunction!(
utils::deserializer::set_confchange_context_deserializer,
m
)?)?;

m.add_function(wrap_pyfunction!(
utils::deserializer::set_confchangev2_context_deserializer,
m
)?)?;

m.add_function(wrap_pyfunction!(
utils::deserializer::set_entry_context_deserializer,
m
)?)?;

m.add_function(wrap_pyfunction!(
utils::deserializer::set_entry_data_deserializer,
m
)?)?;

m.add_function(wrap_pyfunction!(
utils::deserializer::set_message_context_deserializer,
m
)?)?;

m.add_function(wrap_pyfunction!(
utils::deserializer::set_snapshot_data_deserializer,
m
)?)?;

m.add(
"DestroyedRefUsedError",
py.get_type::<utils::errors::DestroyedRefUsedError>(),
Expand Down
Loading

0 comments on commit e48dc7b

Please sign in to comment.