From 98fb34be38874b0f9f631a406e561c043963bf34 Mon Sep 17 00:00:00 2001 From: ClownXC Date: Sat, 30 Mar 2024 20:54:24 +0800 Subject: [PATCH] add license header --- .../imap/storage/file/IMapFileStorage.java | 17 ++++++------ .../storage/file/common/WALSyncMethod.java | 26 +++++++++++++++++++ .../storage/file/common/WALWriteType.java | 6 ----- .../imap/storage/file/common/WALWriter.java | 12 ++++----- .../storage/file/disruptor/WALDisruptor.java | 6 ++--- .../file/disruptor/WALWorkHandler.java | 6 ++--- .../file/common/WALReaderAndWriterTest.java | 4 +-- .../file/disruptor/WALDisruptorTest.java | 6 ++--- 8 files changed, 52 insertions(+), 31 deletions(-) create mode 100644 seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/common/WALSyncMethod.java delete mode 100644 seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/common/WALWriteType.java diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileStorage.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileStorage.java index e1257e6c276c..a3746e37b254 100644 --- a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileStorage.java +++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileStorage.java @@ -25,7 +25,7 @@ import org.apache.seatunnel.engine.imap.storage.file.bean.IMapFileData; import org.apache.seatunnel.engine.imap.storage.file.common.FileConstants; import org.apache.seatunnel.engine.imap.storage.file.common.WALReader; -import org.apache.seatunnel.engine.imap.storage.file.common.WALWriteType; +import org.apache.seatunnel.engine.imap.storage.file.common.WALSyncMethod; import org.apache.seatunnel.engine.imap.storage.file.config.AbstractConfiguration; import org.apache.seatunnel.engine.imap.storage.file.config.FileConfiguration; import org.apache.seatunnel.engine.imap.storage.file.disruptor.WALDisruptor; @@ -72,7 +72,7 @@ public class IMapFileStorage implements IMapStorage { private static final String STORAGE_TYPE_KEY = "storage.type"; - private static final String WAL_TYPE_KEY = "wal.type"; + private static final String WAL_SYNC_METHOD_KEY = "wal.sync.method"; public FileSystem fs; @@ -110,7 +110,7 @@ public class IMapFileStorage implements IMapStorage { public static final long DEFAULT_WRITE_DATA_TIMEOUT_MILLISECONDS = 1000 * 60; private Configuration conf; - private WALWriteType walWriteType; + private WALSyncMethod walSyncMethod; private FileConfiguration fileConfiguration; @@ -126,10 +126,11 @@ public void initialize(Map configuration) { String.valueOf( configuration.getOrDefault( STORAGE_TYPE_KEY, FileConfiguration.HDFS.toString())); - String walType = + String walSyncMethod = String.valueOf( - configuration.getOrDefault(WAL_TYPE_KEY, WALWriteType.SYNC.toString())); - this.walWriteType = WALWriteType.valueOf(walType.toUpperCase()); + configuration.getOrDefault( + WAL_SYNC_METHOD_KEY, WALSyncMethod.SYNC.toString())); + this.walSyncMethod = WALSyncMethod.valueOf(walSyncMethod.toUpperCase()); this.fileConfiguration = FileConfiguration.valueOf(storageType.toUpperCase()); // build configuration AbstractConfiguration fileConfiguration = this.fileConfiguration.getConfiguration(); @@ -165,7 +166,7 @@ public void initialize(Map configuration) { new WALDisruptor( fs, FileConfiguration.valueOf(storageType.toUpperCase()), - WALWriteType.valueOf(walType.toUpperCase()), + WALSyncMethod.valueOf(walSyncMethod.toUpperCase()), businessRootPath + region + DEFAULT_IMAP_FILE_PATH_SPLIT, serializer); } @@ -315,7 +316,7 @@ private long sendToDisruptorQueue(IMapFileData data, WALEventType type) { } private boolean queryExecuteStatus(long requestId) { - if (WALWriteType.ASYNC == walWriteType) { + if (WALSyncMethod.ASYNC == walSyncMethod) { return true; } return queryExecuteStatus(requestId, this.writDataTimeoutMilliseconds); diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/common/WALSyncMethod.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/common/WALSyncMethod.java new file mode 100644 index 000000000000..b3f06ed6618e --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/common/WALSyncMethod.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.seatunnel.engine.imap.storage.file.common; + +public enum WALSyncMethod { + SYNC, + ASYNC +} diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/common/WALWriteType.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/common/WALWriteType.java deleted file mode 100644 index c93ad49e441b..000000000000 --- a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/common/WALWriteType.java +++ /dev/null @@ -1,6 +0,0 @@ -package org.apache.seatunnel.engine.imap.storage.file.common; - -public enum WALWriteType { - SYNC, - ASYNC -} diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/common/WALWriter.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/common/WALWriter.java index 06f91328e7ca..97417ad1f291 100644 --- a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/common/WALWriter.java +++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/common/WALWriter.java @@ -56,20 +56,20 @@ public class WALWriter implements AutoCloseable { private static final int DEFAULT_THREAD_POOL_QUENE_SIZE = 1024; - private WALWriteType walWriteType; + private WALSyncMethod walSyncMethod; public WALWriter( FileSystem fs, FileConfiguration fileConfiguration, - WALWriteType walWriteType, + WALSyncMethod walSyncMethod, Path parentPath, Serializer serializer) throws IOException { this.writer = DiscoveryWalFileFactory.getWriter(fileConfiguration.getName()); this.writer.setBlockSize(fileConfiguration.getConfiguration().getBlockSize()); this.writer.initialize(fs, parentPath, serializer); - this.walWriteType = walWriteType; - if (WALWriteType.ASYNC == walWriteType) { + this.walSyncMethod = walSyncMethod; + if (WALSyncMethod.ASYNC == walSyncMethod) { this.walWriterService = new ThreadPoolExecutor( DEFAULT_THREAD_POOL_MIN_SIZE, @@ -83,7 +83,7 @@ public WALWriter( public void write(IMapFileData data) throws IOException { - switch (walWriteType) { + switch (walSyncMethod) { case SYNC: this.writer.write(data); return; @@ -103,7 +103,7 @@ public void write(IMapFileData data) throws IOException { @Override public void close() throws Exception { - if (WALWriteType.ASYNC == walWriteType && writeTaskFuture != null) { + if (WALSyncMethod.ASYNC == walSyncMethod && writeTaskFuture != null) { writeTaskFuture.get(); writeTaskFuture.cancel(false); if (walWriterService != null) { diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/disruptor/WALDisruptor.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/disruptor/WALDisruptor.java index 42a1d25acb2b..eceffe6a4545 100644 --- a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/disruptor/WALDisruptor.java +++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/disruptor/WALDisruptor.java @@ -22,7 +22,7 @@ import org.apache.seatunnel.engine.imap.storage.api.exception.IMapStorageException; import org.apache.seatunnel.engine.imap.storage.file.bean.IMapFileData; -import org.apache.seatunnel.engine.imap.storage.file.common.WALWriteType; +import org.apache.seatunnel.engine.imap.storage.file.common.WALSyncMethod; import org.apache.seatunnel.engine.imap.storage.file.config.FileConfiguration; import org.apache.seatunnel.engine.serializer.api.Serializer; @@ -63,7 +63,7 @@ public class WALDisruptor implements Closeable { public WALDisruptor( FileSystem fs, FileConfiguration fileConfiguration, - WALWriteType walWriteType, + WALSyncMethod walSyncMethod, String parentPath, Serializer serializer) { // todo should support multi thread producer @@ -76,7 +76,7 @@ public WALDisruptor( ProducerType.SINGLE, new BlockingWaitStrategy()); disruptor.handleEventsWithWorkerPool( - new WALWorkHandler(fs, fileConfiguration, walWriteType, parentPath, serializer)); + new WALWorkHandler(fs, fileConfiguration, walSyncMethod, parentPath, serializer)); disruptor.start(); } diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/disruptor/WALWorkHandler.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/disruptor/WALWorkHandler.java index 38e5b8b68dfe..16d33fd46ae1 100644 --- a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/disruptor/WALWorkHandler.java +++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/disruptor/WALWorkHandler.java @@ -22,7 +22,7 @@ import org.apache.seatunnel.engine.imap.storage.api.exception.IMapStorageException; import org.apache.seatunnel.engine.imap.storage.file.bean.IMapFileData; -import org.apache.seatunnel.engine.imap.storage.file.common.WALWriteType; +import org.apache.seatunnel.engine.imap.storage.file.common.WALSyncMethod; import org.apache.seatunnel.engine.imap.storage.file.common.WALWriter; import org.apache.seatunnel.engine.imap.storage.file.config.FileConfiguration; import org.apache.seatunnel.engine.imap.storage.file.future.RequestFutureCache; @@ -45,13 +45,13 @@ public class WALWorkHandler implements WorkHandler { public WALWorkHandler( FileSystem fs, FileConfiguration fileConfiguration, - WALWriteType walWriteType, + WALSyncMethod walSyncMethod, String parentPath, Serializer serializer) { try { writer = new WALWriter( - fs, fileConfiguration, walWriteType, new Path(parentPath), serializer); + fs, fileConfiguration, walSyncMethod, new Path(parentPath), serializer); } catch (IOException e) { throw new IMapStorageException( e, "create new current writer failed, parent path is %s", parentPath); diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/test/java/org/apache/seatunnel/engine/imap/storage/file/common/WALReaderAndWriterTest.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/test/java/org/apache/seatunnel/engine/imap/storage/file/common/WALReaderAndWriterTest.java index 863df2ced229..7353d0dfeae9 100644 --- a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/test/java/org/apache/seatunnel/engine/imap/storage/file/common/WALReaderAndWriterTest.java +++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/test/java/org/apache/seatunnel/engine/imap/storage/file/common/WALReaderAndWriterTest.java @@ -62,7 +62,7 @@ public static void init() throws IOException { public void testSyncWriterAndReader() throws Exception { WALWriter writer = new WALWriter( - FS, FileConfiguration.HDFS, WALWriteType.SYNC, PARENT_PATH, SERIALIZER); + FS, FileConfiguration.HDFS, WALSyncMethod.SYNC, PARENT_PATH, SERIALIZER); IMapFileData data; boolean isDelete; for (int i = 0; i < 1024; i++) { @@ -120,7 +120,7 @@ public void testSyncWriterAndReader() throws Exception { public void testAsyncWriterAndReader() throws Exception { WALWriter writer = new WALWriter( - FS, FileConfiguration.HDFS, WALWriteType.ASYNC, PARENT_PATH, SERIALIZER); + FS, FileConfiguration.HDFS, WALSyncMethod.ASYNC, PARENT_PATH, SERIALIZER); IMapFileData data; boolean isDelete; for (int i = 0; i < 1024; i++) { diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/test/java/org/apache/seatunnel/engine/imap/storage/file/disruptor/WALDisruptorTest.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/test/java/org/apache/seatunnel/engine/imap/storage/file/disruptor/WALDisruptorTest.java index e4745eaad874..979717d8cad2 100644 --- a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/test/java/org/apache/seatunnel/engine/imap/storage/file/disruptor/WALDisruptorTest.java +++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/test/java/org/apache/seatunnel/engine/imap/storage/file/disruptor/WALDisruptorTest.java @@ -21,7 +21,7 @@ package org.apache.seatunnel.engine.imap.storage.file.disruptor; import org.apache.seatunnel.engine.imap.storage.file.bean.IMapFileData; -import org.apache.seatunnel.engine.imap.storage.file.common.WALWriteType; +import org.apache.seatunnel.engine.imap.storage.file.common.WALSyncMethod; import org.apache.seatunnel.engine.imap.storage.file.config.FileConfiguration; import org.apache.seatunnel.engine.imap.storage.file.future.RequestFuture; import org.apache.seatunnel.engine.imap.storage.file.future.RequestFutureCache; @@ -65,7 +65,7 @@ void testSyncProducerAndConsumer() throws IOException { new WALDisruptor( FS, FileConfiguration.HDFS, - WALWriteType.SYNC, + WALSyncMethod.SYNC, FILEPATH, new ProtoStuffSerializer()); IMapFileData data; @@ -93,7 +93,7 @@ void testAsyncProducerAndConsumer() throws IOException { new WALDisruptor( FS, FileConfiguration.HDFS, - WALWriteType.ASYNC, + WALSyncMethod.ASYNC, FILEPATH, new ProtoStuffSerializer()); IMapFileData data;