Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

context propagation: StreamingAcquisition() #3274

Merged
merged 4 commits into from
Oct 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .github/generate-codecov-yml.sh
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
cat <<EOT
# we measure coverage but don't enforce it
# https://docs.codecov.com/docs/codecov-yaml
codecov:
require_ci_to_pass: false

coverage:
status:
patch:
Expand Down
4 changes: 0 additions & 4 deletions .github/workflows/go-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -180,10 +180,6 @@ jobs:
set -o pipefail
make go-acc | sed 's/ *coverage:.*of statements in.*//' | richgo testfilter

- name: Generate codecov configuration
run: |
.github/generate-codecov-yml.sh >> .github/codecov.yml

- name: Upload unit coverage to Codecov
uses: codecov/codecov-action@v4
with:
Expand Down
2 changes: 1 addition & 1 deletion cmd/crowdsec/crowdsec.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,7 +169,7 @@ func runCrowdsec(cConfig *csconfig.Config, parsers *parser.Parsers, hub *cwhub.H

log.Info("Starting processing data")

if err := acquisition.StartAcquisition(dataSources, inputLineChan, &acquisTomb); err != nil {
if err := acquisition.StartAcquisition(context.TODO(), dataSources, inputLineChan, &acquisTomb); err != nil {
return fmt.Errorf("starting acquisition error: %w", err)
}

Expand Down
56 changes: 42 additions & 14 deletions pkg/acquisition/acquisition.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package acquisition

import (
"context"
"errors"
"fmt"
"io"
Expand Down Expand Up @@ -39,17 +40,17 @@

// The interface each datasource must implement
type DataSource interface {
GetMetrics() []prometheus.Collector // Returns pointers to metrics that are managed by the module
GetAggregMetrics() []prometheus.Collector // Returns pointers to metrics that are managed by the module (aggregated mode, limits cardinality)
UnmarshalConfig([]byte) error // Decode and pre-validate the YAML datasource - anything that can be checked before runtime
Configure([]byte, *log.Entry, int) error // Complete the YAML datasource configuration and perform runtime checks.
ConfigureByDSN(string, map[string]string, *log.Entry, string) error // Configure the datasource
GetMode() string // Get the mode (TAIL, CAT or SERVER)
GetName() string // Get the name of the module
OneShotAcquisition(chan types.Event, *tomb.Tomb) error // Start one shot acquisition(eg, cat a file)
StreamingAcquisition(chan types.Event, *tomb.Tomb) error // Start live acquisition (eg, tail a file)
CanRun() error // Whether the datasource can run or not (eg, journalctl on BSD is a non-sense)
GetUuid() string // Get the unique identifier of the datasource
GetMetrics() []prometheus.Collector // Returns pointers to metrics that are managed by the module
GetAggregMetrics() []prometheus.Collector // Returns pointers to metrics that are managed by the module (aggregated mode, limits cardinality)
UnmarshalConfig([]byte) error // Decode and pre-validate the YAML datasource - anything that can be checked before runtime
Configure([]byte, *log.Entry, int) error // Complete the YAML datasource configuration and perform runtime checks.
ConfigureByDSN(string, map[string]string, *log.Entry, string) error // Configure the datasource
GetMode() string // Get the mode (TAIL, CAT or SERVER)
GetName() string // Get the name of the module
OneShotAcquisition(chan types.Event, *tomb.Tomb) error // Start one shot acquisition(eg, cat a file)
StreamingAcquisition(context.Context, chan types.Event, *tomb.Tomb) error // Start live acquisition (eg, tail a file)
CanRun() error // Whether the datasource can run or not (eg, journalctl on BSD is a non-sense)
GetUuid() string // Get the unique identifier of the datasource
Dump() interface{}
}

Expand Down Expand Up @@ -242,8 +243,10 @@

for {
var sub configuration.DataSourceCommonCfg
err = dec.Decode(&sub)

idx += 1

err = dec.Decode(&sub)
if err != nil {
if !errors.Is(err, io.EOF) {
return nil, fmt.Errorf("failed to yaml decode %s: %w", acquisFile, err)
Expand Down Expand Up @@ -283,36 +286,44 @@

uniqueId := uuid.NewString()
sub.UniqueId = uniqueId

src, err := DataSourceConfigure(sub, metrics_level)
if err != nil {
var dserr *DataSourceUnavailableError
if errors.As(err, &dserr) {
log.Error(err)
continue
}

return nil, fmt.Errorf("while configuring datasource of type %s from %s (position: %d): %w", sub.Source, acquisFile, idx, err)
}

if sub.TransformExpr != "" {
vm, err := expr.Compile(sub.TransformExpr, exprhelpers.GetExprOptions(map[string]interface{}{"evt": &types.Event{}})...)
if err != nil {
return nil, fmt.Errorf("while compiling transform expression '%s' for datasource %s in %s (position: %d): %w", sub.TransformExpr, sub.Source, acquisFile, idx, err)
}

transformRuntimes[uniqueId] = vm
}

sources = append(sources, *src)
}
}

return sources, nil
}

func GetMetrics(sources []DataSource, aggregated bool) error {
var metrics []prometheus.Collector

Check warning on line 319 in pkg/acquisition/acquisition.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/acquisition.go#L319

Added line #L319 was not covered by tests
for i := range sources {
if aggregated {
metrics = sources[i].GetMetrics()
} else {
metrics = sources[i].GetAggregMetrics()
}

for _, metric := range metrics {
if err := prometheus.Register(metric); err != nil {
if _, ok := err.(prometheus.AlreadyRegisteredError); !ok {
Expand All @@ -322,47 +333,56 @@
}
}
}

return nil
}

func transform(transformChan chan types.Event, output chan types.Event, AcquisTomb *tomb.Tomb, transformRuntime *vm.Program, logger *log.Entry) {
defer trace.CatchPanic("crowdsec/acquis")
logger.Infof("transformer started")

Check warning on line 343 in pkg/acquisition/acquisition.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/acquisition.go#L343

Added line #L343 was not covered by tests
for {
select {
case <-AcquisTomb.Dying():
logger.Debugf("transformer is dying")
return
case evt := <-transformChan:
logger.Tracef("Received event %s", evt.Line.Raw)

Check warning on line 351 in pkg/acquisition/acquisition.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/acquisition.go#L351

Added line #L351 was not covered by tests
out, err := expr.Run(transformRuntime, map[string]interface{}{"evt": &evt})
if err != nil {
logger.Errorf("while running transform expression: %s, sending event as-is", err)
output <- evt
}

if out == nil {
logger.Errorf("transform expression returned nil, sending event as-is")
output <- evt
}

switch v := out.(type) {
case string:
logger.Tracef("transform expression returned %s", v)
evt.Line.Raw = v
output <- evt
case []interface{}:
logger.Tracef("transform expression returned %v", v) //nolint:asasalint // We actually want to log the slice content

Check warning on line 370 in pkg/acquisition/acquisition.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/acquisition.go#L370

Added line #L370 was not covered by tests
for _, line := range v {
l, ok := line.(string)
if !ok {
logger.Errorf("transform expression returned []interface{}, but cannot assert an element to string")
output <- evt

Check warning on line 376 in pkg/acquisition/acquisition.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/acquisition.go#L376

Added line #L376 was not covered by tests
continue
}

evt.Line.Raw = l
output <- evt
}
case []string:
logger.Tracef("transform expression returned %v", v)

Check warning on line 385 in pkg/acquisition/acquisition.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/acquisition.go#L385

Added line #L385 was not covered by tests
for _, line := range v {
evt.Line.Raw = line
output <- evt
Expand All @@ -375,7 +395,7 @@
}
}

func StartAcquisition(sources []DataSource, output chan types.Event, AcquisTomb *tomb.Tomb) error {
func StartAcquisition(ctx context.Context, sources []DataSource, output chan types.Event, AcquisTomb *tomb.Tomb) error {
// Don't wait if we have no sources, as it will hang forever
if len(sources) == 0 {
return nil
Expand All @@ -387,32 +407,40 @@

AcquisTomb.Go(func() error {
defer trace.CatchPanic("crowdsec/acquis")

var err error

outChan := output

log.Debugf("datasource %s UUID: %s", subsrc.GetName(), subsrc.GetUuid())

if transformRuntime, ok := transformRuntimes[subsrc.GetUuid()]; ok {
log.Infof("transform expression found for datasource %s", subsrc.GetName())

Check warning on line 419 in pkg/acquisition/acquisition.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/acquisition.go#L419

Added line #L419 was not covered by tests
transformChan := make(chan types.Event)
outChan = transformChan
transformLogger := log.WithFields(log.Fields{
"component": "transform",
"datasource": subsrc.GetName(),
})

Check warning on line 426 in pkg/acquisition/acquisition.go

View check run for this annotation

Codecov / codecov/patch

pkg/acquisition/acquisition.go#L426

Added line #L426 was not covered by tests
AcquisTomb.Go(func() error {
transform(outChan, output, AcquisTomb, transformRuntime, transformLogger)
return nil
})
}

if subsrc.GetMode() == configuration.TAIL_MODE {
err = subsrc.StreamingAcquisition(outChan, AcquisTomb)
err = subsrc.StreamingAcquisition(ctx, outChan, AcquisTomb)
} else {
err = subsrc.OneShotAcquisition(outChan, AcquisTomb)
}

if err != nil {
// if one of the acqusition returns an error, we kill the others to properly shutdown
AcquisTomb.Kill(err)
}

return nil
})
}
Expand Down
52 changes: 30 additions & 22 deletions pkg/acquisition/acquisition_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package acquisition

import (
"context"
"errors"
"fmt"
"strings"
Expand Down Expand Up @@ -56,14 +57,16 @@ func (f *MockSource) Configure(cfg []byte, logger *log.Entry, metricsLevel int)

return nil
}
func (f *MockSource) GetMode() string { return f.Mode }
func (f *MockSource) OneShotAcquisition(chan types.Event, *tomb.Tomb) error { return nil }
func (f *MockSource) StreamingAcquisition(chan types.Event, *tomb.Tomb) error { return nil }
func (f *MockSource) CanRun() error { return nil }
func (f *MockSource) GetMetrics() []prometheus.Collector { return nil }
func (f *MockSource) GetAggregMetrics() []prometheus.Collector { return nil }
func (f *MockSource) Dump() interface{} { return f }
func (f *MockSource) GetName() string { return "mock" }
func (f *MockSource) GetMode() string { return f.Mode }
func (f *MockSource) OneShotAcquisition(chan types.Event, *tomb.Tomb) error { return nil }
func (f *MockSource) StreamingAcquisition(context.Context, chan types.Event, *tomb.Tomb) error {
return nil
}
func (f *MockSource) CanRun() error { return nil }
func (f *MockSource) GetMetrics() []prometheus.Collector { return nil }
func (f *MockSource) GetAggregMetrics() []prometheus.Collector { return nil }
func (f *MockSource) Dump() interface{} { return f }
func (f *MockSource) GetName() string { return "mock" }
func (f *MockSource) ConfigureByDSN(string, map[string]string, *log.Entry, string) error {
return errors.New("not supported")
}
Expand Down Expand Up @@ -327,7 +330,7 @@ func (f *MockCat) OneShotAcquisition(out chan types.Event, tomb *tomb.Tomb) erro
return nil
}

func (f *MockCat) StreamingAcquisition(chan types.Event, *tomb.Tomb) error {
func (f *MockCat) StreamingAcquisition(context.Context, chan types.Event, *tomb.Tomb) error {
return errors.New("can't run in tail")
}
func (f *MockCat) CanRun() error { return nil }
Expand Down Expand Up @@ -366,7 +369,7 @@ func (f *MockTail) OneShotAcquisition(out chan types.Event, tomb *tomb.Tomb) err
return errors.New("can't run in cat mode")
}

func (f *MockTail) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) error {
func (f *MockTail) StreamingAcquisition(ctx context.Context, out chan types.Event, t *tomb.Tomb) error {
for range 10 {
evt := types.Event{}
evt.Line.Src = "test"
Expand All @@ -389,14 +392,15 @@ func (f *MockTail) GetUuid() string { return "" }
// func StartAcquisition(sources []DataSource, output chan types.Event, AcquisTomb *tomb.Tomb) error {

func TestStartAcquisitionCat(t *testing.T) {
ctx := context.Background()
sources := []DataSource{
&MockCat{},
}
out := make(chan types.Event)
acquisTomb := tomb.Tomb{}

go func() {
if err := StartAcquisition(sources, out, &acquisTomb); err != nil {
if err := StartAcquisition(ctx, sources, out, &acquisTomb); err != nil {
t.Errorf("unexpected error")
}
}()
Expand All @@ -416,14 +420,15 @@ READLOOP:
}

func TestStartAcquisitionTail(t *testing.T) {
ctx := context.Background()
sources := []DataSource{
&MockTail{},
}
out := make(chan types.Event)
acquisTomb := tomb.Tomb{}

go func() {
if err := StartAcquisition(sources, out, &acquisTomb); err != nil {
if err := StartAcquisition(ctx, sources, out, &acquisTomb); err != nil {
t.Errorf("unexpected error")
}
}()
Expand All @@ -450,7 +455,7 @@ type MockTailError struct {
MockTail
}

func (f *MockTailError) StreamingAcquisition(out chan types.Event, t *tomb.Tomb) error {
func (f *MockTailError) StreamingAcquisition(ctx context.Context, out chan types.Event, t *tomb.Tomb) error {
for range 10 {
evt := types.Event{}
evt.Line.Src = "test"
Expand All @@ -463,14 +468,15 @@ func (f *MockTailError) StreamingAcquisition(out chan types.Event, t *tomb.Tomb)
}

func TestStartAcquisitionTailError(t *testing.T) {
ctx := context.Background()
sources := []DataSource{
&MockTailError{},
}
out := make(chan types.Event)
acquisTomb := tomb.Tomb{}

go func() {
if err := StartAcquisition(sources, out, &acquisTomb); err != nil && err.Error() != "got error (tomb)" {
if err := StartAcquisition(ctx, sources, out, &acquisTomb); err != nil && err.Error() != "got error (tomb)" {
t.Errorf("expected error, got '%s'", err)
}
}()
Expand Down Expand Up @@ -501,14 +507,16 @@ func (f *MockSourceByDSN) UnmarshalConfig(cfg []byte) error { return nil }
func (f *MockSourceByDSN) Configure(cfg []byte, logger *log.Entry, metricsLevel int) error {
return nil
}
func (f *MockSourceByDSN) GetMode() string { return f.Mode }
func (f *MockSourceByDSN) OneShotAcquisition(chan types.Event, *tomb.Tomb) error { return nil }
func (f *MockSourceByDSN) StreamingAcquisition(chan types.Event, *tomb.Tomb) error { return nil }
func (f *MockSourceByDSN) CanRun() error { return nil }
func (f *MockSourceByDSN) GetMetrics() []prometheus.Collector { return nil }
func (f *MockSourceByDSN) GetAggregMetrics() []prometheus.Collector { return nil }
func (f *MockSourceByDSN) Dump() interface{} { return f }
func (f *MockSourceByDSN) GetName() string { return "mockdsn" }
func (f *MockSourceByDSN) GetMode() string { return f.Mode }
func (f *MockSourceByDSN) OneShotAcquisition(chan types.Event, *tomb.Tomb) error { return nil }
func (f *MockSourceByDSN) StreamingAcquisition(context.Context, chan types.Event, *tomb.Tomb) error {
return nil
}
func (f *MockSourceByDSN) CanRun() error { return nil }
func (f *MockSourceByDSN) GetMetrics() []prometheus.Collector { return nil }
func (f *MockSourceByDSN) GetAggregMetrics() []prometheus.Collector { return nil }
func (f *MockSourceByDSN) Dump() interface{} { return f }
func (f *MockSourceByDSN) GetName() string { return "mockdsn" }
func (f *MockSourceByDSN) ConfigureByDSN(dsn string, labels map[string]string, logger *log.Entry, uuid string) error {
dsn = strings.TrimPrefix(dsn, "mockdsn://")
if dsn != "test_expect" {
Expand Down
Loading