From 3b8e1d1f090ae3a51b86442ce892aa85d3263865 Mon Sep 17 00:00:00 2001
From: Jeremy Leibs
Date: Wed, 1 May 2024 08:23:19 -0400
Subject: [PATCH 1/9] Introduce drain API to PyMemorySinkStorage

---
 crates/re_sdk/src/log_sink.rs | 23 +++++++++++++++++++++++
 rerun_py/src/python_bridge.rs | 18 +++++++++++++++++-
 2 files changed, 40 insertions(+), 1 deletion(-)

diff --git a/crates/re_sdk/src/log_sink.rs b/crates/re_sdk/src/log_sink.rs
index f8fdfdb5efec..5ccfc886ab85 100644
--- a/crates/re_sdk/src/log_sink.rs
+++ b/crates/re_sdk/src/log_sink.rs
@@ -245,6 +245,29 @@ impl MemorySinkStorage {
         Ok(buffer.into_inner())
     }
 
+    /// Drain the stored messages and return them as an in-memory RRD.
+    #[inline]
+    pub fn drain_memory_sink_as_bytes(
+        &self,
+    ) -> Result<Vec<u8>, re_log_encoding::encoder::EncodeError> {
+        let mut buffer = std::io::Cursor::new(Vec::new());
+
+        {
+            let encoding_options = re_log_encoding::EncodingOptions::COMPRESSED;
+            let mut encoder =
+                re_log_encoding::encoder::Encoder::new(encoding_options, &mut buffer)?;
+
+            let mut inner = self.inner.lock();
+            inner.has_been_used = true;
+
+            for message in &std::mem::take(&mut inner.msgs) {
+                encoder.append(message)?;
+            }
+        }
+
+        Ok(buffer.into_inner())
+    }
+
     #[inline]
     /// Get the [`StoreId`] from the associated `RecordingStream` if it exists.
     pub fn store_id(&self) -> Option<StoreId> {
diff --git a/rerun_py/src/python_bridge.rs b/rerun_py/src/python_bridge.rs
index 92306c57d6b7..fe37a1aee6bc 100644
--- a/rerun_py/src/python_bridge.rs
+++ b/rerun_py/src/python_bridge.rs
@@ -755,7 +755,7 @@ struct PyMemorySinkStorage {
 
 #[pymethods]
 impl PyMemorySinkStorage {
-    /// Concatenate the contents of the [`MemorySinkStorage`] as byes.
+    /// Concatenate the contents of the [`MemorySinkStorage`] as bytes.
     ///
     /// Note: This will do a blocking flush before returning!
     fn concat_as_bytes<'p>(
@@ -792,6 +792,22 @@ impl PyMemorySinkStorage {
         self.inner.num_msgs()
     }
 
+
+    /// Drain all messages logged to the [`MemorySinkStorage`] and return as bytes.
+    ///
+    /// This will do a blocking flush before returning!
+    fn drain<'p>(&self, py: Python<'p>) -> PyResult<&'p PyBytes> {
+        // Release the GIL in case any flushing behavior needs to clean up a python object.
+        py.allow_threads(|| {
+            self.rec.flush_blocking();
+            flush_garbage_queue();
+        });
+
+        self.inner
+            .drain_memory_sink_as_bytes()
+            .map(|bytes| PyBytes::new(py, bytes.as_slice()))
+            .map_err(|err| PyRuntimeError::new_err(err.to_string()))
+    }
 }
 
 /// Serve a web-viewer.

From 230ed29bcc3f4dadd0478aa94e5a0ab9e00e1e1d Mon Sep 17 00:00:00 2001
From: Jeremy Leibs
Date: Wed, 1 May 2024 09:01:22 -0400
Subject: [PATCH 2/9] Plumb through drain_as_bytes from python

---
 crates/re_sdk/src/log_sink.rs      | 4 +---
 rerun_py/rerun_sdk/rerun/memory.py | 8 ++++++++
 rerun_py/src/python_bridge.rs      | 4 ++--
 3 files changed, 11 insertions(+), 5 deletions(-)

