diff --git a/afar/_core.py b/afar/_core.py index 0ed2c81..fb83222 100644 --- a/afar/_core.py +++ b/afar/_core.py @@ -19,6 +19,7 @@ class Run: _gather_data = False # Used to update outputs asynchronously _outputs = {} + _channel = "afar-" + uuid4().hex def __init__(self, *names, client=None, data=None): self.names = names @@ -187,8 +188,9 @@ def _run( del self._magic_func._scoped.outer_scope[key] capture_print = True - if capture_print and "afar-print" not in client._event_handlers: - client.subscribe_topic("afar-print", self._handle_print) + if capture_print and self._channel not in client._event_handlers: + client.subscribe_topic(self._channel, self._handle_print) + # When would be a good time to unsubscribe? async_print = capture_print and supports_async_output() if capture_print: unique_key = uuid4().hex @@ -206,6 +208,7 @@ def _run( names, futures, capture_print, + self._channel, unique_key, pure=False, **submit_kwargs, @@ -314,7 +317,7 @@ class Get(Run): _gather_data = True -def run_afar(magic_func, names, futures, capture_print, unique_key): +def run_afar(magic_func, names, futures, capture_print, channel, unique_key): if capture_print: try: worker = get_worker() @@ -323,8 +326,8 @@ def run_afar(magic_func, names, futures, capture_print, unique_key): worker = None try: if capture_print and worker is not None: - worker.log_event("afar-print", (unique_key, "begin", None)) - rec = PrintRecorder(unique_key) + worker.log_event(channel, (unique_key, "begin", None)) + rec = PrintRecorder(channel, unique_key) if "print" in magic_func._scoped.builtin_names and "print" not in futures: sfunc = magic_func._scoped.bind(futures, print=rec) else: @@ -340,13 +343,15 @@ def run_afar(magic_func, names, futures, capture_print, unique_key): if magic_func._display_expr and worker is not None: # Hopefully computing the repr is fast. If it is slow, perhaps it would be # better to add the return value to rv and call repr_afar as a separate task. + # Also, pretty_repr must be msgpack serializable if done via events. + # Hence, custom _ipython_display_ probably won't work. pretty_repr = repr_afar(results.return_value, magic_func._repr_methods) if pretty_repr is not None: - worker.log_event("afar-print", (unique_key, "display_expr", pretty_repr)) + worker.log_event(channel, (unique_key, "display_expr", pretty_repr)) send_finish = False finally: if capture_print and worker is not None and send_finish: - worker.log_event("afar-print", (unique_key, "finish", None)) + worker.log_event(channel, (unique_key, "finish", None)) return rv diff --git a/afar/_printing.py b/afar/_printing.py index 1bd2137..836bffc 100644 --- a/afar/_printing.py +++ b/afar/_printing.py @@ -21,7 +21,8 @@ class PrintRecorder: local_print = LocalPrint() print_lock = Lock() - def __init__(self, key): + def __init__(self, channel, key): + self.channel = channel self.key = key def __enter__(self): @@ -57,4 +58,4 @@ def __call__(self, *args, file=None, **kwargs): except ValueError: pass else: - worker.log_event("afar-print", (self.key, stream_name, file.getvalue())) + worker.log_event(self.channel, (self.key, stream_name, file.getvalue()))