Name | Description |
---|---|
kafkaListeners | Comma-separated list of URIs that we will listen on and the listener names. e.g. PLAINTEXT://localhost:9092,SSL://localhost:9093. Each URI's scheme represents a listener name if kafkaProtocolMap is configured.Otherwise, the scheme must be a valid protocol in [PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL]. If the hostname is not set, it will be bound to the default interface. |
kafkaProtocolMap | Comma-separated map of listener name and protocol. e.g. PRIVATE:PLAINTEXT,PRIVATE_SSL:SSL,PUBLIC:PLAINTEXT,PUBLIC_SSL:SSL. |
listeners | Deprecated. kafkaListeners is used. |
kafkaAdvertisedListeners | Listeners to publish to ZooKeeper for clients to use. The format is the same as kafkaListeners . |
NOTE
Among all configurations, only
kafkaListeners
orlisteners
(deprecated) is required.
To support multiple listeners, you need to specify different listener names in kafkaListeners
and kafkaAdvertisedListeners
. Then, map the listener name to the proper protocol in kafkaProtocolMap
.
For example, assuming you need to listen on port 9092 and 19092 with the PLAINTEXT
protocol, the associated names are kafka_internal
and kafka_external
. Then you need to add the following configurations:
kafkaListeners=kafka_internal://0.0.0.0:9092,kafka_external://0.0.0.0:19092
kafkaProtocolMap=kafka_internal:PLAINTEXT,kafka_external:PLAINTEXT
kafkaAdvertisedListeners=kafka_internal://localhost:9092,kafka_external://localhost:19092
In the above example,
kafkaListener
is split into multiple tokens by a comma (,
), the token is in a format of<listener-name>://<host>:<port>
.kafkaProtocolMap
is split into multiple tokens by a comma (,
), the token is in a format of<listener-name>:<protocol>
.kafkaAdvertisedListeners
is split into multiple tokens by a comma(,
), the token is in a format of<listener-name>:<scheme>://<host>:<port>
.
KoP shares the same configuration files with the Pulsar broker, e.g. conf/broker.conf
or conf/standalone.conf
. The log configurations can be configured in conf/log4j2.yaml
file like below:
Logger:
- name: io.streamnative.pulsar.handlers.kop
level: warn
additivity: false
AppenderRef:
- ref: Console
Pulsar is a multi-tenant system that requires users to specify the tenant and namespace. While, most Kafka users just specify the short topic name. So KoP provides following configurations to specify the default namespace.
Name | Description | Default |
---|---|---|
kafkaTenant | The default tenant of Kafka topics | public |
kafkaNamespace | The default namespace of Kafka topics | default |
kafkaMetadataTenant | The tenant used for storing Kafka metadata topics | public |
kafkaEnableMultiTenantMetadata | Use the SASL username as kafkaMetadataTenant |
true |
kafkaMetadataNamespace | The namespace used for storing Kafka metadata topics | __kafka |
kopAllowedNamespaces | The allowed namespace to list topics with a comma separator. For example, "public/default,public/kafka". If it's not set or empty, the allowed namespaces will be "/". |
When you enable kafkaEnableMultiTenantMetadata
, KoP uses separate tenants for handling the system metadata.
This enables you to fully isolate your tenants in your Pulsar cluster.
This is not available in pure Kafka, because usually you share the system metadata among all the users.
This section lists configurations that may affect the performance.
Name | Description | Range | Default |
---|---|---|---|
entryFormat | The format of an entry. If it is set tokafka , there is no unnecessary encoding and decoding work, which helps improve the performance. However, in this situation, a topic cannot be used by mixed Pulsar clients and Kafka clients. If it is set to mixed_kafka , some non-official Kafka clients implementation are supported. - Note: Compared with performance for mixed_kafka , performance is improved by 2 to 3 times when the parameter is set to kafka . |
kafka, mixed_kafka, pulsar |
pulsar |
maxReadEntriesNum | The maximum number of entries that are read from the cursor once per time. Increasing this value can make FETCH request read more bytes each time. NOTE: Currently, KoP does not check the maximum byte limit. Therefore, if the value is too great, the response size may be over the network limit. |
5 |
Generally, if you don't have Pulsar consumers that consume messages from Kafka producers, kafka
format is perferred because it has much higher performance when Kafka consumers interact with Kafka producers.
However, some non-official Kafka clients might not work for kafka
format. For example, old Golang Sarama client didn't assign relative offsets in compressed message sets before Shopify/sarama #1002. In this case, the broker has to assign relative offsets and then do recompression. Since this behavior leads to some performance loss, KoP adds the mixed_kafka
format to perform the conversion. The mixed_kafka
format should be chosen when you have such an old Kafka client. Like kafka
format, in this case, Pulsar consumers still cannot consume messages from Kafka producers.
PIP 96 introduced a message payload processor for Pulsar consumer. KoP provides a processor implementation so that even if you configured entryFormat=kafka
for better performance among Kafka clients, it could be also possible for Pulsar consumer to consume messages from Kafka producer.
You just need to configure the processor in your consumer application via messagePayloadProcessor
. See the following code example:
PulsarClient client = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
Consumer<byte[]> consumer = client.newConsumer()
.topic("my-topic")
.subscriptionName("my-sub")
.messagePayloadProcessor(new KafkaPayloadProcessor()) // this extra line is needed
.subscribe();
To import the KafkaPayloadProcessor
, you should add the additional dependency.
<dependency>
<groupId>io.streamnative.pulsar.handlers</groupId>
<artifactId>kafka-payload-processor</artifactId>
<version>${pulsar.version}</version>
</dependency>
The pulsar.version
should be same with the version of your pulsar-client
dependency.
Name | Description | Default |
---|---|---|
maxQueuedRequests | Limit the queue size for request, like queued.max.requests in Kafka server. |
500 |
requestTimeoutMs | Limit the timeout in milliseconds for request, like request.timeout.ms in Kafka client.If a request was not processed in the timeout, KoP would return an error response to client. |
30000 |
connectionMaxIdleMs | The idle connection timeout in milliseconds. If the idle connection timeout (such as connections.max.idle.ms used in the Kafka server) is reached, the server handler will close this idle connection.Note: If it is set to -1 , it indicates that the idle connection timeout is disabled. |
600000 |
failedAuthenticationDelayMs | Connection close delay on failed authentication: this is the time (in milliseconds) by which connection close will be delayed on authentication failure, like connection.failed.authentication.delay.ms in Kafka server. |
300 |
brokerLookupTimeoutMs | The timeout for broker lookups (in milliseconds). | 30000 |
NOTE
These limits are based on each connection.
Name | Description | Default |
---|---|---|
kopPrometheusStatsLatencyRolloverSeconds | Kop metrics exposed to prometheus rollover latency in seconds. | 60 |
This section lists configurations about the group coordinator and the __consumer_offsets
topic that is used to store committed offsets.
Name | Description | Default |
---|---|---|
groupMinSessionTimeoutMs | The minimum allowed session timeout for registered consumers. Shorter timeouts result in quicker failure detection while require more frequent consumer heart beating, which can overwhelm broker resources. |
6000 |
groupMaxSessionTimeoutMs | The maximum allowed session timeout for registered consumers. Longer timeouts give consumers more time to process messages between heartbeats while require longer time to detect failures. |
300000 |
groupInitialRebalanceDelayMs | The time the group coordinator waits for more consumers to join a new group before performing the first rebalance. A longer delay potentially reduces rebalances, but increases the time until processing begins. |
3000 |
offsetsTopicCompressionCodec | Compression codec for the offsets topic. Compression may be used to achieve "atomic" commits. |
|
offsetMetadataMaxSize | The maximum size in bytes for a metadata entry associated with an offset commit. | 4096 |
offsetsRetentionMinutes | Offsets older than this retention period are discarded. | 4320 |
offsetsMessageTTL | The offsets message TTL in seconds. | 259200 |
offsetsRetentionCheckIntervalMs | The frequency at which to check for stale offsets. | 600000 |
offsetsTopicNumPartitions | The number of partitions for the offsets topic. | 50 |
This section lists configurations about the transaction.
Name | Description | Default |
---|---|---|
kafkaTransactionCoordinatorEnabled | Whether to enable transaction coordinator. | false |
kafkaBrokerId | The broker ID that is used to create the producer ID. | 1 |
kafkaTxnLogTopicNumPartitions | the number of partitions for the transaction log topic. | 50 |
kafkaTxnAbortTimedOutTransactionCleanupIntervalMs | The interval in milliseconds at which to rollback transactions that have timed out. | 10000 |
kafkaTransactionalIdExpirationEnable | Whether to enable transactional ID expiration. | true |
kafkaTransactionalIdExpirationMs | The time (in ms) that the transaction coordinator waits without receiving any transaction status updates for the current transaction before expiring its transactional ID. | 604800 |
kafkaTransactionsRemoveExpiredTransactionalIdCleanupIntervalMs | The interval (in ms) at which to remove expired transactions. | 3600 |
This section lists configurations about the authentication.
Name | Description | Range | Default |
---|---|---|---|
saslAllowedMechanisms | A set of supported SASL mechanisms exposed by the broker. | PLAIN, OAUTHBEARER |
|
kopOauth2AuthenticateCallbackHandler | The fully qualified name of a SASL server callback handler class that implements the AuthenticateCallbackHandler interface, which is used for OAuth2 authentication. If it is not set, the class will be Kafka's default server callback handler for OAUTHBEARER mechanism: OAuthBearerUnsecuredValidatorCallbackHandler. |
Name | Description | Default |
---|---|---|
kopSslProtocol | Kafka SSL configuration map with: SSL_PROTOCOL_CONFIG = ssl.protocol | TLS |
kopSslProvider | Kafka SSL configuration map with: SSL_PROVIDER_CONFIG = ssl.provider | |
kopSslCipherSuites | Kafka SSL configuration map with: SSL_CIPHER_SUITES_CONFIG = ssl.cipher.suites | |
kopSslEnabledProtocols | Kafka SSL configuration map with: SSL_ENABLED_PROTOCOLS_CONFIG = ssl.enabled.protocols | TLSv1.2, TLSv1.1, TLSv1 |
kopSslKeystoreType | Kafka SSL configuration map with: SSL_KEYSTORE_TYPE_CONFIG = ssl.keystore.type | JKS |
kopSslKeystoreLocation | Kafka SSL configuration map with: SSL_KEYSTORE_LOCATION_CONFIG = ssl.keystore.location | |
kopSslKeystorePassword | Kafka SSL configuration map with: SSL_TRUSTSTORE_PASSWORD_CONFIG = ssl.truststore.password | N/A |
kopSslTruststoreType | Kafka SSL configuration map with: SSL_KEYSTORE_TYPE_CONFIG = ssl.keystore.type | JKS |
kopSslTruststoreLocation | Kafka SSL configuration map with: SSL_TRUSTSTORE_LOCATION_CONFIG = ssl.truststore.location | |
kopSslTruststorePassword | Kafka SSL configuration map with: SSL_TRUSTSTORE_PASSWORD_CONFIG = ssl.truststore.password | |
kopSslKeymanagerAlgorithm | Kafka SSL configuration map with: SSL_KEYMANAGER_ALGORITHM_CONFIG = ssl.keymanager.algorithm | SunX509 |
kopSslTrustmanagerAlgorithm | Kafka SSL configuration map with: SSL_TRUSTMANAGER_ALGORITHM_CONFIG = ssl.trustmanager.algorithm | SunX509 |
kopSslSecureRandomImplementation | Kafka SSL configuration map with: SSL_SECURE_RANDOM_IMPLEMENTATION_CONFIG = ssl.secure.random.implementation |