diff --git a/.semaphore/semaphore.yml b/.semaphore/semaphore.yml
index 3850c7387..8b30b64c0 100644
--- a/.semaphore/semaphore.yml
+++ b/.semaphore/semaphore.yml
@@ -6,7 +6,7 @@ agent:
 global_job_config:
   env_vars:
     - name: LIBRDKAFKA_VERSION
-      value: v2.6.0-RC2
+      value: v2.6.0
   prologue:
     commands:
       - checkout
diff --git a/examples/docker_aws_lambda_example/go.mod b/examples/docker_aws_lambda_example/go.mod
index ceb484cb9..2e4812e62 100644
--- a/examples/docker_aws_lambda_example/go.mod
+++ b/examples/docker_aws_lambda_example/go.mod
@@ -2,7 +2,7 @@ module docker_example

 require (
 	github.com/aws/aws-lambda-go v1.27.0
-	github.com/confluentinc/confluent-kafka-go/v2 v2.6.0-RC2
+	github.com/confluentinc/confluent-kafka-go/v2 v2.6.0
 )

 go 1.21
diff --git a/examples/go.mod b/examples/go.mod
index b33aaa35e..243c52367 100644
--- a/examples/go.mod
+++ b/examples/go.mod
@@ -9,7 +9,7 @@ replace github.com/confluentinc/confluent-kafka-go/v2 => ../

 require (
 	github.com/actgardner/gogen-avro/v10 v10.2.1
 	github.com/alecthomas/kingpin v2.2.6+incompatible
-	github.com/confluentinc/confluent-kafka-go/v2 v2.6.0-RC2
+	github.com/confluentinc/confluent-kafka-go/v2 v2.6.0
 	github.com/gdamore/tcell v1.4.0
 	google.golang.org/protobuf v1.33.0
 )
diff --git a/kafka/api.html b/kafka/api.html
index 2fc915574..40e2b57d7 100644
--- a/kafka/api.html
+++ b/kafka/api.html
@@ -567,6 +567,11 @@
 00version.go
 adminapi.go
 adminoptions.go
 build_glibc_linux_amd64.go
 config.go
 consumer.go
 context.go
 error.go
 error_gen.go
 event.go
 generated_errors.go
 handle.go
 header.go
 kafka.go
 log.go
 message.go
 metadata.go
 misc.go
 mockcluster.go
 offset.go
 producer.go
 time.go
@@ -1905,7 +1965,7 @@
     LibrdkafkaLinkInfo explains how librdkafka was linked to the Go client

-const LibrdkafkaLinkInfo = "static glibc_linux_amd64 from librdkafka-static-bundle-v2.6.0-RC2.tgz"
+const LibrdkafkaLinkInfo = "static glibc_linux_amd64 from librdkafka-static-bundle-v2.6.0.tgz"
OffsetBeginning represents the earliest offset (logical)
const OffsetBeginning = Offset(C.RD_KAFKA_OFFSET_BEGINNING)
@@ -1924,7 +1984,7 @@ const PartitionAny = int32(C.RD_KAFKA_PARTITION_UA)
type ACLBindingFilter = ACLBinding
type ACLBindingFilters []ACLBindingFilter
type ACLBindings []ACLBinding
func (a ACLBindings) Len() int
func (a ACLBindings) Less(i, j int) bool
func (a ACLBindings) Swap(i, j int)
     Returns a map from user name to user SCRAM credentials description. Each
     description can have an individual error.

+func (a *AdminClient) ElectLeaders(ctx context.Context, electLeaderRequest ElectLeadersRequest, options ...ElectLeadersAdminOption) (result ElectLeadersResult, err error)
+    ElectLeaders performs Preferred or Unclean elections for the specified
+    topic partitions, or for all of them.
+
+    Parameters:
+
+    Returns ElectLeadersResult, which contains a slice of TopicPartitions
+    with the partitions for which the leader election was performed. If
+    partitions is passed as nil, the broker performs leader elections for all
+    partitions, but the results contain only partitions for which there was
+    an election or which resulted in an error. Individual TopicPartitions
+    inside the ElectLeadersResult should be checked for errors. Additionally,
+    an error that is not nil for client-level errors is returned.
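ElectLeaders is the one genuinely new AdminClient method in this release. A minimal usage sketch follows; NewElectLeadersRequest, ElectionTypePreferred and the TopicPartitions result field are assumed from the v2.6.0 admin API and do not appear in this excerpt:

import (
	"context"
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

// electPreferred triggers a preferred leader election for the given
// partitions (nil means all partitions).
func electPreferred(ctx context.Context, admin *kafka.AdminClient, parts []kafka.TopicPartition) error {
	// NewElectLeadersRequest and ElectionTypePreferred are assumed helpers,
	// not shown in this excerpt.
	req := kafka.NewElectLeadersRequest(kafka.ElectionTypePreferred, parts)
	result, err := admin.ElectLeaders(ctx, req)
	if err != nil {
		return err // client-level error
	}
	for _, tp := range result.TopicPartitions {
		// Each partition carries its own error, e.g. if it already had
		// its preferred leader.
		fmt.Printf("%s [%d]: %v\n", *tp.Topic, tp.Partition, tp.Error)
	}
	return nil
}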
 func (a *AdminClient) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*Metadata, error)
     GetMetadata queries broker for cluster and topic metadata. If topic is
     non-nil only information about that topic is returned, else if allTopics
     is false only information about locally used topics is returned, else
     information about all topics is returned. GetMetadata is equivalent to
     listTopics, describeTopics and describeCluster in the Java API.
 func (a *AdminClient) IncrementalAlterConfigs(ctx context.Context, resources []ConfigResource, options ...AlterConfigsAdminOption) (result []ConfigResourceResult, err error)
     IncrementalAlterConfigs alters/updates cluster resource configuration.

     Updates are not transactional, so they may succeed for some resources
     while failing for others. The configs for a particular resource are
     updated atomically, executing the corresponding incremental operations
     on the provided configurations.

     Requires broker version >=2.3.0.

     IncrementalAlterConfigs will only change configurations for provided
     resources with the new configuration given.

     Multiple resources and resource types may be set, but at most one
     resource of type ResourceBroker is allowed per call, since these
     resource requests must be sent to the broker specified in the resource.

 func (a *AdminClient) IsClosed() bool
     IsClosed returns a boolean representing whether the client is closed.
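A sketch of an incremental update that changes one topic property and leaves the rest untouched, assuming the ctx and admin values from the previous sketch (the topic name and value are placeholders):

// setRetention bumps retention.ms on a topic without resetting other configs.
func setRetention(ctx context.Context, admin *kafka.AdminClient, topic string) error {
	res := kafka.ConfigResource{
		Type: kafka.ResourceTopic,
		Name: topic,
		Config: []kafka.ConfigEntry{{
			Name:                 "retention.ms",
			Value:                "86400000",
			IncrementalOperation: kafka.AlterConfigOpTypeSet,
		}},
	}
	results, err := admin.IncrementalAlterConfigs(ctx, []kafka.ConfigResource{res})
	if err != nil {
		return err
	}
	for _, r := range results {
		if r.Error.Code() != kafka.ErrNoError {
			return r.Error // per-resource error
		}
	}
	return nil
}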
 func (a *AdminClient) ListConsumerGroupOffsets(
 	ctx context.Context, groupsPartitions []ConsumerGroupTopicPartitions,
 	options ...ListConsumerGroupOffsetsAdminOption) (lcgor ListConsumerGroupOffsetsResult, err error)
     ListConsumerGroupOffsets fetches the offsets for topic partition(s) for
     consumer group(s).

     Parameters:

     Returns a ListConsumerGroupOffsetsResult, containing a slice of
     ConsumerGroupTopicPartitions corresponding to the input slice, plus an
     error that is not `nil` for client-level errors. Individual
     TopicPartitions inside each of the ConsumerGroupTopicPartitions should
     also be checked for errors.

 func (a *AdminClient) ListConsumerGroups(
 	ctx context.Context,
 	options ...ListConsumerGroupsAdminOption) (result ListConsumerGroupsResult, err error)
     ListConsumerGroups lists the consumer groups available in the cluster.

     Parameters:

     Returns a ListConsumerGroupsResult, which contains a slice corresponding
     to each group in the cluster and a slice of errors encountered while
     listing. Additionally, an error that is not nil for client-level errors
     is returned. Both the returned error and the errors slice should be
     checked.

 func (a *AdminClient) ListOffsets(
 	ctx context.Context, topicPartitionOffsets map[TopicPartition]OffsetSpec,
 	options ...ListOffsetsAdminOption) (result ListOffsetsResult, err error)
     ListOffsets describes offsets for the specified TopicPartitions based on
     an OffsetSpec.

     Parameters:

     Returns a ListOffsetsResult. Each TopicPartition's ListOffset can have
     an individual error.
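A sketch of querying the current end offset of one partition; the LatestOffsetSpec constant and the ResultInfos field are assumed from the v2.6.0 API and are not shown in this excerpt:

// latestOffset asks the brokers for the current end offset of one partition.
func latestOffset(ctx context.Context, admin *kafka.AdminClient, topic string) error {
	tp := kafka.TopicPartition{Topic: &topic, Partition: 0}
	res, err := admin.ListOffsets(ctx,
		map[kafka.TopicPartition]kafka.OffsetSpec{tp: kafka.LatestOffsetSpec}, // assumed constant
		kafka.SetAdminIsolationLevel(kafka.IsolationLevelReadCommitted))
	if err != nil {
		return err
	}
	for tp, info := range res.ResultInfos { // assumed result field
		fmt.Printf("%s [%d]: offset %v, err %v\n", *tp.Topic, tp.Partition, info.Offset, info.Error)
	}
	return nil
}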
 func (a *AdminClient) SetOAuthBearerToken(oauthBearerToken OAuthBearerToken) error
     SetOAuthBearerToken sets the data to be transmitted to a broker during
     SASL/OAUTHBEARER authentication. It will return nil on success,
     otherwise an error if:
     1) the token data is invalid (meaning an expiration time in the past or
     either a token value or an extension key or value that does not meet the
     regular expression requirements as per
     https://tools.ietf.org/html/rfc7628#section-3.1 );
     2) SASL/OAUTHBEARER is not supported by the underlying librdkafka build;
     3) SASL/OAUTHBEARER is supported but is not configured as the client's
     authentication mechanism.

 func (a *AdminClient) SetOAuthBearerTokenFailure(errstr string) error
     SetOAuthBearerTokenFailure sets the error message describing why token
     retrieval/setting failed; it also schedules a new token refresh event
     for 10 seconds later so the attempt may be retried. It will return nil
     on success, otherwise an error if:
     1) SASL/OAUTHBEARER is not supported by the underlying librdkafka build;
     2) SASL/OAUTHBEARER is supported but is not configured as the client's
     authentication mechanism.

 func (a *AdminClient) SetSaslCredentials(username, password string) error
     SetSaslCredentials sets the SASL credentials used for this admin client.
     The new credentials will overwrite the old ones (which were set when
     creating the admin client or by a previous call to SetSaslCredentials).
     The new credentials will be used the next time the admin client needs to
     authenticate to a broker. This method will not disconnect existing
     broker connections that were established with the old credentials. This
     method applies only to the SASL PLAIN and SCRAM mechanisms.

 func (a *AdminClient) String() string
     String returns a human readable name for an AdminClient instance
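A sketch of wiring the two OAUTHBEARER calls into a token-refresh handler; the fetchToken helper is hypothetical:

// refreshToken installs a freshly fetched bearer token, or reports failure so
// librdkafka schedules a retry in ~10 seconds. fetchToken is a hypothetical
// helper returning (string, time.Time, error).
func refreshToken(admin *kafka.AdminClient) {
	value, expiry, err := fetchToken()
	if err != nil {
		admin.SetOAuthBearerTokenFailure(err.Error())
		return
	}
	err = admin.SetOAuthBearerToken(kafka.OAuthBearerToken{
		TokenValue: value,
		Expiration: expiry, // must be in the future
		Principal:  "admin-client",
	})
	if err != nil {
		admin.SetOAuthBearerTokenFailure(err.Error())
	}
}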
     AdminOption is a generic type not to be used directly.

     See CreateTopicsAdminOption et.al.

 type AdminOption interface {
 	// contains filtered or unexported methods
 }

     AdminOptionIncludeAuthorizedOperations decides if the broker should
     return authorized operations.

     Default: false

     Valid for DescribeConsumerGroups, DescribeTopics, DescribeCluster.

 type AdminOptionIncludeAuthorizedOperations struct {
 	// contains filtered or unexported fields
 }

 func SetAdminOptionIncludeAuthorizedOperations(val bool) (ao AdminOptionIncludeAuthorizedOperations)
     SetAdminOptionIncludeAuthorizedOperations decides if the broker should
     return authorized operations.

     Default: false

     Valid for DescribeConsumerGroups, DescribeTopics, DescribeCluster.
     AdminOptionIsolationLevel sets the overall request IsolationLevel.

     Default: `ReadUncommitted`.

     Valid for ListOffsets.

 type AdminOptionIsolationLevel struct {
 	// contains filtered or unexported fields
 }

 func SetAdminIsolationLevel(isolationLevel IsolationLevel) (ao AdminOptionIsolationLevel)
     SetAdminIsolationLevel sets the overall IsolationLevel for a request.

     Default: `ReadUncommitted`.

     Valid for ListOffsets.
     AdminOptionMatchConsumerGroupStates decides groups in which state(s)
     should be listed.

     Default: nil (lists groups in all states).

     Valid for ListConsumerGroups.

 type AdminOptionMatchConsumerGroupStates struct {
 	// contains filtered or unexported fields
 }

 func SetAdminMatchConsumerGroupStates(val []ConsumerGroupState) (ao AdminOptionMatchConsumerGroupStates)
-    SetAdminMatchConsumerGroupStates decides groups in which state(s) should
-    be listed.
+    SetAdminMatchConsumerGroupStates sets the state(s) that must be listed.

     Default: nil (lists groups in all states).

     Valid for ListConsumerGroups.

+    AdminOptionMatchConsumerGroupTypes decides the type(s) that must be
+    listed.
+
+    Default: nil (lists groups of all types).
+
+    Valid for ListConsumerGroups.
+
+type AdminOptionMatchConsumerGroupTypes struct {
+	// contains filtered or unexported fields
+}
+func SetAdminMatchConsumerGroupTypes(val []ConsumerGroupType) (ao AdminOptionMatchConsumerGroupTypes)
+    SetAdminMatchConsumerGroupTypes sets the type(s) that must be listed.
+
+    Default: nil (lists groups of all types).
+
+    Valid for ListConsumerGroups.
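The consumer-group-type filter above is new in this diff. A sketch of using it; the ConsumerGroupTypeClassic constant and the Valid result field are assumed and do not appear in this excerpt:

// listClassicGroups lists only groups of the classic type.
func listClassicGroups(ctx context.Context, admin *kafka.AdminClient) error {
	res, err := admin.ListConsumerGroups(ctx,
		kafka.SetAdminMatchConsumerGroupTypes(
			[]kafka.ConsumerGroupType{kafka.ConsumerGroupTypeClassic})) // assumed constant
	if err != nil {
		return err
	}
	for _, g := range res.Valid { // assumed result field
		fmt.Printf("group %s (simple: %v)\n", g.GroupID, g.IsSimpleConsumerGroup)
	}
	return nil
}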
     AdminOptionOperationTimeout sets the broker's operation timeout, such as
     the timeout for CreateTopics to complete the creation of topics on the
     controller before returning a result to the application.

     CreateTopics, DeleteTopics, CreatePartitions: a value 0 will return
     immediately after triggering topic creation, while > 0 will wait this
     long for topic creation to propagate in cluster.

     Default: 0 (return immediately).

     Valid for CreateTopics, DeleteTopics, CreatePartitions.

 type AdminOptionOperationTimeout struct {
 	// contains filtered or unexported fields
 }
 func SetAdminOperationTimeout(t time.Duration) (ao AdminOptionOperationTimeout)
     SetAdminOperationTimeout sets the broker's operation timeout, such as
     the timeout for CreateTopics to complete the creation of topics on the
     controller before returning a result to the application.

     CreateTopics, DeleteTopics, CreatePartitions: a value 0 will return
     immediately after triggering topic creation, while > 0 will wait this
     long for topic creation to propagate in cluster.

     Default: 0 (return immediately).

     Valid for CreateTopics, DeleteTopics, CreatePartitions.
     AdminOptionRequestTimeout sets the overall request timeout, including
     broker lookup, request transmission, operation time on broker, and
     response.

     Default: `socket.timeout.ms`.

     Valid for all Admin API methods.

 type AdminOptionRequestTimeout struct {
 	// contains filtered or unexported fields
 }

 func SetAdminRequestTimeout(t time.Duration) (ao AdminOptionRequestTimeout)
     SetAdminRequestTimeout sets the overall request timeout, including
     broker lookup, request transmission, operation time on broker, and
     response.

     Default: `socket.timeout.ms`.

     Valid for all Admin API methods.
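A sketch of how these option constructors combine on a call such as CreateTopics, assuming the imports from the first sketch plus "time" (topic spec values are placeholders):

// createTopic creates one topic, waiting up to 10s for cluster propagation.
func createTopic(ctx context.Context, admin *kafka.AdminClient) ([]kafka.TopicResult, error) {
	return admin.CreateTopics(ctx,
		[]kafka.TopicSpecification{{
			Topic:             "my-topic",
			NumPartitions:     6,
			ReplicationFactor: 3,
		}},
		kafka.SetAdminRequestTimeout(30*time.Second),   // whole request
		kafka.SetAdminOperationTimeout(10*time.Second)) // broker-side wait
}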
     AdminOptionRequireStableOffsets decides if the broker should return
     stable offsets (transaction-committed).

     Default: false

     Valid for ListConsumerGroupOffsets.

 type AdminOptionRequireStableOffsets struct {
 	// contains filtered or unexported fields
 }

 func SetAdminRequireStableOffsets(val bool) (ao AdminOptionRequireStableOffsets)
     SetAdminRequireStableOffsets decides if the broker should return stable
     offsets (transaction-committed).

     Default: false

     Valid for ListConsumerGroupOffsets.
     AdminOptionValidateOnly tells the broker to only validate the request,
     without performing the requested operation (create topics, etc).

     Default: false.

     Valid for CreateTopics, CreatePartitions, AlterConfigs

 type AdminOptionValidateOnly struct {
 	// contains filtered or unexported fields
 }

 func SetAdminValidateOnly(validateOnly bool) (ao AdminOptionValidateOnly)
     SetAdminValidateOnly tells the broker to only validate the request,
     without performing the requested operation (create topics, etc).

     Default: false.

     Valid for CreateTopics, DeleteTopics, CreatePartitions, AlterConfigs
     AlterConfigOpType specifies the operation to perform on the ConfigEntry
     for IncrementalAlterConfigs.

 type AlterConfigOpType int

 const (
 	// AlterConfigOpTypeSet sets/overwrites the configuration
 	// setting.
 	AlterConfigOpTypeSet AlterConfigOpType = C.RD_KAFKA_ALTER_CONFIG_OP_TYPE_SET
 	// AlterConfigOpTypeDelete sets the configuration setting
 	// to default or NULL.
 	AlterConfigOpTypeDelete AlterConfigOpType = C.RD_KAFKA_ALTER_CONFIG_OP_TYPE_DELETE
 	// AlterConfigOpTypeAppend appends the value to existing
 	// configuration settings.
 	AlterConfigOpTypeAppend AlterConfigOpType = C.RD_KAFKA_ALTER_CONFIG_OP_TYPE_APPEND
 	// AlterConfigOpTypeSubtract subtracts the value from
 	// existing configuration settings.
 	AlterConfigOpTypeSubtract AlterConfigOpType = C.RD_KAFKA_ALTER_CONFIG_OP_TYPE_SUBTRACT
 )

 func (o AlterConfigOpType) String() string
     String returns the human-readable representation of an AlterConfigOpType
     AlterConfigsAdminOption - see setters.

     See SetAdminRequestTimeout, SetAdminValidateOnly, SetAdminIncremental.

 type AlterConfigsAdminOption interface {
 	// contains filtered or unexported methods
 }

     AlterConsumerGroupOffsetsAdminOption - see setter.

     See SetAdminRequestTimeout.

 type AlterConsumerGroupOffsetsAdminOption interface {
 	// contains filtered or unexported methods
 }

     AlterConsumerGroupOffsetsResult represents the result of an
     AlterConsumerGroupOffsets operation.

 type AlterConsumerGroupOffsetsResult struct {
 	// A slice of ConsumerGroupTopicPartitions, each element represents a group's
 	// TopicPartitions and Offsets.
 	ConsumerGroupsTopicPartitions []ConsumerGroupTopicPartitions
 }
     AlterOperation specifies the operation to perform on the ConfigEntry.
     Currently only AlterOperationSet.

 type AlterOperation int

 func (o AlterOperation) String() string
     String returns the human-readable representation of an AlterOperation

     AlterUserScramCredentialsAdminOption - see setter.

     See SetAdminRequestTimeout.

 type AlterUserScramCredentialsAdminOption interface {
 	// contains filtered or unexported methods
 }
     AlterUserScramCredentialsResult represents the result of an
     AlterUserScramCredentials call.

 type AlterUserScramCredentialsResult struct {
 	// Errors - Map from user name
 	// to an Error, with ErrNoError code on success.
 	Errors map[string]Error
 }

     AssignedPartitions consumer group rebalance event: assigned partition
     set

 type AssignedPartitions struct {
 	Partitions []TopicPartition
 }

 func (e AssignedPartitions) String() string

     BrokerMetadata contains per-broker metadata

 type BrokerMetadata struct {
 	ID   int32
 	Host string
 	Port int
 }
     ConfigEntry holds parameters for altering a resource's configuration.

 type ConfigEntry struct {
 	// Name of configuration entry, e.g., topic configuration property name.
 	Name string
 	// Value of configuration entry.
 	Value string
 	// Deprecated: Operation to perform on the entry.
 	Operation AlterOperation
 	// Operation to perform on the entry incrementally.
 	IncrementalOperation AlterConfigOpType
 }

 func StringMapToConfigEntries(stringMap map[string]string, operation AlterOperation) []ConfigEntry
     StringMapToConfigEntries creates a new map of ConfigEntry objects from
     the provided string map. The AlterOperation is set on each created
     entry.

 func StringMapToIncrementalConfigEntries(stringMap map[string]string,
 	operationMap map[string]AlterConfigOpType) []ConfigEntry
     StringMapToIncrementalConfigEntries creates a new map of ConfigEntry
     objects from the provided string map and an operation map. The
     AlterConfigOpType is set on each created entry.

 func (c ConfigEntry) String() string
     String returns a human-readable representation of a ConfigEntry.

     ConfigEntryResult contains the result of a single configuration entry
     from a DescribeConfigs request.

 type ConfigEntryResult struct {
 	// Name of configuration entry, e.g., topic configuration property name.
 	Name string
 	// Value of configuration entry.
 	Value string
 	// Source indicates the configuration source.
 	Source ConfigSource
 	// IsReadOnly indicates whether the configuration entry can be altered.
 	IsReadOnly bool
 	// IsDefault indicates whether the value is at its default.
 	IsDefault bool
 	// IsSensitive indicates whether the configuration entry contains sensitive information, in which case the value will be unset.
 	IsSensitive bool
 	// IsSynonym indicates whether the configuration entry is a synonym for another configuration property.
 	IsSynonym bool
 	// Synonyms contains a map of configuration entries that are synonyms to this configuration entry.
 	Synonyms map[string]ConfigEntryResult
 }

 func (c ConfigEntryResult) String() string
     String returns a human-readable representation of a ConfigEntryResult.
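A sketch tying the two StringMapTo* helpers above to IncrementalAlterConfigs, reusing the admin and ctx values assumed earlier (property names are placeholders):

// compactTopic switches cleanup.policy to compact and reverts retention.ms to
// its default, in one atomic per-resource update.
func compactTopic(ctx context.Context, admin *kafka.AdminClient, topic string) error {
	entries := kafka.StringMapToIncrementalConfigEntries(
		map[string]string{"cleanup.policy": "compact", "retention.ms": ""},
		map[string]kafka.AlterConfigOpType{
			"cleanup.policy": kafka.AlterConfigOpTypeSet,
			"retention.ms":   kafka.AlterConfigOpTypeDelete, // back to default
		})
	_, err := admin.IncrementalAlterConfigs(ctx, []kafka.ConfigResource{{
		Type: kafka.ResourceTopic, Name: topic, Config: entries,
	}})
	return err
}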
     ConfigMap is a map containing standard librdkafka configuration
     properties as documented in:
     https://github.com/confluentinc/librdkafka/tree/master/CONFIGURATION.md

     The special property "default.topic.config" (optional) is a ConfigMap
     containing default topic configuration properties.

     The use of "default.topic.config" is deprecated; topic configuration
     properties shall be specified in the standard ConfigMap. For backwards
     compatibility, "default.topic.config" (if supplied) takes precedence.

 type ConfigMap map[string]ConfigValue

 func (m ConfigMap) Get(key string, defval ConfigValue) (ConfigValue, error)
     Get finds the given key in the ConfigMap and returns its value. If the
     key is not found `defval` is returned. If the key is found but the type
     does not match that of `defval` (unless nil) an ErrInvalidArg error is
     returned.

 func (m ConfigMap) Set(kv string) error
     Set implements flag.Set (command line argument parser) as a convenience
     for `-X key=value` config.

 func (m ConfigMap) SetKey(key string, value ConfigValue) error
     SetKey sets configuration property key to value.

     For user convenience a key prefixed with {topic}. will be set on the
     "default.topic.config" sub-map; this use is deprecated.
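A short sketch of the equivalent ways to populate and read a ConfigMap (values are placeholders):

func buildConfig() kafka.ConfigMap {
	conf := kafka.ConfigMap{"bootstrap.servers": "localhost:9092"}
	_ = conf.SetKey("group.id", "my-group")    // typed set
	_ = conf.Set("auto.offset.reset=earliest") // "-X key=value" style
	v, _ := conf.Get("group.id", "")           // "my-group"
	fmt.Println(v)
	return conf
}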
     ConfigResource holds parameters for altering an Apache Kafka
     configuration resource

 type ConfigResource struct {
 	// Type of resource to set.
 	Type ResourceType
 	// Name of resource to set.
 	Name string
 	// Config entries to set.
 	// Configuration updates are atomic, any configuration property not provided
 	// here will be reverted (by the broker) to its default value.
 	// Use DescribeConfigs to retrieve the list of current configuration entry values.
 	Config []ConfigEntry
 }

 func (c ConfigResource) String() string
     String returns a human-readable representation of a ConfigResource

     ConfigResourceResult provides the result for a resource from an
     AlterConfigs or DescribeConfigs request.

 type ConfigResourceResult struct {
 	// Type of returned result resource.
 	Type ResourceType
 	// Name of returned result resource.
 	Name string
 	// Error, if any, of returned result resource.
 	Error Error
 	// Config entries, if any, of returned result resource.
 	Config map[string]ConfigEntryResult
 }

 func (c ConfigResourceResult) String() string
     String returns a human-readable representation of a
     ConfigResourceResult.

     ConfigSource represents an Apache Kafka config source

 type ConfigSource int

 const (
 	// ConfigSourceUnknown is the default value
 	ConfigSourceUnknown ConfigSource = C.RD_KAFKA_CONFIG_SOURCE_UNKNOWN_CONFIG
 	// ConfigSourceDynamicTopic is dynamic topic config that is configured for a specific topic
 	ConfigSourceDynamicTopic ConfigSource = C.RD_KAFKA_CONFIG_SOURCE_DYNAMIC_TOPIC_CONFIG
 	// ConfigSourceDynamicBroker is dynamic broker config that is configured for a specific broker
 	ConfigSourceDynamicBroker ConfigSource = C.RD_KAFKA_CONFIG_SOURCE_DYNAMIC_BROKER_CONFIG
 	// ConfigSourceDynamicDefaultBroker is dynamic broker config that is configured as default for all brokers in the cluster
 	ConfigSourceDynamicDefaultBroker ConfigSource = C.RD_KAFKA_CONFIG_SOURCE_DYNAMIC_DEFAULT_BROKER_CONFIG
 	// ConfigSourceStaticBroker is static broker config provided as broker properties at startup (e.g. from server.properties file)
 	ConfigSourceStaticBroker ConfigSource = C.RD_KAFKA_CONFIG_SOURCE_STATIC_BROKER_CONFIG
 	// ConfigSourceDefault is built-in default configuration for configs that have a default value
 	ConfigSourceDefault ConfigSource = C.RD_KAFKA_CONFIG_SOURCE_DEFAULT_CONFIG
 )

 func (t ConfigSource) String() string
     String returns the human-readable representation of a ConfigSource type

     ConfigValue supports the following types:

 	bool, int, string, any type with the standard String() interface

 type ConfigValue interface{}
     Consumer implements a High-level Apache Kafka Consumer instance

 type Consumer struct {
 	// contains filtered or unexported fields
 }

 func NewConsumer(conf *ConfigMap) (*Consumer, error)
     NewConsumer creates a new high-level Consumer instance.

     conf is a *ConfigMap with standard librdkafka configuration properties.

     Supported special configuration properties:

 	go.application.rebalance.enable (bool, false) - Forward rebalancing responsibility to application via the Events() channel.
 	                                                If set to true the app must handle the AssignedPartitions and
 	                                                RevokedPartitions events and call Assign() and Unassign()
 	                                                respectively.
 	go.events.channel.enable (bool, false) - [deprecated] Enable the Events() channel. Messages and events will be pushed on the Events() channel and the Poll() interface will be disabled.
 	go.events.channel.size (int, 1000) - Events() channel size
 	go.logs.channel.enable (bool, false) - Forward log to Logs() channel.
 	go.logs.channel (chan kafka.LogEvent, nil) - Forward logs to application-provided channel instead of Logs(). Requires go.logs.channel.enable=true.

     WARNING: Due to the buffering nature of channels (and queues in general)
     the use of the events channel risks receiving outdated events and
     messages. Minimizing go.events.channel.size reduces the risk and number
     of outdated events and messages but does not eliminate the factor
     completely. With a channel size of 1 at most one event or message may be
     outdated.
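A minimal construction sketch (broker address and group id are placeholders):

func newConsumer() (*kafka.Consumer, error) {
	return kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092",
		"group.id":          "my-group",
		"auto.offset.reset": "earliest",
		// Take over assignment handling via AssignedPartitions/RevokedPartitions:
		"go.application.rebalance.enable": true,
	})
}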
 func (c *Consumer) Assign(partitions []TopicPartition) (err error)
     Assign an atomic set of partitions to consume.

     The .Offset field of each TopicPartition must either be set to an
     absolute starting offset (>= 0), or one of the logical offsets
     (`kafka.OffsetEnd` etc), but should typically be set to
     `kafka.OffsetStored` to have the consumer use the committed offset as a
     start position, with a fallback to `auto.offset.reset` if there is no
     committed offset.

     This replaces the current assignment.

 func (c *Consumer) Assignment() (partitions []TopicPartition, err error)
     Assignment returns the current partition assignments

 func (c *Consumer) AssignmentLost() bool
     AssignmentLost returns true if current partition assignment has been
     lost. This method is only applicable for use with a subscribing consumer
     when handling a rebalance event or callback. Partitions that have been
     lost may already be owned by other members in the group and therefore
     committing offsets, for example, may fail.
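A sketch of a rebalance callback that uses Assign, Unassign and AssignmentLost together; it assumes go.application.rebalance.enable=true as in the construction sketch above:

// subscribeWithRebalance distinguishes a lost assignment from a normal
// revocation before unassigning.
func subscribeWithRebalance(c *kafka.Consumer) error {
	return c.SubscribeTopics([]string{"my-topic"}, func(c *kafka.Consumer, ev kafka.Event) error {
		switch e := ev.(type) {
		case kafka.AssignedPartitions:
			return c.Assign(e.Partitions) // start from committed offsets
		case kafka.RevokedPartitions:
			if c.AssignmentLost() {
				// Partitions may already be owned elsewhere; don't commit.
				return c.Unassign()
			}
			// Commit final offsets here if committing manually, then unassign.
			return c.Unassign()
		}
		return nil
	})
}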
 func (c *Consumer) Close() (err error)
     Close Consumer instance. The object is no longer usable after this call.

 func (c *Consumer) Commit() ([]TopicPartition, error)
     Commit offsets for currently assigned partitions. This is a blocking
     call. Returns the committed offsets on success.

 func (c *Consumer) CommitMessage(m *Message) ([]TopicPartition, error)
     CommitMessage commits offset based on the provided message. This is a
     blocking call. Returns the committed offsets on success.

 func (c *Consumer) CommitOffsets(offsets []TopicPartition) ([]TopicPartition, error)
     CommitOffsets commits the provided list of offsets. This is a blocking
     call. Returns the committed offsets on success.

 func (c *Consumer) Committed(partitions []TopicPartition, timeoutMs int) (offsets []TopicPartition, err error)
     Committed retrieves committed offsets for the given set of partitions

 func (c *Consumer) Events() chan Event
     Events returns the Events channel (if enabled)

     Deprecated: Events (channel based consumer) is deprecated in favour of
     Poll().

 func (c *Consumer) GetConsumerGroupMetadata() (*ConsumerGroupMetadata, error)
     GetConsumerGroupMetadata returns the consumer's current group metadata.
     This object should be passed to the transactional producer's
     SendOffsetsToTransaction() API.

 func (c *Consumer) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*Metadata, error)
     GetMetadata queries broker for cluster and topic metadata. If topic is
     non-nil only information about that topic is returned, else if allTopics
     is false only information about locally used topics is returned, else
     information about all topics is returned. GetMetadata is equivalent to
     listTopics, describeTopics and describeCluster in the Java API.
 func (c *Consumer) GetRebalanceProtocol() string
     GetRebalanceProtocol returns the current consumer group rebalance
     protocol, which is either "EAGER" or "COOPERATIVE". If the rebalance
     protocol is not known in the current state an empty string is returned.
     Should typically only be called during rebalancing.

 func (c *Consumer) GetWatermarkOffsets(topic string, partition int32) (low, high int64, err error)
     GetWatermarkOffsets returns the cached low and high offsets for the
     given topic and partition. The high offset is populated on every fetch
     response or via calling QueryWatermarkOffsets. The low offset is
     populated every statistics.interval.ms if that value is set.
     OffsetInvalid will be returned if there is no cached offset for either
     value.

 func (c *Consumer) IncrementalAssign(partitions []TopicPartition) (err error)
     IncrementalAssign adds the specified partitions to the current set of
     partitions to consume.

     The .Offset field of each TopicPartition must either be set to an
     absolute starting offset (>= 0), or one of the logical offsets
     (`kafka.OffsetEnd` etc), but should typically be set to
     `kafka.OffsetStored` to have the consumer use the committed offset as a
     start position, with a fallback to `auto.offset.reset` if there is no
     committed offset.

     The new partitions must not be part of the current assignment.

 func (c *Consumer) IncrementalUnassign(partitions []TopicPartition) (err error)
     IncrementalUnassign removes the specified partitions from the current
     set of partitions to consume.

     The .Offset field of the TopicPartition is ignored.

     The removed partitions must be part of the current assignment.

 func (c *Consumer) IsClosed() bool
     IsClosed returns a boolean representing whether the client is closed.
 func (c *Consumer) Logs() chan LogEvent
     Logs returns the log channel if enabled, or nil otherwise.

 func (c *Consumer) OffsetsForTimes(times []TopicPartition, timeoutMs int) (offsets []TopicPartition, err error)
     OffsetsForTimes looks up offsets by timestamp for the given partitions.

     The returned offset for each partition is the earliest offset whose
     timestamp is greater than or equal to the given timestamp in the
     corresponding partition. If the provided timestamp exceeds that of the
     last message in the partition, a value of -1 will be returned.

     The timestamps to query are represented as `.Offset` in the `times`
     argument and the looked up offsets are represented as `.Offset` in the
     returned `offsets` list.

     The function will block for at most timeoutMs milliseconds.

     Duplicate Topic+Partitions are not supported. Per-partition errors may
     be returned in the `.Error` field.

 func (c *Consumer) Pause(partitions []TopicPartition) (err error)
     Pause consumption for the provided list of partitions

     Note that messages already enqueued on the consumer's Event channel (if
     `go.events.channel.enable` has been set) will NOT be purged by this
     call, set `go.events.channel.size` accordingly.

 func (c *Consumer) Poll(timeoutMs int) (event Event)
     Poll the consumer for messages or events.

     The following callbacks may be triggered:

 	Subscribe()'s rebalanceCb

     Returns nil on timeout, else an Event
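A sketch of a typical Poll loop; a nil return just means the 100 ms timeout expired:

func pollLoop(c *kafka.Consumer, run *bool) {
	for *run {
		switch e := c.Poll(100).(type) {
		case *kafka.Message:
			fmt.Printf("%s: %s\n", e.TopicPartition, string(e.Value))
		case kafka.Error:
			fmt.Println("error:", e)
		case nil:
			// timeout; poll again
		}
	}
}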
 func (c *Consumer) Position(partitions []TopicPartition) (offsets []TopicPartition, err error)
     Position returns the current consume position for the given partitions.
     Typical use is to call Assignment() to get the partition list and then
     pass it to Position() to get the current consume position for each of
     the assigned partitions. The consume position is the next message to
     read from the partition, i.e., the offset of the last message seen by
     the application + 1.

 func (c *Consumer) QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error)
     QueryWatermarkOffsets queries the broker for the low and high offsets
     for the given topic and partition.

 func (c *Consumer) ReadMessage(timeout time.Duration) (*Message, error)
     ReadMessage polls the consumer for a message.

     This is a convenience API that wraps Poll() and only returns messages or
     errors. All other event types are discarded.

     The call will block for at most `timeout` waiting for a new message or
     error. `timeout` may be set to -1 for indefinite wait.

     Timeout is returned as (nil, err) where
     `err.(kafka.Error).IsTimeout() == true`.

     Messages are returned as (msg, nil), while general errors are returned
     as (nil, err), and partition-specific errors are returned as (msg, err)
     where msg.TopicPartition provides partition-specific information (such
     as topic, partition and offset).

     All other event types, such as PartitionEOF, AssignedPartitions, etc,
     are silently discarded.
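A sketch of the timeout convention described above: a kafka.Error with IsTimeout() true comes back as (nil, err) and is not fatal:

func readLoop(c *kafka.Consumer) error {
	for {
		msg, err := c.ReadMessage(5 * time.Second)
		if err != nil {
			if kerr, ok := err.(kafka.Error); ok && kerr.IsTimeout() {
				continue // nothing within 5s; keep waiting
			}
			return err
		}
		fmt.Printf("%s: %s\n", msg.TopicPartition, string(msg.Value))
	}
}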
 func (c *Consumer) Resume(partitions []TopicPartition) (err error)
     Resume consumption for the provided list of partitions

 func (c *Consumer) Seek(partition TopicPartition, ignoredTimeoutMs int) error
     Seek seeks the given topic partitions using the offset from the
     TopicPartition.

     The ignoredTimeoutMs parameter is ignored. Instead, this method blocks
     until the fetcher state is updated for the given partition with the new
     offset. This guarantees that no previously fetched messages for the old
     offset (or fetch position) will be passed to the application once this
     call returns. It will still take some time after the method returns
     until messages are fetched at the new offset.

     Seek() may only be used for partitions already being consumed (through
     Assign() or implicitly through a self-rebalanced Subscribe()). To set
     the starting offset it is preferred to use Assign() and provide a
     starting offset for each partition.

     Returns an error on failure or nil otherwise. Deprecated: Seek is
     deprecated in favour of SeekPartitions().
func (c *Consumer) SeekPartitions(partitions []TopicPartition) ([]TopicPartition, error)
- Returns the stored offsets on success. If at least one offset couldn't be stored, -an error and a list of offsets is returned. Each offset can be checked for -specific errors via its `.Error` member. -
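A minimal sketch of the at-least-once pattern these two methods enable: disable automatic offset store, process each message, then store its offset so the auto-committer only commits processed messages. Broker address, group id and topic are placeholders:

package main

import (
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func main() {
	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers":        "localhost:9092", // placeholder
		"group.id":                 "store-example",  // placeholder
		"enable.auto.commit":       true,             // commit stored offsets periodically
		"enable.auto.offset.store": false,            // store offsets explicitly instead
	})
	if err != nil {
		panic(err)
	}
	defer c.Close()

	if err := c.SubscribeTopics([]string{"example-topic"}, nil); err != nil {
		panic(err)
	}

	for {
		msg, err := c.ReadMessage(-1)
		if err != nil {
			fmt.Printf("consumer error: %v\n", err)
			continue
		}
		// ... process msg here, then mark it as processed:
		if stored, err := c.StoreMessage(msg); err != nil {
			for _, tp := range stored {
				if tp.Error != nil {
					fmt.Printf("store failed for %v: %v\n", tp, tp.Error)
				}
			}
		}
	}
}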
func (c *Consumer) String() string+ SeekPartitions seeks the given topic partitions to the per-partition offset +stored in the .Offset field of each partition.
- String returns a human readable name for a Consumer instance -
func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error+ The offset may be either absolute (>= 0) or a logical offset (e.g. OffsetEnd).
- Subscribe to a single topic -This replaces the current subscription -
func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) (err error)+ SeekPartitions() may only be used for partitions already being consumed +(through Assign() or implicitly through a self-rebalanced Subscribe()). +To set the starting offset it is preferred to use Assign() in a +kafka.AssignedPartitions handler and provide a starting offset for each +partition.
- SubscribeTopics subscribes to the provided list of topics. -This replaces the current subscription. -
func (c *Consumer) Subscription() (topics []string, err error)+
func (c *Consumer) SetOAuthBearerToken(oauthBearerToken OAuthBearerToken) error
- Subscription returns the current subscription as set by Subscribe() -
func (c *Consumer) Unassign() (err error)+
func (c *Consumer) SetOAuthBearerTokenFailure(errstr string) error
- Unassign the current set of partitions to consume. -
func (c *Consumer) Unsubscribe() (err error)+
func (c *Consumer) SetSaslCredentials(username, password string) error
- Unsubscribe from the current subscription, if any. -
func (c *Consumer) StoreMessage(m *Message) (storedOffsets []TopicPartition, err error)
- ConsumerGroupDescription represents the result of DescribeConsumerGroups for -a single group. -
type ConsumerGroupDescription struct { - // Group id. - GroupID string - // Error, if any, of result. Check with `Error.Code() != ErrNoError`. - Error Error - // Is a simple consumer group. - IsSimpleConsumerGroup bool - // Partition assignor identifier. - PartitionAssignor string - // Consumer group state. - State ConsumerGroupState - // Consumer group coordinator (has ID == -1 if not known). - Coordinator Node - // Members list. - Members []MemberDescription - // Operations allowed for the group (nil if not available or not requested) - AuthorizedOperations []ACLOperation -} --
func (c *Consumer) StoreOffsets(offsets []TopicPartition) (storedOffsets []TopicPartition, err error)
- ConsumerGroupListing represents the result of ListConsumerGroups for a single -group. -
type ConsumerGroupListing struct { - // Group id. - GroupID string - // Is a simple consumer group. - IsSimpleConsumerGroup bool - // Group state. - State ConsumerGroupState -} --
- ConsumerGroupMetadata reflects the current consumer group member metadata. -
type ConsumerGroupMetadata struct {
- // contains filtered or unexported fields
-}
-
- func NewTestConsumerGroupMetadata(groupID string) (*ConsumerGroupMetadata, error)+
func (c *Consumer) String() string
- NewTestConsumerGroupMetadata creates a new consumer group metadata instance -mainly for testing use. -Use GetConsumerGroupMetadata() to retrieve the real metadata. -
func (c *Consumer) Subscribe(topic string, rebalanceCb RebalanceCb) error
- ConsumerGroupResult provides per-group operation result (error) information. -
type ConsumerGroupResult struct { - // Group name - Group string - // Error, if any, of result. Check with `Error.Code() != ErrNoError`. - Error Error -} --
func (g ConsumerGroupResult) String() string+
func (c *Consumer) SubscribeTopics(topics []string, rebalanceCb RebalanceCb) (err error)
- String returns a human-readable representation of a ConsumerGroupResult. -
func (c *Consumer) Subscription() (topics []string, err error)
- ConsumerGroupState represents a consumer group state -
type ConsumerGroupState int-
const ( - // ConsumerGroupStateUnknown - Unknown ConsumerGroupState - ConsumerGroupStateUnknown ConsumerGroupState = C.RD_KAFKA_CONSUMER_GROUP_STATE_UNKNOWN - // ConsumerGroupStatePreparingRebalance - preparing rebalance - ConsumerGroupStatePreparingRebalance ConsumerGroupState = C.RD_KAFKA_CONSUMER_GROUP_STATE_PREPARING_REBALANCE - // ConsumerGroupStateCompletingRebalance - completing rebalance - ConsumerGroupStateCompletingRebalance ConsumerGroupState = C.RD_KAFKA_CONSUMER_GROUP_STATE_COMPLETING_REBALANCE - // ConsumerGroupStateStable - stable - ConsumerGroupStateStable ConsumerGroupState = C.RD_KAFKA_CONSUMER_GROUP_STATE_STABLE - // ConsumerGroupStateDead - dead group - ConsumerGroupStateDead ConsumerGroupState = C.RD_KAFKA_CONSUMER_GROUP_STATE_DEAD - // ConsumerGroupStateEmpty - empty group - ConsumerGroupStateEmpty ConsumerGroupState = C.RD_KAFKA_CONSUMER_GROUP_STATE_EMPTY -)-
func ConsumerGroupStateFromString(stateString string) (ConsumerGroupState, error)+
func (c *Consumer) Unassign() (err error)
- ConsumerGroupStateFromString translates a consumer group state name/string to -a ConsumerGroupStateFromString value. -
func (t ConsumerGroupState) String() string+
func (c *Consumer) Unsubscribe() (err error)
- String returns the human-readable representation of a consumer_group_state -
- ConsumerGroupTopicPartitions represents a consumer group's TopicPartitions. -
type ConsumerGroupTopicPartitions struct { - // Group name - Group string - // Partitions list - Partitions []TopicPartition + ConsumerGroupDescription represents the result of DescribeConsumerGroups for +a single group. +type ConsumerGroupDescription struct { + // Group id. + GroupID string + // Error, if any, of result. Check with `Error.Code() != ErrNoError`. + Error Error + // Is a simple consumer group. + IsSimpleConsumerGroup bool + // Partition assignor identifier. + PartitionAssignor string + // Consumer group state. + State ConsumerGroupState + // Consumer group coordinator (has ID == -1 if not known). + Coordinator Node + // Members list. + Members []MemberDescription + // Operations allowed for the group (nil if not available or not requested) + AuthorizedOperations []ACLOperation }-- func (ConsumerGroupTopicPartitions) - - String - - - ¶ - -
-func (gtp ConsumerGroupTopicPartitions) String() string-+
type - - CreateACLResult + + ConsumerGroupListing - + ¶
- CreateACLResult provides create ACL error information. -
type CreateACLResult struct { - // Error, if any, of result. Check with `Error.Code() != ErrNoError`. - Error Error + ConsumerGroupListing represents the result of ListConsumerGroups for a single +group. +type ConsumerGroupListing struct { + // Group id. + GroupID string + // Is a simple consumer group. + IsSimpleConsumerGroup bool + // Group state. + State ConsumerGroupState + // Group type. + Type ConsumerGroupType }-+
type - - CreateACLsAdminOption + + ConsumerGroupMetadata - + ¶
- CreateACLsAdminOption - see setter. + ConsumerGroupMetadata reflects the current consumer group member metadata. +
type ConsumerGroupMetadata struct { + // contains filtered or unexported fields +} +
++ func + + NewTestConsumerGroupMetadata + + + ¶ + +
+func NewTestConsumerGroupMetadata(groupID string) (*ConsumerGroupMetadata, error)- See SetAdminRequestTimeout -
type CreateACLsAdminOption interface { - // contains filtered or unexported methods -}
-+ NewTestConsumerGroupMetadata creates a new consumer group metadata instance +mainly for testing use. +Use GetConsumerGroupMetadata() to retrieve the real metadata. +
type - - CreatePartitionsAdminOption + + ConsumerGroupResult - + ¶
- CreatePartitionsAdminOption - see setters. + ConsumerGroupResult provides per-group operation result (error) information. +
type ConsumerGroupResult struct { + // Group name + Group string + // Error, if any, of result. Check with `Error.Code() != ErrNoError`. + Error Error +} +++ func (ConsumerGroupResult) + + String + + + ¶ + +
+func (g ConsumerGroupResult) String() string- See SetAdminRequestTimeout, SetAdminOperationTimeout, SetAdminValidateOnly. -
type CreatePartitionsAdminOption interface { - // contains filtered or unexported methods -}
-+ String returns a human-readable representation of a ConsumerGroupResult. +
type - - CreateTopicsAdminOption + + ConsumerGroupState - + ¶
- CreateTopicsAdminOption - see setters. + ConsumerGroupState represents a consumer group state +
type ConsumerGroupState int+const ( + // ConsumerGroupStateUnknown - Unknown ConsumerGroupState + ConsumerGroupStateUnknown ConsumerGroupState = C.RD_KAFKA_CONSUMER_GROUP_STATE_UNKNOWN + // ConsumerGroupStatePreparingRebalance - preparing rebalance + ConsumerGroupStatePreparingRebalance ConsumerGroupState = C.RD_KAFKA_CONSUMER_GROUP_STATE_PREPARING_REBALANCE + // ConsumerGroupStateCompletingRebalance - completing rebalance + ConsumerGroupStateCompletingRebalance ConsumerGroupState = C.RD_KAFKA_CONSUMER_GROUP_STATE_COMPLETING_REBALANCE + // ConsumerGroupStateStable - stable + ConsumerGroupStateStable ConsumerGroupState = C.RD_KAFKA_CONSUMER_GROUP_STATE_STABLE + // ConsumerGroupStateDead - dead group + ConsumerGroupStateDead ConsumerGroupState = C.RD_KAFKA_CONSUMER_GROUP_STATE_DEAD + // ConsumerGroupStateEmpty - empty group + ConsumerGroupStateEmpty ConsumerGroupState = C.RD_KAFKA_CONSUMER_GROUP_STATE_EMPTY +)++ func + + ConsumerGroupStateFromString + + + ¶ + +
+func ConsumerGroupStateFromString(stateString string) (ConsumerGroupState, error)- See SetAdminRequestTimeout, SetAdminOperationTimeout, SetAdminValidateOnly. -
type CreateTopicsAdminOption interface { - // contains filtered or unexported methods -}
-- type - - DeleteACLsAdminOption + ConsumerGroupStateFromString translates a consumer group state name/string to +a ConsumerGroupState value. +
++ func (ConsumerGroupState) + + String - + ¶ -
func (t ConsumerGroupState) String() string
- DeleteACLsAdminOption - see setter. + String returns the human-readable representation of a consumer_group_state +
- See SetAdminRequestTimeout -
type DeleteACLsAdminOption interface {
- // contains filtered or unexported methods
-}
- type ConsumerGroupTopicPartitions struct { + // Group name + Group string + // Partitions list + Partitions []TopicPartition +} ++
func (gtp ConsumerGroupTopicPartitions) String() string+
- DeleteACLsResult provides delete ACLs result or error information. -
type DeleteACLsResult = DescribeACLsResult-
type ConsumerGroupType int+
const ( + // ConsumerGroupTypeUnknown - Unknown ConsumerGroupType + ConsumerGroupTypeUnknown ConsumerGroupType = C.RD_KAFKA_CONSUMER_GROUP_TYPE_UNKNOWN + // ConsumerGroupTypeConsumer - Consumer ConsumerGroupType + ConsumerGroupTypeConsumer ConsumerGroupType = C.RD_KAFKA_CONSUMER_GROUP_TYPE_CONSUMER + // ConsumerGroupTypeClassic - Classic ConsumerGroupType + ConsumerGroupTypeClassic ConsumerGroupType = C.RD_KAFKA_CONSUMER_GROUP_TYPE_CLASSIC +)+
func ConsumerGroupTypeFromString(typeString string) ConsumerGroupType
- DeleteConsumerGroupsAdminOption - see setters. + ConsumerGroupTypeFromString translates a consumer group type name/string to +a ConsumerGroupType value. +
func (t ConsumerGroupType) String() string
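The new group type surfaces in ConsumerGroupListing.Type and can be used to filter listings. A hedged sketch, assuming the SetAdminMatchConsumerGroupTypes option referenced under ListConsumerGroupsAdminOption below; the broker address is a placeholder:

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func main() {
	a, err := kafka.NewAdminClient(&kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092", // placeholder
	})
	if err != nil {
		panic(err)
	}
	defer a.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	// List only classic groups (assumed option, per the see-also list
	// on ListConsumerGroupsAdminOption).
	res, err := a.ListConsumerGroups(ctx,
		kafka.SetAdminMatchConsumerGroupTypes(
			[]kafka.ConsumerGroupType{kafka.ConsumerGroupTypeClassic}))
	if err != nil {
		panic(err)
	}
	for _, g := range res.Valid {
		fmt.Printf("%s state=%s type=%s\n", g.GroupID, g.State, g.Type)
	}
}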
- See SetAdminRequestTimeout. -
type DeleteConsumerGroupsAdminOption interface {
- // contains filtered or unexported methods
-}
- - DeleteConsumerGroupsResult represents the result of a DeleteConsumerGroups -call. -
type DeleteConsumerGroupsResult struct { - // Slice of ConsumerGroupResult. - ConsumerGroupResults []ConsumerGroupResult + CreateACLResult provides create ACL error information. +type CreateACLResult struct { + // Error, if any, of result. Check with `Error.Code() != ErrNoError`. + Error Error }
-+
type - - DeleteRecordsAdminOption + + CreateACLsAdminOption - + ¶
- DeleteRecordsAdminOption - see setter. + CreateACLsAdminOption - see setter.
- See SetAdminRequestTimeout, SetAdminOperationTimeout. -
type DeleteRecordsAdminOption interface { + See SetAdminRequestTimeout +type CreateACLsAdminOption interface { // contains filtered or unexported methods }
-+
type - - DeleteRecordsResult + + CreatePartitionsAdminOption - + ¶
- DeleteRecordsResult represents the result of a DeleteRecords call -for a single partition. -
type DeleteRecordsResult struct { - // One of requested partitions. - // The Error field is set if any occurred for that partition. - TopicPartition TopicPartition - // Deleted records information, or nil if an error occurred. - DeletedRecords *DeletedRecords -} --- type - - DeleteRecordsResults - - - ¶ - -
+ CreatePartitionsAdminOption - see setters.- DeleteRecordsResults represents the results of a DeleteRecords call. -
type DeleteRecordsResults struct { - // A slice of DeleteRecordsResult, one for each requested topic partition. - DeleteRecordsResults []DeleteRecordsResult -} -
-+ See SetAdminRequestTimeout, SetAdminOperationTimeout, SetAdminValidateOnly. +
type CreatePartitionsAdminOption interface { + // contains filtered or unexported methods +}
+type - - DeleteTopicsAdminOption + + CreateTopicsAdminOption - + ¶
- DeleteTopicsAdminOption - see setters. + CreateTopicsAdminOption - see setters.
- See SetAdminRequestTimeout, SetAdminOperationTimeout. -
type DeleteTopicsAdminOption interface { + See SetAdminRequestTimeout, SetAdminOperationTimeout, SetAdminValidateOnly. +type CreateTopicsAdminOption interface { // contains filtered or unexported methods }
-+
type - - DeletedRecords + + DeleteACLsAdminOption - + ¶
- DeletedRecords contains information about deleted -records of a single partition -
type DeletedRecords struct { - // Low-watermark offset after deletion - LowWatermark Offset -} -
-- type - - DescribeACLsAdminOption - - - ¶ - -
+ DeleteACLsAdminOption - see setter.- DescribeACLsAdminOption - see setter. -
- See SetAdminRequestTimeout -
type DescribeACLsAdminOption interface { + See SetAdminRequestTimeout +type DeleteACLsAdminOption interface { // contains filtered or unexported methods }
-+
+ type + + DeleteACLsResult + + + ¶ + +
++ DeleteACLsResult provides delete ACLs result or error information. +
type DeleteACLsResult = DescribeACLsResult+type - - DescribeACLsResult + + DeleteConsumerGroupsAdminOption - + ¶
- DescribeACLsResult provides describe ACLs result or error information. -
type DescribeACLsResult struct { - // Slice of ACL bindings matching the provided filter - ACLBindings ACLBindings - // Error, if any, of result. Check with `Error.Code() != ErrNoError`. - Error Error -} --- type - - DescribeClusterAdminOption - - - ¶ - -
+ DeleteConsumerGroupsAdminOption - see setters.- DescribeClusterAdminOption - see setter. -
- See SetAdminRequestTimeout, SetAdminOptionIncludeAuthorizedOperations. -
type DescribeClusterAdminOption interface { + See SetAdminRequestTimeout. +type DeleteConsumerGroupsAdminOption interface { // contains filtered or unexported methods }
-+
+ type + + DeleteConsumerGroupsResult + + + ¶ + +
++ DeleteConsumerGroupsResult represents the result of a DeleteConsumerGroups +call. +
type DeleteConsumerGroupsResult struct { + // Slice of ConsumerGroupResult. + ConsumerGroupResults []ConsumerGroupResult +} +
+type - - DescribeClusterResult + + DeleteRecordsAdminOption - + ¶
- DescribeClusterResult represents the result of DescribeCluster. -
type DescribeClusterResult struct { - // Cluster id for the cluster (always available if broker version >= 0.10.1.0, otherwise nil). - ClusterID *string - // Current controller broker for the cluster (nil if there is none). - Controller *Node - // List of brokers in the cluster. - Nodes []Node - // Operations allowed for the cluster (nil if not available or not requested). - AuthorizedOperations []ACLOperation -} --- type - - DescribeConfigsAdminOption - - - ¶ - -
+ DeleteRecordsAdminOption - see setter.- DescribeConfigsAdminOption - see setters. -
- See SetAdminRequestTimeout. -
type DescribeConfigsAdminOption interface { + See SetAdminRequestTimeout, SetAdminOperationTimeout. +type DeleteRecordsAdminOption interface { // contains filtered or unexported methods }
-+
+ type + + DeleteRecordsResult + + + ¶ + +
++ DeleteRecordsResult represents the result of a DeleteRecords call +for a single partition. +
type DeleteRecordsResult struct { + // One of requested partitions. + // The Error field is set if any occurred for that partition. + TopicPartition TopicPartition + // Deleted records information, or nil if an error occurred. + DeletedRecords *DeletedRecords +} ++type - - DescribeConsumerGroupsAdminOption + + DeleteRecordsResults - + ¶
- DescribeConsumerGroupsAdminOption - see setter. + DeleteRecordsResults represents the results of a DeleteRecords call. +
type DeleteRecordsResults struct { + // A slice of DeleteRecordsResult, one for each requested topic partition. + DeleteRecordsResults []DeleteRecordsResult +} +
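A sketch of a DeleteRecords call that yields these result types; broker address, topic and the cut-off offset are placeholders:

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func main() {
	a, err := kafka.NewAdminClient(&kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092", // placeholder
	})
	if err != nil {
		panic(err)
	}
	defer a.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	topic := "example-topic"
	// Delete all records before offset 100 on partition 0.
	res, err := a.DeleteRecords(ctx, []kafka.TopicPartition{
		{Topic: &topic, Partition: 0, Offset: kafka.Offset(100)},
	})
	if err != nil {
		panic(err)
	}
	for _, r := range res.DeleteRecordsResults {
		if r.TopicPartition.Error != nil {
			fmt.Printf("%v: %v\n", r.TopicPartition, r.TopicPartition.Error)
		} else if r.DeletedRecords != nil {
			fmt.Printf("%v: low watermark is now %v\n",
				r.TopicPartition, r.DeletedRecords.LowWatermark)
		}
	}
}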
++ type + + DeleteTopicsAdminOption + + + ¶ + +
- See SetAdminRequestTimeout, SetAdminOptionIncludeAuthorizedOperations. -
type DescribeConsumerGroupsAdminOption interface { + DeleteTopicsAdminOption - see setters. ++ See SetAdminRequestTimeout, SetAdminOperationTimeout. +
type DeleteTopicsAdminOption interface { // contains filtered or unexported methods }
-- type - - DescribeConsumerGroupsResult - - - ¶ - -
-- DescribeConsumerGroupsResult represents the result of a -DescribeConsumerGroups call. -
type DescribeConsumerGroupsResult struct { - // Slice of ConsumerGroupDescription. - ConsumerGroupDescriptions []ConsumerGroupDescription -} -
-+
type - - DescribeTopicsAdminOption + + DeletedRecords - + ¶
- DescribeTopicsAdminOption - see setter. + DeletedRecords contains information about deleted +records of a single partition +
type DeletedRecords struct { + // Low-watermark offset after deletion + LowWatermark Offset +} +
++ type + + DescribeACLsAdminOption + + + ¶ + +
- See SetAdminRequestTimeout, SetAdminOptionIncludeAuthorizedOperations. -
type DescribeTopicsAdminOption interface { + DescribeACLsAdminOption - see setter. ++ See SetAdminRequestTimeout +
type DescribeACLsAdminOption interface { // contains filtered or unexported methods }
-- type - - DescribeTopicsResult - - - ¶ - -
-- DescribeTopicsResult represents the result of a -DescribeTopics call. -
type DescribeTopicsResult struct { - // Slice of TopicDescription. - TopicDescriptions []TopicDescription -} -
-+
type - - DescribeUserScramCredentialsAdminOption + + DescribeACLsResult - + ¶
- DescribeUserScramCredentialsAdminOption - see setter. + DescribeACLsResult provides describe ACLs result or error information. +
type DescribeACLsResult struct { + // Slice of ACL bindings matching the provided filter + ACLBindings ACLBindings + // Error, if any, of result. Check with `Error.Code() != ErrNoError`. + Error Error +} +++ type + + DescribeClusterAdminOption + + + ¶ + +
- See SetAdminRequestTimeout. -
type DescribeUserScramCredentialsAdminOption interface { + DescribeClusterAdminOption - see setter. ++ See SetAdminRequestTimeout, SetAdminOptionIncludeAuthorizedOperations. +
type DescribeClusterAdminOption interface { // contains filtered or unexported methods }
-- type - - DescribeUserScramCredentialsResult - - - ¶ - -
-- DescribeUserScramCredentialsResult represents the result of a -DescribeUserScramCredentials call. -
type DescribeUserScramCredentialsResult struct { - // Descriptions - Map from user name - // to UserScramCredentialsDescription - Descriptions map[string]UserScramCredentialsDescription -} --+
type - - Error + + DescribeClusterResult - + ¶
- Error provides a Kafka-specific error container -
type Error struct { - // contains filtered or unexported fields + DescribeClusterResult represents the result of DescribeCluster. +type DescribeClusterResult struct { + // Cluster id for the cluster (always available if broker version >= 0.10.1.0, otherwise nil). + ClusterID *string + // Current controller broker for the cluster (nil if there is none). + Controller *Node + // List of brokers in the cluster. + Nodes []Node + // Operations allowed for the cluster (nil if not available or not requested). + AuthorizedOperations []ACLOperation }-- func - - NewError +
-+ type + + DescribeConfigsAdminOption - + ¶ -
func NewError(code ErrorCode, str string, fatal bool) (err Error)+
- NewError creates a new Error. -
func (e Error) Code() ErrorCode+ DescribeConfigsAdminOption - see setters.
- Code returns the ErrorCode of an Error -
type DescribeConfigsAdminOption interface {
+ // contains filtered or unexported methods
+}
+ func (e Error) Error() string+
- Error returns a human readable representation of an Error -Same as Error.String() -
func (e Error) IsFatal() bool+ DescribeConsumerGroupsAdminOption - see setter.
- IsFatal returns true if the error is a fatal error. -A fatal error indicates the client instance is no longer operable and -should be terminated. Typical causes include non-recoverable -idempotent producer errors. -
type DescribeConsumerGroupsAdminOption interface {
+ // contains filtered or unexported methods
+}
+ func (e Error) IsRetriable() bool+
- IsRetriable returns true if the operation that caused this error -may be retried. -This flag is currently only set by the Transactional producer API. -
type DescribeConsumerGroupsResult struct {
+ // Slice of ConsumerGroupDescription.
+ ConsumerGroupDescriptions []ConsumerGroupDescription
+}
+
+ func (e Error) IsTimeout() bool+
- IsTimeout returns true if the error is a timeout error. -A timeout error indicates that the operation timed out locally. -
func (e Error) String() string+ DescribeTopicsAdminOption - see setter.
- String returns a human readable representation of an Error -
type DescribeTopicsAdminOption interface {
+ // contains filtered or unexported methods
+}
+ func (e Error) TxnRequiresAbort() bool+
- TxnRequiresAbort returns true if the error is an abortable transaction error -that requires the application to abort the current transaction with -AbortTransaction() and start a new transaction with BeginTransaction() -if it wishes to proceed with transactional operations. -This flag is only set by the Transactional producer API. -
type DescribeTopicsResult struct {
+ // Slice of TopicDescription.
+ TopicDescriptions []TopicDescription
+}
+
+ - ErrorCode is the integer representation of local and broker error codes -
type ErrorCode int-
const ( + DescribeUserScramCredentialsAdminOption - see setter. ++ See SetAdminRequestTimeout. +
type DescribeUserScramCredentialsAdminOption interface { + // contains filtered or unexported methods +}
++ type + + DescribeUserScramCredentialsResult + + + ¶ + +
++ DescribeUserScramCredentialsResult represents the result of a +DescribeUserScramCredentials call. +
type DescribeUserScramCredentialsResult struct { + // Descriptions - Map from user name + // to UserScramCredentialsDescription + Descriptions map[string]UserScramCredentialsDescription +} +++ type + + ElectLeadersAdminOption + + + ¶ + +
++ ElectLeadersAdminOption - see setter. +
+ See SetAdminRequestTimeout, SetAdminOperationTimeout. +
type ElectLeadersAdminOption interface { + // contains filtered or unexported methods +}
++ type + + ElectLeadersRequest + + + ¶ + +
++ ElectLeadersRequest holds parameters for the type of election to be performed and +the topic partitions for which election has to be performed +
type ElectLeadersRequest struct { + // contains filtered or unexported fields +} +
++ func + + NewElectLeadersRequest + + + ¶ + +
+func NewElectLeadersRequest(electionType ElectionType, partitions []TopicPartition) ElectLeadersRequest++ NewElectLeadersRequest creates a new ElectLeadersRequest with the given election type +and topic partitions +
+ type + + ElectLeadersResult + + + ¶ + +
++ ElectLeadersResult holds the result of the election performed +
type ElectLeadersResult struct { + // TopicPartitions for which election has been performed and the per-partition error, if any + // that occurred while running the election for the specific TopicPartition. + TopicPartitions []TopicPartition +} +++ type + + ElectionType + + + ¶ + +
++ ElectionType represents the type of election to be performed +
type ElectionType int+const ( + // ElectionTypePreferred - Preferred election type + ElectionTypePreferred ElectionType = C.RD_KAFKA_ELECTION_TYPE_PREFERRED + // ElectionTypeUnclean - Unclean election type + ElectionTypeUnclean ElectionType = C.RD_KAFKA_ELECTION_TYPE_UNCLEAN +)++ func + + ElectionTypeFromString + + + ¶ + +
+func ElectionTypeFromString(electionTypeString string) (ElectionType, error)++ ElectionTypeFromString translates an election type name to +an ElectionType value. +
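A sketch tying these election types together with the AdminClient.ElectLeaders call documented earlier; broker address, topic and partition are placeholders:

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func main() {
	a, err := kafka.NewAdminClient(&kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092", // placeholder
	})
	if err != nil {
		panic(err)
	}
	defer a.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	topic := "example-topic"
	// Preferred election for one partition; pass nil partitions to run
	// elections for all partitions instead.
	req := kafka.NewElectLeadersRequest(kafka.ElectionTypePreferred,
		[]kafka.TopicPartition{{Topic: &topic, Partition: 0}})

	res, err := a.ElectLeaders(ctx, req)
	if err != nil {
		panic(err) // client-level error
	}
	for _, tp := range res.TopicPartitions {
		if tp.Error != nil {
			fmt.Printf("election failed for %v: %v\n", tp, tp.Error)
		}
	}
}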
+ type + + Error + + + ¶ + +
++ Error provides a Kafka-specific error container +
type Error struct { + // contains filtered or unexported fields +} +
++ func + + NewError + + + ¶ + +
+func NewError(code ErrorCode, str string, fatal bool) (err Error)++ NewError creates a new Error. +
+ func (Error) + + Code + + + ¶ + +
+func (e Error) Code() ErrorCode++ Code returns the ErrorCode of an Error +
+ func (Error) + + Error + + + ¶ + +
+func (e Error) Error() string++ Error returns a human readable representation of an Error +Same as Error.String() +
+ func (Error) + + IsFatal + + + ¶ + +
+func (e Error) IsFatal() bool++ IsFatal returns true if the error is a fatal error. +A fatal error indicates the client instance is no longer operable and +should be terminated. Typical causes include non-recoverable +idempotent producer errors. +
+ func (Error) + + IsRetriable + + + ¶ + +
+func (e Error) IsRetriable() bool++ IsRetriable returns true if the operation that caused this error +may be retried. +This flag is currently only set by the Transactional producer API. +
+ func (Error) + + IsTimeout + + + ¶ + +
+func (e Error) IsTimeout() bool++ IsTimeout returns true if the error is a timeout error. +A timeout error indicates that the operation timed out locally. +
+ func (Error) + + String + + + ¶ + +
+func (e Error) String() string++ String returns a human readable representation of an Error +
+ func (Error) + + TxnRequiresAbort + + + ¶ + +
+func (e Error) TxnRequiresAbort() bool++ TxnRequiresAbort returns true if the error is an abortable transaction error +that requires the application to abort the current transaction with +AbortTransaction() and start a new transaction with BeginTransaction() +if it wishes to proceed with transactional operations. +This flag is only set by the Transactional producer API. +
+ type + + ErrorCode + + + ¶ + +
++ ErrorCode is the integer representation of local and broker error codes +
type ErrorCode int+const ( // ErrBadMsg Local: Bad message format ErrBadMsg ErrorCode = C.RD_KAFKA_RESP_ERR__BAD_MSG // ErrBadCompression Local: Invalid compressed data @@ -5452,46 +5723,46 @@-// ErrTelemetryTooLarge Broker: Client sent a push telemetry request larger than the maximum size the broker will accept ErrTelemetryTooLarge ErrorCode = C.RD_KAFKA_RESP_ERR_TELEMETRY_TOO_LARGE )
- func (ErrorCode) - - String - - - ¶ - -
-func (c ErrorCode) String() string-- String returns a human readable representation of an error code -
- type - - Event - - - ¶ - -
-- Event generic interface -
type Event interface { ++ func (ErrorCode) + + String + + + ¶ + +
+func (c ErrorCode) String() string++ String returns a human readable representation of an error code +
+ type + + Event + + + ¶ + +
++ Event generic interface +
type Event interface { // String returns a human-readable representation of the event String() string }
-- type - - Handle - - - ¶ - -
-- Handle represents a generic client handle containing common parts for +
+ type + + Handle + + + ¶ + +
++ Handle represents a generic client handle containing common parts for both Producer and Consumer. -
type Handle interface { // SetOAuthBearerToken sets the data to be transmitted // to a broker during SASL/OAUTHBEARER authentication. It will return nil // on success, otherwise an error if: @@ -5517,192 +5788,192 @@-IsClosed() bool // contains filtered or unexported methods }
- type - - Header - - - ¶ - -
-- Header represents a single Kafka message header. -
- Message headers are made up of a list of Header elements, retaining their original insert +
+ type + + Header + + + ¶ + +
++ Header represents a single Kafka message header. +
+ Message headers are made up of a list of Header elements, retaining their original insert order and allowing for duplicate Keys. -
- Key is a human readable string identifying the header. +
+ Key is a human readable string identifying the header. Value is the key's binary value; Kafka does not put any restrictions on the format of the Value, but it should be made relatively compact. The value may be a byte array, empty, or nil. -
- NOTE: Message headers are not available on producer delivery report messages. -
type Header struct { ++ NOTE: Message headers are not available on producer delivery report messages. +
type Header struct { Key string // Header name (utf-8 string) Value []byte // Header value (nil, empty, or binary) }-- func (Header) - - String - - - ¶ - -
-func (h Header) String() string-- String returns the Header Key and data in a human representable possibly truncated form +
+ func (Header) + + String + + + ¶ + +
+func (h Header) String() string++ String returns the Header Key and data in a human representable possibly truncated form suitable for displaying to the user. -
- type - - IsolationLevel - - - ¶ - -
-- IsolationLevel is a type which is used for AdminOptions to set the IsolationLevel. -
type IsolationLevel int-const ( ++ type + + IsolationLevel + + + ¶ + +
++ IsolationLevel is a type which is used for AdminOptions to set the IsolationLevel. +
type IsolationLevel int+const ( // IsolationLevelReadUncommitted - read uncommitted isolation level IsolationLevelReadUncommitted IsolationLevel = C.RD_KAFKA_ISOLATION_LEVEL_READ_UNCOMMITTED // IsolationLevelReadCommitted - read committed isolation level IsolationLevelReadCommitted IsolationLevel = C.RD_KAFKA_ISOLATION_LEVEL_READ_COMMITTED )-- type - - ListConsumerGroupOffsetsAdminOption - - - ¶ - -
-- ListConsumerGroupOffsetsAdminOption - see setter. -
- See SetAdminRequestTimeout, SetAdminRequireStableOffsets. -
type ListConsumerGroupOffsetsAdminOption interface { ++ type + + ListConsumerGroupOffsetsAdminOption + + + ¶ + +
++ ListConsumerGroupOffsetsAdminOption - see setter. +
+ See SetAdminRequestTimeout, SetAdminRequireStableOffsets. +
type ListConsumerGroupOffsetsAdminOption interface { // contains filtered or unexported methods }
-- type - - ListConsumerGroupOffsetsResult - - - ¶ - -
-- ListConsumerGroupOffsetsResult represents the result of a +
+ type + + ListConsumerGroupOffsetsResult + + + ¶ + +
++ ListConsumerGroupOffsetsResult represents the result of a ListConsumerGroupOffsets operation. -
type ListConsumerGroupOffsetsResult struct { +type ListConsumerGroupOffsetsResult struct { // A slice of ConsumerGroupTopicPartitions, each element represents a group's // TopicPartitions and Offsets. ConsumerGroupsTopicPartitions []ConsumerGroupTopicPartitions }-- type - - ListConsumerGroupsAdminOption - - - ¶ - -
-- ListConsumerGroupsAdminOption - see setter. -
- See SetAdminRequestTimeout, SetAdminMatchConsumerGroupStates. -
type ListConsumerGroupsAdminOption interface { ++ type + + ListConsumerGroupsAdminOption + + + ¶ + +
++ ListConsumerGroupsAdminOption - see setter. +
+ See SetAdminRequestTimeout, SetAdminMatchConsumerGroupStates, SetAdminMatchConsumerGroupTypes. +
type ListConsumerGroupsAdminOption interface { // contains filtered or unexported methods }
-- type - - ListConsumerGroupsResult - - - ¶ - -
-- ListConsumerGroupsResult represents ListConsumerGroups results and errors. -
type ListConsumerGroupsResult struct { ++ type + + ListConsumerGroupsResult + + + ¶ + +
++ ListConsumerGroupsResult represents ListConsumerGroups results and errors. +
type ListConsumerGroupsResult struct { // List of valid ConsumerGroupListings. Valid []ConsumerGroupListing // List of errors. Errors []error }-- type - - ListOffsetsAdminOption - - - ¶ - -
-- ListOffsetsAdminOption - see setter. -
- See SetAdminRequestTimeout, SetAdminIsolationLevel. -
type ListOffsetsAdminOption interface { ++ type + + ListOffsetsAdminOption + + + ¶ + +
++ ListOffsetsAdminOption - see setter. +
+ See SetAdminRequestTimeout, SetAdminIsolationLevel. +
type ListOffsetsAdminOption interface { // contains filtered or unexported methods }
-- type - - ListOffsetsResult - - - ¶ - -
-- ListOffsetsResult holds the map of TopicPartition to ListOffsetsResultInfo for a request. -
type ListOffsetsResult struct { ++ type + + ListOffsetsResult + + + ¶ + +
++ ListOffsetsResult holds the map of TopicPartition to ListOffsetsResultInfo for a request. +
type ListOffsetsResult struct { ResultInfos map[TopicPartition]ListOffsetsResultInfo }-- type - - ListOffsetsResultInfo - - - ¶ - -
-- ListOffsetsResultInfo describes the result of ListOffsets request for a Topic Partition. -
type ListOffsetsResultInfo struct { ++ type + + ListOffsetsResultInfo + + + ¶ + +
++ ListOffsetsResultInfo describes the result of ListOffsets request for a Topic Partition. +
type ListOffsetsResultInfo struct { Offset Offset Timestamp int64 LeaderEpoch *int32 Error Error }-- type - - LogEvent - - - ¶ - -
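A sketch of a ListOffsets query producing these result types, assuming AdminClient.ListOffsets accepts a map from TopicPartition to OffsetSpec and using the SetAdminIsolationLevel setter named above; broker address and topic are placeholders:

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func main() {
	a, err := kafka.NewAdminClient(&kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092", // placeholder
	})
	if err != nil {
		panic(err)
	}
	defer a.Close()

	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	topic := "example-topic"
	requests := map[kafka.TopicPartition]kafka.OffsetSpec{
		{Topic: &topic, Partition: 0}: kafka.LatestOffsetSpec,
	}
	res, err := a.ListOffsets(ctx, requests,
		kafka.SetAdminIsolationLevel(kafka.IsolationLevelReadCommitted))
	if err != nil {
		panic(err)
	}
	for tp, info := range res.ResultInfos {
		if info.Error.Code() != kafka.ErrNoError {
			fmt.Printf("%v: %v\n", tp, info.Error)
			continue
		}
		fmt.Printf("%v: offset=%v timestamp=%d\n", tp, info.Offset, info.Timestamp)
	}
}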
-- LogEvent represent the log from librdkafka internal log queue -
type LogEvent struct { ++ type + + LogEvent + + + ¶ + +
++ LogEvent represent the log from librdkafka internal log queue +
type LogEvent struct { Name string // Name of client instance Tag string // Log tag that provides context to the log Message (e.g., "METADATA" or "GRPCOORD") Message string // Log message @@ -5710,44 +5981,44 @@-Timestamp time.Time // Log timestamp }
- func (LogEvent) - - String - - - ¶ - -
-func (logEvent LogEvent) String() string-- type - - MemberAssignment - - - ¶ - -
-- MemberAssignment represents the assignment of a consumer group member. -
type MemberAssignment struct { ++ func (LogEvent) + + String + + + ¶ + +
+func (logEvent LogEvent) String() string++ type + + MemberAssignment + + + ¶ + +
++ MemberAssignment represents the assignment of a consumer group member. +
type MemberAssignment struct { // Partitions assigned to current member. TopicPartitions []TopicPartition }
-- type - - MemberDescription - - - ¶ - -
-- MemberDescription represents the description of a consumer group member. -
type MemberDescription struct { ++ type + + MemberDescription + + + ¶ + +
++ MemberDescription represents the description of a consumer group member. +
type MemberDescription struct { // Client id. ClientID string // Group instance id. @@ -5760,18 +6031,18 @@-Assignment MemberAssignment }
- type - - Message - - - ¶ - -
-- Message represents a Kafka message -
type Message struct { ++ type + + Message + + + ¶ + +
++ Message represents a Kafka message +
type Message struct { TopicPartition TopicPartition Value []byte Key []byte @@ -5782,171 +6053,171 @@-LeaderEpoch *int32 // Deprecated: LeaderEpoch or nil if not available. Use m.TopicPartition.LeaderEpoch instead. }
- func (*Message) - - String - - - ¶ - -
-func (m *Message) String() string-- String returns a human readable representation of a Message. +
+ func (*Message) + + String + + + ¶ + +
+func (m *Message) String() string++ String returns a human readable representation of a Message. Key and payload are not represented. -
- type - - Metadata - - - ¶ - -
-- Metadata contains broker and topic metadata for all (matching) topics -
type Metadata struct { ++ type + + Metadata + + + ¶ + +
++ Metadata contains broker and topic metadata for all (matching) topics +
type Metadata struct { Brokers []BrokerMetadata Topics map[string]TopicMetadata OriginatingBroker BrokerMetadata }-- type - - MockCluster - - - ¶ - -
-- MockCluster represents a Kafka mock cluster instance which can be used +
+ type + + MockCluster + + + ¶ + +
++ MockCluster represents a Kafka mock cluster instance which can be used for testing. -
type MockCluster struct { +type MockCluster struct { // contains filtered or unexported fields }
-- func - - NewMockCluster - - - ¶ - -
-func NewMockCluster(brokerCount int) (*MockCluster, error)-- NewMockCluster provides a mock Kafka cluster with a configurable +
+ func + + NewMockCluster + + + ¶ + +
+func NewMockCluster(brokerCount int) (*MockCluster, error)++ NewMockCluster provides a mock Kafka cluster with a configurable number of brokers that support a reasonable subset of Kafka protocol operations, error injection, etc. -
- The broker ids will start at 1 up to and including brokerCount. -
- Mock clusters provide localhost listeners that can be used as the bootstrap +
+ The broker ids will start at 1 up to and including brokerCount. +
+ Mock clusters provide localhost listeners that can be used as the bootstrap servers by multiple Kafka client instances. -
- Currently supported functionality: +
+ Currently supported functionality: - Producer - Idempotent Producer - Transactional Producer - Low-level consumer - High-level balanced consumer groups with offset commits - Topic Metadata and auto creation -
- Warning THIS IS AN EXPERIMENTAL API, SUBJECT TO CHANGE OR REMOVAL. -
- func (*MockCluster) - - BootstrapServers - - - ¶ - -
-func (mc *MockCluster) BootstrapServers() string-- BootstrapServers returns the bootstrap.servers property for this MockCluster -
- func (*MockCluster) - - Close - - - ¶ - -
-func (mc *MockCluster) Close()-- Close and destroy the MockCluster -
- func (*MockCluster) - - CreateTopic - - - ¶ - -
-func (mc *MockCluster) CreateTopic(topic string, partitions, replicationFactor int) error-- CreateTopic creates a topic without having to use a producer -
- func (*MockCluster) - - SetBrokerDown - - - ¶ - -
-func (mc *MockCluster) SetBrokerDown(brokerID int) error-- SetBrokerDown disconnects the broker and disallows any new connections. +
+ Warning THIS IS AN EXPERIMENTAL API, SUBJECT TO CHANGE OR REMOVAL. +
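A minimal sketch of the intended test workflow: start a mock cluster, point an ordinary client at BootstrapServers(), and produce through it. The topic and payload are placeholders:

package main

import (
	"fmt"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func main() {
	mc, err := kafka.NewMockCluster(3) // broker ids 1..3
	if err != nil {
		panic(err)
	}
	defer mc.Close()

	p, err := kafka.NewProducer(&kafka.ConfigMap{
		"bootstrap.servers": mc.BootstrapServers(),
	})
	if err != nil {
		panic(err)
	}
	defer p.Close()

	topic := "mock-topic"
	deliveryChan := make(chan kafka.Event, 1)
	if err := p.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
		Value:          []byte("hello mock cluster"),
	}, deliveryChan); err != nil {
		panic(err)
	}

	m := (<-deliveryChan).(*kafka.Message)
	fmt.Printf("delivered to %v (err=%v)\n", m.TopicPartition, m.TopicPartition.Error)
}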
+ func (*MockCluster) + + BootstrapServers + + + ¶ + +
+func (mc *MockCluster) BootstrapServers() string++ BootstrapServers returns the bootstrap.servers property for this MockCluster +
+ func (*MockCluster) + + Close + + + ¶ + +
+func (mc *MockCluster) Close()++ Close and destroy the MockCluster +
+ func (*MockCluster) + + CreateTopic + + + ¶ + +
+func (mc *MockCluster) CreateTopic(topic string, partitions, replicationFactor int) error++ CreateTopic creates a topic without having to use a producer +
+ func (*MockCluster) + + SetBrokerDown + + + ¶ + +
+func (mc *MockCluster) SetBrokerDown(brokerID int) error++ SetBrokerDown disconnects the broker and disallows any new connections. This does NOT trigger leader change. Use brokerID -1 for all brokers, or >= 0 for a specific broker. -
- func (*MockCluster) - - SetBrokerUp - - - ¶ - -
-func (mc *MockCluster) SetBrokerUp(brokerID int) error-- SetBrokerUp makes the broker accept connections again. +
+ func (*MockCluster) + + SetBrokerUp + + + ¶ + +
+func (mc *MockCluster) SetBrokerUp(brokerID int) error++ SetBrokerUp makes the broker accept connections again. This does NOT trigger leader change. Use brokerID -1 for all brokers, or >= 0 for a specific broker. -
- func (*MockCluster) - - SetRoundtripDuration - - - ¶ - -
-func (mc *MockCluster) SetRoundtripDuration(brokerID int, duration time.Duration) error-- SetRoundtripDuration sets the broker round-trip-time delay for the given broker. +
+ func (*MockCluster) + + SetRoundtripDuration + + + ¶ + +
+func (mc *MockCluster) SetRoundtripDuration(brokerID int, duration time.Duration) error++ SetRoundtripDuration sets the broker round-trip-time delay for the given broker. Use brokerID -1 for all brokers, or >= 0 for a specific broker. -
- type - - Node - - - ¶ - -
-- Node represents a Kafka broker. -
type Node struct { ++ type + + Node + + + ¶ + +
++ Node represents a Kafka broker. +
type Node struct { // Node id. ID int // Node host. @@ -5957,29 +6228,29 @@-Rack *string }
- func (Node) - - String - - - ¶ - -
-func (n Node) String() string-- type - - OAuthBearerToken - - - ¶ - -
-- OAuthBearerToken represents the data to be transmitted +
+ func (Node) + + String + + + ¶ + +
+func (n Node) String() string++ type + + OAuthBearerToken + + + ¶ + +
++ OAuthBearerToken represents the data to be transmitted to a broker during SASL/OAUTHBEARER authentication. -
type OAuthBearerToken struct { +type OAuthBearerToken struct { // Token value, often (but not necessarily) a JWS compact serialization // as per https://tools.ietf.org/html/rfc7515#section-3.1; it must meet // the regular expression for a SASL/OAUTHBEARER value defined at @@ -5998,105 +6269,105 @@-Extensions map[string]string }
- type - - OAuthBearerTokenRefresh - - - ¶ - -
-- OAuthBearerTokenRefresh indicates token refresh is required -
type OAuthBearerTokenRefresh struct { ++ type + + OAuthBearerTokenRefresh + + + ¶ + +
++ OAuthBearerTokenRefresh indicates token refresh is required +
type OAuthBearerTokenRefresh struct { // Config is the value of the sasl.oauthbearer.config property Config string }
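A sketch of serving these refresh events from the Events() channel; fetchToken is a hypothetical stand-in for a real OAuth token source, and all credential values are placeholders:

package main

import (
	"fmt"
	"time"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

// fetchToken is a hypothetical helper standing in for a real token source.
func fetchToken() (string, error) {
	return "placeholder-token", nil
}

func main() {
	p, err := kafka.NewProducer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092", // placeholder
		"security.protocol": "SASL_SSL",       // placeholder
		"sasl.mechanisms":   "OAUTHBEARER",
	})
	if err != nil {
		panic(err)
	}
	defer p.Close()

	for ev := range p.Events() {
		if _, ok := ev.(kafka.OAuthBearerTokenRefresh); ok {
			value, err := fetchToken()
			if err != nil {
				// Schedules a retry roughly 10 seconds later.
				p.SetOAuthBearerTokenFailure(err.Error())
				continue
			}
			if err := p.SetOAuthBearerToken(kafka.OAuthBearerToken{
				TokenValue: value,
				Expiration: time.Now().Add(time.Hour),
				Principal:  "example-principal", // placeholder
			}); err != nil {
				fmt.Printf("failed to set token: %v\n", err)
			}
		}
	}
}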
-- func (OAuthBearerTokenRefresh) - - String - - - ¶ - -
-func (o OAuthBearerTokenRefresh) String() string-- type - - Offset - - - ¶ - -
-- Offset type (int64) with support for canonical names -
type Offset int64-- func - - NewOffset - - - ¶ - -
-func NewOffset(offset interface{}) (Offset, error)-- NewOffset creates a new Offset using the provided logical string, an +
+ func (OAuthBearerTokenRefresh) + + String + + + ¶ + +
+func (o OAuthBearerTokenRefresh) String() string++ type + + Offset + + + ¶ + +
++ Offset type (int64) with support for canonical names +
type Offset int64++ func + + NewOffset + + + ¶ + +
+func NewOffset(offset interface{}) (Offset, error)++ NewOffset creates a new Offset using the provided logical string, an absolute int64 offset value, or a concrete Offset type. Logical offsets: "beginning", "earliest", "end", "latest", "unset", "invalid", "stored" -
- func - - OffsetTail - - - ¶ - -
-func OffsetTail(relativeOffset Offset) Offset-- OffsetTail returns the logical offset relativeOffset from current end of partition -
- func (*Offset) - - Set - - - ¶ - -
-func (o *Offset) Set(offset interface{}) error-- Set offset value, see NewOffset() -
- func (Offset) - - String - - - ¶ - -
-func (o Offset) String() string-- type - - OffsetSpec - - - ¶ - -
-- OffsetSpec specifies desired offsets while using ListOffsets. -
type OffsetSpec int64-const ( ++ func + + OffsetTail + + + ¶ + +
+func OffsetTail(relativeOffset Offset) Offset++ OffsetTail returns the logical offset relativeOffset from current end of partition +
+ func (*Offset) + + Set + + + ¶ + +
+func (o *Offset) Set(offset interface{}) error++ Set offset value, see NewOffset() +
+ func (Offset) + + String + + + ¶ + +
+func (o Offset) String() string++ type + + OffsetSpec + + + ¶ + +
++ OffsetSpec specifies desired offsets while using ListOffsets. +
type OffsetSpec int64+const ( // MaxTimestampOffsetSpec is used to describe the offset with the Max Timestamp which may be different then LatestOffsetSpec as Timestamp can be set client side. MaxTimestampOffsetSpec OffsetSpec = C.RD_KAFKA_OFFSET_SPEC_MAX_TIMESTAMP // EarliestOffsetSpec is used to describe the earliest offset for the TopicPartition. @@ -6104,80 +6375,80 @@-// LatestOffsetSpec is used to describe the latest offset for the TopicPartition. LatestOffsetSpec OffsetSpec = C.RD_KAFKA_OFFSET_SPEC_LATEST )
- func - - NewOffsetSpecForTimestamp - - - ¶ - -
-func NewOffsetSpecForTimestamp(timestamp int64) OffsetSpec-- NewOffsetSpecForTimestamp creates an OffsetSpec corresponding to the timestamp. -
- type - - OffsetsCommitted - - - ¶ - -
-- OffsetsCommitted reports committed offsets -
type OffsetsCommitted struct { ++ func + + NewOffsetSpecForTimestamp + + + ¶ + +
+func NewOffsetSpecForTimestamp(timestamp int64) OffsetSpec++ NewOffsetSpecForTimestamp creates an OffsetSpec corresponding to the timestamp. +
+ type + + OffsetsCommitted + + + ¶ + +
++ OffsetsCommitted reports committed offsets +
type OffsetsCommitted struct { Error error Offsets []TopicPartition }-- func (OffsetsCommitted) - - String - - - ¶ - -
-func (o OffsetsCommitted) String() string-- type - - PartitionEOF - - - ¶ - -
-- PartitionEOF consumer reached end of partition +
+ func (OffsetsCommitted) + + String + + + ¶ + +
+func (o OffsetsCommitted) String() string++ type + + PartitionEOF + + + ¶ + +
++ PartitionEOF consumer reached end of partition Needs to be explicitly enabled by setting the `enable.partition.eof` configuration property to true. -
type PartitionEOF TopicPartition-- func (PartitionEOF) - - String - - - ¶ - -
-func (p PartitionEOF) String() string-- type - - PartitionMetadata - - - ¶ - -
-- PartitionMetadata contains per-partition metadata -
type PartitionMetadata struct { +type PartitionEOF TopicPartition++ func (PartitionEOF) + + String + + + ¶ + +
+func (p PartitionEOF) String() string++ type + + PartitionMetadata + + + ¶ + +
++ PartitionMetadata contains per-partition metadata +
type PartitionMetadata struct { ID int32 Error Error Leader int32 @@ -6185,19 +6456,19 @@-Isrs []int32 }
- type - - PartitionsSpecification - - - ¶ - -
-- PartitionsSpecification holds parameters for creating additional partitions for a topic. +
+ type + + PartitionsSpecification + + + ¶ + +
++ PartitionsSpecification holds parameters for creating additional partitions for a topic. PartitionsSpecification is analogous to NewPartitions in the Java Topic Admin API. -
type PartitionsSpecification struct { +type PartitionsSpecification struct { // Topic to create more partitions for. Topic string // New partition count for topic, must be higher than current partition count. @@ -6210,38 +6481,38 @@-ReplicaAssignment [][]int32 }
- type - - Producer - - - ¶ - -
-- Producer implements a High-level Apache Kafka Producer instance -
type Producer struct { ++ type + + Producer + + + ¶ + +
++ Producer implements a High-level Apache Kafka Producer instance +
type Producer struct { // contains filtered or unexported fields }
-- func - - NewProducer - - - ¶ - -
-func NewProducer(conf *ConfigMap) (*Producer, error)-- NewProducer creates a new high-level Producer instance. -
- conf is a *ConfigMap with standard librdkafka configuration properties. -
- Supported special configuration properties (type, default): -
go.batch.producer (bool, false) - EXPERIMENTAL: Enable batch producer (for increased performance). ++ func + + NewProducer + + + ¶ + +
+func NewProducer(conf *ConfigMap) (*Producer, error)++ NewProducer creates a new high-level Producer instance. +
+ conf is a *ConfigMap with standard librdkafka configuration properties. +
+ Supported special configuration properties (type, default): +
go.batch.producer (bool, false) - EXPERIMENTAL: Enable batch producer (for increased performance). These batches do not relate to Kafka message batches in any way. Note: timestamps and headers are not supported with this interface. go.delivery.reports (bool, true) - Forward per-message delivery reports to the @@ -6254,330 +6525,330 @@-go.logs.channel.enable (bool, false) - Forward log to Logs() channel. go.logs.channel (chan kafka.LogEvent, nil) - Forward logs to application-provided channel instead of Logs(). Requires go.logs.channel.enable=true.
- func (*Producer) - - AbortTransaction - - - ¶ - -
-func (p *Producer) AbortTransaction(ctx context.Context) error-- AbortTransaction aborts the ongoing transaction. -
- This function should also be used to recover from non-fatal abortable +
+ func (*Producer) + + AbortTransaction + + + ¶ + +
+func (p *Producer) AbortTransaction(ctx context.Context) error++ AbortTransaction aborts the ongoing transaction. +
+ This function should also be used to recover from non-fatal abortable transaction errors. -
- Any outstanding messages will be purged and fail with +
+ Any outstanding messages will be purged and fail with `ErrPurgeInflight` or `ErrPurgeQueue`. -
- Parameters: -
- Note: This function will block until all outstanding messages are purged +
+ Parameters: +
+ Note: This function will block until all outstanding messages are purged and the transaction abort request has been successfully handled by the transaction coordinator, or until the `ctx` expires, whichever comes first. On timeout the application may call the function again. -
- Note: Will automatically call `Purge()` and `Flush()` to ensure all queued +
+ Note: Will automatically call `Purge()` and `Flush()` to ensure all queued and in-flight messages are purged before attempting to abort the transaction. The application MUST serve the `producer.Events()` channel for delivery reports in a separate go-routine during this time. -
- Returns nil on success or an error object on failure. +
+ Returns nil on success or an error object on failure. Check whether the returned error object permits retrying by calling `err.(kafka.Error).IsRetriable()`, or whether a fatal error has been raised by calling `err.(kafka.Error).IsFatal()`. -
func (p *Producer) BeginTransaction() error-
- BeginTransaction starts a new transaction. -
- `InitTransactions()` must have been called successfully (once) +
func (p *Producer) BeginTransaction() error+
+ BeginTransaction starts a new transaction. +
+ `InitTransactions()` must have been called successfully (once) before this function is called. -
- Upon successful return from this function the application has to perform at +
+ Upon successful return from this function the application has to perform at least one of the following operations within `transaction.timeout.ms` to avoid timing out the transaction on the broker: -
- Any messages produced, offsets sent (`SendOffsetsToTransaction()`), +
+ Any messages produced, offsets sent (`SendOffsetsToTransaction()`), etc., after the successful return of this function will be part of the transaction and committed or aborted atomically. -
- Finish the transaction by calling `CommitTransaction()` or +
+ Finish the transaction by calling `CommitTransaction()` or abort the transaction by calling `AbortTransaction()`. -
- Returns nil on success or an error object on failure. +
+ Returns nil on success or an error object on failure. Check whether a fatal error has been raised by calling `err.(kafka.Error).IsFatal()`. -
- Note: With the transactional producer, `Produce()`, et.al, are only +
+ Note: With the transactional producer, `Produce()`, et.al, are only allowed during an on-going transaction, as started with this function. Any produce call outside an on-going transaction, or for a failed transaction, will fail. -
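A compact sketch of the transactional flow described above, from InitTransactions() through the commit/abort decision; broker address, transactional id and topic are placeholders:

package main

import (
	"context"

	"github.com/confluentinc/confluent-kafka-go/v2/kafka"
)

func main() {
	p, err := kafka.NewProducer(&kafka.ConfigMap{
		"bootstrap.servers": "localhost:9092", // placeholder
		"transactional.id":  "txn-example",    // placeholder
	})
	if err != nil {
		panic(err)
	}
	defer p.Close()

	// Serve delivery reports while transactions are committed/aborted.
	go func() {
		for range p.Events() {
		}
	}()

	ctx := context.Background()
	if err := p.InitTransactions(ctx); err != nil {
		panic(err)
	}
	if err := p.BeginTransaction(); err != nil {
		panic(err)
	}

	topic := "example-topic"
	if err := p.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
		Value:          []byte("in a transaction"),
	}, nil); err != nil {
		panic(err)
	}

	if err := p.CommitTransaction(ctx); err != nil {
		if kerr, ok := err.(kafka.Error); ok && kerr.TxnRequiresAbort() {
			// Abort, then a new transaction may be started with BeginTransaction().
			p.AbortTransaction(ctx)
		} else {
			panic(err)
		}
	}
}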
func (p *Producer) Close()-
- Close a Producer instance. +
func (p *Producer) Close()+
+ Close a Producer instance. The Producer object or its channels are no longer usable after this call. -
func (p *Producer) CommitTransaction(ctx context.Context) error-
- CommitTransaction commits the current transaction. -
- Any outstanding messages will be flushed (delivered) before actually +
func (p *Producer) CommitTransaction(ctx context.Context) error+
+ CommitTransaction commits the current transaction. +
+ Any outstanding messages will be flushed (delivered) before actually committing the transaction. -
- If any of the outstanding messages fail permanently the current +
+ If any of the outstanding messages fail permanently the current transaction will enter the abortable error state and this function will return an abortable error, in this case the application must call `AbortTransaction()` before attempting a new transaction with `BeginTransaction()`. -
- Parameters: -
- Note: This function will block until all outstanding messages are +
+ Parameters: +
+ Note: This function will block until all outstanding messages are delivered and the transaction commit request has been successfully handled by the transaction coordinator, or until the `ctx` expires, whichever comes first. On timeout the application may call the function again. -
- Note: Will automatically call `Flush()` to ensure all queued +
+ Note: Will automatically call `Flush()` to ensure all queued messages are delivered before attempting to commit the transaction. The application MUST serve the `producer.Events()` channel for delivery reports in a separate go-routine during this time. -
- Returns nil on success or an error object on failure. +
+ Returns nil on success or an error object on failure. Check whether the returned error object permits retrying by calling `err.(kafka.Error).IsRetriable()`, or whether an abortable or fatal error has been raised by calling `err.(kafka.Error).TxnRequiresAbort()` or `err.(kafka.Error).IsFatal()` respectively. -
func (p *Producer) Events() chan Event-
- Events returns the Events channel (read) -
func (p *Producer) Flush(timeoutMs int) int-
- Flush and wait for outstanding messages and requests to complete delivery. +
func (p *Producer) Events() chan Event+
+ Events returns the Events channel (read) +
func (p *Producer) Flush(timeoutMs int) int+
+ Flush and wait for outstanding messages and requests to complete delivery. Runs until the outstanding count reaches zero or timeoutMs elapses. Returns the number of outstanding events still un-flushed. BUG: Tries to include messages on ProduceChannel, but it's not guaranteed to be reliable. -
func (p *Producer) GetFatalError() error-
- GetFatalError returns an Error object if the client instance has raised a fatal error, else nil. -
func (p *Producer) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*Metadata, error)-
- GetMetadata queries broker for cluster and topic metadata. +
func (p *Producer) GetFatalError() error+
+ GetFatalError returns an Error object if the client instance has raised a fatal error, else nil. +
func (p *Producer) GetMetadata(topic *string, allTopics bool, timeoutMs int) (*Metadata, error)+
+ GetMetadata queries broker for cluster and topic metadata. If topic is non-nil only information about that topic is returned, else if allTopics is false only information about locally used topics is returned, else information about all topics is returned. GetMetadata is equivalent to listTopics, describeTopics and describeCluster in the Java API. -
func (p *Producer) InitTransactions(ctx context.Context) error-
- InitTransactions Initializes transactions for the producer instance. -
- This function ensures any transactions initiated by previous instances +
func (p *Producer) InitTransactions(ctx context.Context) error+
+ InitTransactions Initializes transactions for the producer instance. +
+ This function ensures any transactions initiated by previous instances of the producer with the same `transactional.id` are completed. If the previous instance failed with a transaction in progress the previous transaction will be aborted. This function needs to be called before any other transactional or produce functions are called when the `transactional.id` is configured. -
- If the last transaction had begun completion (following transaction commit) +
+ If the last transaction had begun completion (following transaction commit) but not yet finished, this function will await the previous transaction's completion. -
- When any previous transactions have been fenced this function +
+ When any previous transactions have been fenced this function will acquire the internal producer id and epoch, used in all future transactional messages issued by this producer instance. -
- Parameters: -
+ Parameters: +
- Returns nil on success or an error on failure. +
+ Returns nil on success or an error on failure. Check whether the returned error object permits retrying by calling `err.(kafka.Error).IsRetriable()`, or whether a fatal error has been raised by calling `err.(kafka.Error).IsFatal()`. -
func (p *Producer) IsClosed() bool-
- IsClosed returns boolean representing if client is closed or not -
func (p *Producer) Len() int-
- Len returns the number of messages and requests waiting to be transmitted to the broker +
func (p *Producer) IsClosed() bool+
+ IsClosed returns boolean representing if client is closed or not +
func (p *Producer) Len() int+
+ Len returns the number of messages and requests waiting to be transmitted to the broker as well as delivery reports queued for the application. BUG: Tries to include messages on ProduceChannel, but it's not guaranteed to be reliable. -
func (p *Producer) Logs() chan LogEvent-
- Logs returns the Log channel (if enabled), else nil -
func (p *Producer) OffsetsForTimes(times []TopicPartition, timeoutMs int) (offsets []TopicPartition, err error)-
- OffsetsForTimes looks up offsets by timestamp for the given partitions. -
- The returned offset for each partition is the earliest offset whose +
func (p *Producer) Logs() chan LogEvent+
+ Logs returns the Log channel (if enabled), else nil +
func (p *Producer) OffsetsForTimes(times []TopicPartition, timeoutMs int) (offsets []TopicPartition, err error)+
+ OffsetsForTimes looks up offsets by timestamp for the given partitions. +
+ The returned offset for each partition is the earliest offset whose timestamp is greater than or equal to the given timestamp in the corresponding partition. If the provided timestamp exceeds that of the last message in the partition, a value of -1 will be returned. -
- The timestamps to query are represented as `.Offset` in the `times` +
+ The timestamps to query are represented as `.Offset` in the `times` argument and the looked up offsets are represented as `.Offset` in the returned `offsets` list. -
- The function will block for at most timeoutMs milliseconds. -
- Duplicate Topic+Partitions are not supported. +
+ The function will block for at most timeoutMs milliseconds. +
+ Duplicate Topic+Partitions are not supported. Per-partition errors may be returned in the `.Error` field. -
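A sketch of the timestamp-to-offset lookup described above (illustrative topic name; p is an existing producer):

	topic := "my-topic"
	ts := time.Now().Add(-time.Hour).UnixMilli() // query timestamp in ms, carried in .Offset
	times := []kafka.TopicPartition{{Topic: &topic, Partition: 0, Offset: kafka.Offset(ts)}}
	offsets, err := p.OffsetsForTimes(times, 10000)
	if err != nil {
		log.Fatal(err) // client-level error
	}
	for _, tp := range offsets {
		// tp.Offset is the earliest offset at/after ts, or -1 if ts is past the
		// last message; tp.Error carries any per-partition error
		fmt.Println(tp)
	}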
func (p *Producer) Produce(msg *Message, deliveryChan chan Event) error-
- Produce single message. +
func (p *Producer) Produce(msg *Message, deliveryChan chan Event) error+
+ Produce single message. This is an asynchronous call that enqueues the message on the internal transmit queue, thus returning immediately. The delivery report will be sent on the provided deliveryChan if specified, @@ -6587,234 +6858,234 @@
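A minimal produce-and-wait sketch (editor's illustration; the topic name is a placeholder and p is an existing producer):

	topic := "my-topic"
	deliveryChan := make(chan kafka.Event, 1)
	err := p.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
		Value:          []byte("hello"),
	}, deliveryChan)
	if err != nil {
		log.Fatal(err) // enqueue failed, e.g. local queue full
	}
	m := (<-deliveryChan).(*kafka.Message) // delivery report
	if m.TopicPartition.Error != nil {
		log.Printf("delivery failed: %v", m.TopicPartition.Error)
	}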
func (p *Producer) ProduceChannel() chan *Message-
- ProduceChannel returns the produce *Message channel (write) -
- Deprecated: ProduceChannel (channel based producer) is deprecated in favour +
func (p *Producer) ProduceChannel() chan *Message+
+ ProduceChannel returns the produce *Message channel (write) +
+ Deprecated: ProduceChannel (channel based producer) is deprecated in favour of Produce(). Flush() and Len() are not guaranteed to be reliable with ProduceChannel. -
func (p *Producer) Purge(flags int) error-
- Purge messages currently handled by this producer instance. -
- flags is a combination of PurgeQueue, PurgeInFlight and PurgeNonBlocking. -
- The application will need to call Poll(), Flush() or read the Events() channel +
func (p *Producer) Purge(flags int) error+
+ Purge messages currently handled by this producer instance. +
+ flags is a combination of PurgeQueue, PurgeInFlight and PurgeNonBlocking. +
+ The application will need to call Poll(), Flush() or read the Events() channel after this call to serve delivery reports for the purged messages. -
- Messages purged from internal queues fail with the delivery report +
+ Messages purged from internal queues fail with the delivery report error code set to ErrPurgeQueue, while purged messages that are in-flight to or from the broker will fail with the error code set to ErrPurgeInflight. -
- Warning: Purging messages that are in-flight to or from the broker +
+
	Warning: Purging messages that are in-flight to or from the broker
will ignore any subsequent acknowledgement for these messages
received from the broker, effectively making it impossible
for the application to know if the messages were successfully
produced or not. This may result in duplicate messages if the
application retries these messages at a later time.
-
- Note: This call may block for a short time while background thread +
+ Note: This call may block for a short time while background thread queues are purged. -
- Returns nil on success, ErrInvalidArg if the purge flags are invalid or unknown. -
func (p *Producer) QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error)-
- QueryWatermarkOffsets returns the broker's low and high offsets for the given topic -and partition. -
func (p *Producer) SendOffsetsToTransaction(ctx context.Context, offsets []TopicPartition, consumerMetadata *ConsumerGroupMetadata) error-
- SendOffsetsToTransaction sends a list of topic partition offsets to the +
+ Returns nil on success, ErrInvalidArg if the purge flags are invalid or unknown. +
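For example (editor's sketch): purging queued and in-flight messages, then serving the resulting delivery reports:

	if err := p.Purge(kafka.PurgeQueue | kafka.PurgeInFlight); err != nil {
		log.Fatal(err) // ErrInvalidArg for bad flags
	}
	p.Flush(1000) // purged messages now report ErrPurgeQueue / ErrPurgeInflight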
func (p *Producer) QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error)+
+ QueryWatermarkOffsets returns the broker's low and high offsets for the given topic +and partition. +
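For instance (sketch; topic and timeout are illustrative):

	low, high, err := p.QueryWatermarkOffsets("my-topic", 0, 5000)
	if err == nil {
		fmt.Printf("partition 0: offsets [%d, %d), ~%d messages\n", low, high, high-low)
	}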
func (p *Producer) SendOffsetsToTransaction(ctx context.Context, offsets []TopicPartition, consumerMetadata *ConsumerGroupMetadata) error+
+
	SendOffsetsToTransaction sends a list of topic partition offsets to the
consumer group coordinator for `consumerMetadata`, and marks the offsets
as part of the current transaction. These offsets will be considered
committed only if the transaction is committed successfully.
-
- The offsets should be the next message your application will consume, +
+ The offsets should be the next message your application will consume, i.e., the last processed message's offset + 1 for each partition. Either track the offsets manually during processing or use `consumer.Position()` (on the consumer) to get the current offsets for the partitions assigned to the consumer. -
- Use this method at the end of a consume-transform-produce loop prior +
+ Use this method at the end of a consume-transform-produce loop prior to committing the transaction with `CommitTransaction()`. -
- Parameters: -
+ Parameters: +
- Note: The consumer must disable auto commits (set `enable.auto.commit` to false on the consumer). -
- Note: Logical and invalid offsets (e.g., OffsetInvalid) in + + + + +
+ Note: The consumer must disable auto commits (set `enable.auto.commit` to false on the consumer). +
+ Note: Logical and invalid offsets (e.g., OffsetInvalid) in `offsets` will be ignored. If there are no valid offsets in `offsets` the function will return nil and no action will be taken. -
- Returns nil on success or an error object on failure. +
+ Returns nil on success or an error object on failure. Check whether the returned error object permits retrying by calling `err.(kafka.Error).IsRetriable()`, or whether an abortable or fatal error has been raised by calling `err.(kafka.Error).TxnRequiresAbort()` or `err.(kafka.Error).IsFatal()` respectively. -
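A sketch of the end of one consume-transform-produce cycle (assumes a consumer c in the input group, a transactional producer p with an open transaction, and a ctx):

	assigned, err := c.Assignment()
	if err != nil {
		log.Fatal(err)
	}
	position, err := c.Position(assigned) // next offsets to consume
	if err != nil {
		log.Fatal(err)
	}
	meta, err := c.GetConsumerGroupMetadata()
	if err != nil {
		log.Fatal(err)
	}
	if err := p.SendOffsetsToTransaction(ctx, position, meta); err != nil {
		if kErr, ok := err.(kafka.Error); ok && kErr.TxnRequiresAbort() {
			p.AbortTransaction(ctx)
		} else {
			log.Fatal(err)
		}
	}
	if err := p.CommitTransaction(ctx); err != nil {
		log.Fatal(err)
	}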
func (p *Producer) SetOAuthBearerToken(oauthBearerToken OAuthBearerToken) error-
- SetOAuthBearerToken sets the the data to be transmitted +
func (p *Producer) SetOAuthBearerToken(oauthBearerToken OAuthBearerToken) error+
+
	SetOAuthBearerToken sets the data to be transmitted
to a broker during SASL/OAUTHBEARER authentication. It will return nil
on success, otherwise an error if:
1) the token data is invalid (meaning an expiration time in the past
or either a token value or an extension key or value that does not meet
the regular expression requirements as per
-
	https://tools.ietf.org/html/rfc7628#section-3.1
-
);
+
	https://tools.ietf.org/html/rfc7628#section-3.1
+
); 2) SASL/OAUTHBEARER is not supported by the underlying librdkafka build;
3) SASL/OAUTHBEARER is supported but is not configured as the client's
authentication mechanism.
-
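A token-refresh sketch (the token value and principal are placeholders; a real token comes from your OAuth provider):

	token := kafka.OAuthBearerToken{
		TokenValue: "eyJhbGciOi...", // opaque bearer token (placeholder)
		Expiration: time.Now().Add(time.Hour),
		Principal:  "admin",
	}
	if err := p.SetOAuthBearerToken(token); err != nil {
		// report the failure so a new token refresh event is scheduled
		p.SetOAuthBearerTokenFailure(err.Error())
	}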
func (p *Producer) SetOAuthBearerTokenFailure(errstr string) error-
- SetOAuthBearerTokenFailure sets the error message describing why token +
func (p *Producer) SetOAuthBearerTokenFailure(errstr string) error+
+ SetOAuthBearerTokenFailure sets the error message describing why token retrieval/setting failed; it also schedules a new token refresh event for 10 seconds later so the attempt may be retried. It will return nil on success, otherwise an error if: 1) SASL/OAUTHBEARER is not supported by the underlying librdkafka build; 2) SASL/OAUTHBEARER is supported but is not configured as the client's authentication mechanism. -
func (p *Producer) SetSaslCredentials(username, password string) error-
- SetSaslCredentials sets the SASL credentials used for this producer. The new credentials +
func (p *Producer) SetSaslCredentials(username, password string) error+
+ SetSaslCredentials sets the SASL credentials used for this producer. The new credentials will overwrite the old ones (which were set when creating the producer or by a previous call to SetSaslCredentials). The new credentials will be used the next time this producer needs to authenticate to a broker. This method will not disconnect existing broker connections that were established with the old credentials. This method applies only to the SASL PLAIN and SCRAM mechanisms. -
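For example (sketch), rotating credentials on a live producer; the names are placeholders:

	if err := p.SetSaslCredentials("new-user", "new-password"); err != nil {
		log.Fatal(err) // e.g. the configured mechanism is not PLAIN or SCRAM
	}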
func (p *Producer) String() string-
- String returns a human readable name for a Producer instance -
func (p *Producer) TestFatalError(code ErrorCode, str string) ErrorCode-
- TestFatalError triggers a fatal error in the underlying client. +
func (p *Producer) String() string+
+
	String returns a human-readable name for a Producer instance
+
func (p *Producer) TestFatalError(code ErrorCode, str string) ErrorCode+
+ TestFatalError triggers a fatal error in the underlying client. This is to be used strictly for testing purposes. -
- RebalanceCb provides a per-Subscribe*() rebalance event callback. +
+ RebalanceCb provides a per-Subscribe*() rebalance event callback. The passed Event will be either AssignedPartitions or RevokedPartitions -
type RebalanceCb func(*Consumer, Event) error-
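A sketch of a callback that performs the (un)assignment itself (assumes an existing consumer c; with cooperative rebalancing you would use IncrementalAssign/IncrementalUnassign instead):

	cb := func(c *kafka.Consumer, ev kafka.Event) error {
		switch e := ev.(type) {
		case kafka.AssignedPartitions:
			return c.Assign(e.Partitions)
		case kafka.RevokedPartitions:
			return c.Unassign()
		}
		return nil
	}
	if err := c.SubscribeTopics([]string{"my-topic"}, cb); err != nil {
		log.Fatal(err)
	}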
- ResourcePatternType enumerates the different types of Kafka resource patterns. -
type ResourcePatternType int-
const ( +type RebalanceCb func(*Consumer, Event) error++ type + + ResourcePatternType + + + ¶ + +
++ ResourcePatternType enumerates the different types of Kafka resource patterns. +
type ResourcePatternType int+const ( // ResourcePatternTypeUnknown is a resource pattern type not known or not set. ResourcePatternTypeUnknown ResourcePatternType = C.RD_KAFKA_RESOURCE_PATTERN_UNKNOWN // ResourcePatternTypeAny matches any resource, used for lookups. @@ -6826,44 +7097,44 @@-// ResourcePatternTypePrefixed matches a prefixed resource name ResourcePatternTypePrefixed ResourcePatternType = C.RD_KAFKA_RESOURCE_PATTERN_PREFIXED )
- func - - ResourcePatternTypeFromString - - - ¶ - -
-func ResourcePatternTypeFromString(patternTypeString string) (ResourcePatternType, error)-- ResourcePatternTypeFromString translates a resource pattern type name to +
+ func + + ResourcePatternTypeFromString + + + ¶ + +
+func ResourcePatternTypeFromString(patternTypeString string) (ResourcePatternType, error)++ ResourcePatternTypeFromString translates a resource pattern type name to a ResourcePatternType value. -
- func (ResourcePatternType) - - String - - - ¶ - -
-func (t ResourcePatternType) String() string-- String returns the human-readable representation of a ResourcePatternType -
- type - - ResourceType - - - ¶ - -
-- ResourceType represents an Apache Kafka resource type -
type ResourceType int-const ( ++ func (ResourcePatternType) + + String + + + ¶ + +
+func (t ResourcePatternType) String() string++ String returns the human-readable representation of a ResourcePatternType +
+ type + + ResourceType + + + ¶ + +
++ ResourceType represents an Apache Kafka resource type +
type ResourceType int+const ( // ResourceUnknown - Unknown ResourceUnknown ResourceType = C.RD_KAFKA_RESOURCE_UNKNOWN // ResourceAny - match any resource type (DescribeConfigs) @@ -6875,90 +7146,90 @@-// ResourceBroker - Broker ResourceBroker ResourceType = C.RD_KAFKA_RESOURCE_BROKER )
- func - - ResourceTypeFromString - - - ¶ - -
-func ResourceTypeFromString(typeString string) (ResourceType, error)-- ResourceTypeFromString translates a resource type name/string to +
+ func + + ResourceTypeFromString + + + ¶ + +
+func ResourceTypeFromString(typeString string) (ResourceType, error)++ ResourceTypeFromString translates a resource type name/string to a ResourceType value. -
- func (ResourceType) - - String - - - ¶ - -
-func (t ResourceType) String() string-- String returns the human-readable representation of a ResourceType -
- type - - RevokedPartitions - - - ¶ - -
-- RevokedPartitions consumer group rebalance event: revoked partition set -
type RevokedPartitions struct { ++ func (ResourceType) + + String + + + ¶ + +
+func (t ResourceType) String() string++ String returns the human-readable representation of a ResourceType +
+ type + + RevokedPartitions + + + ¶ + +
++ RevokedPartitions consumer group rebalance event: revoked partition set +
type RevokedPartitions struct { Partitions []TopicPartition }-- func (RevokedPartitions) - - String - - - ¶ - -
-func (e RevokedPartitions) String() string-- type - - ScramCredentialInfo - - - ¶ - -
-- ScramCredentialInfo contains Mechanism and Iterations for a +
+ func (RevokedPartitions) + + String + + + ¶ + +
+func (e RevokedPartitions) String() string++ type + + ScramCredentialInfo + + + ¶ + +
++ ScramCredentialInfo contains Mechanism and Iterations for a SASL/SCRAM credential associated with a user. -
type ScramCredentialInfo struct { +type ScramCredentialInfo struct { // Iterations - positive number of iterations used when creating the credential Iterations int // Mechanism - SASL/SCRAM mechanism Mechanism ScramMechanism }-- type - - ScramMechanism - - - ¶ - -
-- ScramMechanism enumerates SASL/SCRAM mechanisms. +
+ type + + ScramMechanism + + + ¶ + +
++ ScramMechanism enumerates SASL/SCRAM mechanisms. Used by `AdminClient.AlterUserScramCredentials` and `AdminClient.DescribeUserScramCredentials`. -
type ScramMechanism int-const ( +type ScramMechanism int+const ( // ScramMechanismUnknown - Unknown SASL/SCRAM mechanism ScramMechanismUnknown ScramMechanism = C.RD_KAFKA_SCRAM_MECHANISM_UNKNOWN // ScramMechanismSHA256 - SCRAM-SHA-256 mechanism @@ -6966,69 +7237,69 @@-// ScramMechanismSHA512 - SCRAM-SHA-512 mechanism ScramMechanismSHA512 ScramMechanism = C.RD_KAFKA_SCRAM_MECHANISM_SHA_512 )
- func - - ScramMechanismFromString - - - ¶ - -
-func ScramMechanismFromString(mechanism string) (ScramMechanism, error)-- ScramMechanismFromString translates a Scram Mechanism name to +
+ func + + ScramMechanismFromString + + + ¶ + +
+func ScramMechanismFromString(mechanism string) (ScramMechanism, error)++ ScramMechanismFromString translates a Scram Mechanism name to a ScramMechanism value. -
- func (ScramMechanism) - - String - - - ¶ - -
-func (o ScramMechanism) String() string-- String returns the human-readable representation of an ScramMechanism -
- type - - Stats - - - ¶ - -
-- Stats statistics event -
type Stats struct { ++ func (ScramMechanism) + + String + + + ¶ + +
+func (o ScramMechanism) String() string++ String returns the human-readable representation of a ScramMechanism +
+ type + + Stats + + + ¶ + +
++ Stats statistics event +
type Stats struct { // contains filtered or unexported fields }
-- func (Stats) - - String - - - ¶ - -
-func (e Stats) String() string-- type - - TimestampType - - - ¶ - -
-- TimestampType is a the Message timestamp type or source -
type TimestampType int-const ( ++ func (Stats) + + String + + + ¶ + +
+func (e Stats) String() string++ type + + TimestampType + + + ¶ + +
+
+
	TimestampType is the Message timestamp type or source
+
type TimestampType int+const (
	// TimestampNotAvailable indicates no timestamp was set, or not available due to lacking broker support
	TimestampNotAvailable TimestampType = C.RD_KAFKA_TIMESTAMP_NOT_AVAILABLE
	// TimestampCreateTime indicates timestamp set by producer (source time)
	TimestampCreateTime TimestampType = C.RD_KAFKA_TIMESTAMP_CREATE_TIME
	// TimestampLogAppendTime indicates timestamp set by broker (store time)
	TimestampLogAppendTime TimestampType = C.RD_KAFKA_TIMESTAMP_LOG_APPEND_TIME
)
@@ -7036,57 +7307,57 @@
- func (TimestampType) - - String - - - ¶ - -
-func (t TimestampType) String() string-- type - - TopicCollection - - - ¶ - -
-- TopicCollection represents a collection of topics. -
type TopicCollection struct { ++ func (TimestampType) + + String + + + ¶ + +
+func (t TimestampType) String() string++ type + + TopicCollection + + + ¶ + +
++ TopicCollection represents a collection of topics. +
type TopicCollection struct { // contains filtered or unexported fields }
-- func - - NewTopicCollectionOfTopicNames - - - ¶ - -
-func NewTopicCollectionOfTopicNames(names []string) TopicCollection-- NewTopicCollectionOfTopicNames creates a new TopicCollection based on a list +
+ func + + NewTopicCollectionOfTopicNames + + + ¶ + +
+func NewTopicCollectionOfTopicNames(names []string) TopicCollection++ NewTopicCollectionOfTopicNames creates a new TopicCollection based on a list of topic names. -
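For instance (sketch; admin is an existing *kafka.AdminClient and the topic names are placeholders):

	topics := kafka.NewTopicCollectionOfTopicNames([]string{"orders", "payments"})
	res, err := admin.DescribeTopics(ctx, topics)
	if err != nil {
		log.Fatal(err)
	}
	for _, d := range res.TopicDescriptions {
		fmt.Println(d.Name, d.Error) // check per-topic errors
	}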
- type - - TopicDescription - - - ¶ - -
-- TopicDescription represents the result of DescribeTopics for +
+ type + + TopicDescription + + + ¶ + +
++ TopicDescription represents the result of DescribeTopics for a single topic. -
type TopicDescription struct { +type TopicDescription struct { // Topic name. Name string // Topic Id @@ -7101,35 +7372,35 @@-AuthorizedOperations []ACLOperation }
- type - - TopicMetadata - - - ¶ - -
-- TopicMetadata contains per-topic metadata -
type TopicMetadata struct { ++ type + + TopicMetadata + + + ¶ + +
++ TopicMetadata contains per-topic metadata +
type TopicMetadata struct { Topic string Partitions []PartitionMetadata Error Error }-- type - - TopicPartition - - - ¶ - -
-- TopicPartition is a generic placeholder for a Topic+Partition and optionally Offset. -
type TopicPartition struct { ++ type + + TopicPartition + + + ¶ + +
++ TopicPartition is a generic placeholder for a Topic+Partition and optionally Offset. +
type TopicPartition struct { Topic *string Partition int32 Offset Offset @@ -7138,29 +7409,29 @@-LeaderEpoch *int32 // LeaderEpoch or nil if not available }
- func (TopicPartition) - - String - - - ¶ - -
-func (p TopicPartition) String() string-- type - - TopicPartitionInfo - - - ¶ - -
-- TopicPartitionInfo represents a specific partition's information inside a +
+ func (TopicPartition) + + String + + + ¶ + +
+func (p TopicPartition) String() string++ type + + TopicPartitionInfo + + + ¶ + +
++ TopicPartitionInfo represents a specific partition's information inside a TopicDescription. -
type TopicPartitionInfo struct { +type TopicPartitionInfo struct { // Partition id. Partition int // Leader broker. @@ -7171,92 +7442,92 @@-Isr []Node }
- type - - TopicPartitions - - - ¶ - -
-- TopicPartitions is a slice of TopicPartitions that also implements +
+ type + + TopicPartitions + + + ¶ + +
++ TopicPartitions is a slice of TopicPartitions that also implements the sort interface -
type TopicPartitions []TopicPartition-- func (TopicPartitions) - - Len - - - ¶ - -
-func (tps TopicPartitions) Len() int-- func (TopicPartitions) - - Less - - - ¶ - -
-func (tps TopicPartitions) Less(i, j int) bool-- func (TopicPartitions) - - Swap - - - ¶ - -
-func (tps TopicPartitions) Swap(i, j int)-- type - - TopicResult - - - ¶ - -
-- TopicResult provides per-topic operation result (error) information. -
type TopicResult struct { +type TopicPartitions []TopicPartition++ func (TopicPartitions) + + Len + + + ¶ + +
+func (tps TopicPartitions) Len() int++ func (TopicPartitions) + + Less + + + ¶ + +
+func (tps TopicPartitions) Less(i, j int) bool++ func (TopicPartitions) + + Swap + + + ¶ + +
+func (tps TopicPartitions) Swap(i, j int)++ type + + TopicResult + + + ¶ + +
++ TopicResult provides per-topic operation result (error) information. +
type TopicResult struct { // Topic name Topic string // Error, if any, of result. Check with `Error.Code() != ErrNoError`. Error Error }-- func (TopicResult) - - String - - - ¶ - -
-func (t TopicResult) String() string-- String returns a human-readable representation of a TopicResult. -
- type - - TopicSpecification - - - ¶ - -
-- TopicSpecification holds parameters for creating a new topic. +
+ func (TopicResult) + + String + + + ¶ + +
+func (t TopicResult) String() string++ String returns a human-readable representation of a TopicResult. +
+ type + + TopicSpecification + + + ¶ + +
++ TopicSpecification holds parameters for creating a new topic. TopicSpecification is analogous to NewTopic in the Java Topic Admin API. -
type TopicSpecification struct { +type TopicSpecification struct { // Topic name to create. Topic string // Number of partitions in topic. @@ -7273,89 +7544,89 @@-Config map[string]string }
- type - - UUID - - - ¶ - -
-- UUID Kafka UUID representation -
type UUID struct { ++ type + + UUID + + + ¶ + +
++ UUID Kafka UUID representation +
type UUID struct { // contains filtered or unexported fields }
-- func (UUID) - - GetLeastSignificantBits - - - ¶ - -
-func (uuid UUID) GetLeastSignificantBits() int64-- GetLeastSignificantBits returns Least Significant 64 bits of the 128 bits UUID -
- func (UUID) - - GetMostSignificantBits - - - ¶ - -
-func (uuid UUID) GetMostSignificantBits() int64-- GetMostSignificantBits returns Most Significant 64 bits of the 128 bits UUID -
- func (UUID) - - String - - - ¶ - -
-func (uuid UUID) String() string-- Base64 string representation of the UUID -
- type - - UserScramCredentialDeletion - - - ¶ - -
-- UserScramCredentialDeletion is a request to delete +
+ func (UUID) + + GetLeastSignificantBits + + + ¶ + +
+func (uuid UUID) GetLeastSignificantBits() int64++ GetLeastSignificantBits returns Least Significant 64 bits of the 128 bits UUID +
+ func (UUID) + + GetMostSignificantBits + + + ¶ + +
+func (uuid UUID) GetMostSignificantBits() int64++ GetMostSignificantBits returns Most Significant 64 bits of the 128 bits UUID +
+ func (UUID) + + String + + + ¶ + +
+func (uuid UUID) String() string++ Base64 string representation of the UUID +
+ type + + UserScramCredentialDeletion + + + ¶ + +
++ UserScramCredentialDeletion is a request to delete a SASL/SCRAM credential for a user. -
type UserScramCredentialDeletion struct { +type UserScramCredentialDeletion struct { // User - user name User string // Mechanism - SASL/SCRAM mechanism. Mechanism ScramMechanism }-- type - - UserScramCredentialUpsertion - - - ¶ - -
-- UserScramCredentialUpsertion is a request to update/insert +
+ type + + UserScramCredentialUpsertion + + + ¶ + +
++ UserScramCredentialUpsertion is a request to update/insert a SASL/SCRAM credential for a user. -
type UserScramCredentialUpsertion struct { +type UserScramCredentialUpsertion struct { // User - user name User string // ScramCredentialInfo - the mechanism and iterations. @@ -7366,20 +7637,20 @@-Salt []byte }
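An upsert sketch (the user name, password, and iteration count are placeholders; assumes an AdminClient admin and a ctx):

	upsertion := kafka.UserScramCredentialUpsertion{
		User: "alice",
		ScramCredentialInfo: kafka.ScramCredentialInfo{
			Mechanism:  kafka.ScramMechanismSHA256,
			Iterations: 10000,
		},
		Password: []byte("secret"),
		// Salt omitted here; one is generated for the request (assumption)
	}
	res, err := admin.AlterUserScramCredentials(ctx,
		[]kafka.UserScramCredentialUpsertion{upsertion}, nil)
	if err != nil {
		log.Fatal(err)
	}
	for user, kerr := range res.Errors {
		if kerr.Code() != kafka.ErrNoError {
			fmt.Println(user, kerr) // per-user error
		}
	}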
- type - - UserScramCredentialsDescription - - - ¶ - -
-- UserScramCredentialsDescription represent all SASL/SCRAM credentials +
+ type + + UserScramCredentialsDescription + + + ¶ + +
+
+
	UserScramCredentialsDescription represents all SASL/SCRAM credentials
associated with a user that can be retrieved, or an error indicating
why credentials could not be retrieved.
+
type UserScramCredentialsDescription struct { +type UserScramCredentialsDescription struct { // User - the user name. User string // ScramCredentialInfos - SASL/SCRAM credential representations for the user. @@ -7388,30 +7659,49 @@- + + + + + + + + + + + + + + + + + + + diff --git a/kafka/build_darwin_amd64.go b/kafka/build_darwin_amd64.go index 15288bc8f..1460d1ea8 100644 --- a/kafka/build_darwin_amd64.go +++ b/kafka/build_darwin_amd64.go @@ -10,4 +10,4 @@ package kafka import "C" // LibrdkafkaLinkInfo explains how librdkafka was linked to the Go client -const LibrdkafkaLinkInfo = "static darwin_amd64 from librdkafka-static-bundle-v2.6.0-RC2.tgz" +const LibrdkafkaLinkInfo = "static darwin_amd64 from librdkafka-static-bundle-v2.6.0.tgz" diff --git a/kafka/build_darwin_arm64.go b/kafka/build_darwin_arm64.go index 1ee4f126a..7f88904fc 100644 --- a/kafka/build_darwin_arm64.go +++ b/kafka/build_darwin_arm64.go @@ -10,4 +10,4 @@ package kafka import "C" // LibrdkafkaLinkInfo explains how librdkafka was linked to the Go client -const LibrdkafkaLinkInfo = "static darwin_arm64 from librdkafka-static-bundle-v2.6.0-RC2.tgz" +const LibrdkafkaLinkInfo = "static darwin_arm64 from librdkafka-static-bundle-v2.6.0.tgz" diff --git a/kafka/build_glibc_linux_amd64.go b/kafka/build_glibc_linux_amd64.go index 57f7bb0dd..bb72dbe59 100644 --- a/kafka/build_glibc_linux_amd64.go +++ b/kafka/build_glibc_linux_amd64.go @@ -10,4 +10,4 @@ package kafka import "C" // LibrdkafkaLinkInfo explains how librdkafka was linked to the Go client -const LibrdkafkaLinkInfo = "static glibc_linux_amd64 from librdkafka-static-bundle-v2.6.0-RC2.tgz" +const LibrdkafkaLinkInfo = "static glibc_linux_amd64 from librdkafka-static-bundle-v2.6.0.tgz" diff --git a/kafka/build_glibc_linux_arm64.go b/kafka/build_glibc_linux_arm64.go index 56a10056b..8e8182a23 100644 --- a/kafka/build_glibc_linux_arm64.go +++ b/kafka/build_glibc_linux_arm64.go @@ -10,4 +10,4 @@ package kafka import "C" // LibrdkafkaLinkInfo explains how librdkafka was linked to the Go client -const LibrdkafkaLinkInfo = "static glibc_linux_arm64 from librdkafka-static-bundle-v2.6.0-RC2.tgz" +const LibrdkafkaLinkInfo = "static glibc_linux_arm64 from librdkafka-static-bundle-v2.6.0.tgz" diff --git a/kafka/build_musl_linux_amd64.go b/kafka/build_musl_linux_amd64.go index 79c27c657..833f136d2 100644 --- a/kafka/build_musl_linux_amd64.go +++ b/kafka/build_musl_linux_amd64.go @@ -10,4 +10,4 @@ package kafka import "C" // LibrdkafkaLinkInfo explains how librdkafka was linked to the Go client -const LibrdkafkaLinkInfo = "static musl_linux_amd64 from librdkafka-static-bundle-v2.6.0-RC2.tgz" +const LibrdkafkaLinkInfo = "static musl_linux_amd64 from librdkafka-static-bundle-v2.6.0.tgz" diff --git a/kafka/build_musl_linux_arm64.go b/kafka/build_musl_linux_arm64.go index 9c83b9c39..b0169814d 100644 --- a/kafka/build_musl_linux_arm64.go +++ b/kafka/build_musl_linux_arm64.go @@ -10,4 +10,4 @@ package kafka import "C" // LibrdkafkaLinkInfo explains how librdkafka was linked to the Go client -const LibrdkafkaLinkInfo = "static musl_linux_arm64 from librdkafka-static-bundle-v2.6.0-RC2.tgz" +const LibrdkafkaLinkInfo = "static musl_linux_arm64 from librdkafka-static-bundle-v2.6.0.tgz" diff --git a/kafka/build_windows.go b/kafka/build_windows.go index af7d80970..21d0d9bea 100644 --- a/kafka/build_windows.go +++ b/kafka/build_windows.go @@ -10,4 +10,4 @@ package kafka import "C" // LibrdkafkaLinkInfo explains how librdkafka was linked to the Go client -const LibrdkafkaLinkInfo = "static windows from 
librdkafka-static-bundle-v2.6.0-RC2.tgz" +const LibrdkafkaLinkInfo = "static windows from librdkafka-static-bundle-v2.6.0.tgz" diff --git a/kafka/generated_errors.go b/kafka/generated_errors.go index c741a48fc..3e8b07afe 100644 --- a/kafka/generated_errors.go +++ b/kafka/generated_errors.go @@ -1,7 +1,7 @@ package kafka // Copyright 2016-2024 Confluent Inc. -// AUTOMATICALLY GENERATED ON 2024-10-10 18:39:57.522607864 +0530 IST m=+0.000115365 USING librdkafka 2.6.0-RC2 +// AUTOMATICALLY GENERATED ON 2024-10-11 03:13:44.64397621 +0530 IST m=+0.000130567 USING librdkafka 2.6.0 /* #include "select_rdkafka.h" diff --git a/kafka/librdkafka_vendor/librdkafka_darwin_amd64.a b/kafka/librdkafka_vendor/librdkafka_darwin_amd64.a index be7732203..b7100e4b4 100644 Binary files a/kafka/librdkafka_vendor/librdkafka_darwin_amd64.a and b/kafka/librdkafka_vendor/librdkafka_darwin_amd64.a differ diff --git a/kafka/librdkafka_vendor/librdkafka_darwin_arm64.a b/kafka/librdkafka_vendor/librdkafka_darwin_arm64.a index eed570f6f..d697b2818 100644 Binary files a/kafka/librdkafka_vendor/librdkafka_darwin_arm64.a and b/kafka/librdkafka_vendor/librdkafka_darwin_arm64.a differ diff --git a/kafka/librdkafka_vendor/librdkafka_glibc_linux_amd64.a b/kafka/librdkafka_vendor/librdkafka_glibc_linux_amd64.a index 7aa391a0c..92f30a879 100644 Binary files a/kafka/librdkafka_vendor/librdkafka_glibc_linux_amd64.a and b/kafka/librdkafka_vendor/librdkafka_glibc_linux_amd64.a differ diff --git a/kafka/librdkafka_vendor/librdkafka_glibc_linux_arm64.a b/kafka/librdkafka_vendor/librdkafka_glibc_linux_arm64.a index 36492e27e..92c197468 100644 Binary files a/kafka/librdkafka_vendor/librdkafka_glibc_linux_arm64.a and b/kafka/librdkafka_vendor/librdkafka_glibc_linux_arm64.a differ diff --git a/kafka/librdkafka_vendor/librdkafka_musl_linux_amd64.a b/kafka/librdkafka_vendor/librdkafka_musl_linux_amd64.a index 2ea88ae9d..c017b6599 100644 Binary files a/kafka/librdkafka_vendor/librdkafka_musl_linux_amd64.a and b/kafka/librdkafka_vendor/librdkafka_musl_linux_amd64.a differ diff --git a/kafka/librdkafka_vendor/librdkafka_musl_linux_arm64.a b/kafka/librdkafka_vendor/librdkafka_musl_linux_arm64.a index 436818e43..02339767b 100644 Binary files a/kafka/librdkafka_vendor/librdkafka_musl_linux_arm64.a and b/kafka/librdkafka_vendor/librdkafka_musl_linux_arm64.a differ diff --git a/kafkatest/go.mod b/kafkatest/go.mod index db750cdce..c173a73fc 100644 --- a/kafkatest/go.mod +++ b/kafkatest/go.mod @@ -8,7 +8,7 @@ replace github.com/confluentinc/confluent-kafka-go/v2 => ../ require ( github.com/alecthomas/kingpin v2.2.6+incompatible - github.com/confluentinc/confluent-kafka-go/v2 v2.6.0-RC2 + github.com/confluentinc/confluent-kafka-go/v2 v2.6.0 ) require ( diff --git a/mk/doc-gen.py b/mk/doc-gen.py index 48fb1f03b..dcea3454c 100755 --- a/mk/doc-gen.py +++ b/mk/doc-gen.py @@ -24,7 +24,7 @@ def convert_path(url, base_url, after): sys.exit(1) package = sys.argv[1] - tag = "v2.6.0-RC2" + tag = "v2.6.0" major = tag.split(".")[0] # e.g. v2 base_css = "https://go.dev/css" base_js = "https://go.dev/js" diff --git a/schemaregistry/api.html b/schemaregistry/api.html index 5ca7405de..5270ca504 100644 --- a/schemaregistry/api.html +++ b/schemaregistry/api.html @@ -245,16 +245,16 @@Error Error }
- + config.go - + mock_schemaregistry_client.go - + schemaregistry_client.go - + testhelpers.go @@ -299,7 +299,7 @@
)
type - + Client @@ -345,7 +345,7 @@
}
func - + NewClient @@ -357,7 +357,7 @@
NewClient returns a Client implementation
type - + Compatibility @@ -369,7 +369,7 @@
type Compatibility intfunc (*Compatibility) - + MarshalJSON @@ -381,7 +381,7 @@
MarshalJSON implements json.Marshaler
func (*Compatibility) - + ParseString @@ -393,7 +393,7 @@
ParseString returns a Compatibility for the given string
func (*Compatibility) - + String @@ -403,7 +403,7 @@
func (c *Compatibility) String() stringfunc (*Compatibility) - + UnmarshalJSON @@ -415,7 +415,7 @@
UnmarshalJSON implements json.Unmarshaler
type - + Config @@ -430,7 +430,7 @@
func - + NewConfig @@ -442,7 +442,7 @@
NewConfig returns a new configuration instance with sane defaults.
func - + NewConfigWithAuthentication @@ -456,7 +456,7 @@
This method is deprecated.
func - + NewConfigWithBasicAuthentication @@ -469,7 +469,7 @@
For Confluent Cloud, use the API key for the username and the API secret for the password.
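For instance (sketch; the URL, key, and secret are placeholders):

	conf := schemaregistry.NewConfigWithBasicAuthentication(
		"https://psrc-00000.region.provider.confluent.cloud",
		"SR_API_KEY",
		"SR_API_SECRET",
	)
	client, err := schemaregistry.NewClient(conf)
	if err != nil {
		log.Fatal(err)
	}
	_ = client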
func - + NewConfigWithBearerAuthentication @@ -483,7 +483,7 @@
identityPoolID(`bearer.auth.identity.pool.id`) is required
type - + Metadata @@ -500,7 +500,7 @@
type - + Reference @@ -517,7 +517,7 @@
type - + Rule @@ -553,7 +553,7 @@
type - + RuleMode @@ -565,7 +565,7 @@
type RuleMode = intfunc - + ParseMode @@ -577,7 +577,7 @@
ParseMode parses the given rule mode
type - + RuleSet @@ -593,7 +593,7 @@
func (*RuleSet) - + HasRules @@ -605,7 +605,7 @@
HasRules checks if the ruleset has rules for the given mode
type - + SchemaInfo @@ -624,7 +624,7 @@
func (*SchemaInfo) - + MarshalJSON @@ -636,7 +636,7 @@
MarshalJSON implements the json.Marshaler interface
func (*SchemaInfo) - + UnmarshalJSON @@ -648,7 +648,7 @@
UnmarshalJSON implements the json.Unmarshaller interface
type - + SchemaMetadata @@ -666,7 +666,7 @@
func (*SchemaMetadata) - + MarshalJSON @@ -678,7 +678,7 @@
MarshalJSON implements the json.Marshaler interface
func (*SchemaMetadata) - + UnmarshalJSON @@ -690,7 +690,7 @@
UnmarshalJSON implements the json.Unmarshaller interface
type - + ServerConfig @@ -715,7 +715,7 @@
type - + SubjectAndVersion diff --git a/soaktest/go.mod b/soaktest/go.mod index a43578a82..7a4cd11eb 100644 --- a/soaktest/go.mod +++ b/soaktest/go.mod @@ -8,7 +8,7 @@ replace github.com/confluentinc/confluent-kafka-go/v2 => ../ require ( github.com/DataDog/datadog-go v4.8.3+incompatible - github.com/confluentinc/confluent-kafka-go/v2 v2.6.0-RC2 + github.com/confluentinc/confluent-kafka-go/v2 v2.6.0 github.com/shirou/gopsutil v3.21.11+incompatible )