From ca5442264a3c659647e7c0058aec6f888d4a262d Mon Sep 17 00:00:00 2001 From: Yubin Li Date: Wed, 22 Jan 2025 14:50:11 +0800 Subject: [PATCH] refactor to support total buckets upper bound --- .../generated/core_configuration.html | 4 +- .../java/org/apache/paimon/CoreOptions.java | 10 ++-- .../apache/paimon/index/PartitionIndex.java | 46 +++++++++++++---- .../index/SimpleHashBucketAssigner.java | 10 ++-- .../table/sink/KeyAndBucketExtractor.java | 4 +- .../paimon/index/HashBucketAssignerTest.java | 51 ++++++++++++++++++- .../index/SimpleHashBucketAssignerTest.java | 44 +++++++++++++++- .../sink/HashBucketAssignerOperator.java | 16 ++++-- .../spark/commands/BucketProcessor.scala | 8 +-- .../spark/commands/PaimonSparkWriter.scala | 8 ++- 10 files changed, 168 insertions(+), 33 deletions(-) diff --git a/docs/layouts/shortcodes/generated/core_configuration.html b/docs/layouts/shortcodes/generated/core_configuration.html index cd4df50d7c71..e3e1c0f6733d 100644 --- a/docs/layouts/shortcodes/generated/core_configuration.html +++ b/docs/layouts/shortcodes/generated/core_configuration.html @@ -291,10 +291,10 @@ Initial buckets for a partition in assigner operator for dynamic bucket mode. -
dynamic-bucket.max-buckets-per-assigner
+
dynamic-bucket.max-buckets
-1 Integer - Max buckets per assigner operator for a partition in dynamic bucket mode, It should either be equal to -1 (unlimited), or it must be greater than 0 (fixed upper bound). + Max buckets for a partition in dynamic bucket mode. It should either be equal to -1 (unlimited), or it must be greater than 0 (fixed upper bound).
dynamic-bucket.target-row-num
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java index d1bce166d569..0d29549efde1 100644 --- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java +++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java @@ -1076,12 +1076,12 @@ public class CoreOptions implements Serializable { .withDescription( "Initial buckets for a partition in assigner operator for dynamic bucket mode."); - public static final ConfigOption DYNAMIC_BUCKET_MAX_BUCKETS_PER_ASSIGNER = - key("dynamic-bucket.max-buckets-per-assigner") + public static final ConfigOption DYNAMIC_BUCKET_MAX_BUCKETS = + key("dynamic-bucket.max-buckets") .intType() .defaultValue(-1) .withDescription( - "Max buckets per assigner operator for a partition in dynamic bucket mode, It should " + "Max buckets for a partition in dynamic bucket mode. It should " + "either be equal to -1 (unlimited), or it must be greater than 0 (fixed upper bound)."); public static final ConfigOption DYNAMIC_BUCKET_ASSIGNER_PARALLELISM = @@ -2227,8 +2227,8 @@ public Integer dynamicBucketInitialBuckets() { return options.get(DYNAMIC_BUCKET_INITIAL_BUCKETS); } - public Integer dynamicBucketMaxBucketsPerAssigner() { - return options.get(DYNAMIC_BUCKET_MAX_BUCKETS_PER_ASSIGNER); + public Integer dynamicBucketMaxBuckets() { + return options.get(DYNAMIC_BUCKET_MAX_BUCKETS); } public Integer dynamicBucketAssignerParallelism() { diff --git a/paimon-core/src/main/java/org/apache/paimon/index/PartitionIndex.java b/paimon-core/src/main/java/org/apache/paimon/index/PartitionIndex.java index c7bbe5083dbf..bcf8b2dc4122 100644 --- a/paimon-core/src/main/java/org/apache/paimon/index/PartitionIndex.java +++ b/paimon-core/src/main/java/org/apache/paimon/index/PartitionIndex.java @@ -24,9 +24,13 @@ import org.apache.paimon.utils.Int2ShortHashMap; import org.apache.paimon.utils.IntIterator; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import java.io.EOFException; import java.io.IOException; import java.io.UncheckedIOException; +import java.util.Arrays; import java.util.HashMap; import java.util.Iterator; import java.util.LinkedHashSet; @@ -39,6 +43,7 @@ /** Bucket Index Per Partition. 
*/ public class PartitionIndex { + private static final Logger LOG = LoggerFactory.getLogger(PartitionIndex.class); public final Int2ShortHashMap hash2Bucket; @@ -81,15 +86,9 @@ public int assign(int hash, IntPredicate bucketFilter, int maxBucketsNum) { Long number = entry.getValue(); if (number < targetBucketRowNumber) { entry.setValue(number + 1); - return cacheBucketAndGet(hash, bucket); + return cacheBucketAndGet(hash2Bucket, hash, bucket); } else { iterator.remove(); - if (-1 != maxBucketsNum && totalBucket.size() == maxBucketsNum) { - return cacheBucketAndGet( - hash, - KeyAndBucketExtractor.bucketWithUpperBound( - totalBucket, hash, maxBucketsNum)); - } } } @@ -99,7 +98,7 @@ public int assign(int hash, IntPredicate bucketFilter, int maxBucketsNum) { if (bucketFilter.test(i) && !totalBucket.contains(i)) { nonFullBucketInformation.put(i, 1L); totalBucket.add(i); - return cacheBucketAndGet(hash, i); + return cacheBucketAndGet(hash2Bucket, hash, i); } } @@ -110,7 +109,9 @@ public int assign(int hash, IntPredicate bucketFilter, int maxBucketsNum) { "Too more bucket %s, you should increase target bucket row number %s.", maxBucket, targetBucketRowNumber)); } else { + // exceed buckets upper bound return cacheBucketAndGet( + hash2Bucket, hash, KeyAndBucketExtractor.bucketWithUpperBound(totalBucket, hash, maxBucketsNum)); } @@ -149,8 +150,35 @@ public static PartitionIndex loadIndex( return new PartitionIndex(mapBuilder.build(), buckets, targetBucketRowNumber); } - private int cacheBucketAndGet(int hash, int bucket) { + public static int cacheBucketAndGet(Int2ShortHashMap hash2Bucket, int hash, int bucket) { hash2Bucket.put(hash, (short) bucket); return bucket; } + + public static int[] getMaxBucketsPerAssigner(int maxBuckets, int assigners) { + int[] maxBucketsArr = new int[assigners]; + if (-1 == maxBuckets) { + Arrays.fill(maxBucketsArr, -1); + return maxBucketsArr; + } + if (0 >= maxBuckets) { + throw new IllegalStateException( + "Max-buckets should either be equal to -1 (unlimited), or it must be greater than 0 (fixed upper bound)."); + } + int avg = maxBuckets / assigners; + int remainder = maxBuckets % assigners; + for (int i = 0; i < assigners; i++) { + maxBucketsArr[i] = avg; + if (remainder > 0) { + maxBucketsArr[i]++; + remainder--; + } + } + LOG.info( + "After distributing max-buckets {} to {} assigners evenly, maxBuckets layout: {}", + maxBuckets, + assigners, + Arrays.toString(maxBucketsArr)); + return maxBucketsArr; + } } diff --git a/paimon-core/src/main/java/org/apache/paimon/index/SimpleHashBucketAssigner.java b/paimon-core/src/main/java/org/apache/paimon/index/SimpleHashBucketAssigner.java index e9160d22454c..e09691927dbe 100644 --- a/paimon-core/src/main/java/org/apache/paimon/index/SimpleHashBucketAssigner.java +++ b/paimon-core/src/main/java/org/apache/paimon/index/SimpleHashBucketAssigner.java @@ -28,6 +28,8 @@ import java.util.Map; import java.util.Set; +import static org.apache.paimon.index.PartitionIndex.cacheBucketAndGet; + /** When we need to overwrite the table, we should use this to avoid loading index. 
*/ public class SimpleHashBucketAssigner implements BucketAssigner { @@ -89,11 +91,11 @@ public int assign(int hash) { Long num = bucketInformation.computeIfAbsent(currentBucket, i -> 0L); if (num >= targetBucketRowNumber) { if (-1 != maxBucketsNum && bucketInformation.size() >= maxBucketsNum) { - int bucket = + return cacheBucketAndGet( + hash2Bucket, + hash, KeyAndBucketExtractor.bucketWithUpperBound( - bucketInformation.keySet(), hash, maxBucketsNum); - hash2Bucket.put(hash, (short) bucket); - return bucket; + bucketInformation.keySet(), hash, maxBucketsNum)); } else { loadNewBucket(); } diff --git a/paimon-core/src/main/java/org/apache/paimon/table/sink/KeyAndBucketExtractor.java b/paimon-core/src/main/java/org/apache/paimon/table/sink/KeyAndBucketExtractor.java index b97bdd9611a7..dcbd02508d7b 100644 --- a/paimon-core/src/main/java/org/apache/paimon/table/sink/KeyAndBucketExtractor.java +++ b/paimon-core/src/main/java/org/apache/paimon/table/sink/KeyAndBucketExtractor.java @@ -27,7 +27,7 @@ import java.util.Set; import java.util.concurrent.ThreadLocalRandom; -import static org.apache.paimon.CoreOptions.DYNAMIC_BUCKET_MAX_BUCKETS_PER_ASSIGNER; +import static org.apache.paimon.CoreOptions.DYNAMIC_BUCKET_MAX_BUCKETS; import static org.apache.paimon.utils.Preconditions.checkArgument; /** @@ -66,7 +66,7 @@ static int bucketWithUpperBound(Set bucketsSet, int hashcode, int maxBu "Assign record (hashcode '{}') to new bucket exceed upper bound '{}' defined in '{}', Stop creating new buckets.", hashcode, maxBucketsNum, - DYNAMIC_BUCKET_MAX_BUCKETS_PER_ASSIGNER.key()); + DYNAMIC_BUCKET_MAX_BUCKETS.key()); return bucketsSet.stream() .skip(ThreadLocalRandom.current().nextInt(maxBucketsNum)) .findFirst() diff --git a/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java b/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java index a2e71f045851..2bf2a74eb0b7 100644 --- a/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/index/HashBucketAssignerTest.java @@ -37,6 +37,7 @@ import java.util.Arrays; import java.util.Collections; +import static org.apache.paimon.index.PartitionIndex.getMaxBucketsPerAssigner; import static org.apache.paimon.io.DataFileTestUtils.row; import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThatThrownBy; @@ -151,7 +152,55 @@ public void testAssignWithUpperBound() { } } - @ParameterizedTest(name = "maxBucket: {0}") + @Test + public void testAssignWithUpperBoundMultiAssigners() { + int[] maxBucketsArr = getMaxBucketsPerAssigner(3, 2); + Assertions.assertThat(maxBucketsArr).isEqualTo(new int[] {2, 1}); + + assertThatThrownBy(() -> getMaxBucketsPerAssigner(-10, 2)) + .hasMessageContaining( + "Max-buckets should either be equal to -1 (unlimited), or it must be greater than 0 (fixed upper bound)."); + + HashBucketAssigner assigner0 = createAssigner(2, 2, 0, maxBucketsArr[0]); + HashBucketAssigner assigner1 = createAssigner(2, 2, 1, maxBucketsArr[1]); + + // assigner0: assign + assertThat(assigner0.assign(row(1), 0)).isEqualTo(0); + assertThat(assigner0.assign(row(1), 2)).isEqualTo(0); + assertThat(assigner0.assign(row(1), 4)).isEqualTo(0); + assertThat(assigner0.assign(row(1), 6)).isEqualTo(0); + assertThat(assigner0.assign(row(1), 8)).isEqualTo(0); + + // assigner0: full + assertThat(assigner0.assign(row(1), 10)).isEqualTo(2); + assertThat(assigner0.assign(row(1), 12)).isEqualTo(2); + 
assertThat(assigner0.assign(row(1), 14)).isEqualTo(2); + assertThat(assigner0.assign(row(1), 16)).isEqualTo(2); + assertThat(assigner0.assign(row(1), 18)).isEqualTo(2); + + // assigner0: exceed upper bound + int hash = 18; + for (int i = 0; i < 200; i++) { + int bucket = assigner0.assign(row(2), hash += 2); + Assertions.assertThat(bucket).isIn(0, 2); + } + + // assigner1: assign + assertThat(assigner1.assign(row(1), 1)).isEqualTo(1); + assertThat(assigner1.assign(row(1), 3)).isEqualTo(1); + assertThat(assigner1.assign(row(1), 5)).isEqualTo(1); + assertThat(assigner1.assign(row(1), 7)).isEqualTo(1); + assertThat(assigner1.assign(row(1), 9)).isEqualTo(1); + + // assigner1: exceed upper bound + hash = 9; + for (int i = 0; i < 200; i++) { + int bucket = assigner1.assign(row(2), hash += 2); + Assertions.assertThat(bucket).isIn(1); + } + } + + @ParameterizedTest(name = "maxBuckets: {0}") @ValueSource(ints = {-1, 1, 2}) public void testPartitionCopy(int maxBucketsNum) { HashBucketAssigner assigner = createAssigner(1, 1, 0, maxBucketsNum); diff --git a/paimon-core/src/test/java/org/apache/paimon/index/SimpleHashBucketAssignerTest.java b/paimon-core/src/test/java/org/apache/paimon/index/SimpleHashBucketAssignerTest.java index 1c72310a2f13..ae2e198a58fb 100644 --- a/paimon-core/src/test/java/org/apache/paimon/index/SimpleHashBucketAssignerTest.java +++ b/paimon-core/src/test/java/org/apache/paimon/index/SimpleHashBucketAssignerTest.java @@ -25,6 +25,7 @@ import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.ValueSource; +import static org.apache.paimon.index.PartitionIndex.getMaxBucketsPerAssigner; import static org.apache.paimon.io.DataFileTestUtils.row; import static org.assertj.core.api.Assertions.assertThat; @@ -83,7 +84,46 @@ public void testAssignWithUpperBound() { } } - @ParameterizedTest(name = "maxBucket: {0}") + @Test + public void testAssignWithUpperBoundMultiAssigners() { + int[] maxBucketsArr = getMaxBucketsPerAssigner(3, 2); + Assertions.assertThat(maxBucketsArr).isEqualTo(new int[] {2, 1}); + + SimpleHashBucketAssigner simpleHashBucketAssigner0 = + new SimpleHashBucketAssigner(2, 0, 100, maxBucketsArr[0]); + SimpleHashBucketAssigner simpleHashBucketAssigner1 = + new SimpleHashBucketAssigner(2, 1, 100, maxBucketsArr[1]); + + BinaryRow binaryRow = BinaryRow.EMPTY_ROW; + int hash = 0; + + for (int i = 0; i < 100; i++) { + int bucket = simpleHashBucketAssigner0.assign(binaryRow, hash++); + Assertions.assertThat(bucket).isEqualTo(0); + } + + for (int i = 0; i < 100; i++) { + int bucket = simpleHashBucketAssigner1.assign(binaryRow, hash++); + Assertions.assertThat(bucket).isEqualTo(1); + } + + for (int i = 0; i < 100; i++) { + int bucket = simpleHashBucketAssigner0.assign(binaryRow, hash++); + Assertions.assertThat(bucket).isEqualTo(2); + } + + // exceed upper bound + for (int i = 0; i < 200; i++) { + int bucket = simpleHashBucketAssigner0.assign(binaryRow, hash++); + Assertions.assertThat(bucket).isIn(0, 2); + } + for (int i = 0; i < 200; i++) { + int bucket = simpleHashBucketAssigner1.assign(binaryRow, hash++); + Assertions.assertThat(bucket).isIn(1); + } + } + + @ParameterizedTest(name = "maxBuckets: {0}") @ValueSource(ints = {-1, 1, 2}) public void testAssignWithSameHash(int maxBucketsNum) { SimpleHashBucketAssigner simpleHashBucketAssigner = @@ -105,7 +145,7 @@ public void testAssignWithSameHash(int maxBucketsNum) { } } - @ParameterizedTest(name = "maxBucket: {0}") + @ParameterizedTest(name = "maxBuckets: {0}") @ValueSource(ints = {-1, 1, 2}) 
public void testPartitionCopy(int maxBucketsNum) { SimpleHashBucketAssigner assigner = new SimpleHashBucketAssigner(1, 0, 5, maxBucketsNum); diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/HashBucketAssignerOperator.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/HashBucketAssignerOperator.java index c38b62f6023a..0117bbb54eac 100644 --- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/HashBucketAssignerOperator.java +++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/HashBucketAssignerOperator.java @@ -37,6 +37,8 @@ import org.apache.flink.streaming.runtime.streamrecord.RecordAttributes; import org.apache.flink.streaming.runtime.streamrecord.StreamRecord; +import static org.apache.paimon.index.PartitionIndex.getMaxBucketsPerAssigner; + /** Assign bucket for the input record, output record with bucket. */ public class HashBucketAssignerOperator extends AbstractStreamOperator> implements OneInputStreamOperator> { @@ -49,6 +51,7 @@ public class HashBucketAssignerOperator extends AbstractStreamOperator> extractorFunction; private final boolean overwrite; + private int[] maxBucketsArr; private transient BucketAssigner assigner; private transient PartitionKeyExtractor extractor; @@ -80,11 +83,18 @@ public void initializeState(StateInitializationContext context) throws Exception int numberTasks = RuntimeContextUtils.getNumberOfParallelSubtasks(getRuntimeContext()); int taskId = RuntimeContextUtils.getIndexOfThisSubtask(getRuntimeContext()); long targetRowNum = table.coreOptions().dynamicBucketTargetRowNum(); - Integer maxBucketsNum = table.coreOptions().dynamicBucketMaxBucketsPerAssigner(); + Integer maxBucketsNum = table.coreOptions().dynamicBucketMaxBuckets(); + if (maxBucketsArr == null) { + this.maxBucketsArr = + overwrite + ? getMaxBucketsPerAssigner(maxBucketsNum, numberTasks) + : getMaxBucketsPerAssigner( + maxBucketsNum, MathUtils.min(numAssigners, numberTasks)); + } this.assigner = overwrite ? 
new SimpleHashBucketAssigner( - numberTasks, taskId, targetRowNum, maxBucketsNum) + numberTasks, taskId, targetRowNum, maxBucketsArr[taskId]) : new HashBucketAssigner( table.snapshotManager(), commitUser, @@ -93,7 +103,7 @@ public void initializeState(StateInitializationContext context) throws Exception MathUtils.min(numAssigners, numberTasks), taskId, targetRowNum, - maxBucketsNum); + maxBucketsArr[taskId]); this.extractor = extractorFunction.apply(table.schema()); } diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/BucketProcessor.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/BucketProcessor.scala index 87b53f13ec44..e75aca3bcaa5 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/BucketProcessor.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/BucketProcessor.scala @@ -21,7 +21,7 @@ package org.apache.paimon.spark.commands import org.apache.paimon.crosspartition.{GlobalIndexAssigner, KeyPartOrRow} import org.apache.paimon.data.{BinaryRow, GenericRow, InternalRow => PaimonInternalRow, JoinedRow} import org.apache.paimon.disk.IOManager -import org.apache.paimon.index.HashBucketAssigner +import org.apache.paimon.index.{HashBucketAssigner, PartitionIndex} import org.apache.paimon.spark.{DataConverter, SparkRow} import org.apache.paimon.spark.SparkUtils.createIOManager import org.apache.paimon.spark.util.EncoderUtils @@ -99,9 +99,11 @@ case class DynamicBucketProcessor( ) extends BucketProcessor[Row] { private val targetBucketRowNumber = fileStoreTable.coreOptions.dynamicBucketTargetRowNum - private val maxBucketsNum = fileStoreTable.coreOptions.dynamicBucketMaxBucketsPerAssigner() private val rowType = fileStoreTable.rowType private val commitUser = UUID.randomUUID.toString + private val maxBucketsArr = PartitionIndex.getMaxBucketsPerAssigner( + fileStoreTable.coreOptions.dynamicBucketMaxBuckets, + numAssigners) def processPartition(rowIterator: Iterator[Row]): Iterator[Row] = { val rowPartitionKeyExtractor = new RowPartitionKeyExtractor(fileStoreTable.schema) @@ -113,7 +115,7 @@ case class DynamicBucketProcessor( numAssigners, TaskContext.getPartitionId(), targetBucketRowNumber, - maxBucketsNum + maxBucketsArr(TaskContext.getPartitionId()) ) new Iterator[Row]() { diff --git a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala index bf42f3b40449..704fd1bce02a 100644 --- a/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala +++ b/paimon-spark/paimon-spark-common/src/main/scala/org/apache/paimon/spark/commands/PaimonSparkWriter.scala @@ -24,7 +24,7 @@ import org.apache.paimon.crosspartition.{IndexBootstrap, KeyPartOrRow} import org.apache.paimon.data.serializer.InternalSerializers import org.apache.paimon.deletionvectors.DeletionVector import org.apache.paimon.deletionvectors.append.AppendDeletionFileMaintainer -import org.apache.paimon.index.{BucketAssigner, SimpleHashBucketAssigner} +import org.apache.paimon.index.{BucketAssigner, PartitionIndex, SimpleHashBucketAssigner} import org.apache.paimon.io.{CompactIncrement, DataIncrement, IndexIncrement} import org.apache.paimon.manifest.{FileKind, IndexManifestEntry} import org.apache.paimon.spark.{SparkRow, SparkTableWrite, SparkTypeUtils} @@ -176,6 +176,9 @@ case 
class PaimonSparkWriter(table: FileStoreTable) { val numAssigners = Option(table.coreOptions.dynamicBucketInitialBuckets) .map(initialBuckets => Math.min(initialBuckets.toInt, assignerParallelism)) .getOrElse(assignerParallelism) + val maxBucketsArr = PartitionIndex.getMaxBucketsPerAssigner( + table.coreOptions.dynamicBucketMaxBuckets, + numAssigners) def partitionByKey(): DataFrame = { repartitionByKeyPartitionHash( @@ -197,7 +200,8 @@ case class PaimonSparkWriter(table: FileStoreTable) { numAssigners, TaskContext.getPartitionId(), table.coreOptions.dynamicBucketTargetRowNum, - table.coreOptions.dynamicBucketMaxBucketsPerAssigner()) + maxBucketsArr(TaskContext.getPartitionId()) + ) row => { val sparkRow = new SparkRow(rowType, row) assigner.assign(
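A quick sketch of how the renamed option behaves after this refactor: 'dynamic-bucket.max-buckets' now caps the total buckets of a partition, and the new PartitionIndex.getMaxBucketsPerAssigner helper splits that cap across assigner subtasks. Illustrative values only, derived from the even-split-with-remainder logic added above:

    // 'dynamic-bucket.max-buckets' = 5, written by 3 assigner subtasks
    int[] perAssigner = PartitionIndex.getMaxBucketsPerAssigner(5, 3);  // -> [2, 2, 1]
    // the default -1 keeps every assigner unlimited
    int[] unlimited = PartitionIndex.getMaxBucketsPerAssigner(-1, 3);   // -> [-1, -1, -1]
    // 0 or any other non-positive value (except -1) is rejected with an IllegalStateException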