diff --git a/plugins/kafka/kafka.cpp b/plugins/kafka/kafka.cpp index 5ae3e9e24b5..02ffba67f3c 100644 --- a/plugins/kafka/kafka.cpp +++ b/plugins/kafka/kafka.cpp @@ -50,12 +50,6 @@ namespace KafkaPlugin // background activity const __int32 POLL_TIMEOUT = 1000; - //-------------------------------------------------------------------------- - // Static Variables - //-------------------------------------------------------------------------- - - static std::once_flag pubCacheInitFlag; - //-------------------------------------------------------------------------- // Static Methods (internal) //-------------------------------------------------------------------------- @@ -697,7 +691,7 @@ namespace KafkaPlugin * * Class used to create and cache publisher objects and connections */ - static class PublisherCacheObj + class PublisherCacheObj { private: @@ -837,14 +831,23 @@ namespace KafkaPlugin ObjMap cachedPublishers; //!< std::map of created Publisher object pointers CriticalSection lock; //!< Mutex guarding modifications to cachedPublishers - } *publisherCache; + }; + + //---------------------------------- + + static Singleton publisherCache; + + static PublisherCacheObj & queryPublisherCache() + { + return *publisherCache.query([] () { return new PublisherCacheObj; }); + } //-------------------------------------------------------------------------- /** @class PublisherCacheExpirerObj * Class used to expire old publisher objects held within publisherCache */ - static class PublisherCacheExpirerObj : public Thread + class PublisherCacheExpirerObj : public Thread { public: @@ -877,11 +880,7 @@ namespace KafkaPlugin { while (shouldRun) { - if (publisherCache) - { - publisherCache->expire(); - } - + queryPublisherCache().expire(); usleep(1000); } @@ -891,23 +890,29 @@ namespace KafkaPlugin private: std::atomic_bool shouldRun; //!< If true, we should execute our thread's main event loop - } *publisherCacheExpirer; + }; + + //---------------------------------- + + static Singleton publisherCacheExpirer; + + static PublisherCacheExpirerObj & queryPublisherCacheExpirer() + { + return *publisherCacheExpirer.query([] () { return new PublisherCacheExpirerObj; }); + } //-------------------------------------------------------------------------- - // Lazy Initialization + // Singleton Initialization //-------------------------------------------------------------------------- /** * Make sure the publisher object cache is initialized as well as the - * associated background thread for expiring idle publishers. This is - * called only once. + * associated background thread for expiring idle publishers. */ static void setupPublisherCache() { - KafkaPlugin::publisherCache = new KafkaPlugin::PublisherCacheObj(); - - KafkaPlugin::publisherCacheExpirer = new KafkaPlugin::PublisherCacheExpirerObj; - KafkaPlugin::publisherCacheExpirer->start(); + queryPublisherCache(); + queryPublisherCacheExpirer().start(); } //-------------------------------------------------------------------------- @@ -916,9 +921,9 @@ namespace KafkaPlugin ECL_KAFKA_API bool ECL_KAFKA_CALL publishMessage(ICodeContext* ctx, const char* brokers, const char* topic, const char* message, const char* key) { - std::call_once(pubCacheInitFlag, setupPublisherCache); + setupPublisherCache(); - Publisher* pubObjPtr = publisherCache->getPublisher(brokers, topic, POLL_TIMEOUT); + Publisher* pubObjPtr = queryPublisherCache().getPublisher(brokers, topic, POLL_TIMEOUT); pubObjPtr->sendMessage(message, key); @@ -927,9 +932,9 @@ namespace KafkaPlugin ECL_KAFKA_API bool ECL_KAFKA_CALL publishMessage(ICodeContext* ctx, const char* brokers, const char* topic, size32_t lenMessage, const char* message, size32_t lenKey, const char* key) { - std::call_once(pubCacheInitFlag, setupPublisherCache); + setupPublisherCache(); - Publisher* pubObjPtr = publisherCache->getPublisher(brokers, topic, POLL_TIMEOUT); + Publisher* pubObjPtr = queryPublisherCache().getPublisher(brokers, topic, POLL_TIMEOUT); std::string messageStr(message, rtlUtf8Size(lenMessage, message)); std::string keyStr(key, rtlUtf8Size(lenKey, key)); @@ -1088,9 +1093,6 @@ ECL_KAFKA_API bool getECLPluginDefinition(ECLPluginDefinitionBlock* pb) MODULE_INIT(INIT_PRIORITY_STANDARD) { - KafkaPlugin::publisherCache = NULL; - KafkaPlugin::publisherCacheExpirer = NULL; - return true; } @@ -1098,19 +1100,8 @@ MODULE_EXIT() { // Delete the background thread expiring items from the publisher cache // before deleting the publisher cache - if (KafkaPlugin::publisherCacheExpirer) - { - KafkaPlugin::publisherCacheExpirer->stop(); - delete(KafkaPlugin::publisherCacheExpirer); - KafkaPlugin::publisherCacheExpirer = NULL; - } - - if (KafkaPlugin::publisherCache) - { - KafkaPlugin::publisherCache->deleteAll(); - delete(KafkaPlugin::publisherCache); - KafkaPlugin::publisherCache = NULL; - } + KafkaPlugin::publisherCacheExpirer.destroy(); + KafkaPlugin::publisherCache.destroy(); RdKafka::wait_destroyed(3000); }