Skip to content

Commit

Permalink
HPCC-30854 Ensure partitioning of KJ parts between workers in k8s
Browse files Browse the repository at this point in the history
When mapping index parts to workers in containerized mode, treat all
parts as if locally available, such that they will be round-robined
into worker buckets. This will cause workers (when the default
remoteKeyedLookup is true) to directly access a partition of the index
parts and farm out requests for the rest to other workers.
The net result is to increase cache locality (both jhtree and within
the Linux page cache).

Also make some detailed logging conditional on logging level.

Signed-off-by: Jake Smith <[email protected]>
  • Loading branch information
jakesmith committed Nov 23, 2023
1 parent e09e412 commit b1d0564
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 10 deletions.
10 changes: 8 additions & 2 deletions thorlcr/activities/keyedjoin/thkeyedjoin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,14 @@ class CKeyedJoinMaster : public CMasterActivity
{
do
{
INode &groupNode = dfsGroup.queryNode(gn);
if (partNode->equals(&groupNode))
// NB: all parts are considered 'local' in containerized mode
// It will cause the parts to be striped across the group (and hence the workers),
// such that the parts will be partitioned, with each worker dealing with some parts
// locally via local key lookup handlers and the rest being handled remotely by other
// workers via remote key lookup handlers.
// remoteKeyedLookup=false will disable this default behaviour, causing all parts
// to be handled locally by each worker.
if (isContainerized() || partNode->equals(&dfsGroup.queryNode(gn)))
{
/* NB: If there's >1 slave per node (e.g. slavesPerNode>1) then there are multiple matching nodes in the dfsGroup
* Which means a copy of a part may already be assigned to a cluster slave map. This check avoids handling it again if it has.
Expand Down
16 changes: 8 additions & 8 deletions thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1030,7 +1030,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
{
StringBuffer log;
getInfo(log).append(": ").append(msg);
PROGLOG("%s", log.str());
LOG(MCthorDetailedDebugInfo, thorJob, "%s", log.str());
}
virtual StringBuffer &getInfo(StringBuffer &info) const
{
Expand Down Expand Up @@ -1165,10 +1165,11 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
}
virtual void end()
{
#ifdef _DEBUG
VStringBuffer log("processed: %" I64F "u", total);
trace(log);
#endif
if (!REJECTLOG(MCthorDetailedDebugInfo))
{
VStringBuffer log("processed: %" I64F "u", total);
trace(log);
}
}
virtual void process(CThorExpandingRowArray &processing, unsigned selected) = 0;
// IThreaded
Expand Down Expand Up @@ -2931,9 +2932,8 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem
}
}
handlerContainer.init();
#ifdef _DEBUG
handlerContainer.trace();
#endif
if (!REJECTLOG(MCthorDetailedDebugInfo))
handlerContainer.trace();
}
public:
IMPLEMENT_IINTERFACE_USING(PARENT);
Expand Down

0 comments on commit b1d0564

Please sign in to comment.