Skip to content

Commit

Permalink
Fix Bug in Handling Equal Comparison Column Values in Upsert (apache#…
Browse files Browse the repository at this point in the history
  • Loading branch information
ankitsultana authored Feb 21, 2024
1 parent d0ce68b commit d0c28fb
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,11 @@
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
Expand All @@ -46,6 +48,7 @@
import org.apache.pinot.segment.local.indexsegment.immutable.EmptyIndexSegment;
import org.apache.pinot.segment.local.indexsegment.immutable.ImmutableSegmentImpl;
import org.apache.pinot.segment.local.segment.readers.PinotSegmentColumnReader;
import org.apache.pinot.segment.local.utils.HashUtils;
import org.apache.pinot.segment.local.utils.SegmentLocks;
import org.apache.pinot.segment.spi.ImmutableSegment;
import org.apache.pinot.segment.spi.IndexSegment;
Expand Down Expand Up @@ -601,6 +604,46 @@ protected void handleOutOfOrderEvent(Object currentComparisonValue, Object recor
}
}

/**
 * Collapses the records of a new segment so that at most one record per primary key remains.
 *
 * <p>
 * Why this exists: for Partial Upsert tables, a segment may contain several records for the same primary key whose
 * comparison values tie. The record location map must then be updated only for the latest version of the record,
 * because Realtime consumption could otherwise read the wrong version and cause permanent data-inconsistency.
 * </p>
 *
 * <p>
 * How ties are resolved: records are grouped by their hashed primary key. Within a group the record with the
 * greatest comparison value is retained, and when comparison values are equal the record encountered later in
 * {@code recordInfoIterator} replaces the earlier one. The returned iterator walks the values of the backing
 * {@link HashMap}, so its order is not the input order.
 * </p>
 *
 * @param recordInfoIterator iterator over the records of the new segment; it is fully consumed by this call
 * @param hashFunction hash function configured for Upsert's primary keys
 * @return iterator over the surviving records, one per distinct (hashed) primary key
 */
@SuppressWarnings({"rawtypes", "unchecked"})
protected static Iterator<RecordInfo> resolveComparisonTies(
    Iterator<RecordInfo> recordInfoIterator, HashFunction hashFunction) {
  Map<Object, RecordInfo> winnersByKey = new HashMap<>();
  while (recordInfoIterator.hasNext()) {
    RecordInfo candidate = recordInfoIterator.next();
    Object hashedKey = HashUtils.hashPrimaryKey(candidate.getPrimaryKey(), hashFunction);
    // First record for a key is stored as-is; afterwards the incumbent only survives when the
    // challenger's comparison value is strictly smaller (ties go to the later record).
    winnersByKey.merge(hashedKey, candidate, (incumbent, challenger) -> {
      Comparable challengerValue = challenger.getComparisonValue();
      boolean incumbentWins = challengerValue.compareTo(incumbent.getComparisonValue()) < 0;
      return incumbentWins ? incumbent : challenger;
    });
  }
  return winnersByKey.values().iterator();
}

@Override
public void takeSnapshot() {
if (!_enableSnapshot) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ protected void addOrReplaceSegment(ImmutableSegmentImpl segment, ThreadSafeMutab
String segmentName = segment.getSegmentName();
segment.enableUpsert(this, validDocIds, queryableDocIds);

if (_partialUpsertHandler != null) {
recordInfoIterator = resolveComparisonTies(recordInfoIterator, _hashFunction);
}
AtomicInteger numKeysInWrongSegment = new AtomicInteger();
while (recordInfoIterator.hasNext()) {
RecordInfo recordInfo = recordInfoIterator.next();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import javax.annotation.Nullable;
import org.apache.commons.io.FileUtils;
import org.apache.pinot.common.metrics.ServerMetrics;
Expand All @@ -36,7 +37,9 @@
import org.apache.pinot.segment.spi.index.metadata.SegmentMetadataImpl;
import org.apache.pinot.segment.spi.index.mutable.ThreadSafeMutableRoaringBitmap;
import org.apache.pinot.segment.spi.store.SegmentDirectory;
import org.apache.pinot.spi.config.table.HashFunction;
import org.apache.pinot.spi.data.readers.GenericRow;
import org.apache.pinot.spi.data.readers.PrimaryKey;
import org.roaringbitmap.buffer.MutableRoaringBitmap;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
Expand All @@ -45,6 +48,7 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;


Expand Down Expand Up @@ -111,6 +115,34 @@ public void testTakeSnapshotInOrder()
assertEquals(seg03.loadValidDocIdsFromSnapshot().getCardinality(), 3);
}

/**
 * Verifies that {@code resolveComparisonTies} keeps exactly one record per primary key and, when all comparison
 * values are equal, keeps the record with the highest docId (i.e. the one appearing last in the input).
 */
@Test
public void testResolveComparisonTies() {
  // Input fixture: index == docId; every record carries the same comparison value so all duplicates are ties
  int[] keyPerDoc = {0, 1, 2, 0, 1, 0};
  int[] comparisonValuePerDoc = {0, 0, 0, 0, 0, 0};
  List<RecordInfo> inputRecords = new ArrayList<>();
  int nextDocId = 0;
  for (int key : keyPerDoc) {
    inputRecords.add(new RecordInfo(makePrimaryKey(key), nextDocId, comparisonValuePerDoc[nextDocId], false));
    nextDocId++;
  }

  // Run the de-duplication under test
  Iterator<RecordInfo> survivors =
      BasePartitionUpsertMetadataManager.resolveComparisonTies(inputRecords.iterator(), HashFunction.NONE);

  // Each primary key must show up at most once in the output
  Map<PrimaryKey, RecordInfo> survivorByKey = new HashMap<>();
  survivors.forEachRemaining(survivor -> {
    PrimaryKey primaryKey = survivor.getPrimaryKey();
    assertFalse(survivorByKey.containsKey(primaryKey));
    survivorByKey.put(primaryKey, survivor);
  });
  assertEquals(survivorByKey.size(), 3);

  // For every key the surviving record must be the one with the last docId: {primaryKey, expectedDocId}
  int[][] expectedDocIdByKey = {{0, 5}, {1, 4}, {2, 2}};
  for (int[] expectation : expectedDocIdByKey) {
    assertEquals(survivorByKey.get(makePrimaryKey(expectation[0])).getDocId(), expectation[1]);
  }
}

private static ThreadSafeMutableRoaringBitmap createValidDocIds(int... docIds) {
MutableRoaringBitmap bitmap = new MutableRoaringBitmap();
bitmap.add(docIds);
Expand All @@ -132,6 +164,10 @@ public void persistValidDocIdsSnapshot() {
};
}

/** Builds a single-column {@link PrimaryKey} whose only value is the given int (boxed). */
private static PrimaryKey makePrimaryKey(int value) {
  Object[] keyColumns = new Object[]{value};
  return new PrimaryKey(keyColumns);
}

private static class DummyPartitionUpsertMetadataManager extends BasePartitionUpsertMetadataManager {

protected DummyPartitionUpsertMetadataManager(String tableNameWithType, int partitionId, UpsertContext context) {
Expand Down

0 comments on commit d0c28fb

Please sign in to comment.