From 1fe87673195aa4983af4ec0764cad5d35c997716 Mon Sep 17 00:00:00 2001 From: ClownXC Date: Tue, 10 Dec 2024 14:06:37 +0800 Subject: [PATCH 1/7] fail retry --- .../seatunnel/hbase/client/HbaseClient.java | 30 ++++++++++++++++++- 1 file changed, 29 insertions(+), 1 deletion(-) diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/client/HbaseClient.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/client/HbaseClient.java index 3fb0351455f..726198e5a33 100644 --- a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/client/HbaseClient.java +++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/client/HbaseClient.java @@ -17,6 +17,7 @@ package org.apache.seatunnel.connectors.seatunnel.hbase.client; +import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters; import org.apache.seatunnel.connectors.seatunnel.hbase.exception.HbaseConnectorErrorCode; import org.apache.seatunnel.connectors.seatunnel.hbase.exception.HbaseConnectorException; @@ -26,6 +27,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.NamespaceDescriptor; +import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.BufferedMutator; @@ -38,6 +40,7 @@ import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; +import org.apache.hadoop.hbase.client.Row; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.TableDescriptorBuilder; @@ -55,6 +58,7 @@ @Slf4j public class HbaseClient { + private static final long RETRY_DELAY_MS = 1000; private final Connection connection; private final Admin admin; private final BufferedMutator hbaseMutator; @@ -77,7 +81,31 @@ private HbaseClient(Connection connection, HbaseParameters hbaseParameters) { hbaseParameters.getNamespace(), hbaseParameters.getTable())) .pool(HTable.getDefaultExecutor(hbaseConfiguration)) - .writeBufferSize(hbaseParameters.getWriteBufferSize()); + .writeBufferSize(hbaseParameters.getWriteBufferSize()) + .listener( + (e, mutator) -> { + for (int i = 0; i < e.getNumExceptions(); i++) { + Row row = e.getRow(i); + log.error("Failed to sent put {}.", row); + if (mutator != null + && e.getCause() + instanceof NotServingRegionException) { + log.info( + "Retrying put {} after {} ms...", + row, + RETRY_DELAY_MS); + try { + Thread.sleep(RETRY_DELAY_MS); + mutator.mutate((Put) row); + } catch (IOException | InterruptedException ex) { + log.error( + "Unexpected exception during put {}.", + row, + e); + } + } + } + }); hbaseMutator = connection.getBufferedMutator(bufferedMutatorParams); } catch (IOException e) { throw new HbaseConnectorException( From c2b96522c48d8b990695273b958cd0ba6a85f035 Mon Sep 17 00:00:00 2001 From: ClownXC Date: Tue, 10 Dec 2024 14:08:04 +0800 Subject: [PATCH 2/7] fail retry --- .../connectors/seatunnel/hbase/client/HbaseClient.java | 1 - .../connectors/seatunnel/hbase/sink/HbaseSinkWriter.java | 2 -- 2 files changed, 3 deletions(-) diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/client/HbaseClient.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/client/HbaseClient.java index 726198e5a33..9ef0faf9f25 100644 --- a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/client/HbaseClient.java +++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/client/HbaseClient.java @@ -17,7 +17,6 @@ package org.apache.seatunnel.connectors.seatunnel.hbase.client; -import org.apache.hadoop.hbase.client.RetriesExhaustedWithDetailsException; import org.apache.seatunnel.connectors.seatunnel.hbase.config.HbaseParameters; import org.apache.seatunnel.connectors.seatunnel.hbase.exception.HbaseConnectorErrorCode; import org.apache.seatunnel.connectors.seatunnel.hbase.exception.HbaseConnectorException; diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java index 73ee19f9369..29a14bfc89c 100644 --- a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java +++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java @@ -37,7 +37,6 @@ import java.io.IOException; import java.nio.charset.Charset; import java.util.List; -import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -120,7 +119,6 @@ private Put convertRowToPut(SeaTunnelRow row) { .collect(Collectors.toList()); for (Integer writeColumnIndex : writeColumnIndexes) { String fieldName = seaTunnelRowType.getFieldName(writeColumnIndex); - Map configurationFamilyNames = hbaseParameters.getFamilyNames(); String familyName = hbaseParameters.getFamilyNames().getOrDefault(fieldName, defaultFamilyName); byte[] bytes = convertColumnToBytes(row, writeColumnIndex); From d8548f3d398cb23e81afb0ee5a4677cd1fc73ebc Mon Sep 17 00:00:00 2001 From: ClownXC Date: Sat, 14 Dec 2024 10:38:12 +0800 Subject: [PATCH 3/7] Support Doris Fe Node HA --- .../connectors/doris/rest/RestService.java | 44 +++++++-------- .../doris/sink/writer/DorisSinkWriter.java | 41 +++++++++----- .../doris/rest/models/RestServiceTest.java | 56 ------------------- 3 files changed, 48 insertions(+), 93 deletions(-) delete mode 100644 seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/rest/models/RestServiceTest.java diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/RestService.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/RestService.java index 77c23f73418..12af5f4d5bd 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/RestService.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/RestService.java @@ -55,9 +55,7 @@ import java.net.URL; import java.nio.charset.StandardCharsets; import java.util.ArrayList; -import java.util.Arrays; import java.util.Base64; -import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -240,22 +238,7 @@ static String[] parseIdentifier(String tableIdentifier, Logger logger) } @VisibleForTesting - public static String randomEndpoint(String feNodes, Logger logger) - throws DorisConnectorException { - logger.trace("Parse fenodes '{}'.", feNodes); - if (StringUtils.isEmpty(feNodes)) { - String errMsg = - String.format(ErrorMessages.ILLEGAL_ARGUMENT_MESSAGE, "fenodes", feNodes); - throw new DorisConnectorException(DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg); - } - List nodes = Arrays.asList(feNodes.split(",")); - Collections.shuffle(nodes); - return nodes.get(0).trim(); - } - - @VisibleForTesting - static String getUriStr( - DorisSourceConfig dorisSourceConfig, DorisSourceTable dorisSourceTable, Logger logger) + static String getUriStr(String node, DorisSourceTable dorisSourceTable, Logger logger) throws DorisConnectorException { String tableIdentifier = dorisSourceTable.getTablePath().getDatabaseName() @@ -263,7 +246,7 @@ static String getUriStr( + dorisSourceTable.getTablePath().getTableName(); String[] identifier = parseIdentifier(tableIdentifier, logger); return "http://" - + randomEndpoint(dorisSourceConfig.getFrontends(), logger) + + node.trim() + API_PREFIX + "/" + identifier[0] @@ -298,16 +281,31 @@ public static List findPartitions( } logger.debug("Query SQL Sending to Doris FE is: '{}'.", sql); - HttpPost httpPost = - new HttpPost(getUriStr(dorisSourceConfig, dorisSourceTable, logger) + QUERY_PLAN); String entity = "{\"sql\": \"" + sql + "\"}"; logger.debug("Post body Sending to Doris FE is: '{}'.", entity); StringEntity stringEntity = new StringEntity(entity, StandardCharsets.UTF_8); stringEntity.setContentEncoding("UTF-8"); stringEntity.setContentType("application/json"); - httpPost.setEntity(stringEntity); - String resStr = send(dorisSourceConfig, httpPost, logger); + String[] feNodes = dorisSourceConfig.getFrontends().split(","); + int feNodesNum = feNodes.length; + String resStr = null; + + for (int i = 0; i < feNodesNum; i++) { + try { + HttpPost httpPost = + new HttpPost(getUriStr(feNodes[i], dorisSourceTable, logger) + QUERY_PLAN); + httpPost.setEntity(stringEntity); + resStr = send(dorisSourceConfig, httpPost, logger); + break; + } catch (DorisConnectorException e) { + if (i == feNodesNum - 1) { + throw new DorisConnectorException( + DorisConnectorErrorCode.REST_SERVICE_FAILED, e); + } + } + } + logger.debug("Find partition response is '{}'.", resStr); QueryPlan queryPlan = getQueryPlan(resStr, logger); Map> be2Tablets = selectBeForTablet(queryPlan, logger); diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java index fa0d671e82c..a6a88be1773 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java @@ -27,7 +27,6 @@ import org.apache.seatunnel.connectors.doris.config.DorisSinkConfig; import org.apache.seatunnel.connectors.doris.exception.DorisConnectorErrorCode; import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException; -import org.apache.seatunnel.connectors.doris.rest.RestService; import org.apache.seatunnel.connectors.doris.rest.models.RespContent; import org.apache.seatunnel.connectors.doris.serialize.DorisSerializer; import org.apache.seatunnel.connectors.doris.serialize.SeaTunnelRowSerializer; @@ -98,21 +97,35 @@ public DorisSinkWriter( } private void initializeLoad() { - String backend = RestService.randomEndpoint(dorisSinkConfig.getFrontends(), log); - try { - this.dorisStreamLoad = - new DorisStreamLoad( - backend, - catalogTable.getTablePath(), - dorisSinkConfig, - labelGenerator, - new HttpUtil().getHttpClient()); - if (dorisSinkConfig.getEnable2PC()) { - dorisStreamLoad.abortPreCommit(labelPrefix, lastCheckpointId + 1); + + String[] feNodes = dorisSinkConfig.getFrontends().split(","); + int feNodesNum = feNodes.length; + + for (int i = 0; i < feNodesNum; i++) { + try { + this.dorisStreamLoad = + new DorisStreamLoad( + feNodes[i], + catalogTable.getTablePath(), + dorisSinkConfig, + labelGenerator, + new HttpUtil().getHttpClient()); + if (dorisSinkConfig.getEnable2PC()) { + dorisStreamLoad.abortPreCommit(labelPrefix, lastCheckpointId + 1); + } + break; + } catch (Exception e) { + if (i == feNodesNum - 1) { + throw new DorisConnectorException( + DorisConnectorErrorCode.STREAM_LOAD_FAILED, e); + } + log.error( + "stream load error for feNode: {} with exception: {}", + feNodes[i], + e.getMessage()); } - } catch (Exception e) { - throw new DorisConnectorException(DorisConnectorErrorCode.STREAM_LOAD_FAILED, e); } + startLoad(labelGenerator.generateLabel(lastCheckpointId + 1)); // when uploading data in streaming mode, we need to regularly detect whether there are // exceptions. diff --git a/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/rest/models/RestServiceTest.java b/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/rest/models/RestServiceTest.java deleted file mode 100644 index aa917d57662..00000000000 --- a/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/rest/models/RestServiceTest.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.connectors.doris.rest.models; - -import org.apache.seatunnel.connectors.doris.rest.RestService; - -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; - -import lombok.extern.slf4j.Slf4j; - -import java.util.Arrays; -import java.util.List; -import java.util.Set; -import java.util.stream.Collectors; - -@Slf4j -public class RestServiceTest { - - @Test - void testRandomEndpoint() { - - List list = - Arrays.asList( - "fe_host1:fe_http_port1", - "fe_host2:fe_http_port2", - "fe_host3:fe_http_port3", - "fe_host4:fe_http_port4", - "fe_host5:fe_http_port5"); - - boolean hasDifferentAddress = false; - for (int i = 0; i < 5; i++) { - Set addresses = - list.stream() - .map(address -> RestService.randomEndpoint(String.join(",", list), log)) - .collect(Collectors.toSet()); - hasDifferentAddress = addresses.size() > 1; - } - Assertions.assertTrue(hasDifferentAddress); - } -} From 6502056b1f4a9ac52215065d9ab24be1053443f8 Mon Sep 17 00:00:00 2001 From: ClownXC Date: Tue, 17 Dec 2024 08:24:45 +0800 Subject: [PATCH 4/7] remove wrong code --- .../seatunnel/hbase/client/HbaseClient.java | 32 +++---------------- .../seatunnel/hbase/sink/HbaseSinkWriter.java | 2 ++ 2 files changed, 6 insertions(+), 28 deletions(-) diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/client/HbaseClient.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/client/HbaseClient.java index 9ef0faf9f25..2d17a879d76 100644 --- a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/client/HbaseClient.java +++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/client/HbaseClient.java @@ -76,35 +76,11 @@ private HbaseClient(Connection connection, HbaseParameters hbaseParameters) { BufferedMutatorParams bufferedMutatorParams = new BufferedMutatorParams( - TableName.valueOf( - hbaseParameters.getNamespace(), - hbaseParameters.getTable())) + TableName.valueOf( + hbaseParameters.getNamespace(), + hbaseParameters.getTable())) .pool(HTable.getDefaultExecutor(hbaseConfiguration)) - .writeBufferSize(hbaseParameters.getWriteBufferSize()) - .listener( - (e, mutator) -> { - for (int i = 0; i < e.getNumExceptions(); i++) { - Row row = e.getRow(i); - log.error("Failed to sent put {}.", row); - if (mutator != null - && e.getCause() - instanceof NotServingRegionException) { - log.info( - "Retrying put {} after {} ms...", - row, - RETRY_DELAY_MS); - try { - Thread.sleep(RETRY_DELAY_MS); - mutator.mutate((Put) row); - } catch (IOException | InterruptedException ex) { - log.error( - "Unexpected exception during put {}.", - row, - e); - } - } - } - }); + .writeBufferSize(hbaseParameters.getWriteBufferSize()); hbaseMutator = connection.getBufferedMutator(bufferedMutatorParams); } catch (IOException e) { throw new HbaseConnectorException( diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java index 29a14bfc89c..73ee19f9369 100644 --- a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java +++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/sink/HbaseSinkWriter.java @@ -37,6 +37,7 @@ import java.io.IOException; import java.nio.charset.Charset; import java.util.List; +import java.util.Map; import java.util.Optional; import java.util.stream.Collectors; import java.util.stream.IntStream; @@ -119,6 +120,7 @@ private Put convertRowToPut(SeaTunnelRow row) { .collect(Collectors.toList()); for (Integer writeColumnIndex : writeColumnIndexes) { String fieldName = seaTunnelRowType.getFieldName(writeColumnIndex); + Map configurationFamilyNames = hbaseParameters.getFamilyNames(); String familyName = hbaseParameters.getFamilyNames().getOrDefault(fieldName, defaultFamilyName); byte[] bytes = convertColumnToBytes(row, writeColumnIndex); From 17aac54707229bcc46a151c1439985cdd39bf3d8 Mon Sep 17 00:00:00 2001 From: ClownXC Date: Tue, 17 Dec 2024 08:26:52 +0800 Subject: [PATCH 5/7] remove wrong code --- .../connectors/seatunnel/hbase/client/HbaseClient.java | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/client/HbaseClient.java b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/client/HbaseClient.java index 2d17a879d76..3fb0351455f 100644 --- a/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/client/HbaseClient.java +++ b/seatunnel-connectors-v2/connector-hbase/src/main/java/org/apache/seatunnel/connectors/seatunnel/hbase/client/HbaseClient.java @@ -26,7 +26,6 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hbase.HBaseConfiguration; import org.apache.hadoop.hbase.NamespaceDescriptor; -import org.apache.hadoop.hbase.NotServingRegionException; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.client.BufferedMutator; @@ -39,7 +38,6 @@ import org.apache.hadoop.hbase.client.RegionLocator; import org.apache.hadoop.hbase.client.Result; import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Row; import org.apache.hadoop.hbase.client.Scan; import org.apache.hadoop.hbase.client.Table; import org.apache.hadoop.hbase.client.TableDescriptorBuilder; @@ -57,7 +55,6 @@ @Slf4j public class HbaseClient { - private static final long RETRY_DELAY_MS = 1000; private final Connection connection; private final Admin admin; private final BufferedMutator hbaseMutator; @@ -76,9 +73,9 @@ private HbaseClient(Connection connection, HbaseParameters hbaseParameters) { BufferedMutatorParams bufferedMutatorParams = new BufferedMutatorParams( - TableName.valueOf( - hbaseParameters.getNamespace(), - hbaseParameters.getTable())) + TableName.valueOf( + hbaseParameters.getNamespace(), + hbaseParameters.getTable())) .pool(HTable.getDefaultExecutor(hbaseConfiguration)) .writeBufferSize(hbaseParameters.getWriteBufferSize()); hbaseMutator = connection.getBufferedMutator(bufferedMutatorParams); From 8be703297fe23f9c4245acf00efff552276fc2ec Mon Sep 17 00:00:00 2001 From: ClownXC Date: Tue, 17 Dec 2024 13:28:36 +0800 Subject: [PATCH 6/7] random node --- .../seatunnel/connectors/doris/rest/RestService.java | 10 +++++++--- .../connectors/doris/sink/writer/DorisSinkWriter.java | 9 +++++---- 2 files changed, 12 insertions(+), 7 deletions(-) diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/RestService.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/RestService.java index 12af5f4d5bd..02be965ffba 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/RestService.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/RestService.java @@ -55,7 +55,9 @@ import java.net.URL; import java.nio.charset.StandardCharsets; import java.util.ArrayList; +import java.util.Arrays; import java.util.Base64; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.List; @@ -287,14 +289,16 @@ public static List findPartitions( stringEntity.setContentEncoding("UTF-8"); stringEntity.setContentType("application/json"); - String[] feNodes = dorisSourceConfig.getFrontends().split(","); - int feNodesNum = feNodes.length; + List feNodes = Arrays.asList(dorisSourceConfig.getFrontends().split(",")); + Collections.shuffle(feNodes); + int feNodesNum = feNodes.size(); String resStr = null; for (int i = 0; i < feNodesNum; i++) { try { HttpPost httpPost = - new HttpPost(getUriStr(feNodes[i], dorisSourceTable, logger) + QUERY_PLAN); + new HttpPost( + getUriStr(feNodes.get(i), dorisSourceTable, logger) + QUERY_PLAN); httpPost.setEntity(stringEntity); resStr = send(dorisSourceConfig, httpPost, logger); break; diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java index a6a88be1773..2d40ea7bdd0 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java @@ -98,14 +98,15 @@ public DorisSinkWriter( private void initializeLoad() { - String[] feNodes = dorisSinkConfig.getFrontends().split(","); - int feNodesNum = feNodes.length; + List feNodes = Arrays.asList(dorisSinkConfig.getFrontends().split(",")); + Collections.shuffle(feNodes); + int feNodesNum = feNodes.size(); for (int i = 0; i < feNodesNum; i++) { try { this.dorisStreamLoad = new DorisStreamLoad( - feNodes[i], + feNodes.get(i), catalogTable.getTablePath(), dorisSinkConfig, labelGenerator, @@ -121,7 +122,7 @@ private void initializeLoad() { } log.error( "stream load error for feNode: {} with exception: {}", - feNodes[i], + feNodes.get(i), e.getMessage()); } } From 7c1de4645a2f8e7fa9af5f7ec5541be35a5957da Mon Sep 17 00:00:00 2001 From: ClownXC Date: Wed, 18 Dec 2024 08:15:06 +0800 Subject: [PATCH 7/7] exception --- .../apache/seatunnel/connectors/doris/rest/RestService.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/RestService.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/RestService.java index 02be965ffba..cb2f33df931 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/RestService.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/RestService.java @@ -302,11 +302,15 @@ public static List findPartitions( httpPost.setEntity(stringEntity); resStr = send(dorisSourceConfig, httpPost, logger); break; - } catch (DorisConnectorException e) { + } catch (Exception e) { if (i == feNodesNum - 1) { throw new DorisConnectorException( DorisConnectorErrorCode.REST_SERVICE_FAILED, e); } + log.error( + "Find partition error for feNode: {} with exception: {}", + feNodes.get(i), + e.getMessage()); } }