Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for pipeline flow metrics #119

Merged
merged 1 commit into from
May 16, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -300,6 +300,20 @@ Table of exported metrics:
| logstash_stats_pipeline_events_in | counter | Number of events that have been input into this pipeline. |
| logstash_stats_pipeline_events_out | counter | Number of events that have been processed by this pipeline. |
| logstash_stats_pipeline_events_queue_push_duration | counter | Time needed to push event to queue. |
| logstash_stats_pipeline_flow_filter_current | gauge | Current rate of events flowing through the filter stage (events per second). |
| logstash_stats_pipeline_flow_filter_lifetime | counter | Lifetime average rate of events flowing through the filter stage (events per second). |
| logstash_stats_pipeline_flow_input_current | gauge | Current rate of events entering the pipeline queue (events per second). |
| logstash_stats_pipeline_flow_input_lifetime | counter | Lifetime average rate of events entering the pipeline queue (events per second). |
| logstash_stats_pipeline_flow_output_current | gauge | Current rate of events leaving the output stage (events per second). |
| logstash_stats_pipeline_flow_output_lifetime | counter | Lifetime average rate of events leaving the output stage (events per second). |
| logstash_stats_pipeline_flow_queue_backpressure_current | gauge | Current fraction of time that pushing events into the queue was blocked (backpressure). |
| logstash_stats_pipeline_flow_queue_backpressure_lifetime | counter | Lifetime average fraction of time that pushing events into the queue was blocked (backpressure). |
| logstash_stats_pipeline_flow_worker_concurrency_current | gauge | Current number of pipeline workers in use. |
| logstash_stats_pipeline_flow_worker_concurrency_lifetime | counter | Lifetime average number of pipeline workers in use. |
| logstash_stats_pipeline_plugin_bulk_requests_errors | counter | Number of bulk request errors. |
| logstash_stats_pipeline_plugin_bulk_requests_responses | counter | Bulk request HTTP response counts by code. |
| logstash_stats_pipeline_plugin_documents_non_retryable_failures | counter | Number of output events with non-retryable failures. |
| logstash_stats_pipeline_plugin_documents_successes | counter | Number of successful bulk requests. |
| logstash_stats_pipeline_plugin_events_duration | counter | Time spent processing events in this plugin. |
| logstash_stats_pipeline_plugin_events_in | counter | Number of events received by this pipeline. |
| logstash_stats_pipeline_plugin_events_out | counter | Number of events output by this pipeline. |
Expand Down
50 changes: 42 additions & 8 deletions collectors/nodestats/pipeline_subcollector.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,21 @@ type PipelineSubcollector struct {
PipelinePluginEventsDuration *prometheus.Desc
PipelinePluginEventsQueuePushDuration *prometheus.Desc

PipelinePluginDocumentsSuccesses *prometheus.Desc
PipelinePluginDocumentsSuccesses *prometheus.Desc
PipelinePluginDocumentsNonRetryableFailures *prometheus.Desc
PipelinePluginBulkRequestErrors *prometheus.Desc
PipelinePluginBulkRequestResponses *prometheus.Desc
PipelinePluginBulkRequestErrors *prometheus.Desc
PipelinePluginBulkRequestResponses *prometheus.Desc

FlowInputCurrent *prometheus.Desc
FlowInputLifetime *prometheus.Desc
FlowFilterCurrent *prometheus.Desc
FlowFilterLifetime *prometheus.Desc
FlowOutputCurrent *prometheus.Desc
FlowOutputLifetime *prometheus.Desc
FlowQueueBackpressureCurrent *prometheus.Desc
FlowQueueBackpressureLifetime *prometheus.Desc
FlowWorkerConcurrencyCurrent *prometheus.Desc
FlowWorkerConcurrencyLifetime *prometheus.Desc
}

