Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HPCC-30787 #19168

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
140 changes: 130 additions & 10 deletions common/thorhelper/thorread.cpp
Original file line number Diff line number Diff line change
@@ -2027,20 +2027,85 @@ void RemoteDiskRowReader::stop()
{
}

///---------------------------------------------------------------------------------------------------------------------

// Common base class for the disk row writers in this file.  The object derives from
// CInterfaceOf<ILogicalRowSink> and also implements IDiskRowWriter, so a single object
// acts both as the writer and as the sink that rows are pushed into.
class DiskRowWriter : public CInterfaceOf<ILogicalRowSink>, implements IDiskRowWriter
{
public:
// Keeps the supplied mapping in a Linked<> member.
DiskRowWriter(IDiskReadMapping * _mapping) : mapping(_mapping) {}
// NOTE(review): presumably resolves the IInterface methods inherited through both bases
// in favour of the CInterfaceOf<ILogicalRowSink> implementation - confirm against the macro.
IMPLEMENT_IINTERFACE_USING(CInterfaceOf<ILogicalRowSink>);

// Returns the sink used to write rows (defined out of line below).
ILogicalRowSink * queryRowSink() override;

protected:
// The mapping passed to the constructor.
Linked<IDiskReadMapping> mapping;
};

// The writer is its own row sink, so simply hand back this object viewed through
// the ILogicalRowSink interface.  No extra link is taken on the returned pointer.
ILogicalRowSink * DiskRowWriter::queryRowSink()
{
    ILogicalRowSink * self = this;
    return self;
}

// Row writer registered for the "flat" format.  In this revision setOutputFile() always
// returns false and the row sink functions all call throwUnexpected(), i.e. the class is
// a skeleton that provides the overrides but does not yet write anything.
class BinaryDiskRowWriter : public DiskRowWriter
{
public:
BinaryDiskRowWriter(IDiskReadMapping * _mapping) : DiskRowWriter(_mapping) {}

// IDiskRowWriter: does this writer handle the given format?
bool matches(const char * format, bool streamRemote, IDiskReadMapping * mapping) override;

// IDiskRowWriter: specify the file the rows are written to.
bool setOutputFile(const char * localFilename, const char * logicalFilename, unsigned partNumber, const IPropertyTree * outputOptions) override;

// ILogicalRowSink functions
void putRow(const void *row) override; // takes ownership of row. rename to putOwnedRow?
void flush() override;
void noteStopped() override;

// ILogicalRowSink: declared in the interface as the variant that does not take ownership of row
void writeRow(const void *row) override;
};


// Reports whether this writer can be used for the requested format.
// Only the format name is examined (case-insensitive comparison with "flat");
// streamRemote and the mapping argument are not consulted.
bool BinaryDiskRowWriter::matches(const char * format, bool streamRemote, IDiskReadMapping * mapping)
{
    const bool isFlatFormat = strieq(format, "flat");
    return isFlatFormat;
}

// Placeholder: no output file is opened and none of the arguments are used;
// the function unconditionally returns false.
bool BinaryDiskRowWriter::setOutputFile(const char * localFilename, const char * logicalFilename, unsigned partNumber, const IPropertyTree * outputOptions)
{
    const bool outputConfigured = false;
    return outputConfigured;
}

// Not implemented yet: any call is reported through throwUnexpected().
// The row argument is not touched.
void BinaryDiskRowWriter::putRow(const void *row)
{
    throwUnexpected();
}

// Not implemented yet: any call is reported through throwUnexpected().
void BinaryDiskRowWriter::flush()
{
    throwUnexpected();
}

// Not implemented yet: any call is reported through throwUnexpected().
void BinaryDiskRowWriter::noteStopped()
{
    throwUnexpected();
}

// Not implemented yet: any call is reported through throwUnexpected().
// The row argument is not touched.
void BinaryDiskRowWriter::writeRow(const void *row)
{
    throwUnexpected();
}

///---------------------------------------------------------------------------------------------------------------------

// Lookup to map the names of file types/formats to their object constructors;
// map will be initialized within MODULE_INIT
static std::map<std::string, std::function<DiskRowReader*(IDiskReadMapping*)>> genericFileTypeMap;

