Skip to content

Commit

Permalink
HPCC-31498 Kafka shared library not constructing/destructing properly
Browse files Browse the repository at this point in the history
Use Singleton pattern instead of std::once.
  • Loading branch information
dcamper committed Mar 28, 2024
1 parent 3bd0ef5 commit 11750e0
Showing 1 changed file with 43 additions and 125 deletions.
168 changes: 43 additions & 125 deletions plugins/kafka/kafka.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,6 @@ namespace KafkaPlugin
// background activity
const __int32 POLL_TIMEOUT = 1000;

//--------------------------------------------------------------------------
// Static Variables
//--------------------------------------------------------------------------

static std::once_flag pubCacheInitFlag;

//--------------------------------------------------------------------------
// Static Methods (internal)
//--------------------------------------------------------------------------
Expand Down Expand Up @@ -697,7 +691,7 @@ namespace KafkaPlugin
*
* Class used to create and cache publisher objects and connections
*/
static class PublisherCacheObj
class PublisherCacheObj
{
private:

Expand All @@ -714,6 +708,15 @@ namespace KafkaPlugin

}

/**
* Destructor
*
*/
~PublisherCacheObj()
{
deleteAll();
}

void deleteAll()
{
CriticalBlock block(lock);
Expand All @@ -735,45 +738,40 @@ namespace KafkaPlugin

/**
* Remove previously-created objects that have been inactive
* for awhile
* for awhile; assumes a lock is held while modifying cachedPublishers
*/
void expire()
{
if (!cachedPublishers.empty())
{
CriticalBlock block(lock);
time_t oldestAllowedTime = time(NULL) - OBJECT_EXPIRE_TIMEOUT_SECONDS;
__int32 expireCount = 0;

time_t oldestAllowedTime = time(NULL) - OBJECT_EXPIRE_TIMEOUT_SECONDS;
__int32 expireCount = 0;

for (ObjMap::iterator x = cachedPublishers.begin(); x != cachedPublishers.end(); /* increment handled explicitly */)
for (ObjMap::iterator x = cachedPublishers.begin(); x != cachedPublishers.end(); /* increment handled explicitly */)
{
// Expire only if the publisher has been inactive and if
// there are no messages in the outbound queue
if (x->second && x->second->getTimeTouched() < oldestAllowedTime && x->second->messagesWaitingInQueue() == 0)
{
// Expire only if the publisher has been inactive and if
// there are no messages in the outbound queue
if (x->second && x->second->getTimeTouched() < oldestAllowedTime && x->second->messagesWaitingInQueue() == 0)
{
// Shutdown the attached poller before deleting
x->second->shutdownPoller();
// Shutdown the attached poller before deleting
x->second->shutdownPoller();

// Delete the object
delete(x->second);
// Delete the object
delete(x->second);

// Erase from map
cachedPublishers.erase(x++);
// Erase from map
cachedPublishers.erase(x++);

++expireCount;
}
else
{
x++;
}
++expireCount;
}

if (doTrace(traceKafka) && expireCount > 0)
else
{
DBGLOG("Kafka: Expired %d cached publisher%s", expireCount, (expireCount == 1 ? "" : "s"));
x++;
}
}

if (doTrace(traceKafka) && expireCount > 0)
{
DBGLOG("Kafka: Expired %d cached publisher%s", expireCount, (expireCount == 1 ? "" : "s"));
}
}

/**
Expand Down Expand Up @@ -822,6 +820,9 @@ namespace KafkaPlugin
{
DBGLOG("Kafka: Created and cached new publisher object: %s @ %s", topic.c_str(), brokers.c_str());
}

// Expire any old publishers before returning the new one
expire();
}
}

Expand All @@ -837,77 +838,16 @@ namespace KafkaPlugin

ObjMap cachedPublishers; //!< std::map of created Publisher object pointers
CriticalSection lock; //!< Mutex guarding modifications to cachedPublishers
} *publisherCache;

//--------------------------------------------------------------------------

/** @class PublisherCacheExpirerObj
* Class used to expire old publisher objects held within publisherCache
*/
static class PublisherCacheExpirerObj : public Thread
{
public:

PublisherCacheExpirerObj()
: Thread("Kafka::PublisherExpirer"),
shouldRun(false)
{

}

virtual void start()
{
if (!isAlive())
{
shouldRun = true;
Thread::start();
}
}

virtual void stop()
{
if (isAlive())
{
shouldRun = false;
join();
}
}

virtual int run()
{
while (shouldRun)
{
if (publisherCache)
{
publisherCache->expire();
}

usleep(1000);
}

return 0;
}

private:

std::atomic_bool shouldRun; //!< If true, we should execute our thread's main event loop
} *publisherCacheExpirer;
};

//--------------------------------------------------------------------------
// Lazy Initialization
// Singleton Initialization
//--------------------------------------------------------------------------

/**
* Make sure the publisher object cache is initialized as well as the
* associated background thread for expiring idle publishers. This is
* called only once.
*/
static void setupPublisherCache()
static Singleton<PublisherCacheObj> publisherCache;
static PublisherCacheObj & queryPublisherCache()
{
KafkaPlugin::publisherCache = new KafkaPlugin::PublisherCacheObj();

KafkaPlugin::publisherCacheExpirer = new KafkaPlugin::PublisherCacheExpirerObj;
KafkaPlugin::publisherCacheExpirer->start();
return *publisherCache.query([] () { return new PublisherCacheObj; });
}

//--------------------------------------------------------------------------
Expand All @@ -916,9 +856,7 @@ namespace KafkaPlugin

ECL_KAFKA_API bool ECL_KAFKA_CALL publishMessage(ICodeContext* ctx, const char* brokers, const char* topic, const char* message, const char* key)
{
std::call_once(pubCacheInitFlag, setupPublisherCache);

Publisher* pubObjPtr = publisherCache->getPublisher(brokers, topic, POLL_TIMEOUT);
Publisher* pubObjPtr = queryPublisherCache().getPublisher(brokers, topic, POLL_TIMEOUT);

pubObjPtr->sendMessage(message, key);

Expand All @@ -927,9 +865,7 @@ namespace KafkaPlugin

ECL_KAFKA_API bool ECL_KAFKA_CALL publishMessage(ICodeContext* ctx, const char* brokers, const char* topic, size32_t lenMessage, const char* message, size32_t lenKey, const char* key)
{
std::call_once(pubCacheInitFlag, setupPublisherCache);

Publisher* pubObjPtr = publisherCache->getPublisher(brokers, topic, POLL_TIMEOUT);
Publisher* pubObjPtr = queryPublisherCache().getPublisher(brokers, topic, POLL_TIMEOUT);
std::string messageStr(message, rtlUtf8Size(lenMessage, message));
std::string keyStr(key, rtlUtf8Size(lenKey, key));

Expand Down Expand Up @@ -1088,29 +1024,11 @@ ECL_KAFKA_API bool getECLPluginDefinition(ECLPluginDefinitionBlock* pb)

MODULE_INIT(INIT_PRIORITY_STANDARD)
{
KafkaPlugin::publisherCache = NULL;
KafkaPlugin::publisherCacheExpirer = NULL;

return true;
}

MODULE_EXIT()
{
// Delete the background thread expiring items from the publisher cache
// before deleting the publisher cache
if (KafkaPlugin::publisherCacheExpirer)
{
KafkaPlugin::publisherCacheExpirer->stop();
delete(KafkaPlugin::publisherCacheExpirer);
KafkaPlugin::publisherCacheExpirer = NULL;
}

if (KafkaPlugin::publisherCache)
{
KafkaPlugin::publisherCache->deleteAll();
delete(KafkaPlugin::publisherCache);
KafkaPlugin::publisherCache = NULL;
}

KafkaPlugin::publisherCache.destroy();
RdKafka::wait_destroyed(3000);
}

0 comments on commit 11750e0

Please sign in to comment.