Skip to content

Commit

Permalink
Convert unneeded exceptions to warnings and autoregister atexit (#14)
Browse files Browse the repository at this point in the history
* Convert unneeded exceptions into warnings

* Register atexit to stop gilknocker

* Fixup

* Bump version
  • Loading branch information
milesgranger authored Apr 5, 2023
1 parent 03d5f43 commit 258e078
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 41 deletions.
26 changes: 13 additions & 13 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "gil-knocker"
version = "0.4.0"
version = "0.4.1"
edition = "2021"
authors = ["Miles Granger <miles59923@gmail.com>"]
license = "MIT"
Expand All @@ -19,5 +19,5 @@ codegen-units = 1
opt-level = 3

[dependencies]
pyo3 = { version = "0.17.3", features = ["extension-module"] }
pyo3 = { version = "0.18.2", features = ["extension-module"] }
parking_lot = "^0.12"
76 changes: 50 additions & 26 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,8 @@
use parking_lot::{const_rwlock, RwLock};
use pyo3::ffi::{PyEval_InitThreads, PyEval_ThreadsInitialized};
use pyo3::prelude::*;
use pyo3::{
exceptions::{PyBrokenPipeError, PyRuntimeError, PyTimeoutError},
PyResult,
};
use pyo3::{AsPyPointer, PyResult};
use std::ops::DerefMut;
use std::{
mem::take,
sync::{
Expand Down Expand Up @@ -120,46 +118,69 @@ impl KnockKnock {
}

/// Reset the contention metric/monitoring state
pub fn reset_contention_metric(&mut self) -> PyResult<()> {
pub fn reset_contention_metric(&mut self, py: Python) -> PyResult<()> {
if let Some(tx) = &self.tx {
// notify thread to reset metric and timers
tx.send(Message::Reset)
.map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
if let Err(e) = tx.send(Message::Reset) {
let warning = py.get_type::<pyo3::exceptions::PyUserWarning>();
PyErr::warn(py, warning, &e.to_string(), 0)?;
}

// wait for ack
self.rx
if let Err(e) = self
.rx
.as_ref()
.unwrap() // if tx is set, then rx is as well.
.recv_timeout(self.timeout)
.map_err(|e| PyRuntimeError::new_err(e.to_string()))?;
{
let warning = py.get_type::<pyo3::exceptions::PyUserWarning>();
PyErr::warn(py, warning, &e.to_string(), 0)?;
}
}
*(*self.contention_metric).write() = 0f32;
Ok(())
}

/// Start polling the GIL to check if it's locked.
pub fn start(&mut self, py: Python) -> () {
fn start(mut slf: PyRefMut<'_, Self>) -> PyResult<()> {
unsafe {
if PyEval_ThreadsInitialized() == 0 {
PyEval_InitThreads();
}
}

// Register atexit function to stop gilknocker thread
// which reduces the chance of odd 'no Python frame' core dumps
// when trying to acquire the GIL when the process has exited.
{
let ptr = slf.as_ptr();
let py = slf.py();
let __knocker = unsafe { PyObject::from_borrowed_ptr(py, ptr) };
let atexit = py.import("atexit")?;
let locals = pyo3::types::PyDict::new(py);
locals.set_item("__knocker", __knocker)?;
locals.set_item("atexit", atexit)?;
py.run("atexit.register(__knocker.stop)", None, Some(locals))?;
}

let self_: &mut KnockKnock = slf.deref_mut();

// send messages to thread
let (tx, recv) = channel();
self.tx = Some(tx);
self_.tx = Some(tx);

// recieve messages from thread
let (send, rx) = channel();
self.rx = Some(rx);
self_.rx = Some(rx);

let contention_metric = Arc::new(const_rwlock(0_f32));
self.contention_metric = contention_metric.clone();
self_.contention_metric = contention_metric.clone();

let polling_interval = self.polling_interval;
let sampling_interval = self.sampling_interval;
let sleeping_interval = self.sleeping_interval;
let polling_interval = self_.polling_interval;
let sampling_interval = self_.sampling_interval;
let sleeping_interval = self_.sleeping_interval;

let handle = py.allow_threads(move || {
let handle = {
thread::spawn(move || {
let mut total_time_waiting = Duration::from_millis(0);
let mut total_time_sampling = Duration::from_millis(0);
Expand Down Expand Up @@ -213,8 +234,9 @@ impl KnockKnock {
}
}
})
});
self.handle = Some(handle);
};
self_.handle = Some(handle);
Ok(())
}

/// Is the GIL knocker thread running?
Expand All @@ -224,23 +246,25 @@ impl KnockKnock {
}

/// Stop polling the GIL.
pub fn stop(&mut self) -> PyResult<()> {
pub fn stop(&mut self, py: Python) -> PyResult<()> {
if let Some(handle) = take(&mut self.handle) {
if let Some(send) = take(&mut self.tx) {
send.send(Message::Stop)
.map_err(|e| PyBrokenPipeError::new_err(e.to_string()))?;
if let Err(e) = send.send(Message::Stop) {
let warning = py.get_type::<pyo3::exceptions::PyUserWarning>();
PyErr::warn(py, warning, &e.to_string(), 0)?;
}

let start = Instant::now();
while !handle.is_finished() {
if start.elapsed() > self.timeout {
return Err(PyTimeoutError::new_err("Failed to stop knocker thread."));
let warning = py.get_type::<pyo3::exceptions::PyUserWarning>();
PyErr::warn(py, warning, "Timed out waiting for sampling thread.", 0)?;
return Ok(());
}
thread::sleep(Duration::from_millis(100));
}
}
handle
.join()
.map_err(|_| PyRuntimeError::new_err("Failed to join knocker thread."))?;
handle.join().ok(); // Just ignore any potential panic from sampling thread.
}
Ok(())
}
Expand Down

0 comments on commit 258e078

Please sign in to comment.