How to Handle Concurrent Subflows and Terminate Based on Result in Prefect #15500
-
### Bug summary

I want to execute three subflows concurrently. As soon as one of the subflows completes, I want to print its result and gracefully terminate the remaining running tasks in the Prefect flow, so that no tasks are left executing in the background. The goal is to achieve this without causing any errors in the flow run and without using `asyncio.gather()`.

My code is below. It runs three subflows in three tasks without `gather()`. I want to return as soon as I get a result from any task and stop the remaining parallel tasks. Is it possible?

```python
import asyncio

from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner
from prefect.exceptions import CancelledRun


@flow
async def subflow_1():
    await asyncio.sleep(20)  # Simulate work
    print("Subflow 1 completed")
    return "Result from subflow 1"


@flow
async def subflow_2():
    await asyncio.sleep(30)  # Simulate work
    print("Subflow 2 completed")
    return "Result from subflow 2"


@flow
async def subflow_3():
    await asyncio.sleep(40)  # Simulate work
    print("Subflow 3 completed")
    return "Result from subflow 3"


@task
async def task1():
    # Run the subflow and return the result
    return await subflow_1()


@task
async def task2():
    return await subflow_2()


@task
async def task3():
    return await subflow_3()


@flow(task_runner=ConcurrentTaskRunner())
async def main_flow():
    # Submit tasks without awaiting them individually to ensure parallel execution
    task_1 = await task1.submit()
    task_2 = await task2.submit()
    task_3 = await task3.submit()


if __name__ == "__main__":
    asyncio.run(main_flow())
```
Replies: 5 comments 3 replies
-
Hey @Yesurajup! There isn't anything available in Prefect itself for this, but you can do it with asyncio's own task primitives. Here's an example:

```python
import asyncio

from prefect import flow


@flow
async def subflow_1():
    await asyncio.sleep(20)  # Simulate work
    print("Subflow 1 completed")
    return "Result from subflow 1"


@flow
async def subflow_2():
    await asyncio.sleep(30)  # Simulate work
    print("Subflow 2 completed")
    return "Result from subflow 2"


@flow
async def subflow_3():
    await asyncio.sleep(40)  # Simulate work
    print("Subflow 3 completed")
    return "Result from subflow 3"


@flow
async def main_flow():
    # Create asyncio tasks for each child flow
    tasks = [
        asyncio.create_task(subflow_1()),
        asyncio.create_task(subflow_2()),
        asyncio.create_task(subflow_3()),
    ]

    # Wait until the first asyncio task completes
    done, pending = await asyncio.wait(
        tasks, return_when=asyncio.FIRST_COMPLETED)

    # Get the result from the completed task(s)
    for completed_task in done:
        result = await completed_task
        print(f'Completed: {result}')

    # Cancel the remaining tasks
    for pending_task in pending:
        pending_task.cancel()
        try:
            await pending_task
        except asyncio.CancelledError:
            print(f'Cancelled: {pending_task.get_name()}')


# Run the main flow
if __name__ == "__main__":
    asyncio.run(main_flow())
```

I'm going to convert this to a discussion since this seems like a question more than a bug. Let me know if the above solution works for you or if you'd like to discuss further!
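
For anyone who wants to try the wait-then-cancel pattern without Prefect installed, here is a minimal sketch using plain coroutines in place of the subflows. The names `work` and `first_result` are hypothetical and do not come from the thread; the delays are shortened so it runs quickly. It only illustrates the asyncio mechanics, not how Prefect records the run states.

```python
import asyncio


async def work(name: str, delay: float) -> str:
    """Hypothetical stand-in for a subflow: sleep, then return a result."""
    await asyncio.sleep(delay)
    return f"Result from {name}"


async def first_result(*coros):
    """Return (result of the first finished coroutine, number of cancelled tasks)."""
    tasks = [asyncio.create_task(coro) for coro in coros]

    # Block until at least one task has finished
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)

    # Request cancellation of everything still running
    for task in pending:
        task.cancel()

    # Wait for the cancellations to take effect; with return_exceptions=True
    # the CancelledError instances are collected instead of being raised here
    await asyncio.gather(*pending, return_exceptions=True)

    # `done` is a set and may hold several tasks if they finished together;
    # this sketch simply takes one of them
    winner = next(iter(done))
    return winner.result(), len(pending)


if __name__ == "__main__":
    result, cancelled = asyncio.run(
        first_result(work("a", 0.2), work("b", 0.01), work("c", 0.3))
    )
    print(f"Completed: {result}; cancelled {cancelled} other task(s)")
```

Awaiting the cancelled tasks before returning is what keeps anything from lingering in the background: `cancel()` only requests cancellation, and the task actually stops the next time the event loop runs it.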
-
Thank you sir, it's working now.

On Mon, Sep 30, 2024 at 8:13 PM Alexander Streed wrote:

@Yesurajup If you wanted to see which runs were cancelled in the UI, you could update the example to return a Cancelled state like this:

```python
import asyncio

from prefect import flow
from prefect.exceptions import CancelledRun
from prefect.states import Cancelled


@flow
async def subflow_1():
    try:
        await asyncio.sleep(20)  # Simulate work
        print("Subflow 1 completed")
        return "Result from subflow 1"
    except asyncio.CancelledError:
        print("Subflow 1 cancelled. Exiting...")
        return Cancelled()


@flow
async def subflow_2():
    try:
        await asyncio.sleep(30)  # Simulate work
        print("Subflow 2 completed")
        return "Result from subflow 2"
    except asyncio.CancelledError:
        print("Subflow 2 cancelled. Exiting...")
        return Cancelled()


@flow
async def subflow_3():
    try:
        await asyncio.sleep(40)  # Simulate work
        print("Subflow 3 completed")
        return "Result from subflow 3"
    except asyncio.CancelledError:
        print("Subflow 3 cancelled. Exiting...")
        return Cancelled()


@flow
async def main_flow():
    # Create asyncio tasks for each child flow
    tasks = [
        asyncio.create_task(subflow_1()),
        asyncio.create_task(subflow_2()),
        asyncio.create_task(subflow_3()),
    ]

    # Wait until the first asyncio task completes
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)

    # Get the result from the completed task(s)
    for completed_task in done:
        result = await completed_task
        print(f"Completed: {result}")

    # Cancel the remaining tasks
    for pending_task in pending:
        pending_task.cancel()
        try:
            await pending_task
        except CancelledRun:
            print(f"Cancelled: {pending_task.get_name()}")


if __name__ == "__main__":
    asyncio.run(main_flow())
```
-
Sir, is it possible?

### Bug summary

```python
import asyncio

from prefect import flow, task
from prefect.task_runners import ConcurrentTaskRunner
from prefect.exceptions import CancelledRun


@task
async def end_event():
    # Here I need to write code that puts all running or ongoing subflows,
    # subtasks, or tasks into a completed (success) status, and returns the
    # success of those tasks, subflows, or subtasks.
    pass


@flow
async def subflow_1():
    await asyncio.sleep(60)  # Simulate work
    await end_event()
    print("Subflow 1 completed")
    return "Result from subflow 1"


@flow
async def subflow_2():
    await asyncio.sleep(120)  # Simulate work
    await end_event()
    print("Subflow 2 completed")
    return "Result from subflow 2"


@flow
async def subflow_3():
    await asyncio.sleep(180)  # Simulate work
    await end_event()
    print("Subflow 3 completed")
    return "Result from subflow 3"


@task
async def task1():
    # Run the subflow and return the result
    return await subflow_1()


@task
async def task2():
    return await subflow_2()


@task
async def task3():
    return await subflow_3()


@flow(task_runner=ConcurrentTaskRunner())
async def main_flow():
    # Submit tasks without awaiting them individually to ensure parallel
    # execution
    task_1 = await task1.submit()
    task_2 = await task2.submit()
    task_3 = await task3.submit()


if __name__ == "__main__":
    asyncio.run(main_flow())
```
In my Prefect 3.x flow, I have multiple subflows running concurrently, and
I need to ensure that when the first subflow calls an end_event task, all
other ongoing subflows or tasks are immediately marked as "completed" with
a success status, even if they haven't finished their full execution.
Specifically, once one of the subflows finishes and triggers end_event, I
want all other concurrently running subflows to halt their execution
gracefully, mark them as successfully completed, and return a success
status.
One of these subflows is recursive, so I need to account for that behavior
as well.
-
@Yesurajup you should be able to use the code I posted to accomplish what you're trying to do. If you want the subflow runs to be marked as completed, you can return a `Completed` state instead of a `Cancelled` state:

```python
import asyncio

from prefect import flow
from prefect.states import Completed


@flow
async def subflow_1():
    try:
        await asyncio.sleep(20)  # Simulate work
        print("Subflow 1 completed")
        return "Result from subflow 1"
    except asyncio.CancelledError:
        print("Subflow 1 cancelled. Exiting...")
        return Completed()


@flow
async def subflow_2():
    try:
        await asyncio.sleep(30)  # Simulate work
        print("Subflow 2 completed")
        return "Result from subflow 2"
    except asyncio.CancelledError:
        print("Subflow 2 cancelled. Exiting...")
        return Completed()


@flow
async def subflow_3():
    try:
        await asyncio.sleep(40)  # Simulate work
        print("Subflow 3 completed")
        return "Result from subflow 3"
    except asyncio.CancelledError:
        print("Subflow 3 cancelled. Exiting...")
        return Completed()


@flow
async def main_flow():
    # Create asyncio tasks for each child flow
    tasks = [
        asyncio.create_task(subflow_1()),
        asyncio.create_task(subflow_2()),
        asyncio.create_task(subflow_3()),
    ]

    # Wait until the first asyncio task completes
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)

    # Get the result from the completed task(s)
    for completed_task in done:
        result = await completed_task
        print(f"Completed: {result}")

    # Cancel the remaining tasks
    for pending_task in pending:
        pending_task.cancel()
        await pending_task


if __name__ == "__main__":
    asyncio.run(main_flow())
```

The key is that you'll need to catch the `asyncio.CancelledError` from inside of your flows and return the state you want from the flow.
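
Since the question mentions that one of the subflows is recursive, here is a Prefect-free sketch of how catching `asyncio.CancelledError` behaves in that case. It assumes the recursion is an ordinary chain of awaited coroutine calls; `recursive_work` and `guarded` are hypothetical names, and a plain string stands in for the state a flow would return. The cancellation is raised at the innermost `await` and propagates outward through every recursive frame, so a single `try`/`except` at the outermost level is enough to turn it into a normal return value.

```python
import asyncio


async def recursive_work(depth: int) -> str:
    """Hypothetical recursive coroutine: sleep, then recurse until depth is 0."""
    await asyncio.sleep(0.05)
    if depth == 0:
        return "finished"
    return await recursive_work(depth - 1)


async def guarded(depth: int) -> str:
    """Run the recursive work and convert a cancellation into a return value."""
    try:
        return await recursive_work(depth)
    except asyncio.CancelledError:
        # Raised at whichever nested sleep was active, then propagated up
        # through all the recursive calls to this handler
        return "stopped early"


async def main() -> list:
    fast = asyncio.create_task(guarded(0))    # finishes after one short sleep
    slow = asyncio.create_task(guarded(100))  # would need about 5 seconds

    done, pending = await asyncio.wait(
        {fast, slow}, return_when=asyncio.FIRST_COMPLETED
    )
    for task in pending:
        task.cancel()

    # Neither await raises: the cancelled task returned a value instead
    return [await fast, await slow]


if __name__ == "__main__":
    print(asyncio.run(main()))
```

One caveat: the Python documentation advises re-raising `CancelledError` in almost all situations, so swallowing it like this is best kept to the outermost function, where the intent is specifically to report a chosen final result.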
-
Awesome, thank you so much!