Skip to content

Commit

Permalink
Connect CHThorGenericDiskWriteActivity to a BinaryDiskRowWriter
Browse files Browse the repository at this point in the history
  • Loading branch information
jackdelv committed Oct 1, 2024
1 parent a3be103 commit 071298b
Show file tree
Hide file tree
Showing 6 changed files with 238 additions and 33 deletions.
140 changes: 130 additions & 10 deletions common/thorhelper/thorread.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2027,20 +2027,85 @@ void RemoteDiskRowReader::stop()
{
}

///---------------------------------------------------------------------------------------------------------------------

// Common base class for the generic disk row writers.
// The same object acts as the IDiskRowWriter and as the ILogicalRowSink that rows are pushed into:
// queryRowSink() hands back this object rather than a separate sink.
class DiskRowWriter : public CInterfaceOf<ILogicalRowSink>, implements IDiskRowWriter
{
public:
// _mapping is stored in the Linked<> member below for the lifetime of the writer.
DiskRowWriter(IDiskReadMapping * _mapping) : mapping(_mapping) {}
// Both interfaces share the reference counting provided by CInterfaceOf<ILogicalRowSink>.
IMPLEMENT_IINTERFACE_USING(CInterfaceOf<ILogicalRowSink>);

// Returns the sink view of this writer (see the out-of-line definition).
ILogicalRowSink * queryRowSink() override;

protected:
// Mapping supplied at construction; NOTE(review): it reuses the read-side IDiskReadMapping type - confirm that is intended for writers.
Linked<IDiskReadMapping> mapping;
};

// Returns this writer viewed as an ILogicalRowSink.
// The pointer is returned as-is: no extra reference is added here, so a caller that
// wants to keep it must take its own link rather than adopting the returned pointer.
ILogicalRowSink * DiskRowWriter::queryRowSink()
{
return this;
}

// Disk row writer for the "flat" format (the format name that matches() accepts).
// In the code visible here the row/output methods are placeholders: setOutputFile() returns false
// and putRow(), flush(), noteStopped() and writeRow() all call throwUnexpected().
class BinaryDiskRowWriter : public DiskRowWriter
{
public:
BinaryDiskRowWriter(IDiskReadMapping * _mapping) : DiskRowWriter(_mapping) {}

// True when this writer can be reused for a request with the given format.
bool matches(const char * format, bool streamRemote, IDiskReadMapping * mapping) override;

// Selects the output target for subsequent rows.
bool setOutputFile(const char * localFilename, const char * logicalFilename, unsigned partNumber, const IPropertyTree * outputOptions) override;

void putRow(const void *row) override; // takes ownership of row. rename to putOwnedRow?
void flush() override;
void noteStopped() override;

// NOTE(review): presumably the non-owning counterpart of putRow() - confirm against IDiskRowWriter.
void writeRow(const void *row) override;
};


// Reports whether this writer can be reused for a request in the given format.
// Only the format name is examined (compared against "flat" with strieq); the streamRemote
// flag and the mapping argument are not consulted.
// NOTE(review): the mapping parameter shadows the inherited 'mapping' member and is never
// compared with it, so a writer created for one mapping also matches any other - confirm intended.
bool BinaryDiskRowWriter::matches(const char * format, bool streamRemote, IDiskReadMapping * mapping)
{
    const bool isFlatFormat = strieq(format, "flat");
    return isFlatFormat;
}

// Placeholder: ignores every argument and always returns false.
// NOTE(review): no file is opened or recorded here yet, so there is nothing for the row methods to write to.
bool BinaryDiskRowWriter::setOutputFile(const char * localFilename, const char * logicalFilename, unsigned partNumber, const IPropertyTree * outputOptions)
{
return false;
}

// Placeholder: not implemented yet - unconditionally calls throwUnexpected().
// NOTE(review): the class declaration says putRow takes ownership of row, but the row is
// not released on this path - confirm whether that matters while this is a stub.
void BinaryDiskRowWriter::putRow(const void *row)
{
throwUnexpected();
}

// Placeholder: not implemented yet - unconditionally calls throwUnexpected().
void BinaryDiskRowWriter::flush()
{
throwUnexpected();
}

