From 76d77b5d6332f901f8e4dcd2f01a7bd77290c4ee Mon Sep 17 00:00:00 2001
From: ClownXC
Date: Tue, 19 Mar 2024 23:29:01 +0800
Subject: [PATCH] wal support asynchronous write

---
 .../api/common/ImapStorageThreadFactory.java  | 60 +++++++++++++++
 .../imap/storage/file/IMapFileStorage.java    | 36 ++++++-----
 .../imap/storage/file/common/WALSyncType.java | 26 ++++++++
 .../imap/storage/file/common/WALWriter.java   | 59 +++++++++++++++++-
 .../storage/file/disruptor/WALDisruptor.java  |  5 +-
 .../file/disruptor/WALWorkHandler.java        |  6 +-
 .../storage/file/wal/writer/CloudWriter.java  |  1 -
 .../file/common/WALReaderAndWriterTest.java   | 61 ++++++++++++++++++-
 .../file/disruptor/WALDisruptorTest.java      | 38 +++++++++++-
 9 files changed, 268 insertions(+), 24 deletions(-)
 create mode 100644 seatunnel-engine/seatunnel-engine-storage/imap-storage-api/src/main/java/org/apache/seatunnel/engine/imap/storage/api/common/ImapStorageThreadFactory.java
 create mode 100644 seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/common/WALSyncType.java

diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-api/src/main/java/org/apache/seatunnel/engine/imap/storage/api/common/ImapStorageThreadFactory.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-api/src/main/java/org/apache/seatunnel/engine/imap/storage/api/common/ImapStorageThreadFactory.java
new file mode 100644
index 00000000000..837131a02fe
--- /dev/null
+++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-api/src/main/java/org/apache/seatunnel/engine/imap/storage/api/common/ImapStorageThreadFactory.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.seatunnel.engine.imap.storage.api.common;
+
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * Thread factory for IMap storage worker threads.
+ *
+ * <p>Threads are non-daemon, run at {@link Thread#NORM_PRIORITY} and are named {@code
+ * Imap-StorageThread-<pool>-thread-<n>}: {@code <pool>} is distinct for every factory instance
+ * and {@code <n>} counts the threads created by that factory.
+ */
+public class ImapStorageThreadFactory implements ThreadFactory {
+    /** Shared by all factories; a per-instance counter would hand out pool number 1 to each. */
+    private static final AtomicInteger POOL_NUMBER = new AtomicInteger(1);
+
+    private final ThreadGroup group;
+    private final AtomicInteger threadNumber = new AtomicInteger(1);
+    private final String namePrefix;
+
+    /** Picks the thread group and reserves the next pool number for this factory's names. */
+    public ImapStorageThreadFactory() {
+        SecurityManager s = System.getSecurityManager();
+        group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
+        namePrefix = "Imap-StorageThread-" + POOL_NUMBER.getAndIncrement() + "-thread-";
+    }
+
+    /** Creates a non-daemon, normal-priority thread carrying this factory's name prefix. */
+    @Override
+    public Thread newThread(Runnable runnable) {
+        Thread thread = new Thread(group, runnable, namePrefix + threadNumber.getAndIncrement(), 0);
+        if (thread.isDaemon()) {
+            thread.setDaemon(false);
+        }
+        if (thread.getPriority() != Thread.NORM_PRIORITY) {
+            thread.setPriority(Thread.NORM_PRIORITY);
+        }
+        return thread;
+    }
+}
diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileStorage.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileStorage.java
index 915981e476d..2caae4c2b10 100644
--- a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileStorage.java
+++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/IMapFileStorage.java
@@ -25,6 +25,7 @@
 import org.apache.seatunnel.engine.imap.storage.file.bean.IMapFileData;
 import org.apache.seatunnel.engine.imap.storage.file.common.FileConstants;
 import org.apache.seatunnel.engine.imap.storage.file.common.WALReader;
+import org.apache.seatunnel.engine.imap.storage.file.common.WALSyncType;
 import org.apache.seatunnel.engine.imap.storage.file.config.AbstractConfiguration;
 import org.apache.seatunnel.engine.imap.storage.file.config.FileConfiguration;
 import org.apache.seatunnel.engine.imap.storage.file.disruptor.WALDisruptor;
