From 48acc03b9ddd85b5c8c268b480d5a9a3d523b9ad Mon Sep 17 00:00:00 2001
From: Magnus Edenhill
* `KafkaError` - client (error codes are prefixed with _) or broker error.
These errors are normally just informational since the
client will try its best to automatically recover (eventually).
+
+ * `OAuthBearerTokenRefresh` - retrieval of a new SASL/OAUTHBEARER token is required. +This event only occurs with sasl.mechanism=OAUTHBEARER. +Be sure to invoke SetOAuthBearerToken() on the Producer/Consumer/AdminClient +instance when a successful token retrieval is completed, otherwise be sure to +invoke SetOAuthBearerTokenFailure() to indicate that retrieval failed (or +if setting the token failed, which could happen if an extension doesn't meet +the required regular expression); invoking SetOAuthBearerTokenFailure() will +schedule a new event for 10 seconds later so another retrieval can be attempted.
Hint: If your application registers a signal notification @@ -308,6 +318,16 @@
const PartitionAny = int32(C.RD_KAFKA_PARTITION_UA)
func (a *AdminClient) SetOAuthBearerToken(oauthBearerToken OAuthBearerToken) error+
+ SetOAuthBearerToken sets the the data to be transmitted +to a broker during SASL/OAUTHBEARER authentication. It will return nil +on success, otherwise an error if: +1) the token data is invalid (meaning an expiration time in the past +or either a token value or an extension key or value that does not meet +the regular expression requirements as per + + https://tools.ietf.org/html/rfc7628#section-3.1 + + ); +2) SASL/OAUTHBEARER is not supported by the underlying librdkafka build; +3) SASL/OAUTHBEARER is supported but is not configured as the client's +authentication mechanism. +
+func (a *AdminClient) SetOAuthBearerTokenFailure(errstr string) error+
+ SetOAuthBearerTokenFailure sets the error message describing why token +retrieval/setting failed; it also schedules a new token refresh event for 10 +seconds later so the attempt may be retried. It will return nil on +success, otherwise an error if: +1) SASL/OAUTHBEARER is not supported by the underlying librdkafka build; +2) SASL/OAUTHBEARER is supported but is not configured as the client's +authentication mechanism.
func (c *Consumer) GetWatermarkOffsets(topic string, partition int32) (low, high int64, err error)+
+ GetWatermarkOffsets returns the cached low and high offsets for the given topic +and partition. The high offset is populated on every fetch response or via calling QueryWatermarkOffsets. +The low offset is populated every statistics.interval.ms if that value is set. +OffsetInvalid will be returned if there is no cached offset for either value.
func (c *Consumer) QueryWatermarkOffsets(topic string, partition int32, timeoutMs int) (low, high int64, err error)
- QueryWatermarkOffsets returns the broker's low and high offsets for the given topic -and partition. + QueryWatermarkOffsets queries the broker for the low and high offsets for the given topic and partition.
Returns an error on failure or nil otherwise.
+func (c *Consumer) SetOAuthBearerToken(oauthBearerToken OAuthBearerToken) error+
+ SetOAuthBearerToken sets the the data to be transmitted +to a broker during SASL/OAUTHBEARER authentication. It will return nil +on success, otherwise an error if: +1) the token data is invalid (meaning an expiration time in the past +or either a token value or an extension key or value that does not meet +the regular expression requirements as per + + https://tools.ietf.org/html/rfc7628#section-3.1 + + ); +2) SASL/OAUTHBEARER is not supported by the underlying librdkafka build; +3) SASL/OAUTHBEARER is supported but is not configured as the client's +authentication mechanism. +
+func (c *Consumer) SetOAuthBearerTokenFailure(errstr string) error+
+ SetOAuthBearerTokenFailure sets the error message describing why token +retrieval/setting failed; it also schedules a new token refresh event for 10 +seconds later so the attempt may be retried. It will return nil on +success, otherwise an error if: +1) SASL/OAUTHBEARER is not supported by the underlying librdkafka build; +2) SASL/OAUTHBEARER is supported but is not configured as the client's +authentication mechanism. +
type Handle interface { + // SetOAuthBearerToken sets the the data to be transmitted + // to a broker during SASL/OAUTHBEARER authentication. It will return nil + // on success, otherwise an error if: + // 1) the token data is invalid (meaning an expiration time in the past + // or either a token value or an extension key or value that does not meet + // the regular expression requirements as per + // https://tools.ietf.org/html/rfc7628#section-3.1); + // 2) SASL/OAUTHBEARER is not supported by the underlying librdkafka build; + // 3) SASL/OAUTHBEARER is supported but is not configured as the client's + // authentication mechanism. + SetOAuthBearerToken(oauthBearerToken OAuthBearerToken) error + + // SetOAuthBearerTokenFailure sets the error message describing why token + // retrieval/setting failed; it also schedules a new token refresh event for 10 + // seconds later so the attempt may be retried. It will return nil on + // success, otherwise an error if: + // 1) SASL/OAUTHBEARER is not supported by the underlying librdkafka build; + // 2) SASL/OAUTHBEARER is supported but is not configured as the client's + // authentication mechanism. + SetOAuthBearerTokenFailure(errstr string) error // contains filtered or unexported methods }
+ OAuthBearerToken represents the data to be transmitted +to a broker during SASL/OAUTHBEARER authentication. +
+type OAuthBearerToken struct { + // Token value, often (but not necessarily) a JWS compact serialization + // as per https://tools.ietf.org/html/rfc7515#section-3.1; it must meet + // the regular expression for a SASL/OAUTHBEARER value defined at + // https://tools.ietf.org/html/rfc7628#section-3.1 + TokenValue string + // Metadata about the token indicating when it expires (local time); + // it must represent a time in the future + Expiration time.Time + // Metadata about the token indicating the Kafka principal name + // to which it applies (for example, "admin") + Principal string + // SASL extensions, if any, to be communicated to the broker during + // authentication (all keys and values of which must meet the regular + // expressions defined at https://tools.ietf.org/html/rfc7628#section-3.1, + // and it must not contain the reserved "auth" key) + Extensions map[string]string +} ++
+ OAuthBearerTokenRefresh indicates token refresh is required +
+type OAuthBearerTokenRefresh struct {
+ // Config is the value of the sasl.oauthbearer.config property
+ Config string
+}
+
+ func (o OAuthBearerTokenRefresh) String() string
QueryWatermarkOffsets returns the broker's low and high offsets for the given topic and partition. +
+func (p *Producer) SetOAuthBearerToken(oauthBearerToken OAuthBearerToken) error+
+ SetOAuthBearerToken sets the the data to be transmitted +to a broker during SASL/OAUTHBEARER authentication. It will return nil +on success, otherwise an error if: +1) the token data is invalid (meaning an expiration time in the past +or either a token value or an extension key or value that does not meet +the regular expression requirements as per + + https://tools.ietf.org/html/rfc7628#section-3.1 + + ); +2) SASL/OAUTHBEARER is not supported by the underlying librdkafka build; +3) SASL/OAUTHBEARER is supported but is not configured as the client's +authentication mechanism. +
+func (p *Producer) SetOAuthBearerTokenFailure(errstr string) error+
+ SetOAuthBearerTokenFailure sets the error message describing why token +retrieval/setting failed; it also schedules a new token refresh event for 10 +seconds later so the attempt may be retried. It will return nil on +success, otherwise an error if: +1) SASL/OAUTHBEARER is not supported by the underlying librdkafka build; +2) SASL/OAUTHBEARER is supported but is not configured as the client's +authentication mechanism.
func (p TopicPartition) String() string
type TopicPartitions []TopicPartition
func (tps TopicPartitions) Len() int
func (tps TopicPartitions) Less(i, j int) bool