From 4adbd7de1831e330eccfab71829fe9111045fc11 Mon Sep 17 00:00:00 2001 From: Andrew Stanovsky Date: Mon, 15 Apr 2024 18:42:39 -0700 Subject: [PATCH] Add setToken API for OAuthBearer flow --- examples/oauthbearer-default-flow.md | 80 ++++++++++++++++++++++++++++ index.d.ts | 8 ++- lib/admin.js | 22 +++++++- lib/client.js | 19 +++++++ lib/kafka-consumer-stream.js | 16 ++++++ src/admin.cc | 19 +++++++ src/connection.cc | 40 ++++++++++++++ src/connection.h | 11 +++- src/kafka-consumer.cc | 19 +++++++ src/producer.cc | 20 +++++++ test/binding.spec.js | 2 +- test/consumer.spec.js | 2 +- 12 files changed, 253 insertions(+), 5 deletions(-) create mode 100644 examples/oauthbearer-default-flow.md diff --git a/examples/oauthbearer-default-flow.md b/examples/oauthbearer-default-flow.md new file mode 100644 index 00000000..06cb9aab --- /dev/null +++ b/examples/oauthbearer-default-flow.md @@ -0,0 +1,80 @@ +Producer, Consumer and HighLevelProducer: +```js +/* + * node-rdkafka - Node.js wrapper for RdKafka C/C++ library + * + * Copyright (c) 2016 Blizzard Entertainment + * + * This software may be modified and distributed under the terms + * of the MIT license. See the LICENSE.txt file for details. + */ + +var Kafka = require('../'); + +var token = "your_token"; + +var producer = new Kafka.Producer({ + //'debug' : 'all', + 'metadata.broker.list': 'localhost:9093', + 'security.protocol': 'SASL_SSL', + 'sasl.mechanisms': 'OAUTHBEARER', +}).setOauthBearerToken(token); + +//start the producer +producer.connect(); + +//refresh the token +producer.setOauthBearerToken(token); +``` + +AdminClient: +```js +/* + * node-rdkafka - Node.js wrapper for RdKafka C/C++ library + * + * Copyright (c) 2016 Blizzard Entertainment + * + * This software may be modified and distributed under the terms + * of the MIT license. See the LICENSE.txt file for details. 
+ */ +var Kafka = require('../'); + +var token = "your_token"; + +var admin = Kafka.AdminClient.create({ + 'metadata.broker.list': 'localhost:9093', + 'security.protocol': 'SASL_SSL', + 'sasl.mechanisms': 'OAUTHBEARER', +}, token); + +//refresh the token +admin.refreshOauthBearerToken(token); +``` + +ConsumerStream: +```js +/* + * node-rdkafka - Node.js wrapper for RdKafka C/C++ library + * + * Copyright (c) 2016 Blizzard Entertainment + * + * This software may be modified and distributed under the terms + * of the MIT license. See the LICENSE.txt file for details. + */ +var Kafka = require('../'); + +var token = "your_token"; + +var stream = Kafka.KafkaConsumer.createReadStream({ + 'metadata.broker.list': 'localhost:9093', + 'group.id': 'myGroup', + 'security.protocol': 'SASL_SSL', + 'sasl.mechanisms': 'OAUTHBEARER' + }, {}, { + topics: 'test1', + initOauthBearerToken: token, + }); + +//refresh the token +stream.refreshOauthBearerToken(token); +``` diff --git a/index.d.ts index d7ce7e61..4cd46ec1 100644 --- a/index.d.ts +++ b/index.d.ts @@ -117,6 +117,7 @@ export interface ReadStreamOptions extends ReadableOptions { autoClose?: boolean; streamAsBatch?: boolean; connectOptions?: any; + initOauthBearerToken?: string; } export interface WriteStreamOptions extends WritableOptions { @@ -137,6 +138,7 @@ export interface ProducerStream extends Writable { export interface ConsumerStream extends Readable { consumer: KafkaConsumer; connect(options: ConsumerGlobalConfig): void; + refreshOauthBearerToken(tokenStr: string): void; close(cb?: () => void): void; } @@ -180,6 +182,8 @@ export abstract class Client extends EventEmitter { connect(metadataOptions?: MetadataOptions, cb?: (err: LibrdKafkaError, data: Metadata) => any): this; + setOauthBearerToken(tokenStr: string): this; + getClient(): any; connectedTime(): number; @@ -330,6 +334,8 @@ export interface NewTopic { } export interface IAdminClient { + refreshOauthBearerToken(tokenStr: string): void; +
createTopic(topic: NewTopic, cb?: (err: LibrdKafkaError) => void): void; createTopic(topic: NewTopic, timeout?: number, cb?: (err: LibrdKafkaError) => void): void; @@ -343,5 +349,5 @@ export interface IAdminClient { } export abstract class AdminClient { - static create(conf: GlobalConfig): IAdminClient; + static create(conf: GlobalConfig, initOauthBearerToken?: string): IAdminClient; } diff --git a/lib/admin.js b/lib/admin.js index 773dc957..bbe06084 100644 --- a/lib/admin.js +++ b/lib/admin.js @@ -25,9 +25,13 @@ var shallowCopy = require('./util').shallowCopy; * active handle with the brokers. * */ -function createAdminClient(conf) { +function createAdminClient(conf, initOauthBearerToken) { var client = new AdminClient(conf); + if (initOauthBearerToken) { + client.refreshOauthBearerToken(initOauthBearerToken); + } + // Wrap the error so we throw if it failed with some context LibrdKafkaError.wrap(client.connect(), true); @@ -105,6 +109,22 @@ AdminClient.prototype.disconnect = function() { this._isConnected = false; }; +/** + * Refresh OAuthBearer token, initially provided in factory method. + * Expiry is always set to maximum value, as the callback of librdkafka + * for token refresh is not used. + * + * @param {string} tokenStr - OAuthBearer token string + * @see connection.cc + */ +AdminClient.prototype.refreshOauthBearerToken = function (tokenStr) { + if (!tokenStr || typeof tokenStr !== 'string') { + throw new Error("OAuthBearer token is undefined/empty or not a string"); + } + + this._client.setToken(tokenStr); +}; + /** * Create a topic with a given config. * diff --git a/lib/client.js b/lib/client.js index 9cbd3f9a..354f5446 100644 --- a/lib/client.js +++ b/lib/client.js @@ -229,6 +229,25 @@ Client.prototype.connect = function(metadataOptions, cb) { }; +/** + * Set initial token before any connection is established for oauthbearer authentication flow. + * Expiry is always set to maximum value, as the callback of librdkafka + * for token refresh is not used. 
+ * Call this method again to refresh the token. + * + * @param {string} tokenStr - OAuthBearer token string + * @see connection.cc + * @return {Client} - Returns itself. + */ +Client.prototype.setOauthBearerToken = function (tokenStr) { + if (!tokenStr || typeof tokenStr !== 'string') { + throw new Error("OAuthBearer token is undefined/empty or not a string"); + } + + this._client.setToken(tokenStr); + return this; +}; + /** * Get the native Kafka client. * diff --git a/lib/kafka-consumer-stream.js b/lib/kafka-consumer-stream.js index 0abb5358..ce1fbe95 100644 --- a/lib/kafka-consumer-stream.js +++ b/lib/kafka-consumer-stream.js @@ -112,6 +112,10 @@ function KafkaConsumerStream(consumer, options) { self.push(null); }); + if (options.initOauthBearerToken) { + this.consumer.setOauthBearerToken(options.initOauthBearerToken); + } + // Call connect. Handles potentially being connected already this.connect(this.connectOptions); @@ -123,6 +127,18 @@ function KafkaConsumerStream(consumer, options) { } +/** + * Refresh OAuthBearer token, initially provided in factory method. + * Expiry is always set to maximum value, as the callback of librdkafka + * for token refresh is not used. + * + * @param {string} tokenStr - OAuthBearer token string + * @see connection.cc + */ +KafkaConsumerStream.prototype.refreshOauthBearerToken = function (tokenStr) { + this.consumer.setOauthBearerToken(tokenStr); +}; + /** * Internal stream read method. This method reads message objects. * @param {number} size - This parameter is ignored for our cases. 
diff --git a/src/admin.cc index 1453ad35..b5c7c8f0 100644 --- a/src/admin.cc +++ b/src/admin.cc @@ -49,6 +49,24 @@ Baton AdminClient::Connect() { return Baton(RdKafka::ERR__STATE, errstr); } + if (m_init_oauthToken) { + scoped_shared_write_lock lock(m_connection_lock); + if (m_init_oauthToken) { + std::list<std::string> emptyList; + std::string token = m_init_oauthToken->token; + int64_t expiry = m_init_oauthToken->expiry; + // needed for initial connection only + m_init_oauthToken.reset(); + + RdKafka::ErrorCode err = m_client->oauthbearer_set_token(token, expiry, + "", emptyList, errstr); + + if (err != RdKafka::ERR_NO_ERROR) { + return Baton(err, errstr); + } + } + } + if (rkqu == NULL) { rkqu = rd_kafka_queue_new(m_client->c_ptr()); } @@ -88,6 +106,7 @@ void AdminClient::Init(v8::Local<v8::Object> exports) { Nan::SetPrototypeMethod(tpl, "connect", NodeConnect); Nan::SetPrototypeMethod(tpl, "disconnect", NodeDisconnect); + Nan::SetPrototypeMethod(tpl, "setToken", NodeSetToken); constructor.Reset( (tpl->GetFunction(Nan::GetCurrentContext())).ToLocalChecked()); diff --git a/src/connection.cc index 9de3b3c1..68099b8a 100644 --- a/src/connection.cc +++ b/src/connection.cc @@ -7,6 +7,7 @@ * of the MIT license. See the LICENSE.txt file for details. */ +#include <limits> #include <string> #include <vector> @@ -226,6 +227,45 @@ void Connection::ConfigureCallback(const std::string &string_key, const v8::Loca } // NAN METHODS +NAN_METHOD(Connection::NodeSetToken) +{ + if (info.Length() < 1 || !info[0]->IsString()) { + Nan::ThrowError("Token argument must be a string"); + return; + } + + Nan::Utf8String tk(info[0]); + std::string token = *tk; + // we always set expiry to maximum value in ms, as we don't use refresh callback, + // rdkafka continues sending a token even if it expired. Client code must + // handle token refreshing by calling 'setToken' again when needed. 
+ int64_t expiry = std::numeric_limits<int64_t>::max() / 100000; + Connection* obj = ObjectWrap::Unwrap<Connection>(info.This()); + RdKafka::Handle* handle = obj->m_client; + + if (!handle) { + scoped_shared_write_lock lock(obj->m_connection_lock); + obj->m_init_oauthToken = std::make_unique<OauthBearerToken>( + OauthBearerToken{token, expiry}); + info.GetReturnValue().Set(Nan::Null()); + return; + } + + { + scoped_shared_write_lock lock(obj->m_connection_lock); + std::string errstr; + std::list<std::string> emptyList; + RdKafka::ErrorCode err = handle->oauthbearer_set_token(token, expiry, + "", emptyList, errstr); + + if (err != RdKafka::ERR_NO_ERROR) { + Nan::ThrowError(errstr.c_str()); + return; + } + } + + info.GetReturnValue().Set(Nan::Null()); +} NAN_METHOD(Connection::NodeGetMetadata) { Nan::HandleScope scope; diff --git a/src/connection.h index 8c4ac73f..8ee3f052 100644 --- a/src/connection.h +++ b/src/connection.h @@ -45,7 +45,13 @@ namespace NodeKafka { */ class Connection : public Nan::ObjectWrap { - public: + struct OauthBearerToken + { + std::string token; + int64_t expiry; + }; + +public: bool IsConnected(); bool IsClosing(); @@ -82,10 +88,13 @@ class Connection : public Nan::ObjectWrap { Conf* m_tconfig; std::string m_errstr; + std::unique_ptr<OauthBearerToken> m_init_oauthToken; + uv_rwlock_t m_connection_lock; RdKafka::Handle* m_client; + static NAN_METHOD(NodeSetToken); static NAN_METHOD(NodeConfigureCallbacks); static NAN_METHOD(NodeGetMetadata); static NAN_METHOD(NodeQueryWatermarkOffsets); diff --git a/src/kafka-consumer.cc index 019b0cb6..0f5e32ed 100644 --- a/src/kafka-consumer.cc +++ b/src/kafka-consumer.cc @@ -56,6 +56,24 @@ Baton KafkaConsumer::Connect() { return Baton(RdKafka::ERR__STATE, errstr); } + if (m_init_oauthToken) { + scoped_shared_write_lock lock(m_connection_lock); + if (m_init_oauthToken) { + std::list<std::string> emptyList; + std::string token = m_init_oauthToken->token; + int64_t expiry = m_init_oauthToken->expiry; + // needed for initial connection only + 
m_init_oauthToken.reset(); + + RdKafka::ErrorCode err = m_client->oauthbearer_set_token(token, expiry, + "", emptyList, errstr); + + if (err != RdKafka::ERR_NO_ERROR) { + return Baton(err, errstr); + } + } + } + if (m_partitions.size() > 0) { m_client->resume(m_partitions); } @@ -499,6 +517,7 @@ void KafkaConsumer::Init(v8::Local<v8::Object> exports) { Nan::SetPrototypeMethod(tpl, "connect", NodeConnect); Nan::SetPrototypeMethod(tpl, "disconnect", NodeDisconnect); + Nan::SetPrototypeMethod(tpl, "setToken", NodeSetToken); Nan::SetPrototypeMethod(tpl, "getMetadata", NodeGetMetadata); Nan::SetPrototypeMethod(tpl, "queryWatermarkOffsets", NodeQueryWatermarkOffsets); // NOLINT Nan::SetPrototypeMethod(tpl, "offsetsForTimes", NodeOffsetsForTimes); diff --git a/src/producer.cc index 04e75688..c8e3e632 100644 --- a/src/producer.cc +++ b/src/producer.cc @@ -66,6 +66,7 @@ void Producer::Init(v8::Local<v8::Object> exports) { Nan::SetPrototypeMethod(tpl, "connect", NodeConnect); Nan::SetPrototypeMethod(tpl, "disconnect", NodeDisconnect); + Nan::SetPrototypeMethod(tpl, "setToken", NodeSetToken); Nan::SetPrototypeMethod(tpl, "getMetadata", NodeGetMetadata); Nan::SetPrototypeMethod(tpl, "queryWatermarkOffsets", NodeQueryWatermarkOffsets); // NOLINT Nan::SetPrototypeMethod(tpl, "poll", NodePoll); @@ -183,6 +184,25 @@ Baton Producer::Connect() { return Baton(RdKafka::ERR__STATE, errstr); } + if (m_init_oauthToken) { + scoped_shared_write_lock lock(m_connection_lock); + if (m_init_oauthToken) { + std::list<std::string> emptyList; + std::string token = m_init_oauthToken->token; + int64_t expiry = m_init_oauthToken->expiry; + // needed for initial connection only + m_init_oauthToken.reset(); + + RdKafka::ErrorCode err = m_client->oauthbearer_set_token(token, expiry, + "", emptyList, errstr); + + if (err != RdKafka::ERR_NO_ERROR) { + return Baton(err, errstr); + } + } + } + + return Baton(RdKafka::ERR_NO_ERROR); } diff --git a/test/binding.spec.js index b82c7bc4..117fde1b 100644 --- 
a/test/binding.spec.js +++ b/test/binding.spec.js @@ -56,7 +56,7 @@ module.exports = { }); }, 'has necessary methods from superclass': function() { - var methods = ['connect', 'disconnect', 'configureCallbacks', 'getMetadata']; + var methods = ['connect', 'disconnect', 'setToken', 'configureCallbacks', 'getMetadata']; methods.forEach(function(m) { t.equal(typeof(client[m]), 'function', 'Client is missing ' + m + ' method'); }); diff --git a/test/consumer.spec.js b/test/consumer.spec.js index 40b52ee4..4fc3bd53 100644 --- a/test/consumer.spec.js +++ b/test/consumer.spec.js @@ -71,7 +71,7 @@ module.exports = { }); }, 'has necessary methods from superclass': function() { - var methods = ['connect', 'disconnect', 'configureCallbacks', 'getMetadata']; + var methods = ['connect', 'disconnect', 'setToken', 'configureCallbacks', 'getMetadata']; methods.forEach(function(m) { t.equal(typeof(client[m]), 'function', 'Client is missing ' + m + ' method'); });