Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
jakesmith committed Nov 4, 2024
1 parent 901d209 commit 1ad9731
Show file tree
Hide file tree
Showing 4 changed files with 175 additions and 141 deletions.
119 changes: 68 additions & 51 deletions dali/ft/daftformat.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2382,66 +2382,83 @@ IOutputProcessor * createOutputProcessor(const FileFormat & format)
}
}

IFormatPartitioner * createFormatPartitioner(FileSprayer &sprayer, const SocketEndpoint & ep, const FileFormat & srcFormat, const FileFormat & tgtFormat, bool calcOutput, const char * slave, const char *wuid)
IFormatPartitioner * createFormatPartitioner(FileSprayer &sprayer, const SocketEndpoint & ep, const FileFormat & srcFormat, const FileFormat & tgtFormat, bool calcOutput, bool pushSupported, const char * slave, const char *wuid)
{
bool sameFormats = sameEncoding(srcFormat, tgtFormat);
LOG(MCdebugProgressDetail, "createFormatProcessor(srcFormat.type:'%s', tgtFormat.type:'%s', calcOutput:%d, sameFormats:%d)", srcFormat.getFileFormatTypeString(), tgtFormat.getFileFormatTypeString(), calcOutput, sameFormats);
if (sameFormats)
{
switch (srcFormat.type)
{
case FFTfixed:
return new CSimpleFixedPartitioner(srcFormat.recordSize, sameFormats);
case FFTblocked:
return new CSimpleBlockedPartitioner(sameFormats);
case FFTcsv:
if (srcFormat.hasQuote() && srcFormat.hasQuotedTerminator())
return new CCsvPartitioner(srcFormat);
else
return new CCsvQuickPartitioner(srcFormat, sameFormats);
break;
case FFTutf: case FFTutf8: case FFTutf8n: case FFTutf16: case FFTutf16be: case FFTutf16le: case FFTutf32: case FFTutf32be: case FFTutf32le:
if (srcFormat.markup==FMTxml)
return new CXmlQuickPartitioner(srcFormat, sameFormats);
if (srcFormat.markup==FMTjson)
return new CJsonPartitioner(srcFormat);
if (srcFormat.hasQuote() && srcFormat.hasQuotedTerminator())
return new CUtfPartitioner(srcFormat);
return new CUtfQuickPartitioner(srcFormat, sameFormats);
}
}
if (!calcOutput)

switch (srcFormat.type)
{
switch (srcFormat.type)
{
case FFTfixed:
return new CSimpleFixedPartitioner(srcFormat.recordSize, sameFormats);
case FFTblocked:
return new CSimpleBlockedPartitioner(sameFormats);
case FFTvariable:
return new CVariablePartitioner(false);
case FFTvariablebigendian:
return new CVariablePartitioner(true);
case FFTrecfmvb:
return new CRECFMvbPartitioner(true);
case FFTrecfmv:
return new CRECFMvbPartitioner(false);
case FFTcsv:
return new CCsvQuickPartitioner(srcFormat, sameFormats);
case FFTutf: case FFTutf8: case FFTutf8n: case FFTutf16: case FFTutf16be: case FFTutf16le: case FFTutf32: case FFTutf32be: case FFTutf32le:
if (srcFormat.markup==FMTxml)
return new CXmlQuickPartitioner(srcFormat, sameFormats);
if (srcFormat.markup==FMTjson)
return new CJsonPartitioner(srcFormat);
return new CUtfQuickPartitioner(srcFormat, sameFormats);
default:
throwError(DFTERR_UnknownFileFormatType);
break;
{
// all other will involve processing the source files

bool usePush;
if (isContainerized())
usePush = pushSupported;
else
{
// BM has always pushed based on this (calcOutput == !pullSupported || verify)
usePush = calcOutput;
// but if pushSupposed, I suspect we want to use in BM too
// otherwhise partitioning will happen locally (e.g. in dfuserver) and will not be farmed out to worker nodes
}
if (usePush)
{
StringBuffer name;
if (!slave)
slave = queryFtSlaveExecutable(ep, name);

return new CRemotePartitioner(sprayer, ep, srcFormat, tgtFormat, slave, wuid);
}

// only here if BM and not pushing (i.e. !calcOutput)
if (sameFormats)
{
switch (srcFormat.type)
{
case FFTcsv:
if (srcFormat.hasQuote() && srcFormat.hasQuotedTerminator())
return new CCsvPartitioner(srcFormat);
else
return new CCsvQuickPartitioner(srcFormat, sameFormats);
break;
case FFTutf: case FFTutf8: case FFTutf8n: case FFTutf16: case FFTutf16be: case FFTutf16le: case FFTutf32: case FFTutf32be: case FFTutf32le:
if (srcFormat.markup==FMTxml)
return new CXmlQuickPartitioner(srcFormat, sameFormats);
if (srcFormat.markup==FMTjson)
return new CJsonPartitioner(srcFormat);
if (srcFormat.hasQuote() && srcFormat.hasQuotedTerminator())
return new CUtfPartitioner(srcFormat);
return new CUtfQuickPartitioner(srcFormat, sameFormats);
}
}
switch (srcFormat.type)
{
case FFTvariable:
return new CVariablePartitioner(false);
case FFTvariablebigendian:
return new CVariablePartitioner(true);
case FFTrecfmvb:
return new CRECFMvbPartitioner(true);
case FFTrecfmv:
return new CRECFMvbPartitioner(false);
case FFTcsv:
return new CCsvQuickPartitioner(srcFormat, sameFormats);
case FFTutf: case FFTutf8: case FFTutf8n: case FFTutf16: case FFTutf16be: case FFTutf16le: case FFTutf32: case FFTutf32be: case FFTutf32le:
if (srcFormat.markup==FMTxml)
return new CXmlQuickPartitioner(srcFormat, sameFormats);
if (srcFormat.markup==FMTjson)
return new CJsonPartitioner(srcFormat);
return new CUtfQuickPartitioner(srcFormat, sameFormats);
default:
throwError(DFTERR_UnknownFileFormatType);
break;
}
}
}
StringBuffer name;
if (!slave)
slave = queryFtSlaveExecutable(ep, name);

return new CRemotePartitioner(sprayer, ep, srcFormat, tgtFormat, slave, wuid);
}
2 changes: 1 addition & 1 deletion dali/ft/daftformat.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,6 @@ extern DALIFT_API IFormatProcessor * createFormatProcessor(const FileFormat & sr
extern DALIFT_API IOutputProcessor * createOutputProcessor(const FileFormat & format);

class FileSprayer;
extern DALIFT_API IFormatPartitioner * createFormatPartitioner(FileSprayer &sprayer, const SocketEndpoint & ep, const FileFormat & srcFormat, const FileFormat & tgtFormat, bool calcOutput, const char * slave, const char *wuid);
extern DALIFT_API IFormatPartitioner * createFormatPartitioner(FileSprayer &sprayer, const SocketEndpoint & ep, const FileFormat & srcFormat, const FileFormat & tgtFormat, bool calcOutput, bool pushSupported, const char * slave, const char *wuid);

#endif
Loading

0 comments on commit 1ad9731

Please sign in to comment.