Skip to content

Commit

Permalink
HPCC-32657 review changes 3
Browse files Browse the repository at this point in the history
Signed-off-by: Jake Smith <[email protected]>
  • Loading branch information
jakesmith committed Sep 26, 2024
1 parent 3d9fca1 commit 8ffeb78
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 81 deletions.
22 changes: 19 additions & 3 deletions helm/hpcc/templates/_helpers.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -811,12 +811,28 @@ Get image name
{{- end -}}

{{/*
Generates imagePattern as env variable
Generates image information into env. variables used at runtime for runtime platform version switching
*/}}
{{- define "hpcc.generateImageEnv" -}}
{{- /* Pass in a dictionary with root and me defined */ -}}
- name: imagePattern
value: {{ include "hpcc.imageName" . }}
{{- $baseImageRootName := "" -}}
{{- $baseImageVersion := "" -}}
{{- if .me.image -}}
{{- $baseImageRootName = printf "%s/%s" (.me.image.root | default .root.Values.global.image.root | default "hpccsystems") (.me.image.name | default .root.Values.global.image.name | default "platform-core") -}}
{{- else -}}
{{- $baseImageRootName = printf "%s/%s" (.root.Values.global.image.root | default "hpccsystems") (.root.Values.global.image.name | default "platform-core") -}}
{{- end -}}
{{- if .me.image -}}
{{- $baseImageVersion = .me.image.version | default .root.Values.global.image.version | default .root.Chart.Version -}}
{{- else -}}
{{- $baseImageVersion = .root.Values.global.image.version | default .root.Chart.Version -}}
{{- end }}
- name: baseImageRootName
value: {{ $baseImageRootName }}
- name: baseImageVersion
value: {{ $baseImageVersion }}
- name: runtimeImageVersion
value: _HPCC_JOB_VERSION_
{{- end -}}