// Format name -> factory creating the DiskRowReader for that format
static std::map<std::string, std::function<DiskRowReader*(IDiskReadMapping*)>> genericFileTypeReadersMap;
// Format name -> factory creating the DiskRowWriter for that format
static std::map<std::string, std::function<DiskRowWriter*(IDiskReadMapping*)>> genericFileTypeWritersMap;

// format is assumed to be lowercase
IDiskRowReader * doCreateLocalDiskReader(const char * format, IDiskReadMapping * _mapping)
{
auto foundReader = genericFileTypeMap.find(format);
auto foundReader = genericFileTypeReadersMap.find(format);

if (foundReader != genericFileTypeMap.end() && foundReader->second)
if (foundReader != genericFileTypeReadersMap.end() && foundReader->second)
return foundReader->second(_mapping);

UNIMPLEMENTED;
@@ -2085,19 +2150,74 @@ MODULE_INIT(INIT_PRIORITY_STANDARD)
// should be defined here; the key is the lowercase name of the format,
// as will be used in ECL, and the value should be a lambda
// that creates the appropriate disk row reader object
genericFileTypeMap.emplace("flat", [](IDiskReadMapping * _mapping) { return new BinaryDiskRowReader(_mapping); });
genericFileTypeMap.emplace("csv", [](IDiskReadMapping * _mapping) { return new CsvDiskRowReader(_mapping); });
genericFileTypeMap.emplace("xml", [](IDiskReadMapping * _mapping) { return new XmlDiskRowReader(_mapping); });
genericFileTypeReadersMap.emplace("flat", [](IDiskReadMapping * _mapping) { return new BinaryDiskRowReader(_mapping); });
genericFileTypeReadersMap.emplace("csv", [](IDiskReadMapping * _mapping) { return new CsvDiskRowReader(_mapping); });
genericFileTypeReadersMap.emplace("xml", [](IDiskReadMapping * _mapping) { return new XmlDiskRowReader(_mapping); });
#ifdef _USE_PARQUET
genericFileTypeMap.emplace(PARQUET_FILE_TYPE_NAME, [](IDiskReadMapping * _mapping) { return new ParquetDiskRowReader(_mapping); });
genericFileTypeReadersMap.emplace(PARQUET_FILE_TYPE_NAME, [](IDiskReadMapping * _mapping) { return new ParquetDiskRowReader(_mapping); });
#else
genericFileTypeMap.emplace(PARQUET_FILE_TYPE_NAME, [](IDiskReadMapping * _mapping) { return nullptr; });
genericFileTypeReadersMap.emplace(PARQUET_FILE_TYPE_NAME, [](IDiskReadMapping * _mapping) { return nullptr; });
#endif

// Stuff the file type names that were just instantiated into a list;
// list will be accessed by the ECL compiler to validate the names
// at compile time
for (auto iter = genericFileTypeMap.begin(); iter != genericFileTypeMap.end(); iter++)
for (auto iter = genericFileTypeReadersMap.begin(); iter != genericFileTypeReadersMap.end(); iter++)
addAvailableGenericFileTypeName(iter->first.c_str());

return true;
}

// format is assumed to be lowercase
// Looks up the factory registered for 'format' in genericFileTypeWritersMap and uses it
// to construct a writer for the given mapping.  If the format is not registered, or its
// factory is empty, the UNIMPLEMENTED macro is reached instead.
// format is assumed to be lowercase.
IDiskRowWriter * doCreateLocalDiskWriter(const char * format, IDiskReadMapping * _mapping)
{
    const auto writerEntry = genericFileTypeWritersMap.find(format);
    const bool haveFactory = (writerEntry != genericFileTypeWritersMap.end()) && writerEntry->second;

    if (haveFactory)
        return writerEntry->second(_mapping);

    UNIMPLEMENTED;
}

