diff --git a/commons-hpcc/pom.xml b/commons-hpcc/pom.xml index 47bb0a461..8c21c53fd 100644 --- a/commons-hpcc/pom.xml +++ b/commons-hpcc/pom.xml @@ -9,7 +9,7 @@ org.hpccsystems hpcc4j - 9.2.65-0-SNAPSHOT + 9.4.41-0-SNAPSHOT diff --git a/dfsclient/pom.xml b/dfsclient/pom.xml index d5a9f911e..02a9a5dca 100644 --- a/dfsclient/pom.xml +++ b/dfsclient/pom.xml @@ -9,7 +9,7 @@ org.hpccsystems hpcc4j - 9.2.65-0-SNAPSHOT + 9.4.41-0-SNAPSHOT diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/BinaryRecordReader.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/BinaryRecordReader.java index 565a59454..58470fdd8 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/BinaryRecordReader.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/BinaryRecordReader.java @@ -830,7 +830,15 @@ private BigDecimal getUnsignedDecimal(int numDigits, int precision, int dataLen) BigDecimal ret = new BigDecimal(0); int idx = 0; - int curDigit = numDigits - 1; + int curDigit = numDigits; + + // If the # of digits is odd the top most nibble is unused and we don't want to include it + // in the scale calculations below. Due to how the scale calculation works below this means + // we decrement the starting value of curDigit in the case of even length decimals + if ((numDigits % 2) == 0) + { + curDigit--; + } while (idx < dataLen) { diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/DataPartition.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/DataPartition.java index bc648b49c..a846969e0 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/DataPartition.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/DataPartition.java @@ -16,6 +16,10 @@ package org.hpccsystems.dfs.client; import java.io.Serializable; +import java.security.InvalidParameterException; +import java.util.List; +import java.util.ArrayList; +import java.util.Arrays; import org.hpccsystems.commons.ecl.FileFilter; import org.hpccsystems.commons.errors.HpccFileException; @@ -261,11 +265,61 @@ public String[] getCopyLocations() public String getCopyIP(int copyindex) { int copiescount = copyLocations.length; - if (copyindex < 0 || copyindex >= copiescount) return null; + if (copyindex < 0 || copyindex >= copiescount) + { + return null; + } return copyLocations[copyindex]; } + /** + * Set the copy IP + * + * @param copyIndex + * the copyindex + * @param copyIP The IP of the file part copy + */ + public void setCopyIP(int copyIndex, String copyIP) + { + if (copyIndex < 0 || copyIndex >= copyLocations.length) + { + return; + } + + copyLocations[copyIndex] = copyIP; + } + + /** + * Add file part copy + * + * @param index The index at which to insert the file part copy + * @param copyIP The IP of the new file part copy + * @param copyPath The path of the new file part copy + * @throws Exception The exception + */ + public void add(int index, String copyIP, String copyPath) throws Exception + { + if (index < 0 || index > copyLocations.length) + { + throw new InvalidParameterException("Insertion index: " + index + " is invalid." + + "Expected index in range of: [0," + copyLocations.length + "]"); + } + + if (copyIP == null || copyPath == null) + { + throw new InvalidParameterException("Copy IP or Path are invalid, must be non-null."); + } + + List copyLocationsList = new ArrayList<>(Arrays.asList(copyLocations)); + copyLocationsList.add(index, copyIP); + copyLocations = copyLocationsList.toArray(new String[0]); + + List copyPathList = new ArrayList<>(Arrays.asList(copyPaths)); + copyPathList.add(index, copyPath); + copyPaths = copyPathList.toArray(new String[0]); + } + /** * Count of copies available for this file part. * @return copy locations size diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java index 3da02c805..2cf0e3b25 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/FileUtility.java @@ -417,6 +417,24 @@ private static Options getReadOptions() return options; } + private static Options getReadTestOptions() + { + Options options = new Options(); + options.addRequiredOption("read_test", "Read test", true, "Specifies the file that should be read."); + options.addRequiredOption("url", "Source Cluster URL", true, "Specifies the URL of the ESP to connect to."); + options.addOption("user", true, "Specifies the username used to connect. Defaults to null."); + options.addOption("pass", true, "Specifies the password used to connect. Defaults to null."); + options.addOption("num_threads", true, "Specifies the number of parallel to use to perform operations."); + + options.addOption(Option.builder("file_parts") + .argName("_file_parts") + .hasArgs() + .valueSeparator(',') + .desc("Specifies the file parts that should be read. Defaults to all file parts.") + .build()); + return options; + } + private static Options getCopyOptions() { Options options = new Options(); @@ -463,6 +481,7 @@ private static Options getTopLevelOptions() { Options options = new Options(); options.addOption("read", "Reads the specified file(s) and writes a copy of the files to the local directory."); + options.addOption("read_test", "Reads the specified file and/or particular file parts without writing it locally."); options.addOption("copy", "Copies the specified remote source file to the specified remote destination cluster / file."); options.addOption("write", "Writes the specified local source file to the specified remote destination cluster / file."); @@ -660,6 +679,44 @@ public void run() } } + private static Runnable[] createReadTestTasks(DataPartition[] fileParts, FieldDef recordDef, TaskContext context) throws Exception + { + Runnable[] tasks = new Runnable[fileParts.length]; + for (int i = 0; i < tasks.length; i++) + { + final int taskIndex = i; + final DataPartition filePart = fileParts[taskIndex]; + final HpccRemoteFileReader filePartReader = new HpccRemoteFileReader(filePart, recordDef, new HPCCRecordBuilder(recordDef)); + + tasks[taskIndex] = new Runnable() + { + HpccRemoteFileReader fileReader = filePartReader; + + public void run() + { + try + { + while (fileReader.hasNext()) + { + HPCCRecord record = fileReader.next(); + context.recordsRead.incrementAndGet(); + } + + fileReader.close(); + context.bytesRead.addAndGet(fileReader.getStreamPosition()); + } + catch (Exception e) + { + context.addError("Error while reading file part index: '" + filePart.getThisPart() + " Error message: " + e.getMessage()); + return; + } + } + }; + } + + return tasks; + } + private static Runnable[] createReadToThorTasks(DataPartition[] fileParts, SplitTable[] splitTables, String[] outFilePaths, FieldDef recordDef, TaskContext context) throws Exception { Runnable[] tasks = new Runnable[fileParts.length]; @@ -1159,6 +1216,165 @@ private static void performRead(String[] args, TaskContext context) } } + private static void performReadTest(String[] args, TaskContext context) + { + Options options = getReadTestOptions(); + CommandLineParser parser = new DefaultParser(); + + CommandLine cmd = null; + try + { + cmd = parser.parse(options, args); + } + catch (ParseException e) + { + System.out.println("Error parsing commandline options:\n" + e.getMessage()); + return; + } + + String connString = cmd.getOptionValue("url"); + String user = cmd.getOptionValue("user"); + String pass = cmd.getOptionValue("pass"); + + String outputPath = cmd.getOptionValue("out","."); + + int numThreads = NUM_DEFAULT_THREADS; + String numThreadsStr = cmd.getOptionValue("num_threads", "" + numThreads); + try + { + numThreads = Integer.parseInt(numThreadsStr); + } + catch(Exception e) + { + System.out.println("Invalid option value for num_threads: " + + numThreadsStr + ", must be an integer. Defaulting to: " + NUM_DEFAULT_THREADS + " threads."); + } + + String formatStr = cmd.getOptionValue("format"); + if (formatStr == null) + { + formatStr = "THOR"; + } + + FileFormat format = FileFormat.THOR; + switch (formatStr.toUpperCase()) + { + case "THOR": + format = FileFormat.THOR; + break; + case "PARQUET": + format = FileFormat.PARQUET; + break; + default: + System.out.println("Error unsupported format specified: " + format); + return; + } + + String datasetName = cmd.getOptionValue("read_test"); + context.startOperation("Read Test " + datasetName); + + HPCCFile file = null; + try + { + file = new HPCCFile(datasetName, connString, user, pass); + } + catch (Exception e) + { + System.out.println("Error while attempting to open file: '" + datasetName + "': " + e.getMessage()); + return; + } + + DataPartition[] fileParts = null; + FieldDef recordDef = null; + try + { + fileParts = file.getFileParts(); + recordDef = file.getRecordDefinition(); + } + catch (Exception e) + { + System.out.println("Error while retrieving file parts for: '" + datasetName + "': " + e.getMessage()); + return; + } + + String[] filePartsStrs = cmd.getOptionValues("file_parts"); + if (filePartsStrs != null && filePartsStrs.length > 0) + { + ArrayList filePartList = new ArrayList(); + for (int i = 0; i < filePartsStrs.length; i++) + { + try + { + int filePartIndex = Integer.parseInt(filePartsStrs[i]) - 1; + if (filePartIndex < 0 || filePartIndex >= fileParts.length) + { + System.out.println("Skipping invalid file part index: " + filePartsStrs[i] + + " outside of range: [0," + fileParts.length + "]"); + continue; + } + + filePartList.add(fileParts[filePartIndex]); + } + catch (NumberFormatException e) + { + System.out.println("Skipping invalid file part index: " + filePartsStrs[i]); + } + } + } + + Runnable[] tasks = null; + try + { + switch (format) + { + case THOR: + tasks = createReadTestTasks(fileParts, recordDef, context); + break; + case PARQUET: + default: + throw new Exception("Error unsupported format specified: " + format); + }; + } + catch (Exception e) + { + context.addError("Error while attempting to create read tasks for file: '" + datasetName + "': " + e.getMessage()); + return; + } + + try + { + executeTasks(tasks, numThreads); + } + catch (Exception e) + { + context.addError("Error while attempting to execute read tasks for file: '" + datasetName + "': " + e.getMessage()); + return; + } + + if (context.hasError()) + { + return; + } + + try + { + String fileName = file.getFileName().replace(":","_"); + String filePath = outputPath + File.separator + fileName + ".meta"; + FileOutputStream metaFile = new FileOutputStream(filePath); + + String metaStr = RecordDefinitionTranslator.toJsonRecord(file.getRecordDefinition()).toString(); + metaFile.write(metaStr.getBytes()); + metaFile.close(); + } + catch (Exception e) + { + context.addError("Error while attempting to write meta-data for file: '" + datasetName + "': " + e.getMessage()); + return; + } + + context.endOperation(); + } + private static void performCopy(String[] args, TaskContext context) { Options options = getCopyOptions(); @@ -1576,6 +1792,10 @@ public static JSONArray run(String[] args) { performRead(args, context); } + else if (cmd.hasOption("read_test")) + { + performReadTest(args, context); + } else if (cmd.hasOption("copy")) { performCopy(args, context); diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/HPCCFile.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/HPCCFile.java index afec67413..8df2ba73e 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/HPCCFile.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/HPCCFile.java @@ -50,6 +50,7 @@ public class HPCCFile implements Serializable private DataPartition[] dataParts; private DataPartition tlkPartition = null; + private boolean useTLK = true; private PartitionProcessor partitionProcessor = null; private long dataPartsCreationTimeMS = -1; @@ -130,12 +131,44 @@ public HPCCFile(String fileName, String connectionString, String user, String pa */ public HPCCFile(String fileName, Connection espconninfo, String targetColumnList, String filter, RemapInfo remap_info, int maxParts, String targetfilecluster) throws HpccFileException + { + this(fileName, espconninfo, targetColumnList, filter, remap_info, maxParts, targetfilecluster, true); + } + + /** + * Constructor for the HpccFile. Captures HPCC logical file information from the DALI Server for the clusters behind + * the ESP named by the IP address and re-maps the address information for the THOR nodes to visible addresses when + * the THOR clusters are virtual. + * + * @param fileName + * The HPCC file name + * @param espconninfo + * the espconninfo + * @param targetColumnList + * a comma separated list of column names in dotted notation for columns within compound columns. + * @param filter + * a file filter to select records of interest (SQL where syntax) + * @param remap_info + * address and port re-mapping info for THOR cluster + * @param maxParts + * optional the maximum number of partitions or zero for no max + * @param targetfilecluster + * optional - the hpcc cluster the target file resides in + * @param useTLK + * optional - whether or not the top level key should be used to help filter index files + * @throws HpccFileException + * the hpcc file exception + */ + public HPCCFile(String fileName, Connection espconninfo, String targetColumnList, String filter, RemapInfo remap_info, int maxParts, + String targetfilecluster, boolean useTLK) throws HpccFileException { this.fileName = fileName; this.recordDefinition = null; this.projectedRecordDefinition = null; this.columnPruner = new ColumnPruner(targetColumnList); this.espConnInfo = espconninfo; + this.useTLK = useTLK; + try { if (filter != null && !filter.isEmpty()) @@ -163,12 +196,12 @@ public static int getFilePartFromFPos(long fpos) } /** - * Extracts the offset in the file part from a fileposition value. + * Extracts the offset in the file part from a fileposition value. * * @param fpos file position * @return the project list */ - public static long getOffsetFromFPos(long fpos) + public static long getOffsetFromFPos(long fpos) { // First 48 bits store the offset return fpos & 0xffffffffffffL; @@ -285,6 +318,34 @@ public HPCCFile setClusterRemapInfo(RemapInfo remapinfo) return this; } + /** + * Get the value of useTLK option + * + * @return a boolean value indicating use of the TLK to filter index file reads + */ + public boolean getUseTLK() + { + return this.useTLK; + } + + /** + * Sets the useTLK option. + * Note: the value must be set before querying any data from the file, including record definition information. + * + * @param useTLK should the TLK be used to filter index file reads + * + * @return this HPCCFile + */ + public HPCCFile setUseTLK(boolean useTLK) + { + this.useTLK = useTLK; + + // Force the data parts to be re-created + this.dataParts = null; + + return this; + } + /** * Gets the filter. * @@ -424,13 +485,20 @@ private void createDataParts() throws HpccFileException this.recordDefinition = RecordDefinitionTranslator.parseJsonRecordDefinition(new JSONObject(originalRecDefInJSON)); - try + if (this.useTLK) { - this.partitionProcessor = new PartitionProcessor(this.recordDefinition, this.dataParts, this.tlkPartition); + try + { + this.partitionProcessor = new PartitionProcessor(this.recordDefinition, this.dataParts, this.tlkPartition); + } + catch (Exception e) + { + log.error("Error while constructing partition processor, reading will continue without partition filtering: " + e.getMessage()); + this.partitionProcessor = new PartitionProcessor(this.recordDefinition, this.dataParts, null); + } } - catch (Exception e) + else { - log.error("Error while constructing partition processor, reading will continue without partition filtering: " + e.getMessage()); this.partitionProcessor = new PartitionProcessor(this.recordDefinition, this.dataParts, null); } @@ -622,13 +690,13 @@ private static String acquireFileAccess(String fileName, HPCCWsDFUClient hpcc, i String uniqueID = "HPCC-FILE: " + UUID.randomUUID().toString(); return hpcc.getFileAccessBlob(fileName, clusterName, expirySeconds, uniqueID); } - + /** * @return the file metadata information for this HPCCFile (if it exists) */ - public DFUFileDetailWrapper getOriginalFileMetadata() + public DFUFileDetailWrapper getOriginalFileMetadata() { - if (originalFileMetadata==null) + if (originalFileMetadata==null) { HPCCWsDFUClient dfuClient = HPCCWsDFUClient.get(espConnInfo); if (dfuClient.hasInitError()) diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java index 78e3e6500..ade51a57e 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/HpccRemoteFileReader.java @@ -35,6 +35,8 @@ public class HpccRemoteFileReader implements Iterator private BinaryRecordReader binaryRecordReader; private IRecordBuilder recordBuilder = null; private boolean handlePrefetch = true; + private boolean isClosed = false; + private boolean canReadNext = true; private long openTimeMs = 0; private long recordsRead = 0; @@ -234,7 +236,6 @@ public HpccRemoteFileReader(DataPartition dp, FieldDef originalRD, IRecordBuilde this.binaryRecordReader.initialize(this.recordBuilder); } - log.info("HPCCRemoteFileReader: Opening file part: " + dataPartition.getThisPart() + (resumeInfo != null ? " resume position: " + resumeInfo.inputStreamPos : "" )); log.trace("Original record definition:\n" @@ -315,12 +316,18 @@ public String getRemoteReadMessages() */ public void prefetch() { - if (this.handlePrefetch) + if (handlePrefetch) { log.warn("Prefetch called on an HpccRemoteFileReader that has an internal prefetch thread."); return; } + if (isClosed) + { + log.warn("Prefetch called on an HpccRemoteFileReader that has been closed."); + return; + } + this.inputStream.prefetchData(); } @@ -332,10 +339,19 @@ public void prefetch() @Override public boolean hasNext() { - boolean rslt = false; + if (isClosed) + { + log.warn("hasNext() called on an HpccRemoteFileReader that has been closed."); + return false; + } + + // Keep track of whether we have said there is another record. + // This allows us to handle edge cases around close() being called between hasNext() and next() + canReadNext = false; + try { - rslt = this.binaryRecordReader.hasNext(); + canReadNext = this.binaryRecordReader.hasNext(); // Has next may not catch the prefetch exception if it occurs at the beginning of a read // This is due to InputStream.hasNext() being allowed to throw an IOException when closed. @@ -346,12 +362,14 @@ public boolean hasNext() } catch (HpccFileException e) { - rslt = false; + canReadNext = false; log.error("Read failure for " + this.dataPartition.toString()); - throw new java.util.NoSuchElementException("Fatal read error: " + e.getMessage()); + java.util.NoSuchElementException exception = new java.util.NoSuchElementException("Fatal read error: " + e.getMessage()); + exception.initCause(e); + throw exception; } - return rslt; + return canReadNext; } /** @@ -362,6 +380,11 @@ public boolean hasNext() @Override public T next() { + if (isClosed && !canReadNext) + { + throw new java.util.NoSuchElementException("Fatal read error: Attempting to read next() from a closed file reader."); + } + Object rslt = null; try { @@ -370,10 +393,16 @@ public T next() catch (HpccFileException e) { log.error("Read failure for " + this.dataPartition.toString() + " " + e.getMessage()); - throw new java.util.NoSuchElementException("Fatal read error: " + e.getMessage()); + java.util.NoSuchElementException exception = new java.util.NoSuchElementException("Fatal read error: " + e.getMessage()); + exception.initCause(e); + throw exception; } recordsRead++; + + // Reset this after each read so we can handle edge cases where close() was called between hasNext() / next() + canReadNext = false; + return (T) rslt; } @@ -385,8 +414,15 @@ public T next() */ public void close() throws Exception { + if (isClosed) + { + log.warn("Calling close on an already closed file reader for file part: " + this.dataPartition.toString()); + return; + } + report(); this.inputStream.close(); + isClosed = true; long closeTimeMs = System.currentTimeMillis(); double readTimeS = (closeTimeMs - openTimeMs) / 1000.0; diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java index 372bdce1e..ab3c38a6e 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/RowServiceInputStream.java @@ -522,6 +522,11 @@ public String getIP() return this.dataPart.getCopyIP(prioritizedCopyIndexes.get(getFilePartCopy())); } + private String getCopyPath() + { + return this.dataPart.getCopyPath(prioritizedCopyIndexes.get(getFilePartCopy())); + } + private int getFilePartCopy() { return filePartCopyIndexPointer; @@ -1546,150 +1551,156 @@ private void makeActive() throws HpccFileException this.active.set(false); this.handle = 0; - try + boolean needsRetry = false; + do { - log.debug("Attempting to connect to file part : '" + dataPart.getThisPart() + "' Copy: '" + (getFilePartCopy() + 1) + "' on IP: '" - + getIP() + "'"); - + needsRetry = false; try { - if (getUseSSL()) + log.debug("Attempting to connect to file part : '" + dataPart.getThisPart() + "' Copy: '" + + (getFilePartCopy() + 1) + "' on IP: '" + getIP() + "'" + " for Path: '" + getCopyPath() + "'"); + try + { + if (getUseSSL()) + { + SSLSocketFactory ssf = (SSLSocketFactory) SSLSocketFactory.getDefault(); + sock = (SSLSocket) ssf.createSocket(); + + // Optimize for bandwidth over latency and connection time. + // We are opening up a long standing connection and potentially reading a significant amount of + // data + // So we don't care as much about individual packet latency or connection time overhead + sock.setPerformancePreferences(0, 1, 2); + sock.connect(new InetSocketAddress(this.getIP(), this.dataPart.getPort()), this.connectTimeout); + + log.debug("Attempting SSL handshake..."); + ((SSLSocket) sock).startHandshake(); + log.debug("SSL handshake successful..."); + log.debug(" Remote address = " + sock.getInetAddress().toString() + " Remote port = " + sock.getPort()); + } + else + { + SocketFactory sf = SocketFactory.getDefault(); + sock = sf.createSocket(); + + // Optimize for bandwidth over latency and connection time. + // We are opening up a long standing connection and potentially reading a significant amount of + // data + // So we don't care as much about individual packet latency or connection time overhead + sock.setPerformancePreferences(0, 1, 2); + sock.connect(new InetSocketAddress(this.getIP(), this.dataPart.getPort()), this.connectTimeout); + } + + this.sock.setSoTimeout(socketOpTimeoutMs); + + log.debug("Connected: Remote address = " + sock.getInetAddress().toString() + " Remote port = " + sock.getPort()); + } + catch (java.net.UnknownHostException e) { - SSLSocketFactory ssf = (SSLSocketFactory) SSLSocketFactory.getDefault(); - sock = (SSLSocket) ssf.createSocket(); - - // Optimize for bandwidth over latency and connection time. - // We are opening up a long standing connection and potentially reading a significant amount of - // data - // So we don't care as much about individual packet latency or connection time overhead - sock.setPerformancePreferences(0, 1, 2); - sock.connect(new InetSocketAddress(this.getIP(), this.dataPart.getPort()), this.connectTimeout); - - log.debug("Attempting SSL handshake..."); - ((SSLSocket) sock).startHandshake(); - log.debug("SSL handshake successful..."); - log.debug(" Remote address = " + sock.getInetAddress().toString() + " Remote port = " + sock.getPort()); + throw new HpccFileException("Bad file part IP address or host name: " + this.getIP(), e); } - else + catch (java.io.IOException e) { - SocketFactory sf = SocketFactory.getDefault(); - sock = sf.createSocket(); - - // Optimize for bandwidth over latency and connection time. - // We are opening up a long standing connection and potentially reading a significant amount of - // data - // So we don't care as much about individual packet latency or connection time overhead - sock.setPerformancePreferences(0, 1, 2); - sock.connect(new InetSocketAddress(this.getIP(), this.dataPart.getPort()), this.connectTimeout); + throw new HpccFileException(e); } - this.sock.setSoTimeout(socketOpTimeoutMs); + try + { + this.dos = new java.io.DataOutputStream(sock.getOutputStream()); + this.dis = new java.io.DataInputStream(sock.getInputStream()); + } + catch (java.io.IOException e) + { + throw new HpccFileException("Failed to create streams", e); + } - log.debug("Connected: Remote address = " + sock.getInetAddress().toString() + " Remote port = " + sock.getPort()); - } - catch (java.net.UnknownHostException e) - { - throw new HpccFileException("Bad file part addr " + this.getIP(), e); - } - catch (java.io.IOException e) - { - throw new HpccFileException(e); - } + //------------------------------------------------------------------------------ + // Check protocol version + //------------------------------------------------------------------------------ - try - { - this.dos = new java.io.DataOutputStream(sock.getOutputStream()); - this.dis = new java.io.DataInputStream(sock.getInputStream()); - } - catch (java.io.IOException e) - { - throw new HpccFileException("Failed to create streams", e); - } + try + { + String msg = makeGetVersionRequest(); + int msgLen = msg.length(); - //------------------------------------------------------------------------------ - // Check protocol version - //------------------------------------------------------------------------------ + this.dos.writeInt(msgLen); + this.dos.write(msg.getBytes(HPCCCharSet), 0, msgLen); + this.dos.flush(); + } + catch (IOException e) + { + throw new HpccFileException("Failed on initial remote read trans", e); + } - try - { - String msg = makeGetVersionRequest(); - int msgLen = msg.length(); + RowServiceResponse response = readResponse(); + if (response.len == 0) + { + useOldProtocol = true; + } + else + { + useOldProtocol = false; - this.dos.writeInt(msgLen); - this.dos.write(msg.getBytes(HPCCCharSet), 0, msgLen); - this.dos.flush(); - } - catch (IOException e) - { - throw new HpccFileException("Failed on initial remote read trans", e); - } + byte[] versionBytes = new byte[response.len]; + try + { + this.dis.readFully(versionBytes); + } + catch (IOException e) + { + throw new HpccFileException("Error while attempting to read version response.", e); + } - RowServiceResponse response = readResponse(); - if (response.len == 0) - { - useOldProtocol = true; - } - else - { - useOldProtocol = false; + rowServiceVersion = new String(versionBytes, HPCCCharSet); + } + + //------------------------------------------------------------------------------ + // Send initial read request + //------------------------------------------------------------------------------ - byte[] versionBytes = new byte[response.len]; try { - this.dis.readFully(versionBytes); + String readTrans = null; + if (this.tokenBin == null) + { + this.tokenBin = new byte[0]; + readTrans = makeInitialRequest(); + } + else + { + readTrans = makeTokenRequest(); + } + + int transLen = readTrans.length(); + this.dos.writeInt(transLen); + this.dos.write(readTrans.getBytes(HPCCCharSet), 0, transLen); + this.dos.flush(); } catch (IOException e) { - throw new HpccFileException("Error while attempting to read version response.", e); + throw new HpccFileException("Failed on initial remote read read trans", e); } - rowServiceVersion = new String(versionBytes, HPCCCharSet); - } - - //------------------------------------------------------------------------------ - // Send initial read request - //------------------------------------------------------------------------------ - - try - { - String readTrans = null; - if (this.tokenBin == null) - { - this.tokenBin = new byte[0]; - readTrans = makeInitialRequest(); - } - else + if (CompileTimeConstants.PROFILE_CODE) { - readTrans = makeTokenRequest(); + firstByteTimeNS = System.nanoTime(); } - int transLen = readTrans.length(); - this.dos.writeInt(transLen); - this.dos.write(readTrans.getBytes(HPCCCharSet), 0, transLen); - this.dos.flush(); + this.active.set(true); } - catch (IOException e) + catch (Exception e) { - throw new HpccFileException("Failed on initial remote read read trans", e); - } + log.error("Could not reach file part: '" + dataPart.getThisPart() + "' copy: '" + (getFilePartCopy() + 1) + "' on IP: '" + getIP() + + "'"); + log.error(e.getMessage()); - if (CompileTimeConstants.PROFILE_CODE) - { - firstByteTimeNS = System.nanoTime(); + needsRetry = true; + if (!setNextFilePartCopy()) + { + throw new HpccFileException("Unsuccessfuly attempted to connect to all file part copies", e); + } } - - this.active.set(true); - } - catch (Exception e) - { - log.error("Could not reach file part: '" + dataPart.getThisPart() + "' copy: '" + (getFilePartCopy() + 1) + "' on IP: '" + getIP() - + "'"); - log.error(e.getMessage()); - - if (!setNextFilePartCopy()) - // This should be a multi exception - throw new HpccFileException("Unsuccessfuly attempted to connect to all file part copies", e); - } + } while (needsRetry); } /* Notes on protocol: diff --git a/dfsclient/src/test/java/org/hpccsystems/dfs/client/DFSIndexTest.java b/dfsclient/src/test/java/org/hpccsystems/dfs/client/DFSIndexTest.java index 43c3437e7..46f02f39f 100644 --- a/dfsclient/src/test/java/org/hpccsystems/dfs/client/DFSIndexTest.java +++ b/dfsclient/src/test/java/org/hpccsystems/dfs/client/DFSIndexTest.java @@ -514,4 +514,4 @@ public static void main(String[] args) test.tlkFilterExample(); } catch(Exception e) {} } -} +} \ No newline at end of file diff --git a/dfsclient/src/test/java/org/hpccsystems/dfs/client/DFSReadWriteTest.java b/dfsclient/src/test/java/org/hpccsystems/dfs/client/DFSReadWriteTest.java index 39f6a5280..eb3f2a2fa 100644 --- a/dfsclient/src/test/java/org/hpccsystems/dfs/client/DFSReadWriteTest.java +++ b/dfsclient/src/test/java/org/hpccsystems/dfs/client/DFSReadWriteTest.java @@ -265,6 +265,7 @@ public void readResumeTest() throws Exception System.out.println("Messages from file part (" + i + ") read operation:\n" + fileReader.getRemoteReadMessages()); } + Runtime runtime = Runtime.getRuntime(); int readSizeKB = 100; ArrayList resumedRecords = new ArrayList(); for (int i = 0; i < resumeInfo.size(); i++) @@ -282,6 +283,13 @@ public void readResumeTest() throws Exception resumedRecords.add(record); } + + // Periodically run garbage collector to prevent buffers in remote file readers from exhausting free memory + // This is only needed due to rapidly creating / destroying thousands of HpccRemoteFileReaders + if ((i % 10) == 0) + { + runtime.gc(); + } fileReader.close(); fileReader = null; @@ -1131,6 +1139,53 @@ public void emptyCompressedFileTest() } } + @Test + public void filePartReadRetryTest() + { + { + HPCCFile readFile = null; + try + { + readFile = new HPCCFile(datasets[0], connString, hpccUser, hpccPass); + DataPartition[] fileParts = readFile.getFileParts(); + for (int i = 0; i < fileParts.length; i++) + { + String firstCopyIP = fileParts[i].getCopyIP(0); + String firstCopyPath = fileParts[i].getCopyPath(0); + fileParts[i].setCopyIP(0, "1.1.1.1"); + fileParts[i].add(1, firstCopyIP, firstCopyPath); + } + + List records = readFile(readFile, null, false); + System.out.println("Record count: " + records.size()); + } + catch (Exception e) + { + Assert.fail(e.getMessage()); + } + } + + { + HPCCFile readFile = null; + try + { + readFile = new HPCCFile(datasets[0], connString, hpccUser, hpccPass); + DataPartition[] fileParts = readFile.getFileParts(); + for (int i = 0; i < fileParts.length; i++) + { + fileParts[i].add(0,"1.1.1.1", ""); + } + + List records = readFile(readFile, null, false); + System.out.println("Record count: " + records.size()); + } + catch (Exception e) + { + Assert.fail(e.getMessage()); + } + } + } + @Test public void invalidSignatureTest() { @@ -1211,6 +1266,77 @@ public void invalidSignatureTest() } } + @Test + public void earlyCloseTest() throws Exception + { + HPCCFile file = new HPCCFile(datasets[0], connString , hpccUser, hpccPass); + + DataPartition[] fileParts = file.getFileParts(); + if (fileParts == null || fileParts.length == 0) + { + Assert.fail("No file parts found"); + } + + FieldDef originalRD = file.getRecordDefinition(); + if (originalRD == null || originalRD.getNumDefs() == 0) + { + Assert.fail("Invalid or null record definition"); + } + + { + HPCCRecordBuilder recordBuilder = new HPCCRecordBuilder(file.getProjectedRecordDefinition()); + HpccRemoteFileReader fileReader = new HpccRemoteFileReader(fileParts[0], originalRD, recordBuilder); + + int expectedRecordCounts = 10; + int numRecords = 0; + while (fileReader.hasNext()) + { + try + { + fileReader.next(); + numRecords++; + } + catch (Exception e) + { + System.out.println("Error: " + e.getMessage()); + } + + if (numRecords == expectedRecordCounts) + { + fileReader.close(); + } + } + assertTrue("Expected record count: " + expectedRecordCounts + " Actual count: " + numRecords, numRecords == expectedRecordCounts); + } + + // Check that calling close() inbetween hasNext() & next() allows the current record to be read + { + HPCCRecordBuilder recordBuilder = new HPCCRecordBuilder(file.getProjectedRecordDefinition()); + HpccRemoteFileReader fileReader = new HpccRemoteFileReader(fileParts[0], originalRD, recordBuilder); + + int expectedRecordCounts = 11; + int numRecords = 0; + while (fileReader.hasNext()) + { + if (numRecords == expectedRecordCounts-1) + { + fileReader.close(); + } + + try + { + fileReader.next(); + numRecords++; + } + catch (Exception e) + { + System.out.println("Error: " + e.getMessage()); + } + } + assertTrue("Expected record count: " + expectedRecordCounts + " Actual count: " + numRecords, numRecords == expectedRecordCounts); + } + } + public List readFile(HPCCFile file, Integer connectTimeoutMillis, boolean shouldForceTimeout) throws Exception { return readFile(file, connectTimeoutMillis, shouldForceTimeout, false, BinaryRecordReader.NO_STRING_PROCESSING); diff --git a/dfsclient/src/test/java/org/hpccsystems/dfs/client/FileUtilityTest.java b/dfsclient/src/test/java/org/hpccsystems/dfs/client/FileUtilityTest.java index 6e07edaf2..f62c3d39a 100644 --- a/dfsclient/src/test/java/org/hpccsystems/dfs/client/FileUtilityTest.java +++ b/dfsclient/src/test/java/org/hpccsystems/dfs/client/FileUtilityTest.java @@ -56,6 +56,18 @@ public void thorFileTests() Assert.assertTrue("FileUtility operation didn't complete successfully", success); } + { + String readArgs[] = {"-read_test", "benchmark::integer::20kb", "-url", this.connString, + "-user", this.hpccUser, "-pass", this.hpccPass, "-file_parts", "1" }; + + JSONArray results = FileUtility.run(readArgs); + JSONObject result = results.optJSONObject(0); + Assert.assertNotNull("FileUtility result should not be null.", result); + + boolean success = result.optBoolean("successful",false); + Assert.assertTrue("FileUtility operation didn't complete successfully", success); + } + { String copyArgs[] = {"-copy", "benchmark::integer::20kb benchmark::integer::20kb-copy", "-url", this.connString, "-dest_url", this.connString, diff --git a/dfsclient/src/test/resources/generate-datasets.ecl b/dfsclient/src/test/resources/generate-datasets.ecl index 0c1fdafa7..fed129a15 100644 --- a/dfsclient/src/test/resources/generate-datasets.ecl +++ b/dfsclient/src/test/resources/generate-datasets.ecl @@ -10,7 +10,10 @@ childRec := {STRING8 childField1, INTEGER8 childField2, REAL8 childField3}; rec := {INTEGER8 int8, UNSIGNED8 uint8, INTEGER4 int4, UNSIGNED4 uint4, INTEGER2 int2, UNSIGNED2 uint2, REAL8 r8, REAL4 r4, - DECIMAL16_8 dec16, UDECIMAL16_8 udec16, + DECIMAL16_8 dec16, + DECIMAL15_8 dec15, + UDECIMAL16_8 udec16, + UDECIMAL15_8 udec15, QSTRING qStr, STRING8 fixStr8, STRING str, @@ -33,7 +36,9 @@ ds := DATASET(totalrecs1, transform(rec, self.r8 := (REAL)(random() % unique_values); self.r4 := (REAL)(random() % unique_values); self.dec16 := (REAL)(random() % unique_values); + self.dec15 := (REAL)(random() % unique_values); self.udec16 := (REAL)(random() % unique_values); + self.udec15 := (REAL)(random() % unique_values); self.qStr := (STRING)(random() % unique_values); self.fixStr8 := (STRING)(random() % unique_values); self.str := (STRING)(random() % unique_values); diff --git a/pom.xml b/pom.xml index ad846d220..f3c9818b4 100644 --- a/pom.xml +++ b/pom.xml @@ -4,7 +4,7 @@ 4.0.0 org.hpccsystems hpcc4j - 9.2.65-0-SNAPSHOT + 9.4.41-0-SNAPSHOT pom HPCC Systems Java Projects https://hpccsystems.com @@ -31,7 +31,8 @@ UTF-8 3.8.0 - 8 + 1.8 + 1.8 3.1.1 1.6.8 false @@ -54,7 +55,7 @@ 20231013 2.17.1 1.8.1 - 2.10.1 + 3.3.6 1.11.1 1.10.0 1.5.0 @@ -202,7 +203,8 @@ maven-compiler-plugin ${maven.compiler.version} - ${maven.compiler.release} + ${maven.compiler.source} + ${maven.compiler.target} ${maven.compiler.groups} ${maven.compiler.excludedGroups} diff --git a/wsclient/pom.xml b/wsclient/pom.xml index ead156961..dce4342d0 100644 --- a/wsclient/pom.xml +++ b/wsclient/pom.xml @@ -9,7 +9,7 @@ org.hpccsystems hpcc4j - 9.2.65-0-SNAPSHOT + 9.4.41-0-SNAPSHOT diff --git a/wsclient/src/main/java/org/hpccsystems/ws/client/HPCCFileSprayClient.java b/wsclient/src/main/java/org/hpccsystems/ws/client/HPCCFileSprayClient.java index 87d0c5d61..a7283a6cd 100644 --- a/wsclient/src/main/java/org/hpccsystems/ws/client/HPCCFileSprayClient.java +++ b/wsclient/src/main/java/org/hpccsystems/ws/client/HPCCFileSprayClient.java @@ -1004,9 +1004,8 @@ public ProgressResponseWrapper sprayVariable(DelimitedDataOptions options, DropZ if (targetDropZone == null) throw new Exception("TargetDropZone object not available!"); SprayVariable request = new SprayVariable(); - request.setSourceIP(targetDropZone.getNetAddress()); - request.setSourcePath(targetDropZone.getPath() + "/" + sourceFileName); + request.setSourcePath(Utils.ensureTrailingPathSlash(targetDropZone.getPath()) + sourceFileName); request.setDestGroup(destGroup); request.setDestLogicalName(targetFileName); request.setOverwrite(overwrite); @@ -1162,7 +1161,7 @@ public ProgressResponseWrapper sprayXML(DropZoneWrapper targetDropZone, String s request.setDestGroup(destGroup); request.setSourceIP(targetDropZone.getNetAddress()); - request.setSourcePath(targetDropZone.getPath() + "/" + sourceFileName); + request.setSourcePath(Utils.ensureTrailingPathSlash(targetDropZone.getPath()) + sourceFileName); request.setDestLogicalName(targetFileName); request.setOverwrite(overwrite); request.setSourceFormat(format.getValue()); @@ -1318,7 +1317,7 @@ public ProgressResponseWrapper sprayFixed(DropZoneWrapper targetDropZone, String request.setDestGroup(destGroup); request.setSourceRecordSize(recordSize); request.setSourceIP(targetDropZone.getNetAddress()); - request.setSourcePath(targetDropZone.getPath() + "/" + sourceFileName); + request.setSourcePath(Utils.ensureTrailingPathSlash(targetDropZone.getPath()) + sourceFileName); request.setDestLogicalName(targetFileLabel); request.setOverwrite(overwrite); request.setPrefix(prefix); @@ -1488,15 +1487,11 @@ public boolean uploadLargeFile(File uploadFile, DropZoneWrapper dropZone) return false; } - uploadurlbuilder += "&NetAddress=" + dropZone.getNetAddress() + "&Path=" + dropZone.getPath(); + uploadurlbuilder += "&NetAddress=" + dropZone.getNetAddress() + "&Path=" + Utils.ensureTrailingPathSlash(dropZone.getPath()); if (!dropZone.getName().isEmpty()) uploadurlbuilder += "&DropZoneName=" + dropZone.getName(); - String path = dropZone.getPath().trim(); - if (!path.endsWith("/")) - path += "/"; - uploadurlbuilder += "&Path=" + path; uploadurlbuilder += "&OS=" + (dropZone.getLinux().equalsIgnoreCase("true") ? "2" : "1"); uploadurlbuilder += "&rawxml_=1"; WritableByteChannel outchannel = null; diff --git a/wsclient/src/main/java/org/hpccsystems/ws/client/utils/Utils.java b/wsclient/src/main/java/org/hpccsystems/ws/client/utils/Utils.java index f33d83cf1..df7a56162 100644 --- a/wsclient/src/main/java/org/hpccsystems/ws/client/utils/Utils.java +++ b/wsclient/src/main/java/org/hpccsystems/ws/client/utils/Utils.java @@ -1061,21 +1061,64 @@ public static DocumentBuilder newSafeXMLDocBuilder() throws ParserConfigurationE return safeXMLDocBuilder; } + /** + * Ensures the given path contains a trailing path delimiter. + * Does not introduce duplicate trailing path delimiter if one already exists. + * Defaults to Linux style separator if the given path either contains a Linux style separator, or the path is empty. + * Strips all trailing white space character + * @param path The path to be postfixed + * @return original path with proper trailing path delimiter + */ public static String ensureTrailingPathSlash(String path) { return ensureTrailingPathSlash(path, (path.contains(Character.toString(LINUX_SEP)) || path.length() == 0) ? LINUX_SEP : WIN_SEP); } + /** + * Ensures the given path contains a trailing path delimiter. + * Does not introduce duplicate trailing path delimiter if one already exists. + * Uses Linux style path separator 'useLinuxSep' == "true", otherwise uses windows style path separator + * Strips all trailing white space character + * @param path path The path to be postfixed + * @param useLinuxSep String, if "true" linux styled path delimiter will be used + * @return original path with proper trailing path delimiter + */ public static String ensureTrailingPathSlash(String path, String useLinuxSep) { return ensureTrailingPathSlash(path, useLinuxSep.equalsIgnoreCase("true") ? LINUX_SEP : WIN_SEP); } + /** + * Ensures the given path contains a trailing path delimiter. + * Does not introduce duplicate trailing path delimiter if one already exists. + * Uses provided 'slash' as trailing path delimiter + * Strips all trailing white space character + * @param path The path to be postfixed + * @param slash The character to append + * @return original path with proper trailing path delimiter + */ public static String ensureTrailingPathSlash(String path, char slash) { + path = trimTrailing(path); + if (path.length() == 0 || path.charAt(path.length()-1) != slash) path = path + slash; return path; } + + /** + * Removes trailing whitespace characters from a string. + * + * @param originalStr the original string from which trailing whitespace should be removed + * @return a new string with the same characters as the original string, minus any trailing whitespace + */ + public static String trimTrailing(String originalStr) + { + int strIndex = originalStr.length()-1; + while(strIndex >= 0 && Character.isWhitespace(originalStr.charAt(strIndex))) + strIndex--; + + return originalStr.substring(0,strIndex+1); + } } diff --git a/wsclient/src/test/java/org/hpccsystems/ws/client/WSFileIOClientTest.java b/wsclient/src/test/java/org/hpccsystems/ws/client/WSFileIOClientTest.java index e9ae11815..e27714e24 100644 --- a/wsclient/src/test/java/org/hpccsystems/ws/client/WSFileIOClientTest.java +++ b/wsclient/src/test/java/org/hpccsystems/ws/client/WSFileIOClientTest.java @@ -21,7 +21,9 @@ HPCC SYSTEMS software Copyright (C) 2019 HPCC Systems®. import static org.junit.Assert.fail; import static org.junit.Assume.assumeTrue; +import java.util.Arrays; import java.util.HashSet; +import java.util.List; import java.util.Set; import org.apache.axis2.AxisFault; @@ -67,8 +69,6 @@ public class WSFileIOClientTest extends BaseRemoteTest @Test public void copyFile() throws Exception { - Assume.assumeFalse("Test not valid on containerized HPCC environment", client.isTargetHPCCContainerized()); - assumeTrue("Ignoring test 'copyFile' because HPCC-30117 is not fixed", HPCC_30117.equalsIgnoreCase("fixed")); String lzfile=System.currentTimeMillis() + "_csvtest.csv"; String hpccfilename="temp::" + lzfile; client.createHPCCFile(lzfile, targetLZ, true); @@ -76,35 +76,43 @@ public void copyFile() throws Exception client.writeHPCCFileData(data, lzfile, targetLZ, true, 0, 20); try { - ProgressResponseWrapper dfuspray=wsclient.getFileSprayClient().sprayVariable( + System.out.println("Starting file spray."); + ProgressResponseWrapper dfuspray = wsclient.getFileSprayClient().sprayVariable( new DelimitedDataOptions(), wsclient.getFileSprayClient().fetchLocalDropZones().get(0), lzfile,"~" + hpccfilename,"",thorClusterFileGroup,true, HPCCFileSprayClient.SprayVariableFormat.DFUff_csv, null, null, null, null, null, null, null); - Thread.sleep(1000); - int wait=60; if (dfuspray.getExceptions() != null - && dfuspray.getExceptions().getException() != null - && dfuspray.getExceptions().getException().size()>0) + && dfuspray.getExceptions().getException() != null + && dfuspray.getExceptions().getException().size()>0) { fail(dfuspray.getExceptions().getException().get(0).getMessage()); } - if (dfuspray.getSecsLeft()>0) + + List whiteListedStates = Arrays.asList( "queued", "started", "unknown", "finished", "monitoring"); + int waitCount = 0; + int MAX_WAIT_COUNT = 60; + + ProgressResponseWrapper dfuProgress = null; + do { - System.out.println("Still spraying, waiting 1 sec..."); - for (int i=wait;i>0;i--) + dfuProgress = wsclient.getFileSprayClient().getDfuProgress(dfuspray.getWuid()); + boolean stateIsWhiteListed = whiteListedStates.contains(dfuProgress.getState()); + if (!stateIsWhiteListed) { - if (dfuspray.getSecsLeft()==0) - { - i=0; - } - else - { - Thread.sleep(1000); - } + fail("File spray failed: Summary: " + dfuProgress.getSummaryMessage() + " Exceptions: " + dfuProgress.getExceptions()); } - } + + if (dfuProgress.getPercentDone() < 100) + { + Thread.sleep(1000); + System.out.println("File spray percent complete: " + dfuProgress.getPercentDone() + "% Sleeping for 1sec to wait for spray."); + waitCount++; + } + } while (waitCount < 60 && dfuProgress.getPercentDone() < 100); + + assumeTrue("File spray did not complete within: " + MAX_WAIT_COUNT + "s. Failing test.", waitCount < MAX_WAIT_COUNT); System.out.println("Test file successfully sprayed to " + "~" + hpccfilename + ", attempting copy to " + hpccfilename + "_2"); wsclient.getFileSprayClient().copyFile(hpccfilename,hpccfilename + "_2",true); @@ -144,8 +152,6 @@ public void copyFile() throws Exception @Test public void AcreateHPCCFile() throws Exception, ArrayOfEspExceptionWrapper { - Assume.assumeFalse("Test not valid on containerized HPCC environment", client.isTargetHPCCContainerized()); - assumeTrue("Ignoring test 'copyFile' because HPCC-30117 is not fixed", HPCC_30117.equalsIgnoreCase("fixed")); System.out.println("Creating file: '" + testfilename + "' on LandingZone: '" + targetLZ + "' on HPCC: '" + super.connString +"'"); Assert.assertTrue(client.createHPCCFile(testfilename, targetLZ, true)); } @@ -153,8 +159,6 @@ public void AcreateHPCCFile() throws Exception, ArrayOfEspExceptionWrapper @Test public void BwriteHPCCFile() throws Exception, ArrayOfEspExceptionWrapper { - assumeTrue("Ignoring test 'copyFile' because HPCC-30117 is not fixed", HPCC_30117.equalsIgnoreCase("fixed")); - System.out.println("Writing data to file: '" + testfilename + "' on LandingZone: '" + targetLZ + "' on HPCC: '" + super.connString +"'"); byte[] data = "HELLO MY DARLING, HELLO MY DEAR!1234567890ABCDEFGHIJKLMNOPQRSTUVXYZ".getBytes(); Assert.assertTrue(client.writeHPCCFileData(data, testfilename, targetLZ, true, 0, 20)); } @@ -162,9 +166,6 @@ public void BwriteHPCCFile() throws Exception, ArrayOfEspExceptionWrapper @Test public void CreadHPCCFile() throws Exception, ArrayOfEspExceptionWrapper { - Assume.assumeFalse("Test not valid on containerized HPCC environment", client.isTargetHPCCContainerized()); - assumeTrue("Ignoring test 'copyFile' because HPCC-30117 is not fixed", HPCC_30117.equalsIgnoreCase("fixed")); - System.out.println("reading data from file: '" + testfilename + "' on LandingZone: '" + targetLZ + "' on HPCC: '" + super.connString +"'"); byte[] data = "HELLO MY DARLING, HELLO MY DEAR!1234567890ABCDEFGHIJKLMNOPQRSTUVXYZ".getBytes(); String response = client.readFileData(targetLZ, testfilename, data.length, 0); diff --git a/wsclient/src/test/java/org/hpccsystems/ws/client/WUQueryTest.java b/wsclient/src/test/java/org/hpccsystems/ws/client/WUQueryTest.java index 2a378661a..a320260ee 100644 --- a/wsclient/src/test/java/org/hpccsystems/ws/client/WUQueryTest.java +++ b/wsclient/src/test/java/org/hpccsystems/ws/client/WUQueryTest.java @@ -25,19 +25,35 @@ public static void setup() throws Exception wswuclient = wsclient.getWsWorkunitsClient(); } + public WorkunitWrapper createWU(String ecl, String cluster, String jobName, String owner, ApplicationValueWrapper av) + { + WorkunitWrapper wu=new WorkunitWrapper(); + wu.setECL(ecl); + wu.setCluster(thorclustername); + wu.setJobname(jobName); + wu.setOwner(owner); + wu.getApplicationValues().add(av); + try + { + wu=wswuclient.compileWUFromECL(wu); + wu=wswuclient.runWorkunit(wu.getWuid(),null,null,null,false,null); + } + catch (Exception e) + { + System.err.println("WUQueryTest: Failed to create new WU for test: '" + ecl + "'" ); + System.err.println(e.getLocalizedMessage()); + return null; + } + return wu; + } @Test public void testGetWorkunitByAppName() throws Exception, ArrayOfEspExceptionWrapper, ArrayOfECLExceptionWrapper { - WorkunitWrapper wu=new WorkunitWrapper(); - wu.setECL("OUTPUT('1');"); - wu.setCluster(thorclustername); - wu.setJobname("testGetWorkunitByAppName"); - wu.setOwner("user"); ApplicationValueWrapper av=new ApplicationValueWrapper("HIPIE","testkey","testvalue"); - wu.getApplicationValues().add(av); - wu=wswuclient.compileWUFromECL(wu); - wu=wswuclient.runWorkunit(wu.getWuid(),null,null,null,false,null); + WorkunitWrapper wu = createWU("OUTPUT('1');", thorclustername, "1testGetWorkunitByAppName", "1user", av); + + assumeTrue("testGetWorkunitByAppName failed to create WU!", wu != null); WUQueryWrapper info = new WUQueryWrapper().setSortBy(SortBy.WUID).setDescending(true); info.getApplicationValues().add(av); @@ -57,8 +73,14 @@ public void testGetWorkunitByAppName() throws Exception, ArrayOfEspExceptionWrap @Test public void testGetWorkunitSort() throws Exception { + + assumeTrue("Testing WU sortBy failed to create First WU!", null != createWU("OUTPUT('a');", thorclustername, "aTestWorkunitSortBA", "aTestUser", new ApplicationValueWrapper("AppA","testkeyA","testvalueA"))); + assumeTrue("Testing WU sortBy failed to create Second WU!", null != createWU("OUTPUT('b');", thorclustername, "bTestWorkunitSortBy", "bTestUser", new ApplicationValueWrapper("AppB","testkeyB","testvalueB"))); + //wuid descending List result=wswuclient.getWorkunits(new WUQueryWrapper().setSortBy(SortBy.WUID).setDescending(true)); + assumeTrue("Testing WU sortBy failed to find enough WUs!", result.size() > 1); + if (result.get(0).getWuid().compareToIgnoreCase(result.get(1).getWuid())<0) { Assert.fail("descending workunits in wrong order:" + result.get(0).getWuid() + " then " + result.get(1).getWuid()); diff --git a/wsclient/src/test/java/org/hpccsystems/ws/client/utils/UtilsTest.java b/wsclient/src/test/java/org/hpccsystems/ws/client/utils/UtilsTest.java index 1ff309ebf..eef268e47 100644 --- a/wsclient/src/test/java/org/hpccsystems/ws/client/utils/UtilsTest.java +++ b/wsclient/src/test/java/org/hpccsystems/ws/client/utils/UtilsTest.java @@ -8,59 +8,75 @@ public class UtilsTest { + @Test + public void testEnsureTrailingSlashTrailingWhiteSpace() + { + assertEquals(Character.toString(Utils.LINUX_SEP), Utils.ensureTrailingPathSlash("")); + assertEquals(Character.toString(Utils.LINUX_SEP), Utils.ensureTrailingPathSlash(Character.toString(Utils.LINUX_SEP)+ " ")); + assertEquals(Character.toString(Utils.WIN_SEP), Utils.ensureTrailingPathSlash(Character.toString(Utils.WIN_SEP) + " ")); + assertEquals(Character.toString(Utils.WIN_SEP)+Character.toString(Utils.LINUX_SEP), Utils.ensureTrailingPathSlash(Character.toString(Utils.WIN_SEP)+Character.toString(Utils.LINUX_SEP)+"\t")); + assertEquals("C:\\some\\Path\\", Utils.ensureTrailingPathSlash("C:\\some\\Path ")); + assertEquals("C:\\some\\Path\\", Utils.ensureTrailingPathSlash("C:\\some\\Path\\ ")); + assertEquals("/another/path/", Utils.ensureTrailingPathSlash("/another/path ")); + assertEquals("/another/path/", Utils.ensureTrailingPathSlash("/another/path/\t\t")); + assertEquals("/another/path/", Utils.ensureTrailingPathSlash("/another/path/\n")); + assertEquals("/another/path/", Utils.ensureTrailingPathSlash("/another/path/" + '\u005Cn')); + assertEquals("/another/path/", Utils.ensureTrailingPathSlash("/another/path/ " + '\u005Ct')); + } + @Test public void testEnsureTrailingSlashNoSlashSpecified() { - assertEquals(Utils.ensureTrailingPathSlash(""), Character.toString(Utils.LINUX_SEP)); //no sep in path, default to lin sep - assertEquals(Utils.ensureTrailingPathSlash(Character.toString(Utils.LINUX_SEP)), Character.toString(Utils.LINUX_SEP));//no change expected - assertEquals(Utils.ensureTrailingPathSlash(Character.toString(Utils.WIN_SEP)), Character.toString(Utils.WIN_SEP)); //no change expected - assertEquals(Utils.ensureTrailingPathSlash(Character.toString(Utils.WIN_SEP)+Character.toString(Utils.LINUX_SEP)), Character.toString(Utils.WIN_SEP)+Character.toString(Utils.LINUX_SEP));//no change expected - assertEquals(Utils.ensureTrailingPathSlash("C:\\some\\Path"), "C:\\some\\Path\\"); - assertEquals(Utils.ensureTrailingPathSlash("C:\\some\\Path\\"), "C:\\some\\Path\\"); - assertEquals(Utils.ensureTrailingPathSlash("/another/path"), "/another/path/"); - assertEquals(Utils.ensureTrailingPathSlash("/another/path/"), "/another/path/"); + assertEquals(Character.toString(Utils.LINUX_SEP), Utils.ensureTrailingPathSlash("")); //no sep in path, default to lin sep + assertEquals(Character.toString(Utils.LINUX_SEP), Utils.ensureTrailingPathSlash(Character.toString(Utils.LINUX_SEP)));//no change expected + assertEquals(Character.toString(Utils.WIN_SEP), Utils.ensureTrailingPathSlash(Character.toString(Utils.WIN_SEP))); //no change expected + assertEquals(Character.toString(Utils.WIN_SEP)+Character.toString(Utils.LINUX_SEP), Utils.ensureTrailingPathSlash(Character.toString(Utils.WIN_SEP)+Character.toString(Utils.LINUX_SEP)));//no change expected + assertEquals("C:\\some\\Path\\", Utils.ensureTrailingPathSlash("C:\\some\\Path")); + assertEquals("C:\\some\\Path\\", Utils.ensureTrailingPathSlash("C:\\some\\Path\\")); + assertEquals("/another/path/", Utils.ensureTrailingPathSlash("/another/path")); + assertEquals("/another/path/", Utils.ensureTrailingPathSlash("/another/path/")); } @Test public void testEnsureTrailingSlashSlashSpecified() { - assertEquals(Utils.ensureTrailingPathSlash("", Utils.LINUX_SEP), Character.toString(Utils.LINUX_SEP)); - assertEquals(Utils.ensureTrailingPathSlash("", Utils.WIN_SEP), Character.toString(Utils.WIN_SEP)); - assertEquals(Utils.ensureTrailingPathSlash(Character.toString(Utils.LINUX_SEP), Utils.WIN_SEP), Character.toString(Utils.LINUX_SEP)+Utils.WIN_SEP); - assertEquals(Utils.ensureTrailingPathSlash(Character.toString(Utils.LINUX_SEP), Utils.LINUX_SEP), Character.toString(Utils.LINUX_SEP)); - assertEquals(Utils.ensureTrailingPathSlash(Character.toString(Utils.WIN_SEP), Utils.WIN_SEP), Character.toString(Utils.WIN_SEP)); - assertEquals(Utils.ensureTrailingPathSlash(Character.toString(Utils.WIN_SEP), Utils.LINUX_SEP), Character.toString(Utils.WIN_SEP)+Character.toString(Utils.LINUX_SEP)); - assertEquals(Utils.ensureTrailingPathSlash(Character.toString(Utils.WIN_SEP)+Character.toString(Utils.LINUX_SEP), Utils.LINUX_SEP), Character.toString(Utils.WIN_SEP)+Character.toString(Utils.LINUX_SEP)); - assertEquals(Utils.ensureTrailingPathSlash(Character.toString(Utils.WIN_SEP)+Character.toString(Utils.LINUX_SEP), Utils.WIN_SEP), Character.toString(Utils.WIN_SEP)+Character.toString(Utils.LINUX_SEP)+Character.toString(Utils.WIN_SEP)); - assertEquals(Utils.ensureTrailingPathSlash("C:\\some\\Path", Utils.LINUX_SEP), "C:\\some\\Path\\"+Utils.LINUX_SEP); - assertEquals(Utils.ensureTrailingPathSlash("C:\\some\\Path", Utils.WIN_SEP), "C:\\some\\Path\\"+Utils.WIN_SEP); - assertEquals(Utils.ensureTrailingPathSlash("C:\\some\\Path\\", Utils.LINUX_SEP), "C:\\some\\Path\\" + Utils.LINUX_SEP); - assertEquals(Utils.ensureTrailingPathSlash("C:\\some\\Path\\", Utils.WIN_SEP), "C:\\some\\Path\\"); - assertEquals(Utils.ensureTrailingPathSlash("/another/path", Utils.LINUX_SEP), "/another/path" + Utils.LINUX_SEP); - assertEquals(Utils.ensureTrailingPathSlash("/another/path", Utils.WIN_SEP), "/another/path/"+ Utils.WIN_SEP); - assertEquals(Utils.ensureTrailingPathSlash("/another/path/", Utils.LINUX_SEP), "/another/path/"); - assertEquals(Utils.ensureTrailingPathSlash("/another/path/", Utils.WIN_SEP), "/another/path/"+Utils.WIN_SEP); + assertEquals(Character.toString(Utils.LINUX_SEP), Utils.ensureTrailingPathSlash("", Utils.LINUX_SEP)); + assertEquals(Character.toString(Utils.WIN_SEP), Utils.ensureTrailingPathSlash("", Utils.WIN_SEP)); + assertEquals(Character.toString(Utils.LINUX_SEP)+Utils.WIN_SEP, Utils.ensureTrailingPathSlash(Character.toString(Utils.LINUX_SEP), Utils.WIN_SEP)); + assertEquals(Character.toString(Utils.LINUX_SEP), Utils.ensureTrailingPathSlash(Character.toString(Utils.LINUX_SEP), Utils.LINUX_SEP)); + assertEquals(Character.toString(Utils.WIN_SEP), Utils.ensureTrailingPathSlash(Character.toString(Utils.WIN_SEP), Utils.WIN_SEP)); + assertEquals(Character.toString(Utils.WIN_SEP)+Character.toString(Utils.LINUX_SEP), Utils.ensureTrailingPathSlash(Character.toString(Utils.WIN_SEP), Utils.LINUX_SEP)); + assertEquals(Character.toString(Utils.WIN_SEP)+Character.toString(Utils.LINUX_SEP), Utils.ensureTrailingPathSlash(Character.toString(Utils.WIN_SEP)+Character.toString(Utils.LINUX_SEP), Utils.LINUX_SEP)); + assertEquals(Character.toString(Utils.WIN_SEP)+Character.toString(Utils.LINUX_SEP)+Character.toString(Utils.WIN_SEP), Utils.ensureTrailingPathSlash(Character.toString(Utils.WIN_SEP)+Character.toString(Utils.LINUX_SEP), Utils.WIN_SEP)); + assertEquals("C:\\some\\Path\\"+Utils.LINUX_SEP, Utils.ensureTrailingPathSlash("C:\\some\\Path", Utils.LINUX_SEP)); + assertEquals("C:\\some\\Path\\"+Utils.WIN_SEP, Utils.ensureTrailingPathSlash("C:\\some\\Path", Utils.WIN_SEP)); + assertEquals("C:\\some\\Path\\" + Utils.LINUX_SEP, Utils.ensureTrailingPathSlash("C:\\some\\Path\\", Utils.LINUX_SEP)); + assertEquals("C:\\some\\Path\\", Utils.ensureTrailingPathSlash("C:\\some\\Path\\", Utils.WIN_SEP)); + assertEquals("/another/path" + Utils.LINUX_SEP, Utils.ensureTrailingPathSlash("/another/path", Utils.LINUX_SEP)); + assertEquals("/another/path/"+ Utils.WIN_SEP, Utils.ensureTrailingPathSlash("/another/path", Utils.WIN_SEP)); + assertEquals("/another/path/", Utils.ensureTrailingPathSlash("/another/path/", Utils.LINUX_SEP)); + assertEquals("/another/path/"+Utils.WIN_SEP, Utils.ensureTrailingPathSlash("/another/path/", Utils.WIN_SEP)); } @Test public void testEnsureTrailingSlashUseLinuxBoolTest() { - assertEquals(Utils.ensureTrailingPathSlash("", "true"), Character.toString(Utils.LINUX_SEP)); - assertEquals(Utils.ensureTrailingPathSlash("", "false"), Character.toString(Utils.WIN_SEP)); - assertEquals(Utils.ensureTrailingPathSlash("", "xyz"), Character.toString(Utils.WIN_SEP)); - assertEquals(Utils.ensureTrailingPathSlash(Character.toString(Utils.LINUX_SEP), "false"), Character.toString(Utils.LINUX_SEP)+Utils.WIN_SEP); - assertEquals(Utils.ensureTrailingPathSlash(Character.toString(Utils.LINUX_SEP), "true"), Character.toString(Utils.LINUX_SEP)); - assertEquals(Utils.ensureTrailingPathSlash(Character.toString(Utils.WIN_SEP), "false"), Character.toString(Utils.WIN_SEP)); - assertEquals(Utils.ensureTrailingPathSlash(Character.toString(Utils.WIN_SEP), "true"), Character.toString(Utils.WIN_SEP)+Character.toString(Utils.LINUX_SEP)); - assertEquals(Utils.ensureTrailingPathSlash(Character.toString(Utils.WIN_SEP)+Character.toString(Utils.LINUX_SEP), "true"), Character.toString(Utils.WIN_SEP)+Character.toString(Utils.LINUX_SEP)); - assertEquals(Utils.ensureTrailingPathSlash(Character.toString(Utils.WIN_SEP)+Character.toString(Utils.LINUX_SEP), "false"), Character.toString(Utils.WIN_SEP)+Character.toString(Utils.LINUX_SEP)+Character.toString(Utils.WIN_SEP)); - assertEquals(Utils.ensureTrailingPathSlash("C:\\some\\Path", "true"), "C:\\some\\Path\\"+Utils.LINUX_SEP); - assertEquals(Utils.ensureTrailingPathSlash("C:\\some\\Path", "false"), "C:\\some\\Path\\"+Utils.WIN_SEP); - assertEquals(Utils.ensureTrailingPathSlash("C:\\some\\Path\\", "true"), "C:\\some\\Path\\" + Utils.LINUX_SEP); - assertEquals(Utils.ensureTrailingPathSlash("C:\\some\\Path\\", "false"), "C:\\some\\Path\\"); - assertEquals(Utils.ensureTrailingPathSlash("/another/path", "TRUE"), "/another/path" + Utils.LINUX_SEP); - assertEquals(Utils.ensureTrailingPathSlash("/another/path", "FALSE"), "/another/path/"+ Utils.WIN_SEP); - assertEquals(Utils.ensureTrailingPathSlash("/another/path/", "TrUe"), "/another/path/"); - assertEquals(Utils.ensureTrailingPathSlash("/another/path/", "FalSe"), "/another/path/"+Utils.WIN_SEP); + assertEquals(Character.toString(Utils.LINUX_SEP), Utils.ensureTrailingPathSlash("", "true")); + assertEquals(Character.toString(Utils.WIN_SEP), Utils.ensureTrailingPathSlash("", "false")); + assertEquals(Character.toString(Utils.WIN_SEP), Utils.ensureTrailingPathSlash("", "xyz")); + assertEquals(Character.toString(Utils.LINUX_SEP)+Utils.WIN_SEP, Utils.ensureTrailingPathSlash(Character.toString(Utils.LINUX_SEP), "false")); + assertEquals(Character.toString(Utils.LINUX_SEP), Utils.ensureTrailingPathSlash(Character.toString(Utils.LINUX_SEP), "true")); + assertEquals(Character.toString(Utils.WIN_SEP), Utils.ensureTrailingPathSlash(Character.toString(Utils.WIN_SEP), "false")); + assertEquals(Character.toString(Utils.WIN_SEP)+Character.toString(Utils.LINUX_SEP), Utils.ensureTrailingPathSlash(Character.toString(Utils.WIN_SEP), "true")); + assertEquals(Character.toString(Utils.WIN_SEP)+Character.toString(Utils.LINUX_SEP), Utils.ensureTrailingPathSlash(Character.toString(Utils.WIN_SEP)+Character.toString(Utils.LINUX_SEP), "true")); + assertEquals(Character.toString(Utils.WIN_SEP)+Character.toString(Utils.LINUX_SEP)+Character.toString(Utils.WIN_SEP), Utils.ensureTrailingPathSlash(Character.toString(Utils.WIN_SEP)+Character.toString(Utils.LINUX_SEP), "false")); + assertEquals("C:\\some\\Path\\"+Utils.LINUX_SEP, Utils.ensureTrailingPathSlash("C:\\some\\Path", "true")); + assertEquals("C:\\some\\Path\\"+Utils.WIN_SEP, Utils.ensureTrailingPathSlash("C:\\some\\Path", "false")); + assertEquals("C:\\some\\Path\\" + Utils.LINUX_SEP, Utils.ensureTrailingPathSlash("C:\\some\\Path\\", "true")); + assertEquals("C:\\some\\Path\\", Utils.ensureTrailingPathSlash("C:\\some\\Path\\", "false")); + assertEquals("/another/path" + Utils.LINUX_SEP, Utils.ensureTrailingPathSlash("/another/path", "TRUE")); + assertEquals("/another/path/"+ Utils.WIN_SEP, Utils.ensureTrailingPathSlash("/another/path", "FALSE")); + assertEquals("/another/path/", Utils.ensureTrailingPathSlash("/another/path/", "TrUe")); + assertEquals("/another/path/"+Utils.WIN_SEP, Utils.ensureTrailingPathSlash("/another/path/", "FalSe")); } }