diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/declare/DeclarationChain.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/declare/DeclarationChain.java new file mode 100644 index 00000000000000..784c4272baf9c4 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/declare/DeclarationChain.java @@ -0,0 +1,216 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.asyncprocessing.declare; + +import org.apache.flink.annotation.Experimental; +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.state.v2.StateFuture; +import org.apache.flink.core.state.StateFutureUtils; +import org.apache.flink.util.function.FunctionWithException; +import org.apache.flink.util.function.ThrowingConsumer; + +import java.util.Deque; +import java.util.LinkedList; + +/** + * A declaration chain allows to declare multiple async operations in a single chain. + * + * @param The type of the input elements. + */ +@Experimental +public class DeclarationChain implements ThrowingConsumer { + + private final DeclarationContext context; + + private final Deque> transformations; + + private DeclarationStage currentStage; + + DeclarationChain(DeclarationContext context) { + this.context = context; + this.transformations = new LinkedList<>(); + } + + @Override + @SuppressWarnings({"unchecked", "rawtypes"}) + public void accept(IN in) throws Exception { + StateFuture future = StateFutureUtils.completedFuture(in); + for (Transformation trans : transformations) { + future = trans.apply(future); + } + } + + public DeclarationStage firstStage() throws DeclarationException { + if (currentStage != null) { + throw new DeclarationException( + "Diverged declaration. Please make sure you call firstStage() once."); + } + DeclarationStage declarationStage = new DeclarationStage<>(); + currentStage = declarationStage; + return declarationStage; + } + + Transformation getLastTransformation() { + return transformations.getLast(); + } + + /** + * A DeclarationStage is a single stage in a declaration chain. It allows a further chaining of + * operations. + * + * @param The output of previous transformations. Will be the input type of further chained + * operation. + */ + public class DeclarationStage { + + private boolean afterThen = false; + + private void preCheck() throws DeclarationException { + if (afterThen) { + throw new DeclarationException( + "Double thenCompose called for single declaration block."); + } + if (currentStage != this) { + throw new DeclarationException( + "Diverged declaration. Please make sure you are declaring on the last point."); + } + afterThen = true; + } + + public DeclarationStage thenCompose( + FunctionWithException, Exception> action) + throws DeclarationException { + preCheck(); + DeclarationStage next = new DeclarationStage<>(); + ComposeTransformation trans = new ComposeTransformation<>(action, next); + transformations.add(trans); + currentStage = next; + getLastTransformation().declare(); + return next; + } + + public DeclarationStage thenAccept(ThrowingConsumer action) + throws DeclarationException { + preCheck(); + DeclarationStage next = new DeclarationStage<>(); + AcceptTransformation trans = new AcceptTransformation<>(action, next); + transformations.add(trans); + currentStage = next; + getLastTransformation().declare(); + return next; + } + + public DeclarationStage withName(String name) throws DeclarationException { + getLastTransformation().withName(name); + return this; + } + + public DeclarationChain finish() throws DeclarationException { + preCheck(); + getLastTransformation().declare(); + return DeclarationChain.this; + } + } + + @Internal + interface Transformation { + StateFuture apply(StateFuture upstream) throws Exception; + + void withName(String name) throws DeclarationException; + + void declare() throws DeclarationException; + } + + private abstract static class AbstractTransformation + implements Transformation { + + String name = null; + + @Override + public void withName(String newName) throws DeclarationException { + if (name != null) { + throw new DeclarationException("Double naming"); + } + name = newName; + declare(); + } + } + + private class ComposeTransformation extends AbstractTransformation { + + DeclarationStage to; + + FunctionWithException, ? extends Exception> action; + + NamedFunction> namedFunction; + + ComposeTransformation( + FunctionWithException, Exception> action, + DeclarationStage to) { + this.action = action; + this.to = to; + } + + @Override + public StateFuture apply(StateFuture upstream) throws Exception { + return upstream.thenCompose(namedFunction); + } + + @Override + public void declare() throws DeclarationException { + if (namedFunction == null) { + if (name == null) { + namedFunction = context.declare(action); + } else { + namedFunction = context.declare(name, action); + } + } + } + } + + private class AcceptTransformation extends AbstractTransformation { + + DeclarationStage to; + + ThrowingConsumer action; + + NamedConsumer namedFunction; + + AcceptTransformation(ThrowingConsumer action, DeclarationStage to) { + this.action = action; + this.to = to; + } + + @Override + public StateFuture apply(StateFuture upstream) throws Exception { + return upstream.thenAccept(namedFunction); + } + + @Override + public void declare() throws DeclarationException { + if (namedFunction == null) { + if (name == null) { + namedFunction = context.declare(action); + } else { + namedFunction = context.declare(name, action); + } + } + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/declare/DeclarationContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/declare/DeclarationContext.java index a50e117f747375..43608bd472799f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/declare/DeclarationContext.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/declare/DeclarationContext.java @@ -93,4 +93,15 @@ public DeclaredVariable declareVariable( throws DeclarationException { return manager.register(serializer, name, initialValue); } + + /** + * Declaring a processing in chain-style. This method start a chain with an input type. + * + * @return the chain itself. + * @param the in type of the first block. + */ + public DeclarationChain.DeclarationStage declareChain() + throws DeclarationException { + return new DeclarationChain(this).firstStage(); + } }