From 2bfc30adfc795470c0141d8e0416424224d81630 Mon Sep 17 00:00:00 2001
From: Kathryn May
Date: Tue, 3 Dec 2024 10:21:47 -0500
Subject: [PATCH] Updates based on feedback + KL

---
 .../compression-level-kafka-config.md          |  1 +
 .../advanced-changefeed-configuration.md       |  2 +-
 src/current/v24.3/changefeed-sinks.md          | 73 +++++++++++++++----
 .../v24.3/create-and-configure-changefeeds.md  |  1 +
 src/current/v24.3/known-limitations.md         |  1 +
 5 files changed, 61 insertions(+), 17 deletions(-)
 create mode 100644 src/current/_includes/v24.3/known-limitations/compression-level-kafka-config.md

diff --git a/src/current/_includes/v24.3/known-limitations/compression-level-kafka-config.md b/src/current/_includes/v24.3/known-limitations/compression-level-kafka-config.md
new file mode 100644
index 00000000000..9f24121a3a2
--- /dev/null
+++ b/src/current/_includes/v24.3/known-limitations/compression-level-kafka-config.md
@@ -0,0 +1 @@
+Changefeeds created in v24.3 of CockroachDB that emit to [Kafka]({% link {{ page.version.version }}/changefeed-sinks.md %}#kafka), or changefeeds created in earlier versions with the `changefeed.new_kafka_sink.enabled` cluster setting enabled, do not support negative compression level values in the [`kafka_sink_config = {... "CompressionLevel" = ...}`]({% link {{ page.version.version }}/changefeed-sinks.md %}#compressionlevel) option field. [#136492](https://github.com/cockroachdb/cockroach/issues/136492)
\ No newline at end of file
diff --git a/src/current/v24.3/advanced-changefeed-configuration.md b/src/current/v24.3/advanced-changefeed-configuration.md
index ff2322336a2..0a64c56e0f6 100644
--- a/src/current/v24.3/advanced-changefeed-configuration.md
+++ b/src/current/v24.3/advanced-changefeed-configuration.md
@@ -180,7 +180,7 @@ If you are setting the `resolved` option when you are aiming for high throughput
 ### Batching and buffering messages
 
 - Batch messages to your sink:
-  - For a [Kafka sink]({% link {{ page.version.version }}/changefeed-sinks.md %}#kafka), refer to the [`Flush`]({% link {{ page.version.version }}/changefeed-sinks.md %}#kafka-flush) parameter for the `kafka_sink_config` option.
+  - For a [Kafka sink]({% link {{ page.version.version }}/changefeed-sinks.md %}#kafka), refer to the [`Flush`]({% link {{ page.version.version }}/changefeed-sinks.md %}#flush) parameter for the `kafka_sink_config` option.
   - For a [cloud storage sink]({% link {{ page.version.version }}/changefeed-sinks.md %}#cloud-storage-sink), use the [`file_size`]({% link {{ page.version.version }}/create-changefeed.md %}#file-size) parameter to flush a file when it exceeds the specified size.
   - For a [webhook sink]({% link {{ page.version.version }}/changefeed-sinks.md %}#webhook-sink), refer to the [`Flush`]({% link {{ page.version.version }}/changefeed-sinks.md %}#webhook-sink-configuration) parameter for the `webhook_sink_config` option.
 - Set the [`changefeed.memory.per_changefeed_limit`]({% link {{ page.version.version }}/cluster-settings.md %}) cluster setting to a higher limit to give more memory for buffering changefeed data. This setting influences how often the changefeed will flush buffered messages. This is useful during heavy traffic.
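+
+For example, you might raise this limit to give changefeeds more room to buffer messages during heavy traffic. The following is a sketch only; the `1GiB` value is an illustrative assumption, not a sizing recommendation:
+
+~~~ sql
+-- Raise the per-changefeed memory budget used to buffer messages.
+-- A larger buffer allows bigger, less frequent flushes at the cost of
+-- more memory per changefeed. Tune the value to your workload.
+SET CLUSTER SETTING changefeed.memory.per_changefeed_limit = '1GiB';
+~~~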
diff --git a/src/current/v24.3/changefeed-sinks.md b/src/current/v24.3/changefeed-sinks.md
index dfced793de0..a2e4bf81b4e 100644
--- a/src/current/v24.3/changefeed-sinks.md
+++ b/src/current/v24.3/changefeed-sinks.md
@@ -119,7 +119,23 @@ Each of the following settings have significant impact on a changefeed's behavio
 kafka_sink_config='{"Flush": {"MaxMessages": 1, "Frequency": "1s"}, "ClientID": "kafka_client_ID", "Version": "0.8.2.0", "RequiredAcks": "ONE", "Compression": "GZIP", "CompressionLevel": 3}'
 ~~~
 
-`"Flush"."MaxMessages"` and `"Flush"."Frequency"` are configurable batching parameters depending on latency and throughput needs. For example, if `"MaxMessages"` is set to 1000 and `"Frequency"` to 1 second, it will flush to Kafka either after 1 second or after 1000 messages are batched, whichever comes first. It's important to consider that if there are not many messages, then a `"1s"` frequency will add 1 second latency. However, if there is a larger influx of messages these will be flushed quicker.
+If you do not set fields in `kafka_sink_config`, or you use their default values, changefeed messages emit immediately.
+
+The configurable fields are as follows:
+
+Field | Type | Description | Default
+-------------------+---------------------+------------------+-------------------
+`"ClientID"` | [`STRING`]({% link {{ page.version.version }}/string.md %}) | Applies a Kafka client ID per changefeed. Configure [quotas](https://kafka.apache.org/documentation/#quotas) within your Kafka configuration that apply to a unique client ID. The `ClientID` field can only contain the characters `A-Za-z0-9._-`. For more details, refer to [`ClientID`](#clientid). | ""
+`"Compression"` | [`STRING`]({% link {{ page.version.version }}/string.md %}) | Sets a compression protocol that the changefeed should use when emitting events. The possible values are: `"NONE"`, `"GZIP"`, `"SNAPPY"`, `"LZ4"`, `"ZSTD"`. | `"NONE"`
+New in v24.3:`"CompressionLevel"` | [`INT`]({% link {{ page.version.version }}/int.md %}) | Sets the level of compression. This determines the trade-off between compression ratio and compression speed, i.e., how much the data size is reduced (better compression) and how quickly the compression completes (faster compression). For the compression protocol ranges, refer to [`CompressionLevel`](#compressionlevel).<br><br>**Note:** If you have the `changefeed.new_kafka_sink.enabled` cluster setting disabled, `CompressionLevel` will not affect `LZ4` compression. `SNAPPY` does not support `CompressionLevel`. | `GZIP`: `-1`<br><br>`ZSTD`: `2`<br><br>`LZ4`: `0`
+`"Flush"."Bytes"` | [`INT`]({% link {{ page.version.version }}/int.md %}) | When the total byte size of all the messages in the batch reaches this amount, the batch is flushed. | `0`
+`"Flush"."Frequency"` | [Duration string](https://pkg.go.dev/time#ParseDuration) | When this amount of time has passed since the **first** message was received in the batch without a flush, the batch is flushed. For more details, refer to [`Flush`](#flush). | `"0s"`
+`"Flush"."MaxMessages"` | [`INT`]({% link {{ page.version.version }}/int.md %}) | Sets the maximum number of messages the producer can send in a single broker request. Any messages beyond the configured limit will be blocked. Increasing this value allows all messages to be sent in a batch. For more details, refer to [`Flush`](#flush). | `1000`
+`"Flush"."Messages"` | [`INT`]({% link {{ page.version.version }}/int.md %}) | Configures the number of messages the changefeed should batch before flushing. | `0`
+`"RequiredAcks"` | [`STRING`]({% link {{ page.version.version }}/string.md %}) | Specifies what counts as a successful write to Kafka. CockroachDB [guarantees at least once delivery of messages]({% link {{ page.version.version }}/changefeed-messages.md %}#ordering-and-delivery-guarantees) — this value defines the **delivery**. The possible values are: `ONE`, `NONE`, `ALL`. For details on each value, refer to [`RequiredAcks`](#requiredacks). | `"ONE"`
+`"Version"` | [`STRING`]({% link {{ page.version.version }}/string.md %}) | Sets the appropriate Kafka cluster version, which can be used to connect to [Kafka versions < v1.0](https://docs.confluent.io/platform/current/installation/versions-interoperability.html) (`kafka_sink_config='{"Version": "0.8.2.0"}'`). | `"1.0.0.0"`
+
+#### `ClientID`
 
 Implement a Kafka resource usage limit per changefeed by setting a client ID and Kafka quota. You can set the quota for the client ID in your Kafka server's configuration:
 
@@ -128,25 +144,50 @@ Implement a Kafka resource usage limit per changefeed by setting a client ID and
 bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048' --entity-type clients --entity-name client-changefeed-1
 ~~~
 
-Refer to the [Kafka documentation](https://kafka.apache.org/documentation/#quotas) for details on setting quotas to client IDs.
-
 When you create a changefeed, include the `"ClientID"` field with the unique client ID (e.g., `kafka_client_ID_1`) you have configured in your Kafka server configuration. This will subject the changefeed to the Kafka quota applied to that client ID. We recommend tracking the [`changefeed.kafka_throttling_hist_nanos` metric]({% link {{ page.version.version }}/metrics.md %}) to monitor the time spent throttling due to changefeed messages exceeding Kafka quotas.
 
-Using the default values or not setting fields in `kafka_sink_config` will mean that changefeed messages emit immediately.
+For details on setting quotas for client IDs, refer to the [Kafka documentation](https://kafka.apache.org/documentation/#quotas).
+
+#### `CompressionLevel`
+
+{% include_cached new-in.html version="v24.3" %} The `CompressionLevel` field sets the level of compression for the protocol you configure in the `Compression` field. `CompressionLevel` determines the trade-off between compression ratio and compression speed. That is, how much the data is reduced for _better_ compression and how quickly the compression completes for _faster_ compression.
+
+The compression protocols have the following ranges and values:
+
+- `GZIP`:
+  - `0`: No compression
+  - `1` to `9`: From fastest compression to best compression
+  {% comment %}
+  These values are not available yet per KL #136492
+  - `-1`: Default compression
+  - `-2`: [Huffman-only compression](https://en.wikipedia.org/wiki/Huffman_coding)
+  - `-3`: Stateless compression
+  {% endcomment %}
+  The default compression level for `GZIP` is `-1`; however, the `CompressionLevel` field does not support manually setting negative values. For more details, refer to [Known Limitations]({% link {{ page.version.version }}/create-and-configure-changefeeds.md %}#known-limitations).
+- `ZSTD`:
+  - `1`: Fastest compression
+  - `2`: Default compression
+  - `3`: Better compression
+  - `4`: Best compression
+- `LZ4`:
+  - `0`: Fastest compression (Default)
+  - `1` to `9`: From fast compression to best compression
+
+`SNAPPY` does not support the `CompressionLevel` field.
-The configurable fields are as follows:
+{{site.data.alerts.callout_info}}
+If you have the `changefeed.new_kafka_sink.enabled` cluster setting disabled, `CompressionLevel` will not affect `LZ4` compression.
+{{site.data.alerts.end}}
-Field | Type | Description | Default
--------------------+---------------------+------------------+-------------------
-`"ClientID"` | [`STRING`]({% link {{ page.version.version }}/string.md %}) | Applies a Kafka client ID per changefeed. Configure [quotas](https://kafka.apache.org/documentation/#quotas) within your Kafka configuration that apply to a unique client ID. The `ClientID` field can only contain the characters `A-Za-z0-9._-`. | ""
-`"Flush"."MaxMessages"` | [`INT`]({% link {{ page.version.version }}/int.md %}) | Sets the maximum number of messages the producer can send in a single broker request. Any messages beyond the configured limit will be blocked. Increasing this value allows all messages to be sent in a batch. | `1000`
-`"Flush"."Messages"` | [`INT`]({% link {{ page.version.version }}/int.md %}) | Configures the number of messages the changefeed should batch before flushing. | `0`
-`"Flush"."Bytes"` | [`INT`]({% link {{ page.version.version }}/int.md %}) | When the total byte size of all the messages in the batch reaches this amount, it should be flushed. | `0`
-`"Flush"."Frequency"` | [Duration string](https://pkg.go.dev/time#ParseDuration) | When this amount of time has passed since the **first** received message in the batch without it flushing, it should be flushed. | `"0s"`
-`"Version"` | [`STRING`]({% link {{ page.version.version }}/string.md %}) | Sets the appropriate Kafka cluster version, which can be used to connect to [Kafka versions < v1.0](https://docs.confluent.io/platform/current/installation/versions-interoperability.html) (`kafka_sink_config='{"Version": "0.8.2.0"}'`). | `"1.0.0.0"`
-`"RequiredAcks"` | [`STRING`]({% link {{ page.version.version }}/string.md %}) | Specifies what a successful write to Kafka is. CockroachDB [guarantees at least once delivery of messages]({% link {{ page.version.version }}/changefeed-messages.md %}#ordering-and-delivery-guarantees) — this value defines the **delivery**. The possible values are:<br><br>`"ONE"`: a write to Kafka is successful once the leader node has committed and acknowledged the write. Note that this has the potential risk of dropped messages; if the leader node acknowledges before replicating to a quorum of other Kafka nodes, but then fails.<br><br>`"NONE"`: no Kafka brokers are required to acknowledge that they have committed the message. This will decrease latency and increase throughput, but comes at the cost of lower consistency.<br><br>`"ALL"`: a quorum must be reached (that is, most Kafka brokers have committed the message) before the leader can acknowledge. This is the highest consistency level. {% include {{ page.version.version }}/cdc/kafka-acks.md %} | `"ONE"`
-`"Compression"` | [`STRING`]({% link {{ page.version.version }}/string.md %}) | Sets a compression protocol that the changefeed should use when emitting events. The possible values are: `"NONE"`, `"GZIP"`, `"SNAPPY"`, `"LZ4"`, `"ZSTD"`. | `"NONE"`
-New in v24.3:`"CompressionLevel"` | [`INT`]({% link {{ page.version.version }}/int.md %}) | Sets the level of compression. This determines the level of compression ratio versus compression speed, i.e., how much the data size is reduced (better compression) and how quickly the compression process is completed. The compression protocols have the following ranges:<br><br>`GZIP`:`ZSTD`:`LZ4`**Note:** If you have the `changefeed.new_kafka_sink.enabled` cluster setting disabled, `CompressionLevel` will not affect `LZ4` compression. `SNAPPY` does not support `CompressionLevel`. | `GZIP`: `-1`<br><br>`ZSTD`: `2`<br><br>`LZ4`: `0`
+#### `Flush`
+
+`"Flush"."MaxMessages"` and `"Flush"."Frequency"` are configurable batching parameters that you can tune for your latency and throughput needs. For example, if `"MaxMessages"` is set to 1000 and `"Frequency"` to 1 second, the changefeed will flush to Kafka either after 1 second or after 1000 messages are batched, whichever comes first. Note that if the changefeed emits few messages, a `"1s"` frequency adds up to 1 second of latency; with a larger influx of messages, batches flush sooner.
+
+#### `RequiredAcks`
+
+The `RequiredAcks` field defines what counts as a successful write to Kafka. The possible values are:
+
+- `"ONE"`: A write to Kafka is successful once the leader node has committed and acknowledged the write. Note that this carries a risk of dropped messages if the leader node acknowledges the write before replicating it to a quorum of other Kafka nodes and then fails.
+- `"NONE"`: No Kafka brokers are required to acknowledge that they have committed the message. This will decrease latency and increase throughput, but comes at the cost of lower consistency.
+- `"ALL"`: A quorum must be reached (that is, most Kafka brokers have committed the message) before the leader can acknowledge. This is the highest consistency level. {% include {{ page.version.version }}/cdc/kafka-acks.md %}
 
 ### Kafka sink messages
 
diff --git a/src/current/v24.3/create-and-configure-changefeeds.md b/src/current/v24.3/create-and-configure-changefeeds.md
index e95837c80d9..b059a82e382 100644
--- a/src/current/v24.3/create-and-configure-changefeeds.md
+++ b/src/current/v24.3/create-and-configure-changefeeds.md
@@ -168,6 +168,7 @@ For more information, see [`EXPERIMENTAL CHANGEFEED FOR`]({% link {{ page.versio
 - {% include {{ page.version.version }}/known-limitations/alter-changefeed-cdc-queries.md %}
 - {% include {{ page.version.version }}/known-limitations/cdc-queries-column-families.md %}
 - {% include {{ page.version.version }}/known-limitations/changefeed-column-family-message.md %}
+- {% include {{ page.version.version }}/known-limitations/compression-level-kafka-config.md %}
 
 ## See also
 
diff --git a/src/current/v24.3/known-limitations.md b/src/current/v24.3/known-limitations.md
index 9a7ea807fce..792ba38c91f 100644
--- a/src/current/v24.3/known-limitations.md
+++ b/src/current/v24.3/known-limitations.md
@@ -493,6 +493,7 @@ Change data capture (CDC) provides efficient, distributed, row-level changefeeds
 {% include {{ page.version.version }}/known-limitations/cdc-queries.md %}
 - {% include {{ page.version.version }}/known-limitations/cdc-queries-column-families.md %}
 - {% include {{ page.version.version }}/known-limitations/changefeed-column-family-message.md %}
+- {% include {{ page.version.version }}/known-limitations/compression-level-kafka-config.md %}
 
 #### `ALTER CHANGEFEED` limitations