Skip to content

Commit

Permalink
use GILProtected to synchronize access to various pyclass types (#…
Browse files Browse the repository at this point in the history
…21670)

As part of [upgrading to PyO3
v0.23.x](#21671), access
to`pyclass`-annotated types will no longer be implicitly synchronized
against the Python GIL. Those `pyclass`-annotated types [must now be
`Sync`](https://pyo3.rs/v0.23.1/class/thread-safety) and provide that
synchronization explicitly.

Several places in Pants use `RefCell` which is not Send/Sync by itself.
This PR uses `pyo3::sync::GILProtected` to provide explicit
synchronization against the GIL for those use cases. (Eventually, we may
wish to not use `GILProtected` to enable use of the Python 3.13 "no GIL"
free threaded build, but that day is not today.)

[Migration
guide](https://pyo3.rs/v0.23.1/migration.html#free-threaded-python-support)
  • Loading branch information
tdyas authored Nov 19, 2024
1 parent 691d396 commit aeb1b74
Show file tree
Hide file tree
Showing 2 changed files with 130 additions and 96 deletions.
164 changes: 94 additions & 70 deletions src/rust/engine/src/externs/interface.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use pyo3::prelude::{
pyclass, pyfunction, pymethods, pymodule, wrap_pyfunction, PyModule, PyObject,
PyResult as PyO3Result, Python, ToPyObject,
};
use pyo3::sync::GILProtected;
use pyo3::types::{
PyAnyMethods, PyBytes, PyDict, PyDictMethods, PyList, PyListMethods, PyModuleMethods, PyTuple,
PyType,
Expand Down Expand Up @@ -173,18 +174,18 @@ pub fn initialize() -> PyO3Result<()> {
}

#[pyclass]
struct PyTasks(RefCell<Tasks>);
struct PyTasks(GILProtected<RefCell<Tasks>>);

#[pymethods]
impl PyTasks {
#[new]
fn __new__() -> Self {
Self(RefCell::new(Tasks::new()))
Self(GILProtected::new(RefCell::new(Tasks::new())))
}
}

#[pyclass]
struct PyTypes(RefCell<Option<Types>>);
struct PyTypes(GILProtected<RefCell<Option<Types>>>);

#[pymethods]
impl PyTypes {
Expand Down Expand Up @@ -221,7 +222,7 @@ impl PyTypes {
parsed_javascript_deps_candidate_result: &Bound<'_, PyType>,
py: Python,
) -> Self {
Self(RefCell::new(Some(Types {
Self(GILProtected::new(RefCell::new(Some(Types {
directory_digest: TypeId::new(&py.get_type_bound::<externs::fs::PyDigest>()),
file_digest: TypeId::new(&py.get_type_bound::<externs::fs::PyFileDigest>()),
snapshot: TypeId::new(&py.get_type_bound::<externs::fs::PySnapshot>()),
Expand Down Expand Up @@ -265,7 +266,7 @@ impl PyTypes {
deps_request: TypeId::new(
&py.get_type_bound::<externs::dep_inference::PyNativeDependenciesRequest>(),
),
})))
}))))
}
}

Expand Down Expand Up @@ -509,14 +510,14 @@ impl PySessionCancellationLatch {

#[pyclass]
struct PyNailgunServer {
server: RefCell<Option<nailgun::Server>>,
server: GILProtected<RefCell<Option<nailgun::Server>>>,
executor: Executor,
}

#[pymethods]
impl PyNailgunServer {
fn port(&self) -> PyO3Result<u16> {
let borrowed_server = self.server.borrow();
fn port(&self, py: Python<'_>) -> PyO3Result<u16> {
let borrowed_server = self.server.get(py).borrow();
let server = borrowed_server.as_ref().ok_or_else(|| {
PyException::new_err("Cannot get the port of a server that has already shut down.")
})?;
Expand All @@ -525,7 +526,7 @@ impl PyNailgunServer {
}

#[pyclass]
struct PyExecutionRequest(RefCell<ExecutionRequest>);
struct PyExecutionRequest(GILProtected<RefCell<ExecutionRequest>>);

#[pymethods]
impl PyExecutionRequest {
Expand All @@ -538,7 +539,7 @@ impl PyExecutionRequest {
timeout: timeout_in_ms.map(Duration::from_millis),
..ExecutionRequest::default()
};
Self(RefCell::new(request))
Self(GILProtected::new(RefCell::new(request)))
}
}

Expand Down Expand Up @@ -658,17 +659,23 @@ fn nailgun_server_create(
.block_on(server_future)
.map_err(PyException::new_err)?;
Ok(PyNailgunServer {
server: RefCell::new(Some(server)),
server: GILProtected::new(RefCell::new(Some(server))),
executor: py_executor.0.clone(),
})
}

#[pyfunction]
fn nailgun_server_await_shutdown(
py: Python,
py: Python<'_>,
nailgun_server_ptr: &Bound<'_, PyNailgunServer>,
) -> PyO3Result<()> {
if let Some(server) = nailgun_server_ptr.borrow().server.borrow_mut().take() {
if let Some(server) = nailgun_server_ptr
.borrow()
.server
.get(py)
.borrow_mut()
.take()
{
let executor = nailgun_server_ptr.borrow().executor.clone();
py.allow_threads(|| executor.block_on(server.shutdown()))
.map_err(PyException::new_err)
Expand Down Expand Up @@ -743,19 +750,20 @@ fn hash_prefix_zero_bits(item: &str) -> u32 {
exec_strategy_opts,
ca_certs_path
))]
fn scheduler_create(
py_executor: &Bound<'_, externs::scheduler::PyExecutor>,
py_tasks: &Bound<'_, PyTasks>,
types_ptr: &Bound<'_, PyTypes>,
fn scheduler_create<'py>(
py: Python<'py>,
py_executor: &Bound<'py, externs::scheduler::PyExecutor>,
py_tasks: &Bound<'py, PyTasks>,
types_ptr: &Bound<'py, PyTypes>,
build_root: PathBuf,
local_execution_root_dir: PathBuf,
named_caches_dir: PathBuf,
ignore_patterns: Vec<String>,
use_gitignore: bool,
watch_filesystem: bool,
remoting_options: &Bound<'_, PyRemotingOptions>,
local_store_options: &Bound<'_, PyLocalStoreOptions>,
exec_strategy_opts: &Bound<'_, PyExecutionStrategyOptions>,
remoting_options: &Bound<'py, PyRemotingOptions>,
local_store_options: &Bound<'py, PyLocalStoreOptions>,
exec_strategy_opts: &Bound<'py, PyExecutionStrategyOptions>,
ca_certs_path: Option<PathBuf>,
) -> PyO3Result<PyScheduler> {
match fs::increase_limits() {
Expand All @@ -765,10 +773,11 @@ fn scheduler_create(
let types = types_ptr
.borrow()
.0
.get(py)
.borrow_mut()
.take()
.ok_or_else(|| PyException::new_err("An instance of PyTypes may only be used once."))?;
let tasks = py_tasks.borrow().0.replace(Tasks::new());
let tasks = py_tasks.borrow().0.get(py).replace(Tasks::new());

// NOTE: Enter the Tokio runtime so that libraries like Tonic (for gRPC) are able to
// use `tokio::spawn` since Python does not setup Tokio for the main thread. This also
Expand Down Expand Up @@ -1147,16 +1156,17 @@ fn scheduler_shutdown(py: Python, py_scheduler: &Bound<'_, PyScheduler>, timeout
}

#[pyfunction]
fn scheduler_execute(
py: Python<'_>,
py_scheduler: &Bound<'_, PyScheduler>,
py_session: &Bound<'_, PySession>,
py_execution_request: &Bound<'_, PyExecutionRequest>,
fn scheduler_execute<'py>(
py: Python<'py>,
py_scheduler: &Bound<'py, PyScheduler>,
py_session: &Bound<'py, PySession>,
py_execution_request: &Bound<'py, PyExecutionRequest>,
) -> PyO3Result<Vec<PyResult>> {
let scheduler = &py_scheduler.borrow().0;
let session = &py_session.borrow().0;
let execution_request_cell = &mut py_execution_request.borrow_mut().0;
let execution_request: &mut ExecutionRequest = execution_request_cell.get_mut();
let execution_request_cell_ref = py_execution_request.borrow();
let execution_request_cell = execution_request_cell_ref.0.get(py);
let execution_request: &mut ExecutionRequest = &mut execution_request_cell.borrow_mut();

scheduler.core.executor.enter(|| {
// TODO: A parent_id should be an explicit argument.
Expand All @@ -1179,15 +1189,17 @@ fn scheduler_execute(
}

#[pyfunction]
fn execution_add_root_select(
py_scheduler: &Bound<'_, PyScheduler>,
py_execution_request: &Bound<'_, PyExecutionRequest>,
fn execution_add_root_select<'py>(
py: Python<'py>,
py_scheduler: &Bound<'py, PyScheduler>,
py_execution_request: &Bound<'py, PyExecutionRequest>,
param_vals: Vec<PyObject>,
product: &Bound<'_, PyType>,
product: &Bound<'py, PyType>,
) -> PyO3Result<()> {
let scheduler = &py_scheduler.borrow().0;
let execution_request_cell = &mut py_execution_request.borrow_mut().0;
let execution_request: &mut ExecutionRequest = execution_request_cell.get_mut();
let execution_request_cell_ref = py_execution_request.borrow();
let execution_request_cell = execution_request_cell_ref.0.get(py);
let execution_request: &mut ExecutionRequest = &mut execution_request_cell.borrow_mut();

scheduler.core.executor.enter(|| {
let product = TypeId::new(product);
Expand All @@ -1202,12 +1214,13 @@ fn execution_add_root_select(
}

#[pyfunction]
fn tasks_task_begin(
py_tasks: &Bound<'_, PyTasks>,
fn tasks_task_begin<'py>(
py: Python<'py>,
py_tasks: &Bound<'py, PyTasks>,
func: PyObject,
output_type: &Bound<'_, PyType>,
arg_types: Vec<(String, Bound<'_, PyType>)>,
masked_types: Vec<Bound<'_, PyType>>,
output_type: &Bound<'py, PyType>,
arg_types: Vec<(String, Bound<'py, PyType>)>,
masked_types: Vec<Bound<'py, PyType>>,
side_effecting: bool,
engine_aware_return_type: bool,
cacheable: bool,
Expand All @@ -1225,7 +1238,7 @@ fn tasks_task_begin(
.map(|(name, typ)| (name, TypeId::new(&typ.as_borrowed())))
.collect();
let masked_types = masked_types.into_iter().map(|t| TypeId::new(&t)).collect();
py_tasks.borrow_mut().0.borrow_mut().task_begin(
py_tasks.borrow_mut().0.get(py).borrow_mut().task_begin(
func,
output_type,
side_effecting,
Expand All @@ -1241,69 +1254,80 @@ fn tasks_task_begin(
}

#[pyfunction]
fn tasks_task_end(py_tasks: &Bound<'_, PyTasks>) {
py_tasks.borrow_mut().0.borrow_mut().task_end();
fn tasks_task_end(py: Python<'_>, py_tasks: &Bound<'_, PyTasks>) {
py_tasks.borrow_mut().0.get(py).borrow_mut().task_end();
}

#[pyfunction]
fn tasks_add_call(
py_tasks: &Bound<'_, PyTasks>,
output: &Bound<'_, PyType>,
inputs: Vec<Bound<'_, PyType>>,
fn tasks_add_call<'py>(
py: Python<'py>,
py_tasks: &Bound<'py, PyTasks>,
output: &Bound<'py, PyType>,
inputs: Vec<Bound<'py, PyType>>,
rule_id: String,
explicit_args_arity: u16,
) {
let output = TypeId::new(output);
let inputs = inputs.into_iter().map(|t| TypeId::new(&t)).collect();
py_tasks
.borrow_mut()
.0
.borrow_mut()
.add_call(output, inputs, rule_id, explicit_args_arity);
py_tasks.borrow_mut().0.get(py).borrow_mut().add_call(
output,
inputs,
rule_id,
explicit_args_arity,
);
}

#[pyfunction]
fn tasks_add_get(
py_tasks: &Bound<'_, PyTasks>,
output: &Bound<'_, PyType>,
inputs: Vec<Bound<'_, PyType>>,
fn tasks_add_get<'py>(
py: Python<'py>,
py_tasks: &Bound<'py, PyTasks>,
output: &Bound<'py, PyType>,
inputs: Vec<Bound<'py, PyType>>,
) {
let output = TypeId::new(output);
let inputs = inputs.into_iter().map(|t| TypeId::new(&t)).collect();
py_tasks.borrow_mut().0.borrow_mut().add_get(output, inputs);
py_tasks
.borrow_mut()
.0
.get(py)
.borrow_mut()
.add_get(output, inputs);
}

#[pyfunction]
fn tasks_add_get_union(
py_tasks: &Bound<'_, PyTasks>,
output_type: &Bound<'_, PyType>,
input_types: Vec<Bound<'_, PyType>>,
in_scope_types: Vec<Bound<'_, PyType>>,
fn tasks_add_get_union<'py>(
py: Python<'py>,
py_tasks: &Bound<'py, PyTasks>,
output_type: &Bound<'py, PyType>,
input_types: Vec<Bound<'py, PyType>>,
in_scope_types: Vec<Bound<'py, PyType>>,
) {
let product = TypeId::new(output_type);
let input_types = input_types.into_iter().map(|t| TypeId::new(&t)).collect();
let in_scope_types = in_scope_types
.into_iter()
.map(|t| TypeId::new(&t))
.collect();
py_tasks
.borrow_mut()
.0
.borrow_mut()
.add_get_union(product, input_types, in_scope_types);
py_tasks.borrow_mut().0.get(py).borrow_mut().add_get_union(
product,
input_types,
in_scope_types,
);
}

#[pyfunction]
fn tasks_add_query(
py_tasks: &Bound<'_, PyTasks>,
output_type: &Bound<'_, PyType>,
input_types: Vec<Bound<'_, PyType>>,
fn tasks_add_query<'py>(
py: Python<'py>,
py_tasks: &Bound<'py, PyTasks>,
output_type: &Bound<'py, PyType>,
input_types: Vec<Bound<'py, PyType>>,
) {
let product = TypeId::new(output_type);
let params = input_types.into_iter().map(|t| TypeId::new(&t)).collect();
py_tasks
.borrow_mut()
.0
.get(py)
.borrow_mut()
.query_add(product, params);
}
Expand Down
Loading

0 comments on commit aeb1b74

Please sign in to comment.