// Creates a writer for a local file.  The writer for the requested format is always
// constructed first; it is handed to the caller only when the expected layout matches
// the projected layout or the format is "flat".  Otherwise nullptr is returned (and the
// writer that was constructed is released) because the translating wrapper sketched in
// the comment below does not exist yet.
IDiskRowWriter * createLocalDiskWriter(const char * format, IDiskReadMapping * mapping)
{
    Owned<IDiskRowWriter> directWriter(doCreateLocalDiskWriter(format, mapping));

    const bool usableDirectly = mapping->expectedMatchesProjected() || strieq(format, "flat");
    if (!usableDirectly)
    {
        // Owned<IDiskReadMapping> expectedMapping = createUnprojectedMapping(mapping);
        // Owned<IDiskRowWriter> expectedReader = doCreateLocalDiskWriter(format, expectedMapping);
        // return new AlternativeDiskRowWriter(directReader, expectedReader, mapping);
        return nullptr;
    }

    return directWriter.getClear();
}

// IDiskRowWriter * createRemoteDiskWriter(const char * format, IDiskReadMapping * _mapping)
// {
// return new RemoteDiskRowWriter(format, _mapping);
// }

// Entry point for creating a disk writer.  The streamRemote flag is currently ignored:
// the remote branch is still commented out, so a local writer is always requested.
IDiskRowWriter * createDiskWriter(const char * format, bool streamRemote, IDiskReadMapping * _mapping)
{
    // if (streamRemote)
    //     return createRemoteDiskWriter(format, _mapping);
    // else
    //     return createLocalDiskWriter(format, _mapping);

    IDiskRowWriter * localWriter = createLocalDiskWriter(format, _mapping);
    return localWriter;
}

