Skip to content

Commit

Permalink
Merge branch 'apache:dev' into dev
Browse files Browse the repository at this point in the history
  • Loading branch information
xiaochen-zhou authored Dec 19, 2024
2 parents 0c4683f + 19d5325 commit e3a1522
Show file tree
Hide file tree
Showing 36 changed files with 700 additions and 256 deletions.
81 changes: 39 additions & 42 deletions .github/workflows/backend.yml

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/en/seatunnel-engine/rest-api-v2.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ seatunnel:
http:
enable-http: true
port: 8080
enable-dynamic-port: false
enable-dynamic-port: true
port-range: 100
```
Expand Down
2 changes: 1 addition & 1 deletion docs/zh/seatunnel-engine/rest-api-v2.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ seatunnel:
http:
enable-http: true
port: 8080
enable-dynamic-port: false
enable-dynamic-port: true
port-range: 100
```
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.seatunnel.api;

import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;

import com.github.javaparser.JavaParser;
import com.github.javaparser.ParseResult;
import com.github.javaparser.ast.CompilationUnit;
import com.github.javaparser.ast.comments.Comment;
import com.github.javaparser.ast.visitor.VoidVisitorAdapter;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.nio.file.FileVisitOption;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Stream;

import static org.apache.seatunnel.api.ImportShadeClassCheckTest.isWindows;

