Skip to content

Commit

Permalink
HPCC4J-552 File Utility Read Invididual File Parts
Browse files Browse the repository at this point in the history
- Added support for reading a file without writing it to disk
- Added support for test reading individual file parts

Signed-off-by: James McMullan [email protected]
  • Loading branch information
jpmcmu committed Nov 7, 2023
1 parent 5ea3beb commit 07632a7
Show file tree
Hide file tree
Showing 2 changed files with 232 additions and 0 deletions.
220 changes: 220 additions & 0 deletions dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java
Original file line number Diff line number Diff line change
Expand Up @@ -417,6 +417,24 @@ private static Options getReadOptions()
return options;
}

private static Options getReadTestOptions()
{
Options options = new Options();
options.addRequiredOption("read_test", "Read test", true, "Specifies the file that should be read.");
options.addRequiredOption("url", "Source Cluster URL", true, "Specifies the URL of the ESP to connect to.");
options.addOption("user", true, "Specifies the username used to connect. Defaults to null.");
options.addOption("pass", true, "Specifies the password used to connect. Defaults to null.");
options.addOption("num_threads", true, "Specifies the number of parallel to use to perform operations.");

options.addOption(Option.builder("file_parts")
.argName("_file_parts")
.hasArgs()
.valueSeparator(',')
.desc("Specifies the file parts that should be read. Defaults to all file parts.")
.build());
return options;
}

private static Options getCopyOptions()
{
Options options = new Options();
Expand Down Expand Up @@ -463,6 +481,7 @@ private static Options getTopLevelOptions()
{
Options options = new Options();
options.addOption("read", "Reads the specified file(s) and writes a copy of the files to the local directory.");
options.addOption("read_test", "Reads the specified file and/or particular file parts without writing it locally.");
options.addOption("copy", "Copies the specified remote source file to the specified remote destination cluster / file.");
options.addOption("write", "Writes the specified local source file to the specified remote destination cluster / file.");

Expand Down Expand Up @@ -660,6 +679,44 @@ public void run()
}
}

private static Runnable[] createReadTestTasks(DataPartition[] fileParts, FieldDef recordDef, TaskContext context) throws Exception
{
Runnable[] tasks = new Runnable[fileParts.length];
for (int i = 0; i < tasks.length; i++)
{
final int taskIndex = i;
final DataPartition filePart = fileParts[taskIndex];
final HpccRemoteFileReader<HPCCRecord> filePartReader = new HpccRemoteFileReader<HPCCRecord>(filePart, recordDef, new HPCCRecordBuilder(recordDef));

tasks[taskIndex] = new Runnable()
{
HpccRemoteFileReader<HPCCRecord> fileReader = filePartReader;

public void run()
{
try
{
while (fileReader.hasNext())
{
HPCCRecord record = fileReader.next();
context.recordsRead.incrementAndGet();
}

fileReader.close();
context.bytesRead.addAndGet(fileReader.getStreamPosition());
}
catch (Exception e)
{
context.addError("Error while reading file part index: '" + filePart.getThisPart() + " Error message: " + e.getMessage());
return;
}
}
};
}

return tasks;
}

private static Runnable[] createReadToThorTasks(DataPartition[] fileParts, SplitTable[] splitTables, String[] outFilePaths, FieldDef recordDef, TaskContext context) throws Exception
{
Runnable[] tasks = new Runnable[fileParts.length];
Expand Down Expand Up @@ -1159,6 +1216,165 @@ private static void performRead(String[] args, TaskContext context)
}
}

