Add error count metrics (#110)
* #109 Adds output/bulk-request success/error metrics

* #109 add metric tests

* Simplifies output success/fail metrics by removing omitempty

* Updates snapshots for testing

* Update metric names

---------

Co-authored-by: Jakub Surdej <[email protected]>
excalq and kuskoman authored May 6, 2023
1 parent 607efa9 commit 3f03c69
Showing 6 changed files with 62 additions and 5 deletions.
4 changes: 4 additions & 0 deletions collectors/nodestats/nodestats_collector_test.go
@@ -84,6 +84,10 @@ func TestCollectNotNil(t *testing.T) {
"logstash_stats_pipeline_plugin_events_out",
"logstash_stats_pipeline_plugin_events_duration",
"logstash_stats_pipeline_plugin_events_queue_push_duration",
"logstash_stats_pipeline_plugin_documents_successes",
"logstash_stats_pipeline_plugin_documents_non_retryable_failures",
"logstash_stats_pipeline_plugin_bulk_requests_errors",
"logstash_stats_pipeline_plugin_bulk_requests_responses",
"logstash_stats_process_cpu_percent",
"logstash_stats_process_cpu_total_millis",
"logstash_stats_process_cpu_load_average_1m",
26 changes: 26 additions & 0 deletions collectors/nodestats/pipeline_subcollector.go
@@ -41,6 +41,11 @@ type PipelineSubcollector struct {
PipelinePluginEventsOut *prometheus.Desc
PipelinePluginEventsDuration *prometheus.Desc
PipelinePluginEventsQueuePushDuration *prometheus.Desc

PipelinePluginDocumentsSuccesses *prometheus.Desc
PipelinePluginDocumentsNonRetryableFailures *prometheus.Desc
PipelinePluginBulkRequestErrors *prometheus.Desc
PipelinePluginBulkRequestResponses *prometheus.Desc
}

func NewPipelineSubcollector() *PipelineSubcollector {
@@ -67,6 +72,11 @@ func NewPipelineSubcollector() *PipelineSubcollector {
PipelinePluginEventsOut: descHelper.NewDescWithHelpAndLabels("plugin_events_out", "Number of events output by this pipeline.", "pipeline", "plugin_type", "plugin", "plugin_id"),
PipelinePluginEventsDuration: descHelper.NewDescWithHelpAndLabels("plugin_events_duration", "Time spent processing events in this plugin.", "pipeline", "plugin_type", "plugin", "plugin_id"),
PipelinePluginEventsQueuePushDuration: descHelper.NewDescWithHelpAndLabels("plugin_events_queue_push_duration", "Time spent pushing events into the input queue.", "pipeline", "plugin_type", "plugin", "plugin_id"),

PipelinePluginDocumentsSuccesses: descHelper.NewDescWithHelpAndLabels("plugin_documents_successes", "Number of documents successfully indexed by this output.", "pipeline", "plugin_type", "plugin", "plugin_id"),
PipelinePluginDocumentsNonRetryableFailures: descHelper.NewDescWithHelpAndLabels("plugin_documents_non_retryable_failures", "Number of documents dropped by this output due to non-retryable failures.", "pipeline", "plugin_type", "plugin", "plugin_id"),
PipelinePluginBulkRequestErrors: descHelper.NewDescWithHelpAndLabels("plugin_bulk_requests_errors", "Number of bulk request errors.", "pipeline", "plugin_type", "plugin", "plugin_id"),
PipelinePluginBulkRequestResponses: descHelper.NewDescWithHelpAndLabels("plugin_bulk_requests_responses", "Bulk request HTTP response counts by code.", "pipeline", "plugin_type", "plugin", "plugin_id", "code"),
}
}

@@ -96,6 +106,22 @@ func (collector *PipelineSubcollector) Collect(pipeStats *responses.SinglePipelineResponse
ch <- prometheus.MustNewConstMetric(collector.QueueEventsQueueSize, prometheus.CounterValue, float64(pipeStats.Queue.QueueSizeInBytes), pipelineID)
ch <- prometheus.MustNewConstMetric(collector.QueueMaxQueueSizeInBytes, prometheus.CounterValue, float64(pipeStats.Queue.MaxQueueSizeInBytes), pipelineID)

// Output error metrics
for _, output := range pipeStats.Plugins.Outputs {
pluginID := TruncatePluginId(output.ID)
pluginType := "output"
log.Printf("collecting output error stats for pipeline %s :: plugin type:%s name:%s id:%s", pipelineID, pluginType, output.Name, pluginID)

// Response codes returned by output Bulk Requests
for code, count := range output.BulkRequests.Responses {
ch <- prometheus.MustNewConstMetric(collector.PipelinePluginBulkRequestResponses, prometheus.CounterValue, float64(count), pipelineID, pluginType, output.Name, pluginID, code)
}

ch <- prometheus.MustNewConstMetric(collector.PipelinePluginDocumentsSuccesses, prometheus.CounterValue, float64(output.Documents.Successes), pipelineID, pluginType, output.Name, pluginID)
ch <- prometheus.MustNewConstMetric(collector.PipelinePluginDocumentsNonRetryableFailures, prometheus.CounterValue, float64(output.Documents.NonRetryableFailures), pipelineID, pluginType, output.Name, pluginID)
ch <- prometheus.MustNewConstMetric(collector.PipelinePluginBulkRequestErrors, prometheus.CounterValue, float64(output.BulkRequests.WithErrors), pipelineID, pluginType, output.Name, pluginID)
}

// Pipeline plugins metrics
for _, plugin := range pipeStats.Plugins.Inputs {
pluginID := TruncatePluginId(plugin.ID)
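For reference, the loop above turns the fixture's single elasticsearch output into series like the following on the exporter's /metrics endpoint. This is an illustrative sketch only: the pipeline name ("main") and the truncated plugin_id value are assumed, since neither appears in this diff, and exact label values depend on the running Logstash configuration.

    logstash_stats_pipeline_plugin_documents_successes{pipeline="main",plugin_type="output",plugin="elasticsearch",plugin_id="45554a51"} 1337
    logstash_stats_pipeline_plugin_documents_non_retryable_failures{pipeline="main",plugin_type="output",plugin="elasticsearch",plugin_id="45554a51"} 87
    logstash_stats_pipeline_plugin_bulk_requests_errors{pipeline="main",plugin_type="output",plugin="elasticsearch",plugin_id="45554a51"} 87
    logstash_stats_pipeline_plugin_bulk_requests_responses{pipeline="main",plugin_type="output",plugin="elasticsearch",plugin_id="45554a51",code="200"} 87
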
15 changes: 10 additions & 5 deletions fetcher/responses/__snapshots__/nodestats_response_test.snap
@@ -63,7 +63,7 @@ responses.NodeStatsResponse{
Monitoring: responses.PipelineLogstashMonitoringResponse{},
Events: struct { Out int "json:\"out\""; Filtered int "json:\"filtered\""; In int "json:\"in\""; DurationInMillis int "json:\"duration_in_millis\""; QueuePushDurationInMillis int "json:\"queue_push_duration_in_millis\"" }{},
Flow: responses.FlowResponse{},
Plugins: struct { Inputs []struct { ID string "json:\"id\""; Name string "json:\"name\""; Events struct { Out int "json:\"out\""; QueuePushDurationInMillis int "json:\"queue_push_duration_in_millis\"" } "json:\"events\"" } "json:\"inputs\""; Codecs []struct { ID string "json:\"id\""; Name string "json:\"name\""; Decode struct { Out int "json:\"out\""; WritesIn int "json:\"writes_in\""; DurationInMillis int "json:\"duration_in_millis\"" } "json:\"decode\""; Encode struct { WritesIn int "json:\"writes_in\""; DurationInMillis int "json:\"duration_in_millis\"" } "json:\"encode\"" } "json:\"codecs\""; Filters []struct { ID string "json:\"id\""; Name string "json:\"name\""; Events struct { Out int "json:\"out\""; In int "json:\"in\""; DurationInMillis int "json:\"duration_in_millis\"" } "json:\"events\"" } "json:\"filters\""; Outputs []struct { ID string "json:\"id\""; Name string "json:\"name\""; Events struct { Out int "json:\"out\""; In int "json:\"in\""; DurationInMillis int "json:\"duration_in_millis\"" } "json:\"events\"" } "json:\"outputs\"" }{
Plugins: struct { Inputs []struct { ID string "json:\"id\""; Name string "json:\"name\""; Events struct { Out int "json:\"out\""; QueuePushDurationInMillis int "json:\"queue_push_duration_in_millis\"" } "json:\"events\"" } "json:\"inputs\""; Codecs []struct { ID string "json:\"id\""; Name string "json:\"name\""; Decode struct { Out int "json:\"out\""; WritesIn int "json:\"writes_in\""; DurationInMillis int "json:\"duration_in_millis\"" } "json:\"decode\""; Encode struct { WritesIn int "json:\"writes_in\""; DurationInMillis int "json:\"duration_in_millis\"" } "json:\"encode\"" } "json:\"codecs\""; Filters []struct { ID string "json:\"id\""; Name string "json:\"name\""; Events struct { Out int "json:\"out\""; In int "json:\"in\""; DurationInMillis int "json:\"duration_in_millis\"" } "json:\"events\"" } "json:\"filters\""; Outputs []struct { ID string "json:\"id\""; Name string "json:\"name\""; Events struct { Out int "json:\"out\""; In int "json:\"in\""; DurationInMillis int "json:\"duration_in_millis\"" } "json:\"events\""; Documents struct { Successes int "json:\"successes\""; NonRetryableFailures int "json:\"non_retryable_failures\"" } "json:\"documents\""; BulkRequests struct { WithErrors int "json:\"with_errors\""; Responses map[string]int "json:\"responses\"" } "json:\"bulk_requests\"" } "json:\"outputs\"" }{
Inputs: {
},
Codecs: {
@@ -88,7 +88,7 @@ responses.NodeStatsResponse{
QueueBackpressure: struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" }{},
WorkerConcurrency: struct { Current float64 "json:\"current\""; Lifetime float64 "json:\"lifetime\"" }{},
},
Plugins: struct { Inputs []struct { ID string "json:\"id\""; Name string "json:\"name\""; Events struct { Out int "json:\"out\""; QueuePushDurationInMillis int "json:\"queue_push_duration_in_millis\"" } "json:\"events\"" } "json:\"inputs\""; Codecs []struct { ID string "json:\"id\""; Name string "json:\"name\""; Decode struct { Out int "json:\"out\""; WritesIn int "json:\"writes_in\""; DurationInMillis int "json:\"duration_in_millis\"" } "json:\"decode\""; Encode struct { WritesIn int "json:\"writes_in\""; DurationInMillis int "json:\"duration_in_millis\"" } "json:\"encode\"" } "json:\"codecs\""; Filters []struct { ID string "json:\"id\""; Name string "json:\"name\""; Events struct { Out int "json:\"out\""; In int "json:\"in\""; DurationInMillis int "json:\"duration_in_millis\"" } "json:\"events\"" } "json:\"filters\""; Outputs []struct { ID string "json:\"id\""; Name string "json:\"name\""; Events struct { Out int "json:\"out\""; In int "json:\"in\""; DurationInMillis int "json:\"duration_in_millis\"" } "json:\"events\"" } "json:\"outputs\"" }{
Plugins: struct { Inputs []struct { ID string "json:\"id\""; Name string "json:\"name\""; Events struct { Out int "json:\"out\""; QueuePushDurationInMillis int "json:\"queue_push_duration_in_millis\"" } "json:\"events\"" } "json:\"inputs\""; Codecs []struct { ID string "json:\"id\""; Name string "json:\"name\""; Decode struct { Out int "json:\"out\""; WritesIn int "json:\"writes_in\""; DurationInMillis int "json:\"duration_in_millis\"" } "json:\"decode\""; Encode struct { WritesIn int "json:\"writes_in\""; DurationInMillis int "json:\"duration_in_millis\"" } "json:\"encode\"" } "json:\"codecs\""; Filters []struct { ID string "json:\"id\""; Name string "json:\"name\""; Events struct { Out int "json:\"out\""; In int "json:\"in\""; DurationInMillis int "json:\"duration_in_millis\"" } "json:\"events\"" } "json:\"filters\""; Outputs []struct { ID string "json:\"id\""; Name string "json:\"name\""; Events struct { Out int "json:\"out\""; In int "json:\"in\""; DurationInMillis int "json:\"duration_in_millis\"" } "json:\"events\""; Documents struct { Successes int "json:\"successes\""; NonRetryableFailures int "json:\"non_retryable_failures\"" } "json:\"documents\""; BulkRequests struct { WithErrors int "json:\"with_errors\""; Responses map[string]int "json:\"responses\"" } "json:\"bulk_requests\"" } "json:\"outputs\"" }{
Inputs: {
{
ID: "c75c0c6f97fd2c8605b95a5b2694fdae97189fe49553787a923faeaa3342c54a",
@@ -129,9 +129,14 @@ responses.NodeStatsResponse{
},
Outputs: {
{
ID: "45554a51a53a57f5dbba7d26b65aad526147453a895529f3d4698c8fd88692ef",
Name: "elasticsearch",
Events: struct { Out int "json:\"out\""; In int "json:\"in\""; DurationInMillis int "json:\"duration_in_millis\"" }{Out:0, In:2000, DurationInMillis:0},
ID: "45554a51a53a57f5dbba7d26b65aad526147453a895529f3d4698c8fd88692ef",
Name: "elasticsearch",
Events: struct { Out int "json:\"out\""; In int "json:\"in\""; DurationInMillis int "json:\"duration_in_millis\"" }{Out:0, In:2000, DurationInMillis:0},
Documents: struct { Successes int "json:\"successes\""; NonRetryableFailures int "json:\"non_retryable_failures\"" }{Successes:1337, NonRetryableFailures:87},
BulkRequests: struct { WithErrors int "json:\"with_errors\""; Responses map[string]int "json:\"responses\"" }{
WithErrors: 87,
Responses: {"200":87},
},
},
},
},
8 changes: 8 additions & 0 deletions fetcher/responses/nodestats_response.go
@@ -157,6 +157,14 @@ type SinglePipelineResponse struct {
In int `json:"in"`
DurationInMillis int `json:"duration_in_millis"`
} `json:"events"`
Documents struct {
Successes int `json:"successes"`
NonRetryableFailures int `json:"non_retryable_failures"`
} `json:"documents"`
BulkRequests struct {
WithErrors int `json:"with_errors"`
Responses map[string]int `json:"responses"`
} `json:"bulk_requests"`
} `json:"outputs"`
} `json:"plugins"`
Reloads PipelineReloadResponse `json:"reloads"`
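The new Documents and BulkRequests fields decode directly from the "documents" and "bulk_requests" objects in Logstash's node stats payload. A minimal, self-contained sketch of that decoding, using a hypothetical OutputStats type that mirrors only the fields added here (it is not the exporter's own response type, which keeps these as anonymous structs inside SinglePipelineResponse):

    package main

    import (
        "encoding/json"
        "fmt"
        "log"
    )

    // OutputStats is a hypothetical stand-in that mirrors only the fields added in this commit.
    type OutputStats struct {
        Documents struct {
            Successes            int `json:"successes"`
            NonRetryableFailures int `json:"non_retryable_failures"`
        } `json:"documents"`
        BulkRequests struct {
            WithErrors int            `json:"with_errors"`
            Responses  map[string]int `json:"responses"`
        } `json:"bulk_requests"`
    }

    func main() {
        // Same shape as the fixture added below in fixtures/node_stats.json.
        raw := []byte(`{
            "documents": {"successes": 1337, "non_retryable_failures": 87},
            "bulk_requests": {"with_errors": 87, "responses": {"200": 87}}
        }`)

        var out OutputStats
        if err := json.Unmarshal(raw, &out); err != nil {
            log.Fatal(err)
        }

        fmt.Println("successes:", out.Documents.Successes)
        fmt.Println("non-retryable failures:", out.Documents.NonRetryableFailures)
        // One labeled sample per HTTP status code, as the collector emits.
        for code, count := range out.BulkRequests.Responses {
            fmt.Printf("bulk responses code=%s count=%d\n", code, count)
        }
    }
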
10 changes: 10 additions & 0 deletions fixtures/node_stats.json
@@ -215,6 +215,16 @@
"out": 0,
"in": 2000,
"duration_in_millis": 0
},
"documents": {
"successes": 1337,
"non_retryable_failures": 87
},
"bulk_requests": {
"with_errors": 87,
"responses": {
"200": 87
}
}
}
]
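The fixture mirrors the pipelines section of Logstash's node stats monitoring API, so the same data can be pulled from a live instance to compare against the snapshot. Assuming the monitoring API is listening on its default address and port, something like

    curl -s http://localhost:9600/_node/stats/pipelines

returns the per-output documents and bulk_requests blocks shown above.
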
4 changes: 4 additions & 0 deletions scripts/snapshots/metric_names.txt
@@ -47,3 +47,7 @@ logstash_stats_pipeline_plugin_events_duration
logstash_stats_pipeline_plugin_events_in
logstash_stats_pipeline_plugin_events_out
logstash_stats_pipeline_plugin_events_queue_push_duration
logstash_stats_pipeline_plugin_bulk_requests_errors
logstash_stats_pipeline_plugin_bulk_requests_responses
logstash_stats_pipeline_plugin_documents_non_retryable_failures
logstash_stats_pipeline_plugin_documents_successes
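
Since all four additions are registered as counters, they compose naturally with rate() and increase(). A couple of example PromQL expressions, assuming the label names used by the subcollector above; the 5m window is an arbitrary choice, not a project recommendation:

    # Non-2xx bulk-request responses per second, per pipeline and output plugin.
    sum by (pipeline, plugin) (rate(logstash_stats_pipeline_plugin_bulk_requests_responses{code!~"2.."}[5m]))

    # Documents dropped with non-retryable failures over the last five minutes.
    increase(logstash_stats_pipeline_plugin_documents_non_retryable_failures[5m])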
