From 21fcb1988295089727a1d894b6c683560c674946 Mon Sep 17 00:00:00 2001 From: "dependabot[bot]" <49699333+dependabot[bot]@users.noreply.github.com> Date: Mon, 30 Dec 2024 10:33:53 +0200 Subject: [PATCH 01/10] build(deps): bump helm/kind-action from 1.10.0 to 1.11.0 (#2289) Bumps [helm/kind-action](https://github.com/helm/kind-action) from 1.10.0 to 1.11.0. - [Release notes](https://github.com/helm/kind-action/releases) - [Commits](https://github.com/helm/kind-action/compare/v1.10.0...v1.11.0) --- updated-dependencies: - dependency-name: helm/kind-action dependency-type: direct:production update-type: version-update:semver-minor ... Signed-off-by: dependabot[bot] Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> --- .github/workflows/helm-test.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/helm-test.yml b/.github/workflows/helm-test.yml index 43fc01757e..aec51de3f0 100644 --- a/.github/workflows/helm-test.yml +++ b/.github/workflows/helm-test.yml @@ -71,7 +71,7 @@ jobs: run: ct lint --config ./operations/helm/ct.yaml - name: Create kind cluster - uses: helm/kind-action@v1.10.0 + uses: helm/kind-action@v1.11.0 if: steps.list-changed.outputs.changed == 'true' - name: Add dependency chart repos From 8a4af8ba9de19e36f9d666b9eb2b8273c0b3e933 Mon Sep 17 00:00:00 2001 From: William Dumont Date: Thu, 2 Jan 2025 16:32:34 +0100 Subject: [PATCH 02/10] integration tests tempo: set fixed version instead of latest (#2325) --- internal/cmd/integration-tests/docker-compose.yaml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/internal/cmd/integration-tests/docker-compose.yaml b/internal/cmd/integration-tests/docker-compose.yaml index 1632286bab..451fd1b779 100644 --- a/internal/cmd/integration-tests/docker-compose.yaml +++ b/internal/cmd/integration-tests/docker-compose.yaml @@ -12,7 +12,7 @@ services: - "9009:9009" tempo: - image: grafana/tempo:latest + image: grafana/tempo:2.6.1 command: [ "-config.file=/etc/tempo.yaml" ] volumes: - ./configs/tempo/tempo.yaml:/etc/tempo.yaml From 8568931e88b443f9d6f752f51012c76b5e4ec5b3 Mon Sep 17 00:00:00 2001 From: William Dumont Date: Thu, 2 Jan 2025 16:32:48 +0100 Subject: [PATCH 03/10] Make otel scheduler sync (#2262) * make otel scheduler sync * Don't start components until Run is called (#2296) * Don't start components until Run is called * Update consumers after stopping the component * Minor fixes * Update internal/component/otelcol/internal/scheduler/scheduler.go Co-authored-by: Piotr <17101802+thampiotr@users.noreply.github.com> * test and docs * add resume call on exist for robustness --------- Co-authored-by: Paulin Todev Co-authored-by: Piotr <17101802+thampiotr@users.noreply.github.com> --- internal/component/otelcol/auth/auth.go | 2 +- .../component/otelcol/connector/connector.go | 7 +- .../component/otelcol/exporter/exporter.go | 7 +- .../component/otelcol/extension/extension.go | 2 +- .../otelcol/internal/scheduler/scheduler.go | 137 ++++++++++-------- .../internal/scheduler/scheduler_test.go | 37 +++-- .../component/otelcol/processor/processor.go | 8 +- .../component/otelcol/receiver/receiver.go | 2 +- 8 files changed, 115 insertions(+), 87 deletions(-) diff --git a/internal/component/otelcol/auth/auth.go b/internal/component/otelcol/auth/auth.go index eb20434dec..9e9cc5fdf2 100644 --- a/internal/component/otelcol/auth/auth.go +++ b/internal/component/otelcol/auth/auth.go @@ -198,7 +198,7 @@ func (a *Auth) Update(args component.Arguments) error { }) // Schedule the components to run once our component is running. - a.sched.Schedule(host, components...) + a.sched.Schedule(a.ctx, func() {}, host, components...) return nil } diff --git a/internal/component/otelcol/connector/connector.go b/internal/component/otelcol/connector/connector.go index bb9b3a151e..643730000f 100644 --- a/internal/component/otelcol/connector/connector.go +++ b/internal/component/otelcol/connector/connector.go @@ -214,9 +214,12 @@ func (p *Connector) Update(args component.Arguments) error { return errors.New("unsupported connector type") } + updateConsumersFunc := func() { + p.consumer.SetConsumers(tracesConnector, metricsConnector, logsConnector) + } + // Schedule the components to run once our component is running. - p.sched.Schedule(host, components...) - p.consumer.SetConsumers(tracesConnector, metricsConnector, logsConnector) + p.sched.Schedule(p.ctx, updateConsumersFunc, host, components...) return nil } diff --git a/internal/component/otelcol/exporter/exporter.go b/internal/component/otelcol/exporter/exporter.go index 63e0f1cdd0..4f19b5a625 100644 --- a/internal/component/otelcol/exporter/exporter.go +++ b/internal/component/otelcol/exporter/exporter.go @@ -253,9 +253,12 @@ func (e *Exporter) Update(args component.Arguments) error { } } + updateConsumersFunc := func() { + e.consumer.SetConsumers(tracesExporter, metricsExporter, logsExporter) + } + // Schedule the components to run once our component is running. - e.sched.Schedule(host, components...) - e.consumer.SetConsumers(tracesExporter, metricsExporter, logsExporter) + e.sched.Schedule(e.ctx, updateConsumersFunc, host, components...) return nil } diff --git a/internal/component/otelcol/extension/extension.go b/internal/component/otelcol/extension/extension.go index 1e494e71c1..7eca6e7349 100644 --- a/internal/component/otelcol/extension/extension.go +++ b/internal/component/otelcol/extension/extension.go @@ -162,7 +162,7 @@ func (e *Extension) Update(args component.Arguments) error { } // Schedule the components to run once our component is running. - e.sched.Schedule(host, components...) + e.sched.Schedule(e.ctx, func() {}, host, components...) return nil } diff --git a/internal/component/otelcol/internal/scheduler/scheduler.go b/internal/component/otelcol/internal/scheduler/scheduler.go index 2c731616a5..fbf70e22c0 100644 --- a/internal/component/otelcol/internal/scheduler/scheduler.go +++ b/internal/component/otelcol/internal/scheduler/scheduler.go @@ -37,9 +37,7 @@ type Scheduler struct { schedMut sync.Mutex schedComponents []otelcomponent.Component // Most recently created components host otelcomponent.Host - - // newComponentsCh is written to when schedComponents gets updated. - newComponentsCh chan struct{} + running bool // onPause is called when scheduler is making changes to running components. onPause func() @@ -51,89 +49,102 @@ type Scheduler struct { // Schedule to schedule components to run. func New(l log.Logger) *Scheduler { return &Scheduler{ - log: l, - newComponentsCh: make(chan struct{}, 1), - onPause: func() {}, - onResume: func() {}, + log: l, + onPause: func() {}, + onResume: func() {}, } } -// NewWithPauseCallbacks is like New, but allows to specify onPause and onResume callbacks. The scheduler is assumed to -// start paused and only when its components are scheduled, it will call onResume. From then on, each update to running -// components via Schedule method will trigger a call to onPause and then onResume. When scheduler is shutting down, it -// will call onResume as a last step. +// NewWithPauseCallbacks is like New, but allows to specify onPause() and onResume() callbacks. +// The callbacks are a useful way of pausing and resuming the ingestion of data by the components: +// * onPause() is called before the scheduler stops the components. +// * onResume() is called after the scheduler starts the components. +// The callbacks are used by the Schedule() and Run() functions. +// The scheduler is assumed to start paused; Schedule() won't call onPause() if Run() was never ran. func NewWithPauseCallbacks(l log.Logger, onPause func(), onResume func()) *Scheduler { return &Scheduler{ - log: l, - newComponentsCh: make(chan struct{}, 1), - onPause: onPause, - onResume: onResume, + log: l, + onPause: onPause, + onResume: onResume, } } -// Schedule schedules a new set of OpenTelemetry Components to run. Components -// will only be scheduled when the Scheduler is running. +// Schedule a new set of OpenTelemetry Components to run. +// Components will only be started when the Scheduler's Run() function has been called. +// +// Schedule() completely overrides the set of previously running components. +// Components which have been removed since the last call to Schedule will be stopped. // -// Schedule completely overrides the set of previously running components; -// components which have been removed since the last call to Schedule will be -// stopped. -func (cs *Scheduler) Schedule(h otelcomponent.Host, cc ...otelcomponent.Component) { +// updateConsumers is called after the components are paused and before starting the new components. +// It is expected that this function will set the new set of consumers to the wrapping consumer that's assigned to the Alloy component. +func (cs *Scheduler) Schedule(ctx context.Context, updateConsumers func(), h otelcomponent.Host, cc ...otelcomponent.Component) { cs.schedMut.Lock() defer cs.schedMut.Unlock() - cs.schedComponents = cc + // If the scheduler isn't running yet, just update the state. + // That way the Run function is ready to go. + if !cs.running { + cs.schedComponents = cc + cs.host = h + updateConsumers() + return + } + + // The new components must be setup in this order: + // + // 1. Pause consumers + // 2. Stop the old components + // 3. Change the consumers + // 4. Start the new components + // 5. Start the consumer + // + // There could be race conditions if the order above is not followed. + + // 1. Pause consumers + // This prevents them from accepting new data while we're shutting them down. + cs.onPause() + + // 2. Stop the old components + cs.stopComponents(ctx, cs.schedComponents...) + + // 3. Change the consumers + // This can only be done after stopping the pervious components and before starting the new ones. + updateConsumers() + + // 4. Start the new components + level.Debug(cs.log).Log("msg", "scheduling otelcol components", "count", len(cs.schedComponents)) + cs.schedComponents = cs.startComponents(ctx, h, cc...) cs.host = h + //TODO: What if the trace component failed but the metrics one didn't? Should we resume all consumers? - select { - case cs.newComponentsCh <- struct{}{}: - // Queued new message. - default: - // A message is already queued for refreshing running components so we - // don't have to do anything here. - } + // 5. Start the consumer + // The new components will now start accepting telemetry data. + cs.onResume() } -// Run starts the Scheduler. Run will watch for schedule components to appear -// and run them, terminating previously running components if they exist. +// Run starts the Scheduler and stops the components when the context is cancelled. func (cs *Scheduler) Run(ctx context.Context) error { - firstRun := true - var components []otelcomponent.Component + cs.schedMut.Lock() + cs.running = true + + cs.onPause() + cs.startComponents(ctx, cs.host, cs.schedComponents...) + cs.onResume() + + cs.schedMut.Unlock() // Make sure we terminate all of our running components on shutdown. defer func() { - if !firstRun { // always handle the callbacks correctly - cs.onPause() - } - cs.stopComponents(context.Background(), components...) + cs.schedMut.Lock() + defer cs.schedMut.Unlock() + cs.stopComponents(context.Background(), cs.schedComponents...) + // this Resume call should not be needed but is added for robustness to ensure that + // it does not ever exit in "paused" state. cs.onResume() }() - // Wait for a write to cs.newComponentsCh. The initial list of components is - // always empty so there's nothing to do until cs.newComponentsCh is written - // to. - for { - select { - case <-ctx.Done(): - return nil - case <-cs.newComponentsCh: - if !firstRun { - cs.onPause() // do not pause on first run - } else { - firstRun = false - } - // Stop the old components before running new scheduled ones. - cs.stopComponents(ctx, components...) - - cs.schedMut.Lock() - components = cs.schedComponents - host := cs.host - cs.schedMut.Unlock() - - level.Debug(cs.log).Log("msg", "scheduling components", "count", len(components)) - components = cs.startComponents(ctx, host, components...) - cs.onResume() - } - } + <-ctx.Done() + return nil } func (cs *Scheduler) stopComponents(ctx context.Context, cc ...otelcomponent.Component) { diff --git a/internal/component/otelcol/internal/scheduler/scheduler_test.go b/internal/component/otelcol/internal/scheduler/scheduler_test.go index 469d679b7f..c034e262f8 100644 --- a/internal/component/otelcol/internal/scheduler/scheduler_test.go +++ b/internal/component/otelcol/internal/scheduler/scheduler_test.go @@ -5,10 +5,11 @@ import ( "testing" "time" + "go.uber.org/atomic" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" otelcomponent "go.opentelemetry.io/collector/component" - "go.uber.org/atomic" "github.com/grafana/alloy/internal/component/otelcol/internal/scheduler" "github.com/grafana/alloy/internal/runtime/componenttest" @@ -32,7 +33,7 @@ func TestScheduler(t *testing.T) { // Schedule our component, which should notify the started trigger once it is // running. component, started, _ := newTriggerComponent() - cs.Schedule(h, component) + cs.Schedule(context.Background(), func() {}, h, component) require.NoError(t, started.Wait(5*time.Second), "component did not start") }) @@ -52,12 +53,12 @@ func TestScheduler(t *testing.T) { // Schedule our component, which should notify the started and stopped // trigger once it starts and stops respectively. component, started, stopped := newTriggerComponent() - cs.Schedule(h, component) + cs.Schedule(context.Background(), func() {}, h, component) // Wait for the component to start, and then unschedule all components, which // should cause our running component to terminate. require.NoError(t, started.Wait(5*time.Second), "component did not start") - cs.Schedule(h) + cs.Schedule(context.Background(), func() {}, h) require.NoError(t, stopped.Wait(5*time.Second), "component did not shutdown") }) @@ -81,26 +82,32 @@ func TestScheduler(t *testing.T) { require.NoError(t, err) }() + toInt := func(a *atomic.Int32) int { return int(a.Load()) } + + // The Run function starts the components. They should be paused and then resumed. + require.EventuallyWithT(t, func(t *assert.CollectT) { + assert.Equal(t, 1, toInt(pauseCalls), "pause callbacks should be called on run") + assert.Equal(t, 1, toInt(resumeCalls), "resume callback should be called on run") + }, 5*time.Second, 10*time.Millisecond, "pause/resume callbacks not called correctly") + // Schedule our component, which should notify the started and stopped // trigger once it starts and stops respectively. component, started, stopped := newTriggerComponent() - cs.Schedule(h, component) - - toInt := func(a *atomic.Int32) int { return int(a.Load()) } + cs.Schedule(ctx, func() {}, h, component) require.EventuallyWithT(t, func(t *assert.CollectT) { - assert.Equal(t, 0, toInt(pauseCalls), "pause callbacks should not be called on first run") - assert.Equal(t, 1, toInt(resumeCalls), "resume callback should be called on first run") + assert.Equal(t, 2, toInt(pauseCalls), "pause callbacks should be called on schedule") + assert.Equal(t, 2, toInt(resumeCalls), "resume callback should be called on schedule") }, 5*time.Second, 10*time.Millisecond, "pause/resume callbacks not called correctly") // Wait for the component to start, and then unschedule all components, which // should cause our running component to terminate. require.NoError(t, started.Wait(5*time.Second), "component did not start") - cs.Schedule(h) + cs.Schedule(ctx, func() {}, h) require.EventuallyWithT(t, func(t *assert.CollectT) { - assert.Equal(t, 1, toInt(pauseCalls), "pause callback should be called on second run") - assert.Equal(t, 2, toInt(resumeCalls), "resume callback should be called on second run") + assert.Equal(t, 3, toInt(pauseCalls), "pause callback should be called on second schedule") + assert.Equal(t, 3, toInt(resumeCalls), "resume callback should be called on second schedule") }, 5*time.Second, 10*time.Millisecond, "pause/resume callbacks not called correctly") require.NoError(t, stopped.Wait(5*time.Second), "component did not shutdown") @@ -109,8 +116,8 @@ func TestScheduler(t *testing.T) { cancel() require.EventuallyWithT(t, func(t *assert.CollectT) { - assert.Equal(t, 2, toInt(pauseCalls), "pause callback should be called on shutdown") - assert.Equal(t, 3, toInt(resumeCalls), "resume callback should be called on shutdown") + assert.Equal(t, 3, toInt(pauseCalls), "pause callback should not be called on shutdown") + assert.Equal(t, 4, toInt(resumeCalls), "resume callback should be called on shutdown") }, 5*time.Second, 10*time.Millisecond, "pause/resume callbacks not called correctly") }) @@ -133,7 +140,7 @@ func TestScheduler(t *testing.T) { // Schedule our component which will notify our trigger when Shutdown gets // called. component, started, stopped := newTriggerComponent() - cs.Schedule(h, component) + cs.Schedule(ctx, func() {}, h, component) // Wait for the component to start, and then stop our scheduler, which // should cause our running component to terminate. diff --git a/internal/component/otelcol/processor/processor.go b/internal/component/otelcol/processor/processor.go index 5072d65233..08a7a6181c 100644 --- a/internal/component/otelcol/processor/processor.go +++ b/internal/component/otelcol/processor/processor.go @@ -237,9 +237,13 @@ func (p *Processor) Update(args component.Arguments) error { } } + updateConsumersFunc := func() { + p.consumer.SetConsumers(tracesProcessor, metricsProcessor, logsProcessor) + } + // Schedule the components to run once our component is running. - p.sched.Schedule(host, components...) - p.consumer.SetConsumers(tracesProcessor, metricsProcessor, logsProcessor) + p.sched.Schedule(p.ctx, updateConsumersFunc, host, components...) + return nil } diff --git a/internal/component/otelcol/receiver/receiver.go b/internal/component/otelcol/receiver/receiver.go index 80b82efb06..ff4c3c21d1 100644 --- a/internal/component/otelcol/receiver/receiver.go +++ b/internal/component/otelcol/receiver/receiver.go @@ -233,7 +233,7 @@ func (r *Receiver) Update(args component.Arguments) error { } // Schedule the components to run once our component is running. - r.sched.Schedule(host, components...) + r.sched.Schedule(r.ctx, func() {}, host, components...) return nil } From eaf4bc641a952416efeb3312d0cc6abfde1af06f Mon Sep 17 00:00:00 2001 From: Paulin Todev Date: Thu, 2 Jan 2025 19:21:42 +0200 Subject: [PATCH 04/10] Fix non-working Promtool example (#2307) The example was malformed. --- .../reference/components/prometheus/prometheus.remote_write.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/sources/reference/components/prometheus/prometheus.remote_write.md b/docs/sources/reference/components/prometheus/prometheus.remote_write.md index d2614a2e52..a919d750f0 100644 --- a/docs/sources/reference/components/prometheus/prometheus.remote_write.md +++ b/docs/sources/reference/components/prometheus/prometheus.remote_write.md @@ -427,7 +427,7 @@ To troubleshoot, take the following steps in order: You can use [Promtool][promtool] to inspect it and find out which metric series were sent by this {{< param "PRODUCT_NAME" >}} instance since the last WAL truncation event. For example: ``` - ./promtool tsdb dump --match='{__name__=\"otelcol_connector_spanmetrics_duration_seconds_bucket\", http_method=\"GET\", job=\"ExampleJobName\"' /path/to/wal/ + ./promtool tsdb dump --match='{__name__="otelcol_connector_spanmetrics_duration_seconds_bucket", http_method="GET", job="ExampleJobName"}' /path/to/wal/ ``` [clustering]: ../../configure/clustering From df1d04cc709cfb182cc8f4d96e1fadd1b9c72ae5 Mon Sep 17 00:00:00 2001 From: Ravishankar Date: Fri, 3 Jan 2025 14:04:03 +0530 Subject: [PATCH 05/10] Performance optimization for live debugging feature (#2294) * Performance optimization for live debugging feature * Moving the config to the livedebugging block Remove config from docs * Fix test cases * Remove the config changes --- CHANGELOG.md | 1 + .../sources/reference/config-blocks/livedebugging.md | 6 +++--- internal/component/loki/process/process_test.go | 12 ++++++------ internal/web/api/api.go | 10 ++++++---- 4 files changed, 16 insertions(+), 13 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 685f799212..0ae3c90519 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -52,6 +52,7 @@ Main (unreleased) - Add three new stdlib functions to_base64, from_URLbase64 and to_URLbase64 (@ravishankar15) - Add `ignore_older_than` option for local.file_match (@ravishankar15) - Add livedebugging support for `discover.relabel` (@ravishankar15) +- Performance optimization for live debugging feature (@ravishankar15) - Upgrade `github.com/goccy/go-json` to v0.10.4, which reduces the memory consumption of an Alloy instance by 20MB. If Alloy is running certain otelcol components, this reduction will not apply. (@ptodev) diff --git a/docs/sources/reference/config-blocks/livedebugging.md b/docs/sources/reference/config-blocks/livedebugging.md index 4587d95117..af30cb4f2e 100644 --- a/docs/sources/reference/config-blocks/livedebugging.md +++ b/docs/sources/reference/config-blocks/livedebugging.md @@ -30,8 +30,8 @@ livedebugging { The following arguments are supported: -| Name | Type | Description | Default | Required | -| --------- | ------ | ----------------------------------- | ------- | -------- | -| `enabled` | `bool` | Enables the live debugging feature. | `false` | no | +| Name | Type | Description | Default | Required | +| -------------------- | ----- | --------------------------------------------------------------- | ------- | -------- | +| `enabled` | `bool`| Enables the live debugging feature. | `false` | no | [debug]: ../../../troubleshoot/debug/ diff --git a/internal/component/loki/process/process_test.go b/internal/component/loki/process/process_test.go index 30a035124d..7558b0fc6c 100644 --- a/internal/component/loki/process/process_test.go +++ b/internal/component/loki/process/process_test.go @@ -53,7 +53,7 @@ func TestJSONLabelsStage(t *testing.T) { // The third stage will set some labels from the extracted values above. // Again, if the value is empty, it is inferred that we want to use the // populate the label with extracted value of the same name. - stg := `stage.json { + stg := `stage.json { expressions = {"output" = "log", stream = "stream", timestamp = "time", "extra" = "" } drop_malformed = true } @@ -62,7 +62,7 @@ func TestJSONLabelsStage(t *testing.T) { source = "extra" } stage.labels { - values = { + values = { stream = "", user = "", ts = "timestamp", @@ -679,7 +679,7 @@ func TestLeakyUpdate(t *testing.T) { numLogsToSend := 1 cfg1 := ` - stage.metrics { + stage.metrics { metric.counter { name = "paulin_test1" action = "inc" @@ -688,7 +688,7 @@ func TestLeakyUpdate(t *testing.T) { }` + forwardArgs cfg2 := ` - stage.metrics { + stage.metrics { metric.counter { name = "paulin_test2" action = "inc" @@ -731,7 +731,7 @@ func TestMetricsStageRefresh(t *testing.T) { numLogsToSend := 3 cfgWithMetric := ` - stage.metrics { + stage.metrics { metric.counter { name = "paulin_test" action = "inc" @@ -776,7 +776,7 @@ func TestMetricsStageRefresh(t *testing.T) { // We try having a metric with the same name as before so that we can see if there // is some sort of double registration error for that metric. cfgWithTwoMetrics := ` - stage.metrics { + stage.metrics { metric.counter { name = "paulin_test_3" action = "inc" diff --git a/internal/web/api/api.go b/internal/web/api/api.go index 53592b744a..5164f91854 100644 --- a/internal/web/api/api.go +++ b/internal/web/api/api.go @@ -11,6 +11,7 @@ import ( "path" "strconv" "strings" + "time" "github.com/google/uuid" "github.com/gorilla/mux" @@ -165,13 +166,11 @@ func getClusteringPeersHandler(host service.Host) http.HandlerFunc { } } -func liveDebugging(host service.Host, callbackManager livedebugging.CallbackManager) http.HandlerFunc { +func liveDebugging(_ service.Host, callbackManager livedebugging.CallbackManager) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { vars := mux.Vars(r) componentID := livedebugging.ComponentID(vars["id"]) - // Buffer of 1000 entries to handle load spikes and prevent this functionality from eating up too much memory. - // TODO: in the future we may want to make this value configurable to handle heavy load dataCh := make(chan string, 1000) ctx := r.Context() @@ -200,9 +199,12 @@ func liveDebugging(host service.Host, callbackManager livedebugging.CallbackMana return } + flushTicker := time.NewTicker(time.Second) + defer func() { close(dataCh) callbackManager.DeleteCallback(id, componentID) + flushTicker.Stop() }() for { @@ -216,7 +218,7 @@ func liveDebugging(host service.Host, callbackManager livedebugging.CallbackMana if writeErr != nil { return } - // TODO: flushing at a regular interval might be better performance wise + case <-flushTicker.C: w.(http.Flusher).Flush() case <-ctx.Done(): return From f4fe420e10dbb7552de7b1e5148c1fac077310d3 Mon Sep 17 00:00:00 2001 From: Christian Simon Date: Fri, 3 Jan 2025 11:54:13 +0000 Subject: [PATCH 06/10] feat(pyroscope.java): Show debug info about profiled targets (#2304) * feat(pyroscope.java): Show debug info about profiled targets This should make it clearer, what is currently profiled. * Sort debuginfos by PID Ensure order is consistent * Collect per type byte information --- internal/component/pyroscope/java/java.go | 43 +++++++++++++++++ internal/component/pyroscope/java/loop.go | 56 ++++++++++++++++++++++- 2 files changed, 98 insertions(+), 1 deletion(-) diff --git a/internal/component/pyroscope/java/java.go b/internal/component/pyroscope/java/java.go index 076bd254d8..5a67fb78b6 100644 --- a/internal/component/pyroscope/java/java.go +++ b/internal/component/pyroscope/java/java.go @@ -6,10 +6,13 @@ import ( "context" "fmt" "os" + "sort" "strconv" "sync" + "time" "github.com/grafana/alloy/internal/component" + "github.com/grafana/alloy/internal/component/discovery" "github.com/grafana/alloy/internal/component/pyroscope" "github.com/grafana/alloy/internal/component/pyroscope/java/asprof" "github.com/grafana/alloy/internal/featuregate" @@ -51,6 +54,31 @@ func init() { }) } +type debugInfo struct { + ProfiledTargets []*debugInfoProfiledTarget `alloy:"profiled_targets,block"` +} + +type debugInfoBytesPerType struct { + Type string `alloy:"type,attr"` + Bytes int64 `alloy:"bytes,attr"` +} + +type debugInfoProfiledTarget struct { + TotalBytes int64 `alloy:"total_bytes,attr,optional"` + TotalSamples int64 `alloy:"total_samples,attr,optional"` + LastProfiled time.Time `alloy:"last_profiled,attr,optional"` + LastError time.Time `alloy:"last_error,attr,optional"` + LastProfileBytesPerType map[string]int64 `alloy:"last_profile_bytes_per_type,attr,optional"` + ErrorMsg string `alloy:"error_msg,attr,optional"` + PID int `alloy:"pid,attr"` + Target discovery.Target `alloy:"target,attr"` +} + +var ( + _ component.DebugComponent = (*javaComponent)(nil) + _ component.Component = (*javaComponent)(nil) +) + type javaComponent struct { opts component.Options args Arguments @@ -69,6 +97,21 @@ func (j *javaComponent) Run(ctx context.Context) error { return nil } +func (j *javaComponent) DebugInfo() interface{} { + j.mutex.Lock() + defer j.mutex.Unlock() + var di debugInfo + di.ProfiledTargets = make([]*debugInfoProfiledTarget, 0, len(j.pid2process)) + for _, proc := range j.pid2process { + di.ProfiledTargets = append(di.ProfiledTargets, proc.debugInfo()) + } + // sort by pid + sort.Slice(di.ProfiledTargets, func(i, j int) bool { + return di.ProfiledTargets[i].PID < di.ProfiledTargets[j].PID + }) + return &di +} + func (j *javaComponent) Update(args component.Arguments) error { newArgs := args.(Arguments) j.forwardTo.UpdateChildren(newArgs.ForwardTo) diff --git a/internal/component/pyroscope/java/loop.go b/internal/component/pyroscope/java/loop.go index 6a25e214bb..45d716e869 100644 --- a/internal/component/pyroscope/java/loop.go +++ b/internal/component/pyroscope/java/loop.go @@ -34,12 +34,18 @@ type profilingLoop struct { pid int target discovery.Target cancel context.CancelFunc - error error dist *asprof.Distribution jfrFile string startTime time.Time profiler *asprof.Profiler sampleRate int + + error error + lastError time.Time + lastPush time.Time + lastBytesPerType []debugInfoBytesPerType + totalBytes int64 + totalSamples int64 } func newProfilingLoop(pid int, target discovery.Target, logger log.Logger, profiler *asprof.Profiler, output *pyroscope.Fanout, cfg ProfilingConfig) *profilingLoop { @@ -144,6 +150,11 @@ func (p *profilingLoop) push(jfrBytes []byte, startTime time.Time, endTime time. return fmt.Errorf("failed to parse jfr: %w", err) } target := p.getTarget() + var totalSamples, totalBytes int64 + + // reset the per type bytes stats + p.lastBytesPerType = p.lastBytesPerType[:0] + for _, req := range profiles.Profiles { metric := req.Metric sz := req.Profile.SizeVT() @@ -156,6 +167,13 @@ func (p *profilingLoop) push(jfrBytes []byte, startTime time.Time, endTime time. ls.Set(labelServiceName, inferServiceName(target)) } + p.lastBytesPerType = append(p.lastBytesPerType, debugInfoBytesPerType{ + Type: metric, + Bytes: int64(sz), + }) + totalBytes += int64(sz) + totalSamples += int64(len(req.Profile.Sample)) + profile, err := req.Profile.MarshalVT() if err != nil { _ = l.Log("msg", "failed to marshal profile", "err", err) @@ -168,6 +186,12 @@ func (p *profilingLoop) push(jfrBytes []byte, startTime time.Time, endTime time. continue } _ = l.Log("msg", "pushed jfr-pprof") + + p.mutex.Lock() + p.lastPush = time.Now() + p.totalSamples += totalSamples + p.totalBytes += totalBytes + p.mutex.Unlock() } return nil } @@ -255,9 +279,39 @@ func (p *profilingLoop) onError(err error) bool { p.mutex.Lock() defer p.mutex.Unlock() p.error = err + p.lastError = time.Now() return alive } +func (p *profilingLoop) debugInfo() *debugInfoProfiledTarget { + p.mutex.Lock() + defer p.mutex.Unlock() + + d := &debugInfoProfiledTarget{ + TotalBytes: p.totalBytes, + TotalSamples: p.totalSamples, + LastProfiled: p.lastPush, + LastError: p.lastError, + PID: p.pid, + Target: p.target, + } + + // expose per profile type bytes + if len(p.lastBytesPerType) > 0 { + d.LastProfileBytesPerType = make(map[string]int64) + for _, b := range p.lastBytesPerType { + d.LastProfileBytesPerType[b.Type] += b.Bytes + } + } + + // expose error message if given + if p.error != nil { + d.ErrorMsg = p.error.Error() + } + return d + +} + func (p *profilingLoop) interval() time.Duration { return p.getConfig().Interval } From dbfa7b73e579663c50f20fb54f2ccd7d36e5e331 Mon Sep 17 00:00:00 2001 From: William Dumont Date: Fri, 3 Jan 2025 15:44:27 +0100 Subject: [PATCH 07/10] fix debug doc (#2335) --- docs/sources/troubleshoot/debug.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/sources/troubleshoot/debug.md b/docs/sources/troubleshoot/debug.md index 5b8518515d..ec2b5a76f9 100644 --- a/docs/sources/troubleshoot/debug.md +++ b/docs/sources/troubleshoot/debug.md @@ -110,8 +110,8 @@ Supported components: * `otelcol.processor.*` * `otelcol.receiver.*` * `prometheus.relabel` -{{< /admonition >}} * `discovery.relabel` +{{< /admonition >}} ## Debug using the UI From 44f2b26851f768eda8341e942d8a115b673049ef Mon Sep 17 00:00:00 2001 From: William Dumont Date: Fri, 3 Jan 2025 17:27:47 +0100 Subject: [PATCH 08/10] Change log level in eventlogmessage (#2336) * change log level in eventlogmessage * changelog * update doc * Update docs/sources/reference/components/loki/loki.process.md Co-authored-by: Clayton Cornell <131809008+clayton-cornell@users.noreply.github.com> --------- Co-authored-by: Clayton Cornell <131809008+clayton-cornell@users.noreply.github.com> --- CHANGELOG.md | 2 ++ docs/sources/reference/components/loki/loki.process.md | 4 ++++ internal/component/loki/process/stages/eventlogmessage.go | 2 +- 3 files changed, 7 insertions(+), 1 deletion(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0ae3c90519..63a8c4ad55 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -71,6 +71,8 @@ Main (unreleased) - Fixed an issue where the `otelcol.processor.interval` could not be used because the debug metrics were not set to default. (@wildum) +- Change the log level in the `eventlogmessage` stage of the `loki.process` component from `warn` to `debug`. (@wildum) + ### Other changes - Change the stability of the `livedebugging` feature from "experimental" to "generally available". (@wildum) diff --git a/docs/sources/reference/components/loki/loki.process.md b/docs/sources/reference/components/loki/loki.process.md index 0e9d78f793..31043c12b9 100644 --- a/docs/sources/reference/components/loki/loki.process.md +++ b/docs/sources/reference/components/loki/loki.process.md @@ -258,6 +258,10 @@ If set to `false`, the `_extracted` suffix will be appended to an already existi When `drop_invalid_labels` is set to `true`, the stage drops fields that are not valid label names. If set to `false`, the stage will automatically convert them into valid labels replacing invalid characters with underscores. +The `eventlogmessage` stage only extracts lines with the key:value format. +All non-alpha characters in the key are replaced with underscores. +For example, `\tSecurity ID` is extracted as `_Security_ID`. + #### Example combined with `stage.json` ```alloy diff --git a/internal/component/loki/process/stages/eventlogmessage.go b/internal/component/loki/process/stages/eventlogmessage.go index 2cbdef78c9..c73760e137 100644 --- a/internal/component/loki/process/stages/eventlogmessage.go +++ b/internal/component/loki/process/stages/eventlogmessage.go @@ -78,7 +78,7 @@ func (m *eventLogMessageStage) processEntry(extracted map[string]interface{}, ke for _, line := range lines { parts := strings.SplitN(line, ":", 2) if len(parts) < 2 { - level.Warn(m.logger).Log("msg", "invalid line parsed from message", "line", line) + level.Debug(m.logger).Log("msg", "invalid line parsed from message", "line", line) continue } mkey := parts[0] From 89e17b223f04b4ce3187e4db78361da6b3690215 Mon Sep 17 00:00:00 2001 From: Sam DeHaan Date: Fri, 3 Jan 2025 12:56:55 -0500 Subject: [PATCH 09/10] feat: Add the otelcol syslog receiver (#2263) * Implementation of otelcol syslog receiver wrapper * update unmarshal code * Add changelog entry for syslogreceiver * Change field key, add converter * Apply suggestions from code review Co-authored-by: Clayton Cornell <131809008+clayton-cornell@users.noreply.github.com> * Update whitespace attribute descriptions * Clean up commented out code * Clean up comments * Update autogenerated docs * Update docs/sources/reference/components/otelcol/otelcol.receiver.syslog.md Co-authored-by: Clayton Cornell <131809008+clayton-cornell@users.noreply.github.com> * Update docs/sources/reference/components/otelcol/otelcol.receiver.syslog.md Co-authored-by: Clayton Cornell <131809008+clayton-cornell@users.noreply.github.com> * Put a little more maximum delay in syslog test * clean up validation, try not to use localhost for ci tests * Update docs/sources/reference/components/otelcol/otelcol.receiver.syslog.md Co-authored-by: Clayton Cornell <131809008+clayton-cornell@users.noreply.github.com> * Fix syslogreceiver converter * Add test sleep * Update docs/sources/reference/components/otelcol/otelcol.receiver.syslog.md Co-authored-by: Clayton Cornell <131809008+clayton-cornell@users.noreply.github.com> * Address some PR feedback * Update docs for rfc6587 arguments * Add todo to test comment * Update docs/sources/reference/components/otelcol/otelcol.receiver.syslog.md Co-authored-by: Clayton Cornell <131809008+clayton-cornell@users.noreply.github.com> * Apply suggestions from code review Co-authored-by: Clayton Cornell <131809008+clayton-cornell@users.noreply.github.com> * remote timeout from test --------- Co-authored-by: Clayton Cornell <131809008+clayton-cornell@users.noreply.github.com> --- CHANGELOG.md | 2 + .../sources/reference/compatibility/_index.md | 1 + .../otelcol/otelcol.receiver.syslog.md | 278 +++++++++++++++ go.mod | 3 + go.sum | 4 + internal/component/all/all.go | 3 +- .../otelcol/config_consumer_retry.go | 30 ++ .../otelcol/receiver/syslog/syslog.go | 325 ++++++++++++++++++ .../otelcol/receiver/syslog/syslog_test.go | 179 ++++++++++ .../converter_syslogreceiver.go | 128 +++++++ .../otelcolconvert/testdata/syslog.alloy | 55 +++ .../otelcolconvert/testdata/syslog.yaml | 57 +++ 12 files changed, 1064 insertions(+), 1 deletion(-) create mode 100644 docs/sources/reference/components/otelcol/otelcol.receiver.syslog.md create mode 100644 internal/component/otelcol/config_consumer_retry.go create mode 100644 internal/component/otelcol/receiver/syslog/syslog.go create mode 100644 internal/component/otelcol/receiver/syslog/syslog_test.go create mode 100644 internal/converter/internal/otelcolconvert/converter_syslogreceiver.go create mode 100644 internal/converter/internal/otelcolconvert/testdata/syslog.alloy create mode 100644 internal/converter/internal/otelcolconvert/testdata/syslog.yaml diff --git a/CHANGELOG.md b/CHANGELOG.md index 63a8c4ad55..e9aed6a44d 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,8 @@ Main (unreleased) ### Features +- Add `otelcol.receiver.syslog` component to receive otel logs in syslog format (@dehaansa) + - Add support for metrics in `otelcol.exporter.loadbalancing` (@madaraszg-tulip) - Add `add_cloudwatch_timestamp` to `prometheus.exporter.cloudwatch` metrics. (@captncraig) diff --git a/docs/sources/reference/compatibility/_index.md b/docs/sources/reference/compatibility/_index.md index da5471fbfb..aa9a3b043c 100644 --- a/docs/sources/reference/compatibility/_index.md +++ b/docs/sources/reference/compatibility/_index.md @@ -370,6 +370,7 @@ The following components, grouped by namespace, _consume_ OpenTelemetry `otelcol - [otelcol.receiver.otlp](../components/otelcol/otelcol.receiver.otlp) - [otelcol.receiver.prometheus](../components/otelcol/otelcol.receiver.prometheus) - [otelcol.receiver.solace](../components/otelcol/otelcol.receiver.solace) +- [otelcol.receiver.syslog](../components/otelcol/otelcol.receiver.syslog) - [otelcol.receiver.vcenter](../components/otelcol/otelcol.receiver.vcenter) - [otelcol.receiver.zipkin](../components/otelcol/otelcol.receiver.zipkin) {{< /collapse >}} diff --git a/docs/sources/reference/components/otelcol/otelcol.receiver.syslog.md b/docs/sources/reference/components/otelcol/otelcol.receiver.syslog.md new file mode 100644 index 0000000000..f803480fc8 --- /dev/null +++ b/docs/sources/reference/components/otelcol/otelcol.receiver.syslog.md @@ -0,0 +1,278 @@ +--- +canonical: https://grafana.com/docs/alloy/latest/reference/components/otelcol/otelcol.receiver.syslog/ +description: Learn about otelcol.receiver.syslog +title: otelcol.receiver.syslog +--- + +Public preview + +# otelcol.receiver.syslog + +{{< docs/shared lookup="stability/public_preview.md" source="alloy" version="" >}} + +`otelcol.receiver.syslog` accepts syslog messages over the network and forwards them as logs to other `otelcol.*` components. +It supports syslog protocols [RFC5424][] and [RFC3164][] and can receive data over `TCP` or `UDP`. + +{{< admonition type="note" >}} +`otelcol.receiver.syslog` is a wrapper over the upstream OpenTelemetry Collector `syslog` receiver. +Bug reports or feature requests will be redirected to the upstream repository, if necessary. +{{< /admonition >}} + +You can specify multiple `otelcol.receiver.syslog` components by giving them different labels. + +[RFC5424]: https://www.rfc-editor.org/rfc/rfc5424 +[RFC3164]: https://www.rfc-editor.org/rfc/rfc3164 + +## Usage + +```alloy +otelcol.receiver.syslog "LABEL" { + tcp { ... } + udp { ... } + + output { + logs = [...] + } +} +``` + +## Arguments + +The following arguments are supported: + +| Name | Type | Description | Default | Required | +|-----------------------------------|----------|--------------------------------------------------------------------|-----------|----------| +| `protocol` | `string` | The syslog protocol that the syslog server supports. | `rfc5424` | no | +| `location` | `string` | The geographic time zone to use when parsing an RFC3164 timestamp. | `UTC` | no | +| `enable_octet_counting` | `bool` | Whether to enable RFC6587 octet counting. | `false` | no | +| `max_octets` | `int` | The maximum octets for messages when octet counting is enabled. | `8192` | no | +| `allow_skip_pri_header` | `bool` | Allow parsing records without a priority header. | `false` | no | +| `non_transparent_framing_trailer` | `string` | The framing trailer when using RFC6587 Non-Transparent-Framing. | `nil` | no | + +The `protocol` argument specifies the syslog format supported by the receiver. +`protocol` must be one of `rfc5424` or `rfc3164` + +The `location` argument specifies a Time Zone identifier. The available locations depend on the local IANA Time Zone database. +Refer to the [list of tz database time zones][tz-wiki] in Wikipedia for a non-comprehensive list. + +The `non_transparent_framing_trailer` and `enable_octet_counting` arguments specify TCP syslog behavior as defined in [RFC6587]. +These arguments are mutually exclusive. +They can't be used with a UDP syslog listener configured. +If configured, the `non_transparent_framing_trailer` argument must be one of `LF`, `NUL`. + + +[RFC6587]: https://datatracker.ietf.org/doc/html/rfc6587 +[tz-wiki]: https://en.wikipedia.org/wiki/List_of_tz_database_time_zones + +## Blocks + +The following blocks are supported inside the definition of +`otelcol.receiver.syslog`: + + +| Hierarchy | Block | Description | Required | +|------------------|----------------------|-------------------------------------------------------------------------------------------------|----------| +| udp | [udp][] | Configures a UDP syslog server to receive syslog messages. | no* | +| udp > multiline | [multiline][] | Configures rules for multiline parsing of incoming messages. | no | +| udp > async | [async][] | Configures rules for asynchronous parsing of incoming messages. | no | +| tcp | [tcp][] | Configures a TCP syslog server to receive syslog messages. | no* | +| tcp > multiline | [multiline][] | Configures rules for multiline parsing of incoming messages | no | +| tcp > tls | [tls][] | Configures TLS for the TCP syslog server. | no | +| retry_on_failure | [retry_on_failure][] | Configures the retry behavior when the receiver encounters an error downstream in the pipeline. | no | +| debug_metrics | [debug_metrics][] | Configures the metrics that this component generates to monitor its state. | no | +| output | [output][] | Configures where to send received telemetry data. | yes | + +A syslog receiver must have either a `udp` or `tcp` block configured. + +The `>` symbol indicates deeper levels of nesting. For example, `tcp > tls` +refers to a `tls` block defined inside a `tcp` block. + +[tls]: #tls-block +[udp]: #udp-block +[tcp]: #tcp-block +[multiline]: #multiline-block +[async]: #async-block +[retry_on_failure]: #retry-on-failure-block +[debug_metrics]: #debug_metrics-block +[output]: #output-block + +### udp block + +The `udp` block configures a UDP syslog server. +The following arguments are supported: + +| Name | Type | Description | Default | Required | +|---------------------------------|----------|--------------------------------------------------------------------------------------------------------------|---------|----------| +| `listen_address` | `string` | The `` address to listen to for syslog messages. | | yes | +| `one_log_per_packet` | `bool` | Skip log tokenization, improving performance when messages always contain one log and multiline is not used. | `false` | no | +| `add_attributes` | `bool` | Add net.* attributes to log messages according to OpenTelemetry semantic conventions. | `false` | no | +| `encoding` | `string` | The encoding of the syslog messages. | `utf-8` | no | +| `preserve_leading_whitespaces` | `bool` | Preserves leading whitespace in messages when set to `true`. | `false` | no | +| `preserve_trailing_whitespaces` | `bool` | Preserves trailing whitespace in messages when set to `true`. | `false` | no | + +The `encoding` argument specifies the encoding of the incoming syslog messages. +`encoding` must be one of `utf-8`, `utf-16le`, `utf-16be`, `ascii`, `big5`, or `nop`. +Refer to the upstream receiver [documentation][encoding-documentation] for more details. + +### multiline block + +The `multiline` block configures logic for splitting incoming log entries. +The following arguments are supported: + +| Name | Type | Description | Default | Required | +|----------------------|----------|-----------------------------------------------------------------|---------|----------| +| `line_start_pattern` | `string` | A regular expression that matches the beginning of a log entry. | | no | +| `line_end_pattern` | `string` | A regular expression that matches the end of a log entry. | | no | +| `omit_pattern` | `bool` | Omit the start/end pattern from the split log entries. | `false` | no | + +A `multiline` block must contain either `line_start_pattern` or `line_end_pattern`. + +If a `multiline` block is not set, log entries will not be split. + +### async block + +The `async` block configures concurrent asynchronous readers for a UDP syslog server. +The following arguments are supported: + +| Name | Type | Description | Default | Required | +|--------------------|-------|----------------------------------------------------------------------------------|---------|----------| +| `readers` | `int` | The number of goroutines to concurrently read from the UDP syslog server. | `1` | no | +| `processors` | `int` | The number of goroutines to concurrently process logs before sending downstream. | `1` | no | +| `max_queue_length` | `int` | The maximum number of messages to wait for an available processor. | `100` | no | + +If `async` is not set, a single goroutine will read and process messages synchronously. + +### tcp block + +The `tcp` block configures a TCP syslog server. +The following arguments are supported: + +| Name | Type | Description | Default | Required | +|---------------------------------|----------|--------------------------------------------------------------------------------------------------------------|---------|----------| +| `listen_address` | `string` | The `` address to listen to for syslog messages. | | yes | +| `max_log_size` | `string` | The maximum size of a log entry to read before failing. | `1MiB` | no | +| `one_log_per_packet` | `bool` | Skip log tokenization, improving performance when messages always contain one log and multiline is not used. | `false` | no | +| `add_attributes` | `bool` | Add net.* attributes to log messages according to OpenTelemetry semantic conventions. | `false` | no | +| `encoding` | `string` | The encoding of the syslog messages. | `utf-8` | no | +| `preserve_leading_whitespaces` | `bool` | Preserves leading whitespace in messages when set to `true`. | `false` | no | +| `preserve_trailing_whitespaces` | `bool` | Preserves trailing whitespace in messages when set to `true`. | `false` | no | + +The `encoding` argument specifies the encoding of the incoming syslog messages. +`encoding` must be one of `utf-8`, `utf-16le`, `utf-16be`, `ascii`, `big5`, `nop`. +See the upstream receiver [documentation][encoding-documentation] for more details. + +The `max_log_size` argument has a minimum value of `64KiB` + +### tls block + +The `tls` block configures TLS settings used for a server. If the `tls` block +isn't provided, TLS won't be used for connections to the server. + +{{< docs/shared lookup="reference/components/otelcol-tls-server-block.md" source="alloy" version="" >}} + +### retry on failure block + +The `retry_on_failure` block configures the retry behavior when the receiver encounters an error downstream in the pipeline. +A backoff algorithm is used to delay the retry upon subsequent failures. +The following arguments are supported: + +| Name | Type | Description | Default | Required | +|--------------------|------------|-----------------------------------------------------------------------------------------------------------|--------------|----------| +| `enabled` | `bool` | If true, the receiver will pause reading a file and attempt to resend the current batch of logs on error. | `false` | no | +| `initial_interval` | `duration` | The time to wait after first failure to retry. | `1s` | no | +| `max_interval` | `duration` | The maximum time to wait after applying backoff logic. | `30s` | no | +| `max_elapsed_time` | `duration` | The maximum age of a message before the data is discarded. | `5m` | no | + +If `max_elapsed_time` is set to `0` data will never be discarded. + +### debug_metrics block + +{{< docs/shared lookup="reference/components/otelcol-debug-metrics-block.md" source="alloy" version="" >}} + +### output block + +{{< docs/shared lookup="reference/components/output-block.md" source="alloy" version="" >}} + +## Exported fields + +`otelcol.receiver.syslog` does not export any fields. + +## Component health + +`otelcol.receiver.syslog` is only reported as unhealthy if given an invalid +configuration. + +## Debug information + +`otelcol.receiver.syslog` does not expose any component-specific debug +information. + +## Debug metrics + +`otelcol.receiver.syslog` does not expose any component-specific debug metrics. + +## Example + +This example proxies syslog messages from the `otelcol.receiver.syslog` receiver to the +`otelcol.exporter.syslog` component, and then sends them on to a `loki.source.syslog` component +before being logged by a `loki.echo` component. This shows how the `otelcol` syslog components +can be used to proxy syslog messages before sending them to another destination. + +Using the `otelcol` syslog components in this way results in the messages being forwarded as sent, +attempting to use the `loki.source.syslog` component for a similar proxy use case requires +careful mapping of any structured data fields through the `otelcol.processor.transform` component. A +very simple example of that can be found in the [`otelcol.exporter.syslog`][exporter-examples] documentation. + +```alloy +otelcol.receiver.syslog "default" { + protocol = "rfc5424" + tcp { + listen_address = "localhost:1515" + } + output { + logs = [otelcol.exporter.syslog.default.input] + } +} + +otelcol.exporter.syslog "default" { + endpoint = "localhost" + network = "tcp" + port = 1514 + protocol = "rfc5424" + enable_octet_counting = false + tls { + insecure = true + } +} + +loki.source.syslog "default" { + listener { + address = "localhost:1514" + protocol = "tcp" + syslog_format = "rfc5424" + label_structured_data = true + use_rfc5424_message = true + } + forward_to = [loki.echo.default.receiver] +} + +loki.echo "default" {} +``` + +[exporter-examples]: ../otelcol.exporter.syslog/#use-the-otelcolprocessortransform-component-to-format-logs-from-lokisourcesyslog +[encoding-documentation]: https://github.com/open-telemetry/opentelemetry-collector-contrib/blob/main/receiver/syslogreceiver/README.md#supported-encodings + + +## Compatible components + +`otelcol.receiver.syslog` can accept arguments from the following components: + +- Components that export [OpenTelemetry `otelcol.Consumer`](../../../compatibility/#opentelemetry-otelcolconsumer-exporters) + + +{{< admonition type="note" >}} +Connecting some components may not be sensible or components may require further configuration to make the connection work correctly. +Refer to the linked documentation for more details. +{{< /admonition >}} + + diff --git a/go.mod b/go.mod index 4dbf6351b1..0a241163aa 100644 --- a/go.mod +++ b/go.mod @@ -125,6 +125,7 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/pkg/ottl v0.112.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatatest v0.112.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/pdatautil v0.112.0 + github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza v0.112.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/loki v0.112.0 github.com/open-telemetry/opentelemetry-collector-contrib/pkg/translator/prometheus v0.112.0 github.com/open-telemetry/opentelemetry-collector-contrib/processor/attributesprocessor v0.112.0 @@ -143,6 +144,7 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/receiver/kafkareceiver v0.112.0 github.com/open-telemetry/opentelemetry-collector-contrib/receiver/opencensusreceiver v0.112.0 github.com/open-telemetry/opentelemetry-collector-contrib/receiver/solacereceiver v0.112.0 + github.com/open-telemetry/opentelemetry-collector-contrib/receiver/syslogreceiver v0.112.0 github.com/open-telemetry/opentelemetry-collector-contrib/receiver/vcenterreceiver v0.112.0 github.com/open-telemetry/opentelemetry-collector-contrib/receiver/zipkinreceiver v0.112.0 github.com/ory/dockertest/v3 v3.8.1 @@ -845,6 +847,7 @@ require ( github.com/open-telemetry/opentelemetry-collector-contrib/pkg/kafka/topic v0.112.0 // indirect github.com/open-telemetry/opentelemetry-collector-contrib/receiver/influxdbreceiver v0.112.0 github.com/planetscale/vtprotobuf v0.6.1-0.20240319094008-0393e58bdf10 // indirect + github.com/valyala/fastjson v1.6.4 // indirect go.opentelemetry.io/collector/connector/connectorprofiles v0.112.0 // indirect go.opentelemetry.io/collector/consumer/consumererror v0.112.0 // indirect go.opentelemetry.io/collector/consumer/consumererror/consumererrorprofiles v0.112.0 // indirect diff --git a/go.sum b/go.sum index 6076b1a759..38763b2b12 100644 --- a/go.sum +++ b/go.sum @@ -2612,6 +2612,8 @@ github.com/open-telemetry/opentelemetry-collector-contrib/extension/oauth2client github.com/open-telemetry/opentelemetry-collector-contrib/extension/oauth2clientauthextension v0.112.0/go.mod h1:F7NU4rHqbTrkfOFH6ZtbSHoD+vLNgfu0MrqDw5n1I8Y= github.com/open-telemetry/opentelemetry-collector-contrib/extension/sigv4authextension v0.112.0 h1:XYTTlyT5xjZKcUaQT05ffltMahw2S2wU7qIWtjLp7XE= github.com/open-telemetry/opentelemetry-collector-contrib/extension/sigv4authextension v0.112.0/go.mod h1:wlnJiEwFYq3920DXOgDby5w6ctv57GJUTkvH4gHoBVA= +github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage v0.112.0 h1:OtZFEz8PEAcGJFGAI3m1bu8gC3rS8IbtnB5mO8B9AAU= +github.com/open-telemetry/opentelemetry-collector-contrib/extension/storage v0.112.0/go.mod h1:ykfaFUQlOIuWrWSwc4wtY0TDwWjjGHF/8jNm3lFH0cM= github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/ecsutil v0.112.0 h1:3zGjQ0pRszCibVGvjqTWDVEDT0D+d5pY8JRy8rV8X9k= github.com/open-telemetry/opentelemetry-collector-contrib/internal/aws/ecsutil v0.112.0/go.mod h1:0vf3+lFg/yOaXwQ17MAF2JmBkTGeq09qR+ftaJQqN08= github.com/open-telemetry/opentelemetry-collector-contrib/internal/common v0.112.0 h1:PVgAm7sIQUOS8TtX5ANV+hHn67vW6cW6uVy3qifccKc= @@ -2716,6 +2718,8 @@ github.com/open-telemetry/opentelemetry-collector-contrib/receiver/opencensusrec github.com/open-telemetry/opentelemetry-collector-contrib/receiver/opencensusreceiver v0.112.0/go.mod h1:q2lFBHfnG+ar2DJJlIU6RviOFXDeFur9vJ083NvOMQs= github.com/open-telemetry/opentelemetry-collector-contrib/receiver/solacereceiver v0.112.0 h1:cHk8vS/D1pjeZ0o4LJJAENP847HHWjTXFe4y1RJYlfo= github.com/open-telemetry/opentelemetry-collector-contrib/receiver/solacereceiver v0.112.0/go.mod h1:2CK7Hh6UGLnBSGW7Y0nopvEhoo25D6t/395jFEephEs= +github.com/open-telemetry/opentelemetry-collector-contrib/receiver/syslogreceiver v0.112.0 h1:GNWvYGjT08ByMbKuvY/uB57TQYrPJc/aF+nnpraELgU= +github.com/open-telemetry/opentelemetry-collector-contrib/receiver/syslogreceiver v0.112.0/go.mod h1:U0XNYcs+DJTwElKNKXADGBpQLIFrrEKAI78PzqOVl/E= github.com/open-telemetry/opentelemetry-collector-contrib/receiver/vcenterreceiver v0.112.0 h1:Vv1FDwd7pykzj8Wmuc7yj7bcN0qUv1mGBb/dcTMPfNE= github.com/open-telemetry/opentelemetry-collector-contrib/receiver/vcenterreceiver v0.112.0/go.mod h1:lklLK8ELD2Wk5z7ywjaf6XEbbViDtf7uK8jAExjRlls= github.com/open-telemetry/opentelemetry-collector-contrib/receiver/zipkinreceiver v0.112.0 h1:XhKHjEpQJQMaUuWVhWS1FEuaY4LJDwBgsGXE166j9SY= diff --git a/internal/component/all/all.go b/internal/component/all/all.go index 65d48a019e..d82838a2ab 100644 --- a/internal/component/all/all.go +++ b/internal/component/all/all.go @@ -96,8 +96,8 @@ import ( _ "github.com/grafana/alloy/internal/component/otelcol/processor/tail_sampling" // Import otelcol.processor.tail_sampling _ "github.com/grafana/alloy/internal/component/otelcol/processor/transform" // Import otelcol.processor.transform _ "github.com/grafana/alloy/internal/component/otelcol/receiver/datadog" // Import otelcol.receiver.datadog - _ "github.com/grafana/alloy/internal/component/otelcol/receiver/influxdb" // Import otelcol.receiver.influxdb _ "github.com/grafana/alloy/internal/component/otelcol/receiver/file_stats" // Import otelcol.receiver.file_stats + _ "github.com/grafana/alloy/internal/component/otelcol/receiver/influxdb" // Import otelcol.receiver.influxdb _ "github.com/grafana/alloy/internal/component/otelcol/receiver/jaeger" // Import otelcol.receiver.jaeger _ "github.com/grafana/alloy/internal/component/otelcol/receiver/kafka" // Import otelcol.receiver.kafka _ "github.com/grafana/alloy/internal/component/otelcol/receiver/loki" // Import otelcol.receiver.loki @@ -105,6 +105,7 @@ import ( _ "github.com/grafana/alloy/internal/component/otelcol/receiver/otlp" // Import otelcol.receiver.otlp _ "github.com/grafana/alloy/internal/component/otelcol/receiver/prometheus" // Import otelcol.receiver.prometheus _ "github.com/grafana/alloy/internal/component/otelcol/receiver/solace" // Import otelcol.receiver.solace + _ "github.com/grafana/alloy/internal/component/otelcol/receiver/syslog" // Import otelcol.receiver.syslog _ "github.com/grafana/alloy/internal/component/otelcol/receiver/vcenter" // Import otelcol.receiver.vcenter _ "github.com/grafana/alloy/internal/component/otelcol/receiver/zipkin" // Import otelcol.receiver.zipkin _ "github.com/grafana/alloy/internal/component/prometheus/exporter/apache" // Import prometheus.exporter.apache diff --git a/internal/component/otelcol/config_consumer_retry.go b/internal/component/otelcol/config_consumer_retry.go new file mode 100644 index 0000000000..d687788606 --- /dev/null +++ b/internal/component/otelcol/config_consumer_retry.go @@ -0,0 +1,30 @@ +package otelcol + +import ( + "time" + + "github.com/grafana/alloy/syntax" +) + +// ConsumerRetryArguments holds shared settings for stanza receivers which can retry +// requests. There is no Convert functionality as the consumerretry package is stanza internal +type ConsumerRetryArguments struct { + Enabled bool `alloy:"enabled,attr,optional"` + InitialInterval time.Duration `alloy:"initial_interval,attr,optional"` + MaxInterval time.Duration `alloy:"max_interval,attr,optional"` + MaxElapsedTime time.Duration `alloy:"max_elapsed_time,attr,optional"` +} + +var ( + _ syntax.Defaulter = (*ConsumerRetryArguments)(nil) +) + +// SetToDefault implements syntax.Defaulter. +func (args *ConsumerRetryArguments) SetToDefault() { + *args = ConsumerRetryArguments{ + Enabled: false, + InitialInterval: 1 * time.Second, + MaxInterval: 30 * time.Second, + MaxElapsedTime: 5 * time.Minute, + } +} diff --git a/internal/component/otelcol/receiver/syslog/syslog.go b/internal/component/otelcol/receiver/syslog/syslog.go new file mode 100644 index 0000000000..842228be86 --- /dev/null +++ b/internal/component/otelcol/receiver/syslog/syslog.go @@ -0,0 +1,325 @@ +// Package syslog provides an otelcol.receiver.syslog component. +package syslog + +import ( + "fmt" + "net" + + "github.com/alecthomas/units" + "github.com/grafana/alloy/internal/component" + "github.com/grafana/alloy/internal/component/common/config" + "github.com/grafana/alloy/internal/component/otelcol" + otelcolCfg "github.com/grafana/alloy/internal/component/otelcol/config" + "github.com/grafana/alloy/internal/component/otelcol/receiver" + "github.com/grafana/alloy/internal/featuregate" + "github.com/hashicorp/go-multierror" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/decode" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/helper" + stanzainputsyslog "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/input/syslog" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/input/tcp" + stanzainputtcp "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/input/tcp" + stanzainputudp "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/input/udp" + stanzaparsersyslog "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/parser/syslog" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/split" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/syslogreceiver" + otelcomponent "go.opentelemetry.io/collector/component" + otelextension "go.opentelemetry.io/collector/extension" + "go.opentelemetry.io/collector/pipeline" +) + +func init() { + component.Register(component.Registration{ + Name: "otelcol.receiver.syslog", + Stability: featuregate.StabilityPublicPreview, + Args: Arguments{}, + + Build: func(opts component.Options, args component.Arguments) (component.Component, error) { + fact := syslogreceiver.NewFactory() + return receiver.New(opts, fact, args.(Arguments)) + }, + }) +} + +// Arguments configures the otelcol.receiver.syslog component. +type Arguments struct { + Protocol config.SysLogFormat `alloy:"protocol,attr,optional"` + Location string `alloy:"location,attr,optional"` + EnableOctetCounting bool `alloy:"enable_octet_counting,attr,optional"` + MaxOctets int `alloy:"max_octets,attr,optional"` + AllowSkipPriHeader bool `alloy:"allow_skip_pri_header,attr,optional"` + NonTransparentFramingTrailer *FramingTrailer `alloy:"non_transparent_framing_trailer,attr,optional"` + + ConsumerRetry otelcol.ConsumerRetryArguments `alloy:"retry_on_failure,block,optional"` + TCP *TCP `alloy:"tcp,block,optional"` + UDP *UDP `alloy:"udp,block,optional"` + + // DebugMetrics configures component internal metrics. Optional. + DebugMetrics otelcolCfg.DebugMetricsArguments `alloy:"debug_metrics,block,optional"` + + // Output configures where to send received data. Required. + Output *otelcol.ConsumerArguments `alloy:"output,block"` +} + +type FramingTrailer string + +var NULTrailer FramingTrailer = "NUL" +var LFTrailer FramingTrailer = "LF" + +// MarshalText implements encoding.TextMarshaler +func (s FramingTrailer) MarshalText() (text []byte, err error) { + return []byte(s), nil +} + +// UnmarshalText implements encoding.TextUnmarshaler +func (s *FramingTrailer) UnmarshalText(text []byte) error { + str := string(text) + switch str { + case "NUL": + *s = NULTrailer + case "LF": + *s = LFTrailer + default: + return fmt.Errorf("unknown syslog format: %s", str) + } + + return nil +} + +// Values taken from tcp input Build function +const tcpDefaultMaxLogSize = helper.ByteSize(tcp.DefaultMaxLogSize) +const minMaxLogSize = helper.ByteSize(64 * 1024) + +type TCP struct { + MaxLogSize units.Base2Bytes `alloy:"max_log_size,attr,optional"` + ListenAddress string `alloy:"listen_address,attr,optional"` + TLS *otelcol.TLSServerArguments `alloy:"tls,block,optional"` + AddAttributes bool `alloy:"add_attributes,attr,optional"` + OneLogPerPacket bool `alloy:"one_log_per_packet,attr,optional"` + Encoding string `alloy:"encoding,attr,optional"` + MultilineConfig *MultilineConfig `alloy:"multiline,block,optional"` + TrimConfig *TrimConfig `alloy:",squash"` +} + +type UDP struct { + ListenAddress string `alloy:"listen_address,attr,optional"` + OneLogPerPacket bool `alloy:"one_log_per_packet,attr,optional"` + AddAttributes bool `alloy:"add_attributes,attr,optional"` + Encoding string `alloy:"encoding,attr,optional"` + MultilineConfig *MultilineConfig `alloy:"multiline,block,optional"` + TrimConfig *TrimConfig `alloy:",squash"` + Async *AsyncConfig `alloy:"async,block,optional"` +} + +type TrimConfig struct { + PreserveLeadingWhitespace bool `alloy:"preserve_leading_whitespaces,attr,optional"` + PreserveTrailingWhitespace bool `alloy:"preserve_trailing_whitespaces,attr,optional"` +} + +func (c *TrimConfig) Convert() *trim.Config { + if c == nil { + return nil + } + + return &trim.Config{ + PreserveLeading: c.PreserveLeadingWhitespace, + PreserveTrailing: c.PreserveTrailingWhitespace, + } +} + +type MultilineConfig struct { + LineStartPattern string `alloy:"line_start_pattern,attr,optional"` + LineEndPattern string `alloy:"line_end_pattern,attr,optional"` + OmitPattern bool `alloy:"omit_pattern,attr,optional"` +} + +func (c *MultilineConfig) Convert() *split.Config { + if c == nil { + return nil + } + + return &split.Config{ + LineStartPattern: c.LineStartPattern, + LineEndPattern: c.LineEndPattern, + OmitPattern: c.OmitPattern, + } +} + +type AsyncConfig struct { + Readers int `alloy:"readers,attr,optional"` + Processors int `alloy:"processors,attr,optional"` + MaxQueueLength int `alloy:"max_queue_length,attr,optional"` +} + +func (c *AsyncConfig) Convert() *stanzainputudp.AsyncConfig { + if c == nil { + return nil + } + + return &stanzainputudp.AsyncConfig{ + Readers: c.Readers, + Processors: c.Processors, + MaxQueueLength: c.MaxQueueLength, + } +} + +var _ receiver.Arguments = Arguments{} + +// SetToDefault implements syntax.Defaulter. +func (args *Arguments) SetToDefault() { + *args = Arguments{ + Location: "UTC", + Protocol: config.SyslogFormatRFC5424, + Output: &otelcol.ConsumerArguments{}, + } + args.DebugMetrics.SetToDefault() + args.ConsumerRetry.SetToDefault() +} + +// Convert implements receiver.Arguments. +func (args Arguments) Convert() (otelcomponent.Config, error) { + + c := stanzainputsyslog.NewConfig() + c.BaseConfig = stanzaparsersyslog.BaseConfig{ + Protocol: string(args.Protocol), + Location: args.Location, + EnableOctetCounting: args.EnableOctetCounting, + MaxOctets: args.MaxOctets, + AllowSkipPriHeader: args.AllowSkipPriHeader, + } + + if args.NonTransparentFramingTrailer != nil { + s := string(*args.NonTransparentFramingTrailer) + c.BaseConfig.NonTransparentFramingTrailer = &s + } + + if args.TCP != nil { + c.TCP = &stanzainputtcp.BaseConfig{ + MaxLogSize: helper.ByteSize(args.TCP.MaxLogSize), + ListenAddress: args.TCP.ListenAddress, + TLS: args.TCP.TLS.Convert(), + AddAttributes: args.TCP.AddAttributes, + OneLogPerPacket: args.TCP.OneLogPerPacket, + Encoding: args.TCP.Encoding, + } + if c.TCP.MaxLogSize == 0 { + c.TCP.MaxLogSize = tcpDefaultMaxLogSize + } + split := args.TCP.MultilineConfig.Convert() + if split != nil { + c.TCP.SplitConfig = *split + } + trim := args.TCP.TrimConfig.Convert() + if trim != nil { + c.TCP.TrimConfig = *trim + } + } + + if args.UDP != nil { + c.UDP = &stanzainputudp.BaseConfig{ + ListenAddress: args.UDP.ListenAddress, + OneLogPerPacket: args.UDP.OneLogPerPacket, + AddAttributes: args.UDP.AddAttributes, + Encoding: args.UDP.Encoding, + } + split := args.UDP.MultilineConfig.Convert() + if split != nil { + c.UDP.SplitConfig = *split + } + trim := args.UDP.TrimConfig.Convert() + if trim != nil { + c.UDP.TrimConfig = *trim + } + async := args.UDP.Async.Convert() + if async != nil { + c.UDP.AsyncConfig = async + } + } + + def := syslogreceiver.ReceiverType{}.CreateDefaultConfig() + cfg := def.(*syslogreceiver.SysLogConfig) + cfg.InputConfig = *c + + // consumerretry package is stanza internal so we can't just Convert + cfg.RetryOnFailure.Enabled = args.ConsumerRetry.Enabled + cfg.RetryOnFailure.InitialInterval = args.ConsumerRetry.InitialInterval + cfg.RetryOnFailure.MaxInterval = args.ConsumerRetry.MaxInterval + cfg.RetryOnFailure.MaxElapsedTime = args.ConsumerRetry.MaxElapsedTime + + return cfg, nil +} + +// Extensions implements receiver.Arguments. +func (args Arguments) Extensions() map[otelcomponent.ID]otelextension.Extension { + return nil +} + +// Exporters implements receiver.Arguments. +func (args Arguments) Exporters() map[pipeline.Signal]map[otelcomponent.ID]otelcomponent.Component { + return nil +} + +// NextConsumers implements receiver.Arguments. +func (args Arguments) NextConsumers() *otelcol.ConsumerArguments { + return args.Output +} + +// Validate implements syntax.Validator. +func (args *Arguments) Validate() error { + var errs error + if args.TCP == nil && args.UDP == nil { + errs = multierror.Append(errs, fmt.Errorf("at least one of 'tcp' or 'udp' must be configured")) + } + + if args.Protocol != config.SyslogFormatRFC3164 && args.Protocol != config.SyslogFormatRFC5424 { + errs = multierror.Append(errs, fmt.Errorf("invalid protocol, must be one of 'rfc3164', 'rfc5424': %s", args.Protocol)) + } + + if args.TCP != nil { + if err := validateListenAddress(args.TCP.ListenAddress, "tcp.listen_address"); err != nil { + errs = multierror.Append(errs, err) + } + + if args.NonTransparentFramingTrailer != nil && *args.NonTransparentFramingTrailer != LFTrailer && *args.NonTransparentFramingTrailer != NULTrailer { + errs = multierror.Append(errs, fmt.Errorf("invalid non_transparent_framing_trailer, must be one of 'LF', 'NUL': %s", *args.NonTransparentFramingTrailer)) + } + + _, err := decode.LookupEncoding(args.TCP.Encoding) + if err != nil { + errs = multierror.Append(errs, fmt.Errorf("invalid tcp.encoding: %w", err)) + } + + if args.TCP.MaxLogSize != 0 && (int64(args.TCP.MaxLogSize) < int64(minMaxLogSize)) { + errs = multierror.Append(errs, fmt.Errorf("invalid value %d for parameter 'tcp.max_log_size', must be equal to or greater than %d bytes", args.TCP.MaxLogSize, minMaxLogSize)) + } + } + + if args.UDP != nil { + if err := validateListenAddress(args.UDP.ListenAddress, "udp.listen_address"); err != nil { + errs = multierror.Append(errs, err) + } + + _, err := decode.LookupEncoding(args.UDP.Encoding) + if err != nil { + errs = multierror.Append(errs, fmt.Errorf("invalid udp.encoding: %w", err)) + } + } + + return errs +} + +func validateListenAddress(url string, urlName string) error { + if url == "" { + return fmt.Errorf("%s cannot be empty", urlName) + } + + if _, _, err := net.SplitHostPort(url); err != nil { + return fmt.Errorf("invalid %s: %w", urlName, err) + } + return nil +} + +// DebugMetricsConfig implements receiver.Arguments. +func (args Arguments) DebugMetricsConfig() otelcolCfg.DebugMetricsArguments { + return args.DebugMetrics +} diff --git a/internal/component/otelcol/receiver/syslog/syslog_test.go b/internal/component/otelcol/receiver/syslog/syslog_test.go new file mode 100644 index 0000000000..ad17407043 --- /dev/null +++ b/internal/component/otelcol/receiver/syslog/syslog_test.go @@ -0,0 +1,179 @@ +package syslog_test + +import ( + "context" + "fmt" + "net" + "testing" + "time" + + "github.com/grafana/alloy/internal/component/otelcol" + "github.com/grafana/alloy/internal/component/otelcol/internal/fakeconsumer" + "github.com/grafana/alloy/internal/component/otelcol/receiver/syslog" + "github.com/grafana/alloy/internal/runtime/componenttest" + "github.com/grafana/alloy/internal/runtime/logging/level" + "github.com/grafana/alloy/internal/util" + "github.com/grafana/alloy/syntax" + "github.com/grafana/dskit/backoff" + "github.com/phayes/freeport" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/plog" +) + +// Test performs a basic integration test which runs the otelcol.receiver.syslog +// component and ensures that it can receive and forward data. +func Test(t *testing.T) { + tcp := getFreeAddr(t) + + ctx := componenttest.TestContext(t) + l := util.TestLogger(t) + + ctrl, err := componenttest.NewControllerFromID(l, "otelcol.receiver.syslog") + require.NoError(t, err) + + cfg := fmt.Sprintf(` + protocol = "rfc5424" + tcp { + listen_address = "%s" + } + + output { + // no-op: will be overridden by test code. + } + `, tcp) + + require.NoError(t, err) + + var args syslog.Arguments + require.NoError(t, syntax.Unmarshal([]byte(cfg), &args)) + + // Override our settings so logs get forwarded to logsCh. + logCh := make(chan plog.Logs) + args.Output = makeLogsOutput(logCh) + + go func() { + err := ctrl.Run(ctx, args) + require.NoError(t, err) + }() + + require.NoError(t, ctrl.WaitRunning(3*time.Second)) + + // Send traces in the background to our receiver. + go func() { + request := func() error { + conn, err := net.Dial("tcp", tcp) + require.NoError(t, err) + defer conn.Close() + + _, err = fmt.Fprint(conn, "<165>1 2018-10-11T22:14:15.003Z host5 e - id1 [custom@32473 exkey=\"1\"] An application event log entry...\n") + return err + } + + bo := backoff.New(ctx, backoff.Config{ + MinBackoff: 10 * time.Millisecond, + MaxBackoff: 100 * time.Millisecond, + }) + for bo.Ongoing() { + if err := request(); err != nil { + level.Error(l).Log("msg", "failed to send logs", "err", err) + bo.Wait() + continue + } + + return + } + }() + + // Wait for our client to get a span. + select { + case <-time.After(time.Second): + require.FailNow(t, "failed waiting for logs") + case log := <-logCh: + require.Equal(t, 1, log.LogRecordCount()) + } +} + +// makeLogsOutput returns ConsumerArguments which will forward logs to the +// provided channel. +func makeLogsOutput(ch chan plog.Logs) *otelcol.ConsumerArguments { + logsConsumer := fakeconsumer.Consumer{ + ConsumeLogsFunc: func(ctx context.Context, l plog.Logs) error { + select { + case <-ctx.Done(): + return ctx.Err() + case ch <- l: + return nil + } + }, + } + + return &otelcol.ConsumerArguments{ + Logs: []otelcol.Consumer{&logsConsumer}, + } +} + +func getFreeAddr(t *testing.T) string { + t.Helper() + + portNumber, err := freeport.GetFreePort() + require.NoError(t, err) + + return fmt.Sprintf("127.0.0.1:%d", portNumber) +} + +func TestUnmarshal(t *testing.T) { + alloyCfg := ` + protocol = "rfc5424" + location = "UTC" + enable_octet_counting = true + max_octets = 16000 + allow_skip_pri_header = true + non_transparent_framing_trailer = "NUL" + + tcp { + listen_address = "localhost:1514" + max_log_size = "2MiB" + one_log_per_packet = true + add_attributes = true + encoding = "utf-16be" + preserve_leading_whitespaces = true + preserve_trailing_whitespaces = true + tls { + include_system_ca_certs_pool = true + reload_interval = "1m" + } + } + + udp { + listen_address = "localhost:1515" + one_log_per_packet = false + add_attributes = false + encoding = "utf-16le" + preserve_leading_whitespaces = false + preserve_trailing_whitespaces = false + async { + readers = 2 + processors = 4 + max_queue_length = 1000 + } + multiline { + line_end_pattern = "logend" + omit_pattern = true + } + + } + + retry_on_failure { + enabled = true + initial_interval = "10s" + max_interval = "1m" + max_elapsed_time = "10m" + } + + output { + } + ` + var args syslog.Arguments + err := syntax.Unmarshal([]byte(alloyCfg), &args) + require.NoError(t, err) +} diff --git a/internal/converter/internal/otelcolconvert/converter_syslogreceiver.go b/internal/converter/internal/otelcolconvert/converter_syslogreceiver.go new file mode 100644 index 0000000000..014abc0395 --- /dev/null +++ b/internal/converter/internal/otelcolconvert/converter_syslogreceiver.go @@ -0,0 +1,128 @@ +package otelcolconvert + +import ( + "fmt" + + "github.com/alecthomas/units" + "github.com/grafana/alloy/internal/component/common/config" + "github.com/grafana/alloy/internal/component/otelcol" + "github.com/grafana/alloy/internal/component/otelcol/receiver/syslog" + "github.com/grafana/alloy/internal/converter/diag" + "github.com/grafana/alloy/internal/converter/internal/common" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/operator/input/udp" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/split" + "github.com/open-telemetry/opentelemetry-collector-contrib/pkg/stanza/trim" + "github.com/open-telemetry/opentelemetry-collector-contrib/receiver/syslogreceiver" + "go.opentelemetry.io/collector/component" + "go.opentelemetry.io/collector/component/componentstatus" +) + +func init() { + converters = append(converters, syslogReceiverConverter{}) +} + +type syslogReceiverConverter struct{} + +func (syslogReceiverConverter) Factory() component.Factory { + return syslogreceiver.NewFactory() +} + +func (syslogReceiverConverter) InputComponentName() string { + return "otelcol.receiver.syslog" +} + +func (syslogReceiverConverter) ConvertAndAppend(state *State, id componentstatus.InstanceID, cfg component.Config) diag.Diagnostics { + var diags diag.Diagnostics + + label := state.AlloyComponentLabel() + + args := toOtelcolReceiversyslog(cfg.(*syslogreceiver.SysLogConfig)) + block := common.NewBlockWithOverride([]string{"otelcol", "receiver", "syslog"}, label, args) + + diags.Add( + diag.SeverityLevelInfo, + fmt.Sprintf("Converted %s into %s", StringifyInstanceID(id), StringifyBlock(block)), + ) + + state.Body().AppendBlock(block) + return diags +} + +func toOtelcolReceiversyslog(cfg *syslogreceiver.SysLogConfig) *syslog.Arguments { + args := &syslog.Arguments{ + Protocol: config.SysLogFormat(cfg.InputConfig.Protocol), + Location: cfg.InputConfig.Location, + EnableOctetCounting: cfg.InputConfig.EnableOctetCounting, + AllowSkipPriHeader: cfg.InputConfig.AllowSkipPriHeader, + MaxOctets: cfg.InputConfig.MaxOctets, + DebugMetrics: common.DefaultValue[syslog.Arguments]().DebugMetrics, + } + + if cfg.InputConfig.NonTransparentFramingTrailer != nil { + trailer := syslog.FramingTrailer(*cfg.InputConfig.NonTransparentFramingTrailer) + args.NonTransparentFramingTrailer = &trailer + } + + if cfg.InputConfig.TCP != nil { + args.TCP = &syslog.TCP{ + MaxLogSize: units.Base2Bytes(cfg.InputConfig.TCP.MaxLogSize), + ListenAddress: cfg.InputConfig.TCP.ListenAddress, + TLS: toTLSServerArguments(cfg.InputConfig.TCP.TLS), + AddAttributes: cfg.InputConfig.TCP.AddAttributes, + OneLogPerPacket: cfg.InputConfig.TCP.OneLogPerPacket, + Encoding: cfg.InputConfig.TCP.Encoding, + MultilineConfig: toSyslogMultilineConfig(cfg.InputConfig.TCP.SplitConfig), + TrimConfig: toSyslogTrimConfig(cfg.InputConfig.TCP.TrimConfig), + } + } + + if cfg.InputConfig.UDP != nil { + args.UDP = &syslog.UDP{ + ListenAddress: cfg.InputConfig.UDP.ListenAddress, + OneLogPerPacket: cfg.InputConfig.UDP.OneLogPerPacket, + AddAttributes: cfg.InputConfig.UDP.AddAttributes, + Encoding: cfg.InputConfig.UDP.Encoding, + MultilineConfig: toSyslogMultilineConfig(cfg.InputConfig.UDP.SplitConfig), + TrimConfig: toSyslogTrimConfig(cfg.InputConfig.UDP.TrimConfig), + Async: toSyslogAsyncConfig(cfg.InputConfig.UDP.AsyncConfig), + } + } + + // This isn't done in a function because the type is not exported + args.ConsumerRetry = otelcol.ConsumerRetryArguments{ + Enabled: cfg.RetryOnFailure.Enabled, + InitialInterval: cfg.RetryOnFailure.InitialInterval, + MaxInterval: cfg.RetryOnFailure.MaxInterval, + MaxElapsedTime: cfg.RetryOnFailure.MaxElapsedTime, + } + + return args + +} + +func toSyslogMultilineConfig(cfg split.Config) *syslog.MultilineConfig { + return &syslog.MultilineConfig{ + LineStartPattern: cfg.LineStartPattern, + LineEndPattern: cfg.LineEndPattern, + OmitPattern: cfg.OmitPattern, + } +} + +func toSyslogTrimConfig(cfg trim.Config) *syslog.TrimConfig { + return &syslog.TrimConfig{ + PreserveLeadingWhitespace: cfg.PreserveLeading, + PreserveTrailingWhitespace: cfg.PreserveTrailing, + } +} + +func toSyslogAsyncConfig(cfg *udp.AsyncConfig) *syslog.AsyncConfig { + if cfg == nil { + return nil + } + + return &syslog.AsyncConfig{ + Readers: cfg.Readers, + Processors: cfg.Processors, + MaxQueueLength: cfg.MaxQueueLength, + } +} diff --git a/internal/converter/internal/otelcolconvert/testdata/syslog.alloy b/internal/converter/internal/otelcolconvert/testdata/syslog.alloy new file mode 100644 index 0000000000..ff0f9af32b --- /dev/null +++ b/internal/converter/internal/otelcolconvert/testdata/syslog.alloy @@ -0,0 +1,55 @@ +otelcol.receiver.syslog "default" { + enable_octet_counting = true + max_octets = 16000 + allow_skip_pri_header = true + non_transparent_framing_trailer = "NUL" + + retry_on_failure { + enabled = true + initial_interval = "10s" + max_interval = "1m0s" + max_elapsed_time = "10m0s" + } + + tcp { + max_log_size = "2MiB" + listen_address = "localhost:1514" + + tls { + reload_interval = "1m0s" + include_system_ca_certs_pool = true + } + add_attributes = true + one_log_per_packet = true + encoding = "utf-16be" + + multiline { } + preserve_leading_whitespaces = true + preserve_trailing_whitespaces = true + } + + udp { + listen_address = "localhost:1515" + encoding = "utf-16le" + + multiline { + line_end_pattern = "logend" + omit_pattern = true + } + + async { + readers = 2 + processors = 4 + max_queue_length = 1000 + } + } +} + +otelcol.exporter.syslog "default" { + tls { + insecure_skip_verify = true + } + endpoint = "localhost" + port = 1514 + enable_octet_counting = true +} diff --git a/internal/converter/internal/otelcolconvert/testdata/syslog.yaml b/internal/converter/internal/otelcolconvert/testdata/syslog.yaml new file mode 100644 index 0000000000..f6a44160a6 --- /dev/null +++ b/internal/converter/internal/otelcolconvert/testdata/syslog.yaml @@ -0,0 +1,57 @@ +receivers: + syslog: + location: "UTC" + protocol: "rfc5424" + enable_octet_counting: true + max_octets: 16000 + allow_skip_pri_header: true + non_transparent_framing_trailer: "NUL" + tcp: + listen_address: "localhost:1514" + max_log_size: "2MiB" + one_log_per_packet: true + add_attributes: true + encoding: "utf-16be" + preserve_leading_whitespaces: true + preserve_trailing_whitespaces: true + tls: + include_system_ca_certs_pool: true + reload_interval: "1m" + udp: + listen_address: "localhost:1515" + one_log_per_packet: false + add_attributes: false + encoding: "utf-16le" + preserve_leading_whitespaces: false + preserve_trailing_whitespaces: false + async: + readers: 2 + processors: 4 + max_queue_length: 1000 + multiline: + line_end_pattern: "logend" + omit_pattern: true + retry_on_failure: + enabled: true + initial_interval: "10s" + max_interval: "1m" + max_elapsed_time: "10m" + + +exporters: + syslog: + endpoint: localhost + port: 1514 + protocol: "rfc5424" + network: "tcp" + enable_octet_counting: true + tls: + insecure: false + insecure_skip_verify: true + +service: + pipelines: + logs: + receivers: [syslog] + processors: [] + exporters: [syslog] From 81da528dee530df29829b00f3b2f13f547894638 Mon Sep 17 00:00:00 2001 From: Piotr <17101802+thampiotr@users.noreply.github.com> Date: Mon, 6 Jan 2025 17:03:45 +0000 Subject: [PATCH 10/10] Update publish pipeline config path (#2340) --- .drone/drone.yml | 4 ++-- .drone/pipelines/publish.jsonnet | 2 +- internal/component/otelcol/receiver/syslog/syslog_test.go | 3 +++ 3 files changed, 6 insertions(+), 3 deletions(-) diff --git a/.drone/drone.yml b/.drone/drone.yml index 59ebd247a4..52a18edeb0 100644 --- a/.drone/drone.yml +++ b/.drone/drone.yml @@ -658,7 +658,7 @@ steps: "repo_name": "deployment_tools", "update_jsonnet_attribute_configs": [ { - "file_path": "ksonnet/environments/grafana-agent/waves/alloy.libsonnet", + "file_path": "ksonnet/lib/alloy/waves/alloy.libsonnet", "jsonnet_key": "dev_canary", "jsonnet_value_file": ".image-tag" } @@ -836,6 +836,6 @@ kind: secret name: updater_private_key --- kind: signature -hmac: 10e6be95fbe639c7701a96c7b0523e849b28389d09eda8b122de3c981a93c4c0 +hmac: 760087c49893d6970a81c973518d26b582c3d4673d3c28b90553361de8b6e03a ... diff --git a/.drone/pipelines/publish.jsonnet b/.drone/pipelines/publish.jsonnet index 5662826647..0217331bb3 100644 --- a/.drone/pipelines/publish.jsonnet +++ b/.drone/pipelines/publish.jsonnet @@ -208,7 +208,7 @@ linux_containers_jobs + windows_containers_jobs + [ "repo_name": "deployment_tools", "update_jsonnet_attribute_configs": [ { - "file_path": "ksonnet/environments/grafana-agent/waves/alloy.libsonnet", + "file_path": "ksonnet/lib/alloy/waves/alloy.libsonnet", "jsonnet_key": "dev_canary", "jsonnet_value_file": ".image-tag" } diff --git a/internal/component/otelcol/receiver/syslog/syslog_test.go b/internal/component/otelcol/receiver/syslog/syslog_test.go index ad17407043..d43a54e976 100644 --- a/internal/component/otelcol/receiver/syslog/syslog_test.go +++ b/internal/component/otelcol/receiver/syslog/syslog_test.go @@ -58,6 +58,9 @@ func Test(t *testing.T) { require.NoError(t, ctrl.WaitRunning(3*time.Second)) + // TODO(@dehaansa) - test if this is removeable after https://github.com/grafana/alloy/pull/2262 + time.Sleep(1 * time.Second) + // Send traces in the background to our receiver. go func() { request := func() error {