Skip to content

Commit

Permalink
Merge branch 'master' into benc-tidy-taskrecord
Browse files Browse the repository at this point in the history
  • Loading branch information
khk-globus authored Jan 14, 2025
2 parents 47e3bfe + 784256d commit 794aeed
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 28 deletions.
37 changes: 17 additions & 20 deletions parsl/dataflow/dflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -484,24 +484,18 @@ def handle_join_update(self, task_record: TaskRecord, inner_app_future: Optional

# now we know each joinable Future is done
# so now look for any exceptions
exceptions_tids: List[Tuple[BaseException, Optional[str]]]
exceptions_tids: List[Tuple[BaseException, str]]
exceptions_tids = []
if isinstance(joinable, Future):
je = joinable.exception()
if je is not None:
if hasattr(joinable, 'task_record'):
tid = joinable.task_record['id']
else:
tid = None
tid = self.render_future_description(joinable)
exceptions_tids = [(je, tid)]
elif isinstance(joinable, list):
for future in joinable:
je = future.exception()
if je is not None:
if hasattr(joinable, 'task_record'):
tid = joinable.task_record['id']
else:
tid = None
tid = self.render_future_description(future)
exceptions_tids.append((je, tid))
else:
raise TypeError(f"Unknown joinable type {type(joinable)}")
Expand Down Expand Up @@ -918,13 +912,7 @@ def _unwrap_futures(self, args: Sequence[Any], kwargs: Dict[str, Any]) \
dep_failures = []

def append_failure(e: Exception, dep: Future) -> None:
# If this Future is associated with a task inside this DFK,
# then refer to the task ID.
# Otherwise make a repr of the Future object.
if hasattr(dep, 'task_record') and dep.task_record['dfk'] == self:
tid = "task " + repr(dep.task_record['id'])
else:
tid = repr(dep)
tid = self.render_future_description(dep)
dep_failures.extend([(e, tid)])

# Replace item in args
Expand Down Expand Up @@ -1076,10 +1064,7 @@ def submit(self,

depend_descs = []
for d in depends:
if isinstance(d, AppFuture) or isinstance(d, DataFuture):
depend_descs.append("task {}".format(d.tid))
else:
depend_descs.append(repr(d))
depend_descs.append(self.render_future_description(d))

if depend_descs != []:
waiting_message = "waiting on {}".format(", ".join(depend_descs))
Expand Down Expand Up @@ -1438,6 +1423,18 @@ def default_std_autopath(self, taskrecord, kw):
'' if label is None else '_{}'.format(label),
kw))

def render_future_description(self, dep: Future) -> str:
"""Renders a description of the future in the context of the
current DFK.
"""
if isinstance(dep, AppFuture) and dep.task_record['dfk'] == self:
tid = "task " + repr(dep.task_record['id'])
elif isinstance(dep, DataFuture):
tid = "DataFuture from task " + repr(dep.tid)
else:
tid = repr(dep)
return tid


class DataFlowKernelLoader:
"""Manage which DataFlowKernel is active.
Expand Down
3 changes: 3 additions & 0 deletions parsl/tests/test_python_apps/test_dep_standard_futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,3 +43,6 @@ def test_future_fail_dependency():
# Future, plain_fut, somewhere in its str

assert repr(plain_fut) in str(ex)
assert len(ex.dependent_exceptions_tids) == 1
assert isinstance(ex.dependent_exceptions_tids[0][0], ValueError)
assert ex.dependent_exceptions_tids[0][1].startswith("<Future ")
25 changes: 17 additions & 8 deletions parsl/tests/test_python_apps/test_fail.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,26 @@ def test_no_deps():
pass


@pytest.mark.parametrize("fail_probs", ((1, 0), (0, 1)))
def test_fail_sequence(fail_probs):
"""Test failure in a sequence of dependencies
def test_fail_sequence_first():
t1 = random_fail(fail_prob=1)
t2 = random_fail(fail_prob=0, inputs=[t1])
t_final = random_fail(fail_prob=0, inputs=[t2])

App1 -> App2 ... -> AppN
"""
with pytest.raises(DependencyError):
t_final.result()

t1_fail_prob, t2_fail_prob = fail_probs
t1 = random_fail(fail_prob=t1_fail_prob)
t2 = random_fail(fail_prob=t2_fail_prob, inputs=[t1])
assert len(t_final.exception().dependent_exceptions_tids) == 1
assert isinstance(t_final.exception().dependent_exceptions_tids[0][0], DependencyError)
assert t_final.exception().dependent_exceptions_tids[0][1].startswith("task ")


def test_fail_sequence_middle():
t1 = random_fail(fail_prob=0)
t2 = random_fail(fail_prob=1, inputs=[t1])
t_final = random_fail(fail_prob=0, inputs=[t2])

with pytest.raises(DependencyError):
t_final.result()

assert len(t_final.exception().dependent_exceptions_tids) == 1
assert isinstance(t_final.exception().dependent_exceptions_tids[0][0], ManufacturedTestFailure)
6 changes: 6 additions & 0 deletions parsl/tests/test_python_apps/test_join.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,10 @@ def test_error():
f = outer_error()
e = f.exception()
assert isinstance(e, JoinError)

assert len(e.dependent_exceptions_tids) == 1
assert isinstance(e.dependent_exceptions_tids[0][0], InnerError)
assert e.dependent_exceptions_tids[0][1].startswith("task ")


def test_two_errors():
Expand All @@ -109,10 +112,12 @@ def test_two_errors():
de0 = e.dependent_exceptions_tids[0][0]
assert isinstance(de0, InnerError)
assert de0.args[0] == "Error A"
assert e.dependent_exceptions_tids[0][1].startswith("task ")

de1 = e.dependent_exceptions_tids[1][0]
assert isinstance(de1, InnerError)
assert de1.args[0] == "Error B"
assert e.dependent_exceptions_tids[1][1].startswith("task ")


def test_one_error_one_result():
Expand All @@ -125,6 +130,7 @@ def test_one_error_one_result():
de0 = e.dependent_exceptions_tids[0][0]
assert isinstance(de0, InnerError)
assert de0.args[0] == "Error A"
assert e.dependent_exceptions_tids[0][1].startswith("task ")


@join_app
Expand Down

0 comments on commit 794aeed

Please sign in to comment.