From 0d6ad638cf74b55b9f5679ba3a5487dc482dc5cf Mon Sep 17 00:00:00 2001 From: Piotr <17101802+thampiotr@users.noreply.github.com> Date: Tue, 10 Oct 2023 10:34:13 +0100 Subject: [PATCH] loki.process: add decolorise stage support (#5416) --- CHANGELOG.md | 5 ++ component/loki/process/stages/decolorize.go | 39 ++++++++++++++ .../loki/process/stages/decolorize_test.go | 53 ++++++++++++++++++ component/loki/process/stages/extensions.go | 2 +- component/loki/process/stages/pipeline.go | 32 +++++------ component/loki/process/stages/stage.go | 54 +++++++++++-------- .../promtailconvert/internal/build/stages.go | 5 +- .../testdata/pipeline_stages_part2.river | 2 + .../testdata/pipeline_stages_part2.yaml | 1 + .../pipeline_stages_unsupported.diags | 1 - .../testdata/pipeline_stages_unsupported.yaml | 1 - .../flow/reference/components/loki.process.md | 33 ++++++++++-- 12 files changed, 182 insertions(+), 46 deletions(-) create mode 100644 component/loki/process/stages/decolorize.go create mode 100644 component/loki/process/stages/decolorize_test.go diff --git a/CHANGELOG.md b/CHANGELOG.md index 64815911e36a..c5d51cef7639 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -10,6 +10,11 @@ internal API changes are not present. Main (unreleased) ----------------- +### Features + +- Added a new `stage.decolorize` stage to the `loki.process` component which + allows stripping ANSI color codes from the log lines. (@thampiotr) + ### Bugfixes - Fixed an issue where `loki.process` validation for stage `metric.counter` was diff --git a/component/loki/process/stages/decolorize.go b/component/loki/process/stages/decolorize.go new file mode 100644 index 000000000000..e20864ba1761 --- /dev/null +++ b/component/loki/process/stages/decolorize.go @@ -0,0 +1,39 @@ +package stages + +// NOTE: This code is copied from Promtail (07cbef92268aecc0f20d1791a6df390c2df5c072) with changes kept to the minimum.
+ +import ( + "github.com/grafana/loki/pkg/logql/log" +) + +type DecolorizeConfig struct{} + +type decolorizeStage struct{} + +func newDecolorizeStage(_ DecolorizeConfig) (Stage, error) { + return &decolorizeStage{}, nil +} + +// Run implements Stage +func (m *decolorizeStage) Run(in chan Entry) chan Entry { + decolorizer, _ := log.NewDecolorizer() + out := make(chan Entry) + go func() { + defer close(out) + for e := range in { + decolorizedLine, _ := decolorizer.Process( + e.Timestamp.Unix(), + []byte(e.Entry.Line), + nil, + ) + e.Entry.Line = string(decolorizedLine) + out <- e + } + }() + return out +} + +// Name implements Stage +func (m *decolorizeStage) Name() string { + return StageTypeDecolorize +} diff --git a/component/loki/process/stages/decolorize_test.go b/component/loki/process/stages/decolorize_test.go new file mode 100644 index 000000000000..2df016885156 --- /dev/null +++ b/component/loki/process/stages/decolorize_test.go @@ -0,0 +1,53 @@ +package stages + +// NOTE: This code is copied from Promtail (07cbef92268aecc0f20d1791a6df390c2df5c072) with changes kept to the minimum. 
+ +import ( + "testing" + "time" + + "github.com/prometheus/client_golang/prometheus" + "github.com/stretchr/testify/assert" + + util_log "github.com/grafana/loki/pkg/util/log" +) + +var testDecolorizePipeline = ` +stage.decolorize {} +` + +func TestPipeline_Decolorize(t *testing.T) { + t.Parallel() + + tests := map[string]struct { + config string + entry string + expectedEntry string + }{ + "successfully run pipeline on non-colored text": { + testDecolorizePipeline, + "sample text", + "sample text", + }, + "successfully run pipeline on colored text": { + testDecolorizePipeline, + "\033[0;32mgreen\033[0m \033[0;31mred\033[0m", + "green red", + }, + } + + for testName, testData := range tests { + testData := testData + + t.Run(testName, func(t *testing.T) { + t.Parallel() + + pl, err := NewPipeline(util_log.Logger, loadConfig(testData.config), nil, prometheus.DefaultRegisterer) + if err != nil { + t.Fatal(err) + } + out := processEntries(pl, newEntry(nil, nil, testData.entry, time.Now()))[0] + assert.Equal(t, testData.expectedEntry, out.Line) + }) + } +} diff --git a/component/loki/process/stages/extensions.go b/component/loki/process/stages/extensions.go index 87435e67fb92..d4ce94ede0a5 100644 --- a/component/loki/process/stages/extensions.go +++ b/component/loki/process/stages/extensions.go @@ -101,7 +101,7 @@ var _ Stage = (*cri)(nil) // Name implement the Stage interface. func (c *cri) Name() string { - return "cri" + return StageTypeCRI } // implements Stage interface diff --git a/component/loki/process/stages/pipeline.go b/component/loki/process/stages/pipeline.go index 02d47a817e97..4be53d34acb7 100644 --- a/component/loki/process/stages/pipeline.go +++ b/component/loki/process/stages/pipeline.go @@ -19,28 +19,30 @@ import ( // We define these as pointers types so we can use reflection to check that // exactly one is set. 
type StageConfig struct { + //TODO(thampiotr): sync these with new stages + CRIConfig *CRIConfig `river:"cri,block,optional"` + DecolorizeConfig *DecolorizeConfig `river:"decolorize,block,optional"` + DockerConfig *DockerConfig `river:"docker,block,optional"` + DropConfig *DropConfig `river:"drop,block,optional"` + GeoIPConfig *GeoIPConfig `river:"geoip,block,optional"` JSONConfig *JSONConfig `river:"json,block,optional"` - LogfmtConfig *LogfmtConfig `river:"logfmt,block,optional"` - LabelsConfig *LabelsConfig `river:"labels,block,optional"` - StructuredMetadata *LabelsConfig `river:"structured_metadata,block,optional"` LabelAllowConfig *LabelAllowConfig `river:"label_keep,block,optional"` LabelDropConfig *LabelDropConfig `river:"label_drop,block,optional"` - StaticLabelsConfig *StaticLabelsConfig `river:"static_labels,block,optional"` - DockerConfig *DockerConfig `river:"docker,block,optional"` - CRIConfig *CRIConfig `river:"cri,block,optional"` - RegexConfig *RegexConfig `river:"regex,block,optional"` - TimestampConfig *TimestampConfig `river:"timestamp,block,optional"` - OutputConfig *OutputConfig `river:"output,block,optional"` - ReplaceConfig *ReplaceConfig `river:"replace,block,optional"` - MultilineConfig *MultilineConfig `river:"multiline,block,optional"` + LabelsConfig *LabelsConfig `river:"labels,block,optional"` + LimitConfig *LimitConfig `river:"limit,block,optional"` + LogfmtConfig *LogfmtConfig `river:"logfmt,block,optional"` MatchConfig *MatchConfig `river:"match,block,optional"` - DropConfig *DropConfig `river:"drop,block,optional"` + MetricsConfig *MetricsConfig `river:"metrics,block,optional"` + MultilineConfig *MultilineConfig `river:"multiline,block,optional"` + OutputConfig *OutputConfig `river:"output,block,optional"` PackConfig *PackConfig `river:"pack,block,optional"` + RegexConfig *RegexConfig `river:"regex,block,optional"` + ReplaceConfig *ReplaceConfig `river:"replace,block,optional"` + StaticLabelsConfig *StaticLabelsConfig 
`river:"static_labels,block,optional"` + StructuredMetadata *LabelsConfig `river:"structured_metadata,block,optional"` TemplateConfig *TemplateConfig `river:"template,block,optional"` TenantConfig *TenantConfig `river:"tenant,block,optional"` - LimitConfig *LimitConfig `river:"limit,block,optional"` - MetricsConfig *MetricsConfig `river:"metrics,block,optional"` - GeoIPConfig *GeoIPConfig `river:"geoip,block,optional"` + TimestampConfig *TimestampConfig `river:"timestamp,block,optional"` } var rateLimiter *rate.Limiter diff --git a/component/loki/process/stages/stage.go b/component/loki/process/stages/stage.go index 94300eb34c32..494408ce14f0 100644 --- a/component/loki/process/stages/stage.go +++ b/component/loki/process/stages/stage.go @@ -5,6 +5,7 @@ package stages // new code without being able to slowly review, examine and test them. import ( + "fmt" "os" "runtime" "time" @@ -18,29 +19,34 @@ import ( // TODO(@tpaschalis) Let's use this as the list of stages we need to port over. const ( - StageTypeJSON = "json" - StageTypeLogfmt = "logfmt" - StageTypeRegex = "regex" - StageTypeReplace = "replace" - StageTypeMetric = "metrics" - StageTypeLabel = "labels" + StageTypeCRI = "cri" + StageTypeDecolorize = "decolorize" + StageTypeDocker = "docker" + StageTypeDrop = "drop" + //TODO(thampiotr): Add support for eventlogmessage stage + StageTypeEventLogMessage = "eventlogmessage" + StageTypeGeoIP = "geoip" + StageTypeJSON = "json" + StageTypeLabel = "labels" + StageTypeLabelAllow = "labelallow" + StageTypeLabelDrop = "labeldrop" + StageTypeLimit = "limit" + StageTypeLogfmt = "logfmt" + StageTypeMatch = "match" + StageTypeMetric = "metrics" + StageTypeMultiline = "multiline" + StageTypeOutput = "output" + StageTypePack = "pack" + StageTypePipeline = "pipeline" + StageTypeRegex = "regex" + StageTypeReplace = "replace" + //TODO(thampiotr): Add support for sampling stage + StageTypeSampling = "sampling" + StageTypeStaticLabels = "static_labels" StageTypeStructuredMetadata = 
"structured_metadata" - StageTypeLabelDrop = "labeldrop" - StageTypeTimestamp = "timestamp" - StageTypeOutput = "output" - StageTypeDocker = "docker" - StageTypeCRI = "cri" - StageTypeMatch = "match" StageTypeTemplate = "template" - StageTypePipeline = "pipeline" StageTypeTenant = "tenant" - StageTypeDrop = "drop" - StageTypeLimit = "limit" - StageTypeMultiline = "multiline" - StageTypePack = "pack" - StageTypeLabelAllow = "labelallow" - StageTypeStaticLabels = "static_labels" - StageTypeGeoIP = "geoip" + StageTypeTimestamp = "timestamp" ) // Processor takes an existing set of labels, timestamp and log entry and returns either a possibly mutated @@ -222,9 +228,13 @@ func New(logger log.Logger, jobName *string, cfg StageConfig, registerer prometh if err != nil { return nil, err } - + case cfg.DecolorizeConfig != nil: + s, err = newDecolorizeStage(*cfg.DecolorizeConfig) + if err != nil { + return nil, err + } default: - panic("unreachable; should have decoded into one of the StageConfig fields") + panic(fmt.Sprintf("unreachable; should have decoded into one of the StageConfig fields: %+v", cfg)) } return s, nil } diff --git a/converter/internal/promtailconvert/internal/build/stages.go b/converter/internal/promtailconvert/internal/build/stages.go index 79ee12a1e55c..80778c475670 100644 --- a/converter/internal/promtailconvert/internal/build/stages.go +++ b/converter/internal/promtailconvert/internal/build/stages.go @@ -126,9 +126,8 @@ func convertEventLogMessage(diags *diag.Diagnostics) (stages.StageConfig, bool) return stages.StageConfig{}, false } -func convertDecolorize(diags *diag.Diagnostics) (stages.StageConfig, bool) { - diags.Add(diag.SeverityLevelError, "pipeline_stages.decolorize is not supported") - return stages.StageConfig{}, false +func convertDecolorize(_ *diag.Diagnostics) (stages.StageConfig, bool) { + return stages.StageConfig{DecolorizeConfig: &stages.DecolorizeConfig{}}, true } func convertStaticLabels(cfg interface{}, diags *diag.Diagnostics) 
(stages.StageConfig, bool) { diff --git a/converter/internal/promtailconvert/testdata/pipeline_stages_part2.river b/converter/internal/promtailconvert/testdata/pipeline_stages_part2.river index 9b5a68542244..ea849f30cbde 100644 --- a/converter/internal/promtailconvert/testdata/pipeline_stages_part2.river +++ b/converter/internal/promtailconvert/testdata/pipeline_stages_part2.river @@ -79,6 +79,8 @@ loki.process "example" { source = "internet" db_type = "mmdb" } + + stage.decolorize { } } loki.source.file "example" { diff --git a/converter/internal/promtailconvert/testdata/pipeline_stages_part2.yaml b/converter/internal/promtailconvert/testdata/pipeline_stages_part2.yaml index 2093da78c425..078d62268259 100644 --- a/converter/internal/promtailconvert/testdata/pipeline_stages_part2.yaml +++ b/converter/internal/promtailconvert/testdata/pipeline_stages_part2.yaml @@ -53,6 +53,7 @@ scrape_configs: db: /usr/share/GeoIP/GeoLite2-City.mmdb source: internet db_type: mmdb + - decolorize: { } kubernetes_sd_configs: - role: pod kubeconfig_file: /home/toby/.kube/config diff --git a/converter/internal/promtailconvert/testdata/pipeline_stages_unsupported.diags b/converter/internal/promtailconvert/testdata/pipeline_stages_unsupported.diags index 6c05a3f9e2b0..58ccf371d93c 100644 --- a/converter/internal/promtailconvert/testdata/pipeline_stages_unsupported.diags +++ b/converter/internal/promtailconvert/testdata/pipeline_stages_unsupported.diags @@ -1,3 +1,2 @@ (Error) pipeline_stages.sampling is currently not supported: map[rate:100] -(Error) pipeline_stages.decolorize is not supported (Error) pipeline_stages.eventlogmessage is not supported \ No newline at end of file diff --git a/converter/internal/promtailconvert/testdata/pipeline_stages_unsupported.yaml b/converter/internal/promtailconvert/testdata/pipeline_stages_unsupported.yaml index c5689890df17..4e6d5a73d0b6 100644 --- a/converter/internal/promtailconvert/testdata/pipeline_stages_unsupported.yaml +++ 
b/converter/internal/promtailconvert/testdata/pipeline_stages_unsupported.yaml @@ -5,7 +5,6 @@ scrape_configs: pipeline_stages: - sampling: rate: 100 - - decolorize: { } - eventlogmessage: { } kubernetes_sd_configs: - role: pod diff --git a/docs/sources/flow/reference/components/loki.process.md b/docs/sources/flow/reference/components/loki.process.md index 6926ef02f5c8..5e0136846b3b 100644 --- a/docs/sources/flow/reference/components/loki.process.md +++ b/docs/sources/flow/reference/components/loki.process.md @@ -51,17 +51,18 @@ loki.process "LABEL" { The following blocks are supported inside the definition of `loki.process`: | Hierarchy | Block | Description | Required | -|---------------------------|-------------------------------|------------------------------------------------------| -------- | +|---------------------------|-------------------------------|------------------------------------------------------|----------| | stage.cri | [stage.cri][] | Configures a pre-defined CRI-format pipeline. | no | +| stage.decolorize | [stage.decolorize][] | Strips ANSI color codes from log lines. | no | | stage.docker | [stage.docker][] | Configures a pre-defined Docker log format pipeline. | no | | stage.drop | [stage.drop][] | Configures a `drop` processing stage. | no | | stage.json | [stage.json][] | Configures a JSON processing stage. | no | | stage.label_drop | [stage.label_drop][] | Configures a `label_drop` processing stage. | no | | stage.label_keep | [stage.label_keep][] | Configures a `label_keep` processing stage. | no | -| stage.labels | [stage.labels][] | Configures a `labels` processing stage. | no | +| stage.labels | [stage.labels][] | Configures a `labels` processing stage. | no | | stage.structured_metadata | [stage.structured_metadata][] | Configures a structured metadata processing stage. | no | | stage.limit | [stage.limit][] | Configures a `limit` processing stage. | no | -| stage.logfmt | [stage.logfmt][] | Configures a `logfmt` processing stage. 
| no | +| stage.logfmt | [stage.logfmt][] | Configures a `logfmt` processing stage. | no | | stage.match | [stage.match][] | Configures a `match` processing stage. | no | | stage.metrics | [stage.metrics][] | Configures a `metrics` stage. | no | | stage.multiline | [stage.multiline][] | Configures a `multiline` processing stage. | no | @@ -80,6 +81,7 @@ A user can provide any number of these stage blocks nested inside file. [stage.cri]: #stagecri-block +[stage.decolorize]: #stagedecolorize-block [stage.docker]: #stagedocker-block [stage.drop]: #stagedrop-block [stage.json]: #stagejson-block @@ -141,6 +143,31 @@ stream: stdout timestamp: 2019-04-30T02:12:41.8443515 ``` +### stage.decolorize block + +The `stage.decolorize` strips ANSI color codes from the log lines, thus making +it easier to parse logs further. + +The `stage.decolorize` block does not support any arguments or inner blocks, so +it is always empty. + +```river +stage.decolorize {} +``` + +`stage.decolorize` turns each line having a color code into a non-colored one, +for example: + +``` +[2022-11-04 22:17:57.811] \033[0;32mhttp\033[0m: GET /_health (0 ms) 204 +``` + +is turned into + +``` +[2022-11-04 22:17:57.811] http: GET /_health (0 ms) 204 +``` + ### stage.docker block The `stage.docker` inner block enables a predefined pipeline which reads log lines in