Skip to content

Commit

Permalink
HPCC4J-569 Correct EOS character handling (#676)
Browse files Browse the repository at this point in the history
- Modified BinaryRecordWriter to only truncate strings at EOS in var strings
- Added unit test that verifies read / write all of Unicode chars and mid EOS

Signed-off-by: James McMullan [email protected]

Signed-off-by: James McMullan [email protected]
  • Loading branch information
jpmcmu authored Jan 31, 2024
1 parent f78ecb4 commit 87ea98f
Show file tree
Hide file tree
Showing 2 changed files with 128 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@

/**
* Serializes records into the provided OutputStream utilizing the provided IRecordAccessor to access record data.
*
*
* The IRecordAccessor must match the type of records that are provided to {@link #writeRecord(Object) writeRecord}.
* The data written to the OutputStream will be in the HPCC Systems binary record format.
*/
Expand All @@ -52,6 +52,8 @@ public class BinaryRecordWriter implements IRecordWriter
private static final int QSTR_COMPRESSED_CHUNK_LEN = 3;
private static final int QSTR_EXPANDED_CHUNK_LEN = 4;

private static final byte NULL_TERMINATOR = '\0';

private byte[] scratchBuffer = new byte[SCRATCH_BUFFER_SIZE];

private OutputStream outputStream = null;
Expand Down Expand Up @@ -104,7 +106,7 @@ public BinaryRecordWriter(OutputStream output, ByteOrder byteOrder) throws Excep

/*
* (non-Javadoc)
*
*
* @see org.hpccsystems.dfs.client.IRecordWriter#initialize(org.hpccsystems.dfs.client.IRecordAccessor)
*/
public void initialize(IRecordAccessor recordAccessor)
Expand Down Expand Up @@ -307,7 +309,7 @@ private void writeField(FieldDef fd, Object fieldValue) throws Exception
fillLength = SCRATCH_BUFFER_SIZE;
}