func NewPipelineSubcollector() *PipelineSubcollector {
Expand Down Expand Up @@ -73,10 +84,21 @@ func NewPipelineSubcollector() *PipelineSubcollector {
PipelinePluginEventsDuration: descHelper.NewDescWithHelpAndLabels("plugin_events_duration", "Time spent processing events in this plugin.", "pipeline", "plugin_type", "plugin", "plugin_id"),
PipelinePluginEventsQueuePushDuration: descHelper.NewDescWithHelpAndLabels("plugin_events_queue_push_duration", "Time spent pushing events into the input queue.", "pipeline", "plugin_type", "plugin", "plugin_id"),

PipelinePluginDocumentsSuccesses: descHelper.NewDescWithHelpAndLabels("plugin_documents_successes", "Number of successful bulk requests.", "pipeline", "plugin_type", "plugin", "plugin_id"),
PipelinePluginDocumentsSuccesses: descHelper.NewDescWithHelpAndLabels("plugin_documents_successes", "Number of successful bulk requests.", "pipeline", "plugin_type", "plugin", "plugin_id"),
PipelinePluginDocumentsNonRetryableFailures: descHelper.NewDescWithHelpAndLabels("plugin_documents_non_retryable_failures", "Number of output events with non-retryable failures.", "pipeline", "plugin_type", "plugin", "plugin_id"),
PipelinePluginBulkRequestErrors: descHelper.NewDescWithHelpAndLabels("plugin_bulk_requests_errors", "Number of bulk request errors.", "pipeline", "plugin_type", "plugin", "plugin_id"),
PipelinePluginBulkRequestResponses: descHelper.NewDescWithHelpAndLabels("plugin_bulk_requests_responses", "Bulk request HTTP response counts by code.", "pipeline", "plugin_type", "plugin", "plugin_id", "code"),
PipelinePluginBulkRequestErrors: descHelper.NewDescWithHelpAndLabels("plugin_bulk_requests_errors", "Number of bulk request errors.", "pipeline", "plugin_type", "plugin", "plugin_id"),
PipelinePluginBulkRequestResponses: descHelper.NewDescWithHelpAndLabels("plugin_bulk_requests_responses", "Bulk request HTTP response counts by code.", "pipeline", "plugin_type", "plugin", "plugin_id", "code"),

FlowInputCurrent: descHelper.NewDescWithHelpAndLabels("flow_input_current", "Current number of events in the input queue.", "pipeline"),
FlowInputLifetime: descHelper.NewDescWithHelpAndLabels("flow_input_lifetime", "Lifetime number of events in the input queue.", "pipeline"),
FlowFilterCurrent: descHelper.NewDescWithHelpAndLabels("flow_filter_current", "Current number of events in the filter queue.", "pipeline"),
FlowFilterLifetime: descHelper.NewDescWithHelpAndLabels("flow_filter_lifetime", "Lifetime number of events in the filter queue.", "pipeline"),
FlowOutputCurrent: descHelper.NewDescWithHelpAndLabels("flow_output_current", "Current number of events in the output queue.", "pipeline"),
FlowOutputLifetime: descHelper.NewDescWithHelpAndLabels("flow_output_lifetime", "Lifetime number of events in the output queue.", "pipeline"),
FlowQueueBackpressureCurrent: descHelper.NewDescWithHelpAndLabels("flow_queue_backpressure_current", "Current number of events in the backpressure queue.", "pipeline"),
FlowQueueBackpressureLifetime: descHelper.NewDescWithHelpAndLabels("flow_queue_backpressure_lifetime", "Lifetime number of events in the backpressure queue.", "pipeline"),
FlowWorkerConcurrencyCurrent: descHelper.NewDescWithHelpAndLabels("flow_worker_concurrency_current", "Current number of workers.", "pipeline"),
FlowWorkerConcurrencyLifetime: descHelper.NewDescWithHelpAndLabels("flow_worker_concurrency_lifetime", "Lifetime number of workers.", "pipeline"),
}
}

