Skip to content

Commit

Permalink
Update deprecated API usage
Browse files Browse the repository at this point in the history
  • Loading branch information
jleibs authored and teh-cmc committed Aug 31, 2024
1 parent 56845ab commit b53150e
Show file tree
Hide file tree
Showing 2 changed files with 45 additions and 33 deletions.
30 changes: 18 additions & 12 deletions rerun_py/src/arrow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ use arrow2::{
use pyo3::{
exceptions::{PyRuntimeError, PyValueError},
ffi::Py_uintptr_t,
types::{PyDict, PyString},
PyAny, PyResult,
types::{PyAnyMethods as _, PyDict, PyDictMethods, PyString},
Bound, PyAny, PyResult,
};

use re_chunk::{Chunk, ChunkError, ChunkId, PendingRow, RowId, TimeColumn};
Expand All @@ -22,7 +22,10 @@ use re_sdk::{ComponentName, EntityPath, Timeline};
/// Perform conversion between a pyarrow array to arrow2 types.
///
/// `name` is the name of the Rerun component, and the name of the pyarrow `Field` (column name).
fn array_to_rust(arrow_array: &PyAny, name: Option<&str>) -> PyResult<(Box<dyn Array>, Field)> {
fn array_to_rust(
arrow_array: &Bound<'_, PyAny>,
name: Option<&str>,
) -> PyResult<(Box<dyn Array>, Field)> {
// prepare pointers to receive the Array struct
let array = Box::new(ffi::ArrowArray::empty());
let schema = Box::new(ffi::ArrowSchema::empty());
Expand Down Expand Up @@ -90,7 +93,7 @@ fn array_to_rust(arrow_array: &PyAny, name: Option<&str>) -> PyResult<(Box<dyn A

/// Build a [`PendingRow`] given a '**kwargs'-style dictionary of component arrays.
pub fn build_row_from_components(
components: &PyDict,
components: &Bound<'_, PyDict>,
time_point: &TimePoint,
) -> PyResult<PendingRow> {
// Create row-id as early as possible. It has a timestamp and is used to estimate e2e latency.
Expand All @@ -99,8 +102,9 @@ pub fn build_row_from_components(

let (arrays, fields): (Vec<Box<dyn Array>>, Vec<Field>) = itertools::process_results(
components.iter().map(|(name, array)| {
let name = name.downcast::<PyString>()?.to_str()?;
array_to_rust(array, Some(name))
let py_name = name.downcast::<PyString>()?;
let name: std::borrow::Cow<'_, str> = py_name.extract()?;
array_to_rust(&array, Some(&name))
}),
|iter| iter.unzip(),
)?;
Expand All @@ -121,17 +125,18 @@ pub fn build_row_from_components(
/// Build a [`Chunk`] given a '**kwargs'-style dictionary of component arrays.
pub fn build_chunk_from_components(
entity_path: EntityPath,
timelines: &PyDict,
components: &PyDict,
timelines: &Bound<'_, PyDict>,
components: &Bound<'_, PyDict>,
) -> PyResult<Chunk> {
// Create chunk-id as early as possible. It has a timestamp and is used to estimate e2e latency.
let chunk_id = ChunkId::new();

// Extract the timeline data
let (arrays, fields): (Vec<Box<dyn Array>>, Vec<Field>) = itertools::process_results(
timelines.iter().map(|(name, array)| {
let name = name.downcast::<PyString>()?.to_str()?;
array_to_rust(array, Some(name))
let py_name = name.downcast::<PyString>()?;
let name: std::borrow::Cow<'_, str> = py_name.extract()?;
array_to_rust(&array, Some(&name))
}),
|iter| iter.unzip(),
)?;
Expand Down Expand Up @@ -171,8 +176,9 @@ pub fn build_chunk_from_components(
// Extract the component data
let (arrays, fields): (Vec<Box<dyn Array>>, Vec<Field>) = itertools::process_results(
components.iter().map(|(name, array)| {
let name = name.downcast::<PyString>()?.to_str()?;
array_to_rust(array, Some(name))
let py_name = name.downcast::<PyString>()?;
let name: std::borrow::Cow<'_, str> = py_name.extract()?;
array_to_rust(&array, Some(&name))
}),
|iter| iter.unzip(),
)?;
Expand Down
48 changes: 27 additions & 21 deletions rerun_py/src/python_bridge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@ fn global_web_viewer_server(

/// The python module is called "rerun_bindings".
#[pymodule]
fn rerun_bindings(_py: Python<'_>, m: &PyModule) -> PyResult<()> {
fn rerun_bindings(_py: Python<'_>, m: &Bound<'_, PyModule>) -> PyResult<()> {
// NOTE: We do this here because some of the inner init methods don't respond too kindly to being
// called more than once.
re_log::setup_logging();
Expand Down Expand Up @@ -782,8 +782,8 @@ fn set_callback_sink(callback: PyObject, recording: Option<&PyRecordingStream>,
let callback = move |msgs: &[LogMsg]| {
Python::with_gil(|py| {
let data = encode_ref_as_bytes_local(msgs.iter().map(Ok)).ok_or_log_error()?;
let bytes = PyBytes::new(py, &data);
callback.as_ref(py).call1((bytes,)).ok_or_log_error()?;
let bytes = PyBytes::new_bound(py, &data);
callback.bind(py).call1((bytes,)).ok_or_log_error()?;
Some(())
});
};
Expand Down Expand Up @@ -830,7 +830,11 @@ impl PyMemorySinkStorage {
/// Concatenate the contents of the [`MemorySinkStorage`] as bytes.
///
/// Note: This will do a blocking flush before returning!
fn concat_as_bytes<'p>(&self, concat: Option<&Self>, py: Python<'p>) -> PyResult<&'p PyBytes> {
fn concat_as_bytes<'p>(
&self,
concat: Option<&Self>,
py: Python<'p>,
) -> PyResult<Bound<'p, PyBytes>> {
// Release the GIL in case any flushing behavior needs to cleanup a python object.
py.allow_threads(|| {
let concat_bytes = MemorySinkStorage::concat_memory_sinks_as_bytes(
Expand All @@ -845,7 +849,7 @@ impl PyMemorySinkStorage {

concat_bytes
})
.map(|bytes| PyBytes::new(py, bytes.as_slice()))
.map(|bytes| PyBytes::new_bound(py, bytes.as_slice()))
.map_err(|err| PyRuntimeError::new_err(err.to_string()))
}

Expand All @@ -866,7 +870,7 @@ impl PyMemorySinkStorage {
/// Drain all messages logged to the [`MemorySinkStorage`] and return as bytes.
///
/// This will do a blocking flush before returning!
fn drain_as_bytes<'p>(&self, py: Python<'p>) -> PyResult<&'p PyBytes> {
fn drain_as_bytes<'p>(&self, py: Python<'p>) -> PyResult<Bound<'p, PyBytes>> {
// Release the GIL in case any flushing behavior needs to cleanup a python object.
py.allow_threads(|| {
let bytes = self.inner.drain_as_bytes();
Expand All @@ -875,7 +879,7 @@ impl PyMemorySinkStorage {

bytes
})
.map(|bytes| PyBytes::new(py, bytes.as_slice()))
.map(|bytes| PyBytes::new_bound(py, bytes.as_slice()))
.map_err(|err| PyRuntimeError::new_err(err.to_string()))
}
}
Expand All @@ -892,9 +896,9 @@ impl PyBinarySinkStorage {
///
/// If `flush` is `true`, the sink will be flushed before reading.
#[pyo3(signature = (*, flush = true))]
fn read<'p>(&self, flush: bool, py: Python<'p>) -> &'p PyBytes {
fn read<'p>(&self, flush: bool, py: Python<'p>) -> Bound<'p, PyBytes> {
// Release the GIL in case any flushing behavior needs to cleanup a python object.
PyBytes::new(
PyBytes::new_bound(
py,
py.allow_threads(|| {
if flush {
Expand Down Expand Up @@ -1066,7 +1070,7 @@ fn reset_time(recording: Option<&PyRecordingStream>) {
fn log_arrow_msg(
py: Python<'_>,
entity_path: &str,
components: &PyDict,
components: Bound<'_, PyDict>,
static_: bool,
recording: Option<&PyRecordingStream>,
) -> PyResult<()> {
Expand All @@ -1079,7 +1083,7 @@ fn log_arrow_msg(
// It's important that we don't hold the session lock while building our arrow component.
// the API we call back through pyarrow temporarily releases the GIL, which can cause
// a deadlock.
let row = crate::arrow::build_row_from_components(components, &TimePoint::default())?;
let row = crate::arrow::build_row_from_components(&components, &TimePoint::default())?;

recording.record_row(entity_path, row, !static_);

Expand Down Expand Up @@ -1108,8 +1112,8 @@ fn log_arrow_msg(
fn send_arrow_chunk(
py: Python<'_>,
entity_path: &str,
timelines: &PyDict,
components: &PyDict,
timelines: Bound<'_, PyDict>,
components: Bound<'_, PyDict>,
recording: Option<&PyRecordingStream>,
) -> PyResult<()> {
let Some(recording) = get_data_recording(recording) else {
Expand All @@ -1121,7 +1125,7 @@ fn send_arrow_chunk(
// It's important that we don't hold the session lock while building our arrow component.
// the API we call back through pyarrow temporarily releases the GIL, which can cause
// a deadlock.
let chunk = crate::arrow::build_chunk_from_components(entity_path, timelines, components)?;
let chunk = crate::arrow::build_chunk_from_components(entity_path, &timelines, &components)?;

recording.send_chunk(chunk);

Expand Down Expand Up @@ -1304,9 +1308,10 @@ fn escape_entity_path_part(part: &str) -> String {
}

#[pyfunction]
fn new_entity_path(parts: Vec<&str>) -> String {
let path = EntityPath::from(parts.into_iter().map(EntityPathPart::from).collect_vec());
path.to_string()
fn new_entity_path(parts: Vec<&pyo3::types::PyString>) -> PyResult<String> {
let parts: PyResult<Vec<&str>> = parts.iter().map(|part| part.to_str()).collect();
let path = EntityPath::from(parts?.into_iter().map(EntityPathPart::from).collect_vec());
Ok(path.to_string())
}

// --- Helpers ---
Expand Down Expand Up @@ -1361,16 +1366,16 @@ fn default_store_id(py: Python<'_>, variant: StoreKind, application_id: &str) ->
}

fn authkey(py: Python<'_>) -> PyResult<Vec<u8>> {
let locals = PyDict::new(py);
let locals = PyDict::new_bound(py);

py.run(
py.run_bound(
r#"
import multiprocessing
# authkey is the same for child and parent processes, so this is how we know we're the same
authkey = multiprocessing.current_process().authkey
"#,
None,
Some(locals),
Some(&locals),
)
.and_then(|()| {
locals
Expand All @@ -1380,7 +1385,8 @@ authkey = multiprocessing.current_process().authkey
.and_then(|authkey| {
authkey
.downcast()
.cloned()
.map_err(|err| PyRuntimeError::new_err(err.to_string()))
})
.map(|authkey: &PyBytes| authkey.as_bytes().to_vec())
.map(|authkey: Bound<'_, PyBytes>| authkey.as_bytes().to_vec())
}

0 comments on commit b53150e

Please sign in to comment.