Add CompressionLevel and make v2 Kafka sink default
kathancox committed Nov 21, 2024
1 parent b23f9f2 commit de9c4ec
23 changes: 8 additions & 15 deletions src/current/v24.3/changefeed-sinks.md
@@ -40,17 +40,6 @@ To set a different sink URI to an existing changefeed, use the [`sink` option]({

## Kafka

### Kafka sink connection

Example of a Kafka sink URI using `SCRAM-SHA-256` authentication:
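
A representative form, with placeholder host, credential, and certificate values (the query parameter names are assumed from the Kafka sink's URI parameters):

{% include_cached copy-clipboard.html %}
~~~
'kafka://broker.example.com:9092?tls_enabled=true&ca_cert={base64-encoded certificate}&sasl_enabled=true&sasl_user={sasl user}&sasl_password={url-encoded password}&sasl_mechanism=SCRAM-SHA-256'
~~~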
@@ -117,14 +106,17 @@ Kafka has the following topic limitations:

### Kafka sink configuration

You can configure flushing, acknowledgments, compression, and concurrency behavior of changefeeds running to a Kafka sink with the following:

- Set the [`changefeed.sink_io_workers` cluster setting]({% link {{ page.version.version }}/cluster-settings.md %}#setting-changefeed-sink-io-workers) to configure the number of concurrent workers used by changefeeds in the cluster when sending requests to a Kafka sink. Changing `changefeed.sink_io_workers` does not affect running changefeeds; to apply the new value, [pause the changefeed]({% link {{ page.version.version }}/pause-job.md %}), set `changefeed.sink_io_workers`, and then [resume the changefeed]({% link {{ page.version.version }}/resume-job.md %}), as shown in the example after this list. Note that this cluster setting also affects changefeeds running to [Google Cloud Pub/Sub](#google-cloud-pub-sub) sinks and [webhook sinks](#webhook-sink).
- Set the `kafka_sink_config` option to configure a changefeed's message delivery, Kafka server version, and batching parameters.
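
For example, a minimal sketch of applying a new `changefeed.sink_io_workers` value to an existing changefeed (the job ID and worker count are placeholders):

{% include_cached copy-clipboard.html %}
~~~ sql
PAUSE JOB {changefeed_job_id};
SET CLUSTER SETTING changefeed.sink_io_workers = 32;
RESUME JOB {changefeed_job_id};
~~~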

{{site.data.alerts.callout_danger}}
Each of the following settings has a significant impact on a changefeed's behavior, such as latency. For example, it is possible to configure the batching parameters to be very high, which would negatively impact changefeed latency: it would take a long time for messages to reach the sink. Also, the Kafka server may reject large batches unless it is separately configured to accept a high [`message.max.bytes`](https://kafka.apache.org/documentation/#brokerconfigs_message.max.bytes).
{{site.data.alerts.end}}

~~~
kafka_sink_config='{"Flush": {"MaxMessages": 1, "Frequency": "1s"}, "ClientID": "kafka_client_ID", "Version": "0.8.2.0", "RequiredAcks": "ONE", "Compression": "GZIP" }'
kafka_sink_config='{"Flush": {"MaxMessages": 1, "Frequency": "1s"}, "ClientID": "kafka_client_ID", "Version": "0.8.2.0", "RequiredAcks": "ONE", "Compression": "GZIP", "CompressionLevel": 3}'
~~~

<a name ="kafka-flush"></a>`"Flush"."MaxMessages"` and `"Flush"."Frequency"` are configurable batching parameters depending on latency and throughput needs. For example, if `"MaxMessages"` is set to 1000 and `"Frequency"` to 1 second, it will flush to Kafka either after 1 second or after 1000 messages are batched, whichever comes first. It's important to consider that if there are not many messages, then a `"1s"` frequency will add 1 second latency. However, if there is a larger influx of messages these will be flushed quicker.
@@ -154,6 +146,7 @@ Field | Type | Description | Default
`"Version"` | [`STRING`]({% link {{ page.version.version }}/string.md %}) | Sets the appropriate Kafka cluster version, which can be used to connect to [Kafka versions < v1.0](https://docs.confluent.io/platform/current/installation/versions-interoperability.html) (`kafka_sink_config='{"Version": "0.8.2.0"}'`). | `"1.0.0.0"`
<a name="kafka-required-acks"></a>`"RequiredAcks"` | [`STRING`]({% link {{ page.version.version }}/string.md %}) | Specifies what a successful write to Kafka is. CockroachDB [guarantees at least once delivery of messages]({% link {{ page.version.version }}/changefeed-messages.md %}#ordering-and-delivery-guarantees) — this value defines the **delivery**. The possible values are: <br><br>`"ONE"`: a write to Kafka is successful once the leader node has committed and acknowledged the write. Note that this has the potential risk of dropped messages; if the leader node acknowledges before replicating to a quorum of other Kafka nodes, but then fails.<br><br>`"NONE"`: no Kafka brokers are required to acknowledge that they have committed the message. This will decrease latency and increase throughput, but comes at the cost of lower consistency.<br><br>`"ALL"`: a quorum must be reached (that is, most Kafka brokers have committed the message) before the leader can acknowledge. This is the highest consistency level. {% include {{ page.version.version }}/cdc/kafka-acks.md %} | `"ONE"`
<a name="kafka-compression"></a>`"Compression"` | [`STRING`]({% link {{ page.version.version }}/string.md %}) | Sets a compression protocol that the changefeed should use when emitting events. The possible values are: `"NONE"`, `"GZIP"`, `"SNAPPY"`, `"LZ4"`, `"ZSTD"`. | `"NONE"`
<span class="version-tag">New in v24.3:</span>`"CompressionLevel"` | [`INT`]({% link {{ page.version.version }}/int.md %}) | Sets the level of compression. This determines the level of compression ratio versus compression speed, i.e., how much the data size is reduced (better compression) and how quickly the compression process is completed. The compression protocols have the following ranges:<br>`GZIP`:<ul><li>`0` no compression</li><li>`1` to `9` best speed to best compression</li><li>`-1` default</li><li>`-2` [Huffman-only compression](https://en.wikipedia.org/wiki/Huffman_coding)</li></ul>`ZSTD`:<ul><li>`1` fastest</li><li>`2` default</li><li>`3` better compression</li><li>`4` best compression</li></ul>`LZ4`<ul><li>0 fast default</li><li>`512 * N` Level N, where N is between `1` and `9`. The higher the number, the better compression</li></ul>**Note:** If you have the `changefeed.new_kafka_sink.enabled` cluster setting disabled, `CompressionLevel` will not affect `LZ4` compression. `SNAPPY` does not support `CompressionLevel`. | `GZIP`: `-1`<br><br>`ZSTD`: `2`<br><br>`LZ4`: `0`

### Kafka sink messages

@@ -359,7 +352,7 @@ For a list of compatible parameters and options, refer to [Parameters]({% link {

You can configure flushing, retry, and concurrency behavior of changefeeds running to a Pub/Sub sink with the following:

- Set the [`changefeed.sink_io_workers` cluster setting]({% link {{ page.version.version }}/cluster-settings.md %}#setting-changefeed-sink-io-workers) to configure the number of concurrent workers used by changefeeds in the cluster when sending requests to a Pub/Sub sink. Changing `changefeed.sink_io_workers` does not affect running changefeeds; to apply the new value, [pause the changefeed]({% link {{ page.version.version }}/pause-job.md %}), set `changefeed.sink_io_workers`, and then [resume the changefeed]({% link {{ page.version.version }}/resume-job.md %}). Note that this cluster setting also affects changefeeds running to [webhook sinks](#webhook-sink) and [Kafka](#kafka).
- Set the `pubsub_sink_config` option to configure the changefeed flushing and retry behavior to your Pub/Sub sink. For details on the `pubsub_sink_config` option's configurable fields, refer to the following table and examples.
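
A sketch of a flush-and-retry configuration (field names assumed to match the table below; values illustrative):

{% include_cached copy-clipboard.html %}
~~~
pubsub_sink_config='{"Flush": {"Messages": 100, "Frequency": "5s"}, "Retry": {"Max": 4, "Backoff": "10s"}}'
~~~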

Field | Type | Description | Default
@@ -562,7 +555,7 @@ The following are considerations when using the webhook sink:

You can configure flushing, retry, and concurrency behavior of changefeeds running to a webhook sink with the following:

- Set the [`changefeed.sink_io_workers` cluster setting]({% link {{ page.version.version }}/cluster-settings.md %}#setting-changefeed-sink-io-workers) to configure the number of concurrent workers used by changefeeds in the cluster when sending requests to a webhook sink. Changing `changefeed.sink_io_workers` does not affect running changefeeds; to apply the new value, [pause the changefeed]({% link {{ page.version.version }}/pause-job.md %}), set `changefeed.sink_io_workers`, and then [resume the changefeed]({% link {{ page.version.version }}/resume-job.md %}). Note that this cluster setting also affects changefeeds running to [Google Cloud Pub/Sub sinks](#google-cloud-pub-sub) and [Kafka](#kafka).
- Set the `webhook_sink_config` option to configure the changefeed flushing and retry behavior to your webhook sink. For details on the `webhook_sink_config` option's configurable fields, refer to the following table and examples.
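
Similarly, a sketch of a webhook flush-and-retry configuration (field names assumed to match the table below; values illustrative):

{% include_cached copy-clipboard.html %}
~~~
webhook_sink_config='{"Flush": {"Messages": 10, "Frequency": "5s"}, "Retry": {"Max": 4, "Backoff": "10s"}}'
~~~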

Field | Type | Description | Default
