This repository has been archived by the owner on Dec 8, 2020. It is now read-only.

Commit

Merge pull request #11 from puppetlabs/features/scheduler-v2
Breaking: Revise scheduler to correctly propagate error behavior
impl authored Sep 11, 2019
2 parents b271dd0 + db6c5f1 commit af03879
Showing 22 changed files with 1,211 additions and 583 deletions.
140 changes: 140 additions & 0 deletions scheduler/adhoc.go
@@ -0,0 +1,140 @@
package scheduler

import (
	"context"
	"sync"
)

// adhocProcess wraps a process with a channel that can be used for notifying a
// caller of the process result.
type adhocProcess struct {
	ch       chan<- error
	delegate Process
}

func (ap *adhocProcess) Description() string {
	return ap.delegate.Description()
}

func (ap *adhocProcess) Run(ctx context.Context) (err error) {
	defer func() {
		if r := recover(); r != nil {
			err = coerceError(r)

			// Re-panic after we capture the error.
			defer panic(r)
		}

		ap.ch <- err
	}()

	return ap.delegate.Run(ctx)
}

// AdhocDescriptor is a descriptor that allows external access to submit work to
// be scheduled. It is paired with an AdhocSubmitter, which should be provided
// to external clients to receive the work.
//
// This descriptor is non-blocking; it will indefinitely queue work, consuming a
// proportional amount of memory per pending process if the scheduler does not
// have availability. You may want to rate limit submissions.
type AdhocDescriptor struct {
	queue []*adhocProcess
	cond  *sync.Cond
}

var _ Descriptor = &AdhocDescriptor{}

func (ad *AdhocDescriptor) runOnce(ctx context.Context) (*adhocProcess, bool) {
	ad.cond.L.Lock()
	defer ad.cond.L.Unlock()

	for len(ad.queue) == 0 {
		select {
		case <-ctx.Done():
			return nil, false
		default:
		}

		ad.cond.Wait()
	}

	// Pluck the first item. We zero it out in the queue to make sure we can
	// garbage collect the struct when it's done processing.
	next := ad.queue[0]

	ad.queue[0] = nil
	ad.queue = ad.queue[1:]

	return next, true
}

// Run executes this descriptor with the given process channel.
func (ad *AdhocDescriptor) Run(ctx context.Context, pc chan<- Process) error {
	doneCh := make(chan struct{})
	defer close(doneCh)

	go func() {
		select {
		case <-doneCh:
		case <-ctx.Done():
			// There is a slight inefficiency here because we need to make sure
			// we only wake up the descriptor waiting in the current context,
			// but we don't know which one that is, so we have to broadcast.
			ad.cond.L.Lock()
			defer ad.cond.L.Unlock()

			ad.cond.Broadcast()
		}
	}()

	for {
		p, ok := ad.runOnce(ctx)
		if !ok {
			break
		}

		pc <- p
	}

	return nil
}

// AdhocSubmitter is used to submit work to an adhoc descriptor.
//
// Work is always immediately enqueued.
type AdhocSubmitter struct {
	target *AdhocDescriptor
}

// QueueLen returns the number of work items in the descriptor's queue. These
// items have not yet been submitted to the scheduler for processing.
func (as *AdhocSubmitter) QueueLen() int {
	as.target.cond.L.Lock()
	defer as.target.cond.L.Unlock()

	return len(as.target.queue)
}

// Submit adds a new work item to the descriptor's queue.
func (as *AdhocSubmitter) Submit(p Process) <-chan error {
	as.target.cond.L.Lock()
	defer as.target.cond.L.Unlock()

	ch := make(chan error, 1)

	as.target.queue = append(as.target.queue, &adhocProcess{delegate: p, ch: ch})
	as.target.cond.Signal()

	return ch
}

// NewAdhocDescriptor returns a bound pair of adhoc descriptor and submitter.
// Submitting work items through the returned submitter will enqueue them to the
// returned descriptor.
func NewAdhocDescriptor() (*AdhocDescriptor, *AdhocSubmitter) {
	ad := &AdhocDescriptor{cond: sync.NewCond(&sync.Mutex{})}
	as := &AdhocSubmitter{target: ad}

	return ad, as
}
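
The AdhocDescriptor doc comment above notes that the queue is unbounded and suggests rate limiting submissions. A minimal sketch of one way a caller might do that, using golang.org/x/time/rate and a hypothetical limitedSubmitter wrapper (neither is part of this change):

package example // hypothetical caller-side package, not in this repository

import (
	"context"

	"github.com/puppetlabs/horsehead/scheduler"
	"golang.org/x/time/rate"
)

// limitedSubmitter is a hypothetical wrapper around AdhocSubmitter that bounds
// how quickly work can be enqueued.
type limitedSubmitter struct {
	submitter *scheduler.AdhocSubmitter
	limiter   *rate.Limiter
}

func newLimitedSubmitter(as *scheduler.AdhocSubmitter, perSecond float64) *limitedSubmitter {
	return &limitedSubmitter{
		submitter: as,
		limiter:   rate.NewLimiter(rate.Limit(perSecond), 1),
	}
}

// Submit blocks until the limiter grants a token or the context is canceled,
// then enqueues the process on the underlying AdhocSubmitter. The returned
// channel reports the process result once the scheduler runs it.
func (ls *limitedSubmitter) Submit(ctx context.Context, p scheduler.Process) (<-chan error, error) {
	if err := ls.limiter.Wait(ctx); err != nil {
		return nil, err
	}

	return ls.submitter.Submit(p), nil
}

Callers that would rather reject excess work than block could use the limiter's Allow method instead of Wait.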
158 changes: 158 additions & 0 deletions scheduler/adhoc_test.go
@@ -0,0 +1,158 @@
package scheduler_test

import (
	"context"
	"sync/atomic"
	"testing"
	"time"

	"github.com/puppetlabs/errawr-go/v2/pkg/errawr"
	"github.com/puppetlabs/errawr-go/v2/pkg/testutil"
	"github.com/puppetlabs/horsehead/scheduler"
	"github.com/stretchr/testify/assert"
)

func TestAdhocQueue(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	ad, as := scheduler.NewAdhocDescriptor()
	lc := scheduler.
		NewSegment(1, []scheduler.Descriptor{ad}).
		WithErrorBehavior(scheduler.ErrorBehaviorDrop)

	p1 := as.Submit(scheduler.DescribeProcessFunc("p1", func(ctx context.Context) error {
		return nil
	}))
	p2 := as.Submit(scheduler.DescribeProcessFunc("p2", func(ctx context.Context) error {
		return nil
	}))

	slc := lc.Start(scheduler.LifecycleStartOptions{})
	defer func() {
		assert.NoError(t, scheduler.CloseWaitContext(ctx, slc))
		assert.Empty(t, slc.Errs())
	}()

	select {
	case err := <-p1:
		assert.NoError(t, err, "p1 returned error")
	case <-ctx.Done():
		assert.Fail(t, "p1 context expired")
	}

	select {
	case err := <-p2:
		assert.NoError(t, err, "p2 returned error")
	case <-ctx.Done():
		assert.Fail(t, "p2 context expired")
	}
}

func TestAdhocErrors(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	ad, as := scheduler.NewAdhocDescriptor()
	lc := scheduler.
		NewSegment(1, []scheduler.Descriptor{ad}).
		WithErrorBehavior(scheduler.ErrorBehaviorDrop)

	p1 := as.Submit(scheduler.DescribeProcessFunc("p1", func(ctx context.Context) error {
		return testutil.NewStubError("p1")
	}))
	p2 := as.Submit(scheduler.DescribeProcessFunc("p2", func(ctx context.Context) error {
		panic(testutil.NewStubError("p2"))
	}))

	slc := lc.Start(scheduler.LifecycleStartOptions{})
	defer func() {
		assert.NoError(t, scheduler.CloseWaitContext(ctx, slc))
		assert.Empty(t, slc.Errs())
	}()

	select {
	case err := <-p1:
		switch rerr := err.(type) {
		case errawr.Error:
			assert.Equal(t, "p1", rerr.Code())
		default:
			assert.Fail(t, "p1 did not return an error")
		}
	case <-ctx.Done():
		assert.Fail(t, "p1 context expired")
	}

	select {
	case err := <-p2:
		switch rerr := err.(type) {
		case errawr.Error:
			assert.Equal(t, "p2", rerr.Code())
		default:
			assert.Fail(t, "p2 did not return an error")
		}
	case <-ctx.Done():
		assert.Fail(t, "p2 context expired")
	}
}

func TestAdhocSubmissionMultipleLifecycles(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	ad, as := scheduler.NewAdhocDescriptor()

	slc1 := scheduler.
		NewSegment(1, []scheduler.Descriptor{ad}).
		WithErrorBehavior(scheduler.ErrorBehaviorDrop).
		Start(scheduler.LifecycleStartOptions{})
	defer func() {
		assert.NoError(t, scheduler.CloseWaitContext(ctx, slc1))
		assert.Empty(t, slc1.Errs())
	}()

	slc2 := scheduler.
		NewSegment(1, []scheduler.Descriptor{ad}).
		WithErrorBehavior(scheduler.ErrorBehaviorDrop).
		Start(scheduler.LifecycleStartOptions{})
	defer func() {
		assert.NoError(t, scheduler.CloseWaitContext(ctx, slc2))
		assert.Empty(t, slc2.Errs())
	}()

	var rc int32
	p1 := as.Submit(scheduler.DescribeProcessFunc("p1", func(ctx context.Context) error {
		// Should run exactly once, even if there are two active schedulers.
		atomic.AddInt32(&rc, 1)
		return nil
	}))

	assert.NoError(t, <-p1)
	assert.Equal(t, int32(1), rc)

	// Close one of the segments; we should still be able to run jobs on the
	// second one.
	assert.NoError(t, scheduler.CloseWaitContext(ctx, slc1))
	assert.Empty(t, slc1.Errs())

	p2 := as.Submit(scheduler.DescribeProcessFunc("p2", func(ctx context.Context) error {
		atomic.AddInt32(&rc, 1)
		return nil
	}))

	assert.NoError(t, <-p2)
	assert.Equal(t, int32(2), rc)

	// Close the second segment. Now a process should just go into the queue as
	// there is nothing to run it.
	assert.NoError(t, scheduler.CloseWaitContext(ctx, slc2))
	assert.Empty(t, slc2.Errs())

	as.Submit(scheduler.DescribeProcessFunc("p3", func(ctx context.Context) error {
		atomic.AddInt32(&rc, 1)
		return nil
	}))

	assert.Equal(t, 1, as.QueueLen())
	assert.Equal(t, int32(2), rc)
}
16 changes: 16 additions & 0 deletions scheduler/capturer.go
@@ -0,0 +1,16 @@
package scheduler

import (
	"github.com/puppetlabs/horsehead/instrumentation/alerts"
	"github.com/puppetlabs/horsehead/instrumentation/alerts/trackers"
)

var defaultCapturer = alerts.NewAlerts(alerts.NoDelegate, alerts.Options{}).NewCapturer()

func coalesceCapturer(candidate trackers.Capturer) trackers.Capturer {
	if candidate == nil {
		return defaultCapturer
	}

	return candidate
}
63 changes: 63 additions & 0 deletions scheduler/doc.go
@@ -0,0 +1,63 @@
/*
Package scheduler provides a managed API to Goroutines using Lifecycles.

The most basic type of management is using the Schedulable interface with a
Scheduler:

	worker := scheduler.SchedulableFunc(func(ctx context.Context, er scheduler.ErrorReporter) {
		for {
			select {
			case <-ctx.Done():
				return
			case <-time.After(100 * time.Millisecond):
				fmt.Println("Mmm... pie.")
			}
		}
	})

	l := scheduler.NewScheduler(scheduler.OneSchedulable(worker))
	sl := l.Start(scheduler.LifecycleStartOptions{})

	time.Sleep(1 * time.Second)

	// Tell the scheduler to start closing.
	sl.Close()

	// Wait for all managed routines to finish.
	<-sl.Done()

Schedulers terminate when all of their children exit.

You can choose from three canned error behaviors for most lifecycles:
ErrorBehaviorDrop, ErrorBehaviorCollect, and ErrorBehaviorTerminate.
ErrorBehaviorDrop ignores errors, allowing the lifecycle to continue executing
normally. ErrorBehaviorCollect stores all errors returned (potentially allowing
for unbounded memory growth, so use with discretion) and provides them when the
lifecycle completes. ErrorBehaviorTerminate causes the lifecycle to close as
soon as it receives an error. You may implement your own error behaviors by
conforming to the ErrorBehavior interface.

If you have a few lifecycles that are parameterized differently and you want to
manage them together, the Parent lifecycle aggregates them and runs them in
parallel.

This package also provides a more sophisticated lifecycle, Segment. A Segment
provides a worker pool and a mechanism for dispatching work. Dispatchers
implement the Descriptor interface and work items implement the Process
interface. The example above could equivalently be written as follows:

	proc := scheduler.ProcessFunc(func(ctx context.Context) error {
		fmt.Println("Mmm... pie.")
		return nil
	})

	l := scheduler.NewSegment(1, []scheduler.Descriptor{
		scheduler.NewIntervalDescriptor(100*time.Millisecond, proc),
	})

	// Start, close, and wait on the lifecycle as before.

Descriptors are particularly useful when asynchronously waiting on events from
external APIs for processing.
*/
package scheduler
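
The error-behavior discussion in the package documentation can be made concrete with the adhoc descriptor added in this commit. A minimal sketch, assuming the API exactly as exercised in scheduler/adhoc_test.go above; the "flaky" process name and the error text are illustrative only:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"

	"github.com/puppetlabs/horsehead/scheduler"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
	defer cancel()

	ad, as := scheduler.NewAdhocDescriptor()

	// ErrorBehaviorCollect keeps every error a process returns and reports
	// them when the lifecycle completes; ErrorBehaviorTerminate would instead
	// close the lifecycle on the first error, and ErrorBehaviorDrop would
	// discard it.
	slc := scheduler.
		NewSegment(1, []scheduler.Descriptor{ad}).
		WithErrorBehavior(scheduler.ErrorBehaviorCollect).
		Start(scheduler.LifecycleStartOptions{})

	// Wait for the submitted process to finish; its result is also delivered
	// on the channel returned by Submit.
	fmt.Println(<-as.Submit(scheduler.DescribeProcessFunc("flaky", func(ctx context.Context) error {
		return errors.New("boom")
	})))

	if err := scheduler.CloseWaitContext(ctx, slc); err != nil {
		fmt.Println("close:", err)
	}

	// With ErrorBehaviorCollect, the error from "flaky" should also appear in
	// the collected lifecycle errors.
	fmt.Println(slc.Errs())
}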