diff --git a/commons-hpcc/pom.xml b/commons-hpcc/pom.xml
index 47bb0a461..8c21c53fd 100644
--- a/commons-hpcc/pom.xml
+++ b/commons-hpcc/pom.xml
@@ -9,7 +9,7 @@
org.hpccsystems
hpcc4j
- 9.2.65-0-SNAPSHOT
+ 9.4.41-0-SNAPSHOT
diff --git a/dfsclient/pom.xml b/dfsclient/pom.xml
index d5a9f911e..02a9a5dca 100644
--- a/dfsclient/pom.xml
+++ b/dfsclient/pom.xml
@@ -9,7 +9,7 @@
org.hpccsystems
hpcc4j
- 9.2.65-0-SNAPSHOT
+ 9.4.41-0-SNAPSHOT
diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/BinaryRecordReader.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/BinaryRecordReader.java
index 565a59454..58470fdd8 100644
--- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/BinaryRecordReader.java
+++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/BinaryRecordReader.java
@@ -830,7 +830,15 @@ private BigDecimal getUnsignedDecimal(int numDigits, int precision, int dataLen)
BigDecimal ret = new BigDecimal(0);
int idx = 0;
- int curDigit = numDigits - 1;
+ int curDigit = numDigits;
+
+ // If the number of digits is odd, the topmost nibble is unused and we don't want to include it
+ // in the scale calculation below. Due to how that calculation works, this means we decrement
+ // the starting value of curDigit for even-length decimals.
+ if ((numDigits % 2) == 0)
+ {
+ curDigit--;
+ }
while (idx < dataLen)
{
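
For context on the adjustment above: each byte of a packed decimal holds two BCD digits, so an odd digit count leaves the top nibble of the first byte as padding. A minimal, illustrative sketch of that layout (a standalone helper, not the reader's actual implementation; it assumes precision means the number of fractional digits):

```java
import java.math.BigDecimal;

public class PackedDecimalSketch
{
    // Decode an unsigned packed (BCD) decimal: two digits per byte, with the top nibble of
    // the first byte unused when the total digit count is odd.
    static BigDecimal decodeUnsigned(byte[] data, int numDigits, int precision)
    {
        StringBuilder digits = new StringBuilder();
        boolean skipFirstNibble = (numDigits % 2) != 0; // odd length => padded top nibble
        for (int i = 0; i < data.length; i++)
        {
            int hi = (data[i] >> 4) & 0x0F;
            int lo = data[i] & 0x0F;
            if (!(i == 0 && skipFirstNibble))
            {
                digits.append(hi);
            }
            digits.append(lo);
        }
        // precision is taken here to be the number of fractional digits
        return new BigDecimal(digits.toString()).movePointLeft(precision);
    }

    public static void main(String[] args)
    {
        // 3 digits (odd) packed into 2 bytes, 1 fractional digit: 0x01 0x23 -> "123" -> 12.3
        byte[] packed = { 0x01, 0x23 };
        System.out.println(decodeUnsigned(packed, 3, 1));
    }
}
```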
diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/DataPartition.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/DataPartition.java
index bc648b49c..a846969e0 100644
--- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/DataPartition.java
+++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/DataPartition.java
@@ -16,6 +16,10 @@
package org.hpccsystems.dfs.client;
import java.io.Serializable;
+import java.security.InvalidParameterException;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Arrays;
import org.hpccsystems.commons.ecl.FileFilter;
import org.hpccsystems.commons.errors.HpccFileException;
@@ -261,11 +265,61 @@ public String[] getCopyLocations()
public String getCopyIP(int copyindex)
{
int copiescount = copyLocations.length;
- if (copyindex < 0 || copyindex >= copiescount) return null;
+ if (copyindex < 0 || copyindex >= copiescount)
+ {
+ return null;
+ }
return copyLocations[copyindex];
}
+ /**
+ * Set the copy IP
+ *
+ * @param copyIndex
+ * the copyindex
+ * @param copyIP The IP of the file part copy
+ */
+ public void setCopyIP(int copyIndex, String copyIP)
+ {
+ if (copyIndex < 0 || copyIndex >= copyLocations.length)
+ {
+ return;
+ }
+
+ copyLocations[copyIndex] = copyIP;
+ }
+
+ /**
+ * Add file part copy
+ *
+ * @param index The index at which to insert the file part copy
+ * @param copyIP The IP of the new file part copy
+ * @param copyPath The path of the new file part copy
+ * @throws Exception The exception
+ */
+ public void add(int index, String copyIP, String copyPath) throws Exception
+ {
+ if (index < 0 || index > copyLocations.length)
+ {
+ throw new InvalidParameterException("Insertion index: " + index + " is invalid."
+ + "Expected index in range of: [0," + copyLocations.length + "]");
+ }
+
+ if (copyIP == null || copyPath == null)
+ {
+ throw new InvalidParameterException("Copy IP or Path are invalid, must be non-null.");
+ }
+
+ List<String> copyLocationsList = new ArrayList<>(Arrays.asList(copyLocations));
+ copyLocationsList.add(index, copyIP);
+ copyLocations = copyLocationsList.toArray(new String[0]);
+
+ List<String> copyPathList = new ArrayList<>(Arrays.asList(copyPaths));
+ copyPathList.add(index, copyPath);
+ copyPaths = copyPathList.toArray(new String[0]);
+ }
+
/**
* Count of copies available for this file part.
* @return copy locations size
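
A minimal sketch of how the new setCopyIP()/add() mutators can be combined to exercise copy failover, mirroring the filePartReadRetryTest added further down (it assumes getCopyPath(int) is accessible to the caller):

```java
import org.hpccsystems.dfs.client.DataPartition;

public class CopyFailoverSetup
{
    // Point the first copy of every file part at an unreachable address and re-insert the
    // real copy behind it, so a subsequent read has to fail over to the second copy.
    public static void injectBadFirstCopy(DataPartition[] fileParts) throws Exception
    {
        for (DataPartition part : fileParts)
        {
            String realIP = part.getCopyIP(0);
            String realPath = part.getCopyPath(0);

            part.setCopyIP(0, "1.1.1.1");   // unreachable first copy
            part.add(1, realIP, realPath);  // original copy becomes the fallback
        }
    }
}
```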
diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java
index 3da02c805..2cf0e3b25 100644
--- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java
+++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java
@@ -417,6 +417,24 @@ private static Options getReadOptions()
return options;
}
+ private static Options getReadTestOptions()
+ {
+ Options options = new Options();
+ options.addRequiredOption("read_test", "Read test", true, "Specifies the file that should be read.");
+ options.addRequiredOption("url", "Source Cluster URL", true, "Specifies the URL of the ESP to connect to.");
+ options.addOption("user", true, "Specifies the username used to connect. Defaults to null.");
+ options.addOption("pass", true, "Specifies the password used to connect. Defaults to null.");
+ options.addOption("num_threads", true, "Specifies the number of parallel to use to perform operations.");
+
+ options.addOption(Option.builder("file_parts")
+ .argName("_file_parts")
+ .hasArgs()
+ .valueSeparator(',')
+ .desc("Specifies the file parts that should be read. Defaults to all file parts.")
+ .build());
+ return options;
+ }
+
private static Options getCopyOptions()
{
Options options = new Options();
@@ -463,6 +481,7 @@ private static Options getTopLevelOptions()
{
Options options = new Options();
options.addOption("read", "Reads the specified file(s) and writes a copy of the files to the local directory.");
+ options.addOption("read_test", "Reads the specified file and/or particular file parts without writing it locally.");
options.addOption("copy", "Copies the specified remote source file to the specified remote destination cluster / file.");
options.addOption("write", "Writes the specified local source file to the specified remote destination cluster / file.");
@@ -660,6 +679,44 @@ public void run()
}
}
+ private static Runnable[] createReadTestTasks(DataPartition[] fileParts, FieldDef recordDef, TaskContext context) throws Exception
+ {
+ Runnable[] tasks = new Runnable[fileParts.length];
+ for (int i = 0; i < tasks.length; i++)
+ {
+ final int taskIndex = i;
+ final DataPartition filePart = fileParts[taskIndex];
+ final HpccRemoteFileReader<HPCCRecord> filePartReader = new HpccRemoteFileReader<HPCCRecord>(filePart, recordDef, new HPCCRecordBuilder(recordDef));
+
+ tasks[taskIndex] = new Runnable()
+ {
+ HpccRemoteFileReader<HPCCRecord> fileReader = filePartReader;
+
+ public void run()
+ {
+ try
+ {
+ while (fileReader.hasNext())
+ {
+ HPCCRecord record = fileReader.next();
+ context.recordsRead.incrementAndGet();
+ }
+
+ fileReader.close();
+ context.bytesRead.addAndGet(fileReader.getStreamPosition());
+ }
+ catch (Exception e)
+ {
+ context.addError("Error while reading file part index: '" + filePart.getThisPart() + " Error message: " + e.getMessage());
+ return;
+ }
+ }
+ };
+ }
+
+ return tasks;
+ }
+
private static Runnable[] createReadToThorTasks(DataPartition[] fileParts, SplitTable[] splitTables, String[] outFilePaths, FieldDef recordDef, TaskContext context) throws Exception
{
Runnable[] tasks = new Runnable[fileParts.length];
@@ -1159,6 +1216,165 @@ private static void performRead(String[] args, TaskContext context)
}
}
+ private static void performReadTest(String[] args, TaskContext context)
+ {
+ Options options = getReadTestOptions();
+ CommandLineParser parser = new DefaultParser();
+
+ CommandLine cmd = null;
+ try
+ {
+ cmd = parser.parse(options, args);
+ }
+ catch (ParseException e)
+ {
+ System.out.println("Error parsing commandline options:\n" + e.getMessage());
+ return;
+ }
+
+ String connString = cmd.getOptionValue("url");
+ String user = cmd.getOptionValue("user");
+ String pass = cmd.getOptionValue("pass");
+
+ String outputPath = cmd.getOptionValue("out",".");
+
+ int numThreads = NUM_DEFAULT_THREADS;
+ String numThreadsStr = cmd.getOptionValue("num_threads", "" + numThreads);
+ try
+ {
+ numThreads = Integer.parseInt(numThreadsStr);
+ }
+ catch(Exception e)
+ {
+ System.out.println("Invalid option value for num_threads: "
+ + numThreadsStr + ", must be an integer. Defaulting to: " + NUM_DEFAULT_THREADS + " threads.");
+ }
+
+ String formatStr = cmd.getOptionValue("format");
+ if (formatStr == null)
+ {
+ formatStr = "THOR";
+ }
+
+ FileFormat format = FileFormat.THOR;
+ switch (formatStr.toUpperCase())
+ {
+ case "THOR":
+ format = FileFormat.THOR;
+ break;
+ case "PARQUET":
+ format = FileFormat.PARQUET;
+ break;
+ default:
+ System.out.println("Error unsupported format specified: " + format);
+ return;
+ }
+
+ String datasetName = cmd.getOptionValue("read_test");
+ context.startOperation("Read Test " + datasetName);
+
+ HPCCFile file = null;
+ try
+ {
+ file = new HPCCFile(datasetName, connString, user, pass);
+ }
+ catch (Exception e)
+ {
+ System.out.println("Error while attempting to open file: '" + datasetName + "': " + e.getMessage());
+ return;
+ }
+
+ DataPartition[] fileParts = null;
+ FieldDef recordDef = null;
+ try
+ {
+ fileParts = file.getFileParts();
+ recordDef = file.getRecordDefinition();
+ }
+ catch (Exception e)
+ {
+ System.out.println("Error while retrieving file parts for: '" + datasetName + "': " + e.getMessage());
+ return;
+ }
+
+ String[] filePartsStrs = cmd.getOptionValues("file_parts");
+ if (filePartsStrs != null && filePartsStrs.length > 0)
+ {
+ ArrayList<DataPartition> filePartList = new ArrayList<DataPartition>();
+ for (int i = 0; i < filePartsStrs.length; i++)
+ {
+ try
+ {
+ int filePartIndex = Integer.parseInt(filePartsStrs[i]) - 1;
+ if (filePartIndex < 0 || filePartIndex >= fileParts.length)
+ {
+ System.out.println("Skipping invalid file part index: " + filePartsStrs[i]
+ + " outside of range: [0," + fileParts.length + "]");
+ continue;
+ }
+
+ filePartList.add(fileParts[filePartIndex]);
+ }
+ catch (NumberFormatException e)
+ {
+ System.out.println("Skipping invalid file part index: " + filePartsStrs[i]);
+ }
+ }
+
+ fileParts = filePartList.toArray(new DataPartition[0]);
+ }
+
+ Runnable[] tasks = null;
+ try
+ {
+ switch (format)
+ {
+ case THOR:
+ tasks = createReadTestTasks(fileParts, recordDef, context);
+ break;
+ case PARQUET:
+ default:
+ throw new Exception("Error unsupported format specified: " + format);
+ };
+ }
+ catch (Exception e)
+ {
+ context.addError("Error while attempting to create read tasks for file: '" + datasetName + "': " + e.getMessage());
+ return;
+ }
+
+ try
+ {
+ executeTasks(tasks, numThreads);
+ }
+ catch (Exception e)
+ {
+ context.addError("Error while attempting to execute read tasks for file: '" + datasetName + "': " + e.getMessage());
+ return;
+ }
+
+ if (context.hasError())
+ {
+ return;
+ }
+
+ try
+ {
+ String fileName = file.getFileName().replace(":","_");
+ String filePath = outputPath + File.separator + fileName + ".meta";
+ FileOutputStream metaFile = new FileOutputStream(filePath);
+
+ String metaStr = RecordDefinitionTranslator.toJsonRecord(file.getRecordDefinition()).toString();
+ metaFile.write(metaStr.getBytes());
+ metaFile.close();
+ }
+ catch (Exception e)
+ {
+ context.addError("Error while attempting to write meta-data for file: '" + datasetName + "': " + e.getMessage());
+ return;
+ }
+
+ context.endOperation();
+ }
+
private static void performCopy(String[] args, TaskContext context)
{
Options options = getCopyOptions();
@@ -1576,6 +1792,10 @@ public static JSONArray run(String[] args)
{
performRead(args, context);
}
+ else if (cmd.hasOption("read_test"))
+ {
+ performReadTest(args, context);
+ }
else if (cmd.hasOption("copy"))
{
performCopy(args, context);
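
A minimal sketch of driving the new -read_test mode programmatically, in the same way FileUtilityTest does below; the ESP URL, credentials, and file name are placeholders:

```java
import org.hpccsystems.dfs.client.FileUtility;
import org.json.JSONArray;
import org.json.JSONObject;

public class ReadTestExample
{
    public static void main(String[] args)
    {
        String[] readTestArgs = {
            "-read_test", "benchmark::integer::20kb",  // file to read without writing it locally
            "-url", "http://127.0.0.1:8010",           // ESP URL (placeholder)
            "-user", "", "-pass", "",
            "-file_parts", "1,2"                       // optional: restrict the read to specific parts
        };

        JSONArray results = FileUtility.run(readTestArgs);
        JSONObject result = results.optJSONObject(0);
        boolean success = result != null && result.optBoolean("successful", false);
        System.out.println("read_test successful: " + success);
    }
}
```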
diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/HPCCFile.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/HPCCFile.java
index afec67413..8df2ba73e 100644
--- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/HPCCFile.java
+++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/HPCCFile.java
@@ -50,6 +50,7 @@ public class HPCCFile implements Serializable
private DataPartition[] dataParts;
private DataPartition tlkPartition = null;
+ private boolean useTLK = true;
private PartitionProcessor partitionProcessor = null;
private long dataPartsCreationTimeMS = -1;
@@ -130,12 +131,44 @@ public HPCCFile(String fileName, String connectionString, String user, String pa
*/
public HPCCFile(String fileName, Connection espconninfo, String targetColumnList, String filter, RemapInfo remap_info, int maxParts,
String targetfilecluster) throws HpccFileException
+ {
+ this(fileName, espconninfo, targetColumnList, filter, remap_info, maxParts, targetfilecluster, true);
+ }
+
+ /**
+ * Constructor for the HpccFile. Captures HPCC logical file information from the DALI Server for the clusters behind
+ * the ESP named by the IP address and re-maps the address information for the THOR nodes to visible addresses when
+ * the THOR clusters are virtual.
+ *
+ * @param fileName
+ * The HPCC file name
+ * @param espconninfo
+ * the espconninfo
+ * @param targetColumnList
+ * a comma separated list of column names in dotted notation for columns within compound columns.
+ * @param filter
+ * a file filter to select records of interest (SQL where syntax)
+ * @param remap_info
+ * address and port re-mapping info for THOR cluster
+ * @param maxParts
+ * optional the maximum number of partitions or zero for no max
+ * @param targetfilecluster
+ * optional - the hpcc cluster the target file resides in
+ * @param useTLK
+ * optional - whether or not the top level key should be used to help filter index files
+ * @throws HpccFileException
+ * the hpcc file exception
+ */
+ public HPCCFile(String fileName, Connection espconninfo, String targetColumnList, String filter, RemapInfo remap_info, int maxParts,
+ String targetfilecluster, boolean useTLK) throws HpccFileException
{
this.fileName = fileName;
this.recordDefinition = null;
this.projectedRecordDefinition = null;
this.columnPruner = new ColumnPruner(targetColumnList);
this.espConnInfo = espconninfo;
+ this.useTLK = useTLK;
+
try
{
if (filter != null && !filter.isEmpty())
@@ -163,12 +196,12 @@ public static int getFilePartFromFPos(long fpos)
}
/**
- * Extracts the offset in the file part from a fileposition value.
+ * Extracts the offset in the file part from a fileposition value.
*
* @param fpos file position
* @return the project list
*/
- public static long getOffsetFromFPos(long fpos)
+ public static long getOffsetFromFPos(long fpos)
{
// First 48 bits store the offset
return fpos & 0xffffffffffffL;
@@ -285,6 +318,34 @@ public HPCCFile setClusterRemapInfo(RemapInfo remapinfo)
return this;
}
+ /**
+ * Get the value of useTLK option
+ *
+ * @return a boolean value indicating use of the TLK to filter index file reads
+ */
+ public boolean getUseTLK()
+ {
+ return this.useTLK;
+ }
+
+ /**
+ * Sets the useTLK option.
+ * Note: the value must be set before querying any data from the file, including record definition information.
+ *
+ * @param useTLK should the TLK be used to filter index file reads
+ *
+ * @return this HPCCFile
+ */
+ public HPCCFile setUseTLK(boolean useTLK)
+ {
+ this.useTLK = useTLK;
+
+ // Force the data parts to be re-created
+ this.dataParts = null;
+
+ return this;
+ }
+
/**
* Gets the filter.
*
@@ -424,13 +485,20 @@ private void createDataParts() throws HpccFileException
this.recordDefinition = RecordDefinitionTranslator.parseJsonRecordDefinition(new JSONObject(originalRecDefInJSON));
- try
+ if (this.useTLK)
{
- this.partitionProcessor = new PartitionProcessor(this.recordDefinition, this.dataParts, this.tlkPartition);
+ try
+ {
+ this.partitionProcessor = new PartitionProcessor(this.recordDefinition, this.dataParts, this.tlkPartition);
+ }
+ catch (Exception e)
+ {
+ log.error("Error while constructing partition processor, reading will continue without partition filtering: " + e.getMessage());
+ this.partitionProcessor = new PartitionProcessor(this.recordDefinition, this.dataParts, null);
+ }
}
- catch (Exception e)
+ else
{
- log.error("Error while constructing partition processor, reading will continue without partition filtering: " + e.getMessage());
this.partitionProcessor = new PartitionProcessor(this.recordDefinition, this.dataParts, null);
}
@@ -622,13 +690,13 @@ private static String acquireFileAccess(String fileName, HPCCWsDFUClient hpcc, i
String uniqueID = "HPCC-FILE: " + UUID.randomUUID().toString();
return hpcc.getFileAccessBlob(fileName, clusterName, expirySeconds, uniqueID);
}
-
+
/**
* @return the file metadata information for this HPCCFile (if it exists)
*/
- public DFUFileDetailWrapper getOriginalFileMetadata()
+ public DFUFileDetailWrapper getOriginalFileMetadata()
{
- if (originalFileMetadata==null)
+ if (originalFileMetadata==null)
{
HPCCWsDFUClient dfuClient = HPCCWsDFUClient.get(espConnInfo);
if (dfuClient.hasInitError())
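
A minimal sketch of the new useTLK knob: it can be passed through the extended constructor or toggled with setUseTLK() before any metadata or data is requested (connection details below are placeholders):

```java
import org.hpccsystems.dfs.client.HPCCFile;

public class UseTlkExample
{
    public static void main(String[] args) throws Exception
    {
        HPCCFile indexFile = new HPCCFile("example::index::key", "http://127.0.0.1:8010", "", "");

        // Must be set before getRecordDefinition()/getFileParts(); it forces the data parts
        // to be re-created on the next access and skips TLK-based partition filtering.
        indexFile.setUseTLK(false);

        System.out.println("useTLK: " + indexFile.getUseTLK());
    }
}
```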
diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java
index 78e3e6500..ade51a57e 100644
--- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java
+++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java
@@ -35,6 +35,8 @@ public class HpccRemoteFileReader<T> implements Iterator<T>
private BinaryRecordReader binaryRecordReader;
private IRecordBuilder recordBuilder = null;
private boolean handlePrefetch = true;
+ private boolean isClosed = false;
+ private boolean canReadNext = true;
private long openTimeMs = 0;
private long recordsRead = 0;
@@ -234,7 +236,6 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde
this.binaryRecordReader.initialize(this.recordBuilder);
}
-
log.info("HPCCRemoteFileReader: Opening file part: " + dataPartition.getThisPart()
+ (resumeInfo != null ? " resume position: " + resumeInfo.inputStreamPos : "" ));
log.trace("Original record definition:\n"
@@ -315,12 +316,18 @@ public String getRemoteReadMessages()
*/
public void prefetch()
{
- if (this.handlePrefetch)
+ if (handlePrefetch)
{
log.warn("Prefetch called on an HpccRemoteFileReader that has an internal prefetch thread.");
return;
}
+ if (isClosed)
+ {
+ log.warn("Prefetch called on an HpccRemoteFileReader that has been closed.");
+ return;
+ }
+
this.inputStream.prefetchData();
}
@@ -332,10 +339,19 @@ public void prefetch()
@Override
public boolean hasNext()
{
- boolean rslt = false;
+ if (isClosed)
+ {
+ log.warn("hasNext() called on an HpccRemoteFileReader that has been closed.");
+ return false;
+ }
+
+ // Keep track of whether we have said there is another record.
+ // This allows us to handle edge cases around close() being called between hasNext() and next()
+ canReadNext = false;
+
try
{
- rslt = this.binaryRecordReader.hasNext();
+ canReadNext = this.binaryRecordReader.hasNext();
// Has next may not catch the prefetch exception if it occurs at the beginning of a read
// This is due to InputStream.hasNext() being allowed to throw an IOException when closed.
@@ -346,12 +362,14 @@ public boolean hasNext()
}
catch (HpccFileException e)
{
- rslt = false;
+ canReadNext = false;
log.error("Read failure for " + this.dataPartition.toString());
- throw new java.util.NoSuchElementException("Fatal read error: " + e.getMessage());
+ java.util.NoSuchElementException exception = new java.util.NoSuchElementException("Fatal read error: " + e.getMessage());
+ exception.initCause(e);
+ throw exception;
}
- return rslt;
+ return canReadNext;
}
/**
@@ -362,6 +380,11 @@ public boolean hasNext()
@Override
public T next()
{
+ if (isClosed && !canReadNext)
+ {
+ throw new java.util.NoSuchElementException("Fatal read error: Attempting to read next() from a closed file reader.");
+ }
+
Object rslt = null;
try
{
@@ -370,10 +393,16 @@ public T next()
catch (HpccFileException e)
{
log.error("Read failure for " + this.dataPartition.toString() + " " + e.getMessage());
- throw new java.util.NoSuchElementException("Fatal read error: " + e.getMessage());
+ java.util.NoSuchElementException exception = new java.util.NoSuchElementException("Fatal read error: " + e.getMessage());
+ exception.initCause(e);
+ throw exception;
}
recordsRead++;
+
+ // Reset this after each read so we can handle edge cases where close() was called between hasNext() / next()
+ canReadNext = false;
+
return (T) rslt;
}
@@ -385,8 +414,15 @@ public T next()
*/
public void close() throws Exception
{
+ if (isClosed)
+ {
+ log.warn("Calling close on an already closed file reader for file part: " + this.dataPartition.toString());
+ return;
+ }
+
report();
this.inputStream.close();
+ isClosed = true;
long closeTimeMs = System.currentTimeMillis();
double readTimeS = (closeTimeMs - openTimeMs) / 1000.0;
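
A minimal sketch of a read loop that relies on the new close() semantics: after close(), hasNext() returns false, so a loop that closes the reader early exits cleanly instead of failing on the next read (mirrors the earlyCloseTest added below; the helper name is mine):

```java
import org.hpccsystems.commons.ecl.FieldDef;
import org.hpccsystems.dfs.client.DataPartition;
import org.hpccsystems.dfs.client.HPCCFile;
import org.hpccsystems.dfs.client.HPCCRecord;
import org.hpccsystems.dfs.client.HPCCRecordBuilder;
import org.hpccsystems.dfs.client.HpccRemoteFileReader;

public class EarlyCloseExample
{
    // Read at most n records from the first file part, then close the reader mid-iteration.
    public static void readFirstN(HPCCFile file, int n) throws Exception
    {
        FieldDef recordDef = file.getRecordDefinition();
        DataPartition firstPart = file.getFileParts()[0];

        HpccRemoteFileReader<HPCCRecord> reader =
                new HpccRemoteFileReader<HPCCRecord>(firstPart, recordDef, new HPCCRecordBuilder(recordDef));

        int count = 0;
        while (reader.hasNext())
        {
            reader.next(); // consume the record
            count++;
            if (count == n)
            {
                reader.close(); // subsequent hasNext() returns false and the loop ends
            }
        }
        System.out.println("Records read: " + count);
    }
}
```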
diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java
index 372bdce1e..ab3c38a6e 100644
--- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java
+++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java
@@ -522,6 +522,11 @@ public String getIP()
return this.dataPart.getCopyIP(prioritizedCopyIndexes.get(getFilePartCopy()));
}
+ private String getCopyPath()
+ {
+ return this.dataPart.getCopyPath(prioritizedCopyIndexes.get(getFilePartCopy()));
+ }
+
private int getFilePartCopy()
{
return filePartCopyIndexPointer;
@@ -1546,150 +1551,156 @@ private void makeActive() throws HpccFileException
this.active.set(false);
this.handle = 0;
- try
+ boolean needsRetry = false;
+ do
{
- log.debug("Attempting to connect to file part : '" + dataPart.getThisPart() + "' Copy: '" + (getFilePartCopy() + 1) + "' on IP: '"
- + getIP() + "'");
-
+ needsRetry = false;
try
{
- if (getUseSSL())
+ log.debug("Attempting to connect to file part : '" + dataPart.getThisPart() + "' Copy: '"
+ + (getFilePartCopy() + 1) + "' on IP: '" + getIP() + "'" + " for Path: '" + getCopyPath() + "'");
+ try
+ {
+ if (getUseSSL())
+ {
+ SSLSocketFactory ssf = (SSLSocketFactory) SSLSocketFactory.getDefault();
+ sock = (SSLSocket) ssf.createSocket();
+
+ // Optimize for bandwidth over latency and connection time.
+ // We are opening up a long standing connection and potentially reading a significant amount of
+ // data
+ // So we don't care as much about individual packet latency or connection time overhead
+ sock.setPerformancePreferences(0, 1, 2);
+ sock.connect(new InetSocketAddress(this.getIP(), this.dataPart.getPort()), this.connectTimeout);
+
+ log.debug("Attempting SSL handshake...");
+ ((SSLSocket) sock).startHandshake();
+ log.debug("SSL handshake successful...");
+ log.debug(" Remote address = " + sock.getInetAddress().toString() + " Remote port = " + sock.getPort());
+ }
+ else
+ {
+ SocketFactory sf = SocketFactory.getDefault();
+ sock = sf.createSocket();
+
+ // Optimize for bandwidth over latency and connection time.
+ // We are opening up a long standing connection and potentially reading a significant amount of
+ // data
+ // So we don't care as much about individual packet latency or connection time overhead
+ sock.setPerformancePreferences(0, 1, 2);
+ sock.connect(new InetSocketAddress(this.getIP(), this.dataPart.getPort()), this.connectTimeout);
+ }
+
+ this.sock.setSoTimeout(socketOpTimeoutMs);
+
+ log.debug("Connected: Remote address = " + sock.getInetAddress().toString() + " Remote port = " + sock.getPort());
+ }
+ catch (java.net.UnknownHostException e)
{
- SSLSocketFactory ssf = (SSLSocketFactory) SSLSocketFactory.getDefault();
- sock = (SSLSocket) ssf.createSocket();
-
- // Optimize for bandwidth over latency and connection time.
- // We are opening up a long standing connection and potentially reading a significant amount of
- // data
- // So we don't care as much about individual packet latency or connection time overhead
- sock.setPerformancePreferences(0, 1, 2);
- sock.connect(new InetSocketAddress(this.getIP(), this.dataPart.getPort()), this.connectTimeout);
-
- log.debug("Attempting SSL handshake...");
- ((SSLSocket) sock).startHandshake();
- log.debug("SSL handshake successful...");
- log.debug(" Remote address = " + sock.getInetAddress().toString() + " Remote port = " + sock.getPort());
+ throw new HpccFileException("Bad file part IP address or host name: " + this.getIP(), e);
}
- else
+ catch (java.io.IOException e)
{
- SocketFactory sf = SocketFactory.getDefault();
- sock = sf.createSocket();
-
- // Optimize for bandwidth over latency and connection time.
- // We are opening up a long standing connection and potentially reading a significant amount of
- // data
- // So we don't care as much about individual packet latency or connection time overhead
- sock.setPerformancePreferences(0, 1, 2);
- sock.connect(new InetSocketAddress(this.getIP(), this.dataPart.getPort()), this.connectTimeout);
+ throw new HpccFileException(e);
}
- this.sock.setSoTimeout(socketOpTimeoutMs);
+ try
+ {
+ this.dos = new java.io.DataOutputStream(sock.getOutputStream());
+ this.dis = new java.io.DataInputStream(sock.getInputStream());
+ }
+ catch (java.io.IOException e)
+ {
+ throw new HpccFileException("Failed to create streams", e);
+ }
- log.debug("Connected: Remote address = " + sock.getInetAddress().toString() + " Remote port = " + sock.getPort());
- }
- catch (java.net.UnknownHostException e)
- {
- throw new HpccFileException("Bad file part addr " + this.getIP(), e);
- }
- catch (java.io.IOException e)
- {
- throw new HpccFileException(e);
- }
+ //------------------------------------------------------------------------------
+ // Check protocol version
+ //------------------------------------------------------------------------------
- try
- {
- this.dos = new java.io.DataOutputStream(sock.getOutputStream());
- this.dis = new java.io.DataInputStream(sock.getInputStream());
- }
- catch (java.io.IOException e)
- {
- throw new HpccFileException("Failed to create streams", e);
- }
+ try
+ {
+ String msg = makeGetVersionRequest();
+ int msgLen = msg.length();
- //------------------------------------------------------------------------------
- // Check protocol version
- //------------------------------------------------------------------------------
+ this.dos.writeInt(msgLen);
+ this.dos.write(msg.getBytes(HPCCCharSet), 0, msgLen);
+ this.dos.flush();
+ }
+ catch (IOException e)
+ {
+ throw new HpccFileException("Failed on initial remote read trans", e);
+ }
- try
- {
- String msg = makeGetVersionRequest();
- int msgLen = msg.length();
+ RowServiceResponse response = readResponse();
+ if (response.len == 0)
+ {
+ useOldProtocol = true;
+ }
+ else
+ {
+ useOldProtocol = false;
- this.dos.writeInt(msgLen);
- this.dos.write(msg.getBytes(HPCCCharSet), 0, msgLen);
- this.dos.flush();
- }
- catch (IOException e)
- {
- throw new HpccFileException("Failed on initial remote read trans", e);
- }
+ byte[] versionBytes = new byte[response.len];
+ try
+ {
+ this.dis.readFully(versionBytes);
+ }
+ catch (IOException e)
+ {
+ throw new HpccFileException("Error while attempting to read version response.", e);
+ }
- RowServiceResponse response = readResponse();
- if (response.len == 0)
- {
- useOldProtocol = true;
- }
- else
- {
- useOldProtocol = false;
+ rowServiceVersion = new String(versionBytes, HPCCCharSet);
+ }
+
+ //------------------------------------------------------------------------------
+ // Send initial read request
+ //------------------------------------------------------------------------------
- byte[] versionBytes = new byte[response.len];
try
{
- this.dis.readFully(versionBytes);
+ String readTrans = null;
+ if (this.tokenBin == null)
+ {
+ this.tokenBin = new byte[0];
+ readTrans = makeInitialRequest();
+ }
+ else
+ {
+ readTrans = makeTokenRequest();
+ }
+
+ int transLen = readTrans.length();
+ this.dos.writeInt(transLen);
+ this.dos.write(readTrans.getBytes(HPCCCharSet), 0, transLen);
+ this.dos.flush();
}
catch (IOException e)
{
- throw new HpccFileException("Error while attempting to read version response.", e);
+ throw new HpccFileException("Failed on initial remote read read trans", e);
}
- rowServiceVersion = new String(versionBytes, HPCCCharSet);
- }
-
- //------------------------------------------------------------------------------
- // Send initial read request
- //------------------------------------------------------------------------------
-
- try
- {
- String readTrans = null;
- if (this.tokenBin == null)
- {
- this.tokenBin = new byte[0];
- readTrans = makeInitialRequest();
- }
- else
+ if (CompileTimeConstants.PROFILE_CODE)
{
- readTrans = makeTokenRequest();
+ firstByteTimeNS = System.nanoTime();
}
- int transLen = readTrans.length();
- this.dos.writeInt(transLen);
- this.dos.write(readTrans.getBytes(HPCCCharSet), 0, transLen);
- this.dos.flush();
+ this.active.set(true);
}
- catch (IOException e)
+ catch (Exception e)
{
- throw new HpccFileException("Failed on initial remote read read trans", e);
- }
+ log.error("Could not reach file part: '" + dataPart.getThisPart() + "' copy: '" + (getFilePartCopy() + 1) + "' on IP: '" + getIP()
+ + "'");
+ log.error(e.getMessage());
- if (CompileTimeConstants.PROFILE_CODE)
- {
- firstByteTimeNS = System.nanoTime();
+ needsRetry = true;
+ if (!setNextFilePartCopy())
+ {
+ throw new HpccFileException("Unsuccessfuly attempted to connect to all file part copies", e);
+ }
}
-
- this.active.set(true);
- }
- catch (Exception e)
- {
- log.error("Could not reach file part: '" + dataPart.getThisPart() + "' copy: '" + (getFilePartCopy() + 1) + "' on IP: '" + getIP()
- + "'");
- log.error(e.getMessage());
-
- if (!setNextFilePartCopy())
- // This should be a multi exception
- throw new HpccFileException("Unsuccessfuly attempted to connect to all file part copies", e);
- }
+ } while (needsRetry);
}
/* Notes on protocol:
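
The makeActive() change above wraps connection setup in a retry loop so that a failure against one copy advances to the next copy instead of aborting the read. A generic, self-contained sketch of that failover pattern (illustrative names only, not the RowServiceInputStream internals):

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.util.List;

public class CopyFailoverConnect
{
    // Try each candidate copy address in order; only fail once every copy has been attempted.
    public static Socket connectToAnyCopy(List<InetSocketAddress> copies, int connectTimeoutMs) throws IOException
    {
        IOException lastError = null;
        for (InetSocketAddress copy : copies)
        {
            try
            {
                Socket sock = new Socket();
                sock.connect(copy, connectTimeoutMs);
                return sock;       // first reachable copy wins
            }
            catch (IOException e)
            {
                lastError = e;     // remember the failure and move on to the next copy
            }
        }
        throw new IOException("Unsuccessfully attempted to connect to all file part copies", lastError);
    }
}
```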
diff --git a/dfsclient/src/test/java/org/hpccsystems/dfs/client/DFSIndexTest.java b/dfsclient/src/test/java/org/hpccsystems/dfs/client/DFSIndexTest.java
index 43c3437e7..46f02f39f 100644
--- a/dfsclient/src/test/java/org/hpccsystems/dfs/client/DFSIndexTest.java
+++ b/dfsclient/src/test/java/org/hpccsystems/dfs/client/DFSIndexTest.java
@@ -514,4 +514,4 @@ public static void main(String[] args)
test.tlkFilterExample();
} catch(Exception e) {}
}
-}
+}
\ No newline at end of file
diff --git a/dfsclient/src/test/java/org/hpccsystems/dfs/client/DFSReadWriteTest.java b/dfsclient/src/test/java/org/hpccsystems/dfs/client/DFSReadWriteTest.java
index 39f6a5280..eb3f2a2fa 100644
--- a/dfsclient/src/test/java/org/hpccsystems/dfs/client/DFSReadWriteTest.java
+++ b/dfsclient/src/test/java/org/hpccsystems/dfs/client/DFSReadWriteTest.java
@@ -265,6 +265,7 @@ public void readResumeTest() throws Exception
System.out.println("Messages from file part (" + i + ") read operation:\n" + fileReader.getRemoteReadMessages());
}
+ Runtime runtime = Runtime.getRuntime();
int readSizeKB = 100;
ArrayList<HPCCRecord> resumedRecords = new ArrayList<HPCCRecord>();
for (int i = 0; i < resumeInfo.size(); i++)
@@ -282,6 +283,13 @@ public void readResumeTest() throws Exception
resumedRecords.add(record);
}
+
+ // Periodically run garbage collector to prevent buffers in remote file readers from exhausting free memory
+ // This is only needed due to rapidly creating / destroying thousands of HpccRemoteFileReaders
+ if ((i % 10) == 0)
+ {
+ runtime.gc();
+ }
fileReader.close();
fileReader = null;
@@ -1131,6 +1139,53 @@ public void emptyCompressedFileTest()
}
}
+ @Test
+ public void filePartReadRetryTest()
+ {
+ {
+ HPCCFile readFile = null;
+ try
+ {
+ readFile = new HPCCFile(datasets[0], connString, hpccUser, hpccPass);
+ DataPartition[] fileParts = readFile.getFileParts();
+ for (int i = 0; i < fileParts.length; i++)
+ {
+ String firstCopyIP = fileParts[i].getCopyIP(0);
+ String firstCopyPath = fileParts[i].getCopyPath(0);
+ fileParts[i].setCopyIP(0, "1.1.1.1");
+ fileParts[i].add(1, firstCopyIP, firstCopyPath);
+ }
+
+ List<HPCCRecord> records = readFile(readFile, null, false);
+ System.out.println("Record count: " + records.size());
+ }
+ catch (Exception e)
+ {
+ Assert.fail(e.getMessage());
+ }
+ }
+
+ {
+ HPCCFile readFile = null;
+ try
+ {
+ readFile = new HPCCFile(datasets[0], connString, hpccUser, hpccPass);
+ DataPartition[] fileParts = readFile.getFileParts();
+ for (int i = 0; i < fileParts.length; i++)
+ {
+ fileParts[i].add(0,"1.1.1.1", "");
+ }
+
+ List<HPCCRecord> records = readFile(readFile, null, false);
+ System.out.println("Record count: " + records.size());
+ }
+ catch (Exception e)
+ {
+ Assert.fail(e.getMessage());
+ }
+ }
+ }
+
@Test
public void invalidSignatureTest()
{
@@ -1211,6 +1266,77 @@ public void invalidSignatureTest()
}
}
+ @Test
+ public void earlyCloseTest() throws Exception
+ {
+ HPCCFile file = new HPCCFile(datasets[0], connString , hpccUser, hpccPass);
+
+ DataPartition[] fileParts = file.getFileParts();
+ if (fileParts == null || fileParts.length == 0)
+ {
+ Assert.fail("No file parts found");
+ }
+
+ FieldDef originalRD = file.getRecordDefinition();
+ if (originalRD == null || originalRD.getNumDefs() == 0)
+ {
+ Assert.fail("Invalid or null record definition");
+ }
+
+ {
+ HPCCRecordBuilder recordBuilder = new HPCCRecordBuilder(file.getProjectedRecordDefinition());
+ HpccRemoteFileReader<HPCCRecord> fileReader = new HpccRemoteFileReader<HPCCRecord>(fileParts[0], originalRD, recordBuilder);
+
+ int expectedRecordCounts = 10;
+ int numRecords = 0;
+ while (fileReader.hasNext())
+ {
+ try
+ {
+ fileReader.next();
+ numRecords++;
+ }
+ catch (Exception e)
+ {
+ System.out.println("Error: " + e.getMessage());
+ }
+
+ if (numRecords == expectedRecordCounts)
+ {
+ fileReader.close();
+ }
+ }
+ assertTrue("Expected record count: " + expectedRecordCounts + " Actual count: " + numRecords, numRecords == expectedRecordCounts);
+ }
+
+ // Check that calling close() in between hasNext() and next() still allows the current record to be read
+ {
+ HPCCRecordBuilder recordBuilder = new HPCCRecordBuilder(file.getProjectedRecordDefinition());
+ HpccRemoteFileReader<HPCCRecord> fileReader = new HpccRemoteFileReader<HPCCRecord>(fileParts[0], originalRD, recordBuilder);
+
+ int expectedRecordCounts = 11;
+ int numRecords = 0;
+ while (fileReader.hasNext())
+ {
+ if (numRecords == expectedRecordCounts-1)
+ {
+ fileReader.close();
+ }
+
+ try
+ {
+ fileReader.next();
+ numRecords++;
+ }
+ catch (Exception e)
+ {
+ System.out.println("Error: " + e.getMessage());
+ }
+ }
+ assertTrue("Expected record count: " + expectedRecordCounts + " Actual count: " + numRecords, numRecords == expectedRecordCounts);
+ }
+ }
+
public List<HPCCRecord> readFile(HPCCFile file, Integer connectTimeoutMillis, boolean shouldForceTimeout) throws Exception
{
return readFile(file, connectTimeoutMillis, shouldForceTimeout, false, BinaryRecordReader.NO_STRING_PROCESSING);
diff --git a/dfsclient/src/test/java/org/hpccsystems/dfs/client/FileUtilityTest.java b/dfsclient/src/test/java/org/hpccsystems/dfs/client/FileUtilityTest.java
index 6e07edaf2..f62c3d39a 100644
--- a/dfsclient/src/test/java/org/hpccsystems/dfs/client/FileUtilityTest.java
+++ b/dfsclient/src/test/java/org/hpccsystems/dfs/client/FileUtilityTest.java
@@ -56,6 +56,18 @@ public void thorFileTests()
Assert.assertTrue("FileUtility operation didn't complete successfully", success);
}
+ {
+ String readArgs[] = {"-read_test", "benchmark::integer::20kb", "-url", this.connString,
+ "-user", this.hpccUser, "-pass", this.hpccPass, "-file_parts", "1" };
+
+ JSONArray results = FileUtility.run(readArgs);
+ JSONObject result = results.optJSONObject(0);
+ Assert.assertNotNull("FileUtility result should not be null.", result);
+
+ boolean success = result.optBoolean("successful",false);
+ Assert.assertTrue("FileUtility operation didn't complete successfully", success);
+ }
+
{
String copyArgs[] = {"-copy", "benchmark::integer::20kb benchmark::integer::20kb-copy",
"-url", this.connString, "-dest_url", this.connString,
diff --git a/dfsclient/src/test/resources/generate-datasets.ecl b/dfsclient/src/test/resources/generate-datasets.ecl
index 0c1fdafa7..fed129a15 100644
--- a/dfsclient/src/test/resources/generate-datasets.ecl
+++ b/dfsclient/src/test/resources/generate-datasets.ecl
@@ -10,7 +10,10 @@ childRec := {STRING8 childField1, INTEGER8 childField2, REAL8 childField3};
rec := {INTEGER8 int8, UNSIGNED8 uint8, INTEGER4 int4, UNSIGNED4 uint4,
INTEGER2 int2, UNSIGNED2 uint2,
REAL8 r8, REAL4 r4,
- DECIMAL16_8 dec16, UDECIMAL16_8 udec16,
+ DECIMAL16_8 dec16,
+ DECIMAL15_8 dec15,
+ UDECIMAL16_8 udec16,
+ UDECIMAL15_8 udec15,
QSTRING qStr,
STRING8 fixStr8,
STRING str,
@@ -33,7 +36,9 @@ ds := DATASET(totalrecs1, transform(rec,
self.r8 := (REAL)(random() % unique_values);
self.r4 := (REAL)(random() % unique_values);
self.dec16 := (REAL)(random() % unique_values);
+ self.dec15 := (REAL)(random() % unique_values);
self.udec16 := (REAL)(random() % unique_values);
+ self.udec15 := (REAL)(random() % unique_values);
self.qStr := (STRING)(random() % unique_values);
self.fixStr8 := (STRING)(random() % unique_values);
self.str := (STRING)(random() % unique_values);
diff --git a/pom.xml b/pom.xml
index ad846d220..f3c9818b4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -4,7 +4,7 @@
4.0.0
org.hpccsystems
hpcc4j
- 9.2.65-0-SNAPSHOT
+ 9.4.41-0-SNAPSHOT
pom
HPCC Systems Java Projects
https://hpccsystems.com
@@ -31,7 +31,8 @@
UTF-8
3.8.0
- 8
+ 1.8
+ 1.8
3.1.1
1.6.8
false
@@ -54,7 +55,7 @@
20231013
2.17.1
1.8.1
- 2.10.1
+ 3.3.6
1.11.1
1.10.0
1.5.0
@@ -202,7 +203,8 @@
maven-compiler-plugin
${maven.compiler.version}
- ${maven.compiler.release}
+
+ ${maven.compiler.target}
${maven.compiler.groups}
${maven.compiler.excludedGroups}
diff --git a/wsclient/pom.xml b/wsclient/pom.xml
index ead156961..dce4342d0 100644
--- a/wsclient/pom.xml
+++ b/wsclient/pom.xml
@@ -9,7 +9,7 @@
org.hpccsystems
hpcc4j
- 9.2.65-0-SNAPSHOT
+ 9.4.41-0-SNAPSHOT
diff --git a/wsclient/src/main/java/org/hpccsystems/ws/client/HPCCFileSprayClient.java b/wsclient/src/main/java/org/hpccsystems/ws/client/HPCCFileSprayClient.java
index 87d0c5d61..a7283a6cd 100644
--- a/wsclient/src/main/java/org/hpccsystems/ws/client/HPCCFileSprayClient.java
+++ b/wsclient/src/main/java/org/hpccsystems/ws/client/HPCCFileSprayClient.java
@@ -1004,9 +1004,8 @@ public ProgressResponseWrapper sprayVariable(DelimitedDataOptions options, DropZ
if (targetDropZone == null) throw new Exception("TargetDropZone object not available!");
SprayVariable request = new SprayVariable();
-
request.setSourceIP(targetDropZone.getNetAddress());
- request.setSourcePath(targetDropZone.getPath() + "/" + sourceFileName);
+ request.setSourcePath(Utils.ensureTrailingPathSlash(targetDropZone.getPath()) + sourceFileName);
request.setDestGroup(destGroup);
request.setDestLogicalName(targetFileName);
request.setOverwrite(overwrite);
@@ -1162,7 +1161,7 @@ public ProgressResponseWrapper sprayXML(DropZoneWrapper targetDropZone, String s
request.setDestGroup(destGroup);
request.setSourceIP(targetDropZone.getNetAddress());
- request.setSourcePath(targetDropZone.getPath() + "/" + sourceFileName);
+ request.setSourcePath(Utils.ensureTrailingPathSlash(targetDropZone.getPath()) + sourceFileName);
request.setDestLogicalName(targetFileName);
request.setOverwrite(overwrite);
request.setSourceFormat(format.getValue());
@@ -1318,7 +1317,7 @@ public ProgressResponseWrapper sprayFixed(DropZoneWrapper targetDropZone, String
request.setDestGroup(destGroup);
request.setSourceRecordSize(recordSize);
request.setSourceIP(targetDropZone.getNetAddress());
- request.setSourcePath(targetDropZone.getPath() + "/" + sourceFileName);
+ request.setSourcePath(Utils.ensureTrailingPathSlash(targetDropZone.getPath()) + sourceFileName);
request.setDestLogicalName(targetFileLabel);
request.setOverwrite(overwrite);
request.setPrefix(prefix);
@@ -1488,15 +1487,11 @@ public boolean uploadLargeFile(File uploadFile, DropZoneWrapper dropZone)
return false;
}
- uploadurlbuilder += "&NetAddress=" + dropZone.getNetAddress() + "&Path=" + dropZone.getPath();
+ uploadurlbuilder += "&NetAddress=" + dropZone.getNetAddress() + "&Path=" + Utils.ensureTrailingPathSlash(dropZone.getPath());
if (!dropZone.getName().isEmpty())
uploadurlbuilder += "&DropZoneName=" + dropZone.getName();
- String path = dropZone.getPath().trim();
- if (!path.endsWith("/"))
- path += "/";
- uploadurlbuilder += "&Path=" + path;
uploadurlbuilder += "&OS=" + (dropZone.getLinux().equalsIgnoreCase("true") ? "2" : "1");
uploadurlbuilder += "&rawxml_=1";
WritableByteChannel outchannel = null;
diff --git a/wsclient/src/main/java/org/hpccsystems/ws/client/utils/Utils.java b/wsclient/src/main/java/org/hpccsystems/ws/client/utils/Utils.java
index f33d83cf1..df7a56162 100644
--- a/wsclient/src/main/java/org/hpccsystems/ws/client/utils/Utils.java
+++ b/wsclient/src/main/java/org/hpccsystems/ws/client/utils/Utils.java
@@ -1061,21 +1061,64 @@ public static DocumentBuilder newSafeXMLDocBuilder() throws ParserConfigurationE
return safeXMLDocBuilder;
}
+ /**
+ * Ensures the given path contains a trailing path delimiter.
+ * Does not introduce a duplicate trailing path delimiter if one already exists.
+ * Defaults to the Linux style separator if the given path contains a Linux style separator or is empty.
+ * Strips all trailing whitespace characters.
+ * @param path The path to be postfixed
+ * @return original path with proper trailing path delimiter
+ */
public static String ensureTrailingPathSlash(String path)
{
return ensureTrailingPathSlash(path, (path.contains(Character.toString(LINUX_SEP)) || path.length() == 0) ? LINUX_SEP : WIN_SEP);
}
+ /**
+ * Ensures the given path contains a trailing path delimiter.
+ * Does not introduce a duplicate trailing path delimiter if one already exists.
+ * Uses the Linux style path separator if 'useLinuxSep' equals "true", otherwise uses the Windows style path separator.
+ * Strips all trailing whitespace characters.
+ * @param path The path to be postfixed
+ * @param useLinuxSep String; if "true", the Linux style path delimiter will be used
+ * @return original path with proper trailing path delimiter
+ */
public static String ensureTrailingPathSlash(String path, String useLinuxSep)
{
return ensureTrailingPathSlash(path, useLinuxSep.equalsIgnoreCase("true") ? LINUX_SEP : WIN_SEP);
}
+ /**
+ * Ensures the given path contains a trailing path delimiter.
+ * Does not introduce a duplicate trailing path delimiter if one already exists.
+ * Uses the provided 'slash' as the trailing path delimiter.
+ * Strips all trailing whitespace characters.
+ * @param path The path to be postfixed
+ * @param slash The character to append
+ * @return original path with proper trailing path delimiter
+ */
public static String ensureTrailingPathSlash(String path, char slash)
{
+ path = trimTrailing(path);
+
if (path.length() == 0 || path.charAt(path.length()-1) != slash)
path = path + slash;
return path;
}
+
+ /**
+ * Removes trailing whitespace characters from a string.
+ *
+ * @param originalStr the original string from which trailing whitespace should be removed
+ * @return a new string with the same characters as the original string, minus any trailing whitespace
+ */
+ public static String trimTrailing(String originalStr)
+ {
+ int strIndex = originalStr.length()-1;
+ while(strIndex >= 0 && Character.isWhitespace(originalStr.charAt(strIndex)))
+ strIndex--;
+
+ return originalStr.substring(0,strIndex+1);
+ }
}
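
A minimal usage sketch of the trailing-slash helpers above: trailing whitespace is stripped before the separator check and an existing separator is never duplicated (the paths are placeholders):

```java
import org.hpccsystems.ws.client.utils.Utils;

public class TrailingSlashExample
{
    public static void main(String[] args)
    {
        System.out.println(Utils.ensureTrailingPathSlash("/var/lib/HPCCSystems/mydropzone"));    // "/var/lib/HPCCSystems/mydropzone/"
        System.out.println(Utils.ensureTrailingPathSlash("/var/lib/HPCCSystems/mydropzone/ "));  // trailing space trimmed, slash kept
        System.out.println(Utils.ensureTrailingPathSlash("C:\\dropzone", '\\'));                 // "C:\dropzone\"
        System.out.println(Utils.trimTrailing("some/path \t"));                                  // "some/path"
    }
}
```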
diff --git a/wsclient/src/test/java/org/hpccsystems/ws/client/WSFileIOClientTest.java b/wsclient/src/test/java/org/hpccsystems/ws/client/WSFileIOClientTest.java
index e9ae11815..e27714e24 100644
--- a/wsclient/src/test/java/org/hpccsystems/ws/client/WSFileIOClientTest.java
+++ b/wsclient/src/test/java/org/hpccsystems/ws/client/WSFileIOClientTest.java
@@ -21,7 +21,9 @@ HPCC SYSTEMS software Copyright (C) 2019 HPCC Systems®.
import static org.junit.Assert.fail;
import static org.junit.Assume.assumeTrue;
+import java.util.Arrays;
import java.util.HashSet;
+import java.util.List;
import java.util.Set;
import org.apache.axis2.AxisFault;
@@ -67,8 +69,6 @@ public class WSFileIOClientTest extends BaseRemoteTest
@Test
public void copyFile() throws Exception
{
- Assume.assumeFalse("Test not valid on containerized HPCC environment", client.isTargetHPCCContainerized());
- assumeTrue("Ignoring test 'copyFile' because HPCC-30117 is not fixed", HPCC_30117.equalsIgnoreCase("fixed"));
String lzfile=System.currentTimeMillis() + "_csvtest.csv";
String hpccfilename="temp::" + lzfile;
client.createHPCCFile(lzfile, targetLZ, true);
@@ -76,35 +76,43 @@ public void copyFile() throws Exception
client.writeHPCCFileData(data, lzfile, targetLZ, true, 0, 20);
try
{
- ProgressResponseWrapper dfuspray=wsclient.getFileSprayClient().sprayVariable(
+ System.out.println("Starting file spray.");
+ ProgressResponseWrapper dfuspray = wsclient.getFileSprayClient().sprayVariable(
new DelimitedDataOptions(),
wsclient.getFileSprayClient().fetchLocalDropZones().get(0),
lzfile,"~" + hpccfilename,"",thorClusterFileGroup,true,
HPCCFileSprayClient.SprayVariableFormat.DFUff_csv,
null, null, null, null, null, null, null);
- Thread.sleep(1000);
- int wait=60;
if (dfuspray.getExceptions() != null
- && dfuspray.getExceptions().getException() != null
- && dfuspray.getExceptions().getException().size()>0)
+ && dfuspray.getExceptions().getException() != null
+ && dfuspray.getExceptions().getException().size()>0)
{
fail(dfuspray.getExceptions().getException().get(0).getMessage());
}
- if (dfuspray.getSecsLeft()>0)
+
+ List<String> whiteListedStates = Arrays.asList("queued", "started", "unknown", "finished", "monitoring");
+ int waitCount = 0;
+ int MAX_WAIT_COUNT = 60;
+
+ ProgressResponseWrapper dfuProgress = null;
+ do
{
- System.out.println("Still spraying, waiting 1 sec...");
- for (int i=wait;i>0;i--)
+ dfuProgress = wsclient.getFileSprayClient().getDfuProgress(dfuspray.getWuid());
+ boolean stateIsWhiteListed = whiteListedStates.contains(dfuProgress.getState());
+ if (!stateIsWhiteListed)
{
- if (dfuspray.getSecsLeft()==0)
- {
- i=0;
- }
- else
- {
- Thread.sleep(1000);
- }
+ fail("File spray failed: Summary: " + dfuProgress.getSummaryMessage() + " Exceptions: " + dfuProgress.getExceptions());
}
- }
+
+ if (dfuProgress.getPercentDone() < 100)
+ {
+ Thread.sleep(1000);
+ System.out.println("File spray percent complete: " + dfuProgress.getPercentDone() + "% Sleeping for 1sec to wait for spray.");
+ waitCount++;
+ }
+ } while (waitCount < MAX_WAIT_COUNT && dfuProgress.getPercentDone() < 100);
+
+ assumeTrue("File spray did not complete within: " + MAX_WAIT_COUNT + "s. Failing test.", waitCount < MAX_WAIT_COUNT);
System.out.println("Test file successfully sprayed to " + "~" + hpccfilename + ", attempting copy to " + hpccfilename + "_2");
wsclient.getFileSprayClient().copyFile(hpccfilename,hpccfilename + "_2",true);
@@ -144,8 +152,6 @@ public void copyFile() throws Exception
@Test
public void AcreateHPCCFile() throws Exception, ArrayOfEspExceptionWrapper
{
- Assume.assumeFalse("Test not valid on containerized HPCC environment", client.isTargetHPCCContainerized());
- assumeTrue("Ignoring test 'copyFile' because HPCC-30117 is not fixed", HPCC_30117.equalsIgnoreCase("fixed"));
System.out.println("Creating file: '" + testfilename + "' on LandingZone: '" + targetLZ + "' on HPCC: '" + super.connString +"'");
Assert.assertTrue(client.createHPCCFile(testfilename, targetLZ, true));
}
@@ -153,8 +159,6 @@ public void AcreateHPCCFile() throws Exception, ArrayOfEspExceptionWrapper
@Test
public void BwriteHPCCFile() throws Exception, ArrayOfEspExceptionWrapper
{
- assumeTrue("Ignoring test 'copyFile' because HPCC-30117 is not fixed", HPCC_30117.equalsIgnoreCase("fixed"));
- System.out.println("Writing data to file: '" + testfilename + "' on LandingZone: '" + targetLZ + "' on HPCC: '" + super.connString +"'");
byte[] data = "HELLO MY DARLING, HELLO MY DEAR!1234567890ABCDEFGHIJKLMNOPQRSTUVXYZ".getBytes();
Assert.assertTrue(client.writeHPCCFileData(data, testfilename, targetLZ, true, 0, 20));
}
@@ -162,9 +166,6 @@ public void BwriteHPCCFile() throws Exception, ArrayOfEspExceptionWrapper
@Test
public void CreadHPCCFile() throws Exception, ArrayOfEspExceptionWrapper
{
- Assume.assumeFalse("Test not valid on containerized HPCC environment", client.isTargetHPCCContainerized());
- assumeTrue("Ignoring test 'copyFile' because HPCC-30117 is not fixed", HPCC_30117.equalsIgnoreCase("fixed"));
-
System.out.println("reading data from file: '" + testfilename + "' on LandingZone: '" + targetLZ + "' on HPCC: '" + super.connString +"'");
byte[] data = "HELLO MY DARLING, HELLO MY DEAR!1234567890ABCDEFGHIJKLMNOPQRSTUVXYZ".getBytes();
String response = client.readFileData(targetLZ, testfilename, data.length, 0);
diff --git a/wsclient/src/test/java/org/hpccsystems/ws/client/WUQueryTest.java b/wsclient/src/test/java/org/hpccsystems/ws/client/WUQueryTest.java
index 2a378661a..a320260ee 100644
--- a/wsclient/src/test/java/org/hpccsystems/ws/client/WUQueryTest.java
+++ b/wsclient/src/test/java/org/hpccsystems/ws/client/WUQueryTest.java
@@ -25,19 +25,35 @@ public static void setup() throws Exception
wswuclient = wsclient.getWsWorkunitsClient();
}
+ public WorkunitWrapper createWU(String ecl, String cluster, String jobName, String owner, ApplicationValueWrapper av)
+ {
+ WorkunitWrapper wu=new WorkunitWrapper();
+ wu.setECL(ecl);
+ wu.setCluster(cluster);
+ wu.setJobname(jobName);
+ wu.setOwner(owner);
+ wu.getApplicationValues().add(av);
+ try
+ {
+ wu=wswuclient.compileWUFromECL(wu);
+ wu=wswuclient.runWorkunit(wu.getWuid(),null,null,null,false,null);
+ }
+ catch (Exception e)
+ {
+ System.err.println("WUQueryTest: Failed to create new WU for test: '" + ecl + "'" );
+ System.err.println(e.getLocalizedMessage());
+ return null;
+ }
+ return wu;
+ }
@Test
public void testGetWorkunitByAppName() throws Exception, ArrayOfEspExceptionWrapper, ArrayOfECLExceptionWrapper
{
- WorkunitWrapper wu=new WorkunitWrapper();
- wu.setECL("OUTPUT('1');");
- wu.setCluster(thorclustername);
- wu.setJobname("testGetWorkunitByAppName");
- wu.setOwner("user");
ApplicationValueWrapper av=new ApplicationValueWrapper("HIPIE","testkey","testvalue");
- wu.getApplicationValues().add(av);
- wu=wswuclient.compileWUFromECL(wu);
- wu=wswuclient.runWorkunit(wu.getWuid(),null,null,null,false,null);
+ WorkunitWrapper wu = createWU("OUTPUT('1');", thorclustername, "1testGetWorkunitByAppName", "1user", av);
+
+ assumeTrue("testGetWorkunitByAppName failed to create WU!", wu != null);
WUQueryWrapper info = new WUQueryWrapper().setSortBy(SortBy.WUID).setDescending(true);
info.getApplicationValues().add(av);
@@ -57,8 +73,14 @@ public void testGetWorkunitByAppName() throws Exception, ArrayOfEspExceptionWrap
@Test
public void testGetWorkunitSort() throws Exception
{
+
+ assumeTrue("Testing WU sortBy failed to create First WU!", null != createWU("OUTPUT('a');", thorclustername, "aTestWorkunitSortBA", "aTestUser", new ApplicationValueWrapper("AppA","testkeyA","testvalueA")));
+ assumeTrue("Testing WU sortBy failed to create Second WU!", null != createWU("OUTPUT('b');", thorclustername, "bTestWorkunitSortBy", "bTestUser", new ApplicationValueWrapper("AppB","testkeyB","testvalueB")));
+
//wuid descending
List<WorkunitWrapper> result=wswuclient.getWorkunits(new WUQueryWrapper().setSortBy(SortBy.WUID).setDescending(true));
+ assumeTrue("Testing WU sortBy failed to find enough WUs!", result.size() > 1);
+
if (result.get(0).getWuid().compareToIgnoreCase(result.get(1).getWuid())<0)
{
Assert.fail("descending workunits in wrong order:" + result.get(0).getWuid() + " then " + result.get(1).getWuid());
diff --git a/wsclient/src/test/java/org/hpccsystems/ws/client/utils/UtilsTest.java b/wsclient/src/test/java/org/hpccsystems/ws/client/utils/UtilsTest.java
index 1ff309ebf..eef268e47 100644
--- a/wsclient/src/test/java/org/hpccsystems/ws/client/utils/UtilsTest.java
+++ b/wsclient/src/test/java/org/hpccsystems/ws/client/utils/UtilsTest.java
@@ -8,59 +8,75 @@
public class UtilsTest
{
+ @Test
+ public void testEnsureTrailingSlashTrailingWhiteSpace()
+ {
+ assertEquals(Character.toString(Utils.LINUX_SEP), Utils.ensureTrailingPathSlash(""));
+ assertEquals(Character.toString(Utils.LINUX_SEP), Utils.ensureTrailingPathSlash(Character.toString(Utils.LINUX_SEP)+ " "));
+ assertEquals(Character.toString(Utils.WIN_SEP), Utils.ensureTrailingPathSlash(Character.toString(Utils.WIN_SEP) + " "));
+ assertEquals(Character.toString(Utils.WIN_SEP)+Character.toString(Utils.LINUX_SEP), Utils.ensureTrailingPathSlash(Character.toString(Utils.WIN_SEP)+Character.toString(Utils.LINUX_SEP)+"\t"));
+ assertEquals("C:\\some\\Path\\", Utils.ensureTrailingPathSlash("C:\\some\\Path "));
+ assertEquals("C:\\some\\Path\\", Utils.ensureTrailingPathSlash("C:\\some\\Path\\ "));
+ assertEquals("/another/path/", Utils.ensureTrailingPathSlash("/another/path "));
+ assertEquals("/another/path/", Utils.ensureTrailingPathSlash("/another/path/\t\t"));
+ assertEquals("/another/path/", Utils.ensureTrailingPathSlash("/another/path/\n"));
+ assertEquals("/another/path/", Utils.ensureTrailingPathSlash("/another/path/" + '\u005Cn'));
+ assertEquals("/another/path/", Utils.ensureTrailingPathSlash("/another/path/ " + '\u005Ct'));
+ }
+
@Test
public void testEnsureTrailingSlashNoSlashSpecified()
{
- assertEquals(Utils.ensureTrailingPathSlash(""), Character.toString(Utils.LINUX_SEP)); //no sep in path, default to lin sep
- assertEquals(Utils.ensureTrailingPathSlash(Character.toString(Utils.LINUX_SEP)), Character.toString(Utils.LINUX_SEP));//no change expected
- assertEquals(Utils.ensureTrailingPathSlash(Character.toString(Utils.WIN_SEP)), Character.toString(Utils.WIN_SEP)); //no change expected
- assertEquals(Utils.ensureTrailingPathSlash(Character.toString(Utils.WIN_SEP)+Character.toString(Utils.LINUX_SEP)), Character.toString(Utils.WIN_SEP)+Character.toString(Utils.LINUX_SEP));//no change expected
- assertEquals(Utils.ensureTrailingPathSlash("C:\\some\\Path"), "C:\\some\\Path\\");
- assertEquals(Utils.ensureTrailingPathSlash("C:\\some\\Path\\"), "C:\\some\\Path\\");
- assertEquals(Utils.ensureTrailingPathSlash("/another/path"), "/another/path/");
- assertEquals(Utils.ensureTrailingPathSlash("/another/path/"), "/another/path/");
+ assertEquals(Character.toString(Utils.LINUX_SEP), Utils.ensureTrailingPathSlash("")); //no sep in path, default to lin sep
+ assertEquals(Character.toString(Utils.LINUX_SEP), Utils.ensureTrailingPathSlash(Character.toString(Utils.LINUX_SEP)));//no change expected
+ assertEquals(Character.toString(Utils.WIN_SEP), Utils.ensureTrailingPathSlash(Character.toString(Utils.WIN_SEP))); //no change expected
+ assertEquals(Character.toString(Utils.WIN_SEP)+Character.toString(Utils.LINUX_SEP), Utils.ensureTrailingPathSlash(Character.toString(Utils.WIN_SEP)+Character.toString(Utils.LINUX_SEP)));//no change expected
+ assertEquals("C:\\some\\Path\\", Utils.ensureTrailingPathSlash("C:\\some\\Path"));
+ assertEquals("C:\\some\\Path\\", Utils.ensureTrailingPathSlash("C:\\some\\Path\\"));
+ assertEquals("/another/path/", Utils.ensureTrailingPathSlash("/another/path"));
+ assertEquals("/another/path/", Utils.ensureTrailingPathSlash("/another/path/"));
}
@Test
public void testEnsureTrailingSlashSlashSpecified()
{
- assertEquals(Utils.ensureTrailingPathSlash("", Utils.LINUX_SEP), Character.toString(Utils.LINUX_SEP));
- assertEquals(Utils.ensureTrailingPathSlash("", Utils.WIN_SEP), Character.toString(Utils.WIN_SEP));
- assertEquals(Utils.ensureTrailingPathSlash(Character.toString(Utils.LINUX_SEP), Utils.WIN_SEP), Character.toString(Utils.LINUX_SEP)+Utils.WIN_SEP);
- assertEquals(Utils.ensureTrailingPathSlash(Character.toString(Utils.LINUX_SEP), Utils.LINUX_SEP), Character.toString(Utils.LINUX_SEP));
- assertEquals(Utils.ensureTrailingPathSlash(Character.toString(Utils.WIN_SEP), Utils.WIN_SEP), Character.toString(Utils.WIN_SEP));
- assertEquals(Utils.ensureTrailingPathSlash(Character.toString(Utils.WIN_SEP), Utils.LINUX_SEP), Character.toString(Utils.WIN_SEP)+Character.toString(Utils.LINUX_SEP));
- assertEquals(Utils.ensureTrailingPathSlash(Character.toString(Utils.WIN_SEP)+Character.toString(Utils.LINUX_SEP), Utils.LINUX_SEP), Character.toString(Utils.WIN_SEP)+Character.toString(Utils.LINUX_SEP));
- assertEquals(Utils.ensureTrailingPathSlash(Character.toString(Utils.WIN_SEP)+Character.toString(Utils.LINUX_SEP), Utils.WIN_SEP), Character.toString(Utils.WIN_SEP)+Character.toString(Utils.LINUX_SEP)+Character.toString(Utils.WIN_SEP));
- assertEquals(Utils.ensureTrailingPathSlash("C:\\some\\Path", Utils.LINUX_SEP), "C:\\some\\Path\\"+Utils.LINUX_SEP);
- assertEquals(Utils.ensureTrailingPathSlash("C:\\some\\Path", Utils.WIN_SEP), "C:\\some\\Path\\"+Utils.WIN_SEP);
- assertEquals(Utils.ensureTrailingPathSlash("C:\\some\\Path\\", Utils.LINUX_SEP), "C:\\some\\Path\\" + Utils.LINUX_SEP);
- assertEquals(Utils.ensureTrailingPathSlash("C:\\some\\Path\\", Utils.WIN_SEP), "C:\\some\\Path\\");
- assertEquals(Utils.ensureTrailingPathSlash("/another/path", Utils.LINUX_SEP), "/another/path" + Utils.LINUX_SEP);
- assertEquals(Utils.ensureTrailingPathSlash("/another/path", Utils.WIN_SEP), "/another/path/"+ Utils.WIN_SEP);
- assertEquals(Utils.ensureTrailingPathSlash("/another/path/", Utils.LINUX_SEP), "/another/path/");
- assertEquals(Utils.ensureTrailingPathSlash("/another/path/", Utils.WIN_SEP), "/another/path/"+Utils.WIN_SEP);
+ assertEquals(Character.toString(Utils.LINUX_SEP), Utils.ensureTrailingPathSlash("", Utils.LINUX_SEP));
+ assertEquals(Character.toString(Utils.WIN_SEP), Utils.ensureTrailingPathSlash("", Utils.WIN_SEP));
+ assertEquals(Character.toString(Utils.LINUX_SEP)+Utils.WIN_SEP, Utils.ensureTrailingPathSlash(Character.toString(Utils.LINUX_SEP), Utils.WIN_SEP));
+ assertEquals(Character.toString(Utils.LINUX_SEP), Utils.ensureTrailingPathSlash(Character.toString(Utils.LINUX_SEP), Utils.LINUX_SEP));
+ assertEquals(Character.toString(Utils.WIN_SEP), Utils.ensureTrailingPathSlash(Character.toString(Utils.WIN_SEP), Utils.WIN_SEP));
+ assertEquals(Character.toString(Utils.WIN_SEP)+Character.toString(Utils.LINUX_SEP), Utils.ensureTrailingPathSlash(Character.toString(Utils.WIN_SEP), Utils.LINUX_SEP));
+ assertEquals(Character.toString(Utils.WIN_SEP)+Character.toString(Utils.LINUX_SEP), Utils.ensureTrailingPathSlash(Character.toString(Utils.WIN_SEP)+Character.toString(Utils.LINUX_SEP), Utils.LINUX_SEP));
+ assertEquals(Character.toString(Utils.WIN_SEP)+Character.toString(Utils.LINUX_SEP)+Character.toString(Utils.WIN_SEP), Utils.ensureTrailingPathSlash(Character.toString(Utils.WIN_SEP)+Character.toString(Utils.LINUX_SEP), Utils.WIN_SEP));
+ assertEquals("C:\\some\\Path\\"+Utils.LINUX_SEP, Utils.ensureTrailingPathSlash("C:\\some\\Path", Utils.LINUX_SEP));
+ assertEquals("C:\\some\\Path\\"+Utils.WIN_SEP, Utils.ensureTrailingPathSlash("C:\\some\\Path", Utils.WIN_SEP));
+ assertEquals("C:\\some\\Path\\" + Utils.LINUX_SEP, Utils.ensureTrailingPathSlash("C:\\some\\Path\\", Utils.LINUX_SEP));
+ assertEquals("C:\\some\\Path\\", Utils.ensureTrailingPathSlash("C:\\some\\Path\\", Utils.WIN_SEP));
+ assertEquals("/another/path" + Utils.LINUX_SEP, Utils.ensureTrailingPathSlash("/another/path", Utils.LINUX_SEP));
+ assertEquals("/another/path/"+ Utils.WIN_SEP, Utils.ensureTrailingPathSlash("/another/path", Utils.WIN_SEP));
+ assertEquals("/another/path/", Utils.ensureTrailingPathSlash("/another/path/", Utils.LINUX_SEP));
+ assertEquals("/another/path/"+Utils.WIN_SEP, Utils.ensureTrailingPathSlash("/another/path/", Utils.WIN_SEP));
}
@Test
public void testEnsureTrailingSlashUseLinuxBoolTest()
{
- assertEquals(Utils.ensureTrailingPathSlash("", "true"), Character.toString(Utils.LINUX_SEP));
- assertEquals(Utils.ensureTrailingPathSlash("", "false"), Character.toString(Utils.WIN_SEP));
- assertEquals(Utils.ensureTrailingPathSlash("", "xyz"), Character.toString(Utils.WIN_SEP));
- assertEquals(Utils.ensureTrailingPathSlash(Character.toString(Utils.LINUX_SEP), "false"), Character.toString(Utils.LINUX_SEP)+Utils.WIN_SEP);
- assertEquals(Utils.ensureTrailingPathSlash(Character.toString(Utils.LINUX_SEP), "true"), Character.toString(Utils.LINUX_SEP));
- assertEquals(Utils.ensureTrailingPathSlash(Character.toString(Utils.WIN_SEP), "false"), Character.toString(Utils.WIN_SEP));
- assertEquals(Utils.ensureTrailingPathSlash(Character.toString(Utils.WIN_SEP), "true"), Character.toString(Utils.WIN_SEP)+Character.toString(Utils.LINUX_SEP));
- assertEquals(Utils.ensureTrailingPathSlash(Character.toString(Utils.WIN_SEP)+Character.toString(Utils.LINUX_SEP), "true"), Character.toString(Utils.WIN_SEP)+Character.toString(Utils.LINUX_SEP));
- assertEquals(Utils.ensureTrailingPathSlash(Character.toString(Utils.WIN_SEP)+Character.toString(Utils.LINUX_SEP), "false"), Character.toString(Utils.WIN_SEP)+Character.toString(Utils.LINUX_SEP)+Character.toString(Utils.WIN_SEP));
- assertEquals(Utils.ensureTrailingPathSlash("C:\\some\\Path", "true"), "C:\\some\\Path\\"+Utils.LINUX_SEP);
- assertEquals(Utils.ensureTrailingPathSlash("C:\\some\\Path", "false"), "C:\\some\\Path\\"+Utils.WIN_SEP);
- assertEquals(Utils.ensureTrailingPathSlash("C:\\some\\Path\\", "true"), "C:\\some\\Path\\" + Utils.LINUX_SEP);
- assertEquals(Utils.ensureTrailingPathSlash("C:\\some\\Path\\", "false"), "C:\\some\\Path\\");
- assertEquals(Utils.ensureTrailingPathSlash("/another/path", "TRUE"), "/another/path" + Utils.LINUX_SEP);
- assertEquals(Utils.ensureTrailingPathSlash("/another/path", "FALSE"), "/another/path/"+ Utils.WIN_SEP);
- assertEquals(Utils.ensureTrailingPathSlash("/another/path/", "TrUe"), "/another/path/");
- assertEquals(Utils.ensureTrailingPathSlash("/another/path/", "FalSe"), "/another/path/"+Utils.WIN_SEP);
+ assertEquals(Character.toString(Utils.LINUX_SEP), Utils.ensureTrailingPathSlash("", "true"));
+ assertEquals(Character.toString(Utils.WIN_SEP), Utils.ensureTrailingPathSlash("", "false"));
+ assertEquals(Character.toString(Utils.WIN_SEP), Utils.ensureTrailingPathSlash("", "xyz"));
+ assertEquals(Character.toString(Utils.LINUX_SEP)+Utils.WIN_SEP, Utils.ensureTrailingPathSlash(Character.toString(Utils.LINUX_SEP), "false"));
+ assertEquals(Character.toString(Utils.LINUX_SEP), Utils.ensureTrailingPathSlash(Character.toString(Utils.LINUX_SEP), "true"));
+ assertEquals(Character.toString(Utils.WIN_SEP), Utils.ensureTrailingPathSlash(Character.toString(Utils.WIN_SEP), "false"));
+ assertEquals(Character.toString(Utils.WIN_SEP)+Character.toString(Utils.LINUX_SEP), Utils.ensureTrailingPathSlash(Character.toString(Utils.WIN_SEP), "true"));
+ assertEquals(Character.toString(Utils.WIN_SEP)+Character.toString(Utils.LINUX_SEP), Utils.ensureTrailingPathSlash(Character.toString(Utils.WIN_SEP)+Character.toString(Utils.LINUX_SEP), "true"));
+ assertEquals(Character.toString(Utils.WIN_SEP)+Character.toString(Utils.LINUX_SEP)+Character.toString(Utils.WIN_SEP), Utils.ensureTrailingPathSlash(Character.toString(Utils.WIN_SEP)+Character.toString(Utils.LINUX_SEP), "false"));
+ assertEquals("C:\\some\\Path\\"+Utils.LINUX_SEP, Utils.ensureTrailingPathSlash("C:\\some\\Path", "true"));
+ assertEquals("C:\\some\\Path\\"+Utils.WIN_SEP, Utils.ensureTrailingPathSlash("C:\\some\\Path", "false"));
+ assertEquals("C:\\some\\Path\\" + Utils.LINUX_SEP, Utils.ensureTrailingPathSlash("C:\\some\\Path\\", "true"));
+ assertEquals("C:\\some\\Path\\", Utils.ensureTrailingPathSlash("C:\\some\\Path\\", "false"));
+ assertEquals("/another/path" + Utils.LINUX_SEP, Utils.ensureTrailingPathSlash("/another/path", "TRUE"));
+ assertEquals("/another/path/"+ Utils.WIN_SEP, Utils.ensureTrailingPathSlash("/another/path", "FALSE"));
+ assertEquals("/another/path/", Utils.ensureTrailingPathSlash("/another/path/", "TrUe"));
+ assertEquals("/another/path/"+Utils.WIN_SEP, Utils.ensureTrailingPathSlash("/another/path/", "FalSe"));
}
}