Skip to content

Commit

Permalink
Print locally on the worker too
Browse files Browse the repository at this point in the history
  • Loading branch information
eriknw committed Sep 27, 2021
1 parent bcfc1e7 commit c58b17b
Show file tree
Hide file tree
Showing 9 changed files with 44 additions and 5 deletions.
3 changes: 1 addition & 2 deletions .github/workflows/test_conda.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,7 @@ jobs:
activate-environment: afar
- name: Install dependencies
run: |
conda install -y -c conda-forge distributed pytest
pip install innerscope
conda install -y -c conda-forge distributed pytest innerscope
pip install -e .
- name: PyTest
run: |
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/test_pip.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ jobs:
run: |
pip install black flake8
flake8 .
black afar *.py --check --diff
black . --check --diff
- name: Coverage
env:
GITHUB_TOKEN: ${{ secrets.github_token }}
Expand Down
15 changes: 15 additions & 0 deletions afar/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,18 @@
"""afar runs code within a context manager or IPython magic on a Dask cluster.
>>> with afar.run, remotely:
... import dask_cudf
... df = dask_cudf.read_parquet("s3://...")
... result = df.sum().compute()
or to use an IPython magic:
>>> %load_ext afar
>>> %afar z = x + y
Read the documentation at https://github.com/eriknw/afar
"""

from ._core import get, run # noqa
from ._version import get_versions
from ._where import later, locally, remotely # noqa
Expand Down
5 changes: 5 additions & 0 deletions afar/_abra.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
"""Perform a magic trick: given lines of code, create a function to run remotely.
This callable object is able to provide the values of the requested argument
names and return the final expression so it can be displayed.
"""
import dis
from types import FunctionType

Expand Down
17 changes: 15 additions & 2 deletions afar/_core.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
"""Define the user-facing `run` object; this is where it all comes together."""
import dis
import sys
import traceback
from inspect import currentframe
from uuid import uuid4
from weakref import WeakKeyDictionary, WeakSet
Expand Down Expand Up @@ -344,10 +346,21 @@ def run_afar(magic_func, names, futures, capture_print, channel, unique_key):
# Hopefully computing the repr is fast. If it is slow, perhaps it would be
# better to add the return value to rv and call repr_afar as a separate task.
# Also, pretty_repr must be msgpack serializable if done via events.
# Hence, custom _ipython_display_ probably won't work.
# Hence, custom _ipython_display_ probably won't work, and we resort to
# trying to use a basic repr (if that fails, we show the first exception).
pretty_repr = repr_afar(results.return_value, magic_func._repr_methods)
if pretty_repr is not None:
worker.log_event(channel, (unique_key, "display_expr", pretty_repr))
try:
worker.log_event(channel, (unique_key, "display_expr", pretty_repr))
except Exception:
exc_info = sys.exc_info()
tb = traceback.format_exception(*exc_info)
try:
basic_repr = (repr(results.return_value), "__repr__", False)
worker.log_event(channel, (unique_key, "display_expr", basic_repr))
except Exception:
exc_repr = (tb, pretty_repr[1], True)
worker.log_event(channel, (unique_key, "display_expr", exc_repr))
send_finish = False
finally:
if capture_print and worker is not None and send_finish:
Expand Down
1 change: 1 addition & 0 deletions afar/_inspect.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
"""Utilities to get the lines of the context body."""
import dis
from inspect import findsource

Expand Down
1 change: 1 addition & 0 deletions afar/_magic.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
"""Define the IPython magic for using afar"""
from textwrap import indent

from dask.distributed import Client
Expand Down
4 changes: 4 additions & 0 deletions afar/_printing.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
"""Classes used to capture print statements within a Dask task."""
import builtins
import sys
from io import StringIO
Expand Down Expand Up @@ -59,3 +60,6 @@ def __call__(self, *args, file=None, **kwargs):
pass
else:
worker.log_event(self.channel, (self.key, stream_name, file.getvalue()))
# Print locally too
stream = sys.stdout if stream_name == "stdout" else sys.stderr
LocalPrint.printer(file.getvalue(), end="", file=stream)
1 change: 1 addition & 0 deletions afar/_reprs.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
"""Utilities to calculate the (pretty) repr of objects remotely and display locally."""
import sys
import traceback

Expand Down

0 comments on commit c58b17b

Please sign in to comment.