From 0ee7612d4bb94d2ea657aae48b52ab0f42ef2722 Mon Sep 17 00:00:00 2001 From: "Dan S. Camper" Date: Thu, 21 Mar 2024 14:17:33 -0500 Subject: [PATCH] Refactor expiration --- plugins/kafka/kafka.cpp | 100 ++++++---------------------------------- 1 file changed, 15 insertions(+), 85 deletions(-) diff --git a/plugins/kafka/kafka.cpp b/plugins/kafka/kafka.cpp index 02ffba67f3c..5903ebbd11f 100644 --- a/plugins/kafka/kafka.cpp +++ b/plugins/kafka/kafka.cpp @@ -708,6 +708,15 @@ namespace KafkaPlugin } + /** + * Destructor + * + */ + ~PublisherCacheObj() + { + deleteAll(); + } + void deleteAll() { CriticalBlock block(lock); @@ -735,8 +744,6 @@ namespace KafkaPlugin { if (!cachedPublishers.empty()) { - CriticalBlock block(lock); - time_t oldestAllowedTime = time(NULL) - OBJECT_EXPIRE_TIMEOUT_SECONDS; __int32 expireCount = 0; @@ -816,6 +823,9 @@ namespace KafkaPlugin { DBGLOG("Kafka: Created and cached new publisher object: %s @ %s", topic.c_str(), brokers.c_str()); } + + // Expire any old publishers before returning the new one + expire(); } } @@ -833,86 +843,14 @@ namespace KafkaPlugin CriticalSection lock; //!< Mutex guarding modifications to cachedPublishers }; - //---------------------------------- - - static Singleton publisherCache; - - static PublisherCacheObj & queryPublisherCache() - { - return *publisherCache.query([] () { return new PublisherCacheObj; }); - } - - //-------------------------------------------------------------------------- - - /** @class PublisherCacheExpirerObj - * Class used to expire old publisher objects held within publisherCache - */ - class PublisherCacheExpirerObj : public Thread - { - public: - - PublisherCacheExpirerObj() - : Thread("Kafka::PublisherExpirer"), - shouldRun(false) - { - - } - - virtual void start() - { - if (!isAlive()) - { - shouldRun = true; - Thread::start(false); - } - } - - virtual void stop() - { - if (isAlive()) - { - shouldRun = false; - join(); - } - } - - virtual int run() - { - while (shouldRun) - { - queryPublisherCache().expire(); - usleep(1000); - } - - return 0; - } - - private: - - std::atomic_bool shouldRun; //!< If true, we should execute our thread's main event loop - }; - - //---------------------------------- - - static Singleton publisherCacheExpirer; - - static PublisherCacheExpirerObj & queryPublisherCacheExpirer() - { - return *publisherCacheExpirer.query([] () { return new PublisherCacheExpirerObj; }); - } - //-------------------------------------------------------------------------- // Singleton Initialization //-------------------------------------------------------------------------- - /** - * Make sure the publisher object cache is initialized as well as the - * associated background thread for expiring idle publishers. - */ - static void setupPublisherCache() + static Singleton publisherCache; + static PublisherCacheObj & queryPublisherCache() { - queryPublisherCache(); - queryPublisherCacheExpirer().start(); + return *publisherCache.query([] () { return new PublisherCacheObj; }); } //-------------------------------------------------------------------------- @@ -921,8 +859,6 @@ namespace KafkaPlugin ECL_KAFKA_API bool ECL_KAFKA_CALL publishMessage(ICodeContext* ctx, const char* brokers, const char* topic, const char* message, const char* key) { - setupPublisherCache(); - Publisher* pubObjPtr = queryPublisherCache().getPublisher(brokers, topic, POLL_TIMEOUT); pubObjPtr->sendMessage(message, key); @@ -932,8 +868,6 @@ namespace KafkaPlugin ECL_KAFKA_API bool ECL_KAFKA_CALL publishMessage(ICodeContext* ctx, const char* brokers, const char* topic, size32_t lenMessage, const char* message, size32_t lenKey, const char* key) { - setupPublisherCache(); - Publisher* pubObjPtr = queryPublisherCache().getPublisher(brokers, topic, POLL_TIMEOUT); std::string messageStr(message, rtlUtf8Size(lenMessage, message)); std::string keyStr(key, rtlUtf8Size(lenKey, key)); @@ -1098,10 +1032,6 @@ MODULE_INIT(INIT_PRIORITY_STANDARD) MODULE_EXIT() { - // Delete the background thread expiring items from the publisher cache - // before deleting the publisher cache - KafkaPlugin::publisherCacheExpirer.destroy(); KafkaPlugin::publisherCache.destroy(); - RdKafka::wait_destroyed(3000); }