From 9c5243be9b95532f6c953c7a658eb96bfaa31067 Mon Sep 17 00:00:00 2001
From: Jakub Surdej
Date: Tue, 16 May 2023 15:46:18 +0200
Subject: [PATCH] Add support for pipeline flow metrics

---
 README.md                                     | 14 ++++++
 collectors/nodestats/pipeline_subcollector.go | 50 ++++++++++++++++---
 fetcher/responses/nodestats_response.go       | 35 +++-------
 fixtures/node_stats.json                      |  2 +-
 scripts/snapshots/metric_names.txt            | 10 ++++
 5 files changed, 74 insertions(+), 37 deletions(-)

diff --git a/README.md b/README.md
index f8fb8004..691e9a4c 100644
--- a/README.md
+++ b/README.md
@@ -300,6 +300,20 @@ Table of exported metrics:
 | logstash_stats_pipeline_events_in | counter | Number of events that have been inputted into this pipeline. |
 | logstash_stats_pipeline_events_out | counter | Number of events that have been processed by this pipeline. |
 | logstash_stats_pipeline_events_queue_push_duration | counter | Time needed to push event to queue. |
+| logstash_stats_pipeline_flow_filter_current | gauge | Current rate of events flowing through the filter stage (events per second). |
+| logstash_stats_pipeline_flow_filter_lifetime | counter | Lifetime rate of events flowing through the filter stage (events per second). |
+| logstash_stats_pipeline_flow_input_current | gauge | Current rate of events flowing into the pipeline queue (events per second). |
+| logstash_stats_pipeline_flow_input_lifetime | counter | Lifetime rate of events flowing into the pipeline queue (events per second). |
+| logstash_stats_pipeline_flow_output_current | gauge | Current rate of events flowing out of the pipeline (events per second). |
+| logstash_stats_pipeline_flow_output_lifetime | counter | Lifetime rate of events flowing out of the pipeline (events per second). |
+| logstash_stats_pipeline_flow_queue_backpressure_current | gauge | Current fraction of time that inputs are blocked pushing events into the queue. |
+| logstash_stats_pipeline_flow_queue_backpressure_lifetime | counter | Lifetime fraction of time that inputs are blocked pushing events into the queue. |
+| logstash_stats_pipeline_flow_worker_concurrency_current | gauge | Current number of pipeline workers concurrently processing events. |
+| logstash_stats_pipeline_flow_worker_concurrency_lifetime | counter | Lifetime average number of pipeline workers concurrently processing events. |
+| logstash_stats_pipeline_plugin_bulk_requests_errors | counter | Number of bulk request errors. |
+| logstash_stats_pipeline_plugin_bulk_requests_responses | counter | Bulk request HTTP response counts by code. |
+| logstash_stats_pipeline_plugin_documents_non_retryable_failures | counter | Number of output events with non-retryable failures. |
+| logstash_stats_pipeline_plugin_documents_successes | counter | Number of successful bulk requests. |
 | logstash_stats_pipeline_plugin_events_duration | counter | Time spent processing events in this plugin. |
 | logstash_stats_pipeline_plugin_events_in | counter | Number of events received this pipeline. |
 | logstash_stats_pipeline_plugin_events_out | counter | Number of events output by this pipeline. |
