Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add task/scheduler cancellation API #557

Merged
merged 3 commits into from
Aug 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 28 additions & 0 deletions docs/src/task-spawning.md
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,34 @@ but can be enabled by setting the scheduler/thunk option ([Scheduler and Thunk o
non-dynamic usecases, since any thunk failure will propagate down to the output
thunk regardless of where it occurs.

## Cancellation

Sometimes a task runs longer than expected (maybe it's hanging due to a bug),
or the user decides that they don't want to wait on a task to run to
completion. In these cases, Dagger provides the `Dagger.cancel!` function,
which allows for stopping a task while it's running, or terminating it before
it gets the chance to start running.

```julia
t = Dagger.@spawn sleep(1000)
# We're bored, let's cancel `t`
Dagger.cancel!(t)
```

`Dagger.cancel!` is generally safe to call, as it will not actually *force* a
task to stop; instead, Dagger will simply "abandon" the task and allow it to
finish on its own in the background, and it will not block the execution of
other `DTask`s that are queued to run. It is possible to force-cancel a task by
doing `Dagger.cancel!(t; force=true)`, but this is generally discouraged, as it
can cause memory leaks, hangs, and segfaults.

If it's desired to cancel all tasks that are scheduled or running, one can call
`Dagger.cancel!()`, and all tasks will be abandoned (or force-cancelled, if `force=true` is passed). Additionally, if Dagger's scheduler needs to be restarted for any
specified). Additionally, if Dagger's scheduler needs to be restarted for any
reason, one can call `Dagger.cancel!(;halt_sch=true)` to stop the scheduler and
all tasks. The scheduler will be automatically restarted on the next
`@spawn`/`spawn` call.

## Lazy API

Alongside the modern eager API, Dagger also has a legacy lazy API, accessible
Expand Down
1 change: 1 addition & 0 deletions src/Dagger.jl
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ include("thunk.jl")
include("submission.jl")
include("chunks.jl")
include("memory-spaces.jl")
include("cancellation.jl")

# Task scheduling
include("compute.jl")
Expand Down
128 changes: 128 additions & 0 deletions src/cancellation.jl
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
"""
cancel!(task::DTask; force::Bool=false, halt_sch::Bool=false)

Cancels `task` at any point in its lifecycle, causing the scheduler to abandon
it. If `force` is `true`, the task will be interrupted with an
`InterruptException` (not recommended, this is unsafe). If `halt_sch` is
`true`, the scheduler will be halted after the task is cancelled (it will
restart automatically upon the next `@spawn`/`spawn` call).

As an example, the following code will cancel task `t` before it finishes
executing:

```julia
t = Dagger.@spawn sleep(1000)
# We're bored, let's cancel `t`
Dagger.cancel!(t)
```

Cancellation allows the scheduler to free up execution resources for other
tasks which are waiting to run. Using `cancel!` is generally a much safer
alternative to Ctrl+C, as it cooperates with the scheduler and runtime and
avoids unintended side effects.
"""
function cancel!(task::DTask; force::Bool=false, halt_sch::Bool=false)
tid = lock(Dagger.Sch.EAGER_ID_MAP) do id_map
id_map[task.uid]
end
cancel!(tid; force, halt_sch)
end
function cancel!(tid::Union{Int,Nothing}=nothing;
                 force::Bool=false, halt_sch::Bool=false)
    # Cancellation is coordinated from worker 1, where the eager scheduler
    # lives. `tid === nothing` means "cancel every task".
    return remotecall_fetch(1, tid, force, halt_sch) do _tid, _force, _halt_sch
        sched_state = Sch.EAGER_STATE[]
        if sched_state === nothing
            # Scheduler isn't running, so there is nothing to cancel.
            return
        end
        # All state mutation happens under the scheduler's state lock.
        return @lock sched_state.lock _cancel!(sched_state, _tid, _force, _halt_sch)
    end
end
function _cancel!(state, tid, force, halt_sch)
    # Internal cancellation implementation. The caller must already hold the
    # scheduler's state lock; `tid === nothing` cancels every task.
    @assert islocked(state.lock)

    # The scheduler UID identifies which scheduler's per-processor states to
    # target on remote workers.
    sch_uid = state.uid

    # Fail all matching ready (queued, but not yet launched) tasks
    for task in state.ready
        tid !== nothing && task.id != tid && continue
        @dagdebug tid :cancel "Cancelling ready task"
        state.cache[task] = InterruptException()
        state.errored[task] = true
        Sch.set_failed!(state, task)
    end
    # BUGFIX: only drop the task(s) we actually cancelled. The previous
    # unconditional `empty!(state.ready)` discarded *all* ready tasks even
    # when a single `tid` was requested.
    if tid === nothing
        empty!(state.ready)
    else
        filter!(task->task.id != tid, state.ready)
    end

    # Fail all matching waiting (dependency-blocked) tasks
    for task in keys(state.waiting)
        tid !== nothing && task.id != tid && continue
        @dagdebug tid :cancel "Cancelling waiting task"
        state.cache[task] = InterruptException()
        state.errored[task] = true
        Sch.set_failed!(state, task)
    end
    # As above, preserve unrelated waiting tasks when cancelling one task
    if tid === nothing
        empty!(state.waiting)
    else
        filter!(task_waiters->first(task_waiters).id != tid, state.waiting)
    end

    # Cancel running tasks at the processor level, on every worker currently
    # running tasks for this scheduler
    wids = unique(map(root_worker_id, values(state.running_on)))
    for wid in wids
        remotecall_fetch(wid, tid, sch_uid, force) do _tid, sch_uid, force
            Dagger.Sch.proc_states(sch_uid) do states
                for (proc, proc_state) in states
                    istate = proc_state.state
                    any_cancelled = false
                    @lock istate.queue begin
                        for (tid, task) in istate.tasks
                            _tid !== nothing && tid != _tid && continue
                            task_spec = istate.task_specs[tid]
                            Tf = task_spec[6]
                            # Skip the scheduler's own internal helper task
                            Tf === typeof(Sch.eager_thunk) && continue
                            # Finished tasks need no cancellation
                            istaskdone(task) && continue
                            any_cancelled = true
                            @dagdebug tid :cancel "Cancelling running task ($Tf)"
                            if force
                                # Forcibly interrupt the task. This is unsafe:
                                # it can leave locks held and resources leaked.
                                @dagdebug tid :cancel "Interrupting running task ($Tf)"
                                Threads.@spawn Base.throwto(task, InterruptException())
                            else
                                # Tell the processor to just drop this task:
                                # release its occupancy/time pressure now, mark
                                # it cancelled (so the runner doesn't release
                                # them again), and post an error result
                                # directly to the scheduler's return queue.
                                task_occupancy = task_spec[4]
                                time_util = task_spec[2]
                                istate.proc_occupancy[] -= task_occupancy
                                istate.time_pressure[] -= time_util
                                push!(istate.cancelled, tid)
                                to_proc = istate.proc
                                put!(istate.return_queue, (myid(), to_proc, tid, (InterruptException(), nothing)))
                            end
                        end
                    end
                    if any_cancelled
                        # Wake the processor runner so freed capacity is reused
                        notify(istate.reschedule)
                    end
                end
            end
            return
        end
    end

    if halt_sch
        # Halting requires releasing the state lock so in-flight results can
        # drain; re-acquire it in `finally` to preserve the caller's locking
        # contract even if halting fails.
        unlock(state.lock)
        try
            # Give tasks a moment to be processed
            sleep(0.5)

            # Halt the scheduler
            @dagdebug nothing :cancel "Halting the scheduler"
            notify(state.halt)
            put!(state.chan, (1, nothing, nothing, (Sch.SchedulerHaltedException(), nothing)))

            # Wait for the scheduler to halt (EAGER_INIT flips to false once
            # the scheduler has fully torn down)
            @dagdebug nothing :cancel "Waiting for scheduler to halt"
            while Sch.EAGER_INIT[]
                sleep(0.1)
            end
            @dagdebug nothing :cancel "Scheduler halted"
        finally
            lock(state.lock)
        end
    end

    return
end
1 change: 1 addition & 0 deletions src/precompile.jl
Original file line number Diff line number Diff line change
Expand Up @@ -49,4 +49,5 @@
yield()
@assert isempty(Sch.WORKER_MONITOR_CHANS)
@assert isempty(Sch.WORKER_MONITOR_TASKS)
ID_COUNTER[] = 1
end
27 changes: 24 additions & 3 deletions src/sch/Sch.jl
Original file line number Diff line number Diff line change
Expand Up @@ -652,6 +652,8 @@ function scheduler_exit(ctx, state::ComputeState, options)
lock(ctx.proc_notify) do
notify(ctx.proc_notify)
end

@dagdebug nothing :global "Tore down scheduler" uid=state.uid
end

function procs_to_use(ctx, options=ctx.options)
Expand Down Expand Up @@ -1161,11 +1163,14 @@ Base.hash(key::TaskSpecKey, h::UInt) = hash(key.task_id, hash(TaskSpecKey, h))
# Per-processor mutable bookkeeping used by the processor runner task.
# NOTE(review): field meanings below are inferred from usage in this diff;
# confirm against the full `Sch` module.
struct ProcessorInternalState
    ctx::Context          # Scheduler context for this processor
    proc::Processor       # The processor this state belongs to
    return_queue::RemoteChannel  # Where task results/errors are posted back to the scheduler
    queue::LockedObject{PriorityQueue{TaskSpecKey, UInt32, Base.Order.ForwardOrdering}}  # Pending tasks, lock-protected
    reschedule::Doorbell  # Signalled to wake the runner when work/capacity changes
    tasks::Dict{Int,Task} # Currently-running Julia Tasks, keyed by thunk ID
    task_specs::Dict{Int,Vector{Any}}  # Raw task specs by thunk ID; consulted by cancellation
    proc_occupancy::Base.RefValue{UInt32}  # Total occupancy of running tasks
    time_pressure::Base.RefValue{UInt64}   # Total estimated time utilization of running tasks
    cancelled::Set{Int}   # Thunk IDs cancelled mid-run; their occupancy/pressure were already released
    done::Base.RefValue{Bool}  # Presumably set to stop the runner loop — confirm in full source
end
struct ProcessorState
Expand Down Expand Up @@ -1314,6 +1319,7 @@ function start_processor_runner!(istate::ProcessorInternalState, uid::UInt64, re

# Execute the task and return its result
t = @task begin
was_cancelled = false
result = try
do_task(to_proc, task)
catch err
Expand All @@ -1322,11 +1328,23 @@ function start_processor_runner!(istate::ProcessorInternalState, uid::UInt64, re
finally
lock(istate.queue) do _
delete!(tasks, thunk_id)
proc_occupancy[] -= task_occupancy
time_pressure[] -= time_util
delete!(istate.task_specs, thunk_id)
if !(thunk_id in istate.cancelled)
proc_occupancy[] -= task_occupancy
time_pressure[] -= time_util
else
# Task was cancelled, so occupancy and pressure are
# already reduced
pop!(istate.cancelled, thunk_id)
was_cancelled = true
end
end
notify(istate.reschedule)
end
if was_cancelled
# A result was already posted to the return queue
return
end
try
put!(return_queue, (myid(), to_proc, thunk_id, result))
catch err
Expand All @@ -1345,6 +1363,7 @@ function start_processor_runner!(istate::ProcessorInternalState, uid::UInt64, re
t.sticky = false
end
tasks[thunk_id] = errormonitor_tracked("thunk $thunk_id", schedule(t))
istate.task_specs[thunk_id] = task
proc_occupancy[] += task_occupancy
time_pressure[] += time_util
end
Expand Down Expand Up @@ -1377,10 +1396,12 @@ function do_tasks(to_proc, return_queue, tasks)
queue = PriorityQueue{TaskSpecKey, UInt32}()
queue_locked = LockedObject(queue)
reschedule = Doorbell()
istate = ProcessorInternalState(ctx, to_proc,
istate = ProcessorInternalState(ctx, to_proc, return_queue,
queue_locked, reschedule,
Dict{Int,Task}(),
Dict{Int,Vector{Any}}(),
Ref(UInt32(0)), Ref(UInt64(0)),
Set{Int}(),
Ref(false))
runner = start_processor_runner!(istate, uid, return_queue)
@static if VERSION < v"1.9"
Expand Down
10 changes: 9 additions & 1 deletion src/sch/eager.jl
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ function init_eager()
end
if Threads.atomic_xchg!(EAGER_INIT, true)
wait(EAGER_READY)
if EAGER_STATE[] === nothing
throw(ConcurrencyViolationError("Eager scheduler failed to start"))
end
return
end
ctx = eager_context()
Expand All @@ -38,14 +41,19 @@ function init_eager()
seek(iob.io, 0)
write(stderr, iob)
finally
reset(EAGER_READY)
# N.B. Sequence order matters to ensure that observers can see that we failed to start
EAGER_STATE[] = nothing
notify(EAGER_READY)
reset(EAGER_READY)
lock(EAGER_ID_MAP) do id_map
empty!(id_map)
end
Threads.atomic_xchg!(EAGER_INIT, false)
end)
wait(EAGER_READY)
if EAGER_STATE[] === nothing
throw(ConcurrencyViolationError("Eager scheduler failed to start"))
end
end
function eager_thunk()
exec!(Dagger.sch_handle()) do ctx, state, task, tid, _
Expand Down
6 changes: 6 additions & 0 deletions src/threadproc.jl
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,12 @@ function execute!(proc::ThreadProc, @nospecialize(f), @nospecialize(args...); @n
fetch(task)
return result[]
catch err
if err isa InterruptException
if !istaskdone(task)
# Propagate cancellation signal
Threads.@spawn Base.throwto(task, InterruptException())
end
end
err, frames = Base.current_exceptions(task)[1]
rethrow(CapturedException(err, frames))
end
Expand Down
2 changes: 1 addition & 1 deletion src/utils/dagdebug.jl
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ function istask end
function task_id end

const DAGDEBUG_CATEGORIES = Symbol[:global, :submit, :schedule, :scope,
:take, :execute, :move, :processor]
:take, :execute, :move, :processor, :cancel]
macro dagdebug(thunk, category, msg, args...)
cat_sym = category.value
@gensym id
Expand Down
11 changes: 11 additions & 0 deletions test/scheduler.jl
Original file line number Diff line number Diff line change
Expand Up @@ -535,3 +535,14 @@ end
end
end
end

@testset "Cancellation" begin
t = Dagger.@spawn scope=Dagger.scope(worker=1, thread=1) sleep(100)
start_time = time_ns()
Dagger.cancel!(t)
@test_throws_unwrap Dagger.DTaskFailedException fetch(t)
t = Dagger.@spawn scope=Dagger.scope(worker=1, thread=1) yield()
fetch(t)
finish_time = time_ns()
@test (finish_time - start_time) * 1e-9 < 100
end