Skip to content

Commit

Permalink
Kafka shared library not constructing/destructing properly
Browse files Browse the repository at this point in the history
Use Singleton pattern instead of std::once.
  • Loading branch information
dcamper committed Mar 21, 2024
1 parent 1b38646 commit 8c1d74b
Showing 1 changed file with 33 additions and 42 deletions.
75 changes: 33 additions & 42 deletions plugins/kafka/kafka.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,12 +50,6 @@ namespace KafkaPlugin
// background activity
const __int32 POLL_TIMEOUT = 1000;

//--------------------------------------------------------------------------
// Static Variables
//--------------------------------------------------------------------------

static std::once_flag pubCacheInitFlag;

//--------------------------------------------------------------------------
// Static Methods (internal)
//--------------------------------------------------------------------------
Expand Down Expand Up @@ -697,7 +691,7 @@ namespace KafkaPlugin
*
* Class used to create and cache publisher objects and connections
*/
static class PublisherCacheObj
class PublisherCacheObj
{
private:

Expand Down Expand Up @@ -837,14 +831,23 @@ namespace KafkaPlugin

ObjMap cachedPublishers; //!< std::map of created Publisher object pointers
CriticalSection lock; //!< Mutex guarding modifications to cachedPublishers
} *publisherCache;
};

//----------------------------------

static Singleton<PublisherCacheObj> publisherCache;

static PublisherCacheObj & queryPublisherCache()
{
return *publisherCache.query([] () { return new PublisherCacheObj; });
}

//--------------------------------------------------------------------------

/** @class PublisherCacheExpirerObj
* Class used to expire old publisher objects held within publisherCache
*/
static class PublisherCacheExpirerObj : public Thread
class PublisherCacheExpirerObj : public Thread
{
public:

Expand Down Expand Up @@ -877,11 +880,7 @@ namespace KafkaPlugin
{
while (shouldRun)
{
if (publisherCache)
{
publisherCache->expire();
}

queryPublisherCache().expire();
usleep(1000);
}

Expand All @@ -891,23 +890,29 @@ namespace KafkaPlugin
private:

std::atomic_bool shouldRun; //!< If true, we should execute our thread's main event loop
} *publisherCacheExpirer;
};

//----------------------------------

static Singleton<PublisherCacheExpirerObj> publisherCacheExpirer;

static PublisherCacheExpirerObj & queryPublisherCacheExpirer()
{
return *publisherCacheExpirer.query([] () { return new PublisherCacheExpirerObj; });
}

//--------------------------------------------------------------------------
// Lazy Initialization
// Singleton Initialization
//--------------------------------------------------------------------------

/**
* Make sure the publisher object cache is initialized as well as the
* associated background thread for expiring idle publishers. This is
* called only once.
* associated background thread for expiring idle publishers.
*/
static void setupPublisherCache()
{
KafkaPlugin::publisherCache = new KafkaPlugin::PublisherCacheObj();

KafkaPlugin::publisherCacheExpirer = new KafkaPlugin::PublisherCacheExpirerObj;
KafkaPlugin::publisherCacheExpirer->start();
queryPublisherCache();
queryPublisherCacheExpirer().start();
}

//--------------------------------------------------------------------------
Expand All @@ -916,9 +921,9 @@ namespace KafkaPlugin

ECL_KAFKA_API bool ECL_KAFKA_CALL publishMessage(ICodeContext* ctx, const char* brokers, const char* topic, const char* message, const char* key)
{
std::call_once(pubCacheInitFlag, setupPublisherCache);
setupPublisherCache();

Publisher* pubObjPtr = publisherCache->getPublisher(brokers, topic, POLL_TIMEOUT);
Publisher* pubObjPtr = queryPublisherCache().getPublisher(brokers, topic, POLL_TIMEOUT);

pubObjPtr->sendMessage(message, key);

Expand All @@ -927,9 +932,9 @@ namespace KafkaPlugin

ECL_KAFKA_API bool ECL_KAFKA_CALL publishMessage(ICodeContext* ctx, const char* brokers, const char* topic, size32_t lenMessage, const char* message, size32_t lenKey, const char* key)
{
std::call_once(pubCacheInitFlag, setupPublisherCache);
setupPublisherCache();

Publisher* pubObjPtr = publisherCache->getPublisher(brokers, topic, POLL_TIMEOUT);
Publisher* pubObjPtr = queryPublisherCache().getPublisher(brokers, topic, POLL_TIMEOUT);
std::string messageStr(message, rtlUtf8Size(lenMessage, message));
std::string keyStr(key, rtlUtf8Size(lenKey, key));

Expand Down Expand Up @@ -1088,29 +1093,15 @@ ECL_KAFKA_API bool getECLPluginDefinition(ECLPluginDefinitionBlock* pb)

MODULE_INIT(INIT_PRIORITY_STANDARD)
{
KafkaPlugin::publisherCache = NULL;
KafkaPlugin::publisherCacheExpirer = NULL;

return true;
}

MODULE_EXIT()
{
// Delete the background thread expiring items from the publisher cache
// before deleting the publisher cache
if (KafkaPlugin::publisherCacheExpirer)
{
KafkaPlugin::publisherCacheExpirer->stop();
delete(KafkaPlugin::publisherCacheExpirer);
KafkaPlugin::publisherCacheExpirer = NULL;
}

if (KafkaPlugin::publisherCache)
{
KafkaPlugin::publisherCache->deleteAll();
delete(KafkaPlugin::publisherCache);
KafkaPlugin::publisherCache = NULL;
}
KafkaPlugin::publisherCacheExpirer.destroy();
KafkaPlugin::publisherCache.destroy();

RdKafka::wait_destroyed(3000);
}

0 comments on commit 8c1d74b

Please sign in to comment.