Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

kprom: Multiple clients #820

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions pkg/kgo/broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -477,6 +477,7 @@ func (cxn *brokerCxn) hookWriteE2E(key int16, bytesWritten int, writeWait, timeT
WriteWait: writeWait,
TimeToWrite: timeToWrite,
WriteErr: writeErr,
ClientID: cxn.cl.clientIDString(),
})
}
})
Expand Down Expand Up @@ -1232,6 +1233,7 @@ func (cxn *brokerCxn) readResponse(
ReadWait: readWait,
TimeToRead: timeToRead,
ReadErr: readErr,
ClientID: cxn.cl.clientIDString(),
})
}
})
Expand Down
17 changes: 17 additions & 0 deletions pkg/kgo/hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,9 @@ type BrokerE2E struct {
WriteErr error
// ReadErr is any error encountered during reading.
ReadErr error

// ClientID is pointer to ID of the client that made the request
ClientID string
}

// DurationE2E returns the e2e time from the start of when a request is written
Expand Down Expand Up @@ -239,6 +242,9 @@ type ProduceBatchMetrics struct {
// 0 is no compression, 1 is gzip, 2 is snappy, 3 is lz4, and 4 is
// zstd.
CompressionType uint8

// ClientID is pointer to ID of the client that made the request
ClientID string
}

// HookProduceBatchWritten is called whenever a batch is known to be
Expand Down Expand Up @@ -288,6 +294,9 @@ type FetchBatchMetrics struct {
// 0 is no compression, 1 is gzip, 2 is snappy, 3 is lz4, and 4 is
// zstd.
CompressionType uint8

// ClientID is pointer to ID of the client that made the request
ClientID string
}

// HookFetchBatchRead is called whenever a batch if read within the client.
Expand Down Expand Up @@ -418,3 +427,11 @@ func implementsAnyHook(h Hook) bool {
}
return false
}

func (cl *Client) clientIDString() string {
resolved := "kgo"
if cl.cfg.id != nil {
resolved = *cl.cfg.id
}
return resolved
}
1 change: 1 addition & 0 deletions pkg/kgo/sink.go
Original file line number Diff line number Diff line change
Expand Up @@ -1738,6 +1738,7 @@ func (p produceMetrics) hook(cfg *cfg, br *broker) {
for _, h := range hooks {
for topic, partitions := range p {
for partition, metrics := range partitions {
metrics.ClientID = br.cl.clientIDString()
h.OnProduceBatchWritten(br.meta, topic, partition, metrics)
}
}
Expand Down
1 change: 1 addition & 0 deletions pkg/kgo/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -1362,6 +1362,7 @@ func (o *cursorOffsetNext) processRespPartition(br *broker, rp *kmsg.FetchRespon
in = in[length:]

var m FetchBatchMetrics
m.ClientID = br.cl.clientIDString()

switch t := r.(type) {
case *kmsg.MessageV0:
Expand Down
10 changes: 8 additions & 2 deletions plugin/kprom/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,9 +96,12 @@ func HandlerOpts(opts promhttp.HandlerOpts) Opt {
return opt{func(c *cfg) { c.handlerOpts = opts }}
}

// WithClientLabel adds a "cliend_id" label to all metrics.
// WithClientLabel adds a "client_id" label to all metrics.
func WithClientLabel() Opt {
return opt{func(c *cfg) { c.withClientLabel = true }}
return opt{func(c *cfg) {
c.withClientLabel = true
c.fetchProduceOpts.labels = append(c.fetchProduceOpts.labels, "client_id")
}}
}

// Subsystem sets the subsystem for the kprom metrics, overriding the default
Expand Down Expand Up @@ -182,6 +185,7 @@ type Detail uint8
const (
ByNode Detail = iota // Include label "node_id" for fetch and produce metrics.
ByTopic // Include label "topic" for fetch and produce metrics.
ByClient // Include label "client_id" for fetch and produce metrics
Batches // Report number of fetched and produced batches.
Records // Report the number of fetched and produced records.
CompressedBytes // Report the number of fetched and produced compressed bytes.
Expand Down Expand Up @@ -211,6 +215,8 @@ func FetchAndProduceDetail(details ...Detail) Opt {
labelsDeduped[ByTopic] = "topic"
case ByNode:
labelsDeduped[ByNode] = "node_id"
case ByClient:
labelsDeduped[ByClient] = "client_id"
case Batches:
c.fetchProduceOpts.batches = true
case Records:
Expand Down
Loading