MODULE_INIT(INIT_PRIORITY_STANDARD)
{
// All pluggable file types that use the generic disk writer
// should be defined here; the key is the lowercase name of the format,
// as will be used in ECL, and the value should be a lambda
// that creates the appropriate disk row writer object
genericFileTypeWritersMap.emplace("flat", [](IDiskReadMapping * _mapping) { return new BinaryDiskRowWriter(_mapping); });

// Stuff the file type names that were just instantiated into a list;
// list will be accessed by the ECL compiler to validate the names
// at compile time
for (auto iter = genericFileTypeWritersMap.begin(); iter != genericFileTypeWritersMap.end(); iter++)
addAvailableGenericFileTypeName(iter->first.c_str());

return true;
30 changes: 23 additions & 7 deletions common/thorhelper/thorread.hpp
Original file line number Diff line number Diff line change
@@ -68,18 +68,16 @@ THORHELPER_API IDiskReadMapping * createDiskReadMapping(RecordTranslationMode mo


typedef IConstArrayOf<IFieldFilter> FieldFilterArray;
// Minimal reader interface: its only member gives access to a stream of rows.
interface IRowReader : extends IInterface
{
public:
// get the interface for reading streams of row. outputAllocator can be null if allocating next is not used.
virtual IDiskRowStream * queryAllocatedRowStream(IEngineRowAllocator * _outputAllocator) = 0;
};

interface ITranslator;
class CLogicalFileSlice;
interface IDiskRowReader : extends IRowReader

interface IDiskRowReader : extends IInterface
{
public:
// get the interface for reading streams of row. outputAllocator can be null if allocating next is not used.
virtual IDiskRowStream * queryAllocatedRowStream(IEngineRowAllocator * _outputAllocator) = 0;

virtual bool matches(const char * format, bool streamRemote, IDiskReadMapping * mapping) = 0;

//Specify where the raw binary input for a particular file is coming from, together with its actual format.
@@ -91,9 +89,27 @@ interface IDiskRowReader : extends IRowReader
virtual bool setInputFile(const CLogicalFileSlice & slice, const FieldFilterArray & expectedFilter, unsigned copy) = 0;
};

// Interface implemented by the format-specific disk writers.  A writer is created for a
// format/mapping pair, is told which file to write via setOutputFile(), and exposes the
// sink that the rows are pushed into via queryRowSink().
interface IDiskRowWriter : extends IInterface
{
public:
//Get the interface for writing streams of row.
virtual ILogicalRowSink * queryRowSink() = 0;


//Returns whether this writer can be used for the given format, remote-streaming flag and mapping.
//MORE: Should be a disk write mapping
virtual bool matches(const char * format, bool streamRemote, IDiskReadMapping * mapping) = 0;

//Specify where the data is being written to. I'm not sure that output options are needed (can be passed in the constructor)
virtual bool setOutputFile(const char * localFilename, const char * logicalFilename, unsigned partNumber, const IPropertyTree * outputOptions) = 0;
};


//Create a row reader for a thor binary file. The expected, projected, actual and options never change. The file providing the data can change.
extern THORHELPER_API IDiskRowReader * createLocalDiskReader(const char * format, IDiskReadMapping * mapping);
extern THORHELPER_API IDiskRowReader * createRemoteDiskReader(const char * format, IDiskReadMapping * mapping);
extern THORHELPER_API IDiskRowReader * createDiskReader(const char * format, bool streamRemote, IDiskReadMapping * mapping);

//Create a row writer for the given format and mapping.  The remote variant is not available yet (declaration commented out).
extern THORHELPER_API IDiskRowWriter * createLocalDiskWriter(const char * format, IDiskReadMapping * mapping);
// extern THORHELPER_API IDiskRowWriter * createRemoteDiskWriter(const char * format, IDiskReadMapping * mapping);
extern THORHELPER_API IDiskRowWriter * createDiskWriter(const char * format, bool streamRemote, IDiskReadMapping * mapping);
#endif
8 changes: 7 additions & 1 deletion ecl/eclagent/eclgraph.cpp
Original file line number Diff line number Diff line change
@@ -53,7 +53,13 @@ static IHThorActivity * createActivity(IAgentContext & agent, unsigned activityI
{
case TAKdiskwrite:
case TAKspillwrite:
return createDiskWriteActivity(agent, activityId, subgraphId, (IHThorDiskWriteArg &)arg, kind, graph);
{
// bool isGeneric = (((IHThorNewDiskReadArg &)arg).getFlags() & TDXgeneric) != 0;
if (1)
return createGenericDiskWriteActivity(agent, activityId, subgraphId, (IHThorDiskWriteArg &)arg, kind, graph, node);
else
return createDiskWriteActivity(agent, activityId, subgraphId, (IHThorDiskWriteArg &)arg, kind, graph);
}
case TAKsort:
return createGroupSortActivity(agent, activityId, subgraphId, (IHThorSortArg &)arg, kind, graph);
case TAKdedup:
121 changes: 121 additions & 0 deletions ecl/hthor/hthor.cpp
Original file line number Diff line number Diff line change
@@ -12003,6 +12003,122 @@ const void *CHThorGenericDiskReadActivity::nextRow()
return NULL;
}

// Constructs the common base of the generic disk write activities.
//   _arg  - the disk write helper; a reference to it is kept in 'helper'
//   _node - the graph node for this activity, kept as the options passed to the writers
CHThorGenericDiskWriteBaseActivity::CHThorGenericDiskWriteBaseActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorDiskWriteArg &_arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node)
: CHThorActivityBase(_agent, _activityId, _subgraphId, _arg, _kind, _graph), helper(_arg)
{
    // _node arrives as a plain pointer, so it is treated as borrowed from the caller.
    // Constructing the Owned<> member directly from it would adopt the caller's reference
    // and release it again in the destructor; set() takes a link of our own instead.
    // NOTE(review): confirm no caller intends to transfer ownership of the node.
    inputOptions.set(_node);
}

// Nothing specific to do at this level yet - defer to the activity base class.
void CHThorGenericDiskWriteBaseActivity::ready()
{
    CHThorActivityBase::ready();
}

// Nothing specific to do at this level yet - defer to the activity base class.
void CHThorGenericDiskWriteBaseActivity::stop()
{
    CHThorActivityBase::stop();
}

// Nothing specific to do at this level yet - defer to the activity base class.
void CHThorGenericDiskWriteBaseActivity::execute()
{
    CHThorActivityBase::execute();
}

// Returns a writer suitable for the given format and record layouts, creating one if
// none of the writers created so far matches.
//   format/streamRemote    - passed to matches() and to createDiskWriter()
//   expected*/projected*/actual* - crc and meta for each layout, used to build the mapping
//   options                - passed through to the mapping
// The returned pointer is not linked; the writer is owned by the 'writers' array.
// Throws if no writer can be created for the requested combination.
IDiskRowWriter * CHThorGenericDiskWriteBaseActivity::ensureRowWriter(const char * format, bool streamRemote, unsigned expectedCrc, IOutputMetaData & expected, unsigned projectedCrc, IOutputMetaData & projected, unsigned actualCrc, IOutputMetaData & actual, const IPropertyTree * options)
{
    Owned<IDiskReadMapping> mapping = createDiskReadMapping(RecordTranslationMode::None, format, actualCrc, actual, expectedCrc, expected, projectedCrc, projected, options);

    // Reuse an existing writer if one already handles this format/mapping
    ForEachItemIn(i, writers)
    {
        IDiskRowWriter & cur = writers.item(i);
        if (cur.matches(format, streamRemote, mapping))
            return &cur;
    }

    // createDiskWriter() can return null (the local factory does so when the expected and
    // projected layouts differ for a non-flat format), so check before dereferencing.
    IDiskRowWriter * writer = createDiskWriter(format, streamRemote, mapping);
    if (!writer)
        throw MakeStringException(0, "No disk writer available for format '%s'", format);

    writers.append(*writer);
    return writer;
}

// Forwards every argument to the base activity and keeps this class's own reference
// to the disk write helper.
CHThorGenericDiskWriteActivity::CHThorGenericDiskWriteActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorDiskWriteArg &_arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node)
    : CHThorGenericDiskWriteBaseActivity(_agent, _activityId, _subgraphId, _arg, _kind, _graph, _node),
      helper(_arg)
{
}

// Implement all pure virtual functions from CHThorGenericDiskWriteBaseActivity
// Prepares the activity for execution: on first use obtains a "flat" writer whose
// expected, projected and actual layouts are all the helper's disk record, then picks
// up the sink that rows will be written to.
void CHThorGenericDiskWriteActivity::ready()
{
    PARENT::ready();
    if (!activeWriter)
        activeWriter = ensureRowWriter("flat", false, helper.getFormatCrc(), *helper.queryDiskRecordSize(), helper.getFormatCrc(), *helper.queryDiskRecordSize(), 0, *helper.queryDiskRecordSize(), inputOptions);

    // queryRowSink() returns an unlinked pointer (for DiskRowWriter it is the writer
    // itself, which the 'writers' array already owns).  setown() would adopt a reference
    // this activity does not hold and release the writer a second time, so take a link
    // of our own with set().
    outSeq.set(activeWriter->queryRowSink());
}

// No writer-specific shutdown yet - defer to the parent class.
void CHThorGenericDiskWriteActivity::stop()
{
    PARENT::stop();
}

// Drains the input: next() is called until it reports that there is nothing left,
// counting one record per successful call, and the output is then finished.
void CHThorGenericDiskWriteActivity::execute()
{
    for (numRecords = 0; next(); numRecords++)
    {
        // all the work is done by next()
    }
    finishOutput();
}

// Through operation: writes the next input row to the sink and also returns it.
// A one row lookahead (held in nextrow) is needed to preserve grouping.
// Returns NULL when the input returned NULL; in that case the following row is read
// ahead and, for grouped output, a NULL is written to the sink only if a row follows.
const void * CHThorGenericDiskWriteActivity::getNext()
{
    if (!nextrow.get())
    {
        nextrow.setown(input->nextRow());
        if (!nextrow.get())
        {
            // Look one row further to tell end-of-group from end-of-file
            nextrow.setown(input->nextRow());
            const bool anotherRowFollows = (nextrow.get() != nullptr);
            if (anotherRowFollows && grouped)  // only write eog if not at eof
                outSeq->putRow(nullptr);
            return nullptr;
        }
    }

    // The sink receives its own link to the row; the caller receives the original reference
    outSeq->putRow(nextrow.getLink());
    checkSizeLimit();
    return nextrow.getClear();
}

// Placeholder: there is nothing to finalise yet, so success is always reported.
bool CHThorGenericDiskWriteActivity::finishOutput()
{
    const bool finished = true;
    return finished;
}

// Moves one row from the input (or from the lookahead row, if one is pending) to the sink.
// Returns false once two consecutive NULLs have been read from the input, true otherwise.
// For grouped output a NULL is written to the sink between groups.
bool CHThorGenericDiskWriteActivity::next()
{
    if (nextrow.get())
    {
        // A row was read ahead earlier - write that one first
        outSeq->putRow(nextrow.getClear());
    }
    else
    {
        OwnedConstRoxieRow row(input->nextRow());
        if (!row.get())
        {
            row.setown(input->nextRow());
            if (!row.get())
                return false; // we are done
            if (grouped)
                outSeq->putRow(nullptr);
        }
        outSeq->putRow(row.getClear());
    }

    checkSizeLimit();
    return true;
}

// Throws once the number of records written so far has reached sizeLimit.
// A sizeLimit of zero disables the check.
void CHThorGenericDiskWriteActivity::checkSizeLimit()
{
    if (!sizeLimit)
        return;
    if (numRecords < sizeLimit)
        return;
    throw MakeStringException(0, "Size limit exceeded");
}

//=====================================================================================================

MAKEFACTORY(DiskWrite);
@@ -12070,6 +12186,11 @@ extern HTHOR_API IHThorActivity *createGenericDiskReadActivity(IAgentContext &_a
return new CHThorGenericDiskReadActivity(_agent, _activityId, _subgraphId, arg, kind, _graph, node);
}

// Factory for the generic disk write activity; all arguments are passed straight
// through to the CHThorGenericDiskWriteActivity constructor.
extern HTHOR_API IHThorActivity *createGenericDiskWriteActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorDiskWriteArg &arg, ThorActivityKind kind, EclGraph & _graph, IPropertyTree * node)
{
    IHThorActivity * activity = new CHThorGenericDiskWriteActivity(_agent, _activityId, _subgraphId, arg, kind, _graph, node);
    return activity;
}