@@ -71,6 +72,8 @@ public class IMapFileStorage implements IMapStorage {
 
     private static final String STORAGE_TYPE_KEY = "storage.type";
 
+    private static final String WAL_SYNC_TYPE_KEY = "wal.sync.type";
+
     public FileSystem fs;
 
     public String 
namespace; @@ -107,6 +110,7 @@ public class IMapFileStorage implements IMapStorage { public static final long DEFAULT_WRITE_DATA_TIMEOUT_MILLISECONDS = 1000 * 60; private Configuration conf; + private WALSyncType walSyncType; private FileConfiguration fileConfiguration; @@ -120,9 +124,16 @@ public void initialize(Map configuration) { String storageType = String.valueOf( - configuration.getOrDefault( - STORAGE_TYPE_KEY, FileConfiguration.HDFS.toString())); - this.fileConfiguration = FileConfiguration.valueOf(storageType.toUpperCase()); + configuration.getOrDefault( + STORAGE_TYPE_KEY, FileConfiguration.HDFS.toString())) + .toUpperCase(); + String walSyncMethod = + String.valueOf( + configuration.getOrDefault( + WAL_SYNC_TYPE_KEY, WALSyncType.SYNC.toString())) + .toUpperCase(); + this.walSyncType = WALSyncType.valueOf(walSyncMethod); + this.fileConfiguration = FileConfiguration.valueOf(storageType); // build configuration AbstractConfiguration fileConfiguration = this.fileConfiguration.getConfiguration(); @@ -156,7 +167,8 @@ public void initialize(Map configuration) { this.walDisruptor = new WALDisruptor( fs, - FileConfiguration.valueOf(storageType.toUpperCase()), + FileConfiguration.valueOf(storageType), + WALSyncType.valueOf(walSyncMethod), businessRootPath + region + DEFAULT_IMAP_FILE_PATH_SPLIT, serializer); } @@ -306,6 +318,9 @@ private long sendToDisruptorQueue(IMapFileData data, WALEventType type) { } private boolean queryExecuteStatus(long requestId) { + if (WALSyncType.ASYNC == walSyncType) { + return true; + } return queryExecuteStatus(requestId, this.writDataTimeoutMilliseconds); } @@ -327,17 +342,8 @@ private boolean queryExecuteStatus(long requestId, long timeout) { private Set batchQueryExecuteFailsStatus( Map requestMap, Set failures) { for (Map.Entry entry : requestMap.entrySet()) { - boolean success = false; - RequestFuture requestFuture = RequestFutureCache.get(entry.getKey()); - try { - if (requestFuture.isDone() || 
Boolean.TRUE.equals(requestFuture.get())) { - success = true; - } - } catch (Exception e) { - log.error("wait for write status error", e); - } finally { - RequestFutureCache.remove(entry.getKey()); - } + Long requestId = entry.getKey(); + boolean success = queryExecuteStatus(requestId); if (!success) { failures.add(entry.getValue()); } diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/common/WALSyncType.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/common/WALSyncType.java new file mode 100644 index 00000000000..8e7b79e2695 --- /dev/null +++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/common/WALSyncType.java @@ -0,0 +1,26 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ *
+ */
+
+package org.apache.seatunnel.engine.imap.storage.file.common;
+
+public enum WALSyncType {
+    SYNC,
+    ASYNC
+}
diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/common/WALWriter.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/common/WALWriter.java
index 168c13efd58..5eb0888c9d4 100644
--- a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/common/WALWriter.java
+++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/common/WALWriter.java
@@ -20,6 +20,7 @@
 
 package org.apache.seatunnel.engine.imap.storage.file.common;
 
+import org.apache.seatunnel.engine.imap.storage.api.common.ImapStorageThreadFactory;
 import org.apache.seatunnel.engine.imap.storage.file.bean.IMapFileData;
 import org.apache.seatunnel.engine.imap.storage.file.config.FileConfiguration;
 import org.apache.seatunnel.engine.imap.storage.file.wal.DiscoveryWalFileFactory;
@@ -29,29 +30,122 @@
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
+import lombok.extern.slf4j.Slf4j;
+
 import java.io.IOException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 
