From b1d05640e06a099a0b270dea3c3534aa3202497e Mon Sep 17 00:00:00 2001 From: Jake Smith Date: Thu, 23 Nov 2023 18:40:34 +0000 Subject: [PATCH] HPCC-30854 Ensure partitioning of KJ parts between workers in k8s When mapping index parts to workers in containerized mode, treat all parts as if locally available, such that they will be round-robined into worker buckets. This will cause workers (when the default remoteKeyedLookup is true) to directly access a partition of the index parts and farm out requests for the rest to other workers. The net result is to increase cache locality (both jhtree and within the Linux page cache). Also make some detailed logging conditional on logging level. Signed-off-by: Jake Smith --- thorlcr/activities/keyedjoin/thkeyedjoin.cpp | 10 ++++++++-- .../activities/keyedjoin/thkeyedjoinslave.cpp | 16 ++++++++-------- 2 files changed, 16 insertions(+), 10 deletions(-) diff --git a/thorlcr/activities/keyedjoin/thkeyedjoin.cpp b/thorlcr/activities/keyedjoin/thkeyedjoin.cpp index 5e709c770a1..d1eb7076b99 100644 --- a/thorlcr/activities/keyedjoin/thkeyedjoin.cpp +++ b/thorlcr/activities/keyedjoin/thkeyedjoin.cpp @@ -178,8 +178,14 @@ class CKeyedJoinMaster : public CMasterActivity { do { - INode &groupNode = dfsGroup.queryNode(gn); - if (partNode->equals(&groupNode)) + // NB: all parts are considered 'local' in containerized mode + // It will cause the parts to be striped across the group (and hence the workers), + // such that the parts will be partitioned, with each worker dealing with some parts + // locally via local key lookup handlers and the rest being handled remotely by other + // workers via remote key lookup handlers. + // remoteKeyedLookup=false will disable this default behaviour, causing all parts + // to be handled locally by each worker. + if (isContainerized() || partNode->equals(&dfsGroup.queryNode(gn))) { /* NB: If there's >1 slave per node (e.g. 
slavesPerNode>1) then there are multiple matching node's in the dfsGroup * Which means a copy of a part may already be assigned to a cluster slave map. This check avoid handling it again if it has. diff --git a/thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp b/thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp index 72422294d1d..57b821e3688 100644 --- a/thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp +++ b/thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp @@ -1030,7 +1030,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem { StringBuffer log; getInfo(log).append(": ").append(msg); - PROGLOG("%s", log.str()); + LOG(MCthorDetailedDebugInfo, thorJob, "%s", log.str()); } virtual StringBuffer &getInfo(StringBuffer &info) const { @@ -1165,10 +1165,11 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem } virtual void end() { -#ifdef _DEBUG - VStringBuffer log("processed: %" I64F "u", total); - trace(log); -#endif + if (!REJECTLOG(MCthorDetailedDebugInfo)) + { + VStringBuffer log("processed: %" I64F "u", total); + trace(log); + } } virtual void process(CThorExpandingRowArray &processing, unsigned selected) = 0; // IThreaded @@ -2931,9 +2932,8 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem } } handlerContainer.init(); -#ifdef _DEBUG - handlerContainer.trace(); -#endif + if (!REJECTLOG(MCthorDetailedDebugInfo)) + handlerContainer.trace(); } public: IMPLEMENT_IINTERFACE_USING(PARENT);