// Placeholder: not implemented yet - unconditionally calls throwUnexpected().
void BinaryDiskRowWriter::noteStopped()
{
throwUnexpected();
}

// Placeholder: not implemented yet - unconditionally calls throwUnexpected(); row is not examined.
void BinaryDiskRowWriter::writeRow(const void *row)
{
throwUnexpected();
}

///---------------------------------------------------------------------------------------------------------------------

// Lookup to map the names of file types/formats to their object constructors;
// map will be initialized within MODULE_INIT
static std::map<std::string, std::function<DiskRowReader*(IDiskReadMapping*)>> genericFileTypeMap;

static std::map<std::string, std::function<DiskRowReader*(IDiskReadMapping*)>> genericFileTypeReadersMap;
static std::map<std::string, std::function<DiskRowWriter*(IDiskReadMapping*)>> genericFileTypeWritersMap;

// format is assumed to be lowercase
IDiskRowReader * doCreateLocalDiskReader(const char * format, IDiskReadMapping * _mapping)
{
auto foundReader = genericFileTypeMap.find(format);
auto foundReader = genericFileTypeReadersMap.find(format);

if (foundReader != genericFileTypeMap.end() && foundReader->second)
if (foundReader != genericFileTypeReadersMap.end() && foundReader->second)
return foundReader->second(_mapping);

UNIMPLEMENTED;
Expand Down Expand Up @@ -2085,19 +2150,74 @@ MODULE_INIT(INIT_PRIORITY_STANDARD)
// should be defined here; the key is the lowercase name of the format,
// as will be used in ECL, and the value should be a lambda
// that creates the appropriate disk row reader object
genericFileTypeMap.emplace("flat", [](IDiskReadMapping * _mapping) { return new BinaryDiskRowReader(_mapping); });
genericFileTypeMap.emplace("csv", [](IDiskReadMapping * _mapping) { return new CsvDiskRowReader(_mapping); });
genericFileTypeMap.emplace("xml", [](IDiskReadMapping * _mapping) { return new XmlDiskRowReader(_mapping); });
genericFileTypeReadersMap.emplace("flat", [](IDiskReadMapping * _mapping) { return new BinaryDiskRowReader(_mapping); });
genericFileTypeReadersMap.emplace("csv", [](IDiskReadMapping * _mapping) { return new CsvDiskRowReader(_mapping); });
genericFileTypeReadersMap.emplace("xml", [](IDiskReadMapping * _mapping) { return new XmlDiskRowReader(_mapping); });
#ifdef _USE_PARQUET
genericFileTypeMap.emplace(PARQUET_FILE_TYPE_NAME, [](IDiskReadMapping * _mapping) { return new ParquetDiskRowReader(_mapping); });
genericFileTypeReadersMap.emplace(PARQUET_FILE_TYPE_NAME, [](IDiskReadMapping * _mapping) { return new ParquetDiskRowReader(_mapping); });
#else
genericFileTypeMap.emplace(PARQUET_FILE_TYPE_NAME, [](IDiskReadMapping * _mapping) { return nullptr; });
genericFileTypeReadersMap.emplace(PARQUET_FILE_TYPE_NAME, [](IDiskReadMapping * _mapping) { return nullptr; });
#endif

// Stuff the file type names that were just instantiated into a list;
// list will be accessed by the ECL compiler to validate the names
// at compile time
for (auto iter = genericFileTypeMap.begin(); iter != genericFileTypeMap.end(); iter++)
for (auto iter = genericFileTypeReadersMap.begin(); iter != genericFileTypeReadersMap.end(); iter++)
addAvailableGenericFileTypeName(iter->first.c_str());

return true;
}

// Creates the writer registered for 'format' in genericFileTypeWritersMap.
// The lookup key is used exactly as given, so format is expected to be lowercase already.
// If the format has no entry, or its entry holds an empty factory, UNIMPLEMENTED is raised.
IDiskRowWriter * doCreateLocalDiskWriter(const char * format, IDiskReadMapping * _mapping)
{
    const auto registration = genericFileTypeWritersMap.find(format);
    const bool hasFactory = (registration != genericFileTypeWritersMap.end()) && registration->second;
    if (!hasFactory)
    {
        UNIMPLEMENTED;
    }

    return registration->second(_mapping);
}