+/**
+ * Writes {@link IMapFileData} records to the WAL through the {@link IFileWriter} selected for
+ * the given {@link FileConfiguration}.
+ *
+ * <p>In {@link WALSyncType#SYNC} mode {@link #write(IMapFileData)} returns after the record has
+ * been handed to the underlying writer. In {@link WALSyncType#ASYNC} mode the record is queued
+ * and written by a single background thread, so records reach the writer in submission order;
+ * failures of queued writes are only logged.
+ */
+@Slf4j
 public class WALWriter implements AutoCloseable {
 
     IFileWriter writer;
 
+    /** Number of ASYNC writes that may be waiting before {@link #write} is rejected. */
+    private static final int DEFAULT_THREAD_POOL_QUEUE_SIZE = 1024;
+
+    /** Upper bound on how long {@link #close()} waits for queued ASYNC writes. */
+    private static final long CLOSE_TIMEOUT_SECONDS = 60L;
+
+    /** Runs ASYNC writes; stays {@code null} in SYNC mode. */
+    private ExecutorService walWriterService;
+
+    /** Handle of the most recently submitted ASYNC write, or {@code null} if there is none. */
+    Future<?> writeTaskFuture;
+
+    private final WALSyncType walSyncType;
+
+    /**
+     * Creates the writer and, for {@link WALSyncType#ASYNC}, the background write executor.
+     *
+     * @throws IOException propagated from initializing the underlying writer
+     */
     public WALWriter(
             FileSystem fs,
             FileConfiguration fileConfiguration,
+            WALSyncType walSyncType,
             Path parentPath,
             Serializer serializer)
             throws IOException {
         this.writer = DiscoveryWalFileFactory.getWriter(fileConfiguration.getName());
         this.writer.setBlockSize(fileConfiguration.getConfiguration().getBlockSize());
         this.writer.initialize(fs, parentPath, serializer);
+        this.walSyncType = walSyncType;
+        if (WALSyncType.ASYNC == walSyncType) {
+            // One worker only: a later update/delete must follow the record it supersedes, and
+            // several workers could reorder or interleave records on the shared writer.
+            this.walWriterService =
+                    new ThreadPoolExecutor(
+                            1,
+                            1,
+                            0L,
+                            TimeUnit.MILLISECONDS,
+                            new LinkedBlockingQueue<>(DEFAULT_THREAD_POOL_QUEUE_SIZE),
+                            new ImapStorageThreadFactory());
+        }
     }
 
+    /**
+     * Writes one record, directly or through the background executor depending on the mode.
+     *
+     * @throws IOException if a direct write fails, or if an ASYNC write cannot be queued
+     */
     public void write(IMapFileData data) throws IOException {
-        this.writer.write(data);
+        if (WALSyncType.ASYNC != walSyncType) {
+            this.writer.write(data);
+            return;
+        }
+        try {
+            writeTaskFuture =
+                    this.walWriterService.submit(
+                            () -> {
+                                try {
+                                    this.writer.write(data);
+                                } catch (Exception e) {
+                                    // The submitting call has already returned, so logging is
+                                    // the only way left to surface the failure.
+                                    log.error("store imap failed : {}", data, e);
+                                }
+                            });
+        } catch (RejectedExecutionException e) {
+            throw new IOException("WAL async write rejected: queue full or writer closed", e);
+        }
     }
 
+    /**
+     * Waits (bounded) for queued ASYNC writes to finish, then closes the underlying writer.
+     */
     @Override
     public void close() throws Exception {
-        this.writer.close();
+        try {
+            if (walWriterService != null) {
+                // Drain before closing: queued records would otherwise hit a closed writer.
+                walWriterService.shutdown();
+                if (!walWriterService.awaitTermination(CLOSE_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
+                    log.warn(
+                            "WAL async writes still pending after {}s, discarding them",
+                            CLOSE_TIMEOUT_SECONDS);
+                    walWriterService.shutdownNow();
+                }
+            }
+        } catch (InterruptedException e) {
+            walWriterService.shutdownNow();
+            Thread.currentThread().interrupt();
+        } finally {
+            this.writer.close();
+        }
     }
 }
diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/disruptor/WALDisruptor.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/disruptor/WALDisruptor.java
index 35db6d2842f..063b9742287 100644
--- 
a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/disruptor/WALDisruptor.java +++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/disruptor/WALDisruptor.java @@ -22,6 +22,7 @@ import org.apache.seatunnel.engine.imap.storage.api.exception.IMapStorageException; import org.apache.seatunnel.engine.imap.storage.file.bean.IMapFileData; +import org.apache.seatunnel.engine.imap.storage.file.common.WALSyncType; import org.apache.seatunnel.engine.imap.storage.file.config.FileConfiguration; import org.apache.seatunnel.engine.serializer.api.Serializer; @@ -62,6 +63,7 @@ public class WALDisruptor implements Closeable { public WALDisruptor( FileSystem fs, FileConfiguration fileConfiguration, + WALSyncType walSyncType, String parentPath, Serializer serializer) { // todo should support multi thread producer @@ -73,9 +75,8 @@ public WALDisruptor( threadFactory, ProducerType.SINGLE, new BlockingWaitStrategy()); - disruptor.handleEventsWithWorkerPool( - new WALWorkHandler(fs, fileConfiguration, parentPath, serializer)); + new WALWorkHandler(fs, fileConfiguration, walSyncType, parentPath, serializer)); disruptor.start(); } diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/disruptor/WALWorkHandler.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/disruptor/WALWorkHandler.java index 70c411e44ef..9de8a1b0450 100644 --- a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/disruptor/WALWorkHandler.java +++ 
b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/disruptor/WALWorkHandler.java @@ -22,6 +22,7 @@ import org.apache.seatunnel.engine.imap.storage.api.exception.IMapStorageException; import org.apache.seatunnel.engine.imap.storage.file.bean.IMapFileData; +import org.apache.seatunnel.engine.imap.storage.file.common.WALSyncType; import org.apache.seatunnel.engine.imap.storage.file.common.WALWriter; import org.apache.seatunnel.engine.imap.storage.file.config.FileConfiguration; import org.apache.seatunnel.engine.imap.storage.file.future.RequestFutureCache; @@ -44,10 +45,13 @@ public class WALWorkHandler implements WorkHandler { public WALWorkHandler( FileSystem fs, FileConfiguration fileConfiguration, + WALSyncType walSyncType, String parentPath, Serializer serializer) { try { - writer = new WALWriter(fs, fileConfiguration, new Path(parentPath), serializer); + writer = + new WALWriter( + fs, fileConfiguration, walSyncType, new Path(parentPath), serializer); } catch (IOException e) { throw new IMapStorageException( e, "create new current writer failed, parent path is %s", parentPath); diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/wal/writer/CloudWriter.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/wal/writer/CloudWriter.java index 48c9cea0156..aec160fb21d 100644 --- a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/wal/writer/CloudWriter.java +++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/main/java/org/apache/seatunnel/engine/imap/storage/file/wal/writer/CloudWriter.java @@ -73,7 +73,6 @@ public void setBlockSize(Long blockSize) { } } - // 
TODO Synchronous write, asynchronous write can be added in the future @Override public void write(IMapFileData data) throws IOException { byte[] bytes = serializer.serialize(data); diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/test/java/org/apache/seatunnel/engine/imap/storage/file/common/WALReaderAndWriterTest.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/test/java/org/apache/seatunnel/engine/imap/storage/file/common/WALReaderAndWriterTest.java index 388be848360..c0c1b504d26 100644 --- a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/test/java/org/apache/seatunnel/engine/imap/storage/file/common/WALReaderAndWriterTest.java +++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/test/java/org/apache/seatunnel/engine/imap/storage/file/common/WALReaderAndWriterTest.java @@ -59,8 +59,10 @@ public static void init() throws IOException { } @Test - public void testWriterAndReader() throws Exception { - WALWriter writer = new WALWriter(FS, FileConfiguration.HDFS, PARENT_PATH, SERIALIZER); + public void testSyncWriterAndReader() throws Exception { + WALWriter writer = + new WALWriter( + FS, FileConfiguration.HDFS, WALSyncType.SYNC, PARENT_PATH, SERIALIZER); IMapFileData data; boolean isDelete; for (int i = 0; i < 1024; i++) { @@ -114,6 +116,61 @@ public void testWriterAndReader() throws Exception { Assertions.assertNull(result.get("key519")); } + @Test + public void testAsyncWriterAndReader() throws Exception { + WALWriter writer = + new WALWriter( + FS, FileConfiguration.HDFS, WALSyncType.ASYNC, PARENT_PATH, SERIALIZER); + IMapFileData data; + boolean isDelete; + for (int i = 0; i < 1024; i++) { + data = + IMapFileData.builder() + .key(SERIALIZER.serialize("key" + i)) + .keyClassName(String.class.getName()) + .value(SERIALIZER.serialize("value" + i)) + .valueClassName(Integer.class.getName()) + 
.timestamp(System.nanoTime()) + .build(); + if (i % 2 == 0) { + isDelete = true; + data.setKey(SERIALIZER.serialize(i)); + data.setKeyClassName(Integer.class.getName()); + } else { + isDelete = false; + } + data.setDeleted(isDelete); + + writer.write(data); + } + // update key 511 + data = + IMapFileData.builder() + .key(SERIALIZER.serialize("key" + 511)) + .keyClassName(String.class.getName()) + .value(SERIALIZER.serialize("Kristen")) + .valueClassName(String.class.getName()) + .deleted(false) + .timestamp(System.nanoTime()) + .build(); + writer.write(data); + // delete key 519 + data = + IMapFileData.builder() + .key(SERIALIZER.serialize("key" + 519)) + .keyClassName(String.class.getName()) + .deleted(true) + .timestamp(System.nanoTime()) + .build(); + + writer.write(data); + writer.close(); + await().atMost(10, java.util.concurrent.TimeUnit.SECONDS).await(); + + WALReader reader = new WALReader(FS, FileConfiguration.HDFS, new ProtoStuffSerializer()); + Map result = reader.loadAllData(PARENT_PATH, new HashSet<>()); + } + @AfterAll public static void close() throws IOException { FS.delete(PARENT_PATH, true); diff --git a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/test/java/org/apache/seatunnel/engine/imap/storage/file/disruptor/WALDisruptorTest.java b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/test/java/org/apache/seatunnel/engine/imap/storage/file/disruptor/WALDisruptorTest.java index a8bc80a89a7..ac60777e569 100644 --- a/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/test/java/org/apache/seatunnel/engine/imap/storage/file/disruptor/WALDisruptorTest.java +++ b/seatunnel-engine/seatunnel-engine-storage/imap-storage-plugins/imap-storage-file/src/test/java/org/apache/seatunnel/engine/imap/storage/file/disruptor/WALDisruptorTest.java @@ -21,6 +21,7 @@ package org.apache.seatunnel.engine.imap.storage.file.disruptor; import 
org.apache.seatunnel.engine.imap.storage.file.bean.IMapFileData; +import org.apache.seatunnel.engine.imap.storage.file.common.WALSyncType; import org.apache.seatunnel.engine.imap.storage.file.config.FileConfiguration; import org.apache.seatunnel.engine.imap.storage.file.future.RequestFuture; import org.apache.seatunnel.engine.imap.storage.file.future.RequestFutureCache; @@ -58,10 +59,43 @@ public class WALDisruptorTest { } @Test - void testProducerAndConsumer() throws IOException { + void testSyncProducerAndConsumer() throws IOException { FS = FileSystem.get(CONF); DISRUPTOR = - new WALDisruptor(FS, FileConfiguration.HDFS, FILEPATH, new ProtoStuffSerializer()); + new WALDisruptor( + FS, + FileConfiguration.HDFS, + WALSyncType.SYNC, + FILEPATH, + new ProtoStuffSerializer()); + IMapFileData data; + for (int i = 0; i < 100; i++) { + data = + IMapFileData.builder() + .deleted(false) + .key(("key" + i).getBytes()) + .keyClassName(String.class.getName()) + .value(("value" + i).getBytes()) + .valueClassName(String.class.getName()) + .timestamp(System.nanoTime()) + .build(); + long requestId = RequestFutureCache.getRequestId(); + RequestFutureCache.put(requestId, new RequestFuture()); + DISRUPTOR.tryAppendPublish(data, requestId); + } + DISRUPTOR.close(); + } + + @Test + void testAsyncProducerAndConsumer() throws IOException { + FS = FileSystem.get(CONF); + DISRUPTOR = + new WALDisruptor( + FS, + FileConfiguration.HDFS, + WALSyncType.ASYNC, + FILEPATH, + new ProtoStuffSerializer()); IMapFileData data; for (int i = 0; i < 100; i++) { data =