Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HPCC-32791 Partition the index LRU cache to reduce contention #19200

Merged
merged 5 commits into from
Nov 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
197 changes: 144 additions & 53 deletions system/jhtree/jhtree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -695,40 +695,19 @@ class DelayedCacheEntryReleaser : public IRemovedMappingCallback
};

typedef OwningSimpleHashTableOf<CNodeMapping, CKeyIdAndPos> CNodeTable;
class CNodeMRUCache final : public CMRUCacheOf<CKeyIdAndPos, CNodeCacheEntry, CNodeMapping, CNodeTable>
class CNodeMRUSubCache final : public CMRUCacheOf<CKeyIdAndPos, CNodeCacheEntry, CNodeMapping, CNodeTable>
{
std::atomic<size32_t> sizeInMem{0};
size32_t memLimit = 0;
std::shared_ptr<hpccMetrics::CustomMetric<RelaxedAtomic<__uint64>>> pNumHits = nullptr;
std::shared_ptr<hpccMetrics::CustomMetric<RelaxedAtomic<__uint64>>> pNumAdds = nullptr;
std::shared_ptr<hpccMetrics::CustomMetric<RelaxedAtomic<__uint64>>> pNumDups = nullptr;
std::shared_ptr<hpccMetrics::CustomMetric<RelaxedAtomic<__uint64>>> pNumEvicts = nullptr;
public:
mutable CriticalSection lock;
RelaxedAtomic<__uint64> numHits{0};
RelaxedAtomic<__uint64> numAdds{0};
RelaxedAtomic<__uint64> numDups{0};
RelaxedAtomic<__uint64> numEvicts{0};
bool enabled = false;
CNodeMRUCache(CacheType cacheType)
{
StringBuffer name, desc;
const char *typeText = cacheTypeText[cacheType];
name.appendf("jhtree.cache.%s.hits", typeText);
desc.appendf("The number of cache hits on the jhtree %s cache", typeText);
pNumHits = hpccMetrics::registerCustomMetric(name, desc, hpccMetrics::METRICS_COUNTER, numHits, SMeasureCount);
name.clear().appendf("jhtree.cache.%s.adds", typeText);
desc.clear().appendf("The number of cache adds to the jhtree %s cache", typeText);
pNumAdds = hpccMetrics::registerCustomMetric(name, desc, hpccMetrics::METRICS_COUNTER, numAdds, SMeasureCount);
name.clear().appendf("jhtree.cache.%s.dups", typeText);
desc.clear().appendf("The number of cache add collisions on the jhtree %s cache", typeText);
pNumDups = hpccMetrics::registerCustomMetric(name, desc, hpccMetrics::METRICS_COUNTER, numDups, SMeasureCount);
name.clear().appendf("jhtree.cache.%s.evictions", typeText);
desc.clear().appendf("The number of nodes evicted from the jhtree %s cache", typeText);
pNumEvicts = hpccMetrics::registerCustomMetric(name, desc, hpccMetrics::METRICS_COUNTER, numEvicts, SMeasureCount);
}

size32_t setMemLimit(size32_t _memLimit)
{
enabled = _memLimit != 0;
size32_t oldMemLimit = memLimit;
memLimit = _memLimit;
if (full())
Expand Down Expand Up @@ -784,7 +763,8 @@ class CNodeMRUCache final : public CMRUCacheOf<CKeyIdAndPos, CNodeCacheEntry, CN
}
void reportEntries(ICacheInfoRecorder &cacheInfo)
{
Owned<CNodeMRUCache::CMRUIterator> iter = getIterator();
CriticalBlock block(lock);
Owned<CNodeMRUSubCache::CMRUIterator> iter = getIterator();
ForEach(*iter)
{
CNodeMapping &mapping = iter->query();
Expand All @@ -804,24 +784,138 @@ class CNodeMRUCache final : public CMRUCacheOf<CKeyIdAndPos, CNodeCacheEntry, CN
void traceState(StringBuffer & out)
{
//Should be safe to call outside of a critical section, but values may be inconsistent
out.append(table.ordinality()).append(":").append(sizeInMem);
out.appendf(" [%" I64F "u:%" I64F "u:%" I64F "u:%" I64F "u]", numHits.load(), numAdds.load(), numDups.load(), numEvicts.load());
}
unsigned __int64 getStatisticValue(StatisticKind kind) const
{
switch (kind)
{
case StNumCacheAdds:
return numAdds.load();
case StNumCacheHits:
return numHits.load();
case StNumCacheDuplicates:
return numDups.load();
case StNumCacheEvictions:
return numEvicts.load();
}
return 0;
}
};

// Maximum cost/benefit seen for 4 buckets, which does not skew the LRU list too much
static constexpr unsigned cacheBits = 2;
static constexpr unsigned cacheBuckets = 1U << cacheBits;
static constexpr unsigned cacheShift = 32 - cacheBits;


class CNodeMRUCache
{
using IMetric = hpccMetrics::IMetric;
class CacheMetric final : public hpccMetrics::MetricBase
{
public:
CacheMetric(const char * name, const char * desc, StatisticKind _kind, CNodeMRUCache & _cache) :
MetricBase(name, desc, hpccMetrics::METRICS_COUNTER, SMeasureCount, hpccMetrics::MetricMetaData()),
cache(_cache), kind(_kind)
{
}

virtual __uint64 queryValue() const
{
return cache.getStatisticValue(kind);
}

protected:
CNodeMRUCache & cache;
StatisticKind kind;
};

void createCacheMetric(const char * name, const char * desc, StatisticKind kind)
{
std::shared_ptr<IMetric> pMetric = std::shared_ptr<IMetric>(new CacheMetric(name, desc, kind, *this));
hpccMetrics::queryMetricsManager().addMetric(pMetric);
metrics.push_back(std::move(pMetric));
}

public:
CNodeMRUCache(CacheType cacheType)
{
StringBuffer name, desc;
const char *typeText = cacheTypeText[cacheType];
name.appendf("jhtree.cache.%s.hits", typeText);
desc.appendf("The number of cache hits on the jhtree %s cache", typeText);
createCacheMetric(name, desc, StNumCacheHits);
name.clear().appendf("jhtree.cache.%s.adds", typeText);
desc.clear().appendf("The number of cache adds to the jhtree %s cache", typeText);
createCacheMetric(name, desc, StNumCacheAdds);
name.clear().appendf("jhtree.cache.%s.dups", typeText);
desc.clear().appendf("The number of cache add collisions on the jhtree %s cache", typeText);
createCacheMetric(name, desc, StNumCacheDuplicates);
name.clear().appendf("jhtree.cache.%s.evictions", typeText);
desc.clear().appendf("The number of nodes evicted from the jhtree %s cache", typeText);
createCacheMetric(name, desc, StNumCacheEvictions);
}

inline unsigned getKeyHash(const CKeyIdAndPos & key)
{
return cache[0].getKeyHash(key);
}

__uint64 getStatisticValue(StatisticKind kind) const
{
__uint64 total = 0;
for (unsigned i =0; i < cacheBuckets; i++)
total += cache[i].getStatisticValue(kind);
return total;
}

void reportEntries(ICacheInfoRecorder &cacheInfo)
{
for (unsigned j = 0; j < cacheBuckets; j++)
cache[j].reportEntries(cacheInfo);
}

size32_t setCacheMem(size32_t newSize)
{
unsigned oldV = 0;
for (unsigned i=0; i < cacheBuckets; i++)
{
CriticalBlock block(cache[i].lock);
oldV += cache[i].setMemLimit(newSize/cacheBuckets);
}
enabled = newSize != 0;
return oldV;
}

void traceState(StringBuffer & out)
{
if (enabled)
{
out.append(table.ordinality()).append(":").append(sizeInMem);
out.appendf(" [%" I64F "u:%" I64F "u:%" I64F "u:%" I64F "u]", numHits.load(), numAdds.load(), numDups.load(), numEvicts.load());
for (unsigned j=0; j < cacheBuckets; j++)
{
if (j)
out.append(' ');
cache[j].traceState(out);
}
}
else
{
out.append("[disabled]");
}
}
CNodeMRUSubCache cache[cacheBuckets];
bool enabled;
protected:
std::vector<std::shared_ptr<IMetric>> metrics;
};


class CNodeCache : public CInterface
{
private:
mutable CriticalSection lock[CacheMax];
CNodeMRUCache cache[CacheMax] = { CacheBranch, CacheLeaf, CacheBlob };
std::vector<std::shared_ptr<hpccMetrics::IMetric>> metrics;
public:
CNodeCache(size32_t maxNodeMem, size32_t maxLeaveMem, size32_t maxBlobMem)
{
Expand Down Expand Up @@ -849,8 +943,11 @@ class CNodeCache : public CInterface
{
for (unsigned i=0; i < CacheMax; i++)
{
CriticalBlock block(lock[i]);
cache[i].kill();
for (unsigned j=0; j < cacheBuckets; j++)
{
CriticalBlock block(cache[i].cache[j].lock);
cache[i].cache[j].kill();
}
}
}
void traceState(StringBuffer & out)
Expand All @@ -872,9 +969,7 @@ class CNodeCache : public CInterface
protected:
size32_t setCacheMem(size32_t newSize, CacheType type)
{
CriticalBlock block(lock[type]);
unsigned oldV = cache[type].setMemLimit(newSize);
return oldV;
return cache[type].setCacheMem(newSize);
}
};

Expand Down Expand Up @@ -2661,10 +2756,7 @@ extern jhtree_decl void getNodeCacheInfo(ICacheInfoRecorder &cacheInfo)
void CNodeCache::getCacheInfo(ICacheInfoRecorder &cacheInfo)
{
for (unsigned i = 0; i < CacheMax; i++)
{
CriticalBlock block(lock[i]);
cache[i].reportEntries(cacheInfo);
}
}

//Use a critical section in each node to prevent multiple threads loading the same node at the same time.
Expand All @@ -2687,23 +2779,26 @@ const CJHTreeNode *CNodeCache::getCachedNode(const INodeLoader *keyIndex, unsign
CacheType cacheType = isTLK ? CacheBranch : (CacheType)type;

// check cacheEnabled[cacheType] avoid the critical section (and testing the flag within the critical section)
CNodeMRUCache & curCache = cache[cacheType];
if (unlikely(!curCache.enabled))
CNodeMRUCache & typeCache = cache[cacheType];
if (unlikely(!typeCache.enabled))
return keyIndex->loadNode(nullptr, pos, nullptr);

CKeyIdAndPos key(iD, pos);
unsigned hashcode = typeCache.getKeyHash(key);
unsigned subCache = cacheBits == 0 ? 0 : hashcode >> cacheShift;
CNodeMRUSubCache & curCache = typeCache.cache[subCache];

//Previously, this was implemented as:
// Lock, unlock. Load the page. Lock, check if it has been added, otherwise add.
//Now, it is coded as:
// Lock, add if missing, unlock. Lock a page-dependent-cr load() release lock.
//There will be the same number of critical section locks, but loading a page will contend on a different lock - so it should reduce contention.
CKeyIdAndPos key(iD, pos);
CriticalSection & cacheLock = lock[cacheType];
CriticalSection & cacheLock = curCache.lock;
Owned<CNodeCacheEntry> ownedCacheEntry; // ensure node gets cleaned up if it fails to load
bool alreadyExists = true;
{
DelayedCacheEntryReleaser delayedReleaser;
CNodeCacheEntry * cacheEntry;
unsigned hashcode = curCache.getKeyHash(key);

CLeavableCriticalBlock block(cacheLock);
cacheEntry = curCache.query(hashcode, &key);
Expand Down Expand Up @@ -2742,20 +2837,16 @@ const CJHTreeNode *CNodeCache::getCachedNode(const INodeLoader *keyIndex, unsign
try
{
//Move the atomic increments out of the critical section - they can be relatively expensive
if (likely(alreadyExists))
if (likely(ctx))
{
if (ctx) ctx->noteStatistic(hitStatId[cacheType], 1);
}
else
{
if (ctx) ctx->noteStatistic(addStatId[cacheType], 1);
//It is most likely that an item has been added - because if the entry already exists
//it will only reach this point if another thread has not already loaded the node.
if (unlikely(alreadyExists))
ctx->noteStatistic(hitStatId[cacheType], 1);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these backwards ?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unlikely() is correct - because the code only reaches this point if there was a match in the cache, but the node associated with that entry has not been loaded yet.
I will add a comment to clarify why.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

all good.

else
ctx->noteStatistic(addStatId[cacheType], 1);
}

//The common case is that this flag has already been set (by a previous add).
if (likely(ownedCacheEntry->isReady()))
return ownedCacheEntry->getNode();

//Shame that the hash code is recalculated - it might be possible to remove this.
cycle_t startCycles = get_cycles_now();
cycle_t fetchCycles = 0;
cycle_t startLoadCycles;
Expand Down
2 changes: 1 addition & 1 deletion system/jhtree/jhutil.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ class CMRUCacheOf : public CInterface//, public IInterface
table.replace(*mapping);
mruList.enqueueHead(mapping);
}
unsigned getKeyHash(KEY & key) const
unsigned getKeyHash(const KEY & key) const
{
return table.getHashFromFindParam(&key);
}
Expand Down
2 changes: 2 additions & 0 deletions system/jlib/jstatcodes.h
Original file line number Diff line number Diff line change
Expand Up @@ -324,6 +324,8 @@ enum StatisticKind
StNumCacheHits, // Generic 'cache hit' stats, potentially used for many caches; Roxie-specific stats are above
StNumCacheAdds, // Generic 'cache add/miss' stats, potentially used for many caches; Roxie-specific stats are above
StNumPeakCacheObjects, // Peak number of objects in a generic cache
StNumCacheDuplicates,
StNumCacheEvictions,
StMax,

//For any quantity there is potentially the following variants.
Expand Down
2 changes: 2 additions & 0 deletions system/jlib/jstats.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -997,6 +997,8 @@ static const constexpr StatisticMeta statsMetaData[StMax] = {
{ NUMSTAT(CacheHits), "The number of times an item was retrieved from a cache" },
{ NUMSTAT(CacheAdds), "The number of times an item was added to a cache" },
{ PEAKNUMSTAT(PeakCacheObjects), "High water mark for number of objects in a cache"},
{ NUMSTAT(CacheDuplicates), "The number of times an item was added to a cache by two threads at the same time" },
{ NUMSTAT(CacheEvictions), "The number of times an item was evicted from a cache" },
};

static MapStringTo<StatisticKind, StatisticKind> statisticNameMap(true);
Expand Down
Loading