From 83b0c72bb6d0e4af652875cd402ec2f4c97ff9aa Mon Sep 17 00:00:00 2001 From: ClownXC Date: Sat, 27 Apr 2024 17:11:20 +0800 Subject: [PATCH 1/4] paimon source optimize --- .../source/PaimonSourceSplitEnumerator.java | 111 +++++++++++------- .../paimon/source/PaimonSourceState.java | 20 ++-- 2 files changed, 79 insertions(+), 52 deletions(-) diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceSplitEnumerator.java index aab44b2260f..3dff07449ed 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceSplitEnumerator.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceSplitEnumerator.java @@ -19,6 +19,7 @@ import org.apache.seatunnel.api.source.SourceSplitEnumerator; +import org.apache.commons.collections.map.HashedMap; import org.apache.paimon.table.Table; import org.apache.paimon.table.source.Split; @@ -26,8 +27,12 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import java.util.stream.Collectors; @@ -39,36 +44,51 @@ public class PaimonSourceSplitEnumerator /** Source split enumerator context */ private final Context context; - /** The splits that has assigned */ - private final Set assignedSplit; + private Map> pendingSplit; - /** The splits that have not assigned */ - private Set pendingSplit; + private volatile boolean shouldEnumerate; + + private final Object stateLock = new Object(); /** The table that wants to read */ private final Table table; public PaimonSourceSplitEnumerator(Context context, Table table) { - this.context = context; - this.table = table; - this.assignedSplit = new HashSet<>(); + this(context, table, null); } public PaimonSourceSplitEnumerator( Context context, Table table, PaimonSourceState sourceState) { this.context = context; this.table = table; - this.assignedSplit = sourceState.getAssignedSplits(); + this.pendingSplit = new HashMap<>(); + this.shouldEnumerate = sourceState == null; + if (sourceState != null) { + this.shouldEnumerate = sourceState.isShouldEnumerate(); + this.pendingSplit.putAll(sourceState.getPendingSplits()); + } } @Override public void open() { - this.pendingSplit = new HashSet<>(); + this.pendingSplit = new HashedMap(); } @Override public void run() throws Exception { - // do nothing + Set readers = context.registeredReaders(); + if (shouldEnumerate) { + Set newSplits = getTableSplits(); + synchronized (stateLock) { + addPendingSplit(newSplits); + shouldEnumerate = false; + } + + assignSplit(readers); + } + log.debug( + "No more splits to assign." 
+ " Sending NoMoreSplitsEvent to reader {}.", readers); + readers.forEach(context::signalNoMoreSplits); } @Override @@ -79,8 +99,8 @@ public void close() throws IOException { @Override public void addSplitsBack(List splits, int subtaskId) { if (!splits.isEmpty()) { - pendingSplit.addAll(splits); - assignSplit(subtaskId); + addPendingSplit(splits); + assignSplit(Collections.singletonList(subtaskId)); } } @@ -91,13 +111,17 @@ public int currentUnassignedSplitSize() { @Override public void registerReader(int subtaskId) { - pendingSplit = getTableSplits(); - assignSplit(subtaskId); + log.debug("Register reader {} to PaimonSourceSplitEnumerator.", subtaskId); + if (!pendingSplit.isEmpty()) { + assignSplit(Collections.singletonList(subtaskId)); + } } @Override public PaimonSourceState snapshotState(long checkpointId) throws Exception { - return new PaimonSourceState(assignedSplit); + synchronized (stateLock) { + return new PaimonSourceState(pendingSplit, shouldEnumerate); + } } @Override @@ -110,36 +134,41 @@ public void handleSplitRequest(int subtaskId) { // do nothing } + private void addPendingSplit(Collection splits) { + int readerCount = context.currentParallelism(); + for (PaimonSourceSplit split : splits) { + int ownerReader = getSplitOwner(split.splitId(), readerCount); + log.info("Assigning {} to {} reader.", split.getSplit().toString(), ownerReader); + pendingSplit.computeIfAbsent(ownerReader, r -> new ArrayList<>()).add(split); + } + } + /** Assign split by reader task id */ - private void assignSplit(int taskId) { - ArrayList currentTaskSplits = new ArrayList<>(); - if (context.currentParallelism() == 1) { - // if parallelism == 1, we should assign all the splits to reader - currentTaskSplits.addAll(pendingSplit); - } else { - // if parallelism > 1, according to hashCode of split's id to determine whether to - // allocate the current task - for (PaimonSourceSplit fileSourceSplit : pendingSplit) { - final int splitOwner = - getSplitOwner(fileSourceSplit.splitId(), context.currentParallelism()); - if (splitOwner == taskId) { - currentTaskSplits.add(fileSourceSplit); + private void assignSplit(Collection readers) { + + log.debug("Assign pendingSplits to readers {}", readers); + + for (int reader : readers) { + List assignmentForReader = pendingSplit.remove(reader); + if (assignmentForReader != null && !assignmentForReader.isEmpty()) { + log.info( + "Assign splits {} to reader {}", + assignmentForReader.stream() + .map(p -> p.getSplit().toString()) + .collect(Collectors.joining(",")), + reader); + try { + context.assignSplit(reader, assignmentForReader); + } catch (Exception e) { + log.error( + "Failed to assign splits {} to reader {}", + assignmentForReader, + reader, + e); + pendingSplit.put(reader, assignmentForReader); } } } - // assign splits - context.assignSplit(taskId, currentTaskSplits); - // save the state of assigned splits - assignedSplit.addAll(currentTaskSplits); - // remove the assigned splits from pending splits - currentTaskSplits.forEach(split -> pendingSplit.remove(split)); - log.info( - "SubTask {} is assigned to [{}]", - taskId, - currentTaskSplits.stream() - .map(PaimonSourceSplit::splitId) - .collect(Collectors.joining(","))); - context.signalNoMoreSplits(taskId); } /** Get all splits of table */ diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceState.java 
b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceState.java index c8336b0d03c..17117ba7a3d 100644 --- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceState.java +++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceState.java @@ -17,21 +17,19 @@ package org.apache.seatunnel.connectors.seatunnel.paimon.source; +import lombok.AllArgsConstructor; +import lombok.Getter; + import java.io.Serializable; -import java.util.Set; +import java.util.List; +import java.util.Map; +@AllArgsConstructor +@Getter /** Paimon connector source state, saves the splits has assigned to readers. */ public class PaimonSourceState implements Serializable { private static final long serialVersionUID = 1L; - - private final Set assignedSplits; - - public PaimonSourceState(Set assignedSplits) { - this.assignedSplits = assignedSplits; - } - - public Set getAssignedSplits() { - return assignedSplits; - } + private final Map> pendingSplits; + private boolean shouldEnumerate; } From ae76cb415052af81b7290e4df26173018485bc91 Mon Sep 17 00:00:00 2001 From: ClownXC Date: Sat, 27 Apr 2024 17:19:59 +0800 Subject: [PATCH 2/4] replace dead links --- docs/en/connector-v2/sink/Doris.md | 46 +++++++++++++++--------------- 1 file changed, 23 insertions(+), 23 deletions(-) diff --git a/docs/en/connector-v2/sink/Doris.md b/docs/en/connector-v2/sink/Doris.md index 453a410bbe6..b8f58adf042 100644 --- a/docs/en/connector-v2/sink/Doris.md +++ b/docs/en/connector-v2/sink/Doris.md @@ -26,29 +26,29 @@ The internal implementation of Doris sink connector is cached and imported by st ## Sink Options -| Name | Type | Required | Default | Description | -|--------------------------------|---------|----------|------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| fenodes | String | Yes | - | `Doris` cluster fenodes address, the format is `"fe_ip:fe_http_port, ..."` | -| query-port | int | No | 9030 | `Doris` Fenodes query_port | -| username | String | Yes | - | `Doris` user username | -| password | String | Yes | - | `Doris` user password | -| database | String | Yes | - | The database name of `Doris` table, use `${database_name}` to represent the upstream table name | -| table | String | Yes | - | The table name of `Doris` table, use `${table_name}` to represent the upstream table name | -| table.identifier | String | Yes | - | The name of `Doris` table, it will deprecate after version 2.3.5, please use `database` and `table` instead. | -| sink.label-prefix | String | Yes | - | The label prefix used by stream load imports. In the 2pc scenario, global uniqueness is required to ensure the EOS semantics of SeaTunnel. | -| sink.enable-2pc | bool | No | false | Whether to enable two-phase commit (2pc), the default is false. For two-phase commit, please refer to [here](https://doris.apache.org/docs/dev/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD). | -| sink.enable-delete | bool | No | - | Whether to enable deletion. 
This option requires Doris table to enable batch delete function (0.15+ version is enabled by default), and only supports Unique model. you can get more detail at this [link](https://doris.apache.org/docs/dev/data-operate/update-delete/batch-delete-manual) | -| sink.check-interval | int | No | 10000 | check exception with the interval while loading | -| sink.max-retries | int | No | 3 | the max retry times if writing records to database failed | -| sink.buffer-size | int | No | 256 * 1024 | the buffer size to cache data for stream load. | -| sink.buffer-count | int | No | 3 | the buffer count to cache data for stream load. | -| doris.batch.size | int | No | 1024 | the batch size of the write to doris each http request, when the row reaches the size or checkpoint is executed, the data of cached will write to server. | -| needs_unsupported_type_casting | boolean | No | false | Whether to enable the unsupported type casting, such as Decimal64 to Double | -| schema_save_mode | Enum | no | CREATE_SCHEMA_WHEN_NOT_EXIST | the schema save mode, please refer to `schema_save_mode` below | -| data_save_mode | Enum | no | APPEND_DATA | the data save mode, please refer to `data_save_mode` below | -| save_mode_create_template | string | no | see below | see below | -| custom_sql | String | no | - | When data_save_mode selects CUSTOM_PROCESSING, you should fill in the CUSTOM_SQL parameter. This parameter usually fills in a SQL that can be executed. SQL will be executed before synchronization tasks. | -| doris.config | map | yes | - | This option is used to support operations such as `insert`, `delete`, and `update` when automatically generate sql,and supported formats. | +| Name | Type | Required | Default | Description | +|--------------------------------|---------|----------|------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| fenodes | String | Yes | - | `Doris` cluster fenodes address, the format is `"fe_ip:fe_http_port, ..."` | +| query-port | int | No | 9030 | `Doris` Fenodes query_port | +| username | String | Yes | - | `Doris` user username | +| password | String | Yes | - | `Doris` user password | +| database | String | Yes | - | The database name of `Doris` table, use `${database_name}` to represent the upstream table name | +| table | String | Yes | - | The table name of `Doris` table, use `${table_name}` to represent the upstream table name | +| table.identifier | String | Yes | - | The name of `Doris` table, it will deprecate after version 2.3.5, please use `database` and `table` instead. | +| sink.label-prefix | String | Yes | - | The label prefix used by stream load imports. In the 2pc scenario, global uniqueness is required to ensure the EOS semantics of SeaTunnel. | +| sink.enable-2pc | bool | No | false | Whether to enable two-phase commit (2pc), the default is false. For two-phase commit, please refer to [here](https://doris.apache.org/docs/dev/sql-manual/sql-statements/Data-Manipulation-Statements/Load/STREAM-LOAD). | +| sink.enable-delete | bool | No | - | Whether to enable deletion. This option requires Doris table to enable batch delete function (0.15+ version is enabled by default), and only supports Unique model. 
you can get more detail at this [link](https://doris.apache.org/docs/dev/data-operate/delete/batch-delete-manual) | +| sink.check-interval | int | No | 10000 | check exception with the interval while loading | +| sink.max-retries | int | No | 3 | the max retry times if writing records to database failed | +| sink.buffer-size | int | No | 256 * 1024 | the buffer size to cache data for stream load. | +| sink.buffer-count | int | No | 3 | the buffer count to cache data for stream load. | +| doris.batch.size | int | No | 1024 | the batch size of the write to doris each http request, when the row reaches the size or checkpoint is executed, the data of cached will write to server. | +| needs_unsupported_type_casting | boolean | No | false | Whether to enable the unsupported type casting, such as Decimal64 to Double | +| schema_save_mode | Enum | no | CREATE_SCHEMA_WHEN_NOT_EXIST | the schema save mode, please refer to `schema_save_mode` below | +| data_save_mode | Enum | no | APPEND_DATA | the data save mode, please refer to `data_save_mode` below | +| save_mode_create_template | string | no | see below | see below | +| custom_sql | String | no | - | When data_save_mode selects CUSTOM_PROCESSING, you should fill in the CUSTOM_SQL parameter. This parameter usually fills in a SQL that can be executed. SQL will be executed before synchronization tasks. | +| doris.config | map | yes | - | This option is used to support operations such as `insert`, `delete`, and `update` when automatically generate sql,and supported formats. | ### schema_save_mode[Enum] From 76e2593f205eec322d98dc50f2400c455c534baf Mon Sep 17 00:00:00 2001 From: ClownXC Date: Sun, 28 Apr 2024 11:26:24 +0800 Subject: [PATCH 3/4] sr --- docs/en/connector-v2/sink/Doris.md | 46 +++++++++++++++--------------- 1 file changed, 23 insertions(+), 23 deletions(-) diff --git a/docs/en/connector-v2/sink/Doris.md b/docs/en/connector-v2/sink/Doris.md index b8f58adf042..453a410bbe6 100644 --- a/docs/en/connector-v2/sink/Doris.md +++ b/docs/en/connector-v2/sink/Doris.md @@ -26,29 +26,29 @@ The internal implementation of Doris sink connector is cached and imported by st ## Sink Options -| Name | Type | Required | Default | Description | -|--------------------------------|---------|----------|------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| -| fenodes | String | Yes | - | `Doris` cluster fenodes address, the format is `"fe_ip:fe_http_port, ..."` | -| query-port | int | No | 9030 | `Doris` Fenodes query_port | -| username | String | Yes | - | `Doris` user username | -| password | String | Yes | - | `Doris` user password | -| database | String | Yes | - | The database name of `Doris` table, use `${database_name}` to represent the upstream table name | -| table | String | Yes | - | The table name of `Doris` table, use `${table_name}` to represent the upstream table name | -| table.identifier | String | Yes | - | The name of `Doris` table, it will deprecate after version 2.3.5, please use `database` and `table` instead. | -| sink.label-prefix | String | Yes | - | The label prefix used by stream load imports. In the 2pc scenario, global uniqueness is required to ensure the EOS semantics of SeaTunnel. 
| -| sink.enable-2pc | bool | No | false | Whether to enable two-phase commit (2pc), the default is false. For two-phase commit, please refer to [here](https://doris.apache.org/docs/dev/sql-manual/sql-statements/Data-Manipulation-Statements/Load/STREAM-LOAD). | -| sink.enable-delete | bool | No | - | Whether to enable deletion. This option requires Doris table to enable batch delete function (0.15+ version is enabled by default), and only supports Unique model. you can get more detail at this [link](https://doris.apache.org/docs/dev/data-operate/delete/batch-delete-manual) | -| sink.check-interval | int | No | 10000 | check exception with the interval while loading | -| sink.max-retries | int | No | 3 | the max retry times if writing records to database failed | -| sink.buffer-size | int | No | 256 * 1024 | the buffer size to cache data for stream load. | -| sink.buffer-count | int | No | 3 | the buffer count to cache data for stream load. | -| doris.batch.size | int | No | 1024 | the batch size of the write to doris each http request, when the row reaches the size or checkpoint is executed, the data of cached will write to server. | -| needs_unsupported_type_casting | boolean | No | false | Whether to enable the unsupported type casting, such as Decimal64 to Double | -| schema_save_mode | Enum | no | CREATE_SCHEMA_WHEN_NOT_EXIST | the schema save mode, please refer to `schema_save_mode` below | -| data_save_mode | Enum | no | APPEND_DATA | the data save mode, please refer to `data_save_mode` below | -| save_mode_create_template | string | no | see below | see below | -| custom_sql | String | no | - | When data_save_mode selects CUSTOM_PROCESSING, you should fill in the CUSTOM_SQL parameter. This parameter usually fills in a SQL that can be executed. SQL will be executed before synchronization tasks. | -| doris.config | map | yes | - | This option is used to support operations such as `insert`, `delete`, and `update` when automatically generate sql,and supported formats. | +| Name | Type | Required | Default | Description | +|--------------------------------|---------|----------|------------------------------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| fenodes | String | Yes | - | `Doris` cluster fenodes address, the format is `"fe_ip:fe_http_port, ..."` | +| query-port | int | No | 9030 | `Doris` Fenodes query_port | +| username | String | Yes | - | `Doris` user username | +| password | String | Yes | - | `Doris` user password | +| database | String | Yes | - | The database name of `Doris` table, use `${database_name}` to represent the upstream table name | +| table | String | Yes | - | The table name of `Doris` table, use `${table_name}` to represent the upstream table name | +| table.identifier | String | Yes | - | The name of `Doris` table, it will deprecate after version 2.3.5, please use `database` and `table` instead. | +| sink.label-prefix | String | Yes | - | The label prefix used by stream load imports. In the 2pc scenario, global uniqueness is required to ensure the EOS semantics of SeaTunnel. | +| sink.enable-2pc | bool | No | false | Whether to enable two-phase commit (2pc), the default is false. 
For two-phase commit, please refer to [here](https://doris.apache.org/docs/dev/sql-manual/sql-reference/Data-Manipulation-Statements/Load/STREAM-LOAD). | +| sink.enable-delete | bool | No | - | Whether to enable deletion. This option requires Doris table to enable batch delete function (0.15+ version is enabled by default), and only supports Unique model. you can get more detail at this [link](https://doris.apache.org/docs/dev/data-operate/update-delete/batch-delete-manual) | +| sink.check-interval | int | No | 10000 | check exception with the interval while loading | +| sink.max-retries | int | No | 3 | the max retry times if writing records to database failed | +| sink.buffer-size | int | No | 256 * 1024 | the buffer size to cache data for stream load. | +| sink.buffer-count | int | No | 3 | the buffer count to cache data for stream load. | +| doris.batch.size | int | No | 1024 | the batch size of the write to doris each http request, when the row reaches the size or checkpoint is executed, the data of cached will write to server. | +| needs_unsupported_type_casting | boolean | No | false | Whether to enable the unsupported type casting, such as Decimal64 to Double | +| schema_save_mode | Enum | no | CREATE_SCHEMA_WHEN_NOT_EXIST | the schema save mode, please refer to `schema_save_mode` below | +| data_save_mode | Enum | no | APPEND_DATA | the data save mode, please refer to `data_save_mode` below | +| save_mode_create_template | string | no | see below | see below | +| custom_sql | String | no | - | When data_save_mode selects CUSTOM_PROCESSING, you should fill in the CUSTOM_SQL parameter. This parameter usually fills in a SQL that can be executed. SQL will be executed before synchronization tasks. | +| doris.config | map | yes | - | This option is used to support operations such as `insert`, `delete`, and `update` when automatically generate sql,and supported formats. 
|

### schema_save_mode[Enum]

From b9c2490d27cd726e63bd92730b2d7a9324fd7d03 Mon Sep 17 00:00:00 2001
From: ClownXC
Date: Wed, 1 May 2024 10:47:11 +0800
Subject: [PATCH 4/4] remove pending splits

---
 .../source/PaimonSourceSplitEnumerator.java | 39 ++++++++++---------
 .../paimon/source/PaimonSourceState.java    |  4 +-
 2 files changed, 22 insertions(+), 21 deletions(-)

diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceSplitEnumerator.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceSplitEnumerator.java
index 3dff07449ed..af7b0e02008 100644
--- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceSplitEnumerator.java
+++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceSplitEnumerator.java
@@ -29,7 +29,6 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -44,7 +43,9 @@ public class PaimonSourceSplitEnumerator
     /** Source split enumerator context */
     private final Context<PaimonSourceSplit> context;

-    private Map<Integer, List<PaimonSourceSplit>> pendingSplit;
+    private final Set<PaimonSourceSplit> pendingSplits = new HashSet<>();
+
+    private Map<Integer, Set<PaimonSourceSplit>> assignedSplits;

     private volatile boolean shouldEnumerate;

@@ -61,29 +62,26 @@ public PaimonSourceSplitEnumerator(
             Context<PaimonSourceSplit> context, Table table, PaimonSourceState sourceState) {
         this.context = context;
         this.table = table;
-        this.pendingSplit = new HashMap<>();
-        this.shouldEnumerate = sourceState == null;
+        this.shouldEnumerate = (sourceState == null || sourceState.isShouldEnumerate());
+        this.assignedSplits = new HashedMap();
         if (sourceState != null) {
-            this.shouldEnumerate = sourceState.isShouldEnumerate();
-            this.pendingSplit.putAll(sourceState.getPendingSplits());
+            this.assignedSplits.putAll(sourceState.getAssignedSplits());
         }
     }

     @Override
     public void open() {
-        this.pendingSplit = new HashedMap();
+        this.pendingSplits.addAll(getTableSplits());
     }

     @Override
     public void run() throws Exception {
         Set<Integer> readers = context.registeredReaders();
         if (shouldEnumerate) {
-            Set<PaimonSourceSplit> newSplits = getTableSplits();
             synchronized (stateLock) {
-                addPendingSplit(newSplits);
+                addAssignSplit(pendingSplits);
                 shouldEnumerate = false;
             }
-            assignSplit(readers);
+            assignSplit(readers);
         }
         log.debug(
                 "No more splits to assign." + " Sending NoMoreSplitsEvent to reader {}.", readers);
         readers.forEach(context::signalNoMoreSplits);
     }

     @Override
     public void close() throws IOException {
@@ -99,20 +97,20 @@ public void addSplitsBack(List<PaimonSourceSplit> splits, int subtaskId) {
         if (!splits.isEmpty()) {
-            addPendingSplit(splits);
+            addAssignSplit(splits);
             assignSplit(Collections.singletonList(subtaskId));
         }
     }

     @Override
     public int currentUnassignedSplitSize() {
-        return pendingSplit.size();
+        return pendingSplits.size();
     }

     @Override
     public void registerReader(int subtaskId) {
         log.debug("Register reader {} to PaimonSourceSplitEnumerator.", subtaskId);
-        if (!pendingSplit.isEmpty()) {
+        if (!pendingSplits.isEmpty()) {
             assignSplit(Collections.singletonList(subtaskId));
         }
     }

     @Override
     public PaimonSourceState snapshotState(long checkpointId) throws Exception {
         synchronized (stateLock) {
-            return new PaimonSourceState(pendingSplit, shouldEnumerate);
+            return new PaimonSourceState(assignedSplits, shouldEnumerate);
         }
     }

     @Override
@@ -134,12 +132,15 @@ public void handleSplitRequest(int subtaskId) {
         // do nothing
     }

-    private void addPendingSplit(Collection<PaimonSourceSplit> splits) {
+    private void addAssignSplit(Collection<PaimonSourceSplit> splits) {
         int readerCount = context.currentParallelism();
         for (PaimonSourceSplit split : splits) {
             int ownerReader = getSplitOwner(split.splitId(), readerCount);
             log.info("Assigning {} to {} reader.", split.getSplit().toString(), ownerReader);
-            pendingSplit.computeIfAbsent(ownerReader, r -> new ArrayList<>()).add(split);
+            // remove the assigned splits from pending splits
+            pendingSplits.remove(split);
+            // save the state of assigned splits
+            assignedSplits.computeIfAbsent(ownerReader, r -> new HashSet<>()).add(split);
         }
     }
@@ -149,7 +150,7 @@ private void assignSplit(Collection<Integer> readers) {

         log.debug("Assign pendingSplits to readers {}", readers);

         for (int reader : readers) {
-            List<PaimonSourceSplit> assignmentForReader = pendingSplit.remove(reader);
+            Set<PaimonSourceSplit> assignmentForReader = assignedSplits.remove(reader);
             if (assignmentForReader != null && !assignmentForReader.isEmpty()) {
                 log.info(
                         "Assign splits {} to reader {}",
                         assignmentForReader.stream()
                                 .map(p -> p.getSplit().toString())
                                 .collect(Collectors.joining(",")),
                         reader);
                 try {
-                    context.assignSplit(reader, assignmentForReader);
+                    context.assignSplit(reader, new ArrayList<>(assignmentForReader));
                 } catch (Exception e) {
                     log.error(
                             "Failed to assign splits {} to reader {}",
                             assignmentForReader,
                             reader,
                             e);
-                    pendingSplit.put(reader, assignmentForReader);
+                    pendingSplits.addAll(assignmentForReader);
                 }
             }
         }
diff --git a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceState.java b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceState.java
index 17117ba7a3d..5a14ce77fc8 100644
--- a/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceState.java
+++ b/seatunnel-connectors-v2/connector-paimon/src/main/java/org/apache/seatunnel/connectors/seatunnel/paimon/source/PaimonSourceState.java
@@ -21,8 +21,8 @@
 import lombok.Getter;

 import java.io.Serializable;
-import java.util.List;
 import java.util.Map;
+import java.util.Set;

 @AllArgsConstructor
 @Getter
@@ -30,6 +30,6 @@
 public class PaimonSourceState implements Serializable {

     private static final long serialVersionUID = 1L;
-    private final Map<Integer, List<PaimonSourceSplit>> pendingSplits;
+    private final Map<Integer, Set<PaimonSourceSplit>> assignedSplits;
     private boolean shouldEnumerate;
 }
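For readers skimming the series: after [PATCH 4/4], the enumerator keeps a `pendingSplits` set plus a per-reader `assignedSplits` map, and `addAssignSplit` routes every split to a fixed owner via `getSplitOwner(split.splitId(), readerCount)`. The body of `getSplitOwner` is not shown in these hunks, so the sketch below only illustrates the general idea (hash the split id, take it modulo the parallelism) under that assumption, using made-up split ids and a hypothetical `splitOwner` helper rather than the connector's real method.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class SplitOwnerSketch {

    // Hypothetical stand-in for the enumerator's getSplitOwner(splitId, readerCount):
    // hash the split id and map it onto [0, parallelism).
    static int splitOwner(String splitId, int parallelism) {
        return (splitId.hashCode() & Integer.MAX_VALUE) % parallelism;
    }

    public static void main(String[] args) {
        int parallelism = 3; // plays the role of context.currentParallelism()
        List<String> splitIds = List.of("split-0", "split-1", "split-2", "split-3", "split-4");

        // Bucket splits by owning reader, mirroring
        // assignedSplits.computeIfAbsent(ownerReader, r -> new HashSet<>()).add(split)
        Map<Integer, List<String>> assignedSplits = new HashMap<>();
        for (String id : splitIds) {
            assignedSplits
                    .computeIfAbsent(splitOwner(id, parallelism), r -> new ArrayList<>())
                    .add(id);
        }

        // Each registered reader would then be handed only its own bucket.
        assignedSplits.forEach(
                (reader, splits) -> System.out.printf("reader %d <- %s%n", reader, splits));
    }
}
```

Because the owner is a pure function of the split id, a split handed back through `addSplitsBack` should land in the same reader's bucket again, as long as the job's parallelism has not changed.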