// Creates a writer that outputs 'format' files on the local file system.
//
// format  - name of the file format; passed to doCreateLocalDiskWriter, which expects it in lowercase.
// mapping - describes the record layouts involved; must not be null.
//
// Returns a new writer that the caller owns.
// Raises UNIMPLEMENTED when no writer is registered for the format (from doCreateLocalDiskWriter),
// or when the expected and projected layouts differ for a non-flat format - the translating
// writer needed for that case does not exist yet.  Previously that case returned nullptr, which
// callers that dereference the result could not survive.
IDiskRowWriter * createLocalDiskWriter(const char * format, IDiskReadMapping * mapping)
{
    Owned<IDiskRowWriter> directWriter = doCreateLocalDiskWriter(format, mapping);
    if (mapping->expectedMatchesProjected() || strieq(format, "flat"))
        return directWriter.getClear();

    // TODO: support writers whose expected and projected layouts differ, along the lines of:
    // Owned<IDiskReadMapping> expectedMapping = createUnprojectedMapping(mapping);
    // Owned<IDiskRowWriter> expectedWriter = doCreateLocalDiskWriter(format, expectedMapping);
    // return new AlternativeDiskRowWriter(directWriter, expectedWriter, mapping);
    // Until then fail loudly; directWriter is released by its Owned<> holder.
    UNIMPLEMENTED;
}

// IDiskRowWriter * createRemoteDiskWriter(const char * format, IDiskReadMapping * _mapping)
// {
// return new RemoteDiskRowWriter(format, _mapping);
// }

// Entry point for creating a disk row writer for 'format'.
// streamRemote is currently ignored: the remote branch below is commented out, so a local
// writer is always created via createLocalDiskWriter.
IDiskRowWriter * createDiskWriter(const char * format, bool streamRemote, IDiskReadMapping * _mapping)
{
// Intended dispatch, disabled until a remote writer exists:
// if (streamRemote)
// return createRemoteDiskWriter(format, _mapping);
// else
// return createLocalDiskWriter(format, _mapping);

return createLocalDiskWriter(format, _mapping);
}

MODULE_INIT(INIT_PRIORITY_STANDARD)
{
// All pluggable file types that use the generic disk writer
// should be defined here; the key is the lowercase name of the format,
// as will be used in ECL, and the value should be a lambda
// that creates the appropriate disk row writer object
genericFileTypeWritersMap.emplace("flat", [](IDiskReadMapping * _mapping) { return new BinaryDiskRowWriter(_mapping); });

// Stuff the file type names that were just instantiated into a list;
// list will be accessed by the ECL compiler to validate the names
// at compile time
for (auto iter = genericFileTypeWritersMap.begin(); iter != genericFileTypeWritersMap.end(); iter++)
addAvailableGenericFileTypeName(iter->first.c_str());

return true;
Expand Down
3 changes: 3 additions & 0 deletions common/thorhelper/thorread.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -109,4 +109,7 @@ extern THORHELPER_API IDiskRowReader * createLocalDiskReader(const char * format
extern THORHELPER_API IDiskRowReader * createRemoteDiskReader(const char * format, IDiskReadMapping * mapping);
extern THORHELPER_API IDiskRowReader * createDiskReader(const char * format, bool streamRemote, IDiskReadMapping * mapping);

extern THORHELPER_API IDiskRowWriter * createLocalDiskWriter(const char * format, IDiskReadMapping * mapping);
// extern THORHELPER_API IDiskRowWriter * createRemoteDiskWriter(const char * format, IDiskReadMapping * mapping);
extern THORHELPER_API IDiskRowWriter * createDiskWriter(const char * format, bool streamRemote, IDiskReadMapping * mapping);
#endif
2 changes: 1 addition & 1 deletion ecl/eclagent/eclgraph.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ static IHThorActivity * createActivity(IAgentContext & agent, unsigned activityI
{
// bool isGeneric = (((IHThorNewDiskReadArg &)arg).getFlags() & TDXgeneric) != 0;
if (1)
return createGenericDiskWriteActivity(agent, activityId, subgraphId, (IHThorNewDiskReadArg &)arg, kind, graph, node);
return createGenericDiskWriteActivity(agent, activityId, subgraphId, (IHThorDiskWriteArg &)arg, kind, graph, node);
else
return createDiskWriteActivity(agent, activityId, subgraphId, (IHThorDiskWriteArg &)arg, kind, graph);
}
Expand Down
93 changes: 80 additions & 13 deletions ecl/hthor/hthor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12003,28 +12003,43 @@ const void *CHThorGenericDiskReadActivity::nextRow()
return NULL;
}

CHThorGenericDiskWriteBaseActivity::CHThorGenericDiskWriteBaseActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorNewDiskReadBaseArg &_arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node)
: CHThorActivityBase(_agent, _activityId, _subgraphId, _arg, _kind, _graph), helper(_arg)
CHThorGenericDiskWriteBaseActivity::CHThorGenericDiskWriteBaseActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorDiskWriteArg &_arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node)
: CHThorActivityBase(_agent, _activityId, _subgraphId, _arg, _kind, _graph), helper(_arg), inputOptions(_node)
{

}