Expand Down Expand Up @@ -106,17 +128,29 @@ func (collector *PipelineSubcollector) Collect(pipeStats *responses.SinglePipeli
ch <- prometheus.MustNewConstMetric(collector.QueueEventsQueueSize, prometheus.CounterValue, float64(pipeStats.Queue.QueueSizeInBytes), pipelineID)
ch <- prometheus.MustNewConstMetric(collector.QueueMaxQueueSizeInBytes, prometheus.CounterValue, float64(pipeStats.Queue.MaxQueueSizeInBytes), pipelineID)

flowStats := pipeStats.Flow
ch <- prometheus.MustNewConstMetric(collector.FlowInputCurrent, prometheus.GaugeValue, float64(flowStats.InputThroughput.Current), pipelineID)
ch <- prometheus.MustNewConstMetric(collector.FlowInputLifetime, prometheus.CounterValue, float64(flowStats.InputThroughput.Lifetime), pipelineID)
ch <- prometheus.MustNewConstMetric(collector.FlowFilterCurrent, prometheus.GaugeValue, float64(flowStats.FilterThroughput.Current), pipelineID)
ch <- prometheus.MustNewConstMetric(collector.FlowFilterLifetime, prometheus.CounterValue, float64(flowStats.FilterThroughput.Lifetime), pipelineID)
ch <- prometheus.MustNewConstMetric(collector.FlowOutputCurrent, prometheus.GaugeValue, float64(flowStats.OutputThroughput.Current), pipelineID)
ch <- prometheus.MustNewConstMetric(collector.FlowOutputLifetime, prometheus.CounterValue, float64(flowStats.OutputThroughput.Lifetime), pipelineID)
ch <- prometheus.MustNewConstMetric(collector.FlowQueueBackpressureCurrent, prometheus.GaugeValue, float64(flowStats.QueueBackpressure.Current), pipelineID)
ch <- prometheus.MustNewConstMetric(collector.FlowQueueBackpressureLifetime, prometheus.CounterValue, float64(flowStats.QueueBackpressure.Lifetime), pipelineID)
ch <- prometheus.MustNewConstMetric(collector.FlowWorkerConcurrencyCurrent, prometheus.GaugeValue, float64(flowStats.WorkerConcurrency.Current), pipelineID)
ch <- prometheus.MustNewConstMetric(collector.FlowWorkerConcurrencyLifetime, prometheus.CounterValue, float64(flowStats.WorkerConcurrency.Lifetime), pipelineID)

// Output error metrics
for _, output := range pipeStats.Plugins.Outputs {
pluginID := TruncatePluginId(output.ID)
pluginType := "output"
log.Printf("collecting output error stats for pipeline %s :: plugin type:%s name:%s id:%s", pipelineID, pluginType, output.Name, pluginID)

// Response codes returned by output Bulk Requests
for code, count := range output.BulkRequests.Responses {
ch <- prometheus.MustNewConstMetric(collector.PipelinePluginBulkRequestResponses, prometheus.CounterValue, float64(count), pipelineID, pluginType, output.Name, pluginID, code)
}

ch <- prometheus.MustNewConstMetric(collector.PipelinePluginDocumentsSuccesses, prometheus.CounterValue, float64(output.Documents.Successes), pipelineID, pluginType, output.Name, pluginID)
ch <- prometheus.MustNewConstMetric(collector.PipelinePluginDocumentsNonRetryableFailures, prometheus.CounterValue, float64(output.Documents.NonRetryableFailures), pipelineID, pluginType, output.Name, pluginID)
ch <- prometheus.MustNewConstMetric(collector.PipelinePluginBulkRequestErrors, prometheus.CounterValue, float64(output.BulkRequests.WithErrors), pipelineID, pluginType, output.Name, pluginID)
Expand Down
35 changes: 7 additions & 28 deletions fetcher/responses/nodestats_response.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,17 +158,17 @@ type SinglePipelineResponse struct {
DurationInMillis int `json:"duration_in_millis"`
} `json:"events"`
Documents struct {
Successes int `json:"successes"`
Successes int `json:"successes"`
NonRetryableFailures int `json:"non_retryable_failures"`
} `json:"documents"`
BulkRequests struct {
WithErrors int `json:"with_errors"`
Responses map[string]int `json:"responses"`
WithErrors int `json:"with_errors"`
Responses map[string]int `json:"responses"`
} `json:"bulk_requests"`
} `json:"outputs"`
} `json:"plugins"`
Reloads PipelineReloadResponse `json:"reloads"`
Queue struct {
Queue struct {
Type string `json:"type"`
EventsCount int `json:"events_count"`
QueueSizeInBytes int `json:"queue_size_in_bytes"`
Expand All @@ -186,36 +186,15 @@ type PipelineLogstashMonitoringResponse struct {
DurationInMillis int `json:"duration_in_millis"`
QueuePushDurationInMillis int `json:"queue_push_duration_in_millis"`
} `json:"events"`
Flow struct {
OutputThroughput struct {
Current float64 `json:"current"`
Lifetime float64 `json:"lifetime"`
} `json:"output_throughput"`
WorkerConcurrency struct {
Current float64 `json:"current"`
Lifetime float64 `json:"lifetime"`
} `json:"worker_concurrency"`
InputThroughput struct {
Current float64 `json:"current"`
Lifetime float64 `json:"lifetime"`
} `json:"input_throughput"`
FilterThroughput struct {
Current float64 `json:"current"`
Lifetime float64 `json:"lifetime"`
} `json:"filter_throughput"`
QueueBackpressure struct {
Current float64 `json:"current"`
Lifetime float64 `json:"lifetime"`
} `json:"queue_backpressure"`
} `json:"flow"`
Flow FlowResponse `json:"flow"`
Plugins struct {
Inputs []interface{} `json:"inputs"`
Codecs []interface{} `json:"codecs"`
Filters []interface{} `json:"filters"`
Outputs []interface{} `json:"outputs"`
} `json:"plugins"`
Reloads PipelineReloadResponse `json:"reloads"`
Queue interface{} `json:"queue,omitempty"`
Queue interface{} `json:"queue,omitempty"`
}

type PipelineReloadResponse struct {
Expand All @@ -227,7 +206,7 @@ type PipelineReloadResponse struct {
}

type LastError struct {
Message string `json:"message"`
Message string `json:"message"`
Backtrace []string `json:"backtrace"`
}

Expand Down
2 changes: 1 addition & 1 deletion fixtures/node_stats.json
Original file line number Diff line number Diff line change
Expand Up @@ -230,7 +230,7 @@
]
},
"reloads": {
"last_failure_timestamp": "2023-04-20T20:00:32.437218256Z",
"last_failure_timestamp": "2023-04-20T20:00:32.437218256Z",
"successes": 3,
"failures": 1,
"last_success_timestamp": "2023-04-20T22:30:32.437218256Z",
Expand Down
10 changes: 10 additions & 0 deletions scripts/snapshots/metric_names.txt
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,13 @@ logstash_stats_pipeline_plugin_bulk_requests_errors
logstash_stats_pipeline_plugin_bulk_requests_responses
logstash_stats_pipeline_plugin_documents_non_retryable_failures
logstash_stats_pipeline_plugin_documents_successes
logstash_stats_pipeline_flow_filter_current
logstash_stats_pipeline_flow_filter_lifetime
logstash_stats_pipeline_flow_input_current
logstash_stats_pipeline_flow_input_lifetime
logstash_stats_pipeline_flow_output_current
logstash_stats_pipeline_flow_output_lifetime
logstash_stats_pipeline_flow_queue_backpressure_current
logstash_stats_pipeline_flow_queue_backpressure_lifetime
logstash_stats_pipeline_flow_worker_concurrency_current
logstash_stats_pipeline_flow_worker_concurrency_lifetime