From 071298bb10f28f177fce66efa42238ecb4ba09d8 Mon Sep 17 00:00:00 2001 From: Jack Del Vecchio Date: Tue, 1 Oct 2024 09:04:50 -0400 Subject: [PATCH] Connect CHThorGenericDiskWriteActivity to a BinaryDiskRowWriter --- common/thorhelper/thorread.cpp | 140 ++++++++++++++++++++++++++++++--- common/thorhelper/thorread.hpp | 3 + ecl/eclagent/eclgraph.cpp | 2 +- ecl/hthor/hthor.cpp | 93 +++++++++++++++++++--- ecl/hthor/hthor.hpp | 2 +- ecl/hthor/hthor.ipp | 31 ++++++-- 6 files changed, 238 insertions(+), 33 deletions(-) diff --git a/common/thorhelper/thorread.cpp b/common/thorhelper/thorread.cpp index 2b48cab2fcf..34d101bf0b1 100644 --- a/common/thorhelper/thorread.cpp +++ b/common/thorhelper/thorread.cpp @@ -2027,20 +2027,85 @@ void RemoteDiskRowReader::stop() { } +///--------------------------------------------------------------------------------------------------------------------- + +class DiskRowWriter : public CInterfaceOf, implements IDiskRowWriter +{ +public: + DiskRowWriter(IDiskReadMapping * _mapping) : mapping(_mapping) {} + IMPLEMENT_IINTERFACE_USING(CInterfaceOf); + + ILogicalRowSink * queryRowSink() override; + +protected: + Linked mapping; +}; + +ILogicalRowSink * DiskRowWriter::queryRowSink() +{ + return this; +} + +class BinaryDiskRowWriter : public DiskRowWriter +{ +public: + BinaryDiskRowWriter(IDiskReadMapping * _mapping) : DiskRowWriter(_mapping) {} + + bool matches(const char * format, bool streamRemote, IDiskReadMapping * mapping) override; + + bool setOutputFile(const char * localFilename, const char * logicalFilename, unsigned partNumber, const IPropertyTree * outputOptions) override; + + void putRow(const void *row) override; // takes ownership of row. rename to putOwnedRow? + void flush() override; + void noteStopped() override; + + void writeRow(const void *row) override; +}; + + +bool BinaryDiskRowWriter::matches(const char * format, bool streamRemote, IDiskReadMapping * mapping) +{ + return strieq(format, "flat"); +} + +bool BinaryDiskRowWriter::setOutputFile(const char * localFilename, const char * logicalFilename, unsigned partNumber, const IPropertyTree * outputOptions) +{ + return false; +} + +void BinaryDiskRowWriter::putRow(const void *row) +{ + throwUnexpected(); +} + +void BinaryDiskRowWriter::flush() +{ + throwUnexpected(); +} + +void BinaryDiskRowWriter::noteStopped() +{ + throwUnexpected(); +} + +void BinaryDiskRowWriter::writeRow(const void *row) +{ + throwUnexpected(); +} ///--------------------------------------------------------------------------------------------------------------------- // Lookup to map the names of file types/formats to their object constructors; // map will be initialized within MODULE_INIT -static std::map> genericFileTypeMap; - +static std::map> genericFileTypeReadersMap; +static std::map> genericFileTypeWritersMap; // format is assumed to be lowercase IDiskRowReader * doCreateLocalDiskReader(const char * format, IDiskReadMapping * _mapping) { - auto foundReader = genericFileTypeMap.find(format); + auto foundReader = genericFileTypeReadersMap.find(format); - if (foundReader != genericFileTypeMap.end() && foundReader->second) + if (foundReader != genericFileTypeReadersMap.end() && foundReader->second) return foundReader->second(_mapping); UNIMPLEMENTED; @@ -2085,19 +2150,74 @@ MODULE_INIT(INIT_PRIORITY_STANDARD) // should be defined here; the key is the lowecase name of the format, // as will be used in ECL, and the value should be a lambda // that creates the appropriate disk row reader object - genericFileTypeMap.emplace("flat", [](IDiskReadMapping * _mapping) { return new BinaryDiskRowReader(_mapping); }); - genericFileTypeMap.emplace("csv", [](IDiskReadMapping * _mapping) { return new CsvDiskRowReader(_mapping); }); - genericFileTypeMap.emplace("xml", [](IDiskReadMapping * _mapping) { return new XmlDiskRowReader(_mapping); }); + genericFileTypeReadersMap.emplace("flat", [](IDiskReadMapping * _mapping) { return new BinaryDiskRowReader(_mapping); }); + genericFileTypeReadersMap.emplace("csv", [](IDiskReadMapping * _mapping) { return new CsvDiskRowReader(_mapping); }); + genericFileTypeReadersMap.emplace("xml", [](IDiskReadMapping * _mapping) { return new XmlDiskRowReader(_mapping); }); #ifdef _USE_PARQUET - genericFileTypeMap.emplace(PARQUET_FILE_TYPE_NAME, [](IDiskReadMapping * _mapping) { return new ParquetDiskRowReader(_mapping); }); + genericFileTypeReadersMap.emplace(PARQUET_FILE_TYPE_NAME, [](IDiskReadMapping * _mapping) { return new ParquetDiskRowReader(_mapping); }); #else - genericFileTypeMap.emplace(PARQUET_FILE_TYPE_NAME, [](IDiskReadMapping * _mapping) { return nullptr; }); + genericFileTypeReadersMap.emplace(PARQUET_FILE_TYPE_NAME, [](IDiskReadMapping * _mapping) { return nullptr; }); #endif // Stuff the file type names that were just instantiated into a list; // list will be accessed by the ECL compiler to validate the names // at compile time - for (auto iter = genericFileTypeMap.begin(); iter != genericFileTypeMap.end(); iter++) + for (auto iter = genericFileTypeReadersMap.begin(); iter != genericFileTypeReadersMap.end(); iter++) + addAvailableGenericFileTypeName(iter->first.c_str()); + + return true; +} + +// format is assumed to be lowercase +IDiskRowWriter * doCreateLocalDiskWriter(const char * format, IDiskReadMapping * _mapping) +{ + auto foundReader = genericFileTypeWritersMap.find(format); + + if (foundReader != genericFileTypeWritersMap.end() && foundReader->second) + return foundReader->second(_mapping); + + UNIMPLEMENTED; +} + +IDiskRowWriter * createLocalDiskWriter(const char * format, IDiskReadMapping * mapping) +{ + Owned directReader = doCreateLocalDiskWriter(format, mapping); + if (mapping->expectedMatchesProjected() || strieq(format, "flat")) + return directReader.getClear(); + + // Owned expectedMapping = createUnprojectedMapping(mapping); + // Owned expectedReader = doCreateLocalDiskWriter(format, expectedMapping); + // return new AlternativeDiskRowWriter(directReader, expectedReader, mapping); + return nullptr; +} + +// IDiskRowWriter * createRemoteDiskWriter(const char * format, IDiskReadMapping * _mapping) +// { +// return new RemoteDiskRowWriter(format, _mapping); +// } + +IDiskRowWriter * createDiskWriter(const char * format, bool streamRemote, IDiskReadMapping * _mapping) +{ + // if (streamRemote) + // return createRemoteDiskWriter(format, _mapping); + // else + // return createLocalDiskWriter(format, _mapping); + + return createLocalDiskWriter(format, _mapping); +} + +MODULE_INIT(INIT_PRIORITY_STANDARD) +{ + // All pluggable file types that use the generic disk reader + // should be defined here; the key is the lowecase name of the format, + // as will be used in ECL, and the value should be a lambda + // that creates the appropriate disk row reader object + genericFileTypeWritersMap.emplace("flat", [](IDiskReadMapping * _mapping) { return new BinaryDiskRowWriter(_mapping); }); + + // Stuff the file type names that were just instantiated into a list; + // list will be accessed by the ECL compiler to validate the names + // at compile time + for (auto iter = genericFileTypeWritersMap.begin(); iter != genericFileTypeWritersMap.end(); iter++) addAvailableGenericFileTypeName(iter->first.c_str()); return true; diff --git a/common/thorhelper/thorread.hpp b/common/thorhelper/thorread.hpp index 0f26a91c2d2..604497add5d 100644 --- a/common/thorhelper/thorread.hpp +++ b/common/thorhelper/thorread.hpp @@ -109,4 +109,7 @@ extern THORHELPER_API IDiskRowReader * createLocalDiskReader(const char * format extern THORHELPER_API IDiskRowReader * createRemoteDiskReader(const char * format, IDiskReadMapping * mapping); extern THORHELPER_API IDiskRowReader * createDiskReader(const char * format, bool streamRemote, IDiskReadMapping * mapping); +extern THORHELPER_API IDiskRowWriter * createLocalDiskWriter(const char * format, IDiskReadMapping * mapping); +// extern THORHELPER_API IDiskRowWriter * createRemoteDiskWriter(const char * format, IDiskReadMapping * mapping); +extern THORHELPER_API IDiskRowWriter * createDiskWriter(const char * format, bool streamRemote, IDiskReadMapping * mapping); #endif diff --git a/ecl/eclagent/eclgraph.cpp b/ecl/eclagent/eclgraph.cpp index 8d2a841f535..0686ccc2c87 100644 --- a/ecl/eclagent/eclgraph.cpp +++ b/ecl/eclagent/eclgraph.cpp @@ -56,7 +56,7 @@ static IHThorActivity * createActivity(IAgentContext & agent, unsigned activityI { // bool isGeneric = (((IHThorNewDiskReadArg &)arg).getFlags() & TDXgeneric) != 0; if (1) - return createGenericDiskWriteActivity(agent, activityId, subgraphId, (IHThorNewDiskReadArg &)arg, kind, graph, node); + return createGenericDiskWriteActivity(agent, activityId, subgraphId, (IHThorDiskWriteArg &)arg, kind, graph, node); else return createDiskWriteActivity(agent, activityId, subgraphId, (IHThorDiskWriteArg &)arg, kind, graph); } diff --git a/ecl/hthor/hthor.cpp b/ecl/hthor/hthor.cpp index 9ae780bf5c1..ce6ea32a343 100644 --- a/ecl/hthor/hthor.cpp +++ b/ecl/hthor/hthor.cpp @@ -12003,28 +12003,43 @@ const void *CHThorGenericDiskReadActivity::nextRow() return NULL; } -CHThorGenericDiskWriteBaseActivity::CHThorGenericDiskWriteBaseActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorNewDiskReadBaseArg &_arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node) -: CHThorActivityBase(_agent, _activityId, _subgraphId, _arg, _kind, _graph), helper(_arg) +CHThorGenericDiskWriteBaseActivity::CHThorGenericDiskWriteBaseActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorDiskWriteArg &_arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node) +: CHThorActivityBase(_agent, _activityId, _subgraphId, _arg, _kind, _graph), helper(_arg), inputOptions(_node) { } void CHThorGenericDiskWriteBaseActivity::ready() { - // Implementation here + CHThorActivityBase::ready(); } void CHThorGenericDiskWriteBaseActivity::stop() { - // Implementation here + CHThorActivityBase::stop(); } void CHThorGenericDiskWriteBaseActivity::execute() { - // Implementation here + CHThorActivityBase::execute(); +} + +IDiskRowWriter * CHThorGenericDiskWriteBaseActivity::ensureRowWriter(const char * format, bool streamRemote, unsigned expectedCrc, IOutputMetaData & expected, unsigned projectedCrc, IOutputMetaData & projected, unsigned actualCrc, IOutputMetaData & actual, const IPropertyTree * options) +{ + Owned mapping = createDiskReadMapping(RecordTranslationMode::None, format, actualCrc, actual, expectedCrc, expected, projectedCrc, projected, options); + + ForEachItemIn(i, writers) + { + IDiskRowWriter & cur = writers.item(i); + if (cur.matches(format, streamRemote, mapping)) + return &cur; + } + IDiskRowWriter * writer = createDiskWriter(format, streamRemote, mapping); + writers.append(*writer); + return writer; } -CHThorGenericDiskWriteActivity::CHThorGenericDiskWriteActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorNewDiskReadArg &_arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node) +CHThorGenericDiskWriteActivity::CHThorGenericDiskWriteActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorDiskWriteArg &_arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node) : CHThorGenericDiskWriteBaseActivity(_agent, _activityId, _subgraphId, _arg, _kind, _graph, _node), helper(_arg) { @@ -12033,23 +12048,75 @@ CHThorGenericDiskWriteActivity::CHThorGenericDiskWriteActivity(IAgentContext &_a // Implement all pure virtual functions from CHThorGenericDiskWriteBaseActivity void CHThorGenericDiskWriteActivity::ready() { - // Implementation here + PARENT::ready(); + if (!activeWriter) + activeWriter = ensureRowWriter("flat", false, helper.getFormatCrc(), *helper.queryDiskRecordSize(), helper.getFormatCrc(), *helper.queryDiskRecordSize(), 0, *helper.queryDiskRecordSize(), inputOptions); + outSeq.setown(activeWriter->queryRowSink()); } void CHThorGenericDiskWriteActivity::stop() { - // Implementation here + PARENT::stop(); } void CHThorGenericDiskWriteActivity::execute() { - // Implementation here + // Loop thru the results + numRecords = 0; + while (next()) + numRecords++; + finishOutput(); +} + +const void * CHThorGenericDiskWriteActivity::getNext() +{ // through operation (writes and returns row) + // needs a one row lookahead to preserve group + if (!nextrow.get()) + { + nextrow.setown(input->nextRow()); + if (!nextrow.get()) + { + nextrow.setown(input->nextRow()); + if (nextrow.get()&&grouped) // only write eog if not at eof + outSeq->putRow(NULL); + return NULL; + } + } + outSeq->putRow(nextrow.getLink()); + checkSizeLimit(); + return nextrow.getClear(); +} + +bool CHThorGenericDiskWriteActivity::finishOutput() +{ + return true; +} + +bool CHThorGenericDiskWriteActivity::next() +{ + if (!nextrow.get()) + { + OwnedConstRoxieRow row(input->nextRow()); + if (!row.get()) + { + row.setown(input->nextRow()); + if (!row.get()) + return false; // we are done + if (grouped) + outSeq->putRow(NULL); + } + outSeq->putRow(row.getClear()); + } + else + outSeq->putRow(nextrow.getClear()); + checkSizeLimit(); + return true; } -const void *CHThorGenericDiskWriteActivity::nextRow() +void CHThorGenericDiskWriteActivity::checkSizeLimit() { - // Implementation here - return nullptr; + if (sizeLimit && (numRecords >= sizeLimit)) + throw MakeStringException(0, "Size limit exceeded"); } //===================================================================================================== @@ -12119,7 +12186,7 @@ extern HTHOR_API IHThorActivity *createGenericDiskReadActivity(IAgentContext &_a return new CHThorGenericDiskReadActivity(_agent, _activityId, _subgraphId, arg, kind, _graph, node); } -extern HTHOR_API IHThorActivity *createGenericDiskWriteActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorNewDiskReadArg &arg, ThorActivityKind kind, EclGraph & _graph, IPropertyTree * node) +extern HTHOR_API IHThorActivity *createGenericDiskWriteActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorDiskWriteArg &arg, ThorActivityKind kind, EclGraph & _graph, IPropertyTree * node) { return new CHThorGenericDiskWriteActivity(_agent, _activityId, _subgraphId, arg, kind, _graph, node); } diff --git a/ecl/hthor/hthor.hpp b/ecl/hthor/hthor.hpp index ee958dbb487..9f2ede59cac 100644 --- a/ecl/hthor/hthor.hpp +++ b/ecl/hthor/hthor.hpp @@ -174,7 +174,7 @@ extern HTHOR_API IHThorActivity *createDiskGroupAggregateActivity(IAgentContext extern HTHOR_API IHThorActivity *createNewDiskReadActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorNewDiskReadArg &arg, ThorActivityKind kind, EclGraph & _graph, IPropertyTree *node); extern HTHOR_API IHThorActivity *createGenericDiskReadActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorNewDiskReadArg &arg, ThorActivityKind kind, EclGraph & _graph, IPropertyTree * node); -extern HTHOR_API IHThorActivity *createGenericDiskWriteActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorNewDiskReadArg &arg, ThorActivityKind kind, EclGraph & _graph, IPropertyTree * node); +extern HTHOR_API IHThorActivity *createGenericDiskWriteActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorDiskWriteArg &arg, ThorActivityKind kind, EclGraph & _graph, IPropertyTree * node); extern HTHOR_API IHThorActivity *createIndexReadActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexReadArg &arg, ThorActivityKind kind, EclGraph & _graph, IPropertyTree *_node); extern HTHOR_API IHThorActivity *createIndexNormalizeActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexNormalizeArg &arg, ThorActivityKind kind, EclGraph & _graph, IPropertyTree *_node); diff --git a/ecl/hthor/hthor.ipp b/ecl/hthor/hthor.ipp index 4971638fc41..ae770538216 100644 --- a/ecl/hthor/hthor.ipp +++ b/ecl/hthor/hthor.ipp @@ -3244,12 +3244,16 @@ protected: class CHThorGenericDiskWriteBaseActivity : public CHThorActivityBase/*, implements IThorDiskCallback, implements IIndexWriteContext, public IFileCollectionContext*/ { protected: - IHThorNewDiskReadBaseArg &helper; - IDiskRowWriter * activeReader = nullptr; + IHThorDiskWriteArg &helper; + IDiskRowWriter * activeWriter = nullptr; CLogicalFileCollection files; + IArrayOf writers; + Owned inputOptions; bool outputGrouped = false; + OwnedConstRoxieRow nextrow; // needed for grouped spill + public: - CHThorGenericDiskWriteBaseActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorNewDiskReadBaseArg &_arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node); + CHThorGenericDiskWriteBaseActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorDiskWriteArg &_arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node); IMPLEMENT_IINTERFACE_USING(CHThorActivityBase) virtual void ready(); @@ -3259,23 +3263,34 @@ public: //interface IHThorInput virtual bool isGrouped() { return outputGrouped; } virtual IOutputMetaData * queryOutputMeta() const { return outputMeta; } + + protected: + IDiskRowWriter * ensureRowWriter(const char * format, bool streamRemote, unsigned expectedCrc, IOutputMetaData & expected, unsigned projectedCrc, IOutputMetaData & projected, unsigned actualCrc, IOutputMetaData & actual, const IPropertyTree * options); }; class CHThorGenericDiskWriteActivity : public CHThorGenericDiskWriteBaseActivity { typedef CHThorGenericDiskWriteBaseActivity PARENT; protected: - IHThorNewDiskReadArg &helper; + IHThorDiskWriteArg &helper; + bool grouped; + Owned outSeq; + unsigned __int64 numRecords; + offset_t sizeLimit; + + bool finishOutput(); + bool next(); + const void * getNext(); + void checkSizeLimit(); public: - CHThorGenericDiskWriteActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorNewDiskReadArg &_arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node); + IMPLEMENT_SINKACTIVITY + + CHThorGenericDiskWriteActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorDiskWriteArg &_arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node); virtual void ready(); virtual void stop(); virtual void execute(); virtual bool needsAllocator() const { return true; } - - //interface IHThorInput - virtual const void *nextRow(); };