// No write-specific preparation at this level: simply delegates to CHThorActivityBase::ready().
void CHThorGenericDiskWriteBaseActivity::ready()
{
CHThorActivityBase::ready();
}

// No write-specific shutdown at this level: simply delegates to CHThorActivityBase::stop().
void CHThorGenericDiskWriteBaseActivity::stop()
{
CHThorActivityBase::stop();
}

// Base-level execute: delegates to CHThorActivityBase::execute().
// CHThorGenericDiskWriteActivity overrides execute() with the loop that actually writes rows.
void CHThorGenericDiskWriteBaseActivity::execute()
{
CHThorActivityBase::execute();
}

// Returns a writer suitable for the requested format and layouts, creating one if necessary.
//
// A mapping is built from the supplied CRCs, record metadata and options (with
// RecordTranslationMode::None).  Writers created earlier are kept in 'writers'; the first one
// whose matches() accepts the request is reused, otherwise createDiskWriter() is asked for a new one.
//
// The returned pointer is not linked for the caller - 'writers' holds the owning reference,
// so the pointer remains valid for as long as this activity keeps the array.
// Throws if no writer could be created for the format.
IDiskRowWriter * CHThorGenericDiskWriteBaseActivity::ensureRowWriter(const char * format, bool streamRemote, unsigned expectedCrc, IOutputMetaData & expected, unsigned projectedCrc, IOutputMetaData & projected, unsigned actualCrc, IOutputMetaData & actual, const IPropertyTree * options)
{
    Owned<IDiskReadMapping> mapping = createDiskReadMapping(RecordTranslationMode::None, format, actualCrc, actual, expectedCrc, expected, projectedCrc, projected, options);

    ForEachItemIn(i, writers)
    {
        IDiskRowWriter & cur = writers.item(i);
        if (cur.matches(format, streamRemote, mapping))
            return &cur;
    }

    IDiskRowWriter * writer = createDiskWriter(format, streamRemote, mapping);
    // createDiskWriter can come back empty-handed; dereferencing a null writer below
    // would crash, so report the problem instead.
    if (!writer)
        throw MakeStringException(0, "Unable to create a disk row writer for format '%s'", format ? format : "");

    writers.append(*writer); // the array takes over the reference returned by createDiskWriter
    return writer;
}

