Skip to content

Commit

Permalink
Add setToken API for OAuthBearer flow
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewstanovsky committed Apr 16, 2024
1 parent c0c0e32 commit 7d339f7
Show file tree
Hide file tree
Showing 12 changed files with 253 additions and 5 deletions.
80 changes: 80 additions & 0 deletions examples/oauthbearer-default-flow.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
Producer, Consumer and HighLevelProducer:
```js
/*
* node-rdkafka - Node.js wrapper for RdKafka C/C++ library
*
* Copyright (c) 2016 Blizzard Entertainment
*
* This software may be modified and distributed under the terms
* of the MIT license. See the LICENSE.txt file for details.
*/

var Kafka = require('../');

var token = "your_token";

var producer = new Kafka.Producer({
//'debug' : 'all',
'metadata.broker.list': 'localhost:9093',
'security.protocol': 'SASL_SSL',
'sasl.mechanisms': 'OAUTHBEARER',
}).setOauthBearerToken(token);

//start the producer
producer.connect();

//refresh the token
producer.setOauthBearerToken(token);
```

AdminClient:
```js
/*
* node-rdkafka - Node.js wrapper for RdKafka C/C++ library
*
* Copyright (c) 2016 Blizzard Entertainment
*
* This software may be modified and distributed under the terms
* of the MIT license. See the LICENSE.txt file for details.
*/
var Kafka = require('../');

var token = "your_token";

var admin = Kafka.AdminClient.create({
'metadata.broker.list': 'localhost:9093',
'security.protocol': 'SASL_SSL',
'sasl.mechanisms': 'OAUTHBEARER',
}, token);

//refresh the token
admin.refreshOauthBearerToken(token);
```

ConsumerStream:
```js
/*
* node-rdkafka - Node.js wrapper for RdKafka C/C++ library
*
* Copyright (c) 2016 Blizzard Entertainment
*
* This software may be modified and distributed under the terms
* of the MIT license. See the LICENSE.txt file for details.
*/
var Kafka = require('../');

var token = "your_token";

var stream = Kafka.KafkaConsumer.createReadStream({
'metadata.broker.list': 'localhost:9093',
'group.id': 'myGroup',
'security.protocol': 'SASL_SSL',
'sasl.mechanisms': 'OAUTHBEARER'
}, {}, {
topics: 'test1',
initOauthBearerToken: token,
});

//refresh the token
stream.refreshOauthBearerToken(token.token);
```
8 changes: 7 additions & 1 deletion index.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ export interface ReadStreamOptions extends ReadableOptions {
autoClose?: boolean;
streamAsBatch?: boolean;
connectOptions?: any;
initOauthBearerToken?: string;
}