diff --git a/crates/re_sdk/src/log_sink.rs b/crates/re_sdk/src/log_sink.rs
index 5ccfc886ab85..a4326b115f8b 100644
--- a/crates/re_sdk/src/log_sink.rs
+++ b/crates/re_sdk/src/log_sink.rs
@@ -247,9 +247,7 @@ impl MemorySinkStorage {
 
     /// Drain the stored messages and return them as an in-memory RRD.
     #[inline]
-    pub fn drain_memory_sink_as_bytes(
-        &self,
-    ) -> Result<Vec<u8>, re_log_encoding::encoder::EncodeError> {
+    pub fn drain_as_bytes(&self) -> Result<Vec<u8>, re_log_encoding::encoder::EncodeError> {
         let mut buffer = std::io::Cursor::new(Vec::new());
 
         {
diff --git a/rerun_py/rerun_sdk/rerun/memory.py b/rerun_py/rerun_sdk/rerun/memory.py
index 3b4330517bf1..57a4626f83cf 100644
--- a/rerun_py/rerun_sdk/rerun/memory.py
+++ b/rerun_py/rerun_sdk/rerun/memory.py
@@ -53,6 +53,14 @@ def num_msgs(self) -> int:
         """
         return self.storage.num_msgs()  # type: ignore[no-any-return]
 
+    def drain_as_bytes(self) -> bytes:
+        """
+        Drains the MemoryRecording and returns the data as bytes.
+
+        This will flush the current sink before returning.
+        """
+        return self.storage.drain_as_bytes()
+
     @deprecated("Please use rerun.notebook_show() instead.")
     def as_html(
         self,
diff --git a/rerun_py/src/python_bridge.rs b/rerun_py/src/python_bridge.rs
index fe37a1aee6bc..11e2faa30627 100644
--- a/rerun_py/src/python_bridge.rs
+++ b/rerun_py/src/python_bridge.rs
@@ -796,7 +796,7 @@ impl PyMemorySinkStorage {
     /// Drain all messages logged to the [`MemorySinkStorage`] and return as bytes.
     ///
     /// This will do a blocking flush before returning!
-    fn drain<'p>(&self, py: Python<'p>) -> PyResult<&'p PyBytes> {
+    fn drain_as_bytes<'p>(&self, py: Python<'p>) -> PyResult<&'p PyBytes> {
         // Release the GIL in case any flushing behavior needs to clean up a python object.
         py.allow_threads(|| {
             self.rec.flush_blocking();
@@ -804,7 +804,7 @@ impl PyMemorySinkStorage {
         });
 
         self.inner
-            .drain_memory_sink_as_bytes()
+            .drain_as_bytes()
            .map(|bytes| PyBytes::new(py, bytes.as_slice()))
            .map_err(|err| PyRuntimeError::new_err(err.to_string()))
     }

From d91ffb21d57b3e636a2007b4a683577c869028ae Mon Sep 17 00:00:00 2001
From: Jeremy Leibs
Date: Wed, 1 May 2024 09:19:05 -0400
Subject: [PATCH 3/9] Test using drain_as_bytes

---
 tests/python/memory_drain/main.py | 32 +++++++++++++++++++++++++++++++
 1 file changed, 32 insertions(+)
 create mode 100644 tests/python/memory_drain/main.py

diff --git a/tests/python/memory_drain/main.py b/tests/python/memory_drain/main.py
new file mode 100644
index 000000000000..e1657448071f
--- /dev/null
+++ b/tests/python/memory_drain/main.py
@@ -0,0 +1,32 @@
+"""
+Test showing that memory can be drained from a memory recording as valid RRD files.
+
+After running:
+```bash
+rerun *.rrd
+```
+"""
+
+from __future__ import annotations
+
+import rerun as rr
+import rerun.blueprint as rrb
+
+
+def main() -> None:
+    with rr.new_recording("rerun_example_memory_drain"):
+        mem = rr.memory_recording()
+
+        blueprint = rrb.Blueprint(rrb.TextLogView(name="My Logs", origin="test"))
+
+        rr.send_blueprint(blueprint)
+
+        for i in range(5):
+            rr.log("test", rr.TextLog(f"Message {i}"))
+
+            with open(f"output_{i}.rrd", "wb") as f:
+                f.write(mem.drain_as_bytes())
+
+
+if __name__ == "__main__":
+    main()

From 5c58524dc1aab7d04f9ccb8f233f442ad018235a Mon Sep 17 00:00:00 2001
From: Jeremy Leibs
Date: Wed, 1 May 2024 09:27:30 -0400
Subject: [PATCH 4/9] Also verify that thread isolation works as intended

---
 tests/python/memory_drain/main.py | 17 ++++++++++++-----
 1 file changed, 12 insertions(+), 5 deletions(-)

diff --git a/tests/python/memory_drain/main.py b/tests/python/memory_drain/main.py
index e1657448071f..fae632a3f9d3 100644
--- a/tests/python/memory_drain/main.py
+++ b/tests/python/memory_drain/main.py
@@ -9,12 +9,16 @@
 
 from __future__ import annotations
 
+import threading
+import time
+import uuid
+
 import rerun as rr
 import rerun.blueprint as rrb
 
 
-def main() -> None:
-    with rr.new_recording("rerun_example_memory_drain"):
+def job(name: str) -> None:
+    with rr.new_recording("rerun_example_memory_drain", recording_id=uuid.uuid4()):
         mem = rr.memory_recording()
 
         blueprint = rrb.Blueprint(rrb.TextLogView(name="My Logs", origin="test"))
@@ -22,11 +26,14 @@ def main() -> None:
         rr.send_blueprint(blueprint)
 
         for i in range(5):
-            rr.log("test", rr.TextLog(f"Message {i}"))
+            time.sleep(0.2)
+            rr.log("test", rr.TextLog(f"Job {name} Message {i}"))
 
-            with open(f"output_{i}.rrd", "wb") as f:
+            with open(f"output_{name}_{i}.rrd", "wb") as f:
                 f.write(mem.drain_as_bytes())
 
 
 if __name__ == "__main__":
-    main()
+    threading.Thread(target=job, args=("A",)).start()
+    threading.Thread(target=job, args=("B",)).start()
+    threading.Thread(target=job, args=("C",)).start()

From 1a140ba4c0f1a39e832cf29fecf7aa2db3fa5b7f Mon Sep 17 00:00:00 2001
From: Jeremy Leibs
Date: Wed, 1 May 2024 12:40:12 -0400
Subject: [PATCH 5/9] Introduce a generator-compatible decorator and verify
 stream isolation works correctly with generators across threads.

---
 rerun_py/rerun_sdk/rerun/__init__.py         | 129 +-------------
 rerun_py/rerun_sdk/rerun/recording_stream.py | 178 +++++++++++++++++++
 tests/python/memory_drain/main.py            |  48 +++--
 3 files changed, 214 insertions(+), 141 deletions(-)

diff --git a/rerun_py/rerun_sdk/rerun/__init__.py b/rerun_py/rerun_sdk/rerun/__init__.py
index 5d901d926788..b1f096645213 100644
--- a/rerun_py/rerun_sdk/rerun/__init__.py
+++ b/rerun_py/rerun_sdk/rerun/__init__.py
@@ -86,8 +86,10 @@
     get_recording_id,
     get_thread_local_data_recording,
     is_enabled,
+    new_recording,
     set_global_data_recording,
     set_thread_local_data_recording,
+    thread_local_stream,
 )
 from .script_helpers import script_add_args, script_setup, script_teardown
 from .sinks import connect, disconnect, save, send_blueprint, serve, spawn, stdout
@@ -256,133 +258,6 @@ def init(
         _spawn(default_blueprint=default_blueprint)
 
 
-# TODO(#3793): defaulting recording_id to authkey should be opt-in
-def new_recording(
-    application_id: str,
-    *,
-    recording_id: str | UUID | None = None,
-    make_default: bool = False,
-    make_thread_default: bool = False,
-    spawn: bool = False,
-    default_enabled: bool = True,
-) -> RecordingStream:
-    """
-    Creates a new recording with a user-chosen application id (name) that can be used to log data.
- - If you only need a single global recording, [`rerun.init`][] might be simpler. - - !!! Warning - If you don't specify a `recording_id`, it will default to a random value that is generated once - at the start of the process. - That value will be kept around for the whole lifetime of the process, and even inherited by all - its subprocesses, if any. - - This makes it trivial to log data to the same recording in a multiprocess setup, but it also means - that the following code will _not_ create two distinct recordings: - ``` - rr.init("my_app") - rr.init("my_app") - ``` - - To create distinct recordings from the same process, specify distinct recording IDs: - ``` - from uuid import uuid4 - rec = rr.new_recording(application_id="test", recording_id=uuid4()) - rec = rr.new_recording(application_id="test", recording_id=uuid4()) - ``` - - Parameters - ---------- - application_id : str - Your Rerun recordings will be categorized by this application id, so - try to pick a unique one for each application that uses the Rerun SDK. - - For example, if you have one application doing object detection - and another doing camera calibration, you could have - `rerun.init("object_detector")` and `rerun.init("calibrator")`. - recording_id : Optional[str] - Set the recording ID that this process is logging to, as a UUIDv4. - - The default recording_id is based on `multiprocessing.current_process().authkey` - which means that all processes spawned with `multiprocessing` - will have the same default recording_id. - - If you are not using `multiprocessing` and still want several different Python - processes to log to the same Rerun instance (and be part of the same recording), - you will need to manually assign them all the same recording_id. - Any random UUIDv4 will work, or copy the recording id for the parent process. - make_default : bool - If true (_not_ the default), the newly initialized recording will replace the current - active one (if any) in the global scope. - make_thread_default : bool - If true (_not_ the default), the newly initialized recording will replace the current - active one (if any) in the thread-local scope. - spawn : bool - Spawn a Rerun Viewer and stream logging data to it. - Short for calling `spawn` separately. - If you don't call this, log events will be buffered indefinitely until - you call either `connect`, `show`, or `save` - default_enabled - Should Rerun logging be on by default? - Can be overridden with the RERUN env-var, e.g. `RERUN=on` or `RERUN=off`. - - Returns - ------- - RecordingStream - A handle to the [`rerun.RecordingStream`][]. Use it to log data to Rerun. - - """ - - application_path = None - - # NOTE: It'd be even nicer to do such thing on the Rust-side so that this little trick would - # only need to be written once and just work for all languages out of the box… unfortunately - # we lose most of the details of the python part of the backtrace once we go over the bridge. - # - # Still, better than nothing! - try: - import inspect - import pathlib - - # We're trying to grab the filesystem path of the example script that called `init()`. - # The tricky part is that we don't know how many layers are between this script and the - # original caller, so we have to walk the stack and look for anything that might look like - # an official Rerun example. 
- - MAX_FRAMES = 10 # try the first 10 frames, should be more than enough - FRAME_FILENAME_INDEX = 1 # `FrameInfo` tuple has `filename` at index 1 - - stack = inspect.stack() - for frame in stack[:MAX_FRAMES]: - filename = frame[FRAME_FILENAME_INDEX] - path = pathlib.Path(str(filename)).resolve() # normalize before comparison! - if "rerun/examples" in str(path): - application_path = path - except Exception: - pass - - if recording_id is not None: - recording_id = str(recording_id) - - recording = RecordingStream( - bindings.new_recording( - application_id=application_id, - recording_id=recording_id, - make_default=make_default, - make_thread_default=make_thread_default, - application_path=application_path, - default_enabled=default_enabled, - ) - ) - - if spawn: - from rerun.sinks import spawn as _spawn - - _spawn(recording=recording) - - return recording - - def version() -> str: """ Returns a verbose version string of the Rerun SDK. diff --git a/rerun_py/rerun_sdk/rerun/recording_stream.py b/rerun_py/rerun_sdk/rerun/recording_stream.py index 49894e92c8f8..a5b8f95ac4b3 100644 --- a/rerun_py/rerun_sdk/rerun/recording_stream.py +++ b/rerun_py/rerun_sdk/rerun/recording_stream.py @@ -1,8 +1,139 @@ from __future__ import annotations +import functools +import inspect +import uuid +from typing import Any, Callable, TypeVar + from rerun import bindings + # --- +# TODO(#3793): defaulting recording_id to authkey should be opt-in +def new_recording( + application_id: str, + *, + recording_id: str | uuid.UUID | None = None, + make_default: bool = False, + make_thread_default: bool = False, + spawn: bool = False, + default_enabled: bool = True, +) -> RecordingStream: + """ + Creates a new recording with a user-chosen application id (name) that can be used to log data. + + If you only need a single global recording, [`rerun.init`][] might be simpler. + + !!! Warning + If you don't specify a `recording_id`, it will default to a random value that is generated once + at the start of the process. + That value will be kept around for the whole lifetime of the process, and even inherited by all + its subprocesses, if any. + + This makes it trivial to log data to the same recording in a multiprocess setup, but it also means + that the following code will _not_ create two distinct recordings: + ``` + rr.init("my_app") + rr.init("my_app") + ``` + + To create distinct recordings from the same process, specify distinct recording IDs: + ``` + from uuid import uuid4 + rec = rr.new_recording(application_id="test", recording_id=uuid4()) + rec = rr.new_recording(application_id="test", recording_id=uuid4()) + ``` + + Parameters + ---------- + application_id : str + Your Rerun recordings will be categorized by this application id, so + try to pick a unique one for each application that uses the Rerun SDK. + + For example, if you have one application doing object detection + and another doing camera calibration, you could have + `rerun.init("object_detector")` and `rerun.init("calibrator")`. + recording_id : Optional[str] + Set the recording ID that this process is logging to, as a UUIDv4. + + The default recording_id is based on `multiprocessing.current_process().authkey` + which means that all processes spawned with `multiprocessing` + will have the same default recording_id. + + If you are not using `multiprocessing` and still want several different Python + processes to log to the same Rerun instance (and be part of the same recording), + you will need to manually assign them all the same recording_id. 
+        Any random UUIDv4 will work, or copy the recording id for the parent process.
+    make_default : bool
+        If true (_not_ the default), the newly initialized recording will replace the current
+        active one (if any) in the global scope.
+    make_thread_default : bool
+        If true (_not_ the default), the newly initialized recording will replace the current
+        active one (if any) in the thread-local scope.
+    spawn : bool
+        Spawn a Rerun Viewer and stream logging data to it.
+        Short for calling `spawn` separately.
+        If you don't call this, log events will be buffered indefinitely until
+        you call either `connect`, `show`, or `save`
+    default_enabled
+        Should Rerun logging be on by default?
+        Can be overridden with the RERUN env-var, e.g. `RERUN=on` or `RERUN=off`.
+
+    Returns
+    -------
+    RecordingStream
+        A handle to the [`rerun.RecordingStream`][]. Use it to log data to Rerun.
+
+    """
+
+    application_path = None
+
+    # NOTE: It'd be even nicer to do such thing on the Rust-side so that this little trick would
+    # only need to be written once and just work for all languages out of the box… unfortunately
+    # we lose most of the details of the python part of the backtrace once we go over the bridge.
+    #
+    # Still, better than nothing!
+    try:
+        import inspect
+        import pathlib
+
+        # We're trying to grab the filesystem path of the example script that called `init()`.
+        # The tricky part is that we don't know how many layers are between this script and the
+        # original caller, so we have to walk the stack and look for anything that might look like
+        # an official Rerun example.
+
+        MAX_FRAMES = 10  # try the first 10 frames, should be more than enough
+        FRAME_FILENAME_INDEX = 1  # `FrameInfo` tuple has `filename` at index 1
+
+        stack = inspect.stack()
+        for frame in stack[:MAX_FRAMES]:
+            filename = frame[FRAME_FILENAME_INDEX]
+            path = pathlib.Path(str(filename)).resolve()  # normalize before comparison!
+            if "rerun/examples" in str(path):
+                application_path = path
+    except Exception:
+        pass
+
+    if recording_id is not None:
+        recording_id = str(recording_id)
+
+    recording = RecordingStream(
+        bindings.new_recording(
+            application_id=application_id,
+            recording_id=recording_id,
+            make_default=make_default,
+            make_thread_default=make_thread_default,
+            application_path=application_path,
+            default_enabled=default_enabled,
+        )
+    )
+
+    if spawn:
+        from rerun.sinks import spawn as _spawn
+
+        _spawn(recording=recording)
+
+    return recording
 
 
 class RecordingStream:
@@ -272,3 +403,50 @@ def set_thread_local_data_recording(recording: RecordingStream) -> RecordingStre
     """
     result = bindings.set_thread_local_data_recording(recording=RecordingStream.to_native(recording))
     return RecordingStream(result) if result is not None else None
+
+
+_TFunc = TypeVar("_TFunc", bound=Callable[..., Any])
+
+
+def thread_local_stream(application_id: str) -> Callable[[_TFunc], _TFunc]:
+    """
+    Create a thread-local recording stream and use it when executing the decorated function.
+
+    This can be helpful for decorating a function that represents a job or a task that you want
+    to produce its own isolated recording.
+
+    Parameters
+    ----------
+    application_id : str
+        The application ID that this recording is associated with.
+ + """ + + def decorator(func: _TFunc) -> _TFunc: + if inspect.isgeneratorfunction(func): # noqa: F821 + + @functools.wraps(func) + def generator_wrapper(*args: Any, **kwargs: Any) -> Any: + gen = func(*args, **kwargs) + try: + with new_recording(application_id, recording_id=uuid.uuid4()): + value = next(gen) # Start the generator inside the context + while True: + value = gen.send((yield value)) # Continue the generator + except StopIteration: + pass + finally: + gen.close() + + return generator_wrapper # type: ignore[return-value] + else: + + @functools.wraps(func) + def wrapper(*args: Any, **kwargs: Any) -> Any: + with new_recording(application_id, recording_id=uuid.uuid4()): + gen = func(*args, **kwargs) + return gen + + return wrapper # type: ignore[return-value] + + return decorator diff --git a/tests/python/memory_drain/main.py b/tests/python/memory_drain/main.py index fae632a3f9d3..2d4d6c8b3126 100644 --- a/tests/python/memory_drain/main.py +++ b/tests/python/memory_drain/main.py @@ -9,31 +9,51 @@ from __future__ import annotations +import queue import threading import time -import uuid +from typing import Any, Iterator import rerun as rr import rerun.blueprint as rrb -def job(name: str) -> None: - with rr.new_recording("rerun_example_memory_drain", recording_id=uuid.uuid4()): - mem = rr.memory_recording() +@rr.thread_local_stream("rerun_example_memory_drain") +def job(name: str) -> Iterator[tuple[str, int, bytes]]: + mem = rr.memory_recording() - blueprint = rrb.Blueprint(rrb.TextLogView(name="My Logs", origin="test")) + blueprint = rrb.Blueprint(rrb.TextLogView(name="My Logs", origin="test")) - rr.send_blueprint(blueprint) + rr.send_blueprint(blueprint) - for i in range(5): - time.sleep(0.2) - rr.log("test", rr.TextLog(f"Job {name} Message {i}")) + for i in range(5): + time.sleep(0.2) + rr.log("test", rr.TextLog(f"Job {name} Message {i}")) - with open(f"output_{name}_{i}.rrd", "wb") as f: - f.write(mem.drain_as_bytes()) + print(f"YIELD {name} {i}") + yield (name, i, mem.drain_as_bytes()) + + +def queue_results(generator: Iterator[Any], out_queue: queue.Queue) -> None: + for item in generator: + out_queue.put(item) if __name__ == "__main__": - threading.Thread(target=job, args=("A",)).start() - threading.Thread(target=job, args=("B",)).start() - threading.Thread(target=job, args=("C",)).start() + results_queue: queue.Queue[tuple[str, int, bytes]] = queue.Queue() + + threads = [ + threading.Thread(target=queue_results, args=(job("A"), results_queue)), + threading.Thread(target=queue_results, args=(job("B"), results_queue)), + threading.Thread(target=queue_results, args=(job("C"), results_queue)), + ] + for t in threads: + t.start() + for t in threads: + t.join() + + while not results_queue.empty(): + name, i, data = results_queue.get() + + with open(f"output_{name}_{i}.rrd", "wb") as f: + f.write(data) From 20804d1cfc3f17319a8ab91665dd49e601d1a871 Mon Sep 17 00:00:00 2001 From: Jeremy Leibs Date: Wed, 1 May 2024 12:41:35 -0400 Subject: [PATCH 6/9] Lint --- rerun_py/rerun_sdk/rerun/memory.py | 2 +- rerun_py/rerun_sdk/rerun/recording_stream.py | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/rerun_py/rerun_sdk/rerun/memory.py b/rerun_py/rerun_sdk/rerun/memory.py index 57a4626f83cf..ce4894dd12e3 100644 --- a/rerun_py/rerun_sdk/rerun/memory.py +++ b/rerun_py/rerun_sdk/rerun/memory.py @@ -59,7 +59,7 @@ def drain_as_bytes(self) -> bytes: This will flush the current sink before returning. 
""" - return self.storage.drain_as_bytes() + return self.storage.drain_as_bytes() # type: ignore[no-any-return] @deprecated("Please use rerun.notebook_show() instead.") def as_html( diff --git a/rerun_py/rerun_sdk/rerun/recording_stream.py b/rerun_py/rerun_sdk/rerun/recording_stream.py index a5b8f95ac4b3..0f48df1aac50 100644 --- a/rerun_py/rerun_sdk/rerun/recording_stream.py +++ b/rerun_py/rerun_sdk/rerun/recording_stream.py @@ -220,7 +220,6 @@ def _patch(funcs): # type: ignore[no-untyped-def] """Adds the given functions as methods to the `RecordingStream` class; injects `recording=self` in passing.""" import functools import os - from typing import Any # If this is a special RERUN_APP_ONLY context (launched via .spawn), we # can bypass everything else, which keeps us from monkey patching methods From c74e8f9b23a133e7cb442da71bef0b29a0a77a17 Mon Sep 17 00:00:00 2001 From: Jeremy Leibs Date: Thu, 2 May 2024 12:49:47 -0400 Subject: [PATCH 7/9] Move flushing over to MemorySink in rust and clean up duplicated recording stream --- crates/re_sdk/src/log_sink.rs | 55 +++++++++++++++++++++------ crates/re_sdk/src/recording_stream.rs | 30 +++++++-------- rerun_py/src/python_bridge.rs | 41 ++++++++++---------- 3 files changed, 79 insertions(+), 47 deletions(-) diff --git a/crates/re_sdk/src/log_sink.rs b/crates/re_sdk/src/log_sink.rs index a4326b115f8b..b83a5a4d1a0e 100644 --- a/crates/re_sdk/src/log_sink.rs +++ b/crates/re_sdk/src/log_sink.rs @@ -4,6 +4,8 @@ use std::sync::Arc; use parking_lot::Mutex; use re_log_types::{BlueprintActivationCommand, LogMsg, StoreId}; +use crate::RecordingStream; + /// Where the SDK sends its log messages. pub trait LogSink: Send + Sync + 'static { /// Send this log message. @@ -126,10 +128,15 @@ impl fmt::Debug for BufferedSink { /// /// Additionally the raw storage can be accessed and used to create an in-memory RRD. /// This is useful for things like the inline rrd-viewer in Jupyter notebooks. -#[derive(Default)] pub struct MemorySink(MemorySinkStorage); impl MemorySink { + /// Create a new [`MemorySink`] with an associated [`RecordingStream`]. + #[inline] + pub fn new(rec: RecordingStream) -> Self { + Self(MemorySinkStorage::new(rec)) + } + /// Access the raw `MemorySinkStorage` #[inline] pub fn buffer(&self) -> MemorySinkStorage { @@ -153,7 +160,11 @@ impl LogSink for MemorySink { #[inline] fn drain_backlog(&self) -> Vec { - self.0.take() + // Note that When draining the backlog, we don't call `take` since that would flush + // the stream. But drain_backlog is being called as part of `set_sink`, which has already queued + // a flush of the batcher. Queueing a second flush here seems to lead to a deadlock + // at shutdown. + std::mem::take(&mut (self.0.write())) } } @@ -170,10 +181,10 @@ struct MemorySinkStorageInner { } /// The storage used by [`MemorySink`]. -#[derive(Default, Clone)] +#[derive(Clone)] pub struct MemorySinkStorage { inner: Arc>, - pub(crate) rec: Option, + pub(crate) rec: RecordingStream, } impl Drop for MemorySinkStorage { @@ -194,6 +205,14 @@ impl Drop for MemorySinkStorage { } impl MemorySinkStorage { + /// Create a new [`MemorySinkStorage`] with an associated [`RecordingStream`]. + fn new(rec: RecordingStream) -> Self { + Self { + inner: Default::default(), + rec, + } + } + /// Write access to the inner array of [`LogMsg`]. 
     #[inline]
     fn write(&self) -> parking_lot::MappedMutexGuard<'_, Vec<LogMsg>> {
@@ -203,8 +222,14 @@ impl MemorySinkStorage {
     }
 
     /// How many messages are currently written to this memory sink
+    ///
+    /// This automatically takes care of flushing the underlying [`crate::RecordingStream`].
     #[inline]
     pub fn num_msgs(&self) -> usize {
+        // NOTE: It's fine, this is an in-memory sink so by definition there's no I/O involved
+        // in this flush; it's just a matter of making the table batcher tick early.
+        self.rec.flush_blocking();
+
         self.inner.lock().msgs.len()
     }
 
@@ -213,15 +238,15 @@ impl MemorySinkStorage {
     /// This automatically takes care of flushing the underlying [`crate::RecordingStream`].
     #[inline]
     pub fn take(&self) -> Vec<LogMsg> {
-        if let Some(rec) = self.rec.as_ref() {
-            // NOTE: It's fine, this is an in-memory sink so by definition there's no I/O involved
-            // in this flush; it's just a matter of making the table batcher tick early.
-            rec.flush_blocking();
-        }
+        // NOTE: It's fine, this is an in-memory sink so by definition there's no I/O involved
+        // in this flush; it's just a matter of making the table batcher tick early.
+        self.rec.flush_blocking();
         std::mem::take(&mut (self.write()))
     }
 
     /// Convert the stored messages into an in-memory Rerun log file.
+    ///
+    /// This automatically takes care of flushing the underlying [`crate::RecordingStream`].
     #[inline]
     pub fn concat_memory_sinks_as_bytes(
         sinks: &[&Self],
     ) -> Result<Vec<u8>, re_log_encoding::encoder::EncodeError> {
@@ -233,6 +258,9 @@ impl MemorySinkStorage {
             let mut encoder =
                 re_log_encoding::encoder::Encoder::new(encoding_options, &mut buffer)?;
             for sink in sinks {
+                // NOTE: It's fine, this is an in-memory sink so by definition there's no I/O involved
+                // in this flush; it's just a matter of making the table batcher tick early.
+                sink.rec.flush_blocking();
                 let mut inner = sink.inner.lock();
                 inner.has_been_used = true;
 
@@ -246,8 +274,13 @@ impl MemorySinkStorage {
     }
 
     /// Drain the stored messages and return them as an in-memory RRD.
+    ///
+    /// This automatically takes care of flushing the underlying [`crate::RecordingStream`].
     #[inline]
     pub fn drain_as_bytes(&self) -> Result<Vec<u8>, re_log_encoding::encoder::EncodeError> {
+        // NOTE: It's fine, this is an in-memory sink so by definition there's no I/O involved
+        // in this flush; it's just a matter of making the table batcher tick early.
+        self.rec.flush_blocking();
         let mut buffer = std::io::Cursor::new(Vec::new());
 
         {
@@ -269,9 +302,7 @@ impl MemorySinkStorage {
     #[inline]
     /// Get the [`StoreId`] from the associated `RecordingStream` if it exists.
     pub fn store_id(&self) -> Option<StoreId> {
-        self.rec
-            .as_ref()
-            .and_then(|rec| rec.store_info().map(|info| info.store_id.clone()))
+        self.rec.store_info().map(|info| info.store_id.clone())
     }
 }
 // ----------------------------------------------------------------------------
diff --git a/crates/re_sdk/src/recording_stream.rs b/crates/re_sdk/src/recording_stream.rs
index 3bc5cfc0275f..2591b6bbff94 100644
--- a/crates/re_sdk/src/recording_stream.rs
+++ b/crates/re_sdk/src/recording_stream.rs
@@ -275,19 +275,22 @@ impl RecordingStreamBuilder {
     pub fn memory(
         self,
     ) -> RecordingStreamResult<(RecordingStream, crate::log_sink::MemorySinkStorage)> {
-        let sink = crate::log_sink::MemorySink::default();
-        let mut storage = sink.buffer();
-
         let (enabled, store_info, batcher_config) = self.into_args();
-        if enabled {
-            RecordingStream::new(store_info, batcher_config, Box::new(sink)).map(|rec| {
-                storage.rec = Some(rec.clone());
-                (rec, storage)
-            })
+        let rec = if enabled {
+            RecordingStream::new(
+                store_info,
+                batcher_config,
+                Box::new(crate::log_sink::BufferedSink::new()),
+            )
         } else {
             re_log::debug!("Rerun disabled - call to memory() ignored");
-            Ok((RecordingStream::disabled(), Default::default()))
-        }
+            Ok(RecordingStream::disabled())
+        }?;
+
+        let sink = crate::log_sink::MemorySink::new(rec.clone());
+        let storage = sink.buffer();
+        rec.set_sink(Box::new(sink));
+        Ok((rec, storage))
     }
 
     /// Creates a new [`RecordingStream`] that is pre-configured to stream the data through to a
@@ -1581,12 +1584,9 @@ impl RecordingStream {
     /// terms of data durability and ordering.
     /// See [`Self::set_sink`] for more information.
     pub fn memory(&self) -> MemorySinkStorage {
-        let sink = crate::sink::MemorySink::default();
-        let mut storage = sink.buffer();
-
+        let sink = crate::sink::MemorySink::new(self.clone());
+        let storage = sink.buffer();
         self.set_sink(Box::new(sink));
-        storage.rec = Some(self.clone());
-
         storage
     }
diff --git a/rerun_py/src/python_bridge.rs b/rerun_py/src/python_bridge.rs
index 11e2faa30627..4e822916f338 100644
--- a/rerun_py/src/python_bridge.rs
+++ b/rerun_py/src/python_bridge.rs
@@ -742,14 +742,13 @@ fn memory_recording(
             flush_garbage_queue();
             storage
         });
-        PyMemorySinkStorage { rec: rec.0, inner }
+        PyMemorySinkStorage { inner }
     })
 }
 
 #[pyclass(frozen)]
 struct PyMemorySinkStorage {
     // So we can flush when needed!
-    rec: RecordingStream,
     inner: MemorySinkStorage,
 }
 
@@ -765,17 +764,18 @@ impl PyMemorySinkStorage {
     ) -> PyResult<&'p PyBytes> {
         // Release the GIL in case any flushing behavior needs to clean up a python object.
         py.allow_threads(|| {
-            self.rec.flush_blocking();
+            let concat_bytes = MemorySinkStorage::concat_memory_sinks_as_bytes(
+                [Some(&self.inner), concat.map(|c| &c.inner)]
+                    .iter()
+                    .filter_map(|s| *s)
+                    .collect_vec()
+                    .as_slice(),
+            );
+
             flush_garbage_queue();
-        });
 
-        MemorySinkStorage::concat_memory_sinks_as_bytes(
-            [Some(&self.inner), concat.map(|c| &c.inner)]
-                .iter()
-                .filter_map(|s| *s)
-                .collect_vec()
-                .as_slice(),
-        )
+            concat_bytes
+        })
        .map(|bytes| PyBytes::new(py, bytes.as_slice()))
        .map_err(|err| PyRuntimeError::new_err(err.to_string()))
     }
@@ -786,11 +786,12 @@ impl PyMemorySinkStorage {
     fn num_msgs(&self, py: Python<'_>) -> usize {
         // Release the GIL in case any flushing behavior needs to clean up a python object.
         py.allow_threads(|| {
-            self.rec.flush_blocking();
+            let num = self.inner.num_msgs();
+
             flush_garbage_queue();
-        });
 
-        self.inner.num_msgs()
+            num
+        })
     }
 
     /// Drain all messages logged to the [`MemorySinkStorage`] and return as bytes.
     ///
     /// This will do a blocking flush before returning!
     fn drain_as_bytes<'p>(&self, py: Python<'p>) -> PyResult<&'p PyBytes> {
         // Release the GIL in case any flushing behavior needs to clean up a python object.
         py.allow_threads(|| {
-            self.rec.flush_blocking();
+            let bytes = self.inner.drain_as_bytes();
+
             flush_garbage_queue();
-        });
 
-        self.inner
-            .drain_as_bytes()
-            .map(|bytes| PyBytes::new(py, bytes.as_slice()))
-            .map_err(|err| PyRuntimeError::new_err(err.to_string()))
+            bytes
+        })
+        .map(|bytes| PyBytes::new(py, bytes.as_slice()))
+        .map_err(|err| PyRuntimeError::new_err(err.to_string()))
     }
 }

From dec8e56891a9d33b0f980f802974bb2fb97cd18f Mon Sep 17 00:00:00 2001
From: Jeremy Leibs
Date: Thu, 2 May 2024 12:57:20 -0400
Subject: [PATCH 8/9] Add example for decorator

---
 rerun_py/rerun_sdk/rerun/recording_stream.py | 15 +++++++++++++++
 1 file changed, 15 insertions(+)

diff --git a/rerun_py/rerun_sdk/rerun/recording_stream.py b/rerun_py/rerun_sdk/rerun/recording_stream.py
index 0f48df1aac50..7a4873e41b12 100644
--- a/rerun_py/rerun_sdk/rerun/recording_stream.py
+++ b/rerun_py/rerun_sdk/rerun/recording_stream.py
@@ -414,6 +414,21 @@ def thread_local_stream(application_id: str) -> Callable[[_TFunc], _TFunc]:
     This can be helpful for decorating a function that represents a job or a task that you want
     to produce its own isolated recording.
 
+    Example
+    -------
+    ```python
+    @rr.thread_local_stream("rerun_example_job")
+    def job(name: str) -> None:
+        rr.save(f"job_{name}.rrd")
+        for i in range(5):
+            time.sleep(0.2)
+            rr.log("hello", rr.TextLog(f"Hello {i} from Job {name}"))
+
+    threading.Thread(target=job, args=("A",)).start()
+    threading.Thread(target=job, args=("B",)).start()
+    ```
+    This will produce 2 separate rrd files, each only containing the logs from the respective threads.
+
     Parameters
     ----------
     application_id : str

From d75588bbfe6045cbdb03f4d1ebe875f339ec80ee Mon Sep 17 00:00:00 2001
From: Jeremy Leibs
Date: Thu, 2 May 2024 12:58:20 -0400
Subject: [PATCH 9/9] lint

---
 crates/re_sdk/src/log_sink.rs | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/crates/re_sdk/src/log_sink.rs b/crates/re_sdk/src/log_sink.rs
index b83a5a4d1a0e..a67b649fe194 100644
--- a/crates/re_sdk/src/log_sink.rs
+++ b/crates/re_sdk/src/log_sink.rs
@@ -161,7 +161,7 @@ impl LogSink for MemorySink {
     #[inline]
     fn drain_backlog(&self) -> Vec<LogMsg> {
         // Note that when draining the backlog, we don't call `take` since that would flush
-        // the stream. But drain_backlog is being called as part of `set_sink`, which has already queued 
+        // the stream. But drain_backlog is being called as part of `set_sink`, which has already queued
         // a flush of the batcher. Queueing a second flush here seems to lead to a deadlock
         // at shutdown.
         std::mem::take(&mut (self.0.write()))