Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/candidate-9.8.x'
Browse files Browse the repository at this point in the history
Signed-off-by: Gavin Halliday <[email protected]>
  • Loading branch information
ghalliday committed Dec 4, 2024
2 parents a8b1971 + a29e778 commit 38456ac
Show file tree
Hide file tree
Showing 8 changed files with 89 additions and 53 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -156,7 +156,9 @@
<entry><emphasis>compress</emphasis></entry>

<entry>Optional. A boolean TRUE or FALSE flag indicating whether to
compress the new file. If omitted, the default is FALSE.</entry>
compress the new file. If omitted, the default is TRUE in a
containerized deployment and FALSE in a bare-metal
deployment.</entry>
</row>

<row>
Expand Down Expand Up @@ -215,7 +217,7 @@

<entry>Override the number of parts to be created when spraying. The
default is 0 which means it will create the same number of parts as
the target cluster. </entry>
the target cluster.</entry>
</row>

<row>
Expand Down
6 changes: 4 additions & 2 deletions docs/EN_US/ECLStandardLibraryReference/SLR-Mods/SprayJson.xml
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,9 @@
<entry><emphasis>compress</emphasis></entry>

<entry>Optional. A boolean TRUE or FALSE flag indicating whether to
compress the new file. If omitted, the default is FALSE.</entry>
compress the new file. If omitted, the default is TRUE in a
containerized deployment and FALSE in a bare-metal
deployment.</entry>
</row>

<row>
Expand Down Expand Up @@ -242,7 +244,7 @@

<entry>Override the number of parts to be created when spraying. The
default is 0 which means it will create the same number of parts as
the target cluster. </entry>
the target cluster.</entry>
</row>

<row>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,9 @@
<entry><emphasis>compress</emphasis></entry>

<entry>Optional. A boolean TRUE or FALSE flag indicating whether to
compress the new file. If omitted, the default is FALSE.</entry>
compress the new file. If omitted, the default is TRUE in a
containerized deployment and FALSE in a bare-metal
deployment.</entry>
</row>

<row>
Expand Down Expand Up @@ -300,7 +302,7 @@

<entry>Override the number of parts to be created when spraying. The
default is 0 which means it will create the same number of parts as
the target cluster. </entry>
the target cluster.</entry>
</row>

<row>
Expand Down
6 changes: 4 additions & 2 deletions docs/EN_US/ECLStandardLibraryReference/SLR-Mods/SprayXML.xml
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,9 @@
<entry><emphasis>compress</emphasis></entry>

<entry>Optional. A boolean TRUE or FALSE flag indicating whether to
compress the new file. If omitted, the default is FALSE.</entry>
compress the new file. If omitted, the default is TRUE in a
containerized deployment and FALSE in a bare-metal
deployment.</entry>
</row>

<row>
Expand Down Expand Up @@ -241,7 +243,7 @@

<entry>Override the number of parts to be created when spraying. The
default is 0 which means it will create the same number of parts as
the target cluster. </entry>
the target cluster.</entry>
</row>

<row>
Expand Down
27 changes: 27 additions & 0 deletions helm/hpcc/docs/expert.md
Original file line number Diff line number Diff line change
Expand Up @@ -90,3 +90,30 @@ its own cache. Threads/channels within a process share that process's cache.
Optional list of bash commands to execute within an init container in pods that use this plane.
This can be used to validate that the plane is healthy, e.g. that it is mounted as expected.
If the script returns a non-zero result, the init container and therefore the pod will fail.

## blockedFileIOKB (unsigned)

The optimal size to read and write sequential file io (e.g. for Azure Blob storage set to 4096)

## blockedRandomIOKB (unsigned)

The optimal size of random file io reads (e.g. index lookups).

## fileSyncWriteClose (boolean)

Perform a fsync ahead of file close operations.
Default: false

## concurrentWriteSupport (boolean)

Plane supports concurrent writing to a single physical file.
Default: false

## writeSyncMarginMs (unsigned)

