diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/RestService.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/RestService.java index 77c23f73418..cb2f33df931 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/RestService.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/rest/RestService.java @@ -240,22 +240,7 @@ static String[] parseIdentifier(String tableIdentifier, Logger logger) } @VisibleForTesting - public static String randomEndpoint(String feNodes, Logger logger) - throws DorisConnectorException { - logger.trace("Parse fenodes '{}'.", feNodes); - if (StringUtils.isEmpty(feNodes)) { - String errMsg = - String.format(ErrorMessages.ILLEGAL_ARGUMENT_MESSAGE, "fenodes", feNodes); - throw new DorisConnectorException(DorisConnectorErrorCode.REST_SERVICE_FAILED, errMsg); - } - List nodes = Arrays.asList(feNodes.split(",")); - Collections.shuffle(nodes); - return nodes.get(0).trim(); - } - - @VisibleForTesting - static String getUriStr( - DorisSourceConfig dorisSourceConfig, DorisSourceTable dorisSourceTable, Logger logger) + static String getUriStr(String node, DorisSourceTable dorisSourceTable, Logger logger) throws DorisConnectorException { String tableIdentifier = dorisSourceTable.getTablePath().getDatabaseName() @@ -263,7 +248,7 @@ static String getUriStr( + dorisSourceTable.getTablePath().getTableName(); String[] identifier = parseIdentifier(tableIdentifier, logger); return "http://" - + randomEndpoint(dorisSourceConfig.getFrontends(), logger) + + node.trim() + API_PREFIX + "/" + identifier[0] @@ -298,16 +283,37 @@ public static List findPartitions( } logger.debug("Query SQL Sending to Doris FE is: '{}'.", sql); - HttpPost httpPost = - new HttpPost(getUriStr(dorisSourceConfig, dorisSourceTable, logger) + QUERY_PLAN); String entity = "{\"sql\": \"" + sql + "\"}"; logger.debug("Post body Sending to Doris FE is: '{}'.", entity); StringEntity stringEntity = new StringEntity(entity, StandardCharsets.UTF_8); stringEntity.setContentEncoding("UTF-8"); stringEntity.setContentType("application/json"); - httpPost.setEntity(stringEntity); - String resStr = send(dorisSourceConfig, httpPost, logger); + List feNodes = Arrays.asList(dorisSourceConfig.getFrontends().split(",")); + Collections.shuffle(feNodes); + int feNodesNum = feNodes.size(); + String resStr = null; + + for (int i = 0; i < feNodesNum; i++) { + try { + HttpPost httpPost = + new HttpPost( + getUriStr(feNodes.get(i), dorisSourceTable, logger) + QUERY_PLAN); + httpPost.setEntity(stringEntity); + resStr = send(dorisSourceConfig, httpPost, logger); + break; + } catch (Exception e) { + if (i == feNodesNum - 1) { + throw new DorisConnectorException( + DorisConnectorErrorCode.REST_SERVICE_FAILED, e); + } + log.error( + "Find partition error for feNode: {} with exception: {}", + feNodes.get(i), + e.getMessage()); + } + } + logger.debug("Find partition response is '{}'.", resStr); QueryPlan queryPlan = getQueryPlan(resStr, logger); Map> be2Tablets = selectBeForTablet(queryPlan, logger); diff --git a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java index fa0d671e82c..2d40ea7bdd0 100644 --- a/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java +++ b/seatunnel-connectors-v2/connector-doris/src/main/java/org/apache/seatunnel/connectors/doris/sink/writer/DorisSinkWriter.java @@ -27,7 +27,6 @@ import org.apache.seatunnel.connectors.doris.config.DorisSinkConfig; import org.apache.seatunnel.connectors.doris.exception.DorisConnectorErrorCode; import org.apache.seatunnel.connectors.doris.exception.DorisConnectorException; -import org.apache.seatunnel.connectors.doris.rest.RestService; import org.apache.seatunnel.connectors.doris.rest.models.RespContent; import org.apache.seatunnel.connectors.doris.serialize.DorisSerializer; import org.apache.seatunnel.connectors.doris.serialize.SeaTunnelRowSerializer; @@ -98,21 +97,36 @@ public DorisSinkWriter( } private void initializeLoad() { - String backend = RestService.randomEndpoint(dorisSinkConfig.getFrontends(), log); - try { - this.dorisStreamLoad = - new DorisStreamLoad( - backend, - catalogTable.getTablePath(), - dorisSinkConfig, - labelGenerator, - new HttpUtil().getHttpClient()); - if (dorisSinkConfig.getEnable2PC()) { - dorisStreamLoad.abortPreCommit(labelPrefix, lastCheckpointId + 1); + + List feNodes = Arrays.asList(dorisSinkConfig.getFrontends().split(",")); + Collections.shuffle(feNodes); + int feNodesNum = feNodes.size(); + + for (int i = 0; i < feNodesNum; i++) { + try { + this.dorisStreamLoad = + new DorisStreamLoad( + feNodes.get(i), + catalogTable.getTablePath(), + dorisSinkConfig, + labelGenerator, + new HttpUtil().getHttpClient()); + if (dorisSinkConfig.getEnable2PC()) { + dorisStreamLoad.abortPreCommit(labelPrefix, lastCheckpointId + 1); + } + break; + } catch (Exception e) { + if (i == feNodesNum - 1) { + throw new DorisConnectorException( + DorisConnectorErrorCode.STREAM_LOAD_FAILED, e); + } + log.error( + "stream load error for feNode: {} with exception: {}", + feNodes.get(i), + e.getMessage()); } - } catch (Exception e) { - throw new DorisConnectorException(DorisConnectorErrorCode.STREAM_LOAD_FAILED, e); } + startLoad(labelGenerator.generateLabel(lastCheckpointId + 1)); // when uploading data in streaming mode, we need to regularly detect whether there are // exceptions. diff --git a/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/rest/models/RestServiceTest.java b/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/rest/models/RestServiceTest.java deleted file mode 100644 index aa917d57662..00000000000 --- a/seatunnel-connectors-v2/connector-doris/src/test/java/org/apache/seatunnel/connectors/doris/rest/models/RestServiceTest.java +++ /dev/null @@ -1,56 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.seatunnel.connectors.doris.rest.models; - -import org.apache.seatunnel.connectors.doris.rest.RestService; - -import org.junit.jupiter.api.Assertions; -import org.junit.jupiter.api.Test; - -import lombok.extern.slf4j.Slf4j; - -import java.util.Arrays; -import java.util.List; -import java.util.Set; -import java.util.stream.Collectors; - -@Slf4j -public class RestServiceTest { - - @Test - void testRandomEndpoint() { - - List list = - Arrays.asList( - "fe_host1:fe_http_port1", - "fe_host2:fe_http_port2", - "fe_host3:fe_http_port3", - "fe_host4:fe_http_port4", - "fe_host5:fe_http_port5"); - - boolean hasDifferentAddress = false; - for (int i = 0; i < 5; i++) { - Set addresses = - list.stream() - .map(address -> RestService.randomEndpoint(String.join(",", list), log)) - .collect(Collectors.toSet()); - hasDifferentAddress = addresses.size() > 1; - } - Assertions.assertTrue(hasDifferentAddress); - } -}