diff --git a/collectors/nodestats/pipeline_subcollector.go b/collectors/nodestats/pipeline_subcollector.go
index 764accfd..8fe780f2 100644
--- a/collectors/nodestats/pipeline_subcollector.go
+++ b/collectors/nodestats/pipeline_subcollector.go
@@ -42,10 +42,21 @@ type PipelineSubcollector struct {
 	PipelinePluginEventsDuration          *prometheus.Desc
 	PipelinePluginEventsQueuePushDuration *prometheus.Desc
 
-	PipelinePluginDocumentsSuccesses *prometheus.Desc
+	PipelinePluginDocumentsSuccesses            *prometheus.Desc
 	PipelinePluginDocumentsNonRetryableFailures *prometheus.Desc
-	PipelinePluginBulkRequestErrors  *prometheus.Desc
-	PipelinePluginBulkRequestResponses *prometheus.Desc
+	PipelinePluginBulkRequestErrors             *prometheus.Desc
+	PipelinePluginBulkRequestResponses          *prometheus.Desc
+
+	FlowInputCurrent              *prometheus.Desc
+	FlowInputLifetime             *prometheus.Desc
+	FlowFilterCurrent             *prometheus.Desc
+	FlowFilterLifetime            *prometheus.Desc
+	FlowOutputCurrent             *prometheus.Desc
+	FlowOutputLifetime            *prometheus.Desc
+	FlowQueueBackpressureCurrent  *prometheus.Desc
+	FlowQueueBackpressureLifetime *prometheus.Desc
+	FlowWorkerConcurrencyCurrent  *prometheus.Desc
+	FlowWorkerConcurrencyLifetime *prometheus.Desc
 }
 
 func NewPipelineSubcollector() *PipelineSubcollector {
@@ -73,10 +84,21 @@ func NewPipelineSubcollector() *PipelineSubcollector {
 		PipelinePluginEventsDuration:          descHelper.NewDescWithHelpAndLabels("plugin_events_duration", "Time spent processing events in this plugin.", "pipeline", "plugin_type", "plugin", "plugin_id"),
 		PipelinePluginEventsQueuePushDuration: descHelper.NewDescWithHelpAndLabels("plugin_events_queue_push_duration", "Time spent pushing events into the input queue.", "pipeline", "plugin_type", "plugin", "plugin_id"),
 
-		PipelinePluginDocumentsSuccesses: descHelper.NewDescWithHelpAndLabels("plugin_documents_successes", "Number of successful bulk requests.", "pipeline", "plugin_type", "plugin", "plugin_id"),
+		PipelinePluginDocumentsSuccesses:            descHelper.NewDescWithHelpAndLabels("plugin_documents_successes", "Number of successful bulk requests.", "pipeline", "plugin_type", "plugin", "plugin_id"),
 		PipelinePluginDocumentsNonRetryableFailures: descHelper.NewDescWithHelpAndLabels("plugin_documents_non_retryable_failures", "Number of output events with non-retryable failures.", "pipeline", "plugin_type", "plugin", "plugin_id"),
-		PipelinePluginBulkRequestErrors:    descHelper.NewDescWithHelpAndLabels("plugin_bulk_requests_errors", "Number of bulk request errors.", "pipeline", "plugin_type", "plugin", "plugin_id"),
-		PipelinePluginBulkRequestResponses: descHelper.NewDescWithHelpAndLabels("plugin_bulk_requests_responses", "Bulk request HTTP response counts by code.", "pipeline", "plugin_type", "plugin", "plugin_id", "code"),
+		PipelinePluginBulkRequestErrors:             descHelper.NewDescWithHelpAndLabels("plugin_bulk_requests_errors", "Number of bulk request errors.", "pipeline", "plugin_type", "plugin", "plugin_id"),
+		PipelinePluginBulkRequestResponses:          descHelper.NewDescWithHelpAndLabels("plugin_bulk_requests_responses", "Bulk request HTTP response counts by code.", "pipeline", "plugin_type", "plugin", "plugin_id", "code"),
+
+		FlowInputCurrent:              descHelper.NewDescWithHelpAndLabels("flow_input_current", "Current rate of events flowing into the pipeline queue (events per second).", "pipeline"),
+		FlowInputLifetime:             descHelper.NewDescWithHelpAndLabels("flow_input_lifetime", "Lifetime rate of events flowing into the pipeline queue (events per second).", "pipeline"),
+		FlowFilterCurrent:             descHelper.NewDescWithHelpAndLabels("flow_filter_current", "Current rate of events flowing through the filter stage (events per second).", "pipeline"),
+		FlowFilterLifetime:            descHelper.NewDescWithHelpAndLabels("flow_filter_lifetime", "Lifetime rate of events flowing through the filter stage (events per second).", "pipeline"),
+		FlowOutputCurrent:             descHelper.NewDescWithHelpAndLabels("flow_output_current", "Current rate of events flowing out of the pipeline (events per second).", "pipeline"),
+		FlowOutputLifetime:            descHelper.NewDescWithHelpAndLabels("flow_output_lifetime", "Lifetime rate of events flowing out of the pipeline (events per second).", "pipeline"),
+		FlowQueueBackpressureCurrent:  descHelper.NewDescWithHelpAndLabels("flow_queue_backpressure_current", "Current fraction of time that inputs are blocked pushing events into the queue.", "pipeline"),
+		FlowQueueBackpressureLifetime: descHelper.NewDescWithHelpAndLabels("flow_queue_backpressure_lifetime", "Lifetime fraction of time that inputs are blocked pushing events into the queue.", "pipeline"),
+		FlowWorkerConcurrencyCurrent:  descHelper.NewDescWithHelpAndLabels("flow_worker_concurrency_current", "Current number of pipeline workers concurrently processing events.", "pipeline"),
+		FlowWorkerConcurrencyLifetime: descHelper.NewDescWithHelpAndLabels("flow_worker_concurrency_lifetime", "Lifetime average number of pipeline workers concurrently processing events.", "pipeline"),
 	}
 }