export interface WriteStreamOptions extends WritableOptions {
Expand All @@ -137,6 +138,7 @@ export interface ProducerStream extends Writable {
export interface ConsumerStream extends Readable {
consumer: KafkaConsumer;
connect(options: ConsumerGlobalConfig): void;
refreshOauthBearerToken(tokenStr: string): void;
close(cb?: () => void): void;
}

Expand Down Expand Up @@ -180,6 +182,8 @@ export abstract class Client<Events extends string> extends EventEmitter {

connect(metadataOptions?: MetadataOptions, cb?: (err: LibrdKafkaError, data: Metadata) => any): this;

setOauthBearerToken(tokenStr: string): this;

getClient(): any;

connectedTime(): number;
Expand Down Expand Up @@ -330,6 +334,8 @@ export interface NewTopic {
}

export interface IAdminClient {
refreshOauthBearerToken(tokenStr: string): void;

createTopic(topic: NewTopic, cb?: (err: LibrdKafkaError) => void): void;
createTopic(topic: NewTopic, timeout?: number, cb?: (err: LibrdKafkaError) => void): void;

Expand All @@ -343,5 +349,5 @@ export interface IAdminClient {
}

export abstract class AdminClient {
static create(conf: GlobalConfig): IAdminClient;
static create(conf: GlobalConfig, initOauthBearerToken?: string): IAdminClient;
}
22 changes: 21 additions & 1 deletion lib/admin.js
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,13 @@ var shallowCopy = require('./util').shallowCopy;
* active handle with the brokers.
*
*/
function createAdminClient(conf) {
function createAdminClient(conf, initOauthBearerToken) {
var client = new AdminClient(conf);

if (initOauthBearerToken) {
client.refreshOauthBearerToken(initOauthBearerToken);
}

// Wrap the error so we throw if it failed with some context
LibrdKafkaError.wrap(client.connect(), true);

Expand Down Expand Up @@ -105,6 +109,22 @@ AdminClient.prototype.disconnect = function() {
this._isConnected = false;
};

/**
* Refresh OAuthBearer token, initially provided in factory method.
* Expiry is always set to maximum value, as the callback of librdkafka
* for token refresh is not used.
*
* @param {string} tokenStr - OAuthBearer token string
* @see connection.cc
*/
AdminClient.prototype.refreshOauthBearerToken = function (tokenStr) {
if (!tokenStr || typeof tokenStr !== 'string') {
throw new Error("OAuthBearer token is undefined/empty or not a string");
}

this._client.setToken(tokenStr);
};

/**
* Create a topic with a given config.
*
Expand Down
19 changes: 19 additions & 0 deletions lib/client.js
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,25 @@ Client.prototype.connect = function(metadataOptions, cb) {

};

/**
* Set initial token before any connection is established for oauthbearer authentication flow.
* Expiry is always set to maximum value, as the callback of librdkafka
* for token refresh is not used.
* Call this method again to refresh the token.
*
* @param {string} tokenStr - OAuthBearer token string
* @see connection.cc
* @return {Client} - Returns itself.
*/
Client.prototype.setOauthBearerToken = function (tokenStr) {
if (!tokenStr || typeof tokenStr !== 'string') {
throw new Error("OAuthBearer token is undefined/empty or not a string");
}

this._client.setToken(tokenStr);
return this;
};

/**
* Get the native Kafka client.
*
Expand Down
16 changes: 16 additions & 0 deletions lib/kafka-consumer-stream.js
Original file line number Diff line number Diff line change
Expand Up @@ -112,6 +112,10 @@ function KafkaConsumerStream(consumer, options) {
self.push(null);
});

if (options.initOauthBearerToken) {
this.consumer.setOauthBearerToken(options.initOauthBearerToken);
}

// Call connect. Handles potentially being connected already
this.connect(this.connectOptions);

Expand All @@ -123,6 +127,18 @@ function KafkaConsumerStream(consumer, options) {

}

/**
* Refresh OAuthBearer token, initially provided in factory method.
* Expiry is always set to maximum value, as the callback of librdkafka
* for token refresh is not used.
*
* @param {string} tokenStr - OAuthBearer token string
* @see connection.cc
*/
KafkaConsumerStream.prototype.refreshOauthBearerToken = function (tokenStr) {
this.consumer.setOauthBearerToken(tokenStr);
};

/**
* Internal stream read method. This method reads message objects.
* @param {number} size - This parameter is ignored for our cases.
Expand Down
19 changes: 19 additions & 0 deletions src/admin.cc
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,24 @@ Baton AdminClient::Connect() {
return Baton(RdKafka::ERR__STATE, errstr);
}

if (m_init_oauthToken) {
scoped_shared_write_lock lock(m_connection_lock);
if (m_init_oauthToken) {
std::list<std::string> emptyList;
std::string token = m_init_oauthToken->token;
int64_t expiry = m_init_oauthToken->expiry;
// needed for initial connection only
m_init_oauthToken.reset();

RdKafka::ErrorCode err = m_client->oauthbearer_set_token(token, expiry,
"", emptyList, errstr);

if (err != RdKafka::ERR_NO_ERROR) {
return Baton(err, errstr);
}
}
}

if (rkqu == NULL) {
rkqu = rd_kafka_queue_new(m_client->c_ptr());
}
Expand Down Expand Up @@ -88,6 +106,7 @@ void AdminClient::Init(v8::Local<v8::Object> exports) {

Nan::SetPrototypeMethod(tpl, "connect", NodeConnect);
Nan::SetPrototypeMethod(tpl, "disconnect", NodeDisconnect);
Nan::SetPrototypeMethod(tpl, "setToken", NodeSetToken);

constructor.Reset(
(tpl->GetFunction(Nan::GetCurrentContext())).ToLocalChecked());
Expand Down
40 changes: 40 additions & 0 deletions src/connection.cc
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
* of the MIT license. See the LICENSE.txt file for details.
*/

#include <limits>
#include <string>
#include <vector>

Expand Down Expand Up @@ -226,6 +227,45 @@ void Connection::ConfigureCallback(const std::string &string_key, const v8::Loca
}

// NAN METHODS
NAN_METHOD(Connection::NodeSetToken)
{
if (info.Length() < 1 || !info[0]->IsString()) {
Nan::ThrowError("Token argument must be a string");
return;
}

Nan::Utf8String tk(info[0]);
std::string token = *tk;
// we always set expiry to maximum value in ms, as we don't use refresh callback,
// rdkafka continues sending a token even if it expired. Client code must
// handle token refreshing by calling 'setToken' again when needed.
int64_t expiry = std::numeric_limits<int64_t>::max() / 100000;
Connection* obj = ObjectWrap::Unwrap<Connection>(info.This());
RdKafka::Handle* handle = obj->m_client;

if (!handle) {
scoped_shared_write_lock lock(obj->m_connection_lock);
obj->m_init_oauthToken = std::make_unique<OauthBearerToken>(
OauthBearerToken{token, expiry});
info.GetReturnValue().Set(Nan::Null());
return;
}

{
scoped_shared_write_lock lock(obj->m_connection_lock);
std::string errstr;
std::list<std::string> emptyList;
RdKafka::ErrorCode err = handle->oauthbearer_set_token(token, expiry,
"", emptyList, errstr);

if (err != RdKafka::ERR_NO_ERROR) {
Nan::ThrowError(errstr.c_str());
return;
}
}

info.GetReturnValue().Set(Nan::Null());
}

NAN_METHOD(Connection::NodeGetMetadata) {
Nan::HandleScope scope;
Expand Down
11 changes: 10 additions & 1 deletion src/connection.h
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,13 @@ namespace NodeKafka {
*/

class Connection : public Nan::ObjectWrap {
public:
struct OauthBearerToken
{
std::string token;
int64_t expiry;
};

public:
bool IsConnected();
bool IsClosing();

Expand Down Expand Up @@ -82,10 +88,13 @@ class Connection : public Nan::ObjectWrap {
Conf* m_tconfig;
std::string m_errstr;

std::unique_ptr<OauthBearerToken> m_init_oauthToken;

uv_rwlock_t m_connection_lock;

RdKafka::Handle* m_client;

static NAN_METHOD(NodeSetToken);
static NAN_METHOD(NodeConfigureCallbacks);
static NAN_METHOD(NodeGetMetadata);
static NAN_METHOD(NodeQueryWatermarkOffsets);
Expand Down
19 changes: 19 additions & 0 deletions src/kafka-consumer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,24 @@ Baton KafkaConsumer::Connect() {
return Baton(RdKafka::ERR__STATE, errstr);
}

if (m_init_oauthToken) {
scoped_shared_write_lock lock(m_connection_lock);
if (m_init_oauthToken) {
std::list<std::string> emptyList;
std::string token = m_init_oauthToken->token;
int64_t expiry = m_init_oauthToken->expiry;
// needed for initial connection only
m_init_oauthToken.reset();

RdKafka::ErrorCode err = m_client->oauthbearer_set_token(token, expiry,
"", emptyList, errstr);

if (err != RdKafka::ERR_NO_ERROR) {
return Baton(err, errstr);
}
}
}

if (m_partitions.size() > 0) {
m_client->resume(m_partitions);
}
Expand Down Expand Up @@ -499,6 +517,7 @@ void KafkaConsumer::Init(v8::Local<v8::Object> exports) {

Nan::SetPrototypeMethod(tpl, "connect", NodeConnect);
Nan::SetPrototypeMethod(tpl, "disconnect", NodeDisconnect);
Nan::SetPrototypeMethod(tpl, "setToken", NodeSetToken);
Nan::SetPrototypeMethod(tpl, "getMetadata", NodeGetMetadata);
Nan::SetPrototypeMethod(tpl, "queryWatermarkOffsets", NodeQueryWatermarkOffsets); // NOLINT
Nan::SetPrototypeMethod(tpl, "offsetsForTimes", NodeOffsetsForTimes);
Expand Down
20 changes: 20 additions & 0 deletions src/producer.cc
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ void Producer::Init(v8::Local<v8::Object> exports) {

Nan::SetPrototypeMethod(tpl, "connect", NodeConnect);
Nan::SetPrototypeMethod(tpl, "disconnect", NodeDisconnect);
Nan::SetPrototypeMethod(tpl, "setToken", NodeSetToken);
Nan::SetPrototypeMethod(tpl, "getMetadata", NodeGetMetadata);
Nan::SetPrototypeMethod(tpl, "queryWatermarkOffsets", NodeQueryWatermarkOffsets); // NOLINT
Nan::SetPrototypeMethod(tpl, "poll", NodePoll);
Expand Down Expand Up @@ -183,6 +184,25 @@ Baton Producer::Connect() {
return Baton(RdKafka::ERR__STATE, errstr);
}

if (m_init_oauthToken) {
scoped_shared_write_lock lock(m_connection_lock);
if (m_init_oauthToken) {
std::list<std::string> emptyList;
std::string token = m_init_oauthToken->token;
int64_t expiry = m_init_oauthToken->expiry;
// needed for initial connection only
m_init_oauthToken.reset();

RdKafka::ErrorCode err = m_client->oauthbearer_set_token(token, expiry,
"", emptyList, errstr);

if (err != RdKafka::ERR_NO_ERROR) {
return Baton(err, errstr);
}
}
}


return Baton(RdKafka::ERR_NO_ERROR);
}

Expand Down
Loading

0 comments on commit 7d339f7

Please sign in to comment.