CHThorGenericDiskWriteActivity::CHThorGenericDiskWriteActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorNewDiskReadArg &_arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node)
CHThorGenericDiskWriteActivity::CHThorGenericDiskWriteActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorDiskWriteArg &_arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node)
: CHThorGenericDiskWriteBaseActivity(_agent, _activityId, _subgraphId, _arg, _kind, _graph, _node), helper(_arg)
{

Expand All @@ -12033,23 +12048,75 @@ CHThorGenericDiskWriteActivity::CHThorGenericDiskWriteActivity(IAgentContext &_a
// Implement all pure virtual functions from CHThorGenericDiskWriteBaseActivity
// Prepares the activity for execution: runs the base-class ready(), obtains the "flat" row
// writer on first use, and points outSeq at that writer's row sink.
//
// The helper's disk record metadata and format CRC are used for both the expected and the
// projected layouts; the actual layout uses the same metadata with a CRC of 0.
// NOTE(review): 'grouped' and 'sizeLimit' are read by next()/checkSizeLimit() but are not
// assigned in any code visible here - confirm they are initialised before execute() runs.
void CHThorGenericDiskWriteActivity::ready()
{
    PARENT::ready();
    if (!activeWriter)
        activeWriter = ensureRowWriter("flat", false, helper.getFormatCrc(), *helper.queryDiskRecordSize(), helper.getFormatCrc(), *helper.queryDiskRecordSize(), 0, *helper.queryDiskRecordSize(), inputOptions);

    // queryRowSink() returns the writer itself without adding a reference, and the writer's only
    // reference is held by the 'writers' array.  Adopting the pointer with setown() would make
    // outSeq release a reference it never owned (again on every repeated ready()), destroying the
    // writer while it is still in use - so take a proper link with set() instead.
    outSeq.set(activeWriter->queryRowSink());
}

// Nothing write-specific is done on stop yet: delegates to the base class (PARENT).
// NOTE(review): the active writer/sink is not flushed or notified here - confirm whether it should be.
void CHThorGenericDiskWriteActivity::stop()
{
PARENT::stop();
}

// Sink entry point: pushes every input row to the output, counting them in numRecords,
// then calls finishOutput().
void CHThorGenericDiskWriteActivity::execute()
{
    // next() writes one row per call and returns false once the input is exhausted;
    // the counter is bumped only after a successful write.
    for (numRecords = 0; next(); numRecords++)
    {
    }
    finishOutput();
}

// Pass-through form of the write: each row is written to outSeq and also returned to the caller.
// Returns NULL when the input returned NULL (end of a group, or end of input).
// A one-row look-ahead is kept in nextrow so that an end-of-group marker is only written
// when another row follows, i.e. not at end of file.
const void * CHThorGenericDiskWriteActivity::getNext()
{ // through operation (writes and returns row)
// needs a one row lookahead to preserve group
if (!nextrow.get())
{
nextrow.setown(input->nextRow());
if (!nextrow.get())
{
// The input returned NULL: read ahead one more row to tell end-of-group from end-of-input.
// A non-null look-ahead row stays in nextrow and is handled by the next call.
nextrow.setown(input->nextRow());
if (nextrow.get()&&grouped) // only write eog if not at eof
outSeq->putRow(NULL);
return NULL;
}
}
// The sink receives its own link to the row; the reference held in nextrow goes to the caller.
outSeq->putRow(nextrow.getLink());
checkSizeLimit();
return nextrow.getClear();
}

// Called by execute() once all rows have been written.
// Placeholder: performs no work and always returns true.
bool CHThorGenericDiskWriteActivity::finishOutput()
{
return true;
}

bool CHThorGenericDiskWriteActivity::next()
{
if (!nextrow.get())
{
OwnedConstRoxieRow row(input->nextRow());
if (!row.get())
{
row.setown(input->nextRow());
if (!row.get())
return false; // we are done
if (grouped)
outSeq->putRow(NULL);
}
outSeq->putRow(row.getClear());
}
else
outSeq->putRow(nextrow.getClear());
checkSizeLimit();
return true;
}

const void *CHThorGenericDiskWriteActivity::nextRow()
void CHThorGenericDiskWriteActivity::checkSizeLimit()
{
// Implementation here
return nullptr;
if (sizeLimit && (numRecords >= sizeLimit))
throw MakeStringException(0, "Size limit exceeded");
}

//=====================================================================================================
Expand Down Expand Up @@ -12119,7 +12186,7 @@ extern HTHOR_API IHThorActivity *createGenericDiskReadActivity(IAgentContext &_a
return new CHThorGenericDiskReadActivity(_agent, _activityId, _subgraphId, arg, kind, _graph, node);
}

extern HTHOR_API IHThorActivity *createGenericDiskWriteActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorNewDiskReadArg &arg, ThorActivityKind kind, EclGraph & _graph, IPropertyTree * node)
extern HTHOR_API IHThorActivity *createGenericDiskWriteActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorDiskWriteArg &arg, ThorActivityKind kind, EclGraph & _graph, IPropertyTree * node)
{
return new CHThorGenericDiskWriteActivity(_agent, _activityId, _subgraphId, arg, kind, _graph, node);
}
Expand Down
2 changes: 1 addition & 1 deletion ecl/hthor/hthor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ extern HTHOR_API IHThorActivity *createDiskGroupAggregateActivity(IAgentContext

extern HTHOR_API IHThorActivity *createNewDiskReadActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorNewDiskReadArg &arg, ThorActivityKind kind, EclGraph & _graph, IPropertyTree *node);
extern HTHOR_API IHThorActivity *createGenericDiskReadActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorNewDiskReadArg &arg, ThorActivityKind kind, EclGraph & _graph, IPropertyTree * node);
extern HTHOR_API IHThorActivity *createGenericDiskWriteActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorNewDiskReadArg &arg, ThorActivityKind kind, EclGraph & _graph, IPropertyTree * node);
extern HTHOR_API IHThorActivity *createGenericDiskWriteActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorDiskWriteArg &arg, ThorActivityKind kind, EclGraph & _graph, IPropertyTree * node);