Arrays.fill(scratchBuffer, 0, fillLength, (byte) '\0');
Arrays.fill(scratchBuffer, 0, fillLength, NULL_TERMINATOR);
writeByteArray(scratchBuffer, 0, fillLength);
numFillBytes -= fillLength;
}
Expand Down Expand Up @@ -335,7 +337,7 @@ private void writeField(FieldDef fd, Object fieldValue) throws Exception
case FILEPOS:
{
Long value = null;
if (fieldValue==null)
if (fieldValue==null)
{
value=Long.valueOf(0);
}
Expand Down Expand Up @@ -391,7 +393,7 @@ else if (fd.getDataLen() < 8 && fd.getDataLen() > 0)
{
this.buffer.put((byte) ((value >> (i*8)) & 0xFF));
}

long signBit = value < 0 ? 0x80L : 0;
this.buffer.put((byte) (((value >> (lastByteIdx*8)) & 0xFF) | signBit));
}
Expand Down Expand Up @@ -436,8 +438,8 @@ else if (fd.getDataLen() == 8)
}
case CHAR:
{
byte c='\0';
if (fieldValue!=null)
byte c = NULL_TERMINATOR;
if (fieldValue!=null)
{
String value = (String) fieldValue;
c = (byte) value.charAt(0);
Expand All @@ -449,6 +451,15 @@ else if (fd.getDataLen() == 8)
case STRING:
{
String value = fieldValue != null ? (String) fieldValue : "";
if (fd.getFieldType() == FieldType.VAR_STRING)
{
int eosIdx = value.indexOf(NULL_TERMINATOR);
if (eosIdx > -1)
{
value = value.substring(0,eosIdx);
}
}

byte[] data = new byte[0];
if (fd.getSourceType() == HpccSrcType.UTF16LE)
{
Expand All @@ -475,7 +486,7 @@ else if (fd.getSourceType() == HpccSrcType.QSTRING)
int compressedDataLen = tempData.length * QSTR_COMPRESSED_CHUNK_LEN + (QSTR_EXPANDED_CHUNK_LEN-1);
compressedDataLen /= QSTR_EXPANDED_CHUNK_LEN;
data = new byte[compressedDataLen];

int bitOffset = 0;
for (int i = 0; i < tempData.length; i++)
{
Expand All @@ -491,7 +502,7 @@ else if (fd.getSourceType() == HpccSrcType.QSTRING)
case 2:
// The top 4 bits of Char 2 are in the bot 4 bits of byte1
data[byteIdx] |= (byte) ((qstrByteValue & 0x3C) >> 2);

// The bot 2 bits of Char 2 are in the top 2 bits of byte2
data[byteIdx+1] = (byte) ((qstrByteValue & 0x3) << 6);
break;
Expand Down Expand Up @@ -541,10 +552,10 @@ else if (fd.getSourceType() == HpccSrcType.QSTRING)
{
if (fd.getFieldType() == FieldType.VAR_STRING && bytesToWrite > 0)
{
data[bytesToWrite - 1] = '\0';
data[bytesToWrite - 1] = NULL_TERMINATOR;
if (fd.getSourceType().isUTF16() && bytesToWrite > 1)
{
data[bytesToWrite - 2] = '\0';
data[bytesToWrite - 2] = NULL_TERMINATOR;
}
}

Expand All @@ -562,7 +573,7 @@ else if (fd.getSourceType() == HpccSrcType.QSTRING)
fillLength = SCRATCH_BUFFER_SIZE;
}

Arrays.fill(scratchBuffer, 0, fillLength, (byte) '\0');
Arrays.fill(scratchBuffer, 0, fillLength, NULL_TERMINATOR);
writeByteArray(scratchBuffer, 0, fillLength);
numFillBytes -= fillLength;
}
Expand All @@ -576,11 +587,25 @@ else if (fd.getSourceType() == HpccSrcType.QSTRING)

if (fd.getFieldType() == FieldType.VAR_STRING)
{
byte nullByte = '\0';
this.buffer.put(nullByte);
if (fd.getSourceType().isUTF16())
{
this.buffer.put(nullByte);
boolean needsNullAdded = data.length < 2
|| data[data.length - 1] != NULL_TERMINATOR
|| data[data.length - 2] != NULL_TERMINATOR;
if (needsNullAdded)
{
this.buffer.put(NULL_TERMINATOR);
this.buffer.put(NULL_TERMINATOR);
}
}
else
{
boolean needsNullAdded = data.length < 1
|| data[data.length - 1] != NULL_TERMINATOR;
if (needsNullAdded)
{
this.buffer.put(NULL_TERMINATOR);
}
}
}
}
Expand Down Expand Up @@ -885,7 +910,7 @@ private void writeDecimal(FieldDef fd, BigDecimal decimalValue)

// 1e18
BigInteger divisor = BigInteger.valueOf(1000000000000000000L);
for (int currentDigit = 0; currentDigit < desiredPrecision;)
for (int currentDigit = 0; currentDigit < desiredPrecision;)
{
// Consume 18 digits at a time
BigInteger[] quotientRemainder = unscaledInt.divideAndRemainder(divisor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,93 @@ public void readWithForcedTimeoutTest() throws Exception
assertEquals("Not all records loaded",expectedCounts[0], records.size());
}

@Test
public void nullCharTests() throws Exception
{
// Unicode
{
FieldDef recordDef = null;
{
FieldDef[] fieldDefs = new FieldDef[2];
fieldDefs[0] = new FieldDef("uni", FieldType.STRING, "STRING", 100, false, false, HpccSrcType.UTF16LE, new FieldDef[0]);
fieldDefs[1] = new FieldDef("fixedUni", FieldType.STRING, "STRING", 100, true, false, HpccSrcType.UTF16LE, new FieldDef[0]);

recordDef = new FieldDef("RootRecord", FieldType.RECORD, "rec", 4, false, false, HpccSrcType.LITTLE_ENDIAN, fieldDefs);
}

List<HPCCRecord> records = new ArrayList<HPCCRecord>();
int maxUTF16BMPChar = Character.MAX_CODE_POINT;
for (int i = 0; i < maxUTF16BMPChar; i++) {
String strMidEOS = "";
for (int j = 0; j < 98; j++, i++) {
if (j == 50) {
strMidEOS += "\0";
}
strMidEOS += Character.toString((char) i);
}

Object[] fields = {strMidEOS, strMidEOS};
records.add(new HPCCRecord(fields, recordDef));
}

String fileName = "unicode::all_chars::test";
writeFile(records, fileName, recordDef, connTO);

HPCCFile file = new HPCCFile(fileName, connString , hpccUser, hpccPass);
List<HPCCRecord> readRecords = readFile(file, 10000, false, false, BinaryRecordReader.TRIM_STRINGS);

for (int i = 0; i < records.size(); i++) {
HPCCRecord record = records.get(i);
HPCCRecord readRecord = readRecords.get(i);
if (readRecord.equals(record) == false)
{
System.out.println("Record: " + i + " did not match\n" + record + "\n" + readRecord);
}
}
}

// SBC / ASCII
{
FieldDef recordDef = null;
{
FieldDef[] fieldDefs = new FieldDef[2];
fieldDefs[0] = new FieldDef("str", FieldType.STRING, "STRING", 10, false, false, HpccSrcType.SINGLE_BYTE_CHAR, new FieldDef[0]);
fieldDefs[1] = new FieldDef("fixedStr", FieldType.STRING, "STRING", 10, true, false, HpccSrcType.SINGLE_BYTE_CHAR, new FieldDef[0]);

recordDef = new FieldDef("RootRecord", FieldType.RECORD, "rec", 4, false, false, HpccSrcType.LITTLE_ENDIAN, fieldDefs);
}

List<HPCCRecord> records = new ArrayList<HPCCRecord>();
for (int i = 0; i < 255; i++) {
String strMidEOS = "";
for (int j = 0; j < 9; j++, i++) {
if (j == 5) {
strMidEOS += "\0";
}
strMidEOS += Character.toString((char) i);
}

Object[] fields = {strMidEOS, strMidEOS};
records.add(new HPCCRecord(fields, recordDef));
}

String fileName = "ascii::all_chars::test";
writeFile(records, fileName, recordDef, connTO);

HPCCFile file = new HPCCFile(fileName, connString , hpccUser, hpccPass);
List<HPCCRecord> readRecords = readFile(file, 10000, false, false, BinaryRecordReader.TRIM_STRINGS);

for (int i = 0; i < records.size(); i++) {
HPCCRecord record = records.get(i);
HPCCRecord readRecord = readRecords.get(i);
if (readRecord.equals(record) == false)
{
System.out.println("Record: " + i + " did not match\n" + record + "\n" + readRecord);
}
}
}
}

@Test
public void integrationReadWriteBackTest() throws Exception
{
Expand Down

0 comments on commit 87ea98f

Please sign in to comment.