private static void performReadTest(String[] args, TaskContext context)
{
Options options = getReadTestOptions();
CommandLineParser parser = new DefaultParser();

CommandLine cmd = null;
try
{
cmd = parser.parse(options, args);
}
catch (ParseException e)
{
System.out.println("Error parsing commandline options:\n" + e.getMessage());
return;
}

String connString = cmd.getOptionValue("url");
String user = cmd.getOptionValue("user");
String pass = cmd.getOptionValue("pass");

String outputPath = cmd.getOptionValue("out",".");

int numThreads = NUM_DEFAULT_THREADS;
String numThreadsStr = cmd.getOptionValue("num_threads", "" + numThreads);
try
{
numThreads = Integer.parseInt(numThreadsStr);
}
catch(Exception e)
{
System.out.println("Invalid option value for num_threads: "
+ numThreadsStr + ", must be an integer. Defaulting to: " + NUM_DEFAULT_THREADS + " threads.");
}

String formatStr = cmd.getOptionValue("format");
if (formatStr == null)
{
formatStr = "THOR";
}

FileFormat format = FileFormat.THOR;
switch (formatStr.toUpperCase())
{
case "THOR":
format = FileFormat.THOR;
break;
case "PARQUET":
format = FileFormat.PARQUET;
break;
default:
System.out.println("Error unsupported format specified: " + format);
return;
}

String datasetName = cmd.getOptionValue("read_test");
context.startOperation("Read Test " + datasetName);

HPCCFile file = null;
try
{
file = new HPCCFile(datasetName, connString, user, pass);
}
catch (Exception e)
{
System.out.println("Error while attempting to open file: '" + datasetName + "': " + e.getMessage());
return;
}

DataPartition[] fileParts = null;
FieldDef recordDef = null;
try
{
fileParts = file.getFileParts();
recordDef = file.getRecordDefinition();
}
catch (Exception e)
{
System.out.println("Error while retrieving file parts for: '" + datasetName + "': " + e.getMessage());
return;
}

String[] filePartsStrs = cmd.getOptionValues("file_parts");
if (filePartsStrs != null && filePartsStrs.length > 0)
{
ArrayList<DataPartition> filePartList = new ArrayList<DataPartition>();
for (int i = 0; i < filePartsStrs.length; i++)
{
try
{
int filePartIndex = Integer.parseInt(filePartsStrs[i]) - 1;
if (filePartIndex < 0 || filePartIndex >= fileParts.length)
{
System.out.println("Skipping invalid file part index: " + filePartsStrs[i]
+ " outside of range: [0," + fileParts.length + "]");
continue;
}

filePartList.add(fileParts[filePartIndex]);
}
catch (NumberFormatException e)
{
System.out.println("Skipping invalid file part index: " + filePartsStrs[i]);
}
}
}

Runnable[] tasks = null;
try
{
switch (format)
{
case THOR:
tasks = createReadTestTasks(fileParts, recordDef, context);
break;
case PARQUET:
default:
throw new Exception("Error unsupported format specified: " + format);
};
}
catch (Exception e)
{
context.addError("Error while attempting to create read tasks for file: '" + datasetName + "': " + e.getMessage());
return;
}

try
{
executeTasks(tasks, numThreads);
}
catch (Exception e)
{
context.addError("Error while attempting to execute read tasks for file: '" + datasetName + "': " + e.getMessage());
return;
}

if (context.hasError())
{
return;
}

try
{
String fileName = file.getFileName().replace(":","_");
String filePath = outputPath + File.separator + fileName + ".meta";
FileOutputStream metaFile = new FileOutputStream(filePath);

String metaStr = RecordDefinitionTranslator.toJsonRecord(file.getRecordDefinition()).toString();
metaFile.write(metaStr.getBytes());
metaFile.close();
}
catch (Exception e)
{
context.addError("Error while attempting to write meta-data for file: '" + datasetName + "': " + e.getMessage());
return;
}

context.endOperation();
}

private static void performCopy(String[] args, TaskContext context)
{
Options options = getCopyOptions();
Expand Down Expand Up @@ -1576,6 +1792,10 @@ public static JSONArray run(String[] args)
{
performRead(args, context);
}
else if (cmd.hasOption("read_test"))
{
performReadTest(args, context);
}
else if (cmd.hasOption("copy"))
{
performCopy(args, context);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,18 @@ public void thorFileTests()
Assert.assertTrue("FileUtility operation didn't complete successfully", success);
}

{
String readArgs[] = {"-read_test", "benchmark::integer::20kb", "-url", this.connString,
"-user", this.hpccUser, "-pass", this.hpccPass, "-file_parts", "1" };

JSONArray results = FileUtility.run(readArgs);
JSONObject result = results.optJSONObject(0);
Assert.assertNotNull("FileUtility result should not be null.", result);

boolean success = result.optBoolean("successful",false);
Assert.assertTrue("FileUtility operation didn't complete successfully", success);
}

{
String copyArgs[] = {"-copy", "benchmark::integer::20kb benchmark::integer::20kb-copy",
"-url", this.connString, "-dest_url", this.connString,
Expand Down

0 comments on commit 07632a7

Please sign in to comment.