This repository has been archived by the owner on Apr 2, 2024. It is now read-only.

Merge pull request #1652 from mozilla-services/serialize_sandbox_signal
SIGUSR2 to serialize sandboxes when wedged
Mike Trinkala committed Aug 7, 2015
2 parents 93166f3 + 152b354 commit 74e346f
Showing 23 changed files with 334 additions and 106 deletions.
13 changes: 13 additions & 0 deletions CHANGES.txt
@@ -1,13 +1,26 @@
0.10.0 (2015-??-??)
===================

Backwards Incompatibilities
---------------------------

* `PluginHelper.PipelinePack` method now returns `(*PipelinePack, err)` values
instead of just `*PipelinePack`.

Features
--------

* Allow multiple sandbox module directories to be specified (#1525).

* Add Nginx stub status lua sandbox decoder.

* Added support for the SIGUSR2 signal for use when Heka is wedged; it
  triggers a Heka pipeline report to the console, an abort and serialization
  of all sandboxes, and a shutdown.

* InputRunner and router's `Inject` methods each now return an error that can
be checked to verify successful message injection.

* Added `SplitStreamNullSplitterToEOF` to SplitterRunner interface so input
plugins can avoid generating messages with whatever happens to come back from
a `Read()` call.
9 changes: 6 additions & 3 deletions docs/source/developing/old_apis.rst
@@ -73,9 +73,12 @@ To generate new messages, your filter must call
be passed in should be obtained from the ``MsgLoopCount`` value on the pack
that you're already holding, or possibly zero if the new message is being
triggered by a timed ticker instead of an incoming message. The PipelinePack
method will either return a pack ready for you to populate or nil if the loop
count is greater than the configured maximum value, as a safeguard against
inadvertently creating infinite message loops.
method returns two values, the first a ``*PipelinePack`` and the second an
error. If all goes well, you'll get a pack ready for you to populate and a nil
error. If the loop count is greater than the configured maximum value (as a
safeguard against inadvertently creating infinite message loops), or if a pack
isn't available for some other reason, you'll get a nil pack and a non-nil
error.

Once a pack has been obtained, a filter plugin can populate its Message struct
using the various provided mutator methods. The pack can then be injected into
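The new calling convention described above can be sketched with a minimal stand-in for the helper. The names `pipelinePackFor` and `maxMsgLoops` below are illustrative stand-ins, not Heka's own identifiers; only the two-value return shape and the loop-count guard mirror the change:

```go
package main

import (
	"errors"
	"fmt"
)

// pipelinePack is a toy stand-in for Heka's *PipelinePack.
type pipelinePack struct{ msgLoopCount uint }

const maxMsgLoops = 4 // illustrative stand-in for Globals.MaxMsgLoops

// pipelinePackFor mimics the new two-value PipelinePack signature: it
// returns a non-nil error instead of a bare nil pack when the loop
// count exceeds the configured maximum.
func pipelinePackFor(msgLoopCount uint) (*pipelinePack, error) {
	if msgLoopCount++; msgLoopCount > maxMsgLoops {
		return nil, errors.New("exceeded MaxMsgLoops")
	}
	return &pipelinePack{msgLoopCount: msgLoopCount}, nil
}

func main() {
	// Happy path: loop count below the maximum yields a usable pack.
	pack, err := pipelinePackFor(0)
	fmt.Println(pack != nil, err == nil)

	// Loop count at the maximum: nil pack and a non-nil error.
	pack, err = pipelinePackFor(maxMsgLoops)
	fmt.Println(pack == nil, err != nil)
}
```

The caller simply logs and returns on a non-nil error, rather than having to reconstruct the failure reason from a nil pack as before.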
12 changes: 7 additions & 5 deletions docs/source/developing/plugin.rst
@@ -779,11 +779,13 @@ either case, the process is the same.
To generate new messages, your filter must call
``PluginHelper.PipelinePack(msgLoopCount int)``. The ``msgLoopCount`` value to
be passed in should be obtained from the ``MsgLoopCount`` value on the pack
that you're already holding, if called from within ProcessMessage, or zero if
called from within TimerEvent. The PipelinePack method will either return a
pack ready for you to populate or nil if the loop count is greater than the
configured maximum value, as a safeguard against inadvertently creating
infinite message loops.
that you're already holding, or zero if called from within TimerEvent. The
PipelinePack method returns two values, the first a ``*PipelinePack`` and the
second an error. If all goes well, you'll get a pack ready for you to populate
and a nil error. If the loop count is greater than the configured maximum value
(as a safeguard against inadvertently creating infinite message loops), or if a
pack isn't available for some other reason, you'll get a nil pack and a non-nil
error.

Once a pack has been obtained, a filter plugin can populate its Message struct
using any of its provided mutator methods. (Note that this is the *only* time
47 changes: 39 additions & 8 deletions docs/source/monitoring/index.rst
@@ -4,12 +4,13 @@
Monitoring Internal State
=========================

Heka can emit metrics about it's internal state to either an outgoing
Heka message (and, through the DashboardOutput, to a web dashboard) or
to stdout.
Sending SIGUSR1 to hekad on a UNIX will send a plain text report
tostdout. On Windows, you will need to send signal
10 to the hekad process using Powershell.
Heka Reports
------------

Heka can emit metrics about its internal state to either an outgoing Heka
message (and, through the DashboardOutput, to a web dashboard) or to stdout.
Sending SIGUSR1 to hekad on a UNIX system will send a plain text report to
stdout. On Windows, you will need to send signal 10 to the hekad process using
PowerShell.

Sample text output ::

@@ -74,5 +75,35 @@ Sample text output ::
MatchAvgDuration: 336
========

To enable the HTTP interface, you will need to enable the
dashboard output plugin, see :ref:`config_dashboard_output`.
To enable the HTTP interface, you will need to enable the dashboard output
plugin, see :ref:`config_dashboard_output`.

Aborting When Wedged
--------------------

In some cases a misconfigured Heka instance or other conditions can cause
Heka to suffer from "pack exhaustion", resulting in the process becoming wedged
such that no more data flows through the pipeline. In this case Heka will not
shut down cleanly when sent a SIGINT signal via ``ctrl-c``.

When this happens, Heka administrators should send a SIGUSR2 (or signal 11 on
Windows) to the hekad process. If Heka detects that messages are successfully
flowing through the system this will have no impact, but if Heka can verify
that the pipeline is wedged then the following sequence of events will be
triggered:

* A Heka report will be generated and output to the console, hopefully
providing insight into what caused the wedging.

* A shutdown signal will be sent to the Heka pipeline (if one hasn't already
  been sent), triggering a shutdown.

* An `abort` signal will propagate through the Heka pipeline, causing any
wedged sandboxes to become unwedged and immediately exit, preserving their
state if specified with the ``preserve_data = true`` config setting.

When this has finished, Heka will usually become unwedged and exit cleanly,
preserving the state of all sandboxes so configured. In some cases Heka will
still not exit cleanly and will require a SIGQUIT signal. Even then, however,
the state of sandbox plugins will often be serialized to disk so that it is
available after a restart.
17 changes: 11 additions & 6 deletions pipeline/config.go
@@ -90,7 +90,7 @@ type PluginHelper interface {
// pointer that can be populated w/ message data and inserted into the
// Heka pipeline. Returns an error if the loop count value provided is
// greater than the maximum allowed by the Heka instance.
PipelinePack(msgLoopCount uint) *PipelinePack
PipelinePack(msgLoopCount uint) (*PipelinePack, error)

// Returns an input plugin of the given name that provides the
// StatAccumulator interface, or an error value if such a plugin
@@ -210,7 +210,7 @@ func NewPipelineConfig(globals *GlobalConfigStruct) (config *PipelineConfig) {
config.OutputRunners = make(map[string]OutputRunner)

config.allEncoders = make(map[string]Encoder)
config.router = NewMessageRouter(globals.PluginChanSize)
config.router = NewMessageRouter(globals.PluginChanSize, globals.abortChan)
config.inputRecycleChan = make(chan *PipelinePack, globals.PoolSize)
config.injectRecycleChan = make(chan *PipelinePack, globals.PoolSize)
config.LogMsgs = make([]string, 0, 4)
@@ -227,18 +227,23 @@ func NewPipelineConfig(globals *GlobalConfigStruct) (config *PipelineConfig) {
// Callers should pass in the msgLoopCount value from any relevant Message
// objects they are holding. Returns a PipelinePack for injection into the
// Heka pipeline, or an error if the msgLoopCount is above the configured
// maximum.
func (self *PipelineConfig) PipelinePack(msgLoopCount uint) *PipelinePack {
func (self *PipelineConfig) PipelinePack(msgLoopCount uint) (*PipelinePack, error) {
if msgLoopCount++; msgLoopCount > self.Globals.MaxMsgLoops {
return nil
return nil, fmt.Errorf("exceeded MaxMsgLoops = %d", self.Globals.MaxMsgLoops)
}
var pack *PipelinePack
select {
case pack = <-self.injectRecycleChan:
case <-self.Globals.abortChan:
return nil, AbortError
}
pack := <-self.injectRecycleChan
pack.Message.SetTimestamp(time.Now().UnixNano())
pack.Message.SetUuid(uuid.NewRandom())
pack.Message.SetHostname(self.hostname)
pack.Message.SetPid(self.pid)
pack.RefCount = 1
pack.MsgLoopCount = msgLoopCount
return pack
return pack, nil
}

// Returns the router.
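The select added to `PipelinePack` above is an instance of a general pattern: a blocking receive that must also honor a close-to-abort channel. A self-contained sketch with toy types (`recvOrAbort` and `errAbort` are illustrative names, not Heka's):

```go
package main

import (
	"errors"
	"fmt"
)

var errAbort = errors.New("aborting") // stand-in for pipeline.AbortError

// recvOrAbort blocks on the recycle channel, but gives up as soon as
// abort is closed, mirroring the select in the new PipelinePack body.
func recvOrAbort(recycle chan int, abort chan struct{}) (int, error) {
	select {
	case v := <-recycle:
		return v, nil
	case <-abort:
		return 0, errAbort
	}
}

func main() {
	recycle := make(chan int, 1)
	abort := make(chan struct{})

	// A value is available and abort is still open: the receive wins.
	recycle <- 42
	v, err := recvOrAbort(recycle, abort)
	fmt.Println(v, err)

	// Channel empty and abort closed: the abort case fires instead of
	// blocking forever, which is exactly what unwedges a stuck caller.
	close(abort)
	_, err = recvOrAbort(recycle, abort)
	fmt.Println(err)
}
```

Closing the channel (rather than sending on it) is what lets a single `close(abortChan)` release every goroutine blocked in such a select at once.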
14 changes: 6 additions & 8 deletions pipeline/counter_filter.go
@@ -102,10 +102,9 @@ func (this *CounterFilter) tally(fr FilterRunner, h PluginHelper,
this.rate = float64(msgsSent) / elapsedTime.Seconds()
this.rates = append(this.rates, this.rate)

pack := h.PipelinePack(msgLoopCount)
if pack == nil {
fr.LogError(fmt.Errorf("exceeded MaxMsgLoops = %d",
h.PipelineConfig().Globals.MaxMsgLoops))
pack, e := h.PipelinePack(msgLoopCount)
if e != nil {
fr.LogError(e)
return
}
pack.Message.SetLogger(fr.Name())
@@ -124,10 +123,9 @@
sum += val
}
mean := sum / float64(samples)
pack := h.PipelinePack(msgLoopCount)
if pack == nil {
fr.LogError(fmt.Errorf("exceeded MaxMsgLoops = %d",
h.PipelineConfig().Globals.MaxMsgLoops))
pack, e := h.PipelinePack(msgLoopCount)
if e != nil {
fr.LogError(e)
return
}
pack.Message.SetLogger(fr.Name())
61 changes: 60 additions & 1 deletion pipeline/pipeline_runner.go
@@ -17,6 +17,7 @@
package pipeline

import (
"errors"
"os"
"os/signal"
"path/filepath"
@@ -37,6 +38,8 @@ const (
STOP = "stop"
)

var AbortError = errors.New("Aborting")

// Struct for holding global pipeline config values.
type GlobalConfigStruct struct {
MaxMsgProcessDuration uint64
@@ -53,6 +56,7 @@ type GlobalConfigStruct struct {
SampleDenominator int
sigChan chan os.Signal
Hostname string
abortChan chan struct{}
}

// Creates a GlobalConfigStruct object populated w/ default values.
@@ -70,6 +74,7 @@ func DefaultGlobals() (globals *GlobalConfigStruct) {
SampleDenominator: 1000,
sigChan: make(chan os.Signal, 1),
Hostname: hostname,
abortChan: make(chan struct{}),
}
}

@@ -107,6 +112,10 @@ func (g *GlobalConfigStruct) LogMessage(src, msg string) {
LogInfo.Printf("%s: %s", src, msg)
}

func (g *GlobalConfigStruct) AbortChan() chan struct{} {
return g.abortChan
}

func isAbs(path string) bool {
return strings.HasPrefix(path, string(os.PathSeparator)) || len(filepath.VolumeName(path)) > 0
}
@@ -318,7 +327,8 @@ func Run(config *PipelineConfig) {
}

// wait for sigint
signal.Notify(globals.sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP, SIGUSR1)
signal.Notify(globals.sigChan, syscall.SIGINT, syscall.SIGTERM, syscall.SIGHUP,
SIGUSR1, SIGUSR2)

for !globals.IsShuttingDown() {
select {
@@ -335,6 +345,9 @@
case SIGUSR1:
LogInfo.Println("Queue report initiated.")
go config.allReportsStdout()
case SIGUSR2:
LogInfo.Println("Sandbox abort initiated.")
go sandboxAbort(config)
}
}
}
@@ -386,3 +399,49 @@

LogInfo.Println("Shutdown complete.")
}

func sandboxAbort(config *PipelineConfig) {
// This should only be run when the router isn't processing messages, so we
// try to inject a new message and exit if successful. Far from perfect,
// but protects in most cases.
var pack *PipelinePack
doSave := false

select {
case pack = <-config.injectRecycleChan:
case <-time.After(1 * time.Second):
}

if pack == nil {
select {
case pack = <-config.inputRecycleChan:
case <-time.After(1 * time.Second):
}
}

if pack == nil {
doSave = true
} else {
pack.Message.SetType("heka.router-test")
select {
case config.router.inChan <- pack:
case <-time.After(1 * time.Second):
doSave = true
}
}

if !doSave {
msg := "Can't save sandboxes while router is processing messages, use regular shutdown."
LogError.Println(msg)
return
}

// If we got this far the router is presumably wedged and we'll go ahead
// and do it. First we generate a report for forensics re: why we were
// wedged, then send a shutdown signal, then close the abortChan to free up
// and abort any sandboxes that are wedged inside process_message or
// timer_event.
config.allReportsStdout()
config.Globals.ShutDown()
close(config.Globals.abortChan)
}
1 change: 1 addition & 0 deletions pipeline/pipeline_signals_darwin.go
@@ -17,3 +17,4 @@ package pipeline
import "syscall"

const SIGUSR1 = syscall.SIGUSR1
const SIGUSR2 = syscall.SIGUSR2
1 change: 1 addition & 0 deletions pipeline/pipeline_signals_freebsd.go
@@ -17,3 +17,4 @@ package pipeline
import "syscall"

const SIGUSR1 = syscall.SIGUSR1
const SIGUSR2 = syscall.SIGUSR2
1 change: 1 addition & 0 deletions pipeline/pipeline_signals_linux.go
@@ -17,3 +17,4 @@ package pipeline
import "syscall"

const SIGUSR1 = syscall.SIGUSR1
const SIGUSR2 = syscall.SIGUSR2
1 change: 1 addition & 0 deletions pipeline/pipeline_signals_windows.go
@@ -24,3 +24,4 @@ import (
// SIGUSR1 isn't defined on Windows.

const SIGUSR1 = syscall.Signal(0xa)
const SIGUSR2 = syscall.Signal(0xb)
6 changes: 6 additions & 0 deletions pipeline/plugin_interfaces.go
@@ -104,3 +104,9 @@ type Output interface {
type TickerPlugin interface {
TimerEvent() (err error)
}

// Implemented by the sandbox plugins to allow out-of-band sandbox teardown.
type Destroyable interface {
StopSB()
Destroy() error
}
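A sandbox plugin satisfying the new interface can then be torn down out of band during an abort. A minimal sketch, where `mockSandbox` is an illustrative stand-in rather than one of Heka's sandbox types:

```go
package main

import "fmt"

// Destroyable mirrors the interface added above: out-of-band stop
// followed by teardown.
type Destroyable interface {
	StopSB()
	Destroy() error
}

// mockSandbox records that teardown happened; a real sandbox plugin
// would stop its Lua VM and serialize preserved state here.
type mockSandbox struct{ stopped, destroyed bool }

func (m *mockSandbox) StopSB()        { m.stopped = true }
func (m *mockSandbox) Destroy() error { m.destroyed = true; return nil }

func main() {
	var d Destroyable = &mockSandbox{}
	// Stop the sandbox first, then destroy it; state serialization
	// would happen in Destroy for plugins with preserve_data = true.
	d.StopSB()
	err := d.Destroy()
	fmt.Println(err == nil)
}
```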
