diff --git a/helm/hpcc/templates/_helpers.tpl b/helm/hpcc/templates/_helpers.tpl index 087b0962d83..ba2453bd2a3 100644 --- a/helm/hpcc/templates/_helpers.tpl +++ b/helm/hpcc/templates/_helpers.tpl @@ -811,12 +811,28 @@ Get image name {{- end -}} {{/* -Generates imagePattern as env variable +Generates image information into env. variables used at runtime for runtime platform version switching */}} {{- define "hpcc.generateImageEnv" -}} {{- /* Pass in a dictionary with root and me defined */ -}} -- name: imagePattern - value: {{ include "hpcc.imageName" . }} +{{- $baseImageRootName := "" -}} +{{- $baseImageVersion := "" -}} +{{- if .me.image -}} + {{- $baseImageRootName = printf "%s/%s" (.me.image.root | default .root.Values.global.image.root | default "hpccsystems") (.me.image.name | default .root.Values.global.image.name | default "platform-core") -}} +{{- else -}} + {{- $baseImageRootName = printf "%s/%s" (.root.Values.global.image.root | default "hpccsystems") (.root.Values.global.image.name | default "platform-core") -}} +{{- end -}} +{{- if .me.image -}} + {{- $baseImageVersion = .me.image.version | default .root.Values.global.image.version | default .root.Chart.Version -}} +{{- else -}} + {{- $baseImageVersion = .root.Values.global.image.version | default .root.Chart.Version -}} +{{- end }} +- name: baseImageRootName + value: {{ $baseImageRootName }} +- name: baseImageVersion + value: {{ $baseImageVersion }} +- name: runtimeImageVersion + value: _HPCC_JOB_VERSION_ {{- end -}} {{/* diff --git a/system/jlib/jcontainerized.cpp b/system/jlib/jcontainerized.cpp index 8e63e7b96ea..461cd316c84 100644 --- a/system/jlib/jcontainerized.cpp +++ b/system/jlib/jcontainerized.cpp @@ -222,21 +222,21 @@ bool applyYaml(const char *componentName, const char *wuid, const char *job, con VStringBuffer args("\"--workunit=%s\"", wuid); args.append(" \"--k8sJob=true\""); + const char *runtimeImageVersion = getenv("baseImageVersion"); // runtime image version will equal base version unless changed dynamically below for (const auto &p: extraParams) { + // special handling _HPCC_JOB_VERSION_, not just a straight substitution if (streq(p.first.c_str(), "_HPCC_JOB_VERSION_")) { - const char *imagePattern = getenv("imagePattern"); - if (imagePattern) + // locates "image: :" and replaces with "image: :" + const char *baseImageRootName = getenv("baseImageRootName"); + if (!isEmptyString(baseImageRootName) && !isEmptyString(runtimeImageVersion)) { - const char *imageVersionStart = strstr(imagePattern, ":"); - if (imageVersionStart) - { - VStringBuffer oriImagePatternSpec("image: %s", imagePattern); - VStringBuffer newImagePatternSpec("image: %.*s:%s", (int)(imageVersionStart-imagePattern), imagePattern, p.second.c_str()); - jobYaml.replaceString(oriImagePatternSpec, newImagePatternSpec); - DBGLOG("Job image version changed from '%s' to '%s'", imageVersionStart, p.second.c_str()); - } + VStringBuffer oriImagePatternSpec("image: %s:%s", baseImageRootName, runtimeImageVersion); + VStringBuffer newImagePatternSpec("image: %s:%s", baseImageRootName, p.second.c_str()); + jobYaml.replaceString(oriImagePatternSpec, newImagePatternSpec); + DBGLOG("Job image version changed from '%s' to '%s'", runtimeImageVersion, p.second.c_str()); + runtimeImageVersion = p.second.c_str(); // used to substitute _HPCC_JOB_VERSION_ in jobYaml (used in runtimeImageVersion env variable) } } else if (hasPrefix(p.first.c_str(), "_HPCC_", false)) // job yaml substitution @@ -244,6 +244,14 @@ bool applyYaml(const char *componentName, const char *wuid, const char *job, con else args.append(" \"--").append(p.first.c_str()).append('=').append(p.second.c_str()).append("\""); } + // always substitute _HPCC_JOB_VERSION_ - runtimeImageVersion is either the original baseImageVersion or the one from _HPCC_JOB_VERSION_ from above) + if (isEmptyString(runtimeImageVersion)) // if it were empty, it implies there's an incompatible helm chart/runtime image mismatch + { + // we replace _HPCC_JOB_VERSION_ (helm env runtimeImageVersion) with an empty string + // so that the engine knows it can't use it. + runtimeImageVersion = ""; + } + jobYaml.replaceString("_HPCC_JOB_VERSION_", runtimeImageVersion); jobYaml.replaceString("_HPCC_ARGS_", args.str()); // retrySecs=0 - I am not sure want to retry this command systematically.. diff --git a/thorlcr/master/thgraphmanager.cpp b/thorlcr/master/thgraphmanager.cpp index 50538b0774d..48cfc6e1e08 100644 --- a/thorlcr/master/thgraphmanager.cpp +++ b/thorlcr/master/thgraphmanager.cpp @@ -1450,6 +1450,10 @@ void thorMain(ILogMsgHandler *logHandler, const char *wuid, const char *graphNam VStringBuffer multiJobLingerQueueName("%s_lingerqueue", globals->queryProp("@name")); StringBuffer instance("thorinstance_"); + // NB: in k8s a Thor instance is explicitly started to run a specific wuid/graph + // it will not listen/receive another job/graph until the 1st explicit request the job + // started to is complete. + if (multiJobLinger) { StringBuffer thorQueueName; @@ -1469,30 +1473,16 @@ void thorMain(ILogMsgHandler *logHandler, const char *wuid, const char *graphNam StringBuffer currentWuid(wuid); CTimeMon lingerTimer(lingerPeriod); // NB: reset after it actually runs a job - StringBuffer runtimePlatformVersion; - // The imagePattern/baseImageVersion corresponds to the helm chart image version - const char *imagePattern = getenv("imagePattern"); - const char *baseImageVersion = nullptr; - if (isEmptyString(imagePattern)) + // baseImageVersion corresponds to the helm chart image version + const char *baseImageVersion = getenv("baseImageVersion"); + // runtimeImageVersion will either match baseImageVersion, or have been set in the yaml to the runtime "platformVersion" + const char *runtimeImageVersion = getenv("runtimeImageVersion"); + bool platformVersioningAvailable = true; + if (isEmptyString(baseImageVersion) || isEmptyString(runtimeImageVersion)) { - IWARNLOG("imagePattern not set in environment"); - baseImageVersion = "unknown"; - runtimePlatformVersion.set(baseImageVersion); - imagePattern = nullptr; - } - else - { - baseImageVersion = strrchr(imagePattern, ':'); - if (!baseImageVersion) - { - IWARNLOG("imagePattern has unexpected format: %s", imagePattern); - baseImageVersion = "unknown"; - runtimePlatformVersion.set(baseImageVersion); - imagePattern = nullptr; - } - else - baseImageVersion++; + IWARNLOG("baseImageVersion or runtimeImageVersion missing from environment"); + platformVersioningAvailable = false; } while (true) { @@ -1521,64 +1511,52 @@ void thorMain(ILogMsgHandler *logHandler, const char *wuid, const char *graphNam } else { - SCMStringBuffer jobVersion; - bool mismatch = false; - if (imagePattern) // null if failed to parse (see above), will continue without looking at versioning + bool runJob = true; + if (platformVersioningAvailable) { + SCMStringBuffer jobVersion; workunit->getDebugValue("platformVersion", jobVersion); - if (0 == runtimePlatformVersion.length()) // 1st job + if (jobVersion.length()) { - if (jobVersion.length()) - runtimePlatformVersion.set(jobVersion.str()); - else - runtimePlatformVersion.set(baseImageVersion); // NB: this is the original helm generated image version + if (!streq(jobVersion.str(), runtimeImageVersion)) + runJob = false; } - else + else if (!streq(baseImageVersion, runtimeImageVersion)) { - // runtimePlatformVersion has been set by the 1st job that ran, either to its "platformVersion" or the baseImageVersion + // This is a custom runtime version, which did not specify a jobVersion, + // meaning it intended to use the regular baseImageVersion. + // Therefore we mismatch + runJob = false; + } + if (!runJob) // version mismatch, delay and queue for other instance to take + { + assertex(thorQueue); // it should never be possible for a non-lingering Thor to have a mismatch + + // This Thor has picked up a job that has submitted with a different #option platformVersion. + // requeue it, and wait a bit, so that it can either be picked up by an existing compatible Thor, or + // an agent + + VStringBuffer job("%s/%s/%s", currentWfId.str(), currentWuid.str(), currentGraphName.str()); + Owned item = createJobQueueItem(job); + item->setOwner(workunit->queryUser()); + item->setPriority(workunit->getPriorityValue()); + thorQueue->enqueue(item.getClear()); + currentWuid.clear(); + constexpr unsigned pauseSecs = 10; if (jobVersion.length()) - { - if (!streq(jobVersion.str(), runtimePlatformVersion.str())) - mismatch = true; - } - else if (!streq(baseImageVersion, runtimePlatformVersion.str())) - { - // The 1st job to run set runtimePlatformVersion to something other than baseImageVersion - // Therefore this is a custom runtime version, which did not specify a jobVersion, - // meaning it intended to use the regular baseImageVersion. - // Therefore we mismatch - mismatch = true; - } + WARNLOG("Job=%s requeued due to version mismatch (this Thor version=%s, Job version=%s). Pausing for %u seconds", job.str(), runtimeImageVersion, jobVersion.str(), pauseSecs); + else + WARNLOG("Job=%s requeued due to version mismatch (this Thor version=%s, Job version not specified, uses original helm image version=%s). Pausing for %u seconds", job.str(), runtimeImageVersion, baseImageVersion, pauseSecs); + MilliSleep(pauseSecs*1000); } } - if (mismatch) - { - assertex(thorQueue); // it should never be possible for a non-lingering Thor to have a mismatch - - // This Thor has picked up a job that has submitted with a different #option platformVersion. - // requeue it, and wait a bit, so that it can either be picked up by an existing compatible Thor, or - // an agent - - VStringBuffer job("%s/%s/%s", currentWfId.str(), currentWuid.str(), currentGraphName.str()); - Owned item = createJobQueueItem(job); - item->setOwner(workunit->queryUser()); - item->setPriority(workunit->getPriorityValue()); - thorQueue->enqueue(item.getClear()); - currentWuid.clear(); - constexpr unsigned pauseSecs = 10; - if (jobVersion.length()) - WARNLOG("Job=%s requeued due to version mismatch (this Thor version=%s, Job version=%s). Pausing for %u seconds", job.str(), runtimePlatformVersion.str(), jobVersion.str(), pauseSecs); - else - WARNLOG("Job=%s requeued due to version mismatch (this Thor version=%s, Job version not specified, uses original helm image version=%s). Pausing for %u seconds", job.str(), runtimePlatformVersion.str(), baseImageVersion, pauseSecs); - MilliSleep(pauseSecs*1000); - } - else + if (runJob) { JobNameScope activeJobName(currentWuid.str()); saveWuidToFile(currentWuid); VStringBuffer msg("Executing: wuid=%s, graph=%s", currentWuid.str(), currentGraphName.str()); - if (imagePattern && !streq(baseImageVersion, runtimePlatformVersion.str())) - msg.appendf(" (custom runtime version=%s)", runtimePlatformVersion.str()); + if (platformVersioningAvailable && !streq(baseImageVersion, runtimeImageVersion)) + msg.appendf(" (custom runtime version=%s)", runtimeImageVersion); PROGLOG("%s", msg.str()); {