diff --git a/rerun_py/src/arrow.rs b/rerun_py/src/arrow.rs
index cdd830eb1842..bd579699a732 100644
--- a/rerun_py/src/arrow.rs
+++ b/rerun_py/src/arrow.rs
@@ -11,8 +11,8 @@ use arrow2::{
 use pyo3::{
     exceptions::{PyRuntimeError, PyValueError},
     ffi::Py_uintptr_t,
-    types::{PyDict, PyString},
-    PyAny, PyResult,
+    types::{PyAnyMethods as _, PyDict, PyDictMethods, PyString},
+    Bound, PyAny, PyResult,
 };
 
 use re_chunk::{Chunk, ChunkError, ChunkId, PendingRow, RowId, TimeColumn};
@@ -22,7 +22,10 @@ use re_sdk::{ComponentName, EntityPath, Timeline};
 /// Perform conversion between a pyarrow array to arrow2 types.
 ///
 /// `name` is the name of the Rerun component, and the name of the pyarrow `Field` (column name).
-fn array_to_rust(arrow_array: &PyAny, name: Option<&str>) -> PyResult<(Box<dyn Array>, Field)> {
+fn array_to_rust(
+    arrow_array: &Bound<'_, PyAny>,
+    name: Option<&str>,
+) -> PyResult<(Box<dyn Array>, Field)> {
     // prepare pointers to receive the Array struct
     let array = Box::new(ffi::ArrowArray::empty());
     let schema = Box::new(ffi::ArrowSchema::empty());
@@ -90,7 +93,7 @@ fn array_to_rust(arrow_array: &PyAny, name: Option<&str>) -> PyResult<(Box<dyn Array>, Field)> {
 }
 
 pub fn build_row_from_components(
-    components: &PyDict,
+    components: &Bound<'_, PyDict>,
     time_point: &TimePoint,
 ) -> PyResult<PendingRow> {
     // Create row-id as early as possible. It has a timestamp and is used to estimate e2e latency.
@@ -99,8 +102,9 @@ pub fn build_row_from_components(
     let (arrays, fields): (Vec<Box<dyn Array>>, Vec<Field>) = itertools::process_results(
         components.iter().map(|(name, array)| {
-            let name = name.downcast::<PyString>()?.to_str()?;
-            array_to_rust(array, Some(name))
+            let py_name = name.downcast::<PyString>()?;
+            let name: std::borrow::Cow<'_, str> = py_name.extract()?;
+            array_to_rust(&array, Some(&name))
         }),
         |iter| iter.unzip(),
     )?;
@@ -121,8 +125,8 @@ pub fn build_row_from_components(
 /// Build a [`Chunk`] given a '**kwargs'-style dictionary of component arrays.
 pub fn build_chunk_from_components(
     entity_path: EntityPath,
-    timelines: &PyDict,
-    components: &PyDict,
+    timelines: &Bound<'_, PyDict>,
+    components: &Bound<'_, PyDict>,
 ) -> PyResult<Chunk> {
     // Create chunk-id as early as possible. It has a timestamp and is used to estimate e2e latency.
     let chunk_id = ChunkId::new();
@@ -130,8 +134,9 @@ pub fn build_chunk_from_components(
     // Extract the timeline data
     let (arrays, fields): (Vec<Box<dyn Array>>, Vec<Field>) = itertools::process_results(
         timelines.iter().map(|(name, array)| {
-            let name = name.downcast::<PyString>()?.to_str()?;
-            array_to_rust(array, Some(name))
+            let py_name = name.downcast::<PyString>()?;
+            let name: std::borrow::Cow<'_, str> = py_name.extract()?;
+            array_to_rust(&array, Some(&name))
         }),
         |iter| iter.unzip(),
     )?;
@@ -171,8 +176,9 @@ pub fn build_chunk_from_components(
     // Extract the component data
     let (arrays, fields): (Vec<Box<dyn Array>>, Vec<Field>) = itertools::process_results(
         components.iter().map(|(name, array)| {
-            let name = name.downcast::<PyString>()?.to_str()?;
-            array_to_rust(array, Some(name))
+            let py_name = name.downcast::<PyString>()?;
+            let name: std::borrow::Cow<'_, str> = py_name.extract()?;
+            array_to_rust(&array, Some(&name))
         }),
         |iter| iter.unzip(),
     )?;
diff --git a/rerun_py/src/python_bridge.rs b/rerun_py/src/python_bridge.rs
index 35e3aeddb347..15b472e01dce 100644
--- a/rerun_py/src/python_bridge.rs
+++ b/rerun_py/src/python_bridge.rs
@@ -97,7 +97,7 @@ fn global_web_viewer_server(
 /// The python module is called "rerun_bindings".
 #[pymodule]
-fn rerun_bindings(_py: Python<'_>, m: &PyModule) -> PyResult<()> {
+fn rerun_bindings(_py: Python<'_>, m: &Bound<'_, PyModule>) -> PyResult<()> {
     // NOTE: We do this here because some the inner init methods don't respond too kindly to being
     // called more than once.
     re_log::setup_logging();
@@ -782,8 +782,8 @@ fn set_callback_sink(callback: PyObject, recording: Option<&PyRecordingStream>,
     let callback = move |msgs: &[LogMsg]| {
         Python::with_gil(|py| {
             let data = encode_ref_as_bytes_local(msgs.iter().map(Ok)).ok_or_log_error()?;
-            let bytes = PyBytes::new(py, &data);
-            callback.as_ref(py).call1((bytes,)).ok_or_log_error()?;
+            let bytes = PyBytes::new_bound(py, &data);
+            callback.bind(py).call1((bytes,)).ok_or_log_error()?;
             Some(())
         });
     };
@@ -830,7 +830,11 @@ impl PyMemorySinkStorage {
     /// Concatenate the contents of the [`MemorySinkStorage`] as bytes.
     ///
     /// Note: This will do a blocking flush before returning!
-    fn concat_as_bytes<'p>(&self, concat: Option<&Self>, py: Python<'p>) -> PyResult<&'p PyBytes> {
+    fn concat_as_bytes<'p>(
+        &self,
+        concat: Option<&Self>,
+        py: Python<'p>,
+    ) -> PyResult<Bound<'p, PyBytes>> {
         // Release the GIL in case any flushing behavior needs to cleanup a python object.
         py.allow_threads(|| {
             let concat_bytes = MemorySinkStorage::concat_memory_sinks_as_bytes(
@@ -845,7 +849,7 @@ impl PyMemorySinkStorage {
             concat_bytes
         })
-        .map(|bytes| PyBytes::new(py, bytes.as_slice()))
+        .map(|bytes| PyBytes::new_bound(py, bytes.as_slice()))
         .map_err(|err| PyRuntimeError::new_err(err.to_string()))
     }
@@ -866,7 +870,7 @@ impl PyMemorySinkStorage {
     /// Drain all messages logged to the [`MemorySinkStorage`] and return as bytes.
    ///
     /// This will do a blocking flush before returning!
-    fn drain_as_bytes<'p>(&self, py: Python<'p>) -> PyResult<&'p PyBytes> {
+    fn drain_as_bytes<'p>(&self, py: Python<'p>) -> PyResult<Bound<'p, PyBytes>> {
         // Release the GIL in case any flushing behavior needs to cleanup a python object.
         py.allow_threads(|| {
             let bytes = self.inner.drain_as_bytes();
@@ -875,7 +879,7 @@ impl PyMemorySinkStorage {
             bytes
         })
-        .map(|bytes| PyBytes::new(py, bytes.as_slice()))
+        .map(|bytes| PyBytes::new_bound(py, bytes.as_slice()))
         .map_err(|err| PyRuntimeError::new_err(err.to_string()))
     }
 }
@@ -892,9 +896,9 @@ impl PyBinarySinkStorage {
     ///
     /// If `flush` is `true`, the sink will be flushed before reading.
     #[pyo3(signature = (*, flush = true))]
-    fn read<'p>(&self, flush: bool, py: Python<'p>) -> &'p PyBytes {
+    fn read<'p>(&self, flush: bool, py: Python<'p>) -> Bound<'p, PyBytes> {
         // Release the GIL in case any flushing behavior needs to cleanup a python object.
-        PyBytes::new(
+        PyBytes::new_bound(
             py,
             py.allow_threads(|| {
                 if flush {
@@ -1066,7 +1070,7 @@ fn reset_time(recording: Option<&PyRecordingStream>) {
 fn log_arrow_msg(
     py: Python<'_>,
     entity_path: &str,
-    components: &PyDict,
+    components: Bound<'_, PyDict>,
     static_: bool,
     recording: Option<&PyRecordingStream>,
 ) -> PyResult<()> {
@@ -1079,7 +1083,7 @@ fn log_arrow_msg(
     // It's important that we don't hold the session lock while building our arrow component.
     // the API we call to back through pyarrow temporarily releases the GIL, which can cause
     // a deadlock.
-    let row = crate::arrow::build_row_from_components(components, &TimePoint::default())?;
+    let row = crate::arrow::build_row_from_components(&components, &TimePoint::default())?;
 
     recording.record_row(entity_path, row, !static_);
@@ -1108,8 +1112,8 @@ fn log_arrow_msg(
 fn send_arrow_chunk(
     py: Python<'_>,
     entity_path: &str,
-    timelines: &PyDict,
-    components: &PyDict,
+    timelines: Bound<'_, PyDict>,
+    components: Bound<'_, PyDict>,
     recording: Option<&PyRecordingStream>,
 ) -> PyResult<()> {
     let Some(recording) = get_data_recording(recording) else {
@@ -1121,7 +1125,7 @@ fn send_arrow_chunk(
     // It's important that we don't hold the session lock while building our arrow component.
     // the API we call to back through pyarrow temporarily releases the GIL, which can cause
     // a deadlock.
-    let chunk = crate::arrow::build_chunk_from_components(entity_path, timelines, components)?;
+    let chunk = crate::arrow::build_chunk_from_components(entity_path, &timelines, &components)?;
 
     recording.send_chunk(chunk);
@@ -1304,9 +1308,10 @@ fn escape_entity_path_part(part: &str) -> String {
 }
 
 #[pyfunction]
-fn new_entity_path(parts: Vec<&str>) -> String {
-    let path = EntityPath::from(parts.into_iter().map(EntityPathPart::from).collect_vec());
-    path.to_string()
+fn new_entity_path(parts: Vec<&pyo3::types::PyString>) -> PyResult<String> {
+    let parts: PyResult<Vec<&str>> = parts.iter().map(|part| part.to_str()).collect();
+    let path = EntityPath::from(parts?.into_iter().map(EntityPathPart::from).collect_vec());
+    Ok(path.to_string())
 }
 
 // --- Helpers ---
@@ -1361,16 +1366,16 @@ fn default_store_id(py: Python<'_>, variant: StoreKind, application_id: &str) ->
 }
 
 fn authkey(py: Python<'_>) -> PyResult<Vec<u8>> {
-    let locals = PyDict::new(py);
+    let locals = PyDict::new_bound(py);
 
-    py.run(
+    py.run_bound(
         r#"
 import multiprocessing
 # authkey is the same for child and parent processes, so this is how we know we're the same
 authkey = multiprocessing.current_process().authkey
 "#,
         None,
-        Some(locals),
+        Some(&locals),
     )
     .and_then(|()| {
         locals
@@ -1380,7 +1385,8 @@ authkey = multiprocessing.current_process().authkey
         .and_then(|authkey| {
             authkey
                 .downcast()
+                .cloned()
                 .map_err(|err| PyRuntimeError::new_err(err.to_string()))
         })
-        .map(|authkey: &PyBytes| authkey.as_bytes().to_vec())
+        .map(|authkey: Bound<'_, PyBytes>| authkey.as_bytes().to_vec())
 }