MAKEFACTORY(Null);
MAKEFACTORY(SideEffect);
MAKEFACTORY(Action);
1 change: 1 addition & 0 deletions ecl/hthor/hthor.hpp
Original file line number Diff line number Diff line change
@@ -174,6 +174,7 @@ extern HTHOR_API IHThorActivity *createDiskGroupAggregateActivity(IAgentContext

extern HTHOR_API IHThorActivity *createNewDiskReadActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorNewDiskReadArg &arg, ThorActivityKind kind, EclGraph & _graph, IPropertyTree *node);
extern HTHOR_API IHThorActivity *createGenericDiskReadActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorNewDiskReadArg &arg, ThorActivityKind kind, EclGraph & _graph, IPropertyTree * node);
extern HTHOR_API IHThorActivity *createGenericDiskWriteActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorDiskWriteArg &arg, ThorActivityKind kind, EclGraph & _graph, IPropertyTree * node);

extern HTHOR_API IHThorActivity *createIndexReadActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexReadArg &arg, ThorActivityKind kind, EclGraph & _graph, IPropertyTree *_node);
extern HTHOR_API IHThorActivity *createIndexNormalizeActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorIndexNormalizeArg &arg, ThorActivityKind kind, EclGraph & _graph, IPropertyTree *_node);
52 changes: 52 additions & 0 deletions ecl/hthor/hthor.ipp
Original file line number Diff line number Diff line change
@@ -3241,6 +3241,58 @@ protected:
void onLimitExceeded();
};

