Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: check if a thread is still alive after the block is dead #1875

Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 69 additions & 1 deletion crates/native_blockifier/src/py_block_executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ use crate::storage::{PapyrusStorage, Storage, StorageConfig};
pub(crate) type RawTransactionExecutionResult = Vec<u8>;
pub(crate) type PyVisitedSegmentsMapping = Vec<(PyFelt, Vec<usize>)>;

use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};
#[cfg(test)]
#[path = "py_block_executor_test.rs"]
mod py_block_executor_test;
Expand Down Expand Up @@ -113,7 +115,11 @@ impl ThinTransactionExecutionInfo {
ResourcesMapping(resources)
}
}

#[derive(Clone)]
pub struct MyStruct {
pub a: i32,
b: i32,
}
#[pyclass]
pub struct PyBlockExecutor {
pub bouncer_config: BouncerConfig,
Expand All @@ -124,8 +130,25 @@ pub struct PyBlockExecutor {
/// `Send` trait is required for `pyclass` compatibility as Python objects must be threadsafe.
pub storage: Box<dyn Storage + Send>,
pub global_contract_cache: GlobalContractCache,
thread_handle: Arc<Mutex<Option<JoinHandle<()>>>>,
shared_struct: Arc<Mutex<Option<MyStruct>>>,
}

fn thread_logic(shared_struct: Arc<Mutex<Option<MyStruct>>>) {
let mut temp_shared_struct = shared_struct.lock().unwrap();
*temp_shared_struct = Some(MyStruct { a: 1, b: 2 });
std::mem::drop(temp_shared_struct);
loop {
let temp_shared_struct = shared_struct.try_lock().expect("msg");
let my_struct = temp_shared_struct.as_ref().unwrap();
println!("MyStruct is accessible: a = {}, b = {}", my_struct.a, my_struct.b);
if my_struct.a >= 8 {
panic!("Mystuct.a: {} is greater then 8.", my_struct.a);
}
std::mem::drop(temp_shared_struct);
thread::sleep(std::time::Duration::from_secs(1));
}
}
#[pymethods]
impl PyBlockExecutor {
#[new]
Expand All @@ -145,6 +168,12 @@ impl PyBlockExecutor {
VersionedConstants::get_versioned_constants(py_versioned_constants_overrides.into());
log::debug!("Initialized Block Executor.");

let shared_struct = Arc::new(Mutex::new(None));
let shared_struct_clone = Arc::clone(&shared_struct);
let thread_handle = Arc::new(Mutex::new(Some(thread::spawn(move || {
thread_logic(shared_struct_clone);
}))));

Self {
bouncer_config: bouncer_config.try_into().expect("Failed to parse bouncer config."),
tx_executor_config: TransactionExecutorConfig {
Expand All @@ -155,6 +184,8 @@ impl PyBlockExecutor {
tx_executor: None,
storage: Box::new(storage),
global_contract_cache: GlobalContractCache::new(global_contract_cache_size),
thread_handle,
shared_struct,
}
}

Expand Down Expand Up @@ -245,6 +276,28 @@ impl PyBlockExecutor {
})
.collect();

let is_thread_alive = {
let handle = self.thread_handle.lock().unwrap();
if let Some(handle) = handle.as_ref() {
handle.thread().unpark();
true
} else {
false
}
};

if is_thread_alive {
println!("Thread is still alive");
} else {
println!("Thread is not alive");
}

let mut shared_struct = self.shared_struct.lock().unwrap();
if let Some(ref mut my_struct) = *shared_struct {
my_struct.a += 1;
}
std::mem::drop(shared_struct);

// Convert to Py types and allocate it on Python's heap, to be visible for Python's
// garbage collector.
Python::with_gil(|py| {
Expand Down Expand Up @@ -376,6 +429,11 @@ impl PyBlockExecutor {
// contracts.
let mut versioned_constants = VersionedConstants::latest_constants().clone();
versioned_constants.disable_cairo0_redeclaration = false;
let shared_struct = Arc::new(Mutex::new(None));
let shared_struct_clone = Arc::clone(&shared_struct);
let thread_handle = Arc::new(Mutex::new(Some(thread::spawn(move || {
thread_logic(shared_struct_clone);
}))));
Self {
bouncer_config: BouncerConfig {
block_max_capacity: BouncerWeights {
Expand All @@ -386,11 +444,14 @@ impl PyBlockExecutor {
tx_executor_config: TransactionExecutorConfig {
concurrency_config: concurrency_config.into(),
},

storage: Box::new(PapyrusStorage::new_for_testing(path, &os_config.chain_id)),
chain_info: os_config.into_chain_info(),
versioned_constants,
tx_executor: None,
global_contract_cache: GlobalContractCache::new(GLOBAL_CONTRACT_CACHE_SIZE_FOR_TEST),
thread_handle,
shared_struct,
}
}
}
Expand All @@ -413,6 +474,11 @@ impl PyBlockExecutor {
#[cfg(any(feature = "testing", test))]
pub fn create_for_testing_with_storage(storage: impl Storage + Send + 'static) -> Self {
use blockifier::state::global_cache::GLOBAL_CONTRACT_CACHE_SIZE_FOR_TEST;
let shared_struct = Arc::new(Mutex::new(None));
let shared_struct_clone = Arc::clone(&shared_struct);
let thread_handle = Arc::new(Mutex::new(Some(thread::spawn(move || {
thread_logic(shared_struct_clone);
}))));
Self {
bouncer_config: BouncerConfig::max(),
tx_executor_config: TransactionExecutorConfig::create_for_testing(true),
Expand All @@ -421,6 +487,8 @@ impl PyBlockExecutor {
versioned_constants: VersionedConstants::latest_constants().clone(),
tx_executor: None,
global_contract_cache: GlobalContractCache::new(GLOBAL_CONTRACT_CACHE_SIZE_FOR_TEST),
thread_handle,
shared_struct,
}
}

Expand Down
Loading