diff --git a/go.mod b/go.mod index 762bee48d..9ab42d51b 100644 --- a/go.mod +++ b/go.mod @@ -85,7 +85,6 @@ require ( gopkg.in/inf.v0 v0.9.1 // indirect gopkg.in/ini.v1 v1.66.2 // indirect gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b // indirect - honnef.co/go/netdb v0.0.0-20210921115105-e902e863d85d // indirect k8s.io/klog/v2 v2.30.0 // indirect k8s.io/kube-openapi v0.0.0-20211115234752-e816edb12b65 // indirect k8s.io/utils v0.0.0-20211116205334-6203023598ed // indirect diff --git a/go.sum b/go.sum index 0d52da1d0..c7d1f45f8 100644 --- a/go.sum +++ b/go.sum @@ -1523,8 +1523,6 @@ gopkg.in/yaml.v3 v3.0.0-20210107192922-496545a6307b/go.mod h1:K4uyk7z7BCEPqu6E+C gotest.tools v2.2.0+incompatible/go.mod h1:DsYFclhRJ6vuDpmuTbkuFWG+y2sxOXAzmJt81HFBacw= gotest.tools/v3 v3.0.2/go.mod h1:3SzNCllyD9/Y+b5r9JIKQ474KzkZyqLqEfYqMsX94Bk= gotest.tools/v3 v3.0.3/go.mod h1:Z7Lb0S5l+klDB31fvDQX8ss/FlKDxtlFlw3Oa8Ymbl8= -honnef.co/go/netdb v0.0.0-20210921115105-e902e863d85d h1:yjDpoTxoYVpCt04OYp8zlZsKtrEOK1O4U7l2aWbn3D8= -honnef.co/go/netdb v0.0.0-20210921115105-e902e863d85d/go.mod h1:rbNo0ST5hSazCG4rGfpHrwnwvzP1QX62WbhzD+ghGzs= honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190106161140-3f1c8253044a/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/pkg/pipeline/encode/encode_prom.go b/pkg/pipeline/encode/encode_prom.go index 659e720e7..1c5f3c6b2 100644 --- a/pkg/pipeline/encode/encode_prom.go +++ b/pkg/pipeline/encode/encode_prom.go @@ -82,7 +82,7 @@ type encodeProm struct { expiryTime int64 mList *list.List mCache metricCache - exitChan chan struct{} + exitChan <-chan struct{} } var metricsProcessed = operationalMetrics.NewCounter(prometheus.CounterOpts{ @@ -336,9 +336,6 @@ func NewEncodeProm(params config.StageParam) (Encoder, error) { } } - ch := make(chan struct{}) - utils.RegisterExitChannel(ch) - log.Debugf("metrics = %v", metrics) w := &encodeProm{ port: fmt.Sprintf(":%v", portNum), @@ -347,7 +344,7 @@ func NewEncodeProm(params config.StageParam) (Encoder, error) { expiryTime: expiryTime, mList: list.New(), mCache: make(metricCache), - exitChan: ch, + exitChan: utils.ExitChannel(), } go startPrometheusInterface(w) go w.cleanupExpiredEntriesLoop() diff --git a/pkg/pipeline/extract/aggregate/aggregates.go b/pkg/pipeline/extract/aggregate/aggregates.go index d83179d05..b12d57988 100644 --- a/pkg/pipeline/extract/aggregate/aggregates.go +++ b/pkg/pipeline/extract/aggregate/aggregates.go @@ -75,12 +75,10 @@ func (aggregates *Aggregates) AddAggregate(aggregateDefinition api.AggregateDefi func (aggregates *Aggregates) cleanupExpiredEntriesLoop() { ticker := time.NewTicker(time.Duration(aggregates.expiryTime) * time.Second) - done := make(chan struct{}) - utils.RegisterExitChannel(done) go func() { for { select { - case <-done: + case <-utils.ExitChannel(): return case <-ticker.C: aggregates.cleanupExpiredEntries() diff --git a/pkg/pipeline/ingest/ingest_collector.go b/pkg/pipeline/ingest/ingest_collector.go index 7df418544..f7dbfcf3c 100644 --- a/pkg/pipeline/ingest/ingest_collector.go +++ b/pkg/pipeline/ingest/ingest_collector.go @@ -51,7 +51,7 @@ type ingestCollector struct { in chan map[string]interface{} batchFlushTime time.Duration batchMaxLength int - exitChan chan struct{} + exitChan <-chan struct{} } // TransportWrapper is an implementation of the goflow2 transport interface @@ -200,9 +200,6 @@ func NewIngestCollector(params config.StageParam) (Ingester, error) { log.Infof("hostname = %s", jsonIngestCollector.HostName) log.Infof("port = %d", jsonIngestCollector.Port) - ch := make(chan struct{}) - pUtils.RegisterExitChannel(ch) - bml := defaultBatchMaxLength if jsonIngestCollector.BatchMaxLen != 0 { bml = jsonIngestCollector.BatchMaxLen @@ -211,7 +208,7 @@ func NewIngestCollector(params config.StageParam) (Ingester, error) { return &ingestCollector{ hostname: jsonIngestCollector.HostName, port: jsonIngestCollector.Port, - exitChan: ch, + exitChan: pUtils.ExitChannel(), batchFlushTime: defaultBatchFlushTime, batchMaxLength: bml, }, nil diff --git a/pkg/pipeline/ingest/ingest_file.go b/pkg/pipeline/ingest/ingest_file.go index 4dab07e92..159d2c6d3 100644 --- a/pkg/pipeline/ingest/ingest_file.go +++ b/pkg/pipeline/ingest/ingest_file.go @@ -30,7 +30,7 @@ import ( type IngestFile struct { params config.Ingest - exitChan chan struct{} + exitChan <-chan struct{} PrevRecords []interface{} TotalRecords int } @@ -104,10 +104,8 @@ func NewIngestFile(params config.StageParam) (Ingester, error) { log.Debugf("input file name = %s", params.Ingest.File.Filename) - ch := make(chan struct{}) - utils.RegisterExitChannel(ch) return &IngestFile{ params: params.Ingest, - exitChan: ch, + exitChan: utils.ExitChannel(), }, nil } diff --git a/pkg/pipeline/ingest/ingest_grpc.go b/pkg/pipeline/ingest/ingest_grpc.go index 2be7ea728..65df92284 100644 --- a/pkg/pipeline/ingest/ingest_grpc.go +++ b/pkg/pipeline/ingest/ingest_grpc.go @@ -38,10 +38,8 @@ func NewGRPCProtobuf(params config.StageParam) (*GRPCProtobuf, error) { } func (no *GRPCProtobuf) Ingest(out chan<- []interface{}) { - exitCh := make(chan struct{}) - utils.RegisterExitChannel(exitCh) go func() { - <-exitCh + <-utils.ExitChannel() close(no.flowPackets) no.collector.Close() }() diff --git a/pkg/pipeline/ingest/ingest_kafka.go b/pkg/pipeline/ingest/ingest_kafka.go index 3a55d2f7f..d23dbc8ab 100644 --- a/pkg/pipeline/ingest/ingest_kafka.go +++ b/pkg/pipeline/ingest/ingest_kafka.go @@ -38,7 +38,7 @@ type ingestKafka struct { kafkaParams api.IngestKafka kafkaReader kafkaReadMessage in chan string - exitChan chan struct{} + exitChan <-chan struct{} prevRecords []interface{} // copy of most recently sent records; for testing and debugging } @@ -153,13 +153,10 @@ func NewIngestKafka(params config.StageParam) (Ingester, error) { } log.Debugf("kafkaReader = %v", kafkaReader) - ch := make(chan struct{}) - utils.RegisterExitChannel(ch) - return &ingestKafka{ kafkaParams: jsonIngestKafka, kafkaReader: kafkaReader, - exitChan: ch, + exitChan: utils.ExitChannel(), in: make(chan string, channelSizeKafka), prevRecords: make([]interface{}, 0), }, nil diff --git a/pkg/pipeline/ingest/ingest_kafka_test.go b/pkg/pipeline/ingest/ingest_kafka_test.go index cba3e4ea1..8bb4f7f1c 100644 --- a/pkg/pipeline/ingest/ingest_kafka_test.go +++ b/pkg/pipeline/ingest/ingest_kafka_test.go @@ -124,10 +124,6 @@ func Test_IngestKafka(t *testing.T) { require.Equal(t, record1, receivedEntries[0]) require.Equal(t, record2, receivedEntries[1]) require.Equal(t, record3, receivedEntries[2]) - - // make the ingest thread exit - close(ingestKafka.exitChan) - time.Sleep(time.Second) } type fakeKafkaReader struct { @@ -177,9 +173,4 @@ func Test_KafkaListener(t *testing.T) { require.Equal(t, 1, len(receivedEntries)) require.Equal(t, string(fakeRecord), receivedEntries[0]) - - // make the ingest thread exit - close(ingestKafka.exitChan) - time.Sleep(time.Second) - } diff --git a/pkg/pipeline/utils/exit.go b/pkg/pipeline/utils/exit.go index c0f96d304..90c2ef66c 100644 --- a/pkg/pipeline/utils/exit.go +++ b/pkg/pipeline/utils/exit.go @@ -20,27 +20,23 @@ package utils import ( "os" "os/signal" - "sync" "syscall" log "github.com/sirupsen/logrus" ) var ( - registeredChannels []chan struct{} - chanMutex sync.Mutex + exitChannel chan struct{} ) -func RegisterExitChannel(ch chan struct{}) { - chanMutex.Lock() - defer chanMutex.Unlock() - registeredChannels = append(registeredChannels, ch) +func ExitChannel() <-chan struct{} { + return exitChannel } func SetupElegantExit() { log.Debugf("entering SetupElegantExit") // handle elegant exit; create support for channels of go routines that want to exit cleanly - registeredChannels = make([]chan struct{}, 0) + exitChannel = make(chan struct{}) exitSigChan := make(chan os.Signal, 1) log.Debugf("registered exit signal channel") signal.Notify(exitSigChan, syscall.SIGINT, syscall.SIGTERM) @@ -48,12 +44,7 @@ func SetupElegantExit() { // wait for exit signal; then stop all the other go functions sig := <-exitSigChan log.Debugf("received exit signal = %v", sig) - chanMutex.Lock() - defer chanMutex.Unlock() - // exit signal received; stop other go functions - for _, ch := range registeredChannels { - close(ch) - } + close(exitChannel) log.Debugf("exiting SetupElegantExit go function") }() log.Debugf("exiting SetupElegantExit") diff --git a/pkg/pipeline/utils/exit_test.go b/pkg/pipeline/utils/exit_test.go index 646a1f7f1..432f7441f 100644 --- a/pkg/pipeline/utils/exit_test.go +++ b/pkg/pipeline/utils/exit_test.go @@ -10,19 +10,9 @@ import ( func Test_SetupElegantExit(t *testing.T) { SetupElegantExit() - require.Equal(t, 0, len(registeredChannels)) - ch1 := make(chan struct{}) - ch2 := make(chan struct{}) - ch3 := make(chan struct{}) - RegisterExitChannel(ch1) - require.Equal(t, 1, len(registeredChannels)) - RegisterExitChannel(ch2) - require.Equal(t, 2, len(registeredChannels)) - RegisterExitChannel(ch3) - require.Equal(t, 3, len(registeredChannels)) select { - case <-ch1: + case <-ExitChannel(): // should not get here require.Error(t, fmt.Errorf("channel should have been empty")) default: @@ -34,7 +24,7 @@ func Test_SetupElegantExit(t *testing.T) { require.Equal(t, nil, err) select { - case <-ch1: + case <-ExitChannel(): break default: // should not get here diff --git a/pkg/pipeline/write/write_loki.go b/pkg/pipeline/write/write_loki.go index a3d9ef0c6..bfaf413ec 100644 --- a/pkg/pipeline/write/write_loki.go +++ b/pkg/pipeline/write/write_loki.go @@ -56,7 +56,7 @@ type Loki struct { client emitter timeNow func() time.Time in chan config.GenericMap - exitChan chan struct{} + exitChan <-chan struct{} } var recordsWritten = operationalMetrics.NewCounter(prometheus.CounterOpts{ @@ -263,9 +263,6 @@ func NewWriteLoki(params config.StageParam) (*Loki, error) { return nil, err } - ch := make(chan struct{}) - pUtils.RegisterExitChannel(ch) - in := make(chan config.GenericMap, channelSize) l := &Loki{ @@ -273,7 +270,7 @@ func NewWriteLoki(params config.StageParam) (*Loki, error) { apiConfig: jsonWriteLoki, client: client, timeNow: time.Now, - exitChan: ch, + exitChan: pUtils.ExitChannel(), in: in, }