// Base class for hthor activities that write through the generic IDiskRowWriter framework.
// It keeps the list of writers created so far and provides ensureRowWriter() to find or
// create one for a given format and set of record layouts.
class CHThorGenericDiskWriteBaseActivity : public CHThorActivityBase/*, implements IThorDiskCallback, implements IIndexWriteContext, public IFileCollectionContext*/
{
protected:
// The disk write helper supplied to the constructor
IHThorDiskWriteArg &helper;
// The writer currently in use (not linked here - the writers array holds the objects)
IDiskRowWriter * activeWriter = nullptr;
CLogicalFileCollection files;
// Every writer created by ensureRowWriter()
IArrayOf<IDiskRowWriter> writers;
// Initialised from the graph node passed to the constructor; passed as the options when creating a writer
Owned<IPropertyTree> inputOptions;
// Value reported by isGrouped()
bool outputGrouped = false;
OwnedConstRoxieRow nextrow; // needed for grouped spill

public:
CHThorGenericDiskWriteBaseActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorDiskWriteArg &_arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node);
IMPLEMENT_IINTERFACE_USING(CHThorActivityBase)

// Each of these currently only calls the CHThorActivityBase implementation
virtual void ready();
virtual void stop();
virtual void execute();

//interface IHThorInput
virtual bool isGrouped() { return outputGrouped; }
virtual IOutputMetaData * queryOutputMeta() const { return outputMeta; }

