Skip to content

Commit

Permalink
Merge pull request #190 from mariomac/simple-shutdown
Browse files Browse the repository at this point in the history
simplify elegant shutdown
  • Loading branch information
Mario Macias authored Apr 25, 2022
2 parents 43ca3a2 + 10a217c commit f13dc86
Show file tree
Hide file tree
Showing 12 changed files with 19 additions and 68 deletions.
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,6 @@ require (
gopkg.in/inf.v0 v0.9.1 // indirect
gopkg.in/ini.v1 v1.66.2 // indirect
gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect
honnef.co/go/netdb v0.0.0-20210921115105-e902e863d85d // indirect
k8s.io/klog/v2 v2.30.0 // indirect
k8s.io/kube-openapi v0.0.0-20211115234752-e816edb12b65 // indirect
k8s.io/utils v0.0.0-20211116205334-6203023598ed // indirect
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1523,8 +1523,6 @@ gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C
gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw=
gotest.tools/v3 v3.0.2/go.mod h1:3SzNCllyD9/Y+b5r9JIKQ474KzkZyqLqEfYqMsX94Bk=
gotest.tools/v3 v3.0.3/go.mod h1:Z7Lb0S5l+klDB31fvDQX8ss/FlKDxtlFlw3Oa8Ymbl8=
honnef.co/go/netdb v0.0.0-20210921115105-e902e863d85d h1:yjDpoTxoYVpCt04OYp8zlZsKtrEOK1O4U7l2aWbn3D8=
honnef.co/go/netdb v0.0.0-20210921115105-e902e863d85d/go.mod h1:rbNo0ST5hSazCG4rGfpHrwnwvzP1QX62WbhzD+ghGzs=
honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4=
Expand Down
7 changes: 2 additions & 5 deletions pkg/pipeline/encode/encode_prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ type encodeProm struct {
expiryTime int64
mList *list.List
mCache metricCache
exitChan chan struct{}
exitChan <-chan struct{}
}

var metricsProcessed = operationalMetrics.NewCounter(prometheus.CounterOpts{
Expand Down Expand Up @@ -336,9 +336,6 @@ func NewEncodeProm(params config.StageParam) (Encoder, error) {
}
}

ch := make(chan struct{})
utils.RegisterExitChannel(ch)

log.Debugf("metrics = %v", metrics)
w := &encodeProm{
port: fmt.Sprintf(":%v", portNum),
Expand All @@ -347,7 +344,7 @@ func NewEncodeProm(params config.StageParam) (Encoder, error) {
expiryTime: expiryTime,
mList: list.New(),
mCache: make(metricCache),
exitChan: ch,
exitChan: utils.ExitChannel(),
}
go startPrometheusInterface(w)
go w.cleanupExpiredEntriesLoop()
Expand Down
4 changes: 1 addition & 3 deletions pkg/pipeline/extract/aggregate/aggregates.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,10 @@ func (aggregates *Aggregates) AddAggregate(aggregateDefinition api.AggregateDefi
func (aggregates *Aggregates) cleanupExpiredEntriesLoop() {

ticker := time.NewTicker(time.Duration(aggregates.expiryTime) * time.Second)
done := make(chan struct{})
utils.RegisterExitChannel(done)
go func() {
for {
select {
case <-done:
case <-utils.ExitChannel():
return
case <-ticker.C:
aggregates.cleanupExpiredEntries()
Expand Down
7 changes: 2 additions & 5 deletions pkg/pipeline/ingest/ingest_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type ingestCollector struct {
in chan map[string]interface{}
batchFlushTime time.Duration
batchMaxLength int
exitChan chan struct{}
exitChan <-chan struct{}
}

// TransportWrapper is an implementation of the goflow2 transport interface
Expand Down Expand Up @@ -200,9 +200,6 @@ func NewIngestCollector(params config.StageParam) (Ingester, error) {
log.Infof("hostname = %s", jsonIngestCollector.HostName)
log.Infof("port = %d", jsonIngestCollector.Port)

ch := make(chan struct{})
pUtils.RegisterExitChannel(ch)

bml := defaultBatchMaxLength
if jsonIngestCollector.BatchMaxLen != 0 {
bml = jsonIngestCollector.BatchMaxLen
Expand All @@ -211,7 +208,7 @@ func NewIngestCollector(params config.StageParam) (Ingester, error) {
return &ingestCollector{
hostname: jsonIngestCollector.HostName,
port: jsonIngestCollector.Port,
exitChan: ch,
exitChan: pUtils.ExitChannel(),
batchFlushTime: defaultBatchFlushTime,
batchMaxLength: bml,
}, nil
Expand Down
6 changes: 2 additions & 4 deletions pkg/pipeline/ingest/ingest_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (

type IngestFile struct {
params config.Ingest
exitChan chan struct{}
exitChan <-chan struct{}
PrevRecords []interface{}
TotalRecords int
}
Expand Down Expand Up @@ -104,10 +104,8 @@ func NewIngestFile(params config.StageParam) (Ingester, error) {

log.Debugf("input file name = %s", params.Ingest.File.Filename)

ch := make(chan struct{})
utils.RegisterExitChannel(ch)
return &IngestFile{
params: params.Ingest,
exitChan: ch,
exitChan: utils.ExitChannel(),
}, nil
}
4 changes: 1 addition & 3 deletions pkg/pipeline/ingest/ingest_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,8 @@ func NewGRPCProtobuf(params config.StageParam) (*GRPCProtobuf, error) {
}

func (no *GRPCProtobuf) Ingest(out chan<- []interface{}) {
exitCh := make(chan struct{})
utils.RegisterExitChannel(exitCh)
go func() {
<-exitCh
<-utils.ExitChannel()
close(no.flowPackets)
no.collector.Close()
}()
Expand Down
7 changes: 2 additions & 5 deletions pkg/pipeline/ingest/ingest_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type ingestKafka struct {
kafkaParams api.IngestKafka
kafkaReader kafkaReadMessage
in chan string
exitChan chan struct{}
exitChan <-chan struct{}
prevRecords []interface{} // copy of most recently sent records; for testing and debugging
}

Expand Down Expand Up @@ -153,13 +153,10 @@ func NewIngestKafka(params config.StageParam) (Ingester, error) {
}
log.Debugf("kafkaReader = %v", kafkaReader)

ch := make(chan struct{})
utils.RegisterExitChannel(ch)

return &ingestKafka{
kafkaParams: jsonIngestKafka,
kafkaReader: kafkaReader,
exitChan: ch,
exitChan: utils.ExitChannel(),
in: make(chan string, channelSizeKafka),
prevRecords: make([]interface{}, 0),
}, nil
Expand Down
9 changes: 0 additions & 9 deletions pkg/pipeline/ingest/ingest_kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,10 +124,6 @@ func Test_IngestKafka(t *testing.T) {
require.Equal(t, record1, receivedEntries[0])
require.Equal(t, record2, receivedEntries[1])
require.Equal(t, record3, receivedEntries[2])

// make the ingest thread exit
close(ingestKafka.exitChan)
time.Sleep(time.Second)
}

type fakeKafkaReader struct {
Expand Down Expand Up @@ -177,9 +173,4 @@ func Test_KafkaListener(t *testing.T) {

require.Equal(t, 1, len(receivedEntries))
require.Equal(t, string(fakeRecord), receivedEntries[0])

// make the ingest thread exit
close(ingestKafka.exitChan)
time.Sleep(time.Second)

}
19 changes: 5 additions & 14 deletions pkg/pipeline/utils/exit.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,40 +20,31 @@ package utils
import (
"os"
"os/signal"
"sync"
"syscall"

log "github.com/sirupsen/logrus"
)

var (
registeredChannels []chan struct{}
chanMutex sync.Mutex
exitChannel chan struct{}
)

func RegisterExitChannel(ch chan struct{}) {
chanMutex.Lock()
defer chanMutex.Unlock()
registeredChannels = append(registeredChannels, ch)
func ExitChannel() <-chan struct{} {
return exitChannel
}

func SetupElegantExit() {
log.Debugf("entering SetupElegantExit")
// handle elegant exit; create support for channels of go routines that want to exit cleanly
registeredChannels = make([]chan struct{}, 0)
exitChannel = make(chan struct{})
exitSigChan := make(chan os.Signal, 1)
log.Debugf("registered exit signal channel")
signal.Notify(exitSigChan, syscall.SIGINT, syscall.SIGTERM)
go func() {
// wait for exit signal; then stop all the other go functions
sig := <-exitSigChan
log.Debugf("received exit signal = %v", sig)
chanMutex.Lock()
defer chanMutex.Unlock()
// exit signal received; stop other go functions
for _, ch := range registeredChannels {
close(ch)
}
close(exitChannel)
log.Debugf("exiting SetupElegantExit go function")
}()
log.Debugf("exiting SetupElegantExit")
Expand Down
14 changes: 2 additions & 12 deletions pkg/pipeline/utils/exit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,19 +10,9 @@ import (

func Test_SetupElegantExit(t *testing.T) {
SetupElegantExit()
require.Equal(t, 0, len(registeredChannels))
ch1 := make(chan struct{})
ch2 := make(chan struct{})
ch3 := make(chan struct{})
RegisterExitChannel(ch1)
require.Equal(t, 1, len(registeredChannels))
RegisterExitChannel(ch2)
require.Equal(t, 2, len(registeredChannels))
RegisterExitChannel(ch3)
require.Equal(t, 3, len(registeredChannels))

select {
case <-ch1:
case <-ExitChannel():
// should not get here
require.Error(t, fmt.Errorf("channel should have been empty"))
default:
Expand All @@ -34,7 +24,7 @@ func Test_SetupElegantExit(t *testing.T) {
require.Equal(t, nil, err)

select {
case <-ch1:
case <-ExitChannel():
break
default:
// should not get here
Expand Down
7 changes: 2 additions & 5 deletions pkg/pipeline/write/write_loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ type Loki struct {
client emitter
timeNow func() time.Time
in chan config.GenericMap
exitChan chan struct{}
exitChan <-chan struct{}
}

var recordsWritten = operationalMetrics.NewCounter(prometheus.CounterOpts{
Expand Down Expand Up @@ -263,17 +263,14 @@ func NewWriteLoki(params config.StageParam) (*Loki, error) {
return nil, err
}

ch := make(chan struct{})
pUtils.RegisterExitChannel(ch)

in := make(chan config.GenericMap, channelSize)

l := &Loki{
lokiConfig: lokiConfig,
apiConfig: jsonWriteLoki,
client: client,
timeNow: time.Now,
exitChan: ch,
exitChan: pUtils.ExitChannel(),
in: in,
}

Expand Down

0 comments on commit f13dc86

Please sign in to comment.