diff --git a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/DelegatingSparkCollection.java b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/DelegatingSparkCollection.java
index ee9e3512393e..241c57dc5de1 100644
--- a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/DelegatingSparkCollection.java
+++ b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/DelegatingSparkCollection.java
@@ -55,7 +55,7 @@ public SparkCollection<RecordInfo<Object>> transform(
   @Override
   public SparkCollection<RecordInfo<Object>> multiOutputTransform(StageSpec stageSpec,
       StageStatisticsCollector collector) {
-    return getDelegate().transform(stageSpec, collector);
+    return getDelegate().multiOutputTransform(stageSpec, collector);
   }
 
   @Override
@@ -94,8 +94,8 @@ public SparkCollection<T> union(SparkCollection<T> other) {
 
   @Override
   public Runnable createStoreTask(StageSpec stageSpec,
-      SparkSink<T> sink) throws Exception {
-    return getDelegate().createStoreTask(stageSpec, sink);
+      SparkSink<T> sink) {
+    return () -> getDelegate().createStoreTask(stageSpec, sink).run();
   }
 
   @Override
@@ -135,13 +135,13 @@ public <U> SparkCollection<U> compute(StageSpec stageSpec, SparkCompute<T, U> co
 
   @Override
   public Runnable createStoreTask(StageSpec stageSpec, PairFlatMapFunction<T, Object, Object> sinkFunction) {
-    return getDelegate().createStoreTask(stageSpec, sinkFunction);
+    return () -> getDelegate().createStoreTask(stageSpec, sinkFunction).run();
   }
 
   @Override
   public Runnable createMultiStoreTask(PhaseSpec phaseSpec, Set<String> group, Set<String> sinks,
       Map<String, StageStatisticsCollector> collectors) {
-    return getDelegate().createMultiStoreTask(phaseSpec, group, sinks, collectors);
+    return () -> getDelegate().createMultiStoreTask(phaseSpec, group, sinks, collectors).run();
   }
 
   @Override
diff --git a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/SparkCollection.java b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/SparkCollection.java
index 2a0a65d27f8a..4c9b37d34eea 100644
--- a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/SparkCollection.java
+++ b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/SparkCollection.java
@@ -24,6 +24,8 @@
 import io.cdap.cdap.etl.common.RecordInfo;
 import io.cdap.cdap.etl.common.StageStatisticsCollector;
 import io.cdap.cdap.etl.proto.v2.spec.StageSpec;
+import io.cdap.cdap.etl.spark.function.PluginFunctionContext;
+import io.cdap.cdap.etl.spark.function.TransformFunction;
 import io.cdap.cdap.etl.spark.join.JoinExpressionRequest;
 import io.cdap.cdap.etl.spark.join.JoinRequest;
 import org.apache.spark.api.java.function.FlatMapFunction;
@@ -81,7 +83,7 @@ SparkCollection<RecordInfo<Object>> reduceAggregate(StageSpec stageSpec, @Nullab
   Runnable createMultiStoreTask(PhaseSpec phaseSpec, Set<String> group, Set<String> sinks,
       Map<String, StageStatisticsCollector> collectors);
 
-  Runnable createStoreTask(StageSpec stageSpec, SparkSink<T> sink) throws Exception;
+  Runnable createStoreTask(StageSpec stageSpec, SparkSink<T> sink);
 
   void publishAlerts(StageSpec stageSpec, StageStatisticsCollector collector) throws Exception;
 
diff --git a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/DatasetCollection.java b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/DatasetCollection.java
index 29d6c5f07f2b..e586cd98b088 100644
--- a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/DatasetCollection.java
+++ b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/DatasetCollection.java
@@ -17,25 +17,21 @@
 package io.cdap.cdap.etl.spark.batch;
 
 import io.cdap.cdap.api.data.DatasetContext;
-import io.cdap.cdap.api.data.format.StructuredRecord;
-import io.cdap.cdap.api.data.schema.Schema;
 import io.cdap.cdap.api.spark.JavaSparkExecutionContext;
-import io.cdap.cdap.etl.api.batch.SparkCompute;
-import io.cdap.cdap.etl.api.batch.SparkExecutionPluginContext;
 import io.cdap.cdap.etl.common.Constants;
-import io.cdap.cdap.etl.common.PipelineRuntime;
 import io.cdap.cdap.etl.common.RecordInfo;
 import io.cdap.cdap.etl.common.StageStatisticsCollector;
 import io.cdap.cdap.etl.proto.v2.spec.StageSpec;
 import io.cdap.cdap.etl.spark.DelegatingSparkCollection;
 import io.cdap.cdap.etl.spark.SparkCollection;
-import io.cdap.cdap.etl.spark.SparkPipelineRuntime;
 import io.cdap.cdap.etl.spark.function.DatasetAggregationAccumulator;
 import io.cdap.cdap.etl.spark.function.DatasetAggregationFinalizeFunction;
 import io.cdap.cdap.etl.spark.function.DatasetAggregationGetKeyFunction;
 import io.cdap.cdap.etl.spark.function.DatasetAggregationReduceFunction;
 import io.cdap.cdap.etl.spark.function.FunctionCache;
+import io.cdap.cdap.etl.spark.function.MultiOutputTransformFunction;
 import io.cdap.cdap.etl.spark.function.PluginFunctionContext;
+import io.cdap.cdap.etl.spark.function.TransformFunction;
 import javax.annotation.Nullable;
 import org.apache.spark.SparkConf;
 import org.apache.spark.api.java.JavaRDD;
@@ -176,6 +172,21 @@ public SparkCollection<RecordInfo<Object>> reduceAggregate(StageSpec stageSpec,
     return reduceDatasetAggregate(stageSpec, partitions, collector);
   }
 
+  @Override
+  public SparkCollection<RecordInfo<Object>> transform(StageSpec stageSpec, StageStatisticsCollector collector) {
+    PluginFunctionContext pluginFunctionContext = new PluginFunctionContext(stageSpec, sec, collector);
+    return flatMap(stageSpec, new TransformFunction(
+        pluginFunctionContext, functionCacheFactory.newCache()));
+  }
+
+  @Override
+  public SparkCollection<RecordInfo<Object>> multiOutputTransform(StageSpec stageSpec,
+      StageStatisticsCollector collector) {
+    PluginFunctionContext pluginFunctionContext = new PluginFunctionContext(stageSpec, sec, collector);
+    return flatMap(stageSpec,new MultiOutputTransformFunction(
+        pluginFunctionContext, functionCacheFactory.newCache()));
+  }
+
   /**
    * Performs reduce aggregate using Dataset API. This allows SPARK to perform various optimizations that
    * are not available when working on the RDD level.
diff --git a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/RDDCollection.java b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/RDDCollection.java
index e2fdb6ba0b96..b25c700b87dd 100644
--- a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/RDDCollection.java
+++ b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/RDDCollection.java
@@ -144,16 +144,16 @@ public SparkCollection<T> union(SparkCollection<T> other) {
   @Override
   public SparkCollection<RecordInfo<Object>> transform(StageSpec stageSpec, StageStatisticsCollector collector) {
     PluginFunctionContext pluginFunctionContext = new PluginFunctionContext(stageSpec, sec, collector);
-    return wrap(rdd.flatMap(new TransformFunction(
-        pluginFunctionContext, functionCacheFactory.newCache())));
+    return flatMap(stageSpec, new TransformFunction(
+        pluginFunctionContext, functionCacheFactory.newCache()));
   }
 
   @Override
   public SparkCollection<RecordInfo<Object>> multiOutputTransform(StageSpec stageSpec,
       StageStatisticsCollector collector) {
     PluginFunctionContext pluginFunctionContext = new PluginFunctionContext(stageSpec, sec, collector);
-    return wrap(rdd.flatMap(new MultiOutputTransformFunction(
-        pluginFunctionContext, functionCacheFactory.newCache())));
+    return flatMap(stageSpec,new MultiOutputTransformFunction(
+        pluginFunctionContext, functionCacheFactory.newCache()));
   }
 
   @Override
@@ -305,7 +305,7 @@ private void recordLineage(String name) {
   }
 
   @Override
-  public Runnable createStoreTask(final StageSpec stageSpec, final SparkSink<T> sink) throws Exception {
+  public Runnable createStoreTask(final StageSpec stageSpec, final SparkSink<T> sink) {
     return new Runnable() {
       @Override
       public void run() {
diff --git a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/SQLEngineCollection.java b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/SQLEngineCollection.java
index 2210f98674f0..6640691164bb 100644
--- a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/SQLEngineCollection.java
+++ b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/batch/SQLEngineCollection.java
@@ -290,7 +290,7 @@ public Runnable createMultiStoreTask(PhaseSpec phaseSpec, Set<String> group, Set
   }
 
   @Override
-  public Runnable createStoreTask(StageSpec stageSpec, SparkSink<T> sink) throws Exception {
+  public Runnable createStoreTask(StageSpec stageSpec, SparkSink<T> sink) {
     return () -> {
       try {
         pull().createStoreTask(stageSpec, sink).run();
diff --git a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/streaming/DStreamCollection.java b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/streaming/DStreamCollection.java
index a0b960792d62..b23e8bed20b1 100644
--- a/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/streaming/DStreamCollection.java
+++ b/cdap-app-templates/cdap-etl/hydrator-spark-core-base/src/main/java/io/cdap/cdap/etl/spark/streaming/DStreamCollection.java
@@ -205,7 +205,7 @@ public void run() {
 
   @Override
-  public Runnable createStoreTask(StageSpec stageSpec, SparkSink<T> sink) throws Exception {
+  public Runnable createStoreTask(StageSpec stageSpec, SparkSink<T> sink) {
     return new Runnable() {
       @Override
       public void run() {