From 377331af0d1240dd7968eac41b5f4dbddaf8fc0b Mon Sep 17 00:00:00 2001 From: ClownXC Date: Wed, 3 Apr 2024 08:54:42 +0800 Subject: [PATCH 1/3] remove plugin lifecycle --- .../api/transform/SeaTunnelTransform.java | 19 ++-------- .../execution/TransformExecuteProcessor.java | 28 +++++++-------- .../engine/core/parse/JobConfigParser.java | 35 ------------------- .../parse/MultipleTableJobConfigParser.java | 20 +++-------- .../engine/server/master/JobMetricsTest.java | 10 +++--- .../AbstractCatalogSupportTransform.java | 10 ------ .../common/AbstractSeaTunnelTransform.java | 6 ---- 7 files changed, 26 insertions(+), 102 deletions(-) diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelTransform.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelTransform.java index a7ccd081cee..28cd279abe1 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelTransform.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelTransform.java @@ -18,7 +18,6 @@ package org.apache.seatunnel.api.transform; import org.apache.seatunnel.api.common.PluginIdentifierInterface; -import org.apache.seatunnel.api.common.SeaTunnelPluginLifeCycle; import org.apache.seatunnel.api.source.SeaTunnelJobAware; import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; @@ -26,10 +25,7 @@ import java.io.Serializable; public interface SeaTunnelTransform - extends Serializable, - PluginIdentifierInterface, - SeaTunnelPluginLifeCycle, - SeaTunnelJobAware { + extends Serializable, PluginIdentifierInterface, SeaTunnelJobAware { /** call it when Transformer initialed */ default void open() {} @@ -45,22 +41,13 @@ default void setTypeInfo(SeaTunnelDataType inputDataType) { throw new UnsupportedOperationException("setTypeInfo method is not supported"); } - /** - * Get the data type of the records produced by this transform. - * - * @deprecated Please use {@link #getProducedCatalogTable} - * @return Produced data type. - */ - @Deprecated - SeaTunnelDataType getProducedType(); - /** Get the catalog table output by this transform */ CatalogTable getProducedCatalogTable(); /** - * Transform input data to {@link this#getProducedType()} types data. + * Transform input data to {@link this#getProducedCatalogTable().getSeaTunnelRowType()} types data. * - * @param row the data need be transform. + * @param row the data need be transformed. * @return transformed data. */ T map(T row); diff --git a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java index 450599ff7b1..d91bb9d3da7 100644 --- a/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java +++ b/seatunnel-core/seatunnel-flink-starter/seatunnel-flink-starter-common/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java @@ -37,7 +37,6 @@ import org.apache.flink.api.common.typeinfo.TypeInformation; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.types.Row; -import org.apache.flink.util.Collector; import java.net.URL; import java.util.Collections; @@ -119,24 +118,25 @@ public List execute(List upstreamDataS protected DataStream flinkTransform( SeaTunnelRowType sourceType, SeaTunnelTransform transform, DataStream stream) { - TypeInformation rowTypeInfo = TypeConverterUtils.convert(transform.getProducedType()); + TypeInformation rowTypeInfo = + TypeConverterUtils.convert( + transform.getProducedCatalogTable().getSeaTunnelRowType()); FlinkRowConverter transformInputRowConverter = new FlinkRowConverter(sourceType); FlinkRowConverter transformOutputRowConverter = new FlinkRowConverter(transform.getProducedCatalogTable().getSeaTunnelRowType()); DataStream output = stream.flatMap( - new FlatMapFunction() { - @Override - public void flatMap(Row value, Collector out) throws Exception { - SeaTunnelRow seaTunnelRow = - transformInputRowConverter.reconvert(value); - SeaTunnelRow dataRow = (SeaTunnelRow) transform.map(seaTunnelRow); - if (dataRow != null) { - Row copy = transformOutputRowConverter.convert(dataRow); - out.collect(copy); - } - } - }, + (FlatMapFunction) + (value, out) -> { + SeaTunnelRow seaTunnelRow = + transformInputRowConverter.reconvert(value); + SeaTunnelRow dataRow = + (SeaTunnelRow) transform.map(seaTunnelRow); + if (dataRow != null) { + Row copy = transformOutputRowConverter.convert(dataRow); + out.collect(copy); + } + }, rowTypeInfo); return output; } diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java index 2ef1a28aff8..981b85049aa 100644 --- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java +++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/JobConfigParser.java @@ -23,10 +23,8 @@ import org.apache.seatunnel.api.source.SeaTunnelSource; import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.catalog.CatalogTableUtil; -import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; -import org.apache.seatunnel.api.transform.SeaTunnelTransform; import org.apache.seatunnel.common.constants.CollectionConstants; import org.apache.seatunnel.core.starter.execution.PluginUtil; import org.apache.seatunnel.engine.common.config.JobConfig; @@ -34,7 +32,6 @@ import org.apache.seatunnel.engine.core.dag.actions.Action; import org.apache.seatunnel.engine.core.dag.actions.SinkAction; import org.apache.seatunnel.engine.core.dag.actions.SourceAction; -import org.apache.seatunnel.engine.core.dag.actions.TransformAction; import org.apache.commons.lang3.tuple.ImmutablePair; @@ -99,38 +96,6 @@ public Tuple2 parseSource( return new Tuple2<>(catalogTable, action); } - public Tuple2 parseTransform( - Config config, - JobConfig jobConfig, - String tableId, - int parallelism, - SeaTunnelRowType rowType, - Set inputActions) { - final ImmutablePair, Set> tuple = - ConnectorInstanceLoader.loadTransformInstance( - config, jobConfig.getJobContext(), commonPluginJars); - final SeaTunnelTransform transform = tuple.getLeft(); - // old logic: prepare(initialization) -> set job context -> set row type (There is a logical - // judgment that depends on before and after, not a simple set) - transform.prepare(config); - transform.setJobContext(jobConfig.getJobContext()); - transform.setTypeInfo((SeaTunnelDataType) rowType); - final String actionName = createTransformActionName(0, tuple.getLeft().getPluginName()); - final TransformAction action = - new TransformAction( - idGenerator.getNextId(), - actionName, - new ArrayList<>(inputActions), - transform, - tuple.getRight(), - new HashSet<>()); - action.setParallelism(parallelism); - CatalogTable catalogTable = - CatalogTableUtil.getCatalogTable( - tableId, (SeaTunnelRowType) transform.getProducedType()); - return new Tuple2<>(catalogTable, action); - } - public List> parseSinks( int configIndex, List>> inputVertices, diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java index f988f293a5f..a7f486c5913 100644 --- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java +++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java @@ -36,7 +36,6 @@ import org.apache.seatunnel.api.table.factory.TableSourceFactory; import org.apache.seatunnel.api.table.factory.TableTransformFactory; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; -import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.api.transform.SeaTunnelTransform; import org.apache.seatunnel.common.Constants; import org.apache.seatunnel.common.config.TypesafeConfigUtils; @@ -421,19 +420,6 @@ private void parseTransform( int spareParallelism = inputs.get(0)._2().getParallelism(); int parallelism = readonlyConfig.getOptional(CommonOptions.PARALLELISM).orElse(spareParallelism); - if (fallback) { - Tuple2 tuple = - fallbackParser.parseTransform( - config, - jobConfig, - tableId, - parallelism, - (SeaTunnelRowType) expectedType, - inputActions); - tableWithActionMap.put(tableId, Collections.singletonList(tuple)); - return; - } - CatalogTable catalogTable = inputs.get(0)._1(); SeaTunnelTransform transform = FactoryUtil.createAndPrepareTransform( @@ -476,8 +462,10 @@ public static SeaTunnelDataType getProducedType(Action action) { .getProducedCatalogTable() .getSeaTunnelRowType(); } catch (UnsupportedOperationException e) { - // TODO remove it when all connector use `getProducedCatalogTables` - return ((TransformAction) action).getTransform().getProducedType(); + return ((TransformAction) action) + .getTransform() + .getProducedCatalogTable() + .getSeaTunnelRowType(); } } throw new UnsupportedOperationException(); diff --git a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java index 3b81f2a6555..ed12a565d71 100644 --- a/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java +++ b/seatunnel-engine/seatunnel-engine-server/src/test/java/org/apache/seatunnel/engine/server/master/JobMetricsTest.java @@ -137,7 +137,7 @@ public void testMetricsOnJobRestart() throws InterruptedException { server.getCoordinatorService().getJobStatus(jobId3))); // check metrics - await().atMost(60000, TimeUnit.MILLISECONDS) + await().atMost(600000, TimeUnit.MILLISECONDS) .untilAsserted( () -> { JobMetrics jobMetrics = coordinatorService.getJobMetrics(jobId3); @@ -161,12 +161,12 @@ public void testMetricsOnJobRestart() throws InterruptedException { server.getCoordinatorService().cancelJob(jobId3); } - private void startJob(Long jobid, String path, boolean isStartWithSavePoint) { - LogicalDag testLogicalDag = TestUtils.createTestLogicalPlan(path, jobid.toString(), jobid); + private void startJob(Long jobId, String path, boolean isStartWithSavePoint) { + LogicalDag testLogicalDag = TestUtils.createTestLogicalPlan(path, jobId.toString(), jobId); JobImmutableInformation jobImmutableInformation = new JobImmutableInformation( - jobid, + jobId, "Test", isStartWithSavePoint, nodeEngine.getSerializationService().toData(testLogicalDag), @@ -177,7 +177,7 @@ private void startJob(Long jobid, String path, boolean isStartWithSavePoint) { Data data = nodeEngine.getSerializationService().toData(jobImmutableInformation); PassiveCompletableFuture voidPassiveCompletableFuture = - server.getCoordinatorService().submitJob(jobid, data); + server.getCoordinatorService().submitJob(jobId, data); voidPassiveCompletableFuture.join(); } } diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportTransform.java index 78fe02094f6..5670bcc1296 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportTransform.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractCatalogSupportTransform.java @@ -20,8 +20,6 @@ import org.apache.seatunnel.api.table.catalog.CatalogTable; import org.apache.seatunnel.api.table.catalog.TableIdentifier; import org.apache.seatunnel.api.table.catalog.TableSchema; -import org.apache.seatunnel.api.table.type.SeaTunnelDataType; -import org.apache.seatunnel.api.table.type.SeaTunnelRow; import lombok.NonNull; @@ -61,12 +59,4 @@ private CatalogTable transformCatalogTable() { protected abstract TableSchema transformTableSchema(); protected abstract TableIdentifier transformTableIdentifier(); - - @Override - public SeaTunnelDataType getProducedType() { - if (outputRowType != null) { - return outputRowType; - } - return getProducedCatalogTable().getTableSchema().toPhysicalRowDataType(); - } } diff --git a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractSeaTunnelTransform.java b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractSeaTunnelTransform.java index b710034cad9..1892881c277 100644 --- a/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractSeaTunnelTransform.java +++ b/seatunnel-transforms-v2/src/main/java/org/apache/seatunnel/transform/common/AbstractSeaTunnelTransform.java @@ -18,7 +18,6 @@ package org.apache.seatunnel.transform.common; import org.apache.seatunnel.api.table.catalog.CatalogTable; -import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.table.type.SeaTunnelRow; import org.apache.seatunnel.api.table.type.SeaTunnelRowType; import org.apache.seatunnel.api.transform.SeaTunnelTransform; @@ -30,11 +29,6 @@ public abstract class AbstractSeaTunnelTransform implements SeaTunnelTransform getProducedType() { - return outputRowType; - } - @Override public SeaTunnelRow map(SeaTunnelRow row) { return transformRow(row); From 4a16fbb5bb09d0fbbdf412aa68fc9d9795300f19 Mon Sep 17 00:00:00 2001 From: ClownXC Date: Wed, 3 Apr 2024 09:15:48 +0800 Subject: [PATCH 2/3] remove plugin lifecycle --- .../org/apache/seatunnel/api/transform/SeaTunnelTransform.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelTransform.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelTransform.java index 28cd279abe1..a64e1b7c7d5 100644 --- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelTransform.java +++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/transform/SeaTunnelTransform.java @@ -45,7 +45,8 @@ default void setTypeInfo(SeaTunnelDataType inputDataType) { CatalogTable getProducedCatalogTable(); /** - * Transform input data to {@link this#getProducedCatalogTable().getSeaTunnelRowType()} types data. + * Transform input data to {@link this#getProducedCatalogTable().getSeaTunnelRowType()} types + * data. * * @param row the data need be transformed. * @return transformed data. From f0b9a62d30eb127299218d79639f901f85ac9caa Mon Sep 17 00:00:00 2001 From: ClownXC Date: Wed, 3 Apr 2024 09:22:51 +0800 Subject: [PATCH 3/3] remove transform fallback --- .../engine/core/parse/MultipleTableJobConfigParser.java | 9 --------- 1 file changed, 9 deletions(-) diff --git a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java index a7f486c5913..5a26cf4619d 100644 --- a/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java +++ b/seatunnel-engine/seatunnel-engine-core/src/main/java/org/apache/seatunnel/engine/core/parse/MultipleTableJobConfigParser.java @@ -34,7 +34,6 @@ import org.apache.seatunnel.api.table.factory.FactoryUtil; import org.apache.seatunnel.api.table.factory.TableSinkFactory; import org.apache.seatunnel.api.table.factory.TableSourceFactory; -import org.apache.seatunnel.api.table.factory.TableTransformFactory; import org.apache.seatunnel.api.table.type.SeaTunnelDataType; import org.apache.seatunnel.api.transform.SeaTunnelTransform; import org.apache.seatunnel.common.Constants; @@ -404,18 +403,10 @@ private void parseTransform( final String tableId = readonlyConfig.getOptional(CommonOptions.RESULT_TABLE_NAME).orElse(DEFAULT_ID); - boolean fallback = - isFallback( - classLoader, - TableTransformFactory.class, - factoryId, - (factory) -> factory.createTransform(null)); - Set inputActions = inputs.stream() .map(Tuple2::_2) .collect(Collectors.toCollection(LinkedHashSet::new)); - SeaTunnelDataType expectedType = getProducedType(inputs.get(0)._2()); checkProducedTypeEquals(inputActions); int spareParallelism = inputs.get(0)._2().getParallelism(); int parallelism =