extern HTHOR_API IHThorActivity *createIndexReadActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexReadArg &arg, ThorActivityKind kind, EclGraph & _graph, IPropertyTree *_node);
extern HTHOR_API IHThorActivity *createIndexNormalizeActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexNormalizeArg &arg, ThorActivityKind kind, EclGraph & _graph, IPropertyTree *_node);
Expand Down
31 changes: 23 additions & 8 deletions ecl/hthor/hthor.ipp
Original file line number Diff line number Diff line change
Expand Up @@ -3244,12 +3244,16 @@ protected:
class CHThorGenericDiskWriteBaseActivity : public CHThorActivityBase/*, implements IThorDiskCallback, implements IIndexWriteContext, public IFileCollectionContext*/
{
protected:
IHThorNewDiskReadBaseArg &helper;
IDiskRowWriter * activeReader = nullptr;
IHThorDiskWriteArg &helper;
IDiskRowWriter * activeWriter = nullptr;
CLogicalFileCollection files;
IArrayOf<IDiskRowWriter> writers;
Owned<IPropertyTree> inputOptions;
bool outputGrouped = false;
OwnedConstRoxieRow nextrow; // needed for grouped spill

public:
CHThorGenericDiskWriteBaseActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorNewDiskReadBaseArg &_arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node);
CHThorGenericDiskWriteBaseActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorDiskWriteArg &_arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node);
IMPLEMENT_IINTERFACE_USING(CHThorActivityBase)

virtual void ready();
Expand All @@ -3259,23 +3263,34 @@ public:
//interface IHThorInput
virtual bool isGrouped() { return outputGrouped; }
virtual IOutputMetaData * queryOutputMeta() const { return outputMeta; }

protected:
IDiskRowWriter * ensureRowWriter(const char * format, bool streamRemote, unsigned expectedCrc, IOutputMetaData & expected, unsigned projectedCrc, IOutputMetaData & projected, unsigned actualCrc, IOutputMetaData & actual, const IPropertyTree * options);
};

class CHThorGenericDiskWriteActivity : public CHThorGenericDiskWriteBaseActivity
{
typedef CHThorGenericDiskWriteBaseActivity PARENT;
protected:
IHThorNewDiskReadArg &helper;
IHThorDiskWriteArg &helper;
bool grouped;
Owned<ILogicalRowSink> outSeq;
unsigned __int64 numRecords;
offset_t sizeLimit;

bool finishOutput();
bool next();
const void * getNext();
void checkSizeLimit();
public:
CHThorGenericDiskWriteActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorNewDiskReadArg &_arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node);
IMPLEMENT_SINKACTIVITY

CHThorGenericDiskWriteActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorDiskWriteArg &_arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node);

virtual void ready();
virtual void stop();
virtual void execute();
virtual bool needsAllocator() const { return true; }

//interface IHThorInput
virtual const void *nextRow();
};


Expand Down

0 comments on commit 071298b

Please sign in to comment.