diff --git a/plugins/parquet/parquetembed.cpp b/plugins/parquet/parquetembed.cpp index 934ae3dcf43..f8c72646d43 100644 --- a/plugins/parquet/parquetembed.cpp +++ b/plugins/parquet/parquetembed.cpp @@ -167,6 +167,22 @@ arrow::Status ParquetHelper::openWriteFile() } else { + StringBuffer filename; + StringBuffer path; + splitFilename(destination.c_str(), nullptr, &path, &filename, nullptr, false); + Owned itr = createDirectoryIterator(path.str(), filename.append("*.parquet").str()); + + ForEach(*itr) + { + if (overwrite) + { + IFile &file = itr->query(); + if(!file.remove()) + failx("File %s could not be overwritten.", file.queryFilename()); + } + else + failx("Cannot write to file %s because it already exists. To delete it set the overwrite option to true.", destination.c_str()); + } // Currently under the assumption that all channels and workers are given a worker id and no matter // the configuration will show up in activityCtx->numSlaves() if (activityCtx->numSlaves() > 1) @@ -174,17 +190,6 @@ arrow::Status ParquetHelper::openWriteFile() destination.insert(destination.find(".parquet"), std::to_string(activityCtx->querySlave())); } - if (!overwrite) - { - StringBuffer filename; - StringBuffer path; - splitFilename(destination.c_str(), nullptr, &path, &filename, &filename, false); - Owned itr = createDirectoryIterator(path.str(), filename.str()); - - if (itr) - failx("Error writing to file %s in directory %s because file already exists. Set the Overwrite option to True to overwrite existing files.", filename.str(), path.str()); - } - std::shared_ptr outfile; PARQUET_ASSIGN_OR_THROW(outfile, arrow::io::FileOutputStream::Open(destination));