Skip to content

Commit

Permalink
Refactor expiration
Browse files Browse the repository at this point in the history
  • Loading branch information
dcamper committed Mar 21, 2024
1 parent 8c1d74b commit 0ee7612
Showing 1 changed file with 15 additions and 85 deletions.
100 changes: 15 additions & 85 deletions plugins/kafka/kafka.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -708,6 +708,15 @@ namespace KafkaPlugin

}

/**
* Destructor
*
*/
~PublisherCacheObj()
{
deleteAll();
}

void deleteAll()
{
CriticalBlock block(lock);
Expand Down Expand Up @@ -735,8 +744,6 @@ namespace KafkaPlugin
{
if (!cachedPublishers.empty())
{
CriticalBlock block(lock);

time_t oldestAllowedTime = time(NULL) - OBJECT_EXPIRE_TIMEOUT_SECONDS;
__int32 expireCount = 0;

Expand Down Expand Up @@ -816,6 +823,9 @@ namespace KafkaPlugin
{
DBGLOG("Kafka: Created and cached new publisher object: %s @ %s", topic.c_str(), brokers.c_str());
}

// Expire any old publishers before returning the new one
expire();
}
}

Expand All @@ -833,86 +843,14 @@ namespace KafkaPlugin
CriticalSection lock; //!< Mutex guarding modifications to cachedPublishers
};

//----------------------------------

static Singleton<PublisherCacheObj> publisherCache;

static PublisherCacheObj & queryPublisherCache()
{
return *publisherCache.query([] () { return new PublisherCacheObj; });
}

//--------------------------------------------------------------------------

/** @class PublisherCacheExpirerObj
* Class used to expire old publisher objects held within publisherCache
*/
class PublisherCacheExpirerObj : public Thread
{
public:

PublisherCacheExpirerObj()
: Thread("Kafka::PublisherExpirer"),
shouldRun(false)
{

}

virtual void start()
{
if (!isAlive())
{
shouldRun = true;
Thread::start(false);
}
}

virtual void stop()
{
if (isAlive())
{
shouldRun = false;
join();
}
}

virtual int run()
{
while (shouldRun)
{
queryPublisherCache().expire();
usleep(1000);
}

return 0;
}

private:

std::atomic_bool shouldRun; //!< If true, we should execute our thread's main event loop
};

//----------------------------------

static Singleton<PublisherCacheExpirerObj> publisherCacheExpirer;

static PublisherCacheExpirerObj & queryPublisherCacheExpirer()
{
return *publisherCacheExpirer.query([] () { return new PublisherCacheExpirerObj; });
}

//--------------------------------------------------------------------------
// Singleton Initialization
//--------------------------------------------------------------------------

/**
* Make sure the publisher object cache is initialized as well as the
* associated background thread for expiring idle publishers.
*/
static void setupPublisherCache()
static Singleton<PublisherCacheObj> publisherCache;
static PublisherCacheObj & queryPublisherCache()
{
queryPublisherCache();
queryPublisherCacheExpirer().start();
return *publisherCache.query([] () { return new PublisherCacheObj; });
}

//--------------------------------------------------------------------------
Expand All @@ -921,8 +859,6 @@ namespace KafkaPlugin

ECL_KAFKA_API bool ECL_KAFKA_CALL publishMessage(ICodeContext* ctx, const char* brokers, const char* topic, const char* message, const char* key)
{
setupPublisherCache();

Publisher* pubObjPtr = queryPublisherCache().getPublisher(brokers, topic, POLL_TIMEOUT);

pubObjPtr->sendMessage(message, key);
Expand All @@ -932,8 +868,6 @@ namespace KafkaPlugin

ECL_KAFKA_API bool ECL_KAFKA_CALL publishMessage(ICodeContext* ctx, const char* brokers, const char* topic, size32_t lenMessage, const char* message, size32_t lenKey, const char* key)
{
setupPublisherCache();

Publisher* pubObjPtr = queryPublisherCache().getPublisher(brokers, topic, POLL_TIMEOUT);
std::string messageStr(message, rtlUtf8Size(lenMessage, message));
std::string keyStr(key, rtlUtf8Size(lenKey, key));
Expand Down Expand Up @@ -1098,10 +1032,6 @@ MODULE_INIT(INIT_PRIORITY_STANDARD)

MODULE_EXIT()
{
// Delete the background thread expiring items from the publisher cache
// before deleting the publisher cache
KafkaPlugin::publisherCacheExpirer.destroy();
KafkaPlugin::publisherCache.destroy();

RdKafka::wait_destroyed(3000);
}

0 comments on commit 0ee7612

Please sign in to comment.