Updates based on feedback + KL
kathancox committed Dec 3, 2024
1 parent de9c4ec commit 2bfc30a
Showing 5 changed files with 61 additions and 17 deletions.
@@ -0,0 +1 @@
Changefeeds created in v24.3 of CockroachDB that emit to [Kafka]({% link {{ page.version.version }}/changefeed-sinks.md %}#kafka), or changefeeds created in earlier versions with the `changefeed.new_kafka_sink.enabled` cluster setting enabled, do not support negative compression level values in the [`kafka_sink_config = {... "CompressionLevel" = ...}`]({% link {{ page.version.version }}/changefeed-sinks.md %}#compressionlevel) option field. [#136492](https://github.com/cockroachdb/cockroach/issues/136492)
2 changes: 1 addition & 1 deletion src/current/v24.3/advanced-changefeed-configuration.md
@@ -180,7 +180,7 @@ If you are setting the `resolved` option when you are aiming for high throughput
### Batching and buffering messages

- Batch messages to your sink:
- For a [Kafka sink]({% link {{ page.version.version }}/changefeed-sinks.md %}#kafka), refer to the [`Flush`]({% link {{ page.version.version }}/changefeed-sinks.md %}#kafka-flush) parameter for the `kafka_sink_config` option.
- For a [Kafka sink]({% link {{ page.version.version }}/changefeed-sinks.md %}#kafka), refer to the [`Flush`]({% link {{ page.version.version }}/changefeed-sinks.md %}#flush) parameter for the `kafka_sink_config` option.
- For a [cloud storage sink]({% link {{ page.version.version }}/changefeed-sinks.md %}#cloud-storage-sink), use the [`file_size`]({% link {{ page.version.version }}/create-changefeed.md %}#file-size) parameter to flush a file when it exceeds the specified size.
- For a [webhook sink]({% link {{ page.version.version }}/changefeed-sinks.md %}#webhook-sink), refer to the [`Flush`]({% link {{ page.version.version }}/changefeed-sinks.md %}#webhook-sink-configuration) parameter for the `webhook_sink_config` option.
- Set the [`changefeed.memory.per_changefeed_limit`]({% link {{ page.version.version }}/cluster-settings.md %}) cluster setting to a higher limit to give more memory for buffering changefeed data. This setting influences how often the changefeed flushes buffered messages, and raising it is useful during heavy traffic. For an example, refer to the sketch after this list.
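A minimal sketch of raising the limit; the value here is illustrative, not a recommendation, so size it for your workload:

~~~ sql
-- Allow each changefeed to buffer more data before flushing.
-- '1GiB' is an illustrative value, not a recommendation.
SET CLUSTER SETTING changefeed.memory.per_changefeed_limit = '1GiB';
~~~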
73 changes: 57 additions & 16 deletions src/current/v24.3/changefeed-sinks.md
@@ -119,7 +119,23 @@ Each of the following settings have significant impact on a changefeed's behavio
kafka_sink_config='{"Flush": {"MaxMessages": 1, "Frequency": "1s"}, "ClientID": "kafka_client_ID", "Version": "0.8.2.0", "RequiredAcks": "ONE", "Compression": "GZIP", "CompressionLevel": 3}'
~~~

<a name ="kafka-flush"></a>`"Flush"."MaxMessages"` and `"Flush"."Frequency"` are configurable batching parameters depending on latency and throughput needs. For example, if `"MaxMessages"` is set to 1000 and `"Frequency"` to 1 second, it will flush to Kafka either after 1 second or after 1000 messages are batched, whichever comes first. It's important to consider that if there are not many messages, then a `"1s"` frequency will add 1 second latency. However, if there is a larger influx of messages these will be flushed quicker.
If you use the default values or do not set fields in `kafka_sink_config`, changefeed messages emit immediately.

The configurable fields are as follows:

Field | Type | Description | Default
-------------------+---------------------+------------------+-------------------
`"ClientID"` | [`STRING`]({% link {{ page.version.version }}/string.md %}) | Applies a Kafka client ID per changefeed. Configure [quotas](https://kafka.apache.org/documentation/#quotas) within your Kafka configuration that apply to a unique client ID. The `ClientID` field can only contain the characters `A-Za-z0-9._-`. For more details, refer to [`ClientID`](#clientid). | ""
<a name="kafka-compression"></a>`"Compression"` | [`STRING`]({% link {{ page.version.version }}/string.md %}) | Sets a compression protocol that the changefeed should use when emitting events. The possible values are: `"NONE"`, `"GZIP"`, `"SNAPPY"`, `"LZ4"`, `"ZSTD"`. | `"NONE"`
<span class="version-tag">New in v24.3:</span>`"CompressionLevel"` | [`INT`]({% link {{ page.version.version }}/int.md %}) | Sets the level of compression. This determines the tradeoff between the compression ratio and compression speed, i.e., how much the data size is reduced (better compression) and how quickly the compression completes (faster compression). For the ranges each compression protocol supports, refer to [`CompressionLevel`](#compressionlevel).<br><br>**Note:** If you have the `changefeed.new_kafka_sink.enabled` cluster setting disabled, `CompressionLevel` will not affect `LZ4` compression. `SNAPPY` does not support `CompressionLevel`. | `GZIP`: `-1`<br><br>`ZSTD`: `2`<br><br>`LZ4`: `0`
`"Flush"."Bytes"` | [`INT`]({% link {{ page.version.version }}/int.md %}) | When the total byte size of all the messages in the batch reaches this amount, it should be flushed. | `0`
`"Flush"."Frequency"` | [Duration string](https://pkg.go.dev/time#ParseDuration) | When this amount of time has passed since the **first** received message in the batch without it flushing, it should be flushed. For more details, refer to [`Flush`](#flush). | `"0s"`
`"Flush"."MaxMessages"` | [`INT`]({% link {{ page.version.version }}/int.md %}) | Sets the maximum number of messages the producer can send in a single broker request. Any messages beyond the configured limit will be blocked. Increasing this value allows all messages to be sent in a batch. For more details, refer to [`Flush`](#flush). | `1000`
`"Flush"."Messages"` | [`INT`]({% link {{ page.version.version }}/int.md %}) | Configures the number of messages the changefeed should batch before flushing. | `0`
`"RequiredAcks"` | [`STRING`]({% link {{ page.version.version }}/string.md %}) | Specifies what a successful write to Kafka is. CockroachDB [guarantees at least once delivery of messages]({% link {{ page.version.version }}/changefeed-messages.md %}#ordering-and-delivery-guarantees) — this value defines the **delivery**. The possible values are: `ONE`, `NONE`, `ALL`. For details on each value, refer to [`RequiredAcks`](#requiredacks). | `"ONE"`
`"Version"` | [`STRING`]({% link {{ page.version.version }}/string.md %}) | Sets the appropriate Kafka cluster version, which can be used to connect to [Kafka versions < v1.0](https://docs.confluent.io/platform/current/installation/versions-interoperability.html) (`kafka_sink_config='{"Version": "0.8.2.0"}'`). | `"1.0.0.0"`

#### `ClientID`

Implement a Kafka resource usage limit per changefeed by setting a client ID and Kafka quota. You can set the quota for the client ID in your Kafka server's configuration:

@@ -128,25 +144,50 @@ Implement a Kafka resource usage limit per changefeed by setting a client ID and
bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter --add-config 'producer_byte_rate=1024,consumer_byte_rate=2048' --entity-type clients --entity-name client-changefeed-1
~~~

Refer to the [Kafka documentation](https://kafka.apache.org/documentation/#quotas) for details on setting quotas to client IDs.

When you create a changefeed, include the `"ClientID"` field with the unique client ID (e.g., `kafka_client_ID_1`) you have configured in your Kafka server configuration. This will subject the changefeed to the Kafka quota applied to that client ID. We recommend tracking the [`changefeed.kafka_throttling_hist_nanos` metric]({% link {{ page.version.version }}/metrics.md %}) to monitor the time spent throttling due to changefeed messages exceeding Kafka quotas.

Using the default values or not setting fields in `kafka_sink_config` will mean that changefeed messages emit immediately.
For details on setting quotas for client IDs, refer to the [Kafka documentation](https://kafka.apache.org/documentation/#quotas).
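To tie a changefeed to the quota configured above, include the matching client ID in `kafka_sink_config`. A sketch (the table name and Kafka URI are placeholders; the client ID matches the `--entity-name` in the quota example):

~~~ sql
CREATE CHANGEFEED FOR TABLE movr.rides
  INTO 'kafka://localhost:9092'
  WITH kafka_sink_config = '{"ClientID": "client-changefeed-1"}';
~~~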

#### `CompressionLevel`

{% include_cached new-in.html version="v24.3" %} The `CompressionLevel` field sets the level of compression for the `Compression` protocol you have configured. `CompressionLevel` determines the tradeoff between the compression ratio and the compression speed. That is, how much the data is reduced for _better_ compression versus how quickly the compression completes for _faster_ compression. The compression protocols support the following ranges and values:

- `GZIP`:
- `0`: No compression
- `1` to `9`: From fastest compression to best compression
{% comment %}
These values are not available yet per KL #136492
- `-1`: Default compression
- `-2`: [Huffman-only compression](https://en.wikipedia.org/wiki/Huffman_coding)
- `-3`: Stateless compression
{% endcomment %}
The default compression level for `GZIP` is `-1`; however, you cannot manually set negative values in the `CompressionLevel` field. For more details, refer to [Known Limitations]({% link {{ page.version.version }}/create-and-configure-changefeeds.md %}#known-limitations).
- `ZSTD`:
- `1`: Fastest compression
- `2`: Default compression
- `3`: Better compression
- `4`: Best compression
- `LZ4`:
- `0`: Fastest compression (Default)
- `1` to `9`: From fast compression to best compression

`SNAPPY` does not support the `CompressionLevel` field.

The configurable fields are as follows:
{{site.data.alerts.callout_info}}
If you have the `changefeed.new_kafka_sink.enabled` cluster setting disabled, `CompressionLevel` will not affect `LZ4` compression.
{{site.data.alerts.end}}
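As an illustration, a changefeed that favors compression ratio over speed with `ZSTD` might be configured like this sketch (the table name and Kafka URI are placeholders):

~~~ sql
CREATE CHANGEFEED FOR TABLE movr.rides
  INTO 'kafka://localhost:9092'
  WITH kafka_sink_config = '{"Compression": "ZSTD", "CompressionLevel": 4}';
~~~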

Field | Type | Description | Default
-------------------+---------------------+------------------+-------------------
`"ClientID"` | [`STRING`]({% link {{ page.version.version }}/string.md %}) | Applies a Kafka client ID per changefeed. Configure [quotas](https://kafka.apache.org/documentation/#quotas) within your Kafka configuration that apply to a unique client ID. The `ClientID` field can only contain the characters `A-Za-z0-9._-`. | ""
`"Flush"."MaxMessages"` | [`INT`]({% link {{ page.version.version }}/int.md %}) | Sets the maximum number of messages the producer can send in a single broker request. Any messages beyond the configured limit will be blocked. Increasing this value allows all messages to be sent in a batch. | `1000`
`"Flush"."Messages"` | [`INT`]({% link {{ page.version.version }}/int.md %}) | Configures the number of messages the changefeed should batch before flushing. | `0`
`"Flush"."Bytes"` | [`INT`]({% link {{ page.version.version }}/int.md %}) | When the total byte size of all the messages in the batch reaches this amount, it should be flushed. | `0`
`"Flush"."Frequency"` | [Duration string](https://pkg.go.dev/time#ParseDuration) | When this amount of time has passed since the **first** received message in the batch without it flushing, it should be flushed. | `"0s"`
`"Version"` | [`STRING`]({% link {{ page.version.version }}/string.md %}) | Sets the appropriate Kafka cluster version, which can be used to connect to [Kafka versions < v1.0](https://docs.confluent.io/platform/current/installation/versions-interoperability.html) (`kafka_sink_config='{"Version": "0.8.2.0"}'`). | `"1.0.0.0"`
<a name="kafka-required-acks"></a>`"RequiredAcks"` | [`STRING`]({% link {{ page.version.version }}/string.md %}) | Specifies what a successful write to Kafka is. CockroachDB [guarantees at least once delivery of messages]({% link {{ page.version.version }}/changefeed-messages.md %}#ordering-and-delivery-guarantees) — this value defines the **delivery**. The possible values are: <br><br>`"ONE"`: a write to Kafka is successful once the leader node has committed and acknowledged the write. Note that this has the potential risk of dropped messages; if the leader node acknowledges before replicating to a quorum of other Kafka nodes, but then fails.<br><br>`"NONE"`: no Kafka brokers are required to acknowledge that they have committed the message. This will decrease latency and increase throughput, but comes at the cost of lower consistency.<br><br>`"ALL"`: a quorum must be reached (that is, most Kafka brokers have committed the message) before the leader can acknowledge. This is the highest consistency level. {% include {{ page.version.version }}/cdc/kafka-acks.md %} | `"ONE"`
<a name="kafka-compression"></a>`"Compression"` | [`STRING`]({% link {{ page.version.version }}/string.md %}) | Sets a compression protocol that the changefeed should use when emitting events. The possible values are: `"NONE"`, `"GZIP"`, `"SNAPPY"`, `"LZ4"`, `"ZSTD"`. | `"NONE"`
<span class="version-tag">New in v24.3:</span>`"CompressionLevel"` | [`INT`]({% link {{ page.version.version }}/int.md %}) | Sets the level of compression. This determines the level of compression ratio versus compression speed, i.e., how much the data size is reduced (better compression) and how quickly the compression process is completed. The compression protocols have the following ranges:<br>`GZIP`:<ul><li>`0` no compression</li><li>`1` to `9` best speed to best compression</li><li>`-1` default</li><li>`-2` [Huffman-only compression](https://en.wikipedia.org/wiki/Huffman_coding)</li></ul>`ZSTD`:<ul><li>`1` fastest</li><li>`2` default</li><li>`3` better compression</li><li>`4` best compression</li></ul>`LZ4`<ul><li>0 fast default</li><li>`512 * N` Level N, where N is between `1` and `9`. The higher the number, the better compression</li></ul>**Note:** If you have the `changefeed.new_kafka_sink.enabled` cluster setting disabled, `CompressionLevel` will not affect `LZ4` compression. `SNAPPY` does not support `CompressionLevel`. | `GZIP`: `-1`<br><br>`ZSTD`: `2`<br><br>`LZ4`: `0`
#### `Flush`

`"Flush"."MaxMessages"` and `"Flush"."Frequency"` are configurable batching parameters depending on latency and throughput needs. For example, if `"MaxMessages"` is set to 1000 and `"Frequency"` to 1 second, it will flush to Kafka either after 1 second or after 1000 messages are batched, whichever comes first. It's important to consider that if there are not many messages, then a `"1s"` frequency will add 1 second latency. However, if there is a larger influx of messages these will be flushed quicker.

#### `RequiredAcks`

The `RequiredAcks` field defines what a successful write to Kafka is. The possible values are:

- `"ONE"`: A write to Kafka is successful once the leader node has committed and acknowledged the write. Note that this has the potential risk of dropped messages; if the leader node acknowledges before replicating to a quorum of other Kafka nodes, but then fails.
- `"NONE"`: No Kafka brokers are required to acknowledge that they have committed the message. This will decrease latency and increase throughput, but comes at the cost of lower consistency.
- `"ALL"`: A quorum must be reached (that is, most Kafka brokers have committed the message) before the leader can acknowledge. This is the highest consistency level. {% include {{ page.version.version }}/cdc/kafka-acks.md %}

### Kafka sink messages

1 change: 1 addition & 0 deletions src/current/v24.3/create-and-configure-changefeeds.md
@@ -168,6 +168,7 @@ For more information, see [`EXPERIMENTAL CHANGEFEED FOR`]({% link {{ page.versio
- {% include {{ page.version.version }}/known-limitations/alter-changefeed-cdc-queries.md %}
- {% include {{ page.version.version }}/known-limitations/cdc-queries-column-families.md %}
- {% include {{ page.version.version }}/known-limitations/changefeed-column-family-message.md %}
- {% include {{ page.version.version }}/known-limitations/compression-level-kafka-config.md %}

## See also

1 change: 1 addition & 0 deletions src/current/v24.3/known-limitations.md
@@ -493,6 +493,7 @@ Change data capture (CDC) provides efficient, distributed, row-level changefeeds
{% include {{ page.version.version }}/known-limitations/cdc-queries.md %}
- {% include {{ page.version.version }}/known-limitations/cdc-queries-column-families.md %}
- {% include {{ page.version.version }}/known-limitations/changefeed-column-family-message.md %}
- {% include {{ page.version.version }}/known-limitations/compression-level-kafka-config.md %}

#### `ALTER CHANGEFEED` limitations
