diff --git a/src/rust/engine/src/externs/interface.rs b/src/rust/engine/src/externs/interface.rs index 91dab897153..ed86733a566 100644 --- a/src/rust/engine/src/externs/interface.rs +++ b/src/rust/engine/src/externs/interface.rs @@ -35,6 +35,7 @@ use pyo3::prelude::{ pyclass, pyfunction, pymethods, pymodule, wrap_pyfunction, PyModule, PyObject, PyResult as PyO3Result, Python, ToPyObject, }; +use pyo3::sync::GILProtected; use pyo3::types::{ PyAnyMethods, PyBytes, PyDict, PyDictMethods, PyList, PyListMethods, PyModuleMethods, PyTuple, PyType, @@ -173,18 +174,18 @@ pub fn initialize() -> PyO3Result<()> { } #[pyclass] -struct PyTasks(RefCell); +struct PyTasks(GILProtected>); #[pymethods] impl PyTasks { #[new] fn __new__() -> Self { - Self(RefCell::new(Tasks::new())) + Self(GILProtected::new(RefCell::new(Tasks::new()))) } } #[pyclass] -struct PyTypes(RefCell>); +struct PyTypes(GILProtected>>); #[pymethods] impl PyTypes { @@ -221,7 +222,7 @@ impl PyTypes { parsed_javascript_deps_candidate_result: &Bound<'_, PyType>, py: Python, ) -> Self { - Self(RefCell::new(Some(Types { + Self(GILProtected::new(RefCell::new(Some(Types { directory_digest: TypeId::new(&py.get_type_bound::()), file_digest: TypeId::new(&py.get_type_bound::()), snapshot: TypeId::new(&py.get_type_bound::()), @@ -265,7 +266,7 @@ impl PyTypes { deps_request: TypeId::new( &py.get_type_bound::(), ), - }))) + })))) } } @@ -509,14 +510,14 @@ impl PySessionCancellationLatch { #[pyclass] struct PyNailgunServer { - server: RefCell>, + server: GILProtected>>, executor: Executor, } #[pymethods] impl PyNailgunServer { - fn port(&self) -> PyO3Result { - let borrowed_server = self.server.borrow(); + fn port(&self, py: Python<'_>) -> PyO3Result { + let borrowed_server = self.server.get(py).borrow(); let server = borrowed_server.as_ref().ok_or_else(|| { PyException::new_err("Cannot get the port of a server that has already shut down.") })?; @@ -525,7 +526,7 @@ impl PyNailgunServer { } #[pyclass] -struct PyExecutionRequest(RefCell); +struct PyExecutionRequest(GILProtected>); #[pymethods] impl PyExecutionRequest { @@ -538,7 +539,7 @@ impl PyExecutionRequest { timeout: timeout_in_ms.map(Duration::from_millis), ..ExecutionRequest::default() }; - Self(RefCell::new(request)) + Self(GILProtected::new(RefCell::new(request))) } } @@ -658,17 +659,23 @@ fn nailgun_server_create( .block_on(server_future) .map_err(PyException::new_err)?; Ok(PyNailgunServer { - server: RefCell::new(Some(server)), + server: GILProtected::new(RefCell::new(Some(server))), executor: py_executor.0.clone(), }) } #[pyfunction] fn nailgun_server_await_shutdown( - py: Python, + py: Python<'_>, nailgun_server_ptr: &Bound<'_, PyNailgunServer>, ) -> PyO3Result<()> { - if let Some(server) = nailgun_server_ptr.borrow().server.borrow_mut().take() { + if let Some(server) = nailgun_server_ptr + .borrow() + .server + .get(py) + .borrow_mut() + .take() + { let executor = nailgun_server_ptr.borrow().executor.clone(); py.allow_threads(|| executor.block_on(server.shutdown())) .map_err(PyException::new_err) @@ -743,19 +750,20 @@ fn hash_prefix_zero_bits(item: &str) -> u32 { exec_strategy_opts, ca_certs_path ))] -fn scheduler_create( - py_executor: &Bound<'_, externs::scheduler::PyExecutor>, - py_tasks: &Bound<'_, PyTasks>, - types_ptr: &Bound<'_, PyTypes>, +fn scheduler_create<'py>( + py: Python<'py>, + py_executor: &Bound<'py, externs::scheduler::PyExecutor>, + py_tasks: &Bound<'py, PyTasks>, + types_ptr: &Bound<'py, PyTypes>, build_root: PathBuf, local_execution_root_dir: PathBuf, named_caches_dir: PathBuf, ignore_patterns: Vec, use_gitignore: bool, watch_filesystem: bool, - remoting_options: &Bound<'_, PyRemotingOptions>, - local_store_options: &Bound<'_, PyLocalStoreOptions>, - exec_strategy_opts: &Bound<'_, PyExecutionStrategyOptions>, + remoting_options: &Bound<'py, PyRemotingOptions>, + local_store_options: &Bound<'py, PyLocalStoreOptions>, + exec_strategy_opts: &Bound<'py, PyExecutionStrategyOptions>, ca_certs_path: Option, ) -> PyO3Result { match fs::increase_limits() { @@ -765,10 +773,11 @@ fn scheduler_create( let types = types_ptr .borrow() .0 + .get(py) .borrow_mut() .take() .ok_or_else(|| PyException::new_err("An instance of PyTypes may only be used once."))?; - let tasks = py_tasks.borrow().0.replace(Tasks::new()); + let tasks = py_tasks.borrow().0.get(py).replace(Tasks::new()); // NOTE: Enter the Tokio runtime so that libraries like Tonic (for gRPC) are able to // use `tokio::spawn` since Python does not setup Tokio for the main thread. This also @@ -1147,16 +1156,17 @@ fn scheduler_shutdown(py: Python, py_scheduler: &Bound<'_, PyScheduler>, timeout } #[pyfunction] -fn scheduler_execute( - py: Python<'_>, - py_scheduler: &Bound<'_, PyScheduler>, - py_session: &Bound<'_, PySession>, - py_execution_request: &Bound<'_, PyExecutionRequest>, +fn scheduler_execute<'py>( + py: Python<'py>, + py_scheduler: &Bound<'py, PyScheduler>, + py_session: &Bound<'py, PySession>, + py_execution_request: &Bound<'py, PyExecutionRequest>, ) -> PyO3Result> { let scheduler = &py_scheduler.borrow().0; let session = &py_session.borrow().0; - let execution_request_cell = &mut py_execution_request.borrow_mut().0; - let execution_request: &mut ExecutionRequest = execution_request_cell.get_mut(); + let execution_request_cell_ref = py_execution_request.borrow(); + let execution_request_cell = execution_request_cell_ref.0.get(py); + let execution_request: &mut ExecutionRequest = &mut execution_request_cell.borrow_mut(); scheduler.core.executor.enter(|| { // TODO: A parent_id should be an explicit argument. @@ -1179,15 +1189,17 @@ fn scheduler_execute( } #[pyfunction] -fn execution_add_root_select( - py_scheduler: &Bound<'_, PyScheduler>, - py_execution_request: &Bound<'_, PyExecutionRequest>, +fn execution_add_root_select<'py>( + py: Python<'py>, + py_scheduler: &Bound<'py, PyScheduler>, + py_execution_request: &Bound<'py, PyExecutionRequest>, param_vals: Vec, - product: &Bound<'_, PyType>, + product: &Bound<'py, PyType>, ) -> PyO3Result<()> { let scheduler = &py_scheduler.borrow().0; - let execution_request_cell = &mut py_execution_request.borrow_mut().0; - let execution_request: &mut ExecutionRequest = execution_request_cell.get_mut(); + let execution_request_cell_ref = py_execution_request.borrow(); + let execution_request_cell = execution_request_cell_ref.0.get(py); + let execution_request: &mut ExecutionRequest = &mut execution_request_cell.borrow_mut(); scheduler.core.executor.enter(|| { let product = TypeId::new(product); @@ -1202,12 +1214,13 @@ fn execution_add_root_select( } #[pyfunction] -fn tasks_task_begin( - py_tasks: &Bound<'_, PyTasks>, +fn tasks_task_begin<'py>( + py: Python<'py>, + py_tasks: &Bound<'py, PyTasks>, func: PyObject, - output_type: &Bound<'_, PyType>, - arg_types: Vec<(String, Bound<'_, PyType>)>, - masked_types: Vec>, + output_type: &Bound<'py, PyType>, + arg_types: Vec<(String, Bound<'py, PyType>)>, + masked_types: Vec>, side_effecting: bool, engine_aware_return_type: bool, cacheable: bool, @@ -1225,7 +1238,7 @@ fn tasks_task_begin( .map(|(name, typ)| (name, TypeId::new(&typ.as_borrowed()))) .collect(); let masked_types = masked_types.into_iter().map(|t| TypeId::new(&t)).collect(); - py_tasks.borrow_mut().0.borrow_mut().task_begin( + py_tasks.borrow_mut().0.get(py).borrow_mut().task_begin( func, output_type, side_effecting, @@ -1241,44 +1254,53 @@ fn tasks_task_begin( } #[pyfunction] -fn tasks_task_end(py_tasks: &Bound<'_, PyTasks>) { - py_tasks.borrow_mut().0.borrow_mut().task_end(); +fn tasks_task_end(py: Python<'_>, py_tasks: &Bound<'_, PyTasks>) { + py_tasks.borrow_mut().0.get(py).borrow_mut().task_end(); } #[pyfunction] -fn tasks_add_call( - py_tasks: &Bound<'_, PyTasks>, - output: &Bound<'_, PyType>, - inputs: Vec>, +fn tasks_add_call<'py>( + py: Python<'py>, + py_tasks: &Bound<'py, PyTasks>, + output: &Bound<'py, PyType>, + inputs: Vec>, rule_id: String, explicit_args_arity: u16, ) { let output = TypeId::new(output); let inputs = inputs.into_iter().map(|t| TypeId::new(&t)).collect(); - py_tasks - .borrow_mut() - .0 - .borrow_mut() - .add_call(output, inputs, rule_id, explicit_args_arity); + py_tasks.borrow_mut().0.get(py).borrow_mut().add_call( + output, + inputs, + rule_id, + explicit_args_arity, + ); } #[pyfunction] -fn tasks_add_get( - py_tasks: &Bound<'_, PyTasks>, - output: &Bound<'_, PyType>, - inputs: Vec>, +fn tasks_add_get<'py>( + py: Python<'py>, + py_tasks: &Bound<'py, PyTasks>, + output: &Bound<'py, PyType>, + inputs: Vec>, ) { let output = TypeId::new(output); let inputs = inputs.into_iter().map(|t| TypeId::new(&t)).collect(); - py_tasks.borrow_mut().0.borrow_mut().add_get(output, inputs); + py_tasks + .borrow_mut() + .0 + .get(py) + .borrow_mut() + .add_get(output, inputs); } #[pyfunction] -fn tasks_add_get_union( - py_tasks: &Bound<'_, PyTasks>, - output_type: &Bound<'_, PyType>, - input_types: Vec>, - in_scope_types: Vec>, +fn tasks_add_get_union<'py>( + py: Python<'py>, + py_tasks: &Bound<'py, PyTasks>, + output_type: &Bound<'py, PyType>, + input_types: Vec>, + in_scope_types: Vec>, ) { let product = TypeId::new(output_type); let input_types = input_types.into_iter().map(|t| TypeId::new(&t)).collect(); @@ -1286,24 +1308,26 @@ fn tasks_add_get_union( .into_iter() .map(|t| TypeId::new(&t)) .collect(); - py_tasks - .borrow_mut() - .0 - .borrow_mut() - .add_get_union(product, input_types, in_scope_types); + py_tasks.borrow_mut().0.get(py).borrow_mut().add_get_union( + product, + input_types, + in_scope_types, + ); } #[pyfunction] -fn tasks_add_query( - py_tasks: &Bound<'_, PyTasks>, - output_type: &Bound<'_, PyType>, - input_types: Vec>, +fn tasks_add_query<'py>( + py: Python<'py>, + py_tasks: &Bound<'py, PyTasks>, + output_type: &Bound<'py, PyType>, + input_types: Vec>, ) { let product = TypeId::new(output_type); let params = input_types.into_iter().map(|t| TypeId::new(&t)).collect(); py_tasks .borrow_mut() .0 + .get(py) .borrow_mut() .query_add(product, params); } diff --git a/src/rust/engine/src/externs/mod.rs b/src/rust/engine/src/externs/mod.rs index 12ee80a2511..76ef434d42e 100644 --- a/src/rust/engine/src/externs/mod.rs +++ b/src/rust/engine/src/externs/mod.rs @@ -9,6 +9,7 @@ use futures::FutureExt; use lazy_static::lazy_static; use pyo3::exceptions::{PyAssertionError, PyException, PyStopIteration, PyTypeError, PyValueError}; use pyo3::prelude::*; +use pyo3::sync::GILProtected; use pyo3::types::{PyBytes, PyDict, PySequence, PyTuple, PyType}; use pyo3::{create_exception, import_exception, intern}; use pyo3::{FromPyObject, ToPyObject}; @@ -17,7 +18,6 @@ use std::cell::{Ref, RefCell}; use std::collections::BTreeMap; use std::convert::TryInto; use std::fmt; -use std::ops::Deref; use logging::PythonLogLevel; use rule_graph::RuleId; @@ -287,7 +287,7 @@ pub(crate) enum GeneratorInput { /// - a coroutine will eventually return a single return value. /// pub(crate) fn generator_send( - py: Python, + py: Python<'_>, generator_type: &TypeId, generator: &Value, input: GeneratorInput, @@ -348,13 +348,13 @@ pub(crate) fn generator_send( }; let result = if let Ok(call) = response.extract::>() { - Ok(GeneratorResponse::Call(call.take()?)) + Ok(GeneratorResponse::Call(call.take(py)?)) } else if let Ok(get) = response.extract::>() { // It isn't necessary to differentiate between `Get` and `Effect` here, as the static // analysis of `@rule`s has already validated usage. - Ok(GeneratorResponse::Get(get.take()?)) + Ok(GeneratorResponse::Get(get.take(py)?)) } else if let Ok(call) = response.extract::>() { - Ok(GeneratorResponse::NativeCall(call.take()?)) + Ok(GeneratorResponse::NativeCall(call.take(py)?)) } else if let Ok(get_multi) = response.downcast::() { // Was an `All` or `MultiGet`. let gogs = get_multi @@ -367,7 +367,7 @@ pub(crate) fn generator_send( Ok(GetOrGenerator::Generator(Value::new(gog.unbind()))) } else if let Ok(get) = gog.extract::>() { Ok(GetOrGenerator::Get( - get.take().map_err(PyException::new_err)?, + get.take(py).map_err(PyException::new_err)?, )) } else { Err(PyValueError::new_err(format!( @@ -488,15 +488,18 @@ fn interpret_get_inputs( } #[pyclass] -pub struct PyGeneratorResponseNativeCall(RefCell>); +pub struct PyGeneratorResponseNativeCall(GILProtected>>); impl PyGeneratorResponseNativeCall { pub fn new(call: impl Future> + 'static + Send) -> Self { - Self(RefCell::new(Some(NativeCall { call: call.boxed() }))) + Self(GILProtected::new(RefCell::new(Some(NativeCall { + call: call.boxed(), + })))) } - fn take(&self) -> Result { + fn take(&self, py: Python<'_>) -> Result { self.0 + .get(py) .borrow_mut() .take() .ok_or_else(|| "A `NativeCall` may only be consumed once.".to_owned()) @@ -525,11 +528,13 @@ impl PyGeneratorResponseNativeCall { } #[pyclass(subclass)] -pub struct PyGeneratorResponseCall(RefCell>); +pub struct PyGeneratorResponseCall(GILProtected>>); impl PyGeneratorResponseCall { - fn borrow_inner(&self) -> PyResult + '_> { - Ref::filter_map(self.0.borrow(), |inner| inner.as_ref()).map_err(|_| { + fn borrow_inner<'py>(&'py self, py: Python<'py>) -> PyResult> { + let inner: Ref<'py, _> = self.0.get(py).borrow(); + + Ref::filter_map(inner, |o: &Option| o.as_ref()).map_err(|_| { PyException::new_err( "A `Call` may not be consumed after being provided to the @rule engine.", ) @@ -562,25 +567,25 @@ impl PyGeneratorResponseCall { }; let (input_types, inputs) = interpret_get_inputs(py, input_arg0, input_arg1)?; - Ok(Self(RefCell::new(Some(Call { + Ok(Self(GILProtected::new(RefCell::new(Some(Call { rule_id: RuleId::from_string(rule_id), output_type, args, args_arity, input_types, inputs, - })))) + }))))) } #[getter] fn output_type<'py>(&self, py: Python<'py>) -> PyResult> { - Ok(self.borrow_inner()?.output_type.as_py_type_bound(py)) + Ok(self.borrow_inner(py)?.output_type.as_py_type_bound(py)) } #[getter] fn input_types<'py>(&self, py: Python<'py>) -> PyResult>> { Ok(self - .borrow_inner()? + .borrow_inner(py)? .input_types .iter() .map(|t| t.as_py_type_bound(py)) @@ -588,8 +593,8 @@ impl PyGeneratorResponseCall { } #[getter] - fn inputs<'p>(&'p self, py: Python<'p>) -> PyResult> { - let inner = self.borrow_inner()?; + fn inputs(&self, py: Python<'_>) -> PyResult> { + let inner = self.borrow_inner(py)?; let args: Vec = inner.args.as_ref().map_or_else( || Ok(Vec::default()), |args| args.to_py_object().extract(py), @@ -602,8 +607,9 @@ impl PyGeneratorResponseCall { } impl PyGeneratorResponseCall { - fn take(&self) -> Result { + fn take(&self, py: Python<'_>) -> Result { self.0 + .get(py) .borrow_mut() .take() .ok_or_else(|| "A `Call` may only be consumed once.".to_owned()) @@ -612,11 +618,12 @@ impl PyGeneratorResponseCall { // Contains a `RefCell>` in order to allow us to `take` the content without cloning. #[pyclass(subclass)] -pub struct PyGeneratorResponseGet(RefCell>); +pub struct PyGeneratorResponseGet(GILProtected>>); impl PyGeneratorResponseGet { - fn take(&self) -> Result { + fn take(&self, py: Python<'_>) -> Result { self.0 + .get(py) .borrow_mut() .take() .ok_or_else(|| "A `Get` may only be consumed once.".to_owned()) @@ -644,17 +651,18 @@ impl PyGeneratorResponseGet { let (input_types, inputs) = interpret_get_inputs(py, input_arg0, input_arg1)?; - Ok(Self(RefCell::new(Some(Get { + Ok(Self(GILProtected::new(RefCell::new(Some(Get { output, input_types, inputs, - })))) + }))))) } #[getter] fn output_type<'py>(&self, py: Python<'py>) -> PyResult> { Ok(self .0 + .get(py) .borrow() .as_ref() .ok_or_else(|| { @@ -670,6 +678,7 @@ impl PyGeneratorResponseGet { fn input_types<'py>(&self, py: Python<'py>) -> PyResult>> { Ok(self .0 + .get(py) .borrow() .as_ref() .ok_or_else(|| { @@ -684,9 +693,10 @@ impl PyGeneratorResponseGet { } #[getter] - fn inputs(&self) -> PyResult> { + fn inputs(&self, py: Python<'_>) -> PyResult> { Ok(self .0 + .get(py) .borrow() .as_ref() .ok_or_else(|| { @@ -700,10 +710,10 @@ impl PyGeneratorResponseGet { .collect()) } - fn __repr__(&self) -> PyResult { + fn __repr__(&self, py: Python<'_>) -> PyResult { Ok(format!( "{}", - self.0.borrow().as_ref().ok_or_else(|| { + self.0.get(py).borrow().as_ref().ok_or_else(|| { PyException::new_err( "A `Get` may not be consumed after being provided to the @rule engine.", )