Skip to content

Commit

Permalink
Use unique topic name to better support multitenancy
Browse files Browse the repository at this point in the history
  • Loading branch information
eriknw committed Sep 26, 2021
1 parent da21245 commit bcfc1e7
Show file tree
Hide file tree
Showing 2 changed files with 15 additions and 9 deletions.
19 changes: 12 additions & 7 deletions afar/_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class Run:
_gather_data = False
# Used to update outputs asynchronously
_outputs = {}
_channel = "afar-" + uuid4().hex

def __init__(self, *names, client=None, data=None):
self.names = names
Expand Down Expand Up @@ -187,8 +188,9 @@ def _run(
del self._magic_func._scoped.outer_scope[key]

capture_print = True
if capture_print and "afar-print" not in client._event_handlers:
client.subscribe_topic("afar-print", self._handle_print)
if capture_print and self._channel not in client._event_handlers:
client.subscribe_topic(self._channel, self._handle_print)
# When would be a good time to unsubscribe?
async_print = capture_print and supports_async_output()
if capture_print:
unique_key = uuid4().hex
Expand All @@ -206,6 +208,7 @@ def _run(
names,
futures,
capture_print,
self._channel,
unique_key,
pure=False,
**submit_kwargs,
Expand Down Expand Up @@ -314,7 +317,7 @@ class Get(Run):
_gather_data = True


def run_afar(magic_func, names, futures, capture_print, unique_key):
def run_afar(magic_func, names, futures, capture_print, channel, unique_key):
if capture_print:
try:
worker = get_worker()
Expand All @@ -323,8 +326,8 @@ def run_afar(magic_func, names, futures, capture_print, unique_key):
worker = None
try:
if capture_print and worker is not None:
worker.log_event("afar-print", (unique_key, "begin", None))
rec = PrintRecorder(unique_key)
worker.log_event(channel, (unique_key, "begin", None))
rec = PrintRecorder(channel, unique_key)
if "print" in magic_func._scoped.builtin_names and "print" not in futures:
sfunc = magic_func._scoped.bind(futures, print=rec)
else:
Expand All @@ -340,13 +343,15 @@ def run_afar(magic_func, names, futures, capture_print, unique_key):
if magic_func._display_expr and worker is not None:
# Hopefully computing the repr is fast. If it is slow, perhaps it would be
# better to add the return value to rv and call repr_afar as a separate task.
# Also, pretty_repr must be msgpack serializable if done via events.
# Hence, custom _ipython_display_ probably won't work.
pretty_repr = repr_afar(results.return_value, magic_func._repr_methods)
if pretty_repr is not None:
worker.log_event("afar-print", (unique_key, "display_expr", pretty_repr))
worker.log_event(channel, (unique_key, "display_expr", pretty_repr))
send_finish = False
finally:
if capture_print and worker is not None and send_finish:
worker.log_event("afar-print", (unique_key, "finish", None))
worker.log_event(channel, (unique_key, "finish", None))
return rv


Expand Down
5 changes: 3 additions & 2 deletions afar/_printing.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ class PrintRecorder:
local_print = LocalPrint()
print_lock = Lock()

def __init__(self, key):
def __init__(self, channel, key):
self.channel = channel
self.key = key

def __enter__(self):
Expand Down Expand Up @@ -57,4 +58,4 @@ def __call__(self, *args, file=None, **kwargs):
except ValueError:
pass
else:
worker.log_event("afar-print", (self.key, stream_name, file.getvalue()))
worker.log_event(self.channel, (self.key, stream_name, file.getvalue()))

0 comments on commit bcfc1e7

Please sign in to comment.