diff --git a/internal/configtypes/types.go b/internal/configtypes/types.go
index 4c3a08f95..76106304a 100644
--- a/internal/configtypes/types.go
+++ b/internal/configtypes/types.go
@@ -620,8 +620,9 @@ type KafkaConsumerConfig struct {
 	// Set to -1 to use non-buffered channel.
 	PartitionBufferSize int `mapstructure:"partition_buffer_size" json:"partition_buffer_size" envconfig:"partition_buffer_size" default:"16" yaml:"partition_buffer_size" toml:"partition_buffer_size"`
 
-	// PublicationDataMode is a configuration for the mode where message payload already contains data ready to publish into channels, instead of API command.
-	PublicationDataMode KafkaPublicationModeConfig `mapstructure:"publication_data_mode" json:"publication_data_mode" envconfig:"publication_data_mode" yaml:"publication_data_mode" toml:"publication_data_mode"`
+	// PublicationDataMode is a configuration for the mode where message payload already
+	// contains data ready to publish into channels, instead of API command.
+	PublicationDataMode KafkaPublicationDataModeConfig `mapstructure:"publication_data_mode" json:"publication_data_mode" envconfig:"publication_data_mode" yaml:"publication_data_mode" toml:"publication_data_mode"`
 }
 
 func (c KafkaConsumerConfig) Validate() error {
@@ -634,19 +635,29 @@ func (c KafkaConsumerConfig) Validate() error {
 	if c.ConsumerGroup == "" {
 		return errors.New("no Kafka consumer group provided")
 	}
-	if c.PublicationDataMode.Enabled && c.PublicationDataMode.ChannelsHeaderName == "" {
-		return errors.New("no Kafka channels_header_name provided for publication data mode")
+	if c.PublicationDataMode.Enabled && c.PublicationDataMode.ChannelsHeader == "" {
+		return errors.New("no Kafka channels_header provided for publication data mode")
 	}
 	return nil
 }
 
-type KafkaPublicationModeConfig struct {
+// KafkaPublicationDataModeConfig is a configuration for Kafka publication data mode.
+// In this mode we expect Kafka message payload to contain data ready to publish into
+// channels, instead of API command. All other fields used to build channel Publication
+// can be passed in Kafka message headers, thus it's possible to integrate existing
+// topics with Centrifugo.
+type KafkaPublicationDataModeConfig struct {
 	// Enabled enables Kafka publication data mode for the Kafka consumer.
 	Enabled bool `mapstructure:"enabled" json:"enabled" envconfig:"enabled" yaml:"enabled" toml:"enabled"`
-	// ChannelsHeaderName is a header name to extract publication channels (channels must be comma-separated).
-	ChannelsHeaderName string `mapstructure:"channels_header_name" json:"channels_header_name" envconfig:"channels_header_name" yaml:"channels_header_name" toml:"channels_header_name"`
-	// IdempotencyKeyHeaderName is a header name to extract idempotency key from Kafka message.
-	IdempotencyKeyHeaderName string `mapstructure:"idempotency_key_header_name" json:"idempotency_key_header_name" envconfig:"idempotency_key_header_name" yaml:"idempotency_key_header_name" toml:"idempotency_key_header_name"`
-	// DeltaHeaderName is a header name to extract delta flag from Kafka message.
-	DeltaHeaderName string `mapstructure:"delta_header_name" json:"delta_header_name" envconfig:"delta_header_name" yaml:"delta_header_name" toml:"delta_header_name"`
+	// ChannelsHeader is a header name to extract channels to publish data into
+	// (channels must be comma-separated). Ex. of value: "channel1,channel2".
+	ChannelsHeader string `mapstructure:"channels_header" json:"channels_header" envconfig:"channels_header" yaml:"channels_header" toml:"channels_header"`
+	// IdempotencyKeyHeader is a header name to extract Publication idempotency key from
+	// Kafka message. See https://centrifugal.dev/docs/server/server_api#publishrequest.
+	IdempotencyKeyHeader string `mapstructure:"idempotency_key_header" json:"idempotency_key_header" envconfig:"idempotency_key_header" yaml:"idempotency_key_header" toml:"idempotency_key_header"`
+	// DeltaHeader is a header name to extract Publication delta flag from Kafka message
+	// which tells Centrifugo whether to use delta compression for message or not.
+	// See https://centrifugal.dev/docs/server/delta_compression and
+	// https://centrifugal.dev/docs/server/server_api#publishrequest.
+	DeltaHeader string `mapstructure:"delta_header" json:"delta_header" envconfig:"delta_header" yaml:"delta_header" toml:"delta_header"`
 }
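Taken together, the renamed type and keys give the consumer config the shape sketched below. This is illustrative only: configtypes is internal to the Centrifugo module, so it only compiles from within it, and the broker, topic, group, and header values are made up. It also shows the Validate rule above, which rejects publication data mode without channels_header.

package main

import (
	"fmt"

	"github.com/centrifugal/centrifugo/v5/internal/configtypes"
)

func main() {
	cfg := configtypes.KafkaConsumerConfig{
		Brokers:       []string{"localhost:9092"}, // hypothetical broker
		Topics:        []string{"orders"},         // hypothetical topic
		ConsumerGroup: "centrifugo",               // hypothetical group
		PublicationDataMode: configtypes.KafkaPublicationDataModeConfig{
			Enabled:              true,
			ChannelsHeader:       "centrifugo-channels",
			IdempotencyKeyHeader: "centrifugo-idempotency-key",
			DeltaHeader:          "centrifugo-delta",
		},
	}
	// Fails with "no Kafka channels_header provided for publication data mode"
	// if Enabled is true and ChannelsHeader is left empty.
	if err := cfg.Validate(); err != nil {
		fmt.Println("invalid consumer config:", err)
	}
}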
diff --git a/internal/consuming/kafka.go b/internal/consuming/kafka.go
index 008050b43..e964b2aed 100644
--- a/internal/consuming/kafka.go
+++ b/internal/consuming/kafka.go
@@ -412,9 +412,9 @@ func getHeaderValue(record *kgo.Record, headerKey string) string {
 
 func (pc *partitionConsumer) processPublicationDataRecord(ctx context.Context, record *kgo.Record) error {
 	var delta bool
-	if pc.config.PublicationDataMode.DeltaHeaderName != "" {
+	if pc.config.PublicationDataMode.DeltaHeader != "" {
 		var err error
-		delta, err = strconv.ParseBool(getHeaderValue(record, pc.config.PublicationDataMode.DeltaHeaderName))
+		delta, err = strconv.ParseBool(getHeaderValue(record, pc.config.PublicationDataMode.DeltaHeader))
 		if err != nil {
 			pc.logger.Log(centrifuge.NewLogEntry(centrifuge.LogLevelError, "error parsing delta header value, skip message", map[string]any{"error": err.Error(), "topic": record.Topic, "partition": record.Partition}))
 			return nil
@@ -422,8 +422,8 @@ func (pc *partitionConsumer) processPublicationDataRecord(ctx context.Context, r
 	}
 	req := &apiproto.BroadcastRequest{
 		Data:           record.Value,
-		Channels:       strings.Split(getHeaderValue(record, pc.config.PublicationDataMode.ChannelsHeaderName), ","),
-		IdempotencyKey: getHeaderValue(record, pc.config.PublicationDataMode.IdempotencyKeyHeaderName),
+		Channels:       strings.Split(getHeaderValue(record, pc.config.PublicationDataMode.ChannelsHeader), ","),
+		IdempotencyKey: getHeaderValue(record, pc.config.PublicationDataMode.IdempotencyKeyHeader),
 		Delta:          delta,
 	}
 	return pc.dispatcher.Broadcast(ctx, req)
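One edge case in processPublicationDataRecord worth noting: when delta_header is configured, the header value goes through strconv.ParseBool, and a record that lacks the header yields an empty string from getHeaderValue, which fails to parse, so the record is logged and skipped rather than broadcast. A quick standalone sketch of the accepted values:

package main

import (
	"fmt"
	"strconv"
)

func main() {
	// The delta header must carry something strconv.ParseBool accepts
	// ("1", "t", "T", "TRUE", "true", "True" and the false equivalents).
	// An empty string (header configured but absent on the record) errors.
	for _, v := range []string{"true", "1", "t", "false", "0", ""} {
		delta, err := strconv.ParseBool(v)
		fmt.Printf("%q -> delta=%v err=%v\n", v, delta, err)
	}
}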
"centrifugo-channels", + IdempotencyKeyHeader: "centrifugo-idempotency-key", + DeltaHeader: "centrifugo-delta", }, } @@ -474,7 +473,7 @@ func TestKafkaConsumer_GreenScenario_PublicationDataMode(t *testing.T) { consumer, err := NewKafkaConsumer("test", uuid.NewString(), &MockLogger{}, &MockDispatcher{ onBroadcast: func(ctx context.Context, req *apiproto.BroadcastRequest) error { require.Equal(t, testChannels, req.Channels) - require.Equal(t, testPayload, req.Data) + require.Equal(t, apiproto.Raw(testPayload), req.Data) require.Equal(t, testIdempotencyKey, req.IdempotencyKey) require.Equal(t, testDelta, req.Delta) close(eventReceived) @@ -491,15 +490,15 @@ func TestKafkaConsumer_GreenScenario_PublicationDataMode(t *testing.T) { err = produceTestMessage(testKafkaTopic, testPayload, []kgo.RecordHeader{ { - Key: config.PublicationDataMode.ChannelsHeaderName, + Key: config.PublicationDataMode.ChannelsHeader, Value: []byte(strings.Join(testChannels, ",")), }, { - Key: config.PublicationDataMode.IdempotencyKeyHeaderName, + Key: config.PublicationDataMode.IdempotencyKeyHeader, Value: []byte(testIdempotencyKey), }, { - Key: config.PublicationDataMode.DeltaHeaderName, + Key: config.PublicationDataMode.DeltaHeader, Value: []byte(fmt.Sprintf("%v", testDelta)), }, })