protected:
// Returns a writer from 'writers' that matches the format and mapping, creating and appending one if necessary
IDiskRowWriter * ensureRowWriter(const char * format, bool streamRemote, unsigned expectedCrc, IOutputMetaData & expected, unsigned projectedCrc, IOutputMetaData & projected, unsigned actualCrc, IOutputMetaData & actual, const IPropertyTree * options);
};

// hthor sink activity that pulls rows from its input and pushes them into the row sink
// of a generic disk writer.
class CHThorGenericDiskWriteActivity : public CHThorGenericDiskWriteBaseActivity
{
    typedef CHThorGenericDiskWriteBaseActivity PARENT;
protected:
    IHThorDiskWriteArg &helper;             // NOTE: hides the member of the same name in PARENT
    // The next three members are read by next()/getNext()/checkSizeLimit() but are not set
    // by the constructor or ready(), so give them well defined defaults.
    // MORE: grouped and sizeLimit should be derived from the helper/options.
    bool grouped = false;                   // whether a NULL is written to the sink between groups
    Owned<ILogicalRowSink> outSeq;          // sink the rows are written to, set in ready()
    unsigned __int64 numRecords = 0;        // number of rows written by execute()
    offset_t sizeLimit = 0;                 // 0 means no limit (see checkSizeLimit())

    bool finishOutput();
    bool next();
    const void * getNext();
    void checkSizeLimit();
public:
    IMPLEMENT_SINKACTIVITY

    CHThorGenericDiskWriteActivity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThorDiskWriteArg &_arg, ThorActivityKind _kind, EclGraph & _graph, IPropertyTree *_node);

    virtual void ready();
    virtual void stop();
    virtual void execute();
    virtual bool needsAllocator() const { return true; }
};


#define MAKEFACTORY(NAME) \
extern HTHOR_API IHThorActivity * create ## NAME ## Activity(IAgentContext &_agent, unsigned _activityId, unsigned _subgraphId, IHThor ## NAME ## Arg &arg, ThorActivityKind kind, EclGraph & _graph) \
15 changes: 14 additions & 1 deletion system/jlib/jrowstream.hpp
Original file line number Diff line number Diff line change
@@ -48,7 +48,7 @@ class MemoryBuffer;
class MemoryBufferBuilder;

//An interface for reading rows - which can request the row in the most efficient way for the caller.
interface IDiskRowStream : extends IRowStream
interface ILogicalRowStream : extends IRowStream
{
// Defined in IRowStream, here for documentation:
// Request a row which is owned by the caller, and must be freed once it is finished with.
@@ -74,6 +74,19 @@ interface IDiskRowStream : extends IRowStream
virtual const void *nextRow(MemoryBufferBuilder & builder)=0;
// rows returned are created in the target buffer. This should be generalized to an ARowBuilder
};
using IDiskRowStream = ILogicalRowStream; // MORE: Replace these in the code, but alias for now to avoid compile problems


//An interface for writing rows - with separate functions whether or not ownership of the row is being passed
//An interface for writing rows - with separate functions whether or not ownership of the row is being passed
//putRow() is used when the caller passes ownership of the row, writeRow() when the caller keeps it.
interface ILogicalRowSink : extends IRowWriterEx
{
// Defined in IRowWriterEx, here for documentation:
virtual void putRow(const void *row) override = 0; // takes ownership of row. rename to putOwnedRow?
virtual void flush() override = 0;
virtual void noteStopped() override = 0;

// Added by this interface:
virtual void writeRow(const void *row) = 0; // does not take ownership of row
};


extern jlib_decl IDiskRowStream * queryNullDiskRowStream();
4 changes: 4 additions & 0 deletions testing/regress/ecl/genericActivities.ecl
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
// Write a one-row inline dataset (a single UNSIGNED1 field) to the logical file 'data'
OUTPUT(DATASET([{1}], {UNSIGNED1 a}), ,'data');

// Read the file back with the same record layout, declared as TYPE(FLAT), and output it
d := DATASET('data', {UNSIGNED1 a}, TYPE(FLAT));
OUTPUT(d);