Skip to content

Commit

Permalink
On output, check for both hThor and Thor files before writing.
Browse files Browse the repository at this point in the history
If files exist and the overwrite option is true, they are deleted.
  • Loading branch information
jackdelv committed Oct 12, 2023
1 parent 3028a75 commit 00b1317
Showing 1 changed file with 16 additions and 11 deletions.
27 changes: 16 additions & 11 deletions plugins/parquet/parquetembed.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -167,24 +167,29 @@ arrow::Status ParquetHelper::openWriteFile()
}
else
{
StringBuffer filename;
StringBuffer path;
splitFilename(destination.c_str(), nullptr, &path, &filename, nullptr, false);
Owned<IDirectoryIterator> itr = createDirectoryIterator(path.str(), filename.append("*.parquet").str());

ForEach(*itr)
{
if (overwrite)
{
IFile &file = itr->query();
if(!file.remove())
failx("File %s could not be overwritten.", file.queryFilename());
}
else
failx("Cannot write to file %s because it already exists. To delete it set the overwrite option to true.", destination.c_str());
}
// Currently assumes that all channels and workers are given a worker id and that, no matter
// the configuration, they will show up in activityCtx->numSlaves()
if (activityCtx->numSlaves() > 1)
{
destination.insert(destination.find(".parquet"), std::to_string(activityCtx->querySlave()));
}

if (!overwrite)
{
StringBuffer filename;
StringBuffer path;
splitFilename(destination.c_str(), nullptr, &path, &filename, &filename, false);
Owned<IDirectoryIterator> itr = createDirectoryIterator(path.str(), filename.str());

if (itr)
failx("Error writing to file %s in directory %s because file already exists. Set the Overwrite option to True to overwrite existing files.", filename.str(), path.str());
}

std::shared_ptr<arrow::io::FileOutputStream> outfile;

PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(destination));
Expand Down

0 comments on commit 00b1317

Please sign in to comment.