@@ -106,17 +128,29 @@ func (collector *PipelineSubcollector) Collect(pipeStats *responses.SinglePipeli
 	ch <- prometheus.MustNewConstMetric(collector.QueueEventsQueueSize, prometheus.CounterValue, float64(pipeStats.Queue.QueueSizeInBytes), pipelineID)
 	ch <- prometheus.MustNewConstMetric(collector.QueueMaxQueueSizeInBytes, prometheus.CounterValue, float64(pipeStats.Queue.MaxQueueSizeInBytes), pipelineID)
 
+	flowStats := pipeStats.Flow
+	ch <- prometheus.MustNewConstMetric(collector.FlowInputCurrent, prometheus.GaugeValue, float64(flowStats.InputThroughput.Current), pipelineID)
+	ch <- prometheus.MustNewConstMetric(collector.FlowInputLifetime, prometheus.CounterValue, float64(flowStats.InputThroughput.Lifetime), pipelineID)
+	ch <- prometheus.MustNewConstMetric(collector.FlowFilterCurrent, prometheus.GaugeValue, float64(flowStats.FilterThroughput.Current), pipelineID)
+	ch <- prometheus.MustNewConstMetric(collector.FlowFilterLifetime, prometheus.CounterValue, float64(flowStats.FilterThroughput.Lifetime), pipelineID)
+	ch <- prometheus.MustNewConstMetric(collector.FlowOutputCurrent, prometheus.GaugeValue, float64(flowStats.OutputThroughput.Current), pipelineID)
+	ch <- prometheus.MustNewConstMetric(collector.FlowOutputLifetime, prometheus.CounterValue, float64(flowStats.OutputThroughput.Lifetime), pipelineID)
+	ch <- prometheus.MustNewConstMetric(collector.FlowQueueBackpressureCurrent, prometheus.GaugeValue, float64(flowStats.QueueBackpressure.Current), pipelineID)
+	ch <- prometheus.MustNewConstMetric(collector.FlowQueueBackpressureLifetime, prometheus.CounterValue, float64(flowStats.QueueBackpressure.Lifetime), pipelineID)
+	ch <- prometheus.MustNewConstMetric(collector.FlowWorkerConcurrencyCurrent, prometheus.GaugeValue, float64(flowStats.WorkerConcurrency.Current), pipelineID)
+	ch <- prometheus.MustNewConstMetric(collector.FlowWorkerConcurrencyLifetime, prometheus.CounterValue, float64(flowStats.WorkerConcurrency.Lifetime), pipelineID)
+
 	// Output error metrics
 	for _, output := range pipeStats.Plugins.Outputs {
 		pluginID := TruncatePluginId(output.ID)
 		pluginType := "output"
 		log.Printf("collecting output error stats for pipeline %s :: plugin type:%s name:%s id:%s", pipelineID, pluginType, output.Name, pluginID)
-		
+
 		// Response codes returned by output Bulk Requests
 		for code, count := range output.BulkRequests.Responses {
 			ch <- prometheus.MustNewConstMetric(collector.PipelinePluginBulkRequestResponses, prometheus.CounterValue, float64(count), pipelineID, pluginType, output.Name, pluginID, code)
 		}
-		
+
 		ch <- prometheus.MustNewConstMetric(collector.PipelinePluginDocumentsSuccesses, prometheus.CounterValue, float64(output.Documents.Successes), pipelineID, pluginType, output.Name, pluginID)
 		ch <- prometheus.MustNewConstMetric(collector.PipelinePluginDocumentsNonRetryableFailures, prometheus.CounterValue, float64(output.Documents.NonRetryableFailures), pipelineID, pluginType, output.Name, pluginID)
 		ch <- prometheus.MustNewConstMetric(collector.PipelinePluginBulkRequestErrors, prometheus.CounterValue, float64(output.BulkRequests.WithErrors), pipelineID, pluginType, output.Name, pluginID)
diff --git a/fetcher/responses/nodestats_response.go b/fetcher/responses/nodestats_response.go
index c7839aaa..5950884f 100644
--- a/fetcher/responses/nodestats_response.go
+++ b/fetcher/responses/nodestats_response.go
@@ -158,17 +158,17 @@ type SinglePipelineResponse struct {
 				DurationInMillis int `json:"duration_in_millis"`
 			} `json:"events"`
 			Documents struct {
-				Successes int `json:"successes"`
+				Successes            int `json:"successes"`
 				NonRetryableFailures int `json:"non_retryable_failures"`
 			} `json:"documents"`
 			BulkRequests struct {
-				WithErrors int `json:"with_errors"`
-				Responses map[string]int `json:"responses"`
+				WithErrors int            `json:"with_errors"`
+				Responses  map[string]int `json:"responses"`
 			} `json:"bulk_requests"`
 		} `json:"outputs"`
 	} `json:"plugins"`
 	Reloads PipelineReloadResponse `json:"reloads"`
