diff --git a/commons-hpcc/src/main/java/org/hpccsystems/commons/ecl/FieldDef.java b/commons-hpcc/src/main/java/org/hpccsystems/commons/ecl/FieldDef.java index d813d0e64..f22a7cf8b 100644 --- a/commons-hpcc/src/main/java/org/hpccsystems/commons/ecl/FieldDef.java +++ b/commons-hpcc/src/main/java/org/hpccsystems/commons/ecl/FieldDef.java @@ -35,6 +35,7 @@ public class FieldDef implements Serializable private long len = 0; private boolean fixedLength = false; private boolean isUnsigned = false; + private boolean isBlob = false; private int additionalFlags = 0; /** @@ -59,6 +60,7 @@ public FieldDef(FieldDef rhs) this.len = rhs.len; this.fixedLength = rhs.fixedLength; this.isUnsigned = rhs.isUnsigned; + this.isBlob = rhs.isBlob; this.additionalFlags = rhs.additionalFlags; } @@ -330,6 +332,25 @@ public boolean isBiased() return isNonStandardInt(); } + /** + * Is the field stored in a blob? + * + * @return true when blob + */ + public boolean isBlob() + { + return this.isBlob; + } + + /** + * Sets the blob flag. + * @param blob is the field a blob? + */ + public void setIsBlob(boolean blob) + { + this.isBlob = blob; + } + /** * * diff --git a/commons-hpcc/src/main/java/org/hpccsystems/commons/ecl/RecordDefinitionTranslator.java b/commons-hpcc/src/main/java/org/hpccsystems/commons/ecl/RecordDefinitionTranslator.java index dbae989e4..9fbaf57bf 100644 --- a/commons-hpcc/src/main/java/org/hpccsystems/commons/ecl/RecordDefinitionTranslator.java +++ b/commons-hpcc/src/main/java/org/hpccsystems/commons/ecl/RecordDefinitionTranslator.java @@ -32,10 +32,12 @@ public class RecordDefinitionTranslator private static final String NAME_KEY = "name"; private static final String TYPE_KEY = "type"; private static final String CHILD_KEY = "child"; + private static final String XPATH_KEY = "xpath"; private static final String FLAGS_KEY = "flags"; private static final String ESP_TYPE_NAME_PREFIX = "ty"; + private static final int BLOB_LENGTH = 8; private static final int FLAG_UNSIGNED = 256; private static final int FLAG_UNKNOWN_SIZE = 1024; private static final int TYPE_ID_MASK = 0xff; // 0x7fff & ~FLAG_UNKNOWN_SIZE & ~FLAG_UNSIGNED; @@ -48,6 +50,7 @@ public class RecordDefinitionTranslator final private static int type_keyedint = 10; // Convert to integer final private static int type_record = 13; final private static int type_varstring = 14; + final private static int type_blob = 15; final private static int type_data = 16; final private static int type_table = 20; final private static int type_set = 21; @@ -264,28 +267,32 @@ public static String toECLRecord(FieldDef field) throws Exception */ private static String getEClTypeDefinition(FieldDef field, HashMap recordDefinitionMap) throws Exception { + String type = ""; switch (field.getFieldType()) { case SET: { - return "SET OF " + getEClTypeDefinition(field.getDef(0), recordDefinitionMap); + type = "SET OF " + getEClTypeDefinition(field.getDef(0), recordDefinitionMap); + break; } case DATASET: { - return "DATASET(" + getEClTypeDefinition(field.getDef(0), recordDefinitionMap) + ")"; + type = "DATASET(" + getEClTypeDefinition(field.getDef(0), recordDefinitionMap) + ")"; + break; } case BINARY: { + type = "DATA"; if (field.isFixed()) { - return "DATA" + field.getDataLen(); + type += field.getDataLen(); } - - return "DATA"; + break; } case BOOLEAN: { - return "BOOLEAN"; + type = "BOOLEAN"; + break; } case INTEGER: { @@ -300,7 +307,8 @@ private static String getEClTypeDefinition(FieldDef field, HashMap 0) { - childJson.put("flags", flags); + childJson.put(FLAGS_KEY, flags); } if (childField.getFieldType() == FieldType.DATASET) { - childJson.put("xpath", childField.getFieldName() + XPATH_DELIMITER + "Row"); + childJson.put(XPATH_KEY, childField.getFieldName() + XPATH_DELIMITER + "Row"); } else if (childField.getFieldType() == FieldType.SET) { - childJson.put("xpath", childField.getFieldName() + XPATH_DELIMITER + "Item"); + childJson.put(XPATH_KEY, childField.getFieldName() + XPATH_DELIMITER + "Item"); } fields.put(childJson); @@ -954,6 +1001,14 @@ private static FieldDef parseJsonTypeDefinition(JSONObject jsonTypeDefinitions, int typeID = typeDef.getInt(FIELD_TYPE_KEY); long length = typeDef.getLong(LENGTH_KEY); + if (typeID == type_blob) + { + String blobType = typeDef.getString(CHILD_KEY); + FieldDef def = getOrParseJsonTypeDefintion(blobType, jsonTypeDefinitions, protoTypeDefs); + def.setIsBlob(true); + return def; + } + FieldType fieldType = getFieldType(typeID); switch (fieldType) { diff --git a/commons-hpcc/src/main/java/org/hpccsystems/commons/ecl/TestFieldDefinitions.java b/commons-hpcc/src/main/java/org/hpccsystems/commons/ecl/TestFieldDefinitions.java index 761d02736..1a5970c85 100644 --- a/commons-hpcc/src/main/java/org/hpccsystems/commons/ecl/TestFieldDefinitions.java +++ b/commons-hpcc/src/main/java/org/hpccsystems/commons/ecl/TestFieldDefinitions.java @@ -64,6 +64,22 @@ public class TestFieldDefinitions + " \"name\": \"isactive\",\r\n \"type\": \"ty15\",\r\n \"flags\": 65536\r\n },\r\n {\r\n \"name\": \"__internal_fpos__\",\r\n \"type\": \"ty16\",\r\n" + " \"flags\": 65821\r\n }\r\n ]\r\n}"; + private static final String blobIndexDefinitionStr = "{\n \"ty6\": { \"vinit\": 2, \"length\": 8, \"fieldType\": 285 },\n" + + " \"ty5\": { \"length\": 8, \"fieldType\": 15, \"child\": \"ty4\" },\n" + + " \"length\": 36,\n" + + " \"ty2\": { \"length\": 0, \"fieldType\": 1028 },\n" + + " \"fields\": [\n" + + " { \"name\": \"str12\", \"flags\": 4, \"type\": \"ty1\" },\n" + + " { \"name\": \"content_string\", \"flags\": 65551, \"type\": \"ty3\" },\n" + + " { \"name\": \"content_data\", \"flags\": 65551, \"type\": \"ty5\" },\n" + + " { \"name\": \"__internal_fpos__\", \"flags\": 65821, \"type\": \"ty6\" }\n" + + " ],\n" + + " \"ty1\": { \"length\": 12, \"fieldType\": 4 },\n" + + " \"fieldType\": 13,\n" + + " \"ty4\": { \"length\": 0, \"fieldType\": 1040 },\n" + + " \"ty3\": { \"length\": 8, \"fieldType\": 15, \"child\": \"ty2\" }\n" + + "}\n"; + /** * Gets the complex record definition json. * @@ -79,6 +95,11 @@ public static String getAllTypesIndexRecordDefinitionJson() return allTypesIndexRecordDefinitionStr; } + public static String getBlobIndexDefinitionJson() + { + return blobIndexDefinitionStr; + } + /** * Gets the complex record definition. * diff --git a/commons-hpcc/src/test/java/org/hpccsystems/commons/ecl/RecordDefinitionTests.java b/commons-hpcc/src/test/java/org/hpccsystems/commons/ecl/RecordDefinitionTests.java index b0d17dcbf..16a97ff21 100644 --- a/commons-hpcc/src/test/java/org/hpccsystems/commons/ecl/RecordDefinitionTests.java +++ b/commons-hpcc/src/test/java/org/hpccsystems/commons/ecl/RecordDefinitionTests.java @@ -39,7 +39,8 @@ public void setup() public void testJsonRecordParsing() throws Exception { String[] recordDefStrs = new String[] { TestFieldDefinitions.getComplexRecordDefinitionJson(), - TestFieldDefinitions.getAllTypesIndexRecordDefinitionJson() }; + TestFieldDefinitions.getAllTypesIndexRecordDefinitionJson(), + TestFieldDefinitions.getBlobIndexDefinitionJson() }; for (String recordDefStr : recordDefStrs) { try diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/BinaryRecordReader.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/BinaryRecordReader.java index 6b6ab9273..8042f710e 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/BinaryRecordReader.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/BinaryRecordReader.java @@ -558,6 +558,36 @@ private Object parseRecord(FieldDef recordDef, IRecordBuilder recordBuilder, boo for (int fieldIndex = 0; fieldIndex < recordDef.getNumDefs(); fieldIndex++) { FieldDef fd = recordDef.getDef(fieldIndex); + if (fd.isBlob()) + { + // If we encounter a blob field, we only have access to the blob file location + // So read that location and construct a default value field + long blobFileLoc = (long) getUnsigned(8, true); + try + { + switch (fd.getFieldType()) + { + case BINARY: + recordBuilder.setFieldValue(fieldIndex, new byte[0]); + continue; + case STRING: + case VAR_STRING: + recordBuilder.setFieldValue(fieldIndex, ""); + continue; + case SET: + case DATASET: + recordBuilder.setFieldValue(fieldIndex, new ArrayList()); + continue; + default: + throw new UnparsableContentException("Unexpected blob type: " + fd.getFieldType() + " for field: " + fd.getFieldName()); + } + } + catch (IllegalAccessException e) + { + throw new UnparsableContentException("Unable to set field value for field: " + fd.getFieldName() + " with error: " + e.getMessage()); + } + } + Object fieldValue = null; switch (fd.getFieldType()) { diff --git a/dfsclient/src/main/java/org/hpccsystems/dfs/client/HPCCFile.java b/dfsclient/src/main/java/org/hpccsystems/dfs/client/HPCCFile.java index 4e73bdbd3..ba66895f9 100644 --- a/dfsclient/src/main/java/org/hpccsystems/dfs/client/HPCCFile.java +++ b/dfsclient/src/main/java/org/hpccsystems/dfs/client/HPCCFile.java @@ -52,6 +52,7 @@ public class HPCCFile implements Serializable private DataPartition[] dataParts; private DataPartition tlkPartition = null; private boolean useTLK = true; + private boolean readBlobs = true; private PartitionProcessor partitionProcessor = null; private long dataPartsCreationTimeMS = -1; @@ -241,7 +242,7 @@ private void updateProjectedRecordDef() throws Exception { this.projectedRecordDefinition = this.columnPruner.pruneRecordDefinition(this.recordDefinition); - // By default project all sub-integer types to standard integers + // By default project all sub-integer types to standard integers and all blobs to non-blobs for (int i = 0; i < this.projectedRecordDefinition.getNumDefs(); i++) { FieldDef field = this.projectedRecordDefinition.getDef(i); @@ -250,6 +251,11 @@ private void updateProjectedRecordDef() throws Exception field.setSourceType(HpccSrcType.LITTLE_ENDIAN); } + // Project blobs to non-blobs, otherwise we will only get back the file position of the blob + if (readBlobs && field.isBlob()) + { + field.setIsBlob(false); + } } } @@ -363,6 +369,24 @@ public HPCCFile setUseTLK(boolean useTLK) return this; } + /** + * Sets the read blobs options + * Note: Blobs are read by default, on older HPCC systems reading blobs can cause issues reading blobs should be disabled for these systems. + * + * @param readBlobs should blobs be read + * + * @return this file + */ + public HPCCFile setReadBlobs(boolean readBlobs) + { + this.readBlobs = readBlobs; + + // Force the data parts to be re-created + this.dataParts = null; + + return this; + } + /** * Gets the filter. * diff --git a/dfsclient/src/test/java/org/hpccsystems/dfs/client/DFSIndexTest.java b/dfsclient/src/test/java/org/hpccsystems/dfs/client/DFSIndexTest.java index 46f02f39f..598090990 100644 --- a/dfsclient/src/test/java/org/hpccsystems/dfs/client/DFSIndexTest.java +++ b/dfsclient/src/test/java/org/hpccsystems/dfs/client/DFSIndexTest.java @@ -260,6 +260,27 @@ public void biasedIntTest() throws Exception } } + @Test + public void indexBlobTest() throws Exception + { + HPCCFile file = new HPCCFile("~test::index::blobs::key", connString , hpccUser, hpccPass); + + DataPartition[] fileParts = file.getFileParts(); + + List records = new ArrayList(); + FieldDef originalRD = file.getRecordDefinition(); + for (int j = 0; j < fileParts.length; j++) + { + HPCCRecordBuilder recordBuilder = new HPCCRecordBuilder(file.getProjectedRecordDefinition()); + HpccRemoteFileReader fileReader = new HpccRemoteFileReader(fileParts[j], originalRD, recordBuilder); + while (fileReader.hasNext()) + { + records.add(fileReader.next()); + } + fileReader.close(); + } + } + private String partitionListToString(List partitions) { String matchedPartitionStr = "[ "; diff --git a/dfsclient/src/test/resources/generate-datasets.ecl b/dfsclient/src/test/resources/generate-datasets.ecl index 49a2f55a8..a57203cd4 100644 --- a/dfsclient/src/test/resources/generate-datasets.ecl +++ b/dfsclient/src/test/resources/generate-datasets.ecl @@ -217,3 +217,27 @@ IF(~Std.File.FileExists(dataset_name9), OUTPUT(ds9,,dataset_name9, overwrite)); key_name2 := '~test::index::child_dataset::key'; idx := INDEX(ds9, {str12}, {str8, int8, DATASET(childRec2) childDS {maxcount(100)} := children}, key_name2); IF(~Std.File.FileExists(key_name2), BUILDINDEX(idx, OVERWRITE)); + +rec10 := RECORD + string12 str12; + string content_string{blob, maxlength(2000000)}; + data content_data{blob, maxlength(2000000)}; + DATASET(childRec2) children{blob, maxcount(100)}; +END; + +ds10 := DATASET(125, transform(rec10, + self.str12 := (STRING) random(); + self.content_string := (STRING) random(); + self.content_data := (DATA) random(); + self.children := DATASET(random() % 100, transform(childRec2, + self.str8 := (string) random(); + self.int8 := random(); + )); + ), DISTRIBUTED); + +dataset_name10 := '~test::index::blobs'; +IF(~Std.File.FileExists(dataset_name10), OUTPUT(ds10,,dataset_name10, overwrite)); + +key_name3 := '~test::index::blobs::key'; +idx2 := INDEX(ds10, {str12}, {content_string, content_data, DATASET(childRec2) childDS {blob, maxcount(100)} := children}, key_name3); +IF(~Std.File.FileExists(key_name3), BUILDINDEX(idx2, OVERWRITE)); \ No newline at end of file