Skip to content

Commit

Permalink
Merge pull request #188 from mariomac/shutdown
Browse files Browse the repository at this point in the history
Fix shutdown time
  • Loading branch information
Mario Macias authored Apr 25, 2022
2 parents d36b66c + 6ac9c48 commit 43ca3a2
Show file tree
Hide file tree
Showing 11 changed files with 29 additions and 21 deletions.
4 changes: 2 additions & 2 deletions pkg/pipeline/encode/encode_prom.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ type encodeProm struct {
expiryTime int64
mList *list.List
mCache metricCache
exitChan chan bool
exitChan chan struct{}
}

var metricsProcessed = operationalMetrics.NewCounter(prometheus.CounterOpts{
Expand Down Expand Up @@ -336,7 +336,7 @@ func NewEncodeProm(params config.StageParam) (Encoder, error) {
}
}

ch := make(chan bool, 1)
ch := make(chan struct{})
utils.RegisterExitChannel(ch)

log.Debugf("metrics = %v", metrics)
Expand Down
2 changes: 1 addition & 1 deletion pkg/pipeline/extract/aggregate/aggregates.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func (aggregates *Aggregates) AddAggregate(aggregateDefinition api.AggregateDefi
func (aggregates *Aggregates) cleanupExpiredEntriesLoop() {

ticker := time.NewTicker(time.Duration(aggregates.expiryTime) * time.Second)
done := make(chan bool)
done := make(chan struct{})
utils.RegisterExitChannel(done)
go func() {
for {
Expand Down
4 changes: 2 additions & 2 deletions pkg/pipeline/ingest/ingest_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ type ingestCollector struct {
in chan map[string]interface{}
batchFlushTime time.Duration
batchMaxLength int
exitChan chan bool
exitChan chan struct{}
}

// TransportWrapper is an implementation of the goflow2 transport interface
Expand Down Expand Up @@ -200,7 +200,7 @@ func NewIngestCollector(params config.StageParam) (Ingester, error) {
log.Infof("hostname = %s", jsonIngestCollector.HostName)
log.Infof("port = %d", jsonIngestCollector.Port)

ch := make(chan bool, 1)
ch := make(chan struct{})
pUtils.RegisterExitChannel(ch)

bml := defaultBatchMaxLength
Expand Down
2 changes: 1 addition & 1 deletion pkg/pipeline/ingest/ingest_collector_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ func TestIngest(t *testing.T) {
hostname: "0.0.0.0",
port: collectorPort,
batchFlushTime: 10 * time.Millisecond,
exitChan: make(chan bool),
exitChan: make(chan struct{}),
}
forwarded := make(chan []interface{})
//defer close(forwarded)
Expand Down
4 changes: 2 additions & 2 deletions pkg/pipeline/ingest/ingest_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ import (

type IngestFile struct {
params config.Ingest
exitChan chan bool
exitChan chan struct{}
PrevRecords []interface{}
TotalRecords int
}
Expand Down Expand Up @@ -104,7 +104,7 @@ func NewIngestFile(params config.StageParam) (Ingester, error) {

log.Debugf("input file name = %s", params.Ingest.File.Filename)

ch := make(chan bool, 1)
ch := make(chan struct{})
utils.RegisterExitChannel(ch)
return &IngestFile{
params: params.Ingest,
Expand Down
8 changes: 8 additions & 0 deletions pkg/pipeline/ingest/ingest_grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"

"github.com/netobserv/flowlogs-pipeline/pkg/config"
"github.com/netobserv/flowlogs-pipeline/pkg/pipeline/utils"
"github.com/netobserv/netobserv-agent/pkg/grpc"
"github.com/netobserv/netobserv-agent/pkg/pbflow"
)
Expand Down Expand Up @@ -37,6 +38,13 @@ func NewGRPCProtobuf(params config.StageParam) (*GRPCProtobuf, error) {
}

func (no *GRPCProtobuf) Ingest(out chan<- []interface{}) {
exitCh := make(chan struct{})
utils.RegisterExitChannel(exitCh)
go func() {
<-exitCh
close(no.flowPackets)
no.collector.Close()
}()
for fp := range no.flowPackets {
out <- []interface{}{fp}
}
Expand Down
4 changes: 2 additions & 2 deletions pkg/pipeline/ingest/ingest_kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type ingestKafka struct {
kafkaParams api.IngestKafka
kafkaReader kafkaReadMessage
in chan string
exitChan chan bool
exitChan chan struct{}
prevRecords []interface{} // copy of most recently sent records; for testing and debugging
}

Expand Down Expand Up @@ -153,7 +153,7 @@ func NewIngestKafka(params config.StageParam) (Ingester, error) {
}
log.Debugf("kafkaReader = %v", kafkaReader)

ch := make(chan bool, 1)
ch := make(chan struct{})
utils.RegisterExitChannel(ch)

return &ingestKafka{
Expand Down
4 changes: 2 additions & 2 deletions pkg/pipeline/ingest/ingest_kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ func Test_IngestKafka(t *testing.T) {
require.Equal(t, record3, receivedEntries[2])

// make the ingest thread exit
ingestKafka.exitChan <- true
close(ingestKafka.exitChan)
time.Sleep(time.Second)
}

Expand Down Expand Up @@ -179,7 +179,7 @@ func Test_KafkaListener(t *testing.T) {
require.Equal(t, string(fakeRecord), receivedEntries[0])

// make the ingest thread exit
ingestKafka.exitChan <- true
close(ingestKafka.exitChan)
time.Sleep(time.Second)

}
8 changes: 4 additions & 4 deletions pkg/pipeline/utils/exit.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,11 +27,11 @@ import (
)

var (
registeredChannels []chan bool
registeredChannels []chan struct{}
chanMutex sync.Mutex
)

func RegisterExitChannel(ch chan bool) {
func RegisterExitChannel(ch chan struct{}) {
chanMutex.Lock()
defer chanMutex.Unlock()
registeredChannels = append(registeredChannels, ch)
Expand All @@ -40,7 +40,7 @@ func RegisterExitChannel(ch chan bool) {
func SetupElegantExit() {
log.Debugf("entering SetupElegantExit")
// handle elegant exit; create support for channels of go routines that want to exit cleanly
registeredChannels = make([]chan bool, 0)
registeredChannels = make([]chan struct{}, 0)
exitSigChan := make(chan os.Signal, 1)
log.Debugf("registered exit signal channel")
signal.Notify(exitSigChan, syscall.SIGINT, syscall.SIGTERM)
Expand All @@ -52,7 +52,7 @@ func SetupElegantExit() {
defer chanMutex.Unlock()
// exit signal received; stop other go functions
for _, ch := range registeredChannels {
ch <- true
close(ch)
}
log.Debugf("exiting SetupElegantExit go function")
}()
Expand Down
6 changes: 3 additions & 3 deletions pkg/pipeline/utils/exit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ import (
func Test_SetupElegantExit(t *testing.T) {
SetupElegantExit()
require.Equal(t, 0, len(registeredChannels))
ch1 := make(chan bool, 1)
ch2 := make(chan bool, 1)
ch3 := make(chan bool, 1)
ch1 := make(chan struct{})
ch2 := make(chan struct{})
ch3 := make(chan struct{})
RegisterExitChannel(ch1)
require.Equal(t, 1, len(registeredChannels))
RegisterExitChannel(ch2)
Expand Down
4 changes: 2 additions & 2 deletions pkg/pipeline/write/write_loki.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ type Loki struct {
client emitter
timeNow func() time.Time
in chan config.GenericMap
exitChan chan bool
exitChan chan struct{}
}

var recordsWritten = operationalMetrics.NewCounter(prometheus.CounterOpts{
Expand Down Expand Up @@ -263,7 +263,7 @@ func NewWriteLoki(params config.StageParam) (*Loki, error) {
return nil, err
}

ch := make(chan bool, 1)
ch := make(chan struct{})
pUtils.RegisterExitChannel(ch)

in := make(chan config.GenericMap, channelSize)
Expand Down

0 comments on commit 43ca3a2

Please sign in to comment.