-	Queue struct {
+	Queue   struct {
 		Type                string `json:"type"`
 		EventsCount         int    `json:"events_count"`
 		QueueSizeInBytes    int    `json:"queue_size_in_bytes"`
@@ -186,28 +186,7 @@ type PipelineLogstashMonitoringResponse struct {
 		DurationInMillis          int `json:"duration_in_millis"`
 		QueuePushDurationInMillis int `json:"queue_push_duration_in_millis"`
 	} `json:"events"`
-	Flow struct {
-		OutputThroughput struct {
-			Current  float64 `json:"current"`
-			Lifetime float64 `json:"lifetime"`
-		} `json:"output_throughput"`
-		WorkerConcurrency struct {
-			Current  float64 `json:"current"`
-			Lifetime float64 `json:"lifetime"`
-		} `json:"worker_concurrency"`
-		InputThroughput struct {
-			Current  float64 `json:"current"`
-			Lifetime float64 `json:"lifetime"`
-		} `json:"input_throughput"`
-		FilterThroughput struct {
-			Current  float64 `json:"current"`
-			Lifetime float64 `json:"lifetime"`
-		} `json:"filter_throughput"`
-		QueueBackpressure struct {
-			Current  float64 `json:"current"`
-			Lifetime float64 `json:"lifetime"`
-		} `json:"queue_backpressure"`
-	} `json:"flow"`
+	Flow FlowResponse `json:"flow"`
 	Plugins struct {
 		Inputs  []interface{} `json:"inputs"`
 		Codecs  []interface{} `json:"codecs"`
@@ -215,7 +194,7 @@ type PipelineLogstashMonitoringResponse struct {
 		Outputs []interface{} `json:"outputs"`
 	} `json:"plugins"`
 	Reloads PipelineReloadResponse `json:"reloads"`
-	Queue interface{} `json:"queue,omitempty"`
+	Queue   interface{}            `json:"queue,omitempty"`
 }
 
 type PipelineReloadResponse struct {
@@ -227,7 +206,7 @@ type PipelineReloadResponse struct {
 }
 
 type LastError struct {
-	Message string `json:"message"`
+	Message   string   `json:"message"`
 	Backtrace []string `json:"backtrace"`
 }
diff --git a/fixtures/node_stats.json b/fixtures/node_stats.json
index 7e5a3619..76ba83e6 100644
--- a/fixtures/node_stats.json
+++ b/fixtures/node_stats.json
@@ -230,7 +230,7 @@
         ]
       },
       "reloads": {
-        "last_failure_timestamp": "2023-04-20T20:00:32.437218256Z",
+        "last_failure_timestamp": "2023-04-20T20:00:32.437218256Z",
         "successes": 3,
         "failures": 1,
         "last_success_timestamp": "2023-04-20T22:30:32.437218256Z",
diff --git a/scripts/snapshots/metric_names.txt b/scripts/snapshots/metric_names.txt
index a49307ef..80a5262e 100644
--- a/scripts/snapshots/metric_names.txt
+++ b/scripts/snapshots/metric_names.txt
@@ -51,3 +51,13 @@ logstash_stats_pipeline_plugin_bulk_requests_errors
 logstash_stats_pipeline_plugin_bulk_requests_responses
 logstash_stats_pipeline_plugin_documents_non_retryable_failures
 logstash_stats_pipeline_plugin_documents_successes
+logstash_stats_pipeline_flow_filter_current
+logstash_stats_pipeline_flow_filter_lifetime
+logstash_stats_pipeline_flow_input_current
+logstash_stats_pipeline_flow_input_lifetime
+logstash_stats_pipeline_flow_output_current
+logstash_stats_pipeline_flow_output_lifetime
+logstash_stats_pipeline_flow_queue_backpressure_current
+logstash_stats_pipeline_flow_queue_backpressure_lifetime
+logstash_stats_pipeline_flow_worker_concurrency_current
+logstash_stats_pipeline_flow_worker_concurrency_lifetime