refactor to support total buckets upper bound
liyubin117 committed Jan 22, 2025
1 parent ed6ebe2 commit ca54422
Showing 10 changed files with 168 additions and 33 deletions.
4 changes: 2 additions & 2 deletions docs/layouts/shortcodes/generated/core_configuration.html
@@ -291,10 +291,10 @@
<td>Initial buckets for a partition in assigner operator for dynamic bucket mode.</td>
</tr>
<tr>
<td><h5>dynamic-bucket.max-buckets-per-assigner</h5></td>
<td><h5>dynamic-bucket.max-buckets</h5></td>
<td style="word-wrap: break-word;">-1</td>
<td>Integer</td>
<td>Max buckets per assigner operator for a partition in dynamic bucket mode. It should be either -1 (unlimited) or greater than 0 (a fixed upper bound).</td>
<td>Max buckets for a partition in dynamic bucket mode. It should be either -1 (unlimited) or greater than 0 (a fixed upper bound).</td>
</tr>
<tr>
<td><h5>dynamic-bucket.target-row-num</h5></td>
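A minimal sketch of how the renamed option might be set programmatically; the `Options` usage and the value 8 are illustrative assumptions, not part of this commit:

```java
// Illustrative only: capping dynamic-bucket growth for a table via Options.
// Assumes org.apache.paimon.options.Options and the CoreOptions constants shown in this commit.
import org.apache.paimon.CoreOptions;
import org.apache.paimon.options.Options;

public class MaxBucketsOptionSketch {
    public static void main(String[] args) {
        Options options = new Options();
        options.set(CoreOptions.BUCKET, -1);                    // dynamic bucket mode
        options.set(CoreOptions.DYNAMIC_BUCKET_MAX_BUCKETS, 8); // at most 8 buckets per partition
        // -1 (the default) would keep bucket growth unbounded.
        System.out.println(options.get(CoreOptions.DYNAMIC_BUCKET_MAX_BUCKETS)); // 8
    }
}
```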
10 changes: 5 additions & 5 deletions paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1076,12 +1076,12 @@ public class CoreOptions implements Serializable {
.withDescription(
"Initial buckets for a partition in assigner operator for dynamic bucket mode.");

public static final ConfigOption<Integer> DYNAMIC_BUCKET_MAX_BUCKETS_PER_ASSIGNER =
key("dynamic-bucket.max-buckets-per-assigner")
public static final ConfigOption<Integer> DYNAMIC_BUCKET_MAX_BUCKETS =
key("dynamic-bucket.max-buckets")
.intType()
.defaultValue(-1)
.withDescription(
"Max buckets per assigner operator for a partition in dynamic bucket mode, It should "
"Max buckets for a partition in dynamic bucket mode, It should "
+ "either be equal to -1 (unlimited), or it must be greater than 0 (fixed upper bound).");

public static final ConfigOption<Integer> DYNAMIC_BUCKET_ASSIGNER_PARALLELISM =
@@ -2227,8 +2227,8 @@ public Integer dynamicBucketInitialBuckets() {
return options.get(DYNAMIC_BUCKET_INITIAL_BUCKETS);
}

public Integer dynamicBucketMaxBucketsPerAssigner() {
return options.get(DYNAMIC_BUCKET_MAX_BUCKETS_PER_ASSIGNER);
public Integer dynamicBucketMaxBuckets() {
return options.get(DYNAMIC_BUCKET_MAX_BUCKETS);
}

public Integer dynamicBucketAssignerParallelism() {
@@ -24,9 +24,13 @@
import org.apache.paimon.utils.Int2ShortHashMap;
import org.apache.paimon.utils.IntIterator;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashSet;
@@ -39,6 +43,7 @@

/** Bucket Index Per Partition. */
public class PartitionIndex {
private static final Logger LOG = LoggerFactory.getLogger(PartitionIndex.class);

public final Int2ShortHashMap hash2Bucket;

@@ -81,15 +86,9 @@ public int assign(int hash, IntPredicate bucketFilter, int maxBucketsNum) {
Long number = entry.getValue();
if (number < targetBucketRowNumber) {
entry.setValue(number + 1);
return cacheBucketAndGet(hash, bucket);
return cacheBucketAndGet(hash2Bucket, hash, bucket);
} else {
iterator.remove();
if (-1 != maxBucketsNum && totalBucket.size() == maxBucketsNum) {
return cacheBucketAndGet(
hash,
KeyAndBucketExtractor.bucketWithUpperBound(
totalBucket, hash, maxBucketsNum));
}
}
}

@@ -99,7 +98,7 @@ public int assign(int hash, IntPredicate bucketFilter, int maxBucketsNum) {
if (bucketFilter.test(i) && !totalBucket.contains(i)) {
nonFullBucketInformation.put(i, 1L);
totalBucket.add(i);
return cacheBucketAndGet(hash, i);
return cacheBucketAndGet(hash2Bucket, hash, i);
}
}

@@ -110,7 +109,9 @@ public int assign(int hash, IntPredicate bucketFilter, int maxBucketsNum) {
"Too more bucket %s, you should increase target bucket row number %s.",
maxBucket, targetBucketRowNumber));
} else {
// the buckets upper bound has been reached, fall back to an existing bucket
return cacheBucketAndGet(
hash2Bucket,
hash,
KeyAndBucketExtractor.bucketWithUpperBound(totalBucket, hash, maxBucketsNum));
}
@@ -149,8 +150,35 @@ public static PartitionIndex loadIndex(
return new PartitionIndex(mapBuilder.build(), buckets, targetBucketRowNumber);
}

private int cacheBucketAndGet(int hash, int bucket) {
public static int cacheBucketAndGet(Int2ShortHashMap hash2Bucket, int hash, int bucket) {
hash2Bucket.put(hash, (short) bucket);
return bucket;
}

public static int[] getMaxBucketsPerAssigner(int maxBuckets, int assigners) {
int[] maxBucketsArr = new int[assigners];
if (-1 == maxBuckets) {
Arrays.fill(maxBucketsArr, -1);
return maxBucketsArr;
}
if (0 >= maxBuckets) {
throw new IllegalStateException(
"Max-buckets should either be equal to -1 (unlimited), or it must be greater than 0 (fixed upper bound).");
}
int avg = maxBuckets / assigners;
int remainder = maxBuckets % assigners;
for (int i = 0; i < assigners; i++) {
maxBucketsArr[i] = avg;
if (remainder > 0) {
maxBucketsArr[i]++;
remainder--;
}
}
LOG.info(
"After distributing max-buckets {} to {} assigners evenly, maxBuckets layout: {}",
maxBuckets,
assigners,
Arrays.toString(maxBucketsArr));
return maxBucketsArr;
}
}
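To make the even distribution above concrete, here is a small sketch of what the new helper returns, grounded in the method body shown in this hunk; the inputs 10 and 4 are arbitrary example values:

```java
// Sketch: splitting the global max-buckets cap across assigner subtasks.
import java.util.Arrays;

import org.apache.paimon.index.PartitionIndex;

public class MaxBucketsSplitSketch {
    public static void main(String[] args) {
        // 10 buckets over 4 assigners: the remainder goes to the first subtasks.
        System.out.println(Arrays.toString(PartitionIndex.getMaxBucketsPerAssigner(10, 4))); // [3, 3, 2, 2]
        // -1 keeps every assigner unbounded.
        System.out.println(Arrays.toString(PartitionIndex.getMaxBucketsPerAssigner(-1, 4))); // [-1, -1, -1, -1]
    }
}
```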
@@ -28,6 +28,8 @@
import java.util.Map;
import java.util.Set;

import static org.apache.paimon.index.PartitionIndex.cacheBucketAndGet;

/** When we need to overwrite the table, we should use this to avoid loading index. */
public class SimpleHashBucketAssigner implements BucketAssigner {

@@ -89,11 +91,11 @@ public int assign(int hash) {
Long num = bucketInformation.computeIfAbsent(currentBucket, i -> 0L);
if (num >= targetBucketRowNumber) {
if (-1 != maxBucketsNum && bucketInformation.size() >= maxBucketsNum) {
int bucket =
return cacheBucketAndGet(
hash2Bucket,
hash,
KeyAndBucketExtractor.bucketWithUpperBound(
bucketInformation.keySet(), hash, maxBucketsNum);
hash2Bucket.put(hash, (short) bucket);
return bucket;
bucketInformation.keySet(), hash, maxBucketsNum));
} else {
loadNewBucket();
}
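A brief sketch of the capped assignment flow this change enables in SimpleHashBucketAssigner; the constructor arguments follow the test usage later in this commit, and the small numbers are illustrative only:

```java
// Sketch: once the bucket cap is hit, new records are routed back to existing buckets.
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.index.SimpleHashBucketAssigner;

public class CappedAssignSketch {
    public static void main(String[] args) {
        // numAssigners = 1, taskId = 0, targetBucketRowNumber = 2, maxBucketsNum = 2
        SimpleHashBucketAssigner assigner = new SimpleHashBucketAssigner(1, 0, 2, 2);
        BinaryRow partition = BinaryRow.EMPTY_ROW;
        for (int hash = 0; hash < 10; hash++) {
            // Buckets 0 and 1 fill up first; afterwards every record stays within {0, 1}.
            System.out.println(assigner.assign(partition, hash));
        }
    }
}
```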
@@ -27,7 +27,7 @@
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;

import static org.apache.paimon.CoreOptions.DYNAMIC_BUCKET_MAX_BUCKETS_PER_ASSIGNER;
import static org.apache.paimon.CoreOptions.DYNAMIC_BUCKET_MAX_BUCKETS;
import static org.apache.paimon.utils.Preconditions.checkArgument;

/**
@@ -66,7 +66,7 @@ static int bucketWithUpperBound(Set<Integer> bucketsSet, int hashcode, int maxBu
"Assign record (hashcode '{}') to new bucket exceed upper bound '{}' defined in '{}', Stop creating new buckets.",
hashcode,
maxBucketsNum,
DYNAMIC_BUCKET_MAX_BUCKETS_PER_ASSIGNER.key());
DYNAMIC_BUCKET_MAX_BUCKETS.key());
return bucketsSet.stream()
.skip(ThreadLocalRandom.current().nextInt(maxBucketsNum))
.findFirst()
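For illustration, a sketch of the fallback behaviour relied on above: once a partition has reached the cap, bucketWithUpperBound returns one of the already-created buckets at random. The import path is an assumption and the hash value is arbitrary:

```java
// Sketch: the fallback picks a random existing bucket instead of creating a new one.
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.Set;

import org.apache.paimon.table.sink.KeyAndBucketExtractor;

public class UpperBoundFallbackSketch {
    public static void main(String[] args) {
        Set<Integer> existingBuckets = new LinkedHashSet<>(Arrays.asList(0, 2));
        // maxBucketsNum = 2, so the set is already full for this assigner.
        int bucket = KeyAndBucketExtractor.bucketWithUpperBound(existingBuckets, 12345, 2);
        System.out.println(bucket); // either 0 or 2, never a new bucket
    }
}
```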
@@ -37,6 +37,7 @@
import java.util.Arrays;
import java.util.Collections;

import static org.apache.paimon.index.PartitionIndex.getMaxBucketsPerAssigner;
import static org.apache.paimon.io.DataFileTestUtils.row;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
@@ -151,7 +152,55 @@ public void testAssignWithUpperBound() {
}
}

@ParameterizedTest(name = "maxBucket: {0}")
@Test
public void testAssignWithUpperBoundMultiAssigners() {
int[] maxBucketsArr = getMaxBucketsPerAssigner(3, 2);
Assertions.assertThat(maxBucketsArr).isEqualTo(new int[] {2, 1});

assertThatThrownBy(() -> getMaxBucketsPerAssigner(-10, 2))
.hasMessageContaining(
"Max-buckets should either be equal to -1 (unlimited), or it must be greater than 0 (fixed upper bound).");

HashBucketAssigner assigner0 = createAssigner(2, 2, 0, maxBucketsArr[0]);
HashBucketAssigner assigner1 = createAssigner(2, 2, 1, maxBucketsArr[1]);

// assigner0: assign
assertThat(assigner0.assign(row(1), 0)).isEqualTo(0);
assertThat(assigner0.assign(row(1), 2)).isEqualTo(0);
assertThat(assigner0.assign(row(1), 4)).isEqualTo(0);
assertThat(assigner0.assign(row(1), 6)).isEqualTo(0);
assertThat(assigner0.assign(row(1), 8)).isEqualTo(0);

// assigner0: full
assertThat(assigner0.assign(row(1), 10)).isEqualTo(2);
assertThat(assigner0.assign(row(1), 12)).isEqualTo(2);
assertThat(assigner0.assign(row(1), 14)).isEqualTo(2);
assertThat(assigner0.assign(row(1), 16)).isEqualTo(2);
assertThat(assigner0.assign(row(1), 18)).isEqualTo(2);

// assigner0: exceed upper bound
int hash = 18;
for (int i = 0; i < 200; i++) {
int bucket = assigner0.assign(row(2), hash += 2);
Assertions.assertThat(bucket).isIn(0, 2);
}

// assigner1: assign
assertThat(assigner1.assign(row(1), 1)).isEqualTo(1);
assertThat(assigner1.assign(row(1), 3)).isEqualTo(1);
assertThat(assigner1.assign(row(1), 5)).isEqualTo(1);
assertThat(assigner1.assign(row(1), 7)).isEqualTo(1);
assertThat(assigner1.assign(row(1), 9)).isEqualTo(1);

// assigner1: exceed upper bound
hash = 9;
for (int i = 0; i < 200; i++) {
int bucket = assigner1.assign(row(2), hash += 2);
Assertions.assertThat(bucket).isIn(1);
}
}

@ParameterizedTest(name = "maxBuckets: {0}")
@ValueSource(ints = {-1, 1, 2})
public void testPartitionCopy(int maxBucketsNum) {
HashBucketAssigner assigner = createAssigner(1, 1, 0, maxBucketsNum);
@@ -25,6 +25,7 @@
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

import static org.apache.paimon.index.PartitionIndex.getMaxBucketsPerAssigner;
import static org.apache.paimon.io.DataFileTestUtils.row;
import static org.assertj.core.api.Assertions.assertThat;

@@ -83,7 +84,46 @@ public void testAssignWithUpperBound() {
}
}

@ParameterizedTest(name = "maxBucket: {0}")
@Test
public void testAssignWithUpperBoundMultiAssigners() {
int[] maxBucketsArr = getMaxBucketsPerAssigner(3, 2);
Assertions.assertThat(maxBucketsArr).isEqualTo(new int[] {2, 1});

SimpleHashBucketAssigner simpleHashBucketAssigner0 =
new SimpleHashBucketAssigner(2, 0, 100, maxBucketsArr[0]);
SimpleHashBucketAssigner simpleHashBucketAssigner1 =
new SimpleHashBucketAssigner(2, 1, 100, maxBucketsArr[1]);

BinaryRow binaryRow = BinaryRow.EMPTY_ROW;
int hash = 0;

for (int i = 0; i < 100; i++) {
int bucket = simpleHashBucketAssigner0.assign(binaryRow, hash++);
Assertions.assertThat(bucket).isEqualTo(0);
}

for (int i = 0; i < 100; i++) {
int bucket = simpleHashBucketAssigner1.assign(binaryRow, hash++);
Assertions.assertThat(bucket).isEqualTo(1);
}

for (int i = 0; i < 100; i++) {
int bucket = simpleHashBucketAssigner0.assign(binaryRow, hash++);
Assertions.assertThat(bucket).isEqualTo(2);
}

// exceed upper bound
for (int i = 0; i < 200; i++) {
int bucket = simpleHashBucketAssigner0.assign(binaryRow, hash++);
Assertions.assertThat(bucket).isIn(0, 2);
}
for (int i = 0; i < 200; i++) {
int bucket = simpleHashBucketAssigner1.assign(binaryRow, hash++);
Assertions.assertThat(bucket).isIn(1);
}
}

@ParameterizedTest(name = "maxBuckets: {0}")
@ValueSource(ints = {-1, 1, 2})
public void testAssignWithSameHash(int maxBucketsNum) {
SimpleHashBucketAssigner simpleHashBucketAssigner =
@@ -105,7 +145,7 @@ public void testAssignWithSameHash(int maxBucketsNum) {
}
}

@ParameterizedTest(name = "maxBucket: {0}")
@ParameterizedTest(name = "maxBuckets: {0}")
@ValueSource(ints = {-1, 1, 2})
public void testPartitionCopy(int maxBucketsNum) {
SimpleHashBucketAssigner assigner = new SimpleHashBucketAssigner(1, 0, 5, maxBucketsNum);
@@ -37,6 +37,8 @@
import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;

import static org.apache.paimon.index.PartitionIndex.getMaxBucketsPerAssigner;

/** Assign bucket for the input record, output record with bucket. */
public class HashBucketAssignerOperator<T> extends AbstractStreamOperator<Tuple2<T, Integer>>
implements OneInputStreamOperator<T, Tuple2<T, Integer>> {
@@ -49,6 +51,7 @@ public class HashBucketAssignerOperator<T> extends AbstractStreamOperator<Tuple2
private final Integer numAssigners;
private final SerializableFunction<TableSchema, PartitionKeyExtractor<T>> extractorFunction;
private final boolean overwrite;
private int[] maxBucketsArr;

private transient BucketAssigner assigner;
private transient PartitionKeyExtractor<T> extractor;
@@ -80,11 +83,18 @@ public void initializeState(StateInitializationContext context) throws Exception
int numberTasks = RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext());
int taskId = RuntimeContextUtils.getIndexOfThisSubtask(getRuntimeContext());
long targetRowNum = table.coreOptions().dynamicBucketTargetRowNum();
Integer maxBucketsNum = table.coreOptions().dynamicBucketMaxBucketsPerAssigner();
Integer maxBucketsNum = table.coreOptions().dynamicBucketMaxBuckets();
if (maxBucketsArr == null) {
this.maxBucketsArr =
overwrite
? getMaxBucketsPerAssigner(maxBucketsNum, numberTasks)
: getMaxBucketsPerAssigner(
maxBucketsNum, MathUtils.min(numAssigners, numberTasks));
}
this.assigner =
overwrite
? new SimpleHashBucketAssigner(
numberTasks, taskId, targetRowNum, maxBucketsNum)
numberTasks, taskId, targetRowNum, maxBucketsArr[taskId])
: new HashBucketAssigner(
table.snapshotManager(),
commitUser,
@@ -93,7 +103,7 @@ public void initializeState(StateInitializationContext context) throws Exception
MathUtils.min(numAssigners, numberTasks),
taskId,
targetRowNum,
maxBucketsNum);
maxBucketsArr[taskId]);
this.extractor = extractorFunction.apply(table.schema());
}

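To spell out the wiring above: in the non-overwrite path the cap array is sized by min(numAssigners, numberTasks), and each assigner subtask reads its own slot by task id. A small sketch with illustrative parallelism values (these numbers are assumptions, not taken from the commit):

```java
// Sketch: how a subtask would resolve its own cap from the shared array.
import org.apache.paimon.index.PartitionIndex;

public class PerSubtaskCapSketch {
    public static void main(String[] args) {
        int maxBuckets = 5;   // dynamic-bucket.max-buckets
        int numAssigners = 2; // assigner parallelism
        int numberTasks = 4;  // operator parallelism
        int assigners = Math.min(numAssigners, numberTasks);
        int[] caps = PartitionIndex.getMaxBucketsPerAssigner(maxBuckets, assigners);
        for (int taskId = 0; taskId < assigners; taskId++) {
            System.out.println("assigner subtask " + taskId + " cap = " + caps[taskId]); // 3, then 2
        }
    }
}
```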
@@ -21,7 +21,7 @@ package org.apache.paimon.spark.commands
import org.apache.paimon.crosspartition.{GlobalIndexAssigner, KeyPartOrRow}
import org.apache.paimon.data.{BinaryRow, GenericRow, InternalRow => PaimonInternalRow, JoinedRow}
import org.apache.paimon.disk.IOManager
import org.apache.paimon.index.HashBucketAssigner
import org.apache.paimon.index.{HashBucketAssigner, PartitionIndex}
import org.apache.paimon.spark.{DataConverter, SparkRow}
import org.apache.paimon.spark.SparkUtils.createIOManager
import org.apache.paimon.spark.util.EncoderUtils
@@ -99,9 +99,11 @@ case class DynamicBucketProcessor(
) extends BucketProcessor[Row] {

private val targetBucketRowNumber = fileStoreTable.coreOptions.dynamicBucketTargetRowNum
private val maxBucketsNum = fileStoreTable.coreOptions.dynamicBucketMaxBucketsPerAssigner()
private val rowType = fileStoreTable.rowType
private val commitUser = UUID.randomUUID.toString
private val maxBucketsArr = PartitionIndex.getMaxBucketsPerAssigner(
fileStoreTable.coreOptions.dynamicBucketMaxBuckets,
numAssigners)

def processPartition(rowIterator: Iterator[Row]): Iterator[Row] = {
val rowPartitionKeyExtractor = new RowPartitionKeyExtractor(fileStoreTable.schema)
Expand All @@ -113,7 +115,7 @@ case class DynamicBucketProcessor(
numAssigners,
TaskContext.getPartitionId(),
targetBucketRowNumber,
maxBucketsNum
maxBucketsArr(TaskContext.getPartitionId())
)

new Iterator[Row]() {