diff --git a/dali/base/dadfs.cpp b/dali/base/dadfs.cpp index dcf10aaf171..ed5d250e581 100644 --- a/dali/base/dadfs.cpp +++ b/dali/base/dadfs.cpp @@ -2912,7 +2912,7 @@ class CDistributedFileBase : implements INTERFACE, public CInterface if (history) queryAttributes().removeTree(history); } - void lockFileAttrLock(CFileAttrLock & attrLock) + virtual void lockFileAttrLock(CFileAttrLock & attrLock) { if (!attrLock.init(logicalName, DXB_File, RTM_LOCK_WRITE, conn, defaultTimeout, "CDistributedFile::lockFileAttrLock")) { @@ -6482,6 +6482,19 @@ class CDistributedSuperFile: public CDistributedFileBase return new cSubFileIterator(subfiles,supersub); } + virtual void lockFileAttrLock(CFileAttrLock & attrLock) override + { + if (!attrLock.init(logicalName, DXB_SuperFile, RTM_LOCK_WRITE, conn, defaultTimeout, "CDistributedFile::lockFileAttrLock")) + { + // In unlikely event File/Attr doesn't exist, must ensure created, commited and root connection is reloaded. + verifyex(attrLock.init(logicalName, DXB_SuperFile, RTM_LOCK_WRITE|RTM_CREATE_QUERY, conn, defaultTimeout, "CDistributedFile::lockFileAttrLock")); + attrLock.commit(); + conn->commit(); + conn->reload(); + root.setown(conn->getRoot()); + } + } + void updateFileAttrs() { if (subfiles.ordinality()==0) { diff --git a/dali/daliadmin/daadmin.cpp b/dali/daliadmin/daadmin.cpp index d1ea8bd9b02..e143850e3eb 100644 --- a/dali/daliadmin/daadmin.cpp +++ b/dali/daliadmin/daadmin.cpp @@ -3385,4 +3385,43 @@ void removeOrphanedGlobalVariables(bool dryrun, bool reconstruct) } } +void cleanJobQueues(bool dryRun) +{ + Owned conn = querySDS().connect("/JobQueues", myProcessSession(), 0, SDS_LOCK_TIMEOUT); + if (!conn) + { + WARNLOG("Failed to connect to /JobQueues"); + return; + } + Owned queueIter = conn->queryRoot()->getElements("Queue"); + ForEach(*queueIter) + { + IPropertyTree &queue = queueIter->query(); + const char *name = queue.queryProp("@name"); + if (isEmptyString(name)) // should not be blank, but guard + continue; + 
PROGLOG("Processing queue: %s", name); + VStringBuffer queuePath("/JobQueues/Queue[@name=\"%s\"]", name); + Owned queueConn = querySDS().connect(queuePath, myProcessSession(), RTM_LOCK_WRITE, SDS_LOCK_TIMEOUT); + IPropertyTree *queueRoot = queueConn->queryRoot(); + + Owned clientIter = queueRoot->getElements("Client"); + std::vector toRemove; + ForEach (*clientIter) + { + IPropertyTree &client = clientIter->query(); + if (client.getPropInt("@connected") == 0) + toRemove.push_back(&client); + } + if (!dryRun) + { + for (auto &client: toRemove) + queue.removeTree(client); + } + PROGLOG("Job queue '%s': %s %u stale client entries", name, dryRun ? "dryrun, there are" : "removed", (unsigned)toRemove.size()); + queueConn->commit(); + queueConn.clear(); + } +} + } // namespace daadmin diff --git a/dali/daliadmin/daadmin.hpp b/dali/daliadmin/daadmin.hpp index 687d0d882b2..f0141760e03 100644 --- a/dali/daliadmin/daadmin.hpp +++ b/dali/daliadmin/daadmin.hpp @@ -95,5 +95,6 @@ extern DALIADMIN_API void daliping(const char *dalis, unsigned connecttime, unsi extern DALIADMIN_API void validateStore(bool fix, bool deleteFiles, bool verbose); extern DALIADMIN_API void removeOrphanedGlobalVariables(bool dryrun, bool reconstruct); +extern DALIADMIN_API void cleanJobQueues(bool dryRun); } // namespace daadmin \ No newline at end of file diff --git a/dali/daliadmin/daliadmin.cpp b/dali/daliadmin/daliadmin.cpp index 0491b7d42f1..4c56836aac6 100644 --- a/dali/daliadmin/daliadmin.cpp +++ b/dali/daliadmin/daliadmin.cpp @@ -93,6 +93,7 @@ void usage(const char *exe) printf("Other dali server and misc commands:\n"); printf(" auditlog \n"); printf(" cleanglobalwuid [dryrun] [noreconstruct]\n"); + printf(" cleanjobqueues [dryrun]\n"); printf(" clusterlist -- list clusters (mask optional)\n"); printf(" coalesce -- force transaction coalesce\n"); printf(" dalilocks [ ] [ files ] -- get all locked files/xpaths\n"); @@ -576,6 +577,11 @@ int main(int argc, const char* argv[]) } 
removeOrphanedGlobalVariables(dryrun, reconstruct); } + else if (strieq(cmd, "cleanjobqueues")) + { + bool dryRun = np>0 && strieq("dryrun", params.item(1)); + cleanJobQueues(dryRun); + } else if (strieq(cmd, "remotetest")) remoteTest(params.item(1), true); else diff --git a/dali/dfu/dfuutil.cpp b/dali/dfu/dfuutil.cpp index 305df796fe3..ead2a9478c8 100644 --- a/dali/dfu/dfuutil.cpp +++ b/dali/dfu/dfuutil.cpp @@ -530,13 +530,18 @@ class CFileCloner if (iskey&&!cluster2.isEmpty()) dstfdesc->addCluster(cluster2,grp2,spec2); - for (unsigned pn=0; pnqueryPart(pn)->queryProperties().getPropInt64("@size",-1); + for (unsigned pn=0; pnqueryPart(pn)->queryProperties(); + IPropertyTree &dstProps = dstfdesc->queryPart(pn)->queryProperties(); + offset_t sz = srcProps.getPropInt64("@size",-1); if (sz!=(offset_t)-1) - dstfdesc->queryPart(pn)->queryProperties().setPropInt64("@size",sz); + dstProps.setPropInt64("@size",sz); StringBuffer dates; - if (srcfdesc->queryPart(pn)->queryProperties().getProp("@modified",dates)) - dstfdesc->queryPart(pn)->queryProperties().setProp("@modified",dates.str()); + if (srcProps.getProp("@modified",dates)) + dstProps.setProp("@modified",dates.str()); + if (srcProps.hasProp("@kind")) + dstProps.setProp("@kind", srcProps.queryProp("@kind")); } if (!copyphysical) //cloneFrom tells roxie where to copy from.. it's unnecessary if we already did the copy diff --git a/devdoc/userdoc/copilot/CopilotPromptTips.md b/devdoc/userdoc/copilot/CopilotPromptTips.md new file mode 100644 index 00000000000..13a770b33dd --- /dev/null +++ b/devdoc/userdoc/copilot/CopilotPromptTips.md @@ -0,0 +1,84 @@ +# Copilot Prompt Tips +Unlock the full potential of GitHub Copilot with these carefully curated prompts designed to streamline your workflow and enhance productivity. + +Whether you're summarizing complex texts, brainstorming innovative ideas, or creating detailed guides, these prompts will help you get the most out of Copilot's capabilities. 
+ +Dive in and discover how these simple yet powerful prompts can save you time and effort in your daily tasks. + +## Generic Prompts + +Here are a few simple prompts that can save time: + +- Provide a brief summary of the text below. Show the key points in a bullet list. + +- List [N (number)] ways to [accomplish a specific goal or solve a problem]? Include a short description for each approach in a bullet list. + + - **Example**: List 10 ways to reduce redundancy in text. Include a short description for each approach in a bullet list. + +- What are the key differences/similarities between [concept A] and [concept B]? Present the information in a table format. + + - **Example:** What are the key differences/similarities between a quicksort and a bubble sort? Present the information in a table format. + +- Explain [complex topic] in simple terms. Use analogies or examples to help make it easier to understand. + + - **Example:** Explain generative AI in simple terms. Use analogies or examples to help make it easier to understand. + +- Brainstorm [N (number)] ideas for [a specific project or topic]? Include a short description for each idea in a bullet list. + + - **Example:** Brainstorm 7 ideas for an instructional video for new programmers? Include a short description for each idea in a bullet list. + +- Create a template for [a specific type of document, such as a business email, proposal, etc.]. Include the key elements to include in a bullet list. + + - **Example:** Create a template for product version release announcement. Include the key elements to include in a bullet list. + +- Write a step-by-step guide on how to [specific task or procedure]. Number the steps to improve clarity. + + - **Example:** Write a step-by-step guide on how to make a PB & J sandwich. Number the steps to improve clarity. + +## Specific Prompts + +These prompts are more focused and are meant to accomplish a specific purpose. They are included here to spark your imagination. 
+ +If you think of others that could be included here to share with the world, please send your ideas to us. + +- Write comments for this ECL code in javadoc format + +- Create an ECL File Definition, record structure, and inline dataset with [N (number)] of records with the following fields [list of fields] + + - **Example:** Create an ECL file definition with a record structure and an inline dataset with 20 records for a dataset of animals with the following fields: ReferenceNumber, AnimalName, ClassName, FamilyName, Color, Size, and LifeExpectancy. + +- Write ECL code to classify records into [categories]. + + - **Example:** Write ECL code to classify customer feedback into neutral, negative, or positive categories. + + +## How to Avoid AI Hallucinations with Good Prompts +AI hallucinations refer to instances where artificial intelligence systems generate information or responses that are incorrect, misleading, or entirely fabricated. + +Hallucinations can occur due to various reasons, such as limitations in the training data, inherent biases, or the AI's attempt to provide an answer even when it lacks sufficient context or knowledge. + +Understanding and mitigating AI hallucinations is crucial for ensuring the reliability and accuracy of AI-driven applications. + +Creating effective prompts is essential to minimize AI hallucinations. Here are some tips to help you craft prompts that lead to accurate and reliable responses: + +- **Be Specific and Clear**: Ambiguous prompts can lead to incorrect or irrelevant answers. Clearly define the task and provide specific instructions. + + - **Example**: Instead of asking "What is AI?", ask "Explain the concept of artificial intelligence and its primary applications in healthcare." + +- **Provide Context**: Give the AI enough background information to understand the task. This helps in generating more accurate responses. 
+ + - **Example**: "Given the following text about climate change, summarize the key points in a bullet list." + +- **Use Constraints**: Limit the scope of the response by specifying constraints such as word count, format, or specific details to include. + + - **Example**: "List 5 benefits of renewable energy in a bullet list, each point not exceeding 20 words." + +- **Ask for Evidence or Sources**: Encourage the AI to provide evidence or cite sources for the information it generates. + + - **Example**: "Explain the impact of social media on mental health and provide references to recent studies." + +- **Iterative Refinement**: Start with a broad prompt and refine it based on the initial responses to get more accurate results. + + - **Example**: Begin with "Describe the process of photosynthesis." If the response is too vague, refine it to "Describe the process of photosynthesis in plants, including the role of chlorophyll and sunlight." + +By following these guidelines, you can reduce the likelihood of AI hallucinations and ensure that the responses generated are accurate and useful. 
diff --git a/ecl/eclcc/eclcc.cpp b/ecl/eclcc/eclcc.cpp index d6d360e6745..3890c51b50d 100644 --- a/ecl/eclcc/eclcc.cpp +++ b/ecl/eclcc/eclcc.cpp @@ -1537,8 +1537,11 @@ void EclCC::processSingleQuery(const EclRepositoryManager & localRepositoryManag updateWorkunitStat(instance.wu, SSToperation, ">compile:>parse", StTimeElapsed, NULL, parseTimeNs); stat_type sourceDownloadTime = localRepositoryManager.getStatistic(StTimeElapsed); + stat_type sourceDownloadBlockedTime = localRepositoryManager.getStatistic(StTimeBlocked); if (sourceDownloadTime) updateWorkunitStat(instance.wu, SSToperation, ">compile:>parse:>download", StTimeElapsed, NULL, sourceDownloadTime); + if (sourceDownloadBlockedTime) + updateWorkunitStat(instance.wu, SSToperation, ">compile:>parse:>download", StTimeBlocked, NULL, sourceDownloadBlockedTime); if (optExtraStats) { diff --git a/ecl/hql/hqlrepository.cpp b/ecl/hql/hqlrepository.cpp index 06692728bb1..1c0d2507a21 100644 --- a/ecl/hql/hqlrepository.cpp +++ b/ecl/hql/hqlrepository.cpp @@ -676,6 +676,8 @@ unsigned __int64 EclRepositoryManager::getStatistic(StatisticKind kind) const { case StTimeElapsed: return cycle_to_nanosec(gitDownloadCycles); + case StTimeBlocked: + return cycle_to_nanosec(gitDownloadBlockedCycles); } return 0; } @@ -823,7 +825,12 @@ IEclSourceCollection * EclRepositoryManager::resolveGitCollection(const char * r throw makeStringExceptionV(99, "Unsupported repository link format '%s'", defaultUrl); bool alreadyExists = false; + + CCycleTimer gitDownloadTimer; Owned gitUpdateLock(getGitUpdateLock(repoPath)); + cycle_t blockedCycles = gitDownloadTimer.elapsedCycles(); + gitDownloadBlockedCycles += blockedCycles; + if (checkDirExists(repoPath)) { if (options.cleanRepos) @@ -853,7 +860,6 @@ IEclSourceCollection * EclRepositoryManager::resolveGitCollection(const char * r bool ok = false; Owned error; - CCycleTimer gitDownloadTimer; if (alreadyExists) { if (options.updateRepos) @@ -890,7 +896,8 @@ IEclSourceCollection * 
EclRepositoryManager::resolveGitCollection(const char * r //this could become a read/write lock if that proved to be an issue. gitUpdateLock.clear(); - gitDownloadCycles += gitDownloadTimer.elapsedCycles(); + gitDownloadCycles += (gitDownloadTimer.elapsedCycles() - blockedCycles); + if (error) { if (errorReceiver) diff --git a/ecl/hql/hqlrepository.hpp b/ecl/hql/hqlrepository.hpp index 071139fe2da..3d4e5e1317d 100644 --- a/ecl/hql/hqlrepository.hpp +++ b/ecl/hql/hqlrepository.hpp @@ -105,6 +105,7 @@ class HQL_API EclRepositoryManager IArrayOf overrideSources; // -D options IArrayOf allSources; // also includes -D options cycle_t gitDownloadCycles = 0; + cycle_t gitDownloadBlockedCycles = 0; //Include all options in a nested struct to make it easy to ensure they are cloned struct { diff --git a/esp/src/src-react/components/Metrics.tsx b/esp/src/src-react/components/Metrics.tsx index 3d1f91cc2bd..65996c00a9f 100644 --- a/esp/src/src-react/components/Metrics.tsx +++ b/esp/src/src-react/components/Metrics.tsx @@ -56,6 +56,9 @@ export const Metrics: React.FunctionComponent = ({ selection, fullscreen = false }) => { + if (querySet && queryId) { + wuid = ""; + } const [_uiState, _setUIState] = React.useState({ ...defaultUIState }); const [selectedMetricsSource, setSelectedMetricsSource] = React.useState(""); const [selectedMetrics, setSelectedMetrics] = React.useState([]); diff --git a/esp/src/src-react/hooks/metrics.ts b/esp/src/src-react/hooks/metrics.ts index 2b0a47687f2..6afbcff8f32 100644 --- a/esp/src/src-react/hooks/metrics.ts +++ b/esp/src/src-react/hooks/metrics.ts @@ -5,6 +5,7 @@ import { scopedLogger } from "@hpcc-js/util"; import { singletonHook } from "react-singleton-hook"; import { userKeyValStore } from "src/KeyValStore"; import { DockPanelLayout } from "../layouts/DockPanel"; +import { singletonDebounce } from "../util/throttle"; import { useWorkunit } from "./workunit"; import { useQuery } from "./query"; import { useCounter } from "./util"; @@ -214,6 
+215,8 @@ function useMetricsViewsImpl(): useMetricsViewsResult { export const useMetricsViews = singletonHook(defaultState, useMetricsViewsImpl); +let wuDetailsMetaResponse: Promise; + export function useMetricMeta(): [string[], string[]] { const service = useConst(() => new WorkunitsService({ baseUrl: "" })); @@ -221,7 +224,10 @@ export function useMetricMeta(): [string[], string[]] { const [properties, setProperties] = React.useState([]); React.useEffect(() => { - service?.WUDetailsMeta({}).then(response => { + if (!wuDetailsMetaResponse && service) { + wuDetailsMetaResponse = service.WUDetailsMeta({}); + } + wuDetailsMetaResponse?.then(response => { setScopeTypes(response?.ScopeTypes?.ScopeType || []); setProperties((response?.Properties?.Property.map(p => p.Name) || []).sort()); }); @@ -274,45 +280,48 @@ export function useWorkunitMetrics( const [count, increment] = useCounter(); React.useEffect(() => { - setStatus(FetchStatus.STARTED); - workunit?.fetchDetailsNormalized({ - ScopeFilter: scopeFilter, - NestedFilter: nestedFilter, - PropertiesToReturn: { - AllScopes: true, - AllAttributes: true, - AllProperties: true, - AllNotes: true, - AllStatistics: true, - AllHints: true - }, - ScopeOptions: { - IncludeId: true, - IncludeScope: true, - IncludeScopeType: true, - IncludeMatchedScopesInResults: true - }, - PropertyOptions: { - IncludeName: true, - IncludeRawValue: true, - IncludeFormatted: true, - IncludeMeasure: true, - IncludeCreator: false, - IncludeCreatorType: false - } - }).then(response => { - setData(response?.data); - setColumns(response?.columns); - setActivities(response?.meta?.Activities?.Activity || []); - setProperties(response?.meta?.Properties?.Property || []); - setMeasures(response?.meta?.Measures?.Measure || []); - setScopeTypes(response?.meta?.ScopeTypes?.ScopeType || []); - }).catch(e => { - logger.error(e); - }).finally(() => { - setStatus(FetchStatus.COMPLETE); - }); - }, [workunit, state, count, scopeFilter, nestedFilter]); + if (wuid 
&& workunit) { + const fetchDetailsNormalized = singletonDebounce(workunit, "fetchDetailsNormalized"); + setStatus(FetchStatus.STARTED); + fetchDetailsNormalized({ + ScopeFilter: scopeFilter, + NestedFilter: nestedFilter, + PropertiesToReturn: { + AllScopes: true, + AllAttributes: true, + AllProperties: true, + AllNotes: true, + AllStatistics: true, + AllHints: true + }, + ScopeOptions: { + IncludeId: true, + IncludeScope: true, + IncludeScopeType: true, + IncludeMatchedScopesInResults: true + }, + PropertyOptions: { + IncludeName: true, + IncludeRawValue: true, + IncludeFormatted: true, + IncludeMeasure: true, + IncludeCreator: false, + IncludeCreatorType: false + } + }).then(response => { + setData(response?.data); + setColumns(response?.columns); + setActivities(response?.meta?.Activities?.Activity || []); + setProperties(response?.meta?.Properties?.Property || []); + setMeasures(response?.meta?.Measures?.Measure || []); + setScopeTypes(response?.meta?.ScopeTypes?.ScopeType || []); + }).catch(e => { + logger.error(e); + }).finally(() => { + setStatus(FetchStatus.COMPLETE); + }); + } + }, [workunit, state, count, scopeFilter, nestedFilter, wuid]); return { metrics: data, columns, activities, properties, measures, scopeTypes, status, refresh: increment }; } @@ -335,45 +344,48 @@ export function useQueryMetrics( const [count, increment] = useCounter(); React.useEffect(() => { - setStatus(FetchStatus.STARTED); - query?.fetchDetailsNormalized({ - ScopeFilter: scopeFilter, - NestedFilter: nestedFilter, - PropertiesToReturn: { - AllScopes: true, - AllAttributes: true, - AllProperties: true, - AllNotes: true, - AllStatistics: true, - AllHints: true - }, - ScopeOptions: { - IncludeId: true, - IncludeScope: true, - IncludeScopeType: true, - IncludeMatchedScopesInResults: true - }, - PropertyOptions: { - IncludeName: true, - IncludeRawValue: true, - IncludeFormatted: true, - IncludeMeasure: true, - IncludeCreator: false, - IncludeCreatorType: false - } - }).then(response 
=> { - setData(response?.data); - setColumns(response?.columns); - setActivities(response?.meta?.Activities?.Activity || []); - setProperties(response?.meta?.Properties?.Property || []); - setMeasures(response?.meta?.Measures?.Measure || []); - setScopeTypes(response?.meta?.ScopeTypes?.ScopeType || []); - }).catch(e => { - logger.error(e); - }).finally(() => { - setStatus(FetchStatus.COMPLETE); - }); - }, [query, state, count, scopeFilter, nestedFilter]); + if (querySet && queryId && query) { + const fetchDetailsNormalized = singletonDebounce(query, "fetchDetailsNormalized"); + setStatus(FetchStatus.STARTED); + fetchDetailsNormalized({ + ScopeFilter: scopeFilter, + NestedFilter: nestedFilter, + PropertiesToReturn: { + AllScopes: true, + AllAttributes: true, + AllProperties: true, + AllNotes: true, + AllStatistics: true, + AllHints: true + }, + ScopeOptions: { + IncludeId: true, + IncludeScope: true, + IncludeScopeType: true, + IncludeMatchedScopesInResults: true + }, + PropertyOptions: { + IncludeName: true, + IncludeRawValue: true, + IncludeFormatted: true, + IncludeMeasure: true, + IncludeCreator: false, + IncludeCreatorType: false + } + }).then(response => { + setData(response?.data); + setColumns(response?.columns); + setActivities(response?.meta?.Activities?.Activity || []); + setProperties(response?.meta?.Properties?.Property || []); + setMeasures(response?.meta?.Measures?.Measure || []); + setScopeTypes(response?.meta?.ScopeTypes?.ScopeType || []); + }).catch(e => { + logger.error(e); + }).finally(() => { + setStatus(FetchStatus.COMPLETE); + }); + } + }, [query, state, count, scopeFilter, nestedFilter, querySet, queryId]); return { metrics: data, columns, activities, properties, measures, scopeTypes, status, refresh: increment }; } @@ -385,7 +397,8 @@ export function useWUQueryMetrics( scopeFilter: Partial = scopeFilterDefault, nestedFilter: WsWorkunits.NestedFilter = nestedFilterDefault ): useMetricsResult { - const wuMetrics = useWorkunitMetrics(wuid, 
scopeFilter, nestedFilter); - const queryMetrics = useQueryMetrics(querySet, queryId, scopeFilter, nestedFilter); - return querySet && queryId ? { ...queryMetrics } : { ...wuMetrics }; + const isQuery = querySet && queryId; + const wuMetrics = useWorkunitMetrics(isQuery ? "" : wuid, scopeFilter, nestedFilter); + const queryMetrics = useQueryMetrics(isQuery ? querySet : "", isQuery ? queryId : "", scopeFilter, nestedFilter); + return isQuery ? { ...queryMetrics } : { ...wuMetrics }; } diff --git a/esp/src/src-react/hooks/workunit.ts b/esp/src/src-react/hooks/workunit.ts index 3bc1c22314a..089b25d883f 100644 --- a/esp/src/src-react/hooks/workunit.ts +++ b/esp/src/src-react/hooks/workunit.ts @@ -16,7 +16,7 @@ export function useWorkunit(wuid: string, full: boolean = false): [Workunit, WUS const [retVal, setRetVal] = React.useState<{ workunit: Workunit, state: number, lastUpdate: number, isComplete: boolean, refresh: RefreshFunc }>(); React.useEffect(() => { - if (wuid === undefined || wuid === null) { + if (!wuid) { setRetVal({ workunit: undefined, state: WUStateID.NotFound, lastUpdate: Date.now(), isComplete: undefined, refresh: (full?: boolean) => Promise.resolve(undefined) }); return; } diff --git a/fs/dafsserver/dafsserver.cpp b/fs/dafsserver/dafsserver.cpp index 26ec65113f8..a2311f52be4 100644 --- a/fs/dafsserver/dafsserver.cpp +++ b/fs/dafsserver/dafsserver.cpp @@ -2278,6 +2278,7 @@ class CRemoteIndexReadActivity : public CRemoteIndexBaseActivity Owned translator; unsigned __int64 chooseN = 0; + bool cleanupBlobs = false; public: CRemoteIndexReadActivity(IPropertyTree &config, IFileDescriptor *fileDesc) : PARENT(config, fileDesc) { @@ -2316,6 +2317,12 @@ class CRemoteIndexReadActivity : public CRemoteIndexBaseActivity } dbgassertex(retSz); + if (cleanupBlobs) + { + keyManager->releaseBlobs(); + cleanupBlobs = false; + } + const void *ret = outBuilder.getSelf(); outBuilder.finishRow(retSz); ++processed; @@ -2350,6 +2357,13 @@ class CRemoteIndexReadActivity : 
public CRemoteIndexBaseActivity { return out.appendf("indexread[%s]", fileName.get()); } + + virtual const byte * lookupBlob(unsigned __int64 id) override + { + size32_t dummy; + cleanupBlobs = true; + return (byte *) keyManager->loadBlob(id, dummy, nullptr); + } }; diff --git a/roxie/ccd/ccdlistener.cpp b/roxie/ccd/ccdlistener.cpp index 85fd1241a4e..19c4bc81b91 100644 --- a/roxie/ccd/ccdlistener.cpp +++ b/roxie/ccd/ccdlistener.cpp @@ -967,7 +967,7 @@ StringBuffer & ContextLogger::getStats(StringBuffer &s) const ids.append(slowestActivityIds[i]); formatStatistic(times, cycle_to_nanosec(slowestActivityTimes[i]), SMeasureTimeNs); } - s.appendf(", slowestActivities={ ids=[%s] times=[%s] }", ids.str(), times.str()); + s.appendf(" slowestActivities={ ids=[%s] times=[%s] }", ids.str(), times.str()); } return s; } diff --git a/roxie/ccd/ccdquery.cpp b/roxie/ccd/ccdquery.cpp index 29c34c9fa2d..c983ebd0067 100644 --- a/roxie/ccd/ccdquery.cpp +++ b/roxie/ccd/ccdquery.cpp @@ -625,9 +625,7 @@ class CQueryFactory : implements IQueryFactory, implements IResourceContext, pub break; } StringBuffer helperName; - node.getProp("att[@name=\"helper\"]/@value", helperName); - if (!helperName.length()) - helperName.append("fAc").append(id); + helperName.append("fAc").append(id); HelperFactory *helperFactory = dll->getFactory(helperName); if (!helperFactory) throw MakeStringException(ROXIE_INTERNAL_ERROR, "Internal error: helper function %s not exported", helperName.str()); @@ -2002,9 +2000,7 @@ class CAgentQueryFactory : public CQueryFactory else { StringBuffer helperName; - node.getProp("att[@name=\"helper\"]/@value", helperName); - if (!helperName.length()) - helperName.append("fAc").append(node.getPropInt("@id", 0)); + helperName.append("fAc").append(node.getPropInt("@id", 0)); HelperFactory *helperFactory = dll->getFactory(helperName.str()); if (!helperFactory) throw MakeStringException(ROXIE_INTERNAL_ERROR, "Internal error: helper function %s not exported", helperName.str()); diff 
--git a/rtl/eclrtl/eclrtl.hpp b/rtl/eclrtl/eclrtl.hpp index 8d39bc32880..2fe998d91f3 100644 --- a/rtl/eclrtl/eclrtl.hpp +++ b/rtl/eclrtl/eclrtl.hpp @@ -89,9 +89,9 @@ interface IStrRegExprFindInstance interface ICompiledStrRegExpr { virtual void replace(size32_t & outlen, char * & out, size32_t slen, char const * str, size32_t rlen, char const * rstr) const = 0; - virtual void replaceFixed(size32_t tlen, char * tgt, size32_t slen, char const * str, size32_t rlen, char const * rstr) const = 0; virtual IStrRegExprFindInstance * find(const char * str, size32_t from, size32_t len, bool needToKeepSearchString) const = 0; virtual void getMatchSet(bool & __isAllResult, size32_t & __resultBytes, void * & __result, size32_t _srcLen, const char * _search) = 0; + virtual void replaceFixed(size32_t tlen, char * tgt, size32_t slen, char const * str, size32_t rlen, char const * rstr) const = 0; virtual void replaceTimed(ISectionTimer * timer, size32_t & outlen, char * & out, size32_t slen, char const * str, size32_t rlen, char const * rstr) const = 0; virtual void replaceFixedTimed(ISectionTimer * timer, size32_t tlen, char * tgt, size32_t slen, char const * str, size32_t rlen, char const * rstr) const = 0; virtual IStrRegExprFindInstance * findTimed(ISectionTimer * timer, const char * str, size32_t from, size32_t len, bool needToKeepSearchString) const = 0; @@ -109,9 +109,9 @@ interface IUStrRegExprFindInstance interface ICompiledUStrRegExpr { virtual void replace(size32_t & outlen, UChar * & out, size32_t slen, UChar const * str, size32_t rlen, UChar const * rstr) const = 0; - virtual void replaceFixed(size32_t tlen, UChar * tgt, size32_t slen, UChar const * str, size32_t rlen, UChar const * rstr) const = 0; virtual IUStrRegExprFindInstance * find(const UChar * str, size32_t from, size32_t len) const = 0; virtual void getMatchSet(bool & __isAllResult, size32_t & __resultBytes, void * & __result, size32_t _srcLen, const UChar * _search) = 0; + virtual void replaceFixed(size32_t 
tlen, UChar * tgt, size32_t slen, UChar const * str, size32_t rlen, UChar const * rstr) const = 0; virtual void replaceTimed(ISectionTimer * timer, size32_t & outlen, UChar * & out, size32_t slen, UChar const * str, size32_t rlen, UChar const * rstr) const = 0; virtual void replaceFixedTimed(ISectionTimer * timer, size32_t tlen, UChar * tgt, size32_t slen, UChar const * str, size32_t rlen, UChar const * rstr) const = 0; virtual IUStrRegExprFindInstance * findTimed(ISectionTimer * timer, const UChar * str, size32_t from, size32_t len) const = 0; diff --git a/rtl/eclrtl/rtldynfield.cpp b/rtl/eclrtl/rtldynfield.cpp index 2d8ccd3aa14..5f1e70f3376 100644 --- a/rtl/eclrtl/rtldynfield.cpp +++ b/rtl/eclrtl/rtldynfield.cpp @@ -1690,6 +1690,7 @@ class GeneralRecordTranslator : public CInterfaceOf { const RtlRecord *subDest = destRecInfo.queryNested(idx); const RtlRecord *subSrc = sourceRecInfo.queryNested(info.matchIdx); + assertex(subSrc); info.subTrans = new GeneralRecordTranslator(*subDest, *subSrc, binarySource); if (!info.subTrans->needsTranslate()) { diff --git a/rtl/eclrtl/rtlrecord.cpp b/rtl/eclrtl/rtlrecord.cpp index 4c27b8cd05c..fb06d6b74e4 100644 --- a/rtl/eclrtl/rtlrecord.cpp +++ b/rtl/eclrtl/rtlrecord.cpp @@ -292,6 +292,12 @@ RtlRecord::RtlRecord(const RtlFieldInfo * const *_fields, bool expandFields) : f const RtlTypeInfo *curType = queryType(i); if (!curType->isFixedSize() || (fields[i]->flags & RFTMinifblock)) numVarFields++; + if (curType->isBlob()) + { + curType = curType->queryChildType(); + if (unlikely(!curType)) + throwUnexpectedX("Blob type has no child type"); + } if (curType->getType()==type_table || curType->getType()==type_record || curType->getType()==type_dictionary) numTables++; } @@ -331,6 +337,8 @@ RtlRecord::RtlRecord(const RtlFieldInfo * const *_fields, bool expandFields) : f curVariable++; fixedOffset = 0; } + if (curType->isBlob()) + curType = curType->queryChildType(); switch (curType->getType()) { case type_table: diff --git 
a/system/jhtree/jhtree.cpp b/system/jhtree/jhtree.cpp index 956118e6faf..7385851cbe2 100644 --- a/system/jhtree/jhtree.cpp +++ b/system/jhtree/jhtree.cpp @@ -3668,7 +3668,7 @@ class IKeyManagerSlowTest : public CppUnit::TestFixture { const char *json = variable ? "{ \"ty1\": { \"fieldType\": 4, \"length\": 10 }, " - " \"ty2\": { \"fieldType\": 15, \"length\": 8 }, " + " \"ty2\": { \"fieldType\": 15, \"length\": 8, \"child\": \"ty1\" }, " " \"fieldType\": 13, \"length\": 10, " " \"fields\": [ " " { \"name\": \"f1\", \"type\": \"ty1\", \"flags\": 4 }, " @@ -3847,13 +3847,22 @@ class IKeyManagerSlowTest : public CppUnit::TestFixture void testKeys() { - ASSERT(sizeof(CKeyIdAndPos) == sizeof(unsigned __int64) + sizeof(offset_t)); - for (bool var : { true, false }) - for (bool trail : { false, true }) - for (bool noseek : { false, true }) - for (bool quick : { true, false }) - for (const char * compression : { (const char *)nullptr, "POC", "inplace" }) - testKeys(var, trail, noseek, quick, compression); + try + { + ASSERT(sizeof(CKeyIdAndPos) == sizeof(unsigned __int64) + sizeof(offset_t)); + for (bool var : { true, false }) + for (bool trail : { false, true }) + for (bool noseek : { false, true }) + for (bool quick : { true, false }) + for (const char * compression : { (const char *)nullptr, "POC", "inplace" }) + testKeys(var, trail, noseek, quick, compression); + } + catch (IException * e) + { + StringBuffer s; + e->errorMessage(s); + CPPUNIT_ASSERT_MESSAGE(s.str(), false); + } } }; diff --git a/system/jlib/CMakeLists.txt b/system/jlib/CMakeLists.txt index 9fa8175eabc..1a88f03d3a1 100644 --- a/system/jlib/CMakeLists.txt +++ b/system/jlib/CMakeLists.txt @@ -223,6 +223,9 @@ include_directories ( ${CMAKE_BINARY_DIR}/oss ) +# ensure httplib uses poll rather than select - otherwise it fail if too many sockets have been opened. 
+ADD_DEFINITIONS( -DCPPHTTPLIB_USE_POLL ) + ADD_DEFINITIONS( -D_USRDLL -DJLIB_EXPORTS ) HPCC_ADD_LIBRARY( jlib SHARED ${SRCS} ${INCLUDES} ) diff --git a/system/metrics/sinks/prometheus/CMakeLists.txt b/system/metrics/sinks/prometheus/CMakeLists.txt index c2454e1eadc..f2a89ad3f2d 100644 --- a/system/metrics/sinks/prometheus/CMakeLists.txt +++ b/system/metrics/sinks/prometheus/CMakeLists.txt @@ -30,6 +30,9 @@ include_directories( ${HPCC_SOURCE_DIR}/system/httplib ) +# ensure httplib uses poll rather than select - otherwise it fail if too many sockets have been opened. +ADD_DEFINITIONS( -DCPPHTTPLIB_USE_POLL ) + SET (CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} ${STRICT_CXX_FLAGS}") ADD_DEFINITIONS( -DPROMETHEUSSINK_EXPORTS ) HPCC_ADD_LIBRARY( hpccmetrics_prometheussink SHARED ${srcs} ) diff --git a/thorlcr/activities/hashdistrib/thhashdistrib.cpp b/thorlcr/activities/hashdistrib/thhashdistrib.cpp index b2c8a42e50f..36c2cba1194 100644 --- a/thorlcr/activities/hashdistrib/thhashdistrib.cpp +++ b/thorlcr/activities/hashdistrib/thhashdistrib.cpp @@ -26,6 +26,7 @@ #include "thorport.hpp" #include "thbufdef.hpp" #include "thexception.hpp" +#include "thormisc.hpp" #define NUMINPARALLEL 16 @@ -115,7 +116,7 @@ class IndexDistributeActivityMaster : public HashDistributeMasterBase checkFormatCrc(this, file, helper->getFormatCrc(), nullptr, helper->getFormatCrc(), nullptr, true); Owned fileDesc = file->getFileDescriptor(); Owned tlkDesc = fileDesc->getPart(fileDesc->numParts()-1); - if (!tlkDesc->queryProperties().hasProp("@kind") || 0 != stricmp("topLevelKey", tlkDesc->queryProperties().queryProp("@kind"))) + if (!hasTLK(*file, this)) throw MakeActivityException(this, 0, "Cannot distribute using a non-distributed key: '%s'", scoped.str()); unsigned location; OwnedIFile iFile; diff --git a/thorlcr/activities/indexwrite/thindexwrite.cpp b/thorlcr/activities/indexwrite/thindexwrite.cpp index 1a9f5894a87..b9c2963ff8a 100644 --- a/thorlcr/activities/indexwrite/thindexwrite.cpp +++ 
b/thorlcr/activities/indexwrite/thindexwrite.cpp @@ -159,9 +159,9 @@ class IndexWriteActivityMaster : public CMasterActivity checkFormatCrc(this, _f, helper->getFormatCrc(), nullptr, helper->getFormatCrc(), nullptr, true); IDistributedFile *f = _f->querySuperFile(); if (!f) f = _f; - Owned existingTlk = f->getPart(f->numParts()-1); - if (!existingTlk->queryAttributes().hasProp("@kind") || 0 != stricmp("topLevelKey", existingTlk->queryAttributes().queryProp("@kind"))) + if (!hasTLK(*f, this)) throw MakeActivityException(this, 0, "Cannot build new key '%s' based on non-distributed key '%s'", fileName.get(), diName.get()); + Owned existingTlk = f->getPart(f->numParts()-1); IPartDescriptor *tlkDesc = fileDesc->queryPart(fileDesc->numParts()-1); IPropertyTree &props = tlkDesc->queryProperties(); if (existingTlk->queryAttributes().hasProp("@size")) diff --git a/thorlcr/activities/keydiff/thkeydiff.cpp b/thorlcr/activities/keydiff/thkeydiff.cpp index 7b56dc6c841..e0ae29db9d0 100644 --- a/thorlcr/activities/keydiff/thkeydiff.cpp +++ b/thorlcr/activities/keydiff/thkeydiff.cpp @@ -71,10 +71,7 @@ class CKeyDiffMaster : public CMasterActivity originalDesc.setown(originalIndexFile->getFileDescriptor()); newIndexDesc.setown(newIndexFile->getFileDescriptor()); - Owned tlkDesc = originalDesc->getPart(originalDesc->numParts()-1); - const char *kind = tlkDesc->queryProperties().queryProp("@kind"); - local = NULL == kind || 0 != stricmp("topLevelKey", kind); - + local = !hasTLK(*originalIndexFile, this); if (!local) width--; // 1 part == No n distributed / Monolithic key if (width > container.queryJob().querySlaves()) diff --git a/thorlcr/activities/keyedjoin/thkeyedjoin.cpp b/thorlcr/activities/keyedjoin/thkeyedjoin.cpp index 38fabfca861..af5d197721f 100644 --- a/thorlcr/activities/keyedjoin/thkeyedjoin.cpp +++ b/thorlcr/activities/keyedjoin/thkeyedjoin.cpp @@ -112,20 +112,21 @@ class CKeyedJoinMaster : public CMasterActivity unsigned numParts = fileDesc->numParts(); unsigned 
nextGroupStartPos = 0; + IDistributedFile *subFile = file; for (unsigned p=0; pqueryPart(p); - const char *kind = isIndexWithTlk ? part->queryProperties().queryProp("@kind") : nullptr; - if (!kind || !strsame("topLevelKey", kind)) + unsigned partIdx = part->queryPartIndex(); + unsigned subFileNum = NotFound; + unsigned subPartIdx = partIdx; + if (superFileDesc) + { + superFileDesc->mapSubPart(partIdx, subFileNum, subPartIdx); + partIdx = superWidth*subFileNum+subPartIdx; + subFile = &super->querySubFile(subFileNum, true); + } + if (!isIndexWithTlk || (1 == numParts) || (subPartIdx < (subFile->numParts()-1)) || !hasTLK(*subFile, nullptr)) { - unsigned partIdx = part->queryPartIndex(); - unsigned subfile = NotFound; - unsigned subPartIdx = partIdx; - if (superFileDesc) - { - superFileDesc->mapSubPart(partIdx, subfile, subPartIdx); - partIdx = superWidth*subfile+subPartIdx; - } if (activity.local) { if (activity.queryContainer().queryLocalData()) @@ -234,7 +235,7 @@ class CKeyedJoinMaster : public CMasterActivity slaveParts.push_back(p); } if (superFileDesc) - partIdx = superWidth*subfile+subPartIdx; + partIdx = superWidth*subFileNum+subPartIdx; partsByPartIdx.push_back(partIdx); assertex(partIdx < totalParts); partToSlave[partIdx] = mappedPos; @@ -387,10 +388,7 @@ class CKeyedJoinMaster : public CMasterActivity ForEach(*iter) { IDistributedFile &f = iter->query(); - unsigned np = f.numParts()-1; - IDistributedFilePart &part = f.queryPart(np); - const char *kind = part.queryAttributes().queryProp("@kind"); - bool hasTlk = NULL != kind && 0 == stricmp("topLevelKey", kind); // if last part not tlk, then deemed local (might be singlePartKey) + bool hasTlk = hasTLK(f, this); if (first) { first = false; @@ -419,8 +417,7 @@ class CKeyedJoinMaster : public CMasterActivity totalIndexParts = indexFile->numParts(); if (totalIndexParts) { - const char *kind = indexFile->queryPart(indexFile->numParts()-1).queryAttributes().queryProp("@kind"); - keyHasTlk = NULL != kind && 0 == 
stricmp("topLevelKey", kind); + keyHasTlk = hasTLK(*indexFile, this); if (keyHasTlk) --totalIndexParts; } diff --git a/thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp b/thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp index 46e7a05302c..cdcaea2a987 100644 --- a/thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp +++ b/thorlcr/activities/keyedjoin/thkeyedjoinslave.cpp @@ -1445,7 +1445,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem { unsigned partNo = partCopy & partMask; unsigned copy = partCopy >> partBits; - Owned keyIndex = activity.createPartKeyIndex(partNo, copy, false); + Owned keyIndex = activity.createPartKeyIndex(partNo, copy); partKeySet->addIndex(keyIndex.getClear()); } keyManager = createKeyMerger(helper->queryIndexRecordSize()->queryRecordAccessor(true), partKeySet, 0, &contextLogger, helper->hasNewSegmentMonitors(), false); @@ -2454,7 +2454,7 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem } return tlkKeyIndexes.ordinality(); } - IKeyIndex *createPartKeyIndex(unsigned partNo, unsigned copy, bool delayed) + IKeyIndex *createPartKeyIndex(unsigned partNo, unsigned copy) { IPartDescriptor &filePart = allIndexParts.item(partNo); unsigned crc=0; @@ -2464,25 +2464,16 @@ class CKeyedJoinSlave : public CSlaveActivity, implements IJoinProcessor, implem StringBuffer filename; rfn.getPath(filename); - if (delayed) - { - Owned lazyIFileIO = queryThor().queryFileCache().lookupIFileIO(*this, indexName, filePart, nullptr); - Owned delayedFile = createDelayedFile(lazyIFileIO); - return createKeyIndex(filename, crc, *delayedFile, (unsigned) -1, false, 0); - } - else - { - /* NB: createKeyIndex here, will load the key immediately - * But that's okay, because we are only here on demand. - * The underlying IFileIO can later be closed by fhe file caching mechanism. 
- */ - Owned lazyIFileIO = queryThor().queryFileCache().lookupIFileIO(*this, indexName, filePart, nullptr); - return createKeyIndex(filename, crc, *lazyIFileIO, (unsigned) -1, false, 0); - } + /* NB: createKeyIndex here, will load the key immediately + * But that's okay, because we are only here on demand. + * The underlying IFileIO can later be closed by the file caching mechanism. + */ + Owned lazyIFileIO = queryThor().queryFileCache().lookupIFileIO(*this, indexName, filePart, nullptr); + return createKeyIndex(filename, crc, *lazyIFileIO, (unsigned) -1, false, 0); } IKeyManager *createPartKeyManager(unsigned partNo, unsigned copy, IContextLogger *ctx) { - Owned keyIndex = createPartKeyIndex(partNo, copy, false); + Owned keyIndex = createPartKeyIndex(partNo, copy); return createLocalKeyManager(helper->queryIndexRecordSize()->queryRecordAccessor(true), keyIndex, ctx, helper->hasNewSegmentMonitors(), false); } const void *preparePendingLookupRow(void *row, size32_t maxSz, const void *lhsRow, size32_t keySz) diff --git a/thorlcr/activities/keypatch/thkeypatch.cpp b/thorlcr/activities/keypatch/thkeypatch.cpp index 3f279cbe82c..50cb4ec0354 100644 --- a/thorlcr/activities/keypatch/thkeypatch.cpp +++ b/thorlcr/activities/keypatch/thkeypatch.cpp @@ -71,11 +71,7 @@ class CKeyPatchMaster : public CMasterActivity originalDesc.setown(originalIndexFile->getFileDescriptor()); patchDesc.setown(patchFile->getFileDescriptor()); - - Owned tlkDesc = originalDesc->getPart(originalDesc->numParts()-1); - const char *kind = tlkDesc->queryProperties().queryProp("@kind"); - local = NULL == kind || 0 != stricmp("topLevelKey", kind); - + local = !hasTLK(*originalIndexFile, this); if (!local && width > 1) width--; // 1 part == No n distributed / Monolithic key if (width > container.queryJob().querySlaves()) diff --git a/thorlcr/activities/merge/thmergeslave.cpp b/thorlcr/activities/merge/thmergeslave.cpp index 3dbfed026af..7d3e6fbf22d 100--- --- a/thorlcr/activities/merge/thmergeslave.cpp 
+++ b/thorlcr/activities/merge/thmergeslave.cpp @@ -47,6 +47,7 @@ class GlobalMergeSlaveActivity : public CSlaveActivity offset_t *partitionpos; size32_t chunkmaxsize; unsigned width; + unsigned rwFlags = 0x0; // flags for streams (e.g. compression flags) class cRemoteStream : implements IRowStream, public CSimpleInterface { @@ -220,7 +221,7 @@ class GlobalMergeSlaveActivity : public CSlaveActivity CThorKeyArray partition(*this, queryRowInterfaces(this),helper->querySerialize(),helper->queryCompare(),helper->queryCompareKey(),helper->queryCompareRowKey()); partition.deserialize(mb,false); - partition.calcPositions(tmpfile,sample); + partition.calcPositions(tmpfile, sample, rwFlags); partitionpos = new offset_t[width]; unsigned i; for (i=0;i writer = createRowWriter(tmpfile, this); + Owned writer = createRowWriter(tmpfile, this, rwFlags); CThorKeyArray sample(*this, this, helper->querySerialize(), helper->queryCompare(), helper->queryCompareKey(), helper->queryCompareRowKey()); sample.setSampling(MERGE_TRANSFER_BUFFER_SIZE); ActPrintLog("MERGE: start gather"); @@ -366,7 +380,7 @@ class GlobalMergeSlaveActivity : public CSlaveActivity offset_t end = partitionpos[idx]; if (pos>=end) return 0; - Owned rs = createRowStreamEx(tmpfile, queryRowInterfaces(this), pos, end); // this is not good + Owned rs = createRowStreamEx(tmpfile, queryRowInterfaces(this), pos, end, (unsigned __int64)-1, rwFlags); // this is not good offset_t so = rs->getOffset(); size32_t len = 0; size32_t chunksize = chunkmaxsize; diff --git a/thorlcr/msort/tsorta.cpp b/thorlcr/msort/tsorta.cpp index 4db84072bd6..573db3d0e8e 100644 --- a/thorlcr/msort/tsorta.cpp +++ b/thorlcr/msort/tsorta.cpp @@ -443,7 +443,7 @@ offset_t CThorKeyArray::findLessRowPos(const void * row) return getFixedFilePos(p); } -void CThorKeyArray::calcPositions(IFile *file,CThorKeyArray &sample) +void CThorKeyArray::calcPositions(IFile *file, CThorKeyArray &sample, unsigned rwFlags) { // calculates positions based on sample // not 
fast! @@ -459,7 +459,7 @@ void CThorKeyArray::calcPositions(IFile *file,CThorKeyArray &sample) if (pos==(offset_t)-1) pos = 0; // should do bin-chop for fixed length but initially do sequential search - Owned s = createRowStreamEx(file, rowif, pos); + Owned s = createRowStreamEx(file, rowif, pos, (offset_t)-1, (unsigned __int64)-1, rwFlags); for (;;) { OwnedConstThorRow rowcmp = s->nextRow(); diff --git a/thorlcr/msort/tsorta.hpp b/thorlcr/msort/tsorta.hpp index ae8b1fe20d8..30097801299 100644 --- a/thorlcr/msort/tsorta.hpp +++ b/thorlcr/msort/tsorta.hpp @@ -114,7 +114,7 @@ class THORSORT_API CThorKeyArray void deserialize(MemoryBuffer &mb,bool append); void sort(); void createSortedPartition(unsigned pn); - void calcPositions(IFile *file,CThorKeyArray &sample); + void calcPositions(IFile *file, CThorKeyArray &sample, unsigned rwFlags); void setSampling(size32_t _maxsamplesize, unsigned _divisor=0); int keyCompare(unsigned a,unsigned b); offset_t getFixedFilePos(unsigned i); diff --git a/thorlcr/thorutil/thbuf.cpp b/thorlcr/thorutil/thbuf.cpp index ba6163fbba1..1f897008143 100644 --- a/thorlcr/thorutil/thbuf.cpp +++ b/thorlcr/thorutil/thbuf.cpp @@ -1302,6 +1302,10 @@ class CRowSet : public CSimpleInterface, implements IInterface { return rows.get(r); } + inline bool isFull() const + { + return rows.isFull(); + } }; class Chunk : public CInterface @@ -1493,8 +1497,7 @@ class CSharedWriteAheadBase : public CSimpleInterface, implements ISharedSmartBu } } rowsRead++; - const void *retrow = rowSet->getRow(row++); - return retrow; + return rowSet->getRow(row++); } virtual void stop() { @@ -1723,7 +1726,8 @@ class CSharedWriteAheadBase : public CSimpleInterface, implements ISharedSmartBu unsigned len=rowSize(row); CriticalBlock b(crit); bool paged = false; - if (totalOutChunkSize >= minChunkSize) // chunks required to be at least minChunkSize + // NB: The isFull condition ensures that we never expand inMemRows, which would cause a race with readers reading same set + if 
(totalOutChunkSize >= minChunkSize || inMemRows->isFull()) // chunks required to be at least minChunkSize, or if hits max capacity { unsigned reader=anyReaderBehind(); if (NotFound != reader) diff --git a/thorlcr/thorutil/thmem.hpp b/thorlcr/thorutil/thmem.hpp index afed88e3bdb..af695891265 100644 --- a/thorlcr/thorutil/thmem.hpp +++ b/thorlcr/thorutil/thmem.hpp @@ -327,7 +327,8 @@ class graph_decl CThorExpandingRowArray : public CSimpleInterface if (!resize(numRows+1)) return false; } - rows[numRows++] = row; + rows[numRows] = row; + numRows++; return true; } bool binaryInsert(const void *row, ICompare &compare, bool dropLast=false); // NB: takes ownership on success @@ -356,6 +357,7 @@ class graph_decl CThorExpandingRowArray : public CSimpleInterface } inline rowidx_t ordinality() const { return numRows; } inline rowidx_t queryMaxRows() const { return maxRows; } + inline bool isFull() const { return numRows >= maxRows; } inline const void **getRowArray() { return rows; } void swap(CThorExpandingRowArray &src); diff --git a/thorlcr/thorutil/thormisc.cpp b/thorlcr/thorutil/thormisc.cpp index 17e43771bba..16a42a352e0 100644 --- a/thorlcr/thorutil/thormisc.cpp +++ b/thorlcr/thorutil/thormisc.cpp @@ -31,6 +31,8 @@ #include "jsocket.hpp" #include "jmutex.hpp" +#include "jhtree.hpp" + #include "commonext.hpp" #include "dadfs.hpp" #include "dasds.hpp" @@ -1672,6 +1674,38 @@ void saveWuidToFile(const char *wuid) wuidFileIO->close(); } +bool hasTLK(IDistributedFile &file, CActivityBase *activity) +{ + unsigned np = file.numParts(); + if (np<=1) // NB: a better test would be to only continue if this is a width that is +1 of group it's based on, but not worth checking + return false; + IDistributedFilePart &part = file.queryPart(np-1); + bool keyHasTlk = strisame("topLevelKey", part.queryAttributes().queryProp("@kind")); + if (!keyHasTlk) + { + // See HPCC-32845 - check if TLK flag is missing from TLK part + // It is very likely the last part should be a TLK. 
Even a local key (>1 parts) has a TLK by default (see buildLocalTlks) + RemoteFilename rfn; + part.getFilename(rfn); + StringBuffer filename; + rfn.getPath(filename); + Owned index = createKeyIndex(filename, 0, false, 0); + dbgassertex(index); + if (index->isTopLevelKey()) + { + if (activity) + { + Owned e = MakeActivityException(activity, 0, "TLK file part of file %s is missing kind=\"topLevelKey\" flag. The meta data should be fixed!", file.queryLogicalName()); + reportExceptionToWorkunitCheckIgnore(activity->queryJob().queryWorkUnit(), e, SeverityWarning); + StringBuffer errMsg; + UWARNLOG("%s", e->errorMessage(errMsg).str()); + } + keyHasTlk = true; + } + } + return keyHasTlk; +} + std::vector captureDebugInfo(const char *_dir, const char *prefix, const char *suffix) { if (!recursiveCreateDirectory(_dir)) @@ -1716,4 +1750,4 @@ std::vector captureDebugInfo(const char *_dir, const char *prefix, return { }; } return { stacksFName.str() }; // JCSMORE capture/return other files -} \ No newline at end of file +} diff --git a/thorlcr/thorutil/thormisc.hpp b/thorlcr/thorutil/thormisc.hpp index 5900502e5d8..1bdbd5d9f67 100644 --- a/thorlcr/thorutil/thormisc.hpp +++ b/thorlcr/thorutil/thormisc.hpp @@ -723,6 +723,7 @@ class graph_decl CThorPerfTracer : protected PerfTracer extern graph_decl void saveWuidToFile(const char *wuid); +extern graph_decl bool hasTLK(IDistributedFile &file, CActivityBase *activity); extern graph_decl std::vector captureDebugInfo(const char *dir, const char *prefix, const char *suffix); #endif