Minimum time period between the publication of a logical file and when it can
be read. This setting will introduce a delay if a read operation is within this
margin period.
Should be set on planes backed by storage types that do not guarantee files are
ready to be read by any other consumer immediately, e.g. Azure Blob storage.
Default: 0
12 changes: 8 additions & 4 deletions system/jhtree/jhtree.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1347,8 +1347,9 @@ CDiskKeyIndex::CDiskKeyIndex(unsigned _iD, IFileIO *_io, const char *_name, bool
blockedIOSize = _blockedIOSize;
io.setown(_io);
KeyHdr hdr;
if (io->read(0, sizeof(hdr), &hdr) != sizeof(hdr))
throw MakeStringException(0, "Failed to read key header: file too small, could not read %u bytes", (unsigned) sizeof(hdr));
offset_t sizeRead = io->read(0, sizeof(hdr), &hdr);
if (sizeRead != sizeof(hdr))
throw MakeStringException(0, "Failed to read key '%s' header: file too small, could not read %u bytes - read %u", _name, (unsigned) sizeof(hdr), (unsigned)sizeRead);

#ifdef _DEBUG
//In debug mode always use the trailing header if it is available to ensure that code path is tested
Expand All @@ -1358,8 +1359,11 @@ CDiskKeyIndex::CDiskKeyIndex(unsigned _iD, IFileIO *_io, const char *_name, bool
#endif
{
_WINREV(hdr.nodeSize);
if (!io->read(io->size() - hdr.nodeSize, sizeof(hdr), &hdr))
throw MakeStringException(4, "Invalid key %s: failed to read trailing key header", _name);
offset_t actualSize = io->size();
offset_t readOffset = actualSize - hdr.nodeSize;
sizeRead = io->read(readOffset, sizeof(hdr), &hdr);
if (sizeRead != sizeof(hdr))
throw MakeStringException(4, "Invalid key %s: failed to read trailing key header at offset %llu, read %u", _name, readOffset, (unsigned)sizeRead);
}
init(hdr, isTLK);
}
Expand Down
2 changes: 1 addition & 1 deletion system/jlib/jio.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,7 @@ extern jlib_decl size32_t checked_pread(const char * filename, int file, void *b
readNow = 0;
break;
}
throw makeErrnoExceptionV(errno, "checked_pread for file '%s'", filename);
throw makeErrnoExceptionV(errno, "checked_pread for file '%s' @%lld", filename, pos);
}
}
else if (!readNow)
Expand Down
77 changes: 37 additions & 40 deletions thorlcr/master/thgraphmanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1360,9 +1360,10 @@ void thorMain(ILogMsgHandler *logHandler, const char *wuid, const char *graphNam
else
{
unsigned lingerPeriod = globals->getPropInt("@lingerPeriod", defaultThorLingerPeriod)*1000;
dbgassertex(lingerPeriod>=1000); // NB: the schema or the default ensure the linger period is non-zero
bool multiJobLinger = globals->getPropBool("@multiJobLinger", defaultThorMultiJobLinger);
VStringBuffer multiJobLingerQueueName("%s_lingerqueue", globals->queryProp("@name"));
StringBuffer instance("thorinstance_"); // only used when multiJobLinger = false (and lingerPeriod>0)
StringBuffer instance("thorinstance_"); // only used when multiJobLinger = false

// NB: in k8s a Thor instance is explicitly started to run a specific wuid/graph
// it will not listen/receive another job/graph until the 1st explicit request the job
Expand All @@ -1382,7 +1383,7 @@ void thorMain(ILogMsgHandler *logHandler, const char *wuid, const char *graphNam
}

StringBuffer currentWfId; // not filled/not used until recvNextGraph() is called.
if (!multiJobLinger && lingerPeriod)
if (!multiJobLinger)
{
// We avoid using getEndpointHostText here and get an IP instead, because the client pod communicating directly with this Thor manager,
// will not have the ability to resolve this pods hostname.
Expand Down Expand Up @@ -1486,7 +1487,7 @@ void thorMain(ILogMsgHandler *logHandler, const char *wuid, const char *graphNam
jobManager->execute(workunit, currentWuid, currentGraphName, dummyAgentEp);

Owned<IWorkUnit> w = &workunit->lock();
if (!multiJobLinger && lingerPeriod)
if (!multiJobLinger)
w->setDebugValue(instance, "1", true);

if (jobManager->queryExitException())
Expand Down Expand Up @@ -1517,49 +1518,45 @@ void thorMain(ILogMsgHandler *logHandler, const char *wuid, const char *graphNam
}

currentGraphName.clear();
if (lingerPeriod)
unsigned lingerRemaining;
if (lingerTimer.timedout(&lingerRemaining))
break;
PROGLOG("Lingering time left: %.2f", ((float)lingerRemaining)/1000);
StringBuffer nextJob;
do
{
unsigned lingerRemaining;
if (lingerTimer.timedout(&lingerRemaining))
break;
PROGLOG("Lingering time left: %.2f", ((float)lingerRemaining)/1000);
StringBuffer nextJob;
do
StringBuffer wuid;
int ret = recvNextGraph(lingerRemaining, currentWuid.str(), currentWfId, wuid, currentGraphName);
if (ret > 0)
{
StringBuffer wuid;
int ret = recvNextGraph(lingerRemaining, currentWuid.str(), currentWfId, wuid, currentGraphName);
if (ret > 0)
{
currentWuid.set(wuid); // NB: will always be same if !multiJobLinger
break; // success
}
else if (ret < 0)
break; // timeout/abort
// else - reject/ignore duff message.
} while (!lingerTimer.timedout(&lingerRemaining));

// The following is true if no workunit/graph have been received
// MORE: I think it should also be executed if lingerPeriod is 0
if (0 == currentGraphName.length())
currentWuid.set(wuid); // NB: will always be same if !multiJobLinger
break; // success
}
else if (ret < 0)
break; // timeout/abort
// else - reject/ignore duff message.
} while (!lingerTimer.timedout(&lingerRemaining));


if (0 == currentGraphName.length())
{
if (!multiJobLinger)
{
if (!multiJobLinger)
// De-register the idle lingering entry.
Owned<IWorkUnitFactory> factory;
Owned<IConstWorkUnit> workunit;
factory.setown(getWorkUnitFactory());
workunit.setown(factory->openWorkUnit(currentWuid));
//Unlikely, but the workunit could have been deleted while we were lingering
//currentWuid can also be blank if the workunit this started for died before thor started
//processing the graph. This test covers both (unlikely) situations.
if (workunit)
{
// De-register the idle lingering entry.
Owned<IWorkUnitFactory> factory;
Owned<IConstWorkUnit> workunit;
factory.setown(getWorkUnitFactory());
workunit.setown(factory->openWorkUnit(currentWuid));
//Unlikely, but the workunit could have been deleted while we were lingering
//currentWuid can also be blank if the workunit this started for died before thor started
//processing the graph. This test covers both (unlikely) situations.
if (workunit)
{
Owned<IWorkUnit> w = &workunit->lock();
w->setDebugValue(instance, "0", true);
}
Owned<IWorkUnit> w = &workunit->lock();
w->setDebugValue(instance, "0", true);
}
break;
}
break;
}
}
thorQueue.clear();
Expand Down

0 comments on commit 38456ac

Please sign in to comment.