forked from influxdata/telegraf
-
Notifications
You must be signed in to change notification settings - Fork 0
/
eventhub_consumer.go
422 lines (356 loc) · 13 KB
/
eventhub_consumer.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
package eventhub
import (
"context"
"fmt"
"sync"
"time"
eventhub "github.com/Azure/azure-event-hubs-go/v3"
"github.com/Azure/azure-event-hubs-go/v3/persist"
"github.com/influxdata/telegraf"
"github.com/influxdata/telegraf/internal"
"github.com/influxdata/telegraf/plugins/inputs"
"github.com/influxdata/telegraf/plugins/parsers"
)
const (
	// defaultMaxUndeliveredMessages is the value Init assigns to
	// MaxUndeliveredMessages when the option is left at zero.
	defaultMaxUndeliveredMessages = 1000
)

// empty is the token type stored in a semaphore; it carries no data.
type empty struct{}

// semaphore is a channel of empty tokens. startTracking creates one with a
// capacity of MaxUndeliveredMessages and uses it to bound the number of metric
// groups that are awaiting delivery.
type semaphore chan empty
// EventHub is the top level struct for this plugin. The exported fields carry
// the TOML configuration; the unexported fields hold runtime state that is
// created by SetParser, Init and Start.
type EventHub struct {
	// Configuration

	// ConnectionString, when non-empty, is used by Init to create the hub;
	// otherwise the hub is created from the environment.
	ConnectionString string `toml:"connection_string"`
	// PersistenceDir, when non-empty, makes Init attach a file persister
	// created from this directory to the hub.
	PersistenceDir string `toml:"persistence_dir"`
	// ConsumerGroup is passed to the receivers when non-empty.
	ConsumerGroup string `toml:"consumer_group"`
	// FromTimestamp, when non-zero, is the starting point for the receivers.
	// It takes precedence over Latest.
	FromTimestamp time.Time `toml:"from_timestamp"`
	// Latest makes the receivers start from the latest offset. It is only
	// considered when FromTimestamp is zero.
	Latest bool `toml:"latest"`
	// PrefetchCount is passed to the receivers when non-zero.
	PrefetchCount uint32 `toml:"prefetch_count"`
	// Epoch is passed to the receivers when non-zero.
	Epoch int64 `toml:"epoch"`
	// UserAgent overrides the user agent of the hub; when empty Init uses
	// internal.ProductToken() instead.
	UserAgent string `toml:"user_agent"`
	// PartitionIDs lists the partitions to receive from. When empty, Start
	// uses the partition IDs reported by the hub's runtime information.
	PartitionIDs []string `toml:"partition_ids"`
	// MaxUndeliveredMessages is the tracking limit handed to the accumulator
	// and the capacity of the semaphore in startTracking. Zero is replaced by
	// defaultMaxUndeliveredMessages in Init.
	MaxUndeliveredMessages int `toml:"max_undelivered_messages"`
	// EnqueuedTimeAsTs makes createMetrics use the event's EnqueuedTime as the
	// metric timestamp. When set, EnqueuedTimeField is ignored.
	EnqueuedTimeAsTs bool `toml:"enqueued_time_as_ts"`
	// IotHubEnqueuedTimeAsTs makes createMetrics use the event's
	// IoTHubEnqueuedTime (when present) as the metric timestamp. When set,
	// IoTHubEnqueuedTimeField is ignored.
	IotHubEnqueuedTimeAsTs bool `toml:"iot_hub_enqueued_time_as_ts"`

	// Metadata

	// ApplicationPropertyFields lists event property keys that are copied to
	// metric fields of the same name when present on the event.
	ApplicationPropertyFields []string `toml:"application_property_fields"`
	// ApplicationPropertyTags lists event property keys that are copied to
	// metric tags of the same name (formatted with %v) when present.
	ApplicationPropertyTags []string `toml:"application_property_tags"`
	// The options below name the field or tag that receives the corresponding
	// system property. An empty name disables that piece of metadata.
	SequenceNumberField string `toml:"sequence_number_field"`
	EnqueuedTimeField string `toml:"enqueued_time_field"`
	OffsetField string `toml:"offset_field"`
	PartitionIDTag string `toml:"partition_id_tag"`
	PartitionKeyTag string `toml:"partition_key_tag"`
	IoTHubDeviceConnectionIDTag string `toml:"iot_hub_device_connection_id_tag"`
	IoTHubAuthGenerationIDTag string `toml:"iot_hub_auth_generation_id_tag"`
	IoTHubConnectionAuthMethodTag string `toml:"iot_hub_connection_auth_method_tag"`
	IoTHubConnectionModuleIDTag string `toml:"iot_hub_connection_module_id_tag"`
	IoTHubEnqueuedTimeField string `toml:"iot_hub_enqueued_time_field"`

	// Log is the plugin logger; it is excluded from the TOML configuration.
	// NOTE(review): presumably injected by Telegraf before Init — confirm.
	Log telegraf.Logger `toml:"-"`

	// Azure

	// hub is the Event Hub client created by Init.
	hub *eventhub.Hub
	// cancel cancels the context created in Start.
	cancel context.CancelFunc
	// wg tracks the tracking goroutine launched by Start.
	wg sync.WaitGroup

	// parser decodes event payloads into metrics; set through SetParser.
	parser parsers.Parser
	// in carries parsed metric groups from onMessage to startTracking.
	in chan []telegraf.Metric
}
// SampleConfig returns the sample TOML configuration for the plugin. Every
// option in it is commented out except data_format, which is set to "influx".
func (*EventHub) SampleConfig() string {
	return `
## The default behavior is to create a new Event Hub client from environment variables.
## This requires one of the following sets of environment variables to be set:
##
## 1) Expected Environment Variables:
## - "EVENTHUB_NAMESPACE"
## - "EVENTHUB_NAME"
## - "EVENTHUB_CONNECTION_STRING"
##
## 2) Expected Environment Variables:
## - "EVENTHUB_NAMESPACE"
## - "EVENTHUB_NAME"
## - "EVENTHUB_KEY_NAME"
## - "EVENTHUB_KEY_VALUE"
## Uncommenting the option below will create an Event Hub client based solely on the connection string.
## This can either be the associated environment variable or hard coded directly.
# connection_string = ""
## Set persistence directory to a valid folder to use a file persister instead of an in-memory persister
# persistence_dir = ""
## Change the default consumer group
# consumer_group = ""
## By default the event hub receives all messages present on the broker, alternative modes can be set below.
## The timestamp should be in https://github.com/toml-lang/toml#offset-date-time format (RFC 3339).
## The 3 options below only apply if no valid persister is read from memory or file (e.g. first run).
# from_timestamp =
# latest = true
## Set a custom prefetch count for the receiver(s)
# prefetch_count = 1000
## Add an epoch to the receiver(s)
# epoch = 0
## Change to set a custom user agent, "telegraf" is used by default
# user_agent = "telegraf"
## To consume from a specific partition, set the partition_ids option.
## An empty array will result in receiving from all partitions.
# partition_ids = ["0","1"]
## Max undelivered messages
# max_undelivered_messages = 1000
## Set either option below to true to use a system property as timestamp.
## You have the choice between EnqueuedTime and IoTHubEnqueuedTime.
## It is recommended to use this setting when the data itself has no timestamp.
# enqueued_time_as_ts = true
# iot_hub_enqueued_time_as_ts = true
## Tags or fields to create from keys present in the application property bag.
## These could for example be set by message enrichments in Azure IoT Hub.
# application_property_tags = []
# application_property_fields = []
## Tag or field name to use for metadata
## By default all metadata is disabled
# sequence_number_field = "SequenceNumber"
# enqueued_time_field = "EnqueuedTime"
# offset_field = "Offset"
# partition_id_tag = "PartitionID"
# partition_key_tag = "PartitionKey"
# iot_hub_device_connection_id_tag = "IoTHubDeviceConnectionID"
# iot_hub_auth_generation_id_tag = "IoTHubAuthGenerationID"
# iot_hub_connection_auth_method_tag = "IoTHubConnectionAuthMethod"
# iot_hub_connection_module_id_tag = "IoTHubConnectionModuleID"
# iot_hub_enqueued_time_field = "IoTHubEnqueuedTime"
## Data format to consume.
## Each data format has its own unique set of configuration options, read
## more about them here:
## https://github.com/influxdata/telegraf/blob/master/docs/DATA_FORMATS_INPUT.md
data_format = "influx"
`
}
// Description returns a one-line, human-readable summary of the plugin.
func (*EventHub) Description() string {
	return "Azure Event Hubs service input plugin"
}
// SetParser stores the parser that createMetrics later uses to decode the
// payload of each received event.
func (e *EventHub) SetParser(parser parsers.Parser) {
	e.parser = parser
}
// Gather is a no-op that ignores its accumulator and always returns nil.
// Metrics are instead pushed to the accumulator handed to Start by the
// tracking goroutine.
func (*EventHub) Gather(telegraf.Accumulator) error {
	return nil
}
// Init applies defaults and creates the Event Hub client.
//
// A zero MaxUndeliveredMessages is replaced by defaultMaxUndeliveredMessages.
// The hub is configured with a file persister when PersistenceDir is set and
// with UserAgent (or internal.ProductToken() when UserAgent is empty). It is
// created from ConnectionString when that is set, otherwise from the
// environment. Any error from creating the persister or the hub is returned.
func (e *EventHub) Init() (err error) {
	if e.MaxUndeliveredMessages == 0 {
		e.MaxUndeliveredMessages = defaultMaxUndeliveredMessages
	}

	// Collect the hub options: optional offset persistence, then user agent.
	var opts []eventhub.HubOption
	if e.PersistenceDir != "" {
		filePersister, perr := persist.NewFilePersister(e.PersistenceDir)
		if perr != nil {
			return perr
		}
		opts = append(opts, eventhub.HubWithOffsetPersistence(filePersister))
	}

	agent := e.UserAgent
	if agent == "" {
		agent = internal.ProductToken()
	}
	opts = append(opts, eventhub.HubWithUserAgent(agent))

	// Without an explicit connection string the client is built from the
	// environment.
	if e.ConnectionString == "" {
		e.hub, err = eventhub.NewHubFromEnvironment(opts...)
		return err
	}

	e.hub, err = eventhub.NewHubFromConnectionString(e.ConnectionString, opts...)
	return err
}
// Start launches the tracking goroutine and registers onMessage as the handler
// for every partition to consume. When PartitionIDs is empty, the partition IDs
// reported by the hub's runtime information are used instead.
//
// The tracking goroutine is started before the receivers are created. If any
// later step fails, the partially started plugin is torn down through Stop
// before the error is returned, so that the goroutine and its context do not
// outlive a failed start.
func (e *EventHub) Start(acc telegraf.Accumulator) error {
	e.in = make(chan []telegraf.Metric)

	var ctx context.Context
	ctx, e.cancel = context.WithCancel(context.Background())

	// Start tracking
	e.wg.Add(1)
	go func() {
		defer e.wg.Done()
		e.startTracking(ctx, acc)
	}()

	// fail undoes the partial start: Stop closes the hub connection, cancels
	// ctx and waits for the tracking goroutine to exit.
	fail := func(err error) error {
		e.Stop()
		return err
	}

	// Configure receiver options
	receiveOpts, err := e.configureReceiver()
	if err != nil {
		return fail(err)
	}

	partitions := e.PartitionIDs
	if len(partitions) == 0 {
		runtimeinfo, err := e.hub.GetRuntimeInformation(ctx)
		if err != nil {
			return fail(err)
		}
		partitions = runtimeinfo.PartitionIDs
	}

	for _, partitionID := range partitions {
		if _, err := e.hub.Receive(ctx, partitionID, e.onMessage, receiveOpts...); err != nil {
			return fail(fmt.Errorf("creating receiver for partition %q: %w", partitionID, err))
		}
	}

	return nil
}
// configureReceiver translates the plugin configuration into receive options.
//
// Options are emitted only for settings that differ from their zero value.
// FromTimestamp takes precedence over Latest; at most one of the two starting
// positions is applied. The returned error is always nil.
func (e *EventHub) configureReceiver() ([]eventhub.ReceiveOption, error) {
	opts := make([]eventhub.ReceiveOption, 0, 4)

	if group := e.ConsumerGroup; group != "" {
		opts = append(opts, eventhub.ReceiveWithConsumerGroup(group))
	}

	// Starting position: explicit timestamp wins over "latest".
	switch {
	case !e.FromTimestamp.IsZero():
		opts = append(opts, eventhub.ReceiveFromTimestamp(e.FromTimestamp))
	case e.Latest:
		opts = append(opts, eventhub.ReceiveWithLatestOffset())
	}

	if count := e.PrefetchCount; count != 0 {
		opts = append(opts, eventhub.ReceiveWithPrefetchCount(count))
	}

	if epoch := e.Epoch; epoch != 0 {
		opts = append(opts, eventhub.ReceiveWithEpoch(epoch))
	}

	return opts, nil
}
// onMessage is the handler registered with the hub for every received Event.
// It converts the Event into metrics and hands them to the tracking goroutine
// through e.in, blocking until they are taken or ctx is cancelled.
//
// When this function returns without error the Event is immediately accepted
// and the offset is updated. If an error is returned (a parse failure or the
// context error) the Event is marked for redelivery.
func (e *EventHub) onMessage(ctx context.Context, event *eventhub.Event) error {
	parsed, err := e.createMetrics(event)
	if err != nil {
		return err
	}

	select {
	case e.in <- parsed:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}
// onDelivery processes one delivery notification and reports whether a slot
// has opened up in the TrackingAccumulator.
//
// A delivered group is forgotten and true is returned. An undelivered group is
// re-added to the accumulator under a new tracking ID, with a fresh backup
// copy kept in groups, and false is returned. If the group cannot be found in
// groups an error is logged and true is returned.
func (e *EventHub) onDelivery(
	acc telegraf.TrackingAccumulator,
	groups map[telegraf.TrackingID][]telegraf.Metric,
	track telegraf.DeliveryInfo,
) bool {
	id := track.ID()

	// Whatever the outcome, the old tracking ID is finished.
	pending, found := groups[id]
	delete(groups, id)

	if track.Delivered() {
		return true
	}

	if !found {
		// The metrics should always be found, this message indicates a programming error.
		e.Log.Errorf("Could not find delivery: %d", id)
		return true
	}

	// The Event was already accepted when onMessage completed, so Event Hub
	// will not redeliver it. Keep a copy taken before the metrics are handed
	// back to the accumulator so another retry remains possible.
	snapshot := deepCopyMetrics(pending)
	newID := acc.AddTrackingMetricGroup(pending)
	groups[newID] = snapshot
	return false
}
// startTracking is the body of the tracking goroutine. It moves metric groups
// from e.in into a tracking accumulator and reacts to delivery notifications
// until ctx is cancelled.
//
// sem holds one token per metric group that is awaiting delivery, which caps
// the number of such groups at MaxUndeliveredMessages. groups keeps a backup
// copy of every such group, keyed by tracking ID, so that onDelivery can re-add
// a group whose delivery failed.
func (e *EventHub) startTracking(ctx context.Context, ac telegraf.Accumulator) {
	acc := ac.WithTracking(e.MaxUndeliveredMessages)
	sem := make(semaphore, e.MaxUndeliveredMessages)
	groups := make(map[telegraf.TrackingID][]telegraf.Metric, e.MaxUndeliveredMessages)

	for {
		select {
		case <-ctx.Done():
			return
		case track := <-acc.Delivered():
			if e.onDelivery(acc, groups, track) {
				// The delivered group no longer occupies a slot.
				<-sem
			}
		case sem <- empty{}:
			// A slot has been reserved for the next incoming metric group.
			select {
			case <-ctx.Done():
				return
			case track := <-acc.Delivered():
				// No new group was taken, so the slot reserved above must
				// always be handed back. Previously it was only released when
				// the delivery succeeded, leaking one slot per failed
				// delivery until the semaphore filled up and the loop stalled.
				<-sem
				if e.onDelivery(acc, groups, track) {
					// The delivered group no longer occupies a slot either.
					<-sem
				}
			case metrics := <-e.in:
				// Copy before handing the metrics to the accumulator so the
				// backup stays usable for a later re-add.
				backup := deepCopyMetrics(metrics)
				id := acc.AddTrackingMetricGroup(metrics)
				groups[id] = backup
			}
		}
	}
}
// deepCopyMetrics returns a new slice of the same length as in whose elements
// are the results of calling Copy on each metric of in, in order.
func deepCopyMetrics(in []telegraf.Metric) []telegraf.Metric {
	copies := make([]telegraf.Metric, len(in))
	for idx := range in {
		copies[idx] = in[idx].Copy()
	}
	return copies
}
// createMetrics parses the payload of event with the configured parser and
// decorates every resulting metric with the configured metadata.
//
// Application properties listed in ApplicationPropertyFields and
// ApplicationPropertyTags are copied when present on the event. System
// properties are copied to the field or tag named by the corresponding option.
// A system property that is absent (nil) is skipped, as is all system metadata
// when the event carries no SystemProperties at all; previously a nil
// SequenceNumber, EnqueuedTime, Offset or SystemProperties caused a panic.
//
// The metric timestamp is set from EnqueuedTime when EnqueuedTimeAsTs is set
// and from IoTHubEnqueuedTime when IotHubEnqueuedTimeAsTs is set; if both
// apply, IoTHubEnqueuedTime is applied last and therefore wins.
//
// It returns the parser's error, with nil metrics, if the payload cannot be
// parsed.
func (e *EventHub) createMetrics(event *eventhub.Event) ([]telegraf.Metric, error) {
	metrics, err := e.parser.Parse(event.Data)
	if err != nil {
		return nil, err
	}

	sys := event.SystemProperties

	for i := range metrics {
		for _, field := range e.ApplicationPropertyFields {
			if val, ok := event.Get(field); ok {
				metrics[i].AddField(field, val)
			}
		}

		for _, tag := range e.ApplicationPropertyTags {
			if val, ok := event.Get(tag); ok {
				metrics[i].AddTag(tag, fmt.Sprintf("%v", val))
			}
		}

		// Without system properties there is no further metadata to add.
		if sys == nil {
			continue
		}

		if sys.SequenceNumber != nil && e.SequenceNumberField != "" {
			metrics[i].AddField(e.SequenceNumberField, *sys.SequenceNumber)
		}

		if sys.EnqueuedTime != nil {
			if e.EnqueuedTimeAsTs {
				metrics[i].SetTime(*sys.EnqueuedTime)
			} else if e.EnqueuedTimeField != "" {
				// Stored as milliseconds since the Unix epoch.
				metrics[i].AddField(e.EnqueuedTimeField, sys.EnqueuedTime.UnixNano()/int64(time.Millisecond))
			}
		}

		if sys.Offset != nil && e.OffsetField != "" {
			metrics[i].AddField(e.OffsetField, *sys.Offset)
		}

		if sys.PartitionID != nil && e.PartitionIDTag != "" {
			metrics[i].AddTag(e.PartitionIDTag, fmt.Sprintf("%d", *sys.PartitionID))
		}
		if sys.PartitionKey != nil && e.PartitionKeyTag != "" {
			metrics[i].AddTag(e.PartitionKeyTag, *sys.PartitionKey)
		}
		if sys.IoTHubDeviceConnectionID != nil && e.IoTHubDeviceConnectionIDTag != "" {
			metrics[i].AddTag(e.IoTHubDeviceConnectionIDTag, *sys.IoTHubDeviceConnectionID)
		}
		if sys.IoTHubAuthGenerationID != nil && e.IoTHubAuthGenerationIDTag != "" {
			metrics[i].AddTag(e.IoTHubAuthGenerationIDTag, *sys.IoTHubAuthGenerationID)
		}
		if sys.IoTHubConnectionAuthMethod != nil && e.IoTHubConnectionAuthMethodTag != "" {
			metrics[i].AddTag(e.IoTHubConnectionAuthMethodTag, *sys.IoTHubConnectionAuthMethod)
		}
		if sys.IoTHubConnectionModuleID != nil && e.IoTHubConnectionModuleIDTag != "" {
			metrics[i].AddTag(e.IoTHubConnectionModuleIDTag, *sys.IoTHubConnectionModuleID)
		}

		if sys.IoTHubEnqueuedTime != nil {
			if e.IotHubEnqueuedTimeAsTs {
				metrics[i].SetTime(*sys.IoTHubEnqueuedTime)
			} else if e.IoTHubEnqueuedTimeField != "" {
				// Stored as milliseconds since the Unix epoch.
				metrics[i].AddField(e.IoTHubEnqueuedTimeField, sys.IoTHubEnqueuedTime.UnixNano()/int64(time.Millisecond))
			}
		}
	}

	return metrics, nil
}
// Stop shuts the plugin down: it closes the Event Hub connection (logging, but
// otherwise ignoring, any error), cancels the context created by Start and
// then blocks until the tracking goroutine has exited.
func (e *EventHub) Stop() {
	if closeErr := e.hub.Close(context.Background()); closeErr != nil {
		e.Log.Errorf("Error closing Event Hub connection: %v", closeErr)
	}

	e.cancel()
	e.wg.Wait()
}
// init passes the plugin to inputs.Add under the name "eventhub_consumer",
// together with a factory that returns a new zero-value EventHub.
func init() {
	inputs.Add("eventhub_consumer", func() telegraf.Input {
		return &EventHub{}
	})
}