{{/*
Expand Down
28 changes: 18 additions & 10 deletions system/jlib/jcontainerized.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,28 +222,36 @@ bool applyYaml(const char *componentName, const char *wuid, const char *job, con

VStringBuffer args("\"--workunit=%s\"", wuid);
args.append(" \"--k8sJob=true\"");
const char *runtimeImageVersion = getenv("baseImageVersion"); // runtime image version will equal base version unless changed dynamically below
for (const auto &p: extraParams)
{
// special handling _HPCC_JOB_VERSION_, not just a straight substitution
if (streq(p.first.c_str(), "_HPCC_JOB_VERSION_"))
{
const char *imagePattern = getenv("imagePattern");
if (imagePattern)
// locates "image: <baseImageRootName>:<baseImageVersion>" and replaces with "image: <baseImageRootName>:<p.second>"
const char *baseImageRootName = getenv("baseImageRootName");
if (!isEmptyString(baseImageRootName) && !isEmptyString(runtimeImageVersion))
{
const char *imageVersionStart = strstr(imagePattern, ":");
if (imageVersionStart)
{
VStringBuffer oriImagePatternSpec("image: %s", imagePattern);
VStringBuffer newImagePatternSpec("image: %.*s:%s", (int)(imageVersionStart-imagePattern), imagePattern, p.second.c_str());
jobYaml.replaceString(oriImagePatternSpec, newImagePatternSpec);
DBGLOG("Job image version changed from '%s' to '%s'", imageVersionStart, p.second.c_str());
}
VStringBuffer oriImagePatternSpec("image: %s:%s", baseImageRootName, runtimeImageVersion);
VStringBuffer newImagePatternSpec("image: %s:%s", baseImageRootName, p.second.c_str());
jobYaml.replaceString(oriImagePatternSpec, newImagePatternSpec);
DBGLOG("Job image version changed from '%s' to '%s'", runtimeImageVersion, p.second.c_str());
runtimeImageVersion = p.second.c_str(); // used to substitute _HPCC_JOB_VERSION_ in jobYaml (used in runtimeImageVersion env variable)
}
}
else if (hasPrefix(p.first.c_str(), "_HPCC_", false)) // job yaml substitution
jobYaml.replaceString(p.first.c_str(), p.second.c_str());
else
args.append(" \"--").append(p.first.c_str()).append('=').append(p.second.c_str()).append("\"");
}
// always substitute _HPCC_JOB_VERSION_ - runtimeImageVersion is either the original baseImageVersion or the one from _HPCC_JOB_VERSION_ from above)
if (isEmptyString(runtimeImageVersion)) // if it were empty, it implies there's an incompatible helm chart/runtime image mismatch
{
// we replace _HPCC_JOB_VERSION_ (helm env runtimeImageVersion) with an empty string
// so that the engine knows it can't use it.
runtimeImageVersion = "";
}
jobYaml.replaceString("_HPCC_JOB_VERSION_", runtimeImageVersion);
jobYaml.replaceString("_HPCC_ARGS_", args.str());

// retrySecs=0 - I am not sure want to retry this command systematically..
Expand Down
114 changes: 46 additions & 68 deletions thorlcr/master/thgraphmanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1450,6 +1450,10 @@ void thorMain(ILogMsgHandler *logHandler, const char *wuid, const char *graphNam
VStringBuffer multiJobLingerQueueName("%s_lingerqueue", globals->queryProp("@name"));
StringBuffer instance("thorinstance_");

// NB: in k8s a Thor instance is explicitly started to run a specific wuid/graph
// it will not listen/receive another job/graph until the 1st explicit request the job
// started to is complete.

if (multiJobLinger)
{
StringBuffer thorQueueName;
Expand All @@ -1469,30 +1473,16 @@ void thorMain(ILogMsgHandler *logHandler, const char *wuid, const char *graphNam
StringBuffer currentWuid(wuid);

CTimeMon lingerTimer(lingerPeriod); // NB: reset after it actually runs a job
StringBuffer runtimePlatformVersion;

// The imagePattern/baseImageVersion corresponds to the helm chart image version
const char *imagePattern = getenv("imagePattern");
const char *baseImageVersion = nullptr;
if (isEmptyString(imagePattern))
// baseImageVersion corresponds to the helm chart image version
const char *baseImageVersion = getenv("baseImageVersion");
// runtimeImageVersion will either match baseImageVersion, or have been set in the yaml to the runtime "platformVersion"
const char *runtimeImageVersion = getenv("runtimeImageVersion");
bool platformVersioningAvailable = true;
if (isEmptyString(baseImageVersion) || isEmptyString(runtimeImageVersion))
{
IWARNLOG("imagePattern not set in environment");
baseImageVersion = "unknown";
runtimePlatformVersion.set(baseImageVersion);
imagePattern = nullptr;
}
else
{
baseImageVersion = strrchr(imagePattern, ':');
if (!baseImageVersion)
{
IWARNLOG("imagePattern has unexpected format: %s", imagePattern);
baseImageVersion = "unknown";
runtimePlatformVersion.set(baseImageVersion);
imagePattern = nullptr;
}
else
baseImageVersion++;
IWARNLOG("baseImageVersion or runtimeImageVersion missing from environment");
platformVersioningAvailable = false;
}
while (true)
{
Expand Down Expand Up @@ -1521,64 +1511,52 @@ void thorMain(ILogMsgHandler *logHandler, const char *wuid, const char *graphNam
}
else
{
SCMStringBuffer jobVersion;
bool mismatch = false;
if (imagePattern) // null if failed to parse (see above), will continue without looking at versioning
bool runJob = true;
if (platformVersioningAvailable)
{
SCMStringBuffer jobVersion;
workunit->getDebugValue("platformVersion", jobVersion);
if (0 == runtimePlatformVersion.length()) // 1st job
if (jobVersion.length())
{
if (jobVersion.length())
runtimePlatformVersion.set(jobVersion.str());
else
runtimePlatformVersion.set(baseImageVersion); // NB: this is the original helm generated image version
if (!streq(jobVersion.str(), runtimeImageVersion))
runJob = false;
}
else
else if (!streq(baseImageVersion, runtimeImageVersion))
{
// runtimePlatformVersion has been set by the 1st job that ran, either to its "platformVersion" or the baseImageVersion
// This is a custom runtime version, which did not specify a jobVersion,
// meaning it intended to use the regular baseImageVersion.
// Therefore we mismatch
runJob = false;
}
if (!runJob) // version mismatch, delay and queue for other instance to take
{
assertex(thorQueue); // it should never be possible for a non-lingering Thor to have a mismatch

// This Thor has picked up a job that has submitted with a different #option platformVersion.
// requeue it, and wait a bit, so that it can either be picked up by an existing compatible Thor, or
// an agent

VStringBuffer job("%s/%s/%s", currentWfId.str(), currentWuid.str(), currentGraphName.str());
Owned<IJobQueueItem> item = createJobQueueItem(job);
item->setOwner(workunit->queryUser());
item->setPriority(workunit->getPriorityValue());
thorQueue->enqueue(item.getClear());
currentWuid.clear();
constexpr unsigned pauseSecs = 10;
if (jobVersion.length())
{
if (!streq(jobVersion.str(), runtimePlatformVersion.str()))
mismatch = true;
}
else if (!streq(baseImageVersion, runtimePlatformVersion.str()))
{
// The 1st job to run set runtimePlatformVersion to something other than baseImageVersion
// Therefore this is a custom runtime version, which did not specify a jobVersion,
// meaning it intended to use the regular baseImageVersion.
// Therefore we mismatch
mismatch = true;
}
WARNLOG("Job=%s requeued due to version mismatch (this Thor version=%s, Job version=%s). Pausing for %u seconds", job.str(), runtimeImageVersion, jobVersion.str(), pauseSecs);
else
WARNLOG("Job=%s requeued due to version mismatch (this Thor version=%s, Job version not specified, uses original helm image version=%s). Pausing for %u seconds", job.str(), runtimeImageVersion, baseImageVersion, pauseSecs);
MilliSleep(pauseSecs*1000);
}
}
if (mismatch)
{
assertex(thorQueue); // it should never be possible for a non-lingering Thor to have a mismatch

// This Thor has picked up a job that has submitted with a different #option platformVersion.
// requeue it, and wait a bit, so that it can either be picked up by an existing compatible Thor, or
// an agent

VStringBuffer job("%s/%s/%s", currentWfId.str(), currentWuid.str(), currentGraphName.str());
Owned<IJobQueueItem> item = createJobQueueItem(job);
item->setOwner(workunit->queryUser());
item->setPriority(workunit->getPriorityValue());
thorQueue->enqueue(item.getClear());
currentWuid.clear();
constexpr unsigned pauseSecs = 10;
if (jobVersion.length())
WARNLOG("Job=%s requeued due to version mismatch (this Thor version=%s, Job version=%s). Pausing for %u seconds", job.str(), runtimePlatformVersion.str(), jobVersion.str(), pauseSecs);
else
WARNLOG("Job=%s requeued due to version mismatch (this Thor version=%s, Job version not specified, uses original helm image version=%s). Pausing for %u seconds", job.str(), runtimePlatformVersion.str(), baseImageVersion, pauseSecs);
MilliSleep(pauseSecs*1000);
}
else
if (runJob)
{
JobNameScope activeJobName(currentWuid.str());
saveWuidToFile(currentWuid);
VStringBuffer msg("Executing: wuid=%s, graph=%s", currentWuid.str(), currentGraphName.str());
if (imagePattern && !streq(baseImageVersion, runtimePlatformVersion.str()))
msg.appendf(" (custom runtime version=%s)", runtimePlatformVersion.str());
if (platformVersioningAvailable && !streq(baseImageVersion, runtimeImageVersion))
msg.appendf(" (custom runtime version=%s)", runtimeImageVersion);
PROGLOG("%s", msg.str());

{
Expand Down

0 comments on commit 8ffeb78

Please sign in to comment.