@Slf4j
public class ChineseCharacterCheckTest {

private final JavaParser JAVA_PARSER = new JavaParser();

private static final Pattern CHINESE_PATTERN = Pattern.compile("[\\u4e00-\\u9fa5]");

/** Defines what content should be checked for Chinese characters */
public enum CheckScope {
/** Check both comments and code */
ALL,
/** Check only comments */
COMMENTS_ONLY,
/** Check only code (string literals) */
CODE_ONLY
}

@Disabled("Currently only checking comments")
@Test
public void checkChineseCharactersInAll() {
checkChineseCharacters(CheckScope.ALL);
}

@Test
public void checkChineseCharactersInCommentsOnly() {
checkChineseCharacters(CheckScope.COMMENTS_ONLY);
}

@Disabled("Currently only checking comments")
@Test
public void checkChineseCharactersInCodeOnly() {
checkChineseCharacters(CheckScope.CODE_ONLY);
}

private void checkChineseCharacters(CheckScope scope) {
// Define path fragments for source and test Java files
String mainPathFragment = isWindows ? "src\\main\\java" : "src/main/java";
String testPathFragment2 = isWindows ? "src\\test\\java" : "src/test/java";

try (Stream<Path> paths = Files.walk(Paths.get(".."), FileVisitOption.FOLLOW_LINKS)) {
List<String> filesWithChinese = new ArrayList<>();

// Filter Java files in the specified directories
paths.filter(
path -> {
String pathString = path.toString();
return pathString.endsWith(".java")
&& (pathString.contains(mainPathFragment)
|| pathString.contains(testPathFragment2));
})
.forEach(
path -> {
try {
// Parse the Java file
ParseResult<CompilationUnit> parseResult =
JAVA_PARSER.parse(Files.newInputStream(path));

parseResult
.getResult()
.ifPresent(
cu -> {
// Check for Chinese characters in comments
// if needed
if (scope != CheckScope.CODE_ONLY) {
List<Comment> comments =
cu.getAllContainedComments();
for (Comment comment : comments) {
if (CHINESE_PATTERN
.matcher(
comment
.getContent())
.find()) {
filesWithChinese.add(
String.format(
"Found Chinese characters in comment at %s: %s",
path
.toAbsolutePath(),
comment.getContent()
.trim()));
}
}
}

// Check for Chinese characters in code if
// needed
if (scope != CheckScope.COMMENTS_ONLY) {
ChineseCharacterVisitor visitor =
new ChineseCharacterVisitor(
path, filesWithChinese);
visitor.visit(cu, null);
}
});

} catch (Exception e) {
log.error("Error parsing file: {}", path, e);
}
});

// Assert that no files contain Chinese characters
Assertions.assertEquals(
0,
filesWithChinese.size(),
() ->
String.format(
"Found Chinese characters in following files (Scope: %s):\n%s",
scope, String.join("\n", filesWithChinese)));

} catch (IOException e) {
throw new RuntimeException(e);
}
}

private static class ChineseCharacterVisitor extends VoidVisitorAdapter<Void> {
private final Path filePath;
private final List<String> filesWithChinese;

public ChineseCharacterVisitor(Path filePath, List<String> filesWithChinese) {
this.filePath = filePath;
this.filesWithChinese = filesWithChinese;
}

@Override
public void visit(CompilationUnit cu, Void arg) {
// Check for Chinese characters in string literals
cu.findAll(com.github.javaparser.ast.expr.StringLiteralExpr.class)
.forEach(
str -> {
if (CHINESE_PATTERN.matcher(str.getValue()).find()) {
filesWithChinese.add(
String.format(
"Found Chinese characters in string literal at %s: %s",
filePath.toAbsolutePath(), str.getValue()));
}
});
super.visit(cu, arg);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,7 @@ private boolean exists(FTPClient client, Path file) throws IOException {
try {
return getFileStatus(client, file) != null;
} catch (FileNotFoundException fnfe) {
LOG.debug("File does not exist: " + file, fnfe);
return false;
}
}
Expand Down Expand Up @@ -557,12 +558,18 @@ private boolean mkdirs(FTPClient client, Path file, FsPermission permission)
if (created) {
String parentDir = parent.toUri().getPath();
client.changeWorkingDirectory(parentDir);
created = created && client.makeDirectory(pathName);
LOG.debug("Creating directory " + pathName);
created = client.makeDirectory(pathName);
}
} else if (isFile(client, absolute)) {
throw new ParentNotDirectoryException(
String.format(
"Can't make directory for path %s since it is a file.", absolute));
} else {
LOG.debug("Skipping creation of existing directory " + file);
}
if (!created) {
LOG.debug("Failed to create " + file);
}
return created;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,7 @@ private boolean exists(ChannelSftp channel, Path file) throws IOException {
getFileStatus(channel, file);
return true;
} catch (FileNotFoundException fnfe) {
LOG.debug("File does not exist: " + file, fnfe);
return false;
} catch (IOException ioe) {
throw new IOException(E_FILE_STATUS, ioe);
Expand Down Expand Up @@ -284,6 +285,7 @@ private boolean mkdirs(ChannelSftp client, Path file, FsPermission permission)
try {
final String previousCwd = client.pwd();
client.cd(parentDir);
LOG.debug("Creating directory " + pathName);
client.mkdir(pathName);
client.cd(previousCwd);
} catch (SftpException e) {
Expand All @@ -293,6 +295,11 @@ private boolean mkdirs(ChannelSftp client, Path file, FsPermission permission)
}
} else if (isFile(client, absolute)) {
throw new IOException(String.format(E_DIR_CREATE_FROMFILE, absolute));
} else {
LOG.debug("Skipping creation of existing directory " + file);
}
if (!created) {
LOG.debug("Failed to create " + file);
}
return created;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -219,6 +219,7 @@ private HadoopConf createHadoopConf(ReadonlyConfig readonlyConfig) {
.getOptional(HiveOptions.HDFS_SITE_PATH)
.ifPresent(hadoopConf::setHdfsSitePath);
readonlyConfig.getOptional(HiveOptions.REMOTE_USER).ifPresent(hadoopConf::setRemoteUser);
readonlyConfig.getOptional(HiveOptions.KRB5_PATH).ifPresent(hadoopConf::setKrb5Path);
readonlyConfig
.getOptional(HiveOptions.KERBEROS_PRINCIPAL)
.ifPresent(hadoopConf::setKerberosPrincipal);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,9 @@ private HadoopConf parseHiveHadoopConfig(ReadonlyConfig readonlyConfig, Table ta
readonlyConfig
.getOptional(HdfsSourceConfigOptions.HDFS_SITE_PATH)
.ifPresent(hadoopConf::setHdfsSitePath);
readonlyConfig
.getOptional(HdfsSourceConfigOptions.KRB5_PATH)
.ifPresent(hadoopConf::setKrb5Path);
readonlyConfig
.getOptional(HdfsSourceConfigOptions.KERBEROS_PRINCIPAL)
.ifPresent(hadoopConf::setKerberosPrincipal);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,9 @@ public void open() throws CatalogException {
@Override
public void close() throws CatalogException {
try {
fs.close();
if (fs != null) {
fs.close();
}
} catch (Exception e) {
log.info("Hudi catalog close error.", e);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class RedisSinkWriter extends AbstractSinkWriter<SeaTunnelRow, Void>
implements SupportMultiTableSinkWriter<Void> {
Expand Down Expand Up @@ -78,8 +79,7 @@ public void write(SeaTunnelRow element) throws IOException {
String value = getValue(element, fields);
valueBuffer.add(value);
if (keyBuffer.size() >= batchSize) {
doBatchWrite();
clearBuffer();
flush();
}
}

Expand Down Expand Up @@ -221,6 +221,16 @@ private void doBatchWrite() {

@Override
public void close() throws IOException {
flush();
}

@Override
public Optional<Void> prepareCommit() {
flush();
return Optional.empty();
}

private synchronized void flush() {
if (!keyBuffer.isEmpty()) {
doBatchWrite();
clearBuffer();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,8 @@ private SeaTunnelRow convertFastLogContent(FastLog log) {
.append("\":\"")
.append(content.getValue())
.append("\","));
jsonStringBuilder.deleteCharAt(jsonStringBuilder.length() - 1); // 删除最后一个逗号
// Remove the last comma
jsonStringBuilder.deleteCharAt(jsonStringBuilder.length() - 1);
jsonStringBuilder.append("}");
// content field
transformedRow.add(jsonStringBuilder.toString());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,10 +115,10 @@ SeaTunnelRow convert(TypesenseRecord rowRecord) {
try {
for (int i = 0; i < rowTypeInfo.getTotalFields(); i++) {
fieldName = rowTypeInfo.getFieldName(i);
value = doc.get(fieldName); // 字段值
value = doc.get(fieldName);
if (value != null) {
seaTunnelDataType =
rowTypeInfo.getFieldType(i); // seaTunnelDataType 为SeaTunnel类型
// seaTunnelDataType is the SeaTunnel type
seaTunnelDataType = rowTypeInfo.getFieldType(i);
seaTunnelFields[i] = convertValue(seaTunnelDataType, value);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
import org.apache.seatunnel.e2e.common.util.JobIdGenerator;

import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
Expand Down Expand Up @@ -65,6 +66,7 @@
import static org.awaitility.Awaitility.await;
import static org.awaitility.Awaitility.with;
import static org.awaitility.Durations.TWO_SECONDS;
import static org.testcontainers.shaded.org.awaitility.Awaitility.given;

@Slf4j
@TestMethodOrder(MethodOrderer.OrderAnnotation.class)
Expand Down Expand Up @@ -192,17 +194,24 @@ public void testOracleCdc2MysqlWithSchemaEvolutionCase(TestContainer container)
dropTable(ORACLE_CONTAINER.getJdbcUrl(), SCEHMA_NAME, SOURCE_TABLE1);
dropTable(ORACLE_CONTAINER.getJdbcUrl(), SCEHMA_NAME, SOURCE_TABLE1 + "_SINK");
createAndInitialize("full_types", ADMIN_USER, ADMIN_PWD);
String jobId = String.valueOf(JobIdGenerator.newJobId());
CompletableFuture.runAsync(
() -> {
try {
container.executeJob("/oraclecdc_to_mysql_with_schema_change.conf");
container.executeJob("/oraclecdc_to_mysql_with_schema_change.conf", jobId);
} catch (Exception e) {
log.error("Commit task exception :" + e.getMessage());
throw new RuntimeException(e);
}
});

Thread.sleep(10000L);
given().pollDelay(10, TimeUnit.SECONDS)
.await()
.pollDelay(5000L, TimeUnit.MILLISECONDS)
.untilAsserted(
() -> {
Assertions.assertEquals("RUNNING", container.getJobStatus(jobId));
});

assertSchemaEvolution(
ORACLE_CONTAINER.getJdbcUrl(),
Expand Down
Loading

0 comments on commit e3a1522

Please sign in to comment.