diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/DataPartition.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/DataPartition.java index 589e62f4c..a846969e0 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/DataPartition.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/DataPartition.java @@ -296,6 +296,7 @@ public void setCopyIP(int copyIndex, String copyIP) * @param index The index at which to insert the file part copy * @param copyIP The IP of the new file part copy * @param copyPath The path of the new file part copy + * @throws Exception The exception */ public void add(int index, String copyIP, String copyPath) throws Exception { diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/HPCCFile.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/HPCCFile.java index afec67413..0f745450e 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/HPCCFile.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/HPCCFile.java @@ -50,6 +50,7 @@ public class HPCCFile implements Serializable private DataPartition[] dataParts; private DataPartition tlkPartition = null; + private boolean useTLK = true; private PartitionProcessor partitionProcessor = null; private long dataPartsCreationTimeMS = -1; @@ -130,12 +131,44 @@ public HPCCFile(String fileName, String connectionString, String user, String pa */ public HPCCFile(String fileName, Connection espconninfo, String targetColumnList, String filter, RemapInfo remap_info, int maxParts, String targetfilecluster) throws HpccFileException + { + this(fileName, espconninfo, targetColumnList, filter, remap_info, maxParts, targetfilecluster, true); + } + + /** + * Constructor for the HpccFile. Captures HPCC logical file information from the DALI Server for the clusters behind + * the ESP named by the IP address and re-maps the address information for the THOR nodes to visible addresses when + * the THOR clusters are virtual. + * + * @param fileName + * The HPCC file name + * @param espconninfo + * the espconninfo + * @param targetColumnList + * a comma separated list of column names in dotted notation for columns within compound columns. + * @param filter + * a file filter to select records of interest (SQL where syntax) + * @param remap_info + * address and port re-mapping info for THOR cluster + * @param maxParts + * optional the maximum number of partitions or zero for no max + * @param targetfilecluster + * optional - the hpcc cluster the target file resides in + * @param useTLK + * optional - whether or not the top level key should be used to help filter index files + * @throws HpccFileException + * the hpcc file exception + */ + public HPCCFile(String fileName, Connection espconninfo, String targetColumnList, String filter, RemapInfo remap_info, int maxParts, + String targetfilecluster, boolean useTLK) throws HpccFileException { this.fileName = fileName; this.recordDefinition = null; this.projectedRecordDefinition = null; this.columnPruner = new ColumnPruner(targetColumnList); this.espConnInfo = espconninfo; + this.useTLK = useTLK; + try { if (filter != null && !filter.isEmpty()) @@ -163,12 +196,12 @@ public static int getFilePartFromFPos(long fpos) } /** - * Extracts the offset in the file part from a fileposition value. + * Extracts the offset in the file part from a fileposition value. * * @param fpos file position * @return the project list */ - public static long getOffsetFromFPos(long fpos) + public static long getOffsetFromFPos(long fpos) { // First 48 bits store the offset return fpos & 0xffffffffffffL; @@ -424,13 +457,20 @@ private void createDataParts() throws HpccFileException this.recordDefinition = RecordDefinitionTranslator.parseJsonRecordDefinition(new JSONObject(originalRecDefInJSON)); - try + if (this.useTLK) { - this.partitionProcessor = new PartitionProcessor(this.recordDefinition, this.dataParts, this.tlkPartition); + try + { + this.partitionProcessor = new PartitionProcessor(this.recordDefinition, this.dataParts, this.tlkPartition); + } + catch (Exception e) + { + log.error("Error while constructing partition processor, reading will continue without partition filtering: " + e.getMessage()); + this.partitionProcessor = new PartitionProcessor(this.recordDefinition, this.dataParts, null); + } } - catch (Exception e) + else { - log.error("Error while constructing partition processor, reading will continue without partition filtering: " + e.getMessage()); this.partitionProcessor = new PartitionProcessor(this.recordDefinition, this.dataParts, null); } @@ -622,13 +662,13 @@ private static String acquireFileAccess(String fileName, HPCCWsDFUClient hpcc, i String uniqueID = "HPCC-FILE: " + UUID.randomUUID().toString(); return hpcc.getFileAccessBlob(fileName, clusterName, expirySeconds, uniqueID); } - + /** * @return the file metadata information for this HPCCFile (if it exists) */ - public DFUFileDetailWrapper getOriginalFileMetadata() + public DFUFileDetailWrapper getOriginalFileMetadata() { - if (originalFileMetadata==null) + if (originalFileMetadata==null) { HPCCWsDFUClient dfuClient = HPCCWsDFUClient.get(espConnInfo); if (dfuClient.hasInitError()) diff --git a/dfsclient/src/test/java/org/hpccsystems/dfs/client/DFSIndexTest.java b/dfsclient/src/test/java/org/hpccsystems/dfs/client/DFSIndexTest.java index d4abc553a..74b92e4cb 100644 --- a/dfsclient/src/test/java/org/hpccsystems/dfs/client/DFSIndexTest.java +++ b/dfsclient/src/test/java/org/hpccsystems/dfs/client/DFSIndexTest.java @@ -30,6 +30,7 @@ import org.hpccsystems.ws.client.HPCCWsDFUClient; import org.hpccsystems.ws.client.HPCCWsWorkUnitsClient; import org.hpccsystems.ws.client.platform.test.BaseRemoteTest; +import org.hpccsystems.ws.client.utils.Connection; import org.hpccsystems.ws.client.wrappers.wsworkunits.WorkunitWrapper; import org.hpccsystems.ws.client.wrappers.wsdfu.DFUCreateFileWrapper; import org.hpccsystems.ws.client.wrappers.wsdfu.DFUFileDetailWrapper; @@ -222,6 +223,28 @@ public void tlkFilterExample() throws Exception fileReader.close(); } + @Test + public void tlkBypassTest() throws Exception + { + //------------------------------------------------------------------------------ + // Read index ignoring TLK and check that all partitions are returned + //------------------------------------------------------------------------------ + + Connection espConn = new Connection(connString); + espConn.setUserName(hpccUser); + espConn.setPassword(hpccPass); + + HPCCFile file = new HPCCFile("~test::index::integer::key", espConn, "", "", new RemapInfo(), 0, "", false); + DataPartition[] dataParts = file.getFileParts(); + + Long searchValue = 3L; + FileFilter filter = new FileFilter("key = " + searchValue); + List filteredPartitions = file.findMatchingPartitions(filter); + + // Without the TLK being read the above filter should return all file parts + assertTrue("Unexpected number of partitions", filteredPartitions.size() == dataParts.length); + } + @Test public void biasedIntTest() throws Exception {