Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: AVRO with Schema Registry #136

Open
wants to merge 7 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
8 changes: 7 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,13 @@ Prometheus-kafka-adapter listens for metrics coming from Prometheus and sends th
- `KAFKA_TOPIC`: defines kafka topic to be used, defaults to `metrics`. Could use go template, labels are passed (as a map) to the template: e.g: `metrics.{{ index . "__name__" }}` to use per-metric topic. Two template functions are available: replace (`{{ index . "__name__" | replace "message" "msg" }}`) and substring (`{{ index . "__name__" | substring 0 5 }}`)
- `KAFKA_COMPRESSION`: defines the compression type to be used, defaults to `none`.
- `KAFKA_BATCH_NUM_MESSAGES`: defines the number of messages to batch write, defaults to `10000`.
- `SERIALIZATION_FORMAT`: defines the serialization format, can be `json`, `avro-json`, defaults to `json`.
- `KAFKA_BATCH_SIZE`: Maximum size (in bytes) of all messages batched in one MessageSet, including protocol framing overhead, defaults to `1000000`.
- `KAFKA_LINGER_MS`: Delay in milliseconds to wait for messages in the producer queue to accumulate before constructing message batches, defaults to `5`.
- `SERIALIZATION_FORMAT`: defines the serialization format, can be `json`, `avro-json`, `avro-schema-registry`, defaults to `json`.
- `SCHEMA_REGISTRY_URL`: defines the schema registry url to be used, only used if `SERIALIZATION_FORMAT=avro-schema-registry`.
- `SCHEMA_REGISTRY_USERNAME`: defines the schema registry username to be used, only used if `SERIALIZATION_FORMAT=avro-schema-registry`.
- `SCHEMA_REGISTRY_PASSWORD`: defines the schema registry password to be used, only used if `SERIALIZATION_FORMAT=avro-schema-registry`.
- `SCHEMA_REGISTRY_AUTO_REGISTRY_SCHEMAS`: defines the schema registry auto registry schema, only used if `SERIALIZATION_FORMAT=avro-schema-registry`, defaults to `false`.
- `PORT`: defines http port to listen, defaults to `8080`, used directly by [gin](https://github.com/gin-gonic/gin).
- `BASIC_AUTH_USERNAME`: basic auth username to be used for receive endpoint, defaults is no basic auth.
- `BASIC_AUTH_PASSWORD`: basic auth password to be used for receive endpoint, defaults is no basic auth.
Expand Down
75 changes: 55 additions & 20 deletions config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package main
import (
"fmt"
"os"
"strconv"
"strings"
"text/template"

Expand All @@ -28,25 +29,31 @@ import (
)

var (
kafkaBrokerList = "kafka:9092"
kafkaTopic = "metrics"
topicTemplate *template.Template
match = make(map[string]*dto.MetricFamily, 0)
basicauth = false
basicauthUsername = ""
basicauthPassword = ""
kafkaCompression = "none"
kafkaBatchNumMessages = "10000"
kafkaSslClientCertFile = ""
kafkaSslClientKeyFile = ""
kafkaSslClientKeyPass = ""
kafkaSslCACertFile = ""
kafkaSecurityProtocol = ""
kafkaSaslMechanism = ""
kafkaSaslUsername = ""
kafkaSaslPassword = ""
serializer Serializer
kafkaAcks = "all"
kafkaBrokerList = "kafka:9092"
kafkaTopic = "metrics"
topicTemplate *template.Template
match = make(map[string]*dto.MetricFamily, 0)
basicauth = false
basicauthUsername = ""
basicauthPassword = ""
kafkaCompression = "none"
kafkaBatchNumMessages = "10000"
kafkaBatchSize = "1000000"
kafkaLingerMs = "5"
kafkaSslClientCertFile = ""
kafkaSslClientKeyFile = ""
kafkaSslClientKeyPass = ""
kafkaSslCACertFile = ""
kafkaSecurityProtocol = ""
kafkaSaslMechanism = ""
kafkaSaslUsername = ""
kafkaSaslPassword = ""
serializer Serializer
kafkaAcks = "all"
schemaRegistryUrl = ""
schemaRegistryUsername = ""
schemaRegistryPassword = ""
schemaRegistryAutoRegisterSchemas = false
)

func init() {
Expand Down Expand Up @@ -82,6 +89,14 @@ func init() {
kafkaBatchNumMessages = value
}

if value := os.Getenv("KAFKA_BATCH_SIZE"); value != "" {
kafkaBatchSize = value
}

if value := os.Getenv("KAFKA_LINGER_MS"); value != "" {
kafkaLingerMs = value
}

if value := os.Getenv("KAFKA_SSL_CLIENT_CERT_FILE"); value != "" {
kafkaSslClientCertFile = value
}
Expand All @@ -99,7 +114,7 @@ func init() {
}

if value := os.Getenv("KAFKA_SECURITY_PROTOCOL"); value != "" {
kafkaSecurityProtocol = strings.ToLower(value)
kafkaSecurityProtocol = value
}

if value := os.Getenv("KAFKA_SASL_MECHANISM"); value != "" {
Expand All @@ -125,6 +140,24 @@ func init() {
match = matchList
}

if value := os.Getenv("SCHEMA_REGISTRY_URL"); value != "" {
schemaRegistryUrl = value
}
if value := os.Getenv("SCHEMA_REGISTRY_USERNAME"); value != "" {
schemaRegistryUsername = value
}
if value := os.Getenv("SCHEMA_REGISTRY_PASSWORD"); value != "" {
schemaRegistryPassword = value
}
if value := os.Getenv("SCHEMA_REGISTRY_AUTO_REGISTRY_SCHEMAS"); value != "" {
v, err := strconv.ParseBool(value)
if err != nil {
logrus.WithError(err).Fatalln("couldn't parse SCHEMA_REGISTRY_AUTO_REGISTRY_SCHEMAS to bool, using false")
v = false
}
schemaRegistryAutoRegisterSchemas = v
}

var err error
serializer, err = parseSerializationFormat(os.Getenv("SERIALIZATION_FORMAT"))
if err != nil {
Expand Down Expand Up @@ -175,6 +208,8 @@ func parseSerializationFormat(value string) (Serializer, error) {
return NewJSONSerializer()
case "avro-json":
return NewAvroJSONSerializer("schemas/metric.avsc")
case "avro-schema-registry":
return NewAvroSchemaRegistrySerializer(schemaRegistryUrl, schemaRegistryUsername, schemaRegistryPassword)
default:
logrus.WithField("serialization-format-value", value).Warningln("invalid serialization format, using json")
return NewJSONSerializer()
Expand Down
29 changes: 18 additions & 11 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,18 @@ module github.com/Telefonica/prometheus-kafka-adapter
go 1.22.3

require (
github.com/confluentinc/confluent-kafka-go v1.9.2
github.com/confluentinc/confluent-kafka-go/v2 v2.6.1
github.com/gin-gonic/contrib v0.0.0-20240508051311-c1c6bf0061b0
github.com/gin-gonic/gin v1.10.0
github.com/gogo/protobuf v1.3.2
github.com/golang/snappy v0.0.4
github.com/linkedin/goavro v2.1.0+incompatible
github.com/prometheus/client_golang v1.19.1
github.com/prometheus/client_golang v1.20.5
github.com/prometheus/client_model v0.6.1
github.com/prometheus/common v0.53.0
github.com/prometheus/prometheus v0.52.1
github.com/prometheus/common v0.61.0
github.com/prometheus/prometheus v0.300.1
github.com/sirupsen/logrus v1.9.3
github.com/stretchr/testify v1.9.0
github.com/stretchr/testify v1.10.0
gopkg.in/yaml.v2 v2.4.0
)

Expand All @@ -32,23 +32,30 @@ require (
github.com/go-playground/universal-translator v0.18.1 // indirect
github.com/go-playground/validator/v10 v10.20.0 // indirect
github.com/goccy/go-json v0.10.2 // indirect
github.com/grafana/regexp v0.0.0-20240518133315-a468a5bfb3bc // indirect
github.com/hamba/avro/v2 v2.24.0 // indirect
github.com/json-iterator/go v1.1.12 // indirect
github.com/klauspost/compress v1.17.10 // indirect
github.com/klauspost/cpuid/v2 v2.2.7 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/leodido/go-urn v1.4.0 // indirect
github.com/mattn/go-isatty v0.0.20 // indirect
github.com/mitchellh/mapstructure v1.5.0 // indirect
github.com/moby/sys/userns v0.1.0 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/munnerz/goautoneg v0.0.0-20191010083416-a7dc8b61c822 // indirect
github.com/pelletier/go-toml/v2 v2.2.2 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/prometheus/procfs v0.12.0 // indirect
github.com/prometheus/procfs v0.15.1 // indirect
github.com/twitchyliquid64/golang-asm v0.15.1 // indirect
github.com/ugorji/go/codec v1.2.12 // indirect
golang.org/x/arch v0.8.0 // indirect
golang.org/x/crypto v0.23.0 // indirect
golang.org/x/net v0.25.0 // indirect
golang.org/x/sys v0.20.0 // indirect
golang.org/x/text v0.15.0 // indirect
google.golang.org/protobuf v1.34.1 // indirect
golang.org/x/crypto v0.30.0 // indirect
golang.org/x/net v0.32.0 // indirect
golang.org/x/sys v0.28.0 // indirect
golang.org/x/text v0.21.0 // indirect
google.golang.org/protobuf v1.35.2 // indirect
gopkg.in/linkedin/goavro.v1 v1.0.5 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
Loading