diff --git a/flink-core/src/main/java/org/apache/flink/core/state/CompletedStateFuture.java b/flink-core/src/main/java/org/apache/flink/core/state/CompletedStateFuture.java index 615b8e78c32317..13b7de90ceebce 100644 --- a/flink-core/src/main/java/org/apache/flink/core/state/CompletedStateFuture.java +++ b/flink-core/src/main/java/org/apache/flink/core/state/CompletedStateFuture.java @@ -67,6 +67,11 @@ public StateFuture thenCombine( }); } + @Override + public void complete(T result) { + throw new UnsupportedOperationException("This state future has already been completed."); + } + @Override public void thenSyncAccept(Consumer action) { action.accept(result); diff --git a/flink-core/src/main/java/org/apache/flink/core/state/InternalStateFuture.java b/flink-core/src/main/java/org/apache/flink/core/state/InternalStateFuture.java index 7fd6d7485fff67..bbbd4a94f71604 100644 --- a/flink-core/src/main/java/org/apache/flink/core/state/InternalStateFuture.java +++ b/flink-core/src/main/java/org/apache/flink/core/state/InternalStateFuture.java @@ -29,6 +29,9 @@ @Internal public interface InternalStateFuture extends StateFuture { + /** Complete this future. */ + void complete(T result); + /** * Accept the action in the same thread with the one of complete (or current thread if it has * been completed). diff --git a/flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java b/flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java index dfae92c845fbea..63de46cc91e638 100644 --- a/flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java +++ b/flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java @@ -40,11 +40,11 @@ @Internal public class StateFutureImpl implements InternalStateFuture { - /** The future holds the result. The completes in async threads. */ - CompletableFuture completableFuture; + /** The future holds the result. This may complete in async threads. */ + private final CompletableFuture completableFuture; /** The callback runner. 
*/ - CallbackRunner callbackRunner; + protected final CallbackRunner callbackRunner; public StateFutureImpl(CallbackRunner callbackRunner) { this.completableFuture = new CompletableFuture<>(); @@ -53,17 +53,20 @@ public StateFutureImpl(CallbackRunner callbackRunner) { @Override public StateFuture thenApply(Function fn) { + callbackRegistered(); try { if (completableFuture.isDone()) { U r = fn.apply(completableFuture.get()); + callbackFinished(); return StateFutureUtils.completedFuture(r); } else { - StateFutureImpl ret = new StateFutureImpl<>(callbackRunner); + StateFutureImpl ret = makeNewStateFuture(); completableFuture.thenAccept( (t) -> { callbackRunner.submit( () -> { ret.complete(fn.apply(t)); + callbackFinished(); }); }); return ret; @@ -75,18 +78,21 @@ public StateFuture thenApply(Function fn) { @Override public StateFuture thenAccept(Consumer action) { + callbackRegistered(); try { if (completableFuture.isDone()) { action.accept(completableFuture.get()); + callbackFinished(); return StateFutureUtils.completedVoidFuture(); } else { - StateFutureImpl ret = new StateFutureImpl<>(callbackRunner); + StateFutureImpl ret = makeNewStateFuture(); completableFuture.thenAccept( (t) -> { callbackRunner.submit( () -> { action.accept(t); ret.complete(null); + callbackFinished(); }); }); return ret; @@ -98,17 +104,20 @@ public StateFuture thenAccept(Consumer action) { @Override public StateFuture thenCompose(Function> action) { + callbackRegistered(); try { if (completableFuture.isDone()) { + callbackFinished(); return action.apply(completableFuture.get()); } else { - StateFutureImpl ret = new StateFutureImpl<>(callbackRunner); + StateFutureImpl ret = makeNewStateFuture(); completableFuture.thenAccept( (t) -> { callbackRunner.submit( () -> { StateFuture su = action.apply(t); su.thenAccept(ret::complete); + callbackFinished(); }); }); return ret; @@ -121,12 +130,14 @@ public StateFuture thenCompose(Function StateFuture thenCombine( StateFuture other, BiFunction fn) { + callbackRegistered(); try { if (completableFuture.isDone()) { return other.thenCompose( (u) -> { try { V v = fn.apply(completableFuture.get(), u); + callbackFinished(); return StateFutureUtils.completedFuture(v); } catch (Throwable e) { throw new FlinkRuntimeException( @@ -134,7 +145,7 @@ public StateFuture thenCombine( } }); } else { - StateFutureImpl ret = new StateFutureImpl<>(callbackRunner); + StateFutureImpl ret = makeNewStateFuture(); ((InternalStateFuture) other) .thenSyncAccept( (u) -> { @@ -143,6 +154,7 @@ public StateFuture thenCombine( callbackRunner.submit( () -> { ret.complete(fn.apply(t, u)); + callbackFinished(); }); }); }); @@ -153,12 +165,34 @@ public StateFuture thenCombine( } } + /** + * Make a new future based on context of this future. + * + * @return the new created future. + */ public StateFutureImpl makeNewStateFuture() { return new StateFutureImpl<>(callbackRunner); } + @Override public void complete(T result) { completableFuture.complete(result); + postComplete(); + } + + /** Will be triggered when a callback is registered. */ + public void callbackRegistered() { + // does nothing by default. + } + + /** Will be triggered when this future completes. */ + public void postComplete() { + // does nothing by default. + } + + /** Will be triggered when a callback finishes processing. */ + public void callbackFinished() { + // does nothing by default. 
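As an aside, a minimal usage sketch of the revised StateFutureImpl (not part of this patch): callbacks chained before completion are deferred and later run through the CallbackRunner, while the lifecycle hooks above remain no-ops. It assumes CallbackRunner is a simple functional interface accepting a Runnable, as its use elsewhere in this patch (Runnable::run) suggests; in the runtime the runner is wired to the mailbox executor instead.

import org.apache.flink.core.state.StateFutureImpl;

public class StateFutureSketch {
    public static void main(String[] args) {
        // Inline CallbackRunner: callbacks run immediately in the completing thread.
        StateFutureImpl<Integer> future = new StateFutureImpl<>(Runnable::run);
        future.thenApply(v -> v + 1)
                .thenAccept(v -> System.out.println("result = " + v));
        // complete() may be invoked from an async thread; the deferred callbacks
        // are then submitted to the CallbackRunner, printing "result = 42".
        future.complete(41);
    }
}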
} @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java new file mode 100644 index 00000000000000..e9073e5e7b4cef --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionController.java @@ -0,0 +1,144 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.asyncprocessing; + +import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.api.common.state.v2.State; +import org.apache.flink.core.state.InternalStateFuture; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import javax.annotation.Nullable; + +/** + * The Async Execution Controller (AEC) receives processing requests from operators, and put them + * into execution according to some strategies. + * + *

It is responsible for: + *

  • Preserving the sequence of elements bearing the same key by delaying subsequent requests + * until the processing of preceding ones is finalized. + *
  • Tracking the in-flight data(records) and blocking the input if too much data in flight + * (back-pressure). It invokes {@link MailboxExecutor#yield()} to pause current operations, + * allowing for the execution of callbacks (mails in Mailbox). + * + * @param the type of the record + * @param the type of the key + */ +public class AsyncExecutionController { + + private static final Logger LOG = LoggerFactory.getLogger(AsyncExecutionController.class); + + public static final int DEFAULT_MAX_IN_FLIGHT_RECORD_NUM = 6000; + + /** The max allow number of in-flight records. */ + private final int maxInFlightRecordNum; + + /** The key accounting unit which is used to detect the key conflict. */ + final KeyAccountingUnit keyAccountingUnit; + + /** + * A factory to build {@link org.apache.flink.core.state.InternalStateFuture}, this will auto + * wire the created future with mailbox executor. Also conducting the context switch. + */ + private final StateFutureFactory stateFutureFactory; + + /** The state executor where the {@link StateRequest} is actually executed. */ + final StateExecutor stateExecutor; + + /** The corresponding context that currently runs in task thread. */ + RecordContext currentContext; + + public AsyncExecutionController(MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) { + this(mailboxExecutor, stateExecutor, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM); + } + + public AsyncExecutionController( + MailboxExecutor mailboxExecutor, StateExecutor stateExecutor, int maxInFlightRecords) { + this.keyAccountingUnit = new KeyAccountingUnit<>(); + this.stateFutureFactory = new StateFutureFactory<>(this, mailboxExecutor); + this.stateExecutor = stateExecutor; + this.maxInFlightRecordNum = maxInFlightRecords; + LOG.info("Create AsyncExecutionController: maxInFlightRecordsNum {}", maxInFlightRecords); + } + + /** + * Build a new context based on record and key. Also wired with internal {@link + * KeyAccountingUnit}. + * + * @param record the given record. + * @param key the given key. + * @return the built record context. + */ + public RecordContext buildContext(R record, K key) { + return new RecordContext<>(keyAccountingUnit, record, key); + } + + /** + * Each time before a code segment (callback) is about to run in mailbox (task thread), this + * method should be called to switch a context in AEC. + * + * @param switchingContext the context to switch. + */ + public void setCurrentContext(RecordContext switchingContext) { + currentContext = switchingContext; + } + + /** + * Submit a {@link StateRequest} to this AEC and trigger if needed. + * + * @param state the state to request. + * @param type the type of this request. + * @param payload the payload input for this request. + * @return the state future. + */ + public InternalStateFuture handleRequest( + @Nullable State state, StateRequestType type, @Nullable IN payload) { + // Step 1: build state future & assign context. + InternalStateFuture stateFuture = stateFutureFactory.build(currentContext); + StateRequest request = + new StateRequest<>(state, type, payload, stateFuture, currentContext); + // Step 2: try to occupy the key and place it into right buffer. + if (currentContext.tryOccupyKey()) { + insertActiveBuffer(request); + } else { + insertBlockingBuffer(request); + } + // Step 3: trigger the (active) buffer if needed. + triggerIfNeeded(false); + return stateFuture; + } + + void insertActiveBuffer(StateRequest request) { + // TODO: implement the active buffer. 
+ } + + void insertBlockingBuffer(StateRequest request) { + // TODO: implement the blocking buffer. + } + + /** + * Trigger a batch of requests. + * + * @param force whether to trigger requests in force. + */ + void triggerIfNeeded(boolean force) { + // TODO: implement the trigger logic. + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/ContextStateFutureImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/ContextStateFutureImpl.java new file mode 100644 index 00000000000000..f1f182b25fe6b8 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/ContextStateFutureImpl.java @@ -0,0 +1,75 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.asyncprocessing; + +import org.apache.flink.core.state.StateFutureImpl; + +/** + * A state future that holds the {@link RecordContext} and maintains the reference count of it. The + * reason why we maintain the reference here is that the ContextStateFutureImpl can be created + * multiple times since user may chain their code wildly, some of which are only for internal usage + * (See {@link StateFutureImpl}). So maintaining reference counting by the lifecycle of state future + * is relatively simple and less error-prone. + * + *

    The reference count on {@link RecordContext} is adjusted as follows: + *

  • 1. +1 when this future is created. + *
  • 2. -1 when the future is completed. + *
  • 3. +1 when a callback is registered. + *
  • 4. -1 when callback finished. + */ +public class ContextStateFutureImpl extends StateFutureImpl { + + private final RecordContext recordContext; + + ContextStateFutureImpl(CallbackRunner callbackRunner, RecordContext recordContext) { + super(callbackRunner); + this.recordContext = recordContext; + // When state request submitted, ref count +1, as described in FLIP-425: + // To cover the statements without a callback, in addition to the reference count marked + // in Fig.5, each state request itself is also protected by a paired reference count. + recordContext.retain(); + } + + @Override + public StateFutureImpl makeNewStateFuture() { + return new ContextStateFutureImpl<>(callbackRunner, recordContext); + } + + @Override + public void callbackRegistered() { + // When a callback registered, as shown in Fig.5 of FLIP-425, at the point of 3 and 5, the + // ref count +1. + recordContext.retain(); + } + + @Override + public void postComplete() { + // When a state request completes, ref count -1, as described in FLIP-425: + // To cover the statements without a callback, in addition to the reference count marked + // in Fig.5, each state request itself is also protected by a paired reference count. + recordContext.release(); + } + + @Override + public void callbackFinished() { + // When a callback ends, as shown in Fig.5 of FLIP-425, at the + // point of 2,4 and 6, the ref count -1. + recordContext.release(); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskprocessing/KeyAccountingUnit.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/KeyAccountingUnit.java similarity index 60% rename from flink-runtime/src/main/java/org/apache/flink/runtime/taskprocessing/KeyAccountingUnit.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/KeyAccountingUnit.java index 984c53aacb1227..9c08c920e7dc2d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskprocessing/KeyAccountingUnit.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/KeyAccountingUnit.java @@ -16,12 +16,9 @@ * limitations under the License. */ -package org.apache.flink.runtime.taskprocessing; +package org.apache.flink.runtime.asyncprocessing; -import org.apache.flink.annotation.Internal; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; +import org.apache.flink.annotation.VisibleForTesting; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; @@ -34,11 +31,8 @@ * @param the type of record * @param the type of key */ -@Internal public class KeyAccountingUnit { - private static final Logger LOG = LoggerFactory.getLogger(KeyAccountingUnit.class); - /** The in-flight records that are being processed, their keys are different from each other. */ private final Map noConflictInFlightRecords; @@ -47,36 +41,27 @@ public KeyAccountingUnit() { } /** - * Check if the record is available for processing. This method should be called in main task - * thread. For the same record, this method can be reentered. + * Occupy a key for processing, the subsequent records with the same key would be blocked until + * the previous key release. * - * @param record the record - * @param key the key inside the record - * @return true if the key is available + * @return true if no one is occupying this key, and this record succeeds to take it. 
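For concreteness, here is a walk-through of how the reference count on a RecordContext evolves for a single request with one chained callback, e.g. valueState.asyncValue().thenAccept(v -> ...). This is an illustration derived from the code above, not additional behavior introduced by the patch:

// handleRequest()                -> new ContextStateFutureImpl, retain()          : count = 1
// .thenAccept(cb) on that future -> callbackRegistered(), retain()                : count = 2
//                                -> makeNewStateFuture() (chained future), retain(): count = 3
// state executor completes it    -> postComplete(), release()                     : count = 2
// mailbox runs cb                -> chained future completes, postComplete(), release() : count = 1
//                                -> callbackFinished(), release()                 : count = 0
// count reaches zero             -> referenceCountReachedZero() releases the occupied key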
*/ - public boolean available(R record, K key) { - if (noConflictInFlightRecords.containsKey(key)) { - return noConflictInFlightRecords.get(key) == record; - } - return true; + public boolean occupy(R record, K key) { + return noConflictInFlightRecords.putIfAbsent(key, record) == null; } - /** - * Occupy a key for processing, the subsequent records with the same key would be blocked until - * the previous key release. - */ - public void occupy(R record, K key) { - if (!available(record, key)) { + /** Release a key, which is invoked when a {@link RecordContext} is released. */ + public void release(R record, K key) { + if (noConflictInFlightRecords.remove(key) != record) { throw new IllegalStateException( - String.format("The record %s(%s) is already occupied.", record, key)); + String.format( + "The record %s(%s) is trying to release key which it actually does not hold.", + record, key)); } - noConflictInFlightRecords.put(key, record); - LOG.trace("occupy key {} for record {}", key, record); } - /** Release a key, which is invoked when a {@link RecordContext} is released. */ - public void release(R record, K key) { - R existingRecord = noConflictInFlightRecords.remove(key); - LOG.trace("release key {} for record {}, existing record {}", key, record, existingRecord); + @VisibleForTesting + public int occupiedCount() { + return noConflictInFlightRecords.size(); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskprocessing/RecordContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/RecordContext.java similarity index 56% rename from flink-runtime/src/main/java/org/apache/flink/runtime/taskprocessing/RecordContext.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/RecordContext.java index 0375930db7b141..7d73494380cbae 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskprocessing/RecordContext.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/RecordContext.java @@ -16,9 +16,7 @@ * limitations under the License. */ -package org.apache.flink.runtime.taskprocessing; - -import org.apache.flink.annotation.Internal; +package org.apache.flink.runtime.asyncprocessing; import java.util.Objects; @@ -26,12 +24,12 @@ * A context that preserves the necessary variables required by each operation, all operations for * one record will share the same element context. * - * @param The type of the record that extends {@link - * org.apache.flink.streaming.runtime.streamrecord.StreamElement}. TODO(FLIP-409): move - * StreamElement to flink-core or flink-runtime module. + *

    Reference counting mechanism, please refer to {@link ContextStateFutureImpl}. + * + * @param The type of the record that extends {@code + * org.apache.flink.streaming.runtime.streamrecord.StreamElement}. * @param The type of the key inside the record. */ -@Internal public class RecordContext extends ReferenceCounted { /** The record to be processed. */ @@ -40,19 +38,45 @@ public class RecordContext extends ReferenceCounted { /** The key inside the record. */ private final K key; - public RecordContext(R record, K key) { + /** Whether this Record(Context) has occupied the corresponding key. */ + private volatile boolean keyOccupied; + + /** + * The KeyAccountingUnit that this record context will interact with. The reason why it keeps + * this reference here is that it would release the key within {@link + * #referenceCountReachedZero()}, which may be called once the ref count reaches zero in any + * thread. + */ + private final KeyAccountingUnit keyAccountingUnit; + + RecordContext(KeyAccountingUnit keyAccountingUnit, R record, K key) { super(0); + this.keyAccountingUnit = keyAccountingUnit; this.record = record; this.key = key; + this.keyOccupied = false; } public K getKey() { return this.key; } + /** + * Try to occupy the key via {@link KeyAccountingUnit}. When it has been already occupied once, + * the record could continue running. Package-private. + * + * @return true if occupying succeed. + */ + boolean tryOccupyKey() { + return keyOccupied || (keyOccupied = keyAccountingUnit.occupy(record, key)); + } + @Override protected void referenceCountReachedZero() { - // TODO: release internal resources that this record context holds. + if (keyOccupied) { + keyAccountingUnit.release(record, key); + keyOccupied = false; + } } @Override @@ -77,6 +101,15 @@ public boolean equals(Object o) { @Override public String toString() { - return "RecordContext{" + "record=" + record + ", key=" + key + '}'; + return "RecordContext{" + + "record=" + + record + + ", key=" + + key + + ", occupied=" + + keyOccupied + + ", ref=" + + getReferenceCount() + + "}"; } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskprocessing/ReferenceCounted.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/ReferenceCounted.java similarity index 85% rename from flink-runtime/src/main/java/org/apache/flink/runtime/taskprocessing/ReferenceCounted.java rename to flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/ReferenceCounted.java index 059b826c55dfec..aebd19ec4e181a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskprocessing/ReferenceCounted.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/ReferenceCounted.java @@ -16,18 +16,22 @@ * limitations under the License. */ -package org.apache.flink.runtime.taskprocessing; +package org.apache.flink.runtime.asyncprocessing; import org.apache.flink.annotation.Internal; import org.apache.flink.core.memory.MemoryUtils; import sun.misc.Unsafe; +import javax.annotation.concurrent.ThreadSafe; + /** * An object that can be reference counted, the internal resource would be released when the - * reference count reaches zero. + * reference count reaches zero. This class is designed to be high-performance, lock-free and + * thread-safe. */ @Internal +@ThreadSafe public abstract class ReferenceCounted { /** The "unsafe", which can be used to perform native memory accesses. 
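A small sketch of how ReferenceCounted is meant to be subclassed (a hypothetical PooledBuffer, not part of this patch; the int constructor argument is assumed from RecordContext's super(0) call):

import org.apache.flink.runtime.asyncprocessing.ReferenceCounted;

class PooledBuffer extends ReferenceCounted {
    private final byte[] data = new byte[4096];

    PooledBuffer() {
        super(1); // created with a single owner
    }

    @Override
    protected void referenceCountReachedZero() {
        // Return the buffer to its pool; called exactly once, by whichever
        // thread performs the last release().
    }
}

Each additional holder calls retain() before use and release() when done; the CAS-based counter makes this safe across threads without locks.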
*/ @@ -43,17 +47,18 @@ public abstract class ReferenceCounted { ReferenceCounted.class.getDeclaredField("referenceCount")); } catch (SecurityException e) { throw new Error( - "Could not get field 'referenceCount' offset in class 'ReferenceCounted' for unsafe operations, " - + "permission denied by security manager.", + "Could not get field 'referenceCount' offset in class 'ReferenceCounted'" + + " for unsafe operations, permission denied by security manager.", e); } catch (NoSuchFieldException e) { throw new Error( - "Could not get field 'referenceCount' offset in class 'ReferenceCounted' for unsafe operations", + "Could not get field 'referenceCount' offset in class 'ReferenceCounted'" + + " for unsafe operations", e); } catch (Throwable t) { throw new Error( - "Could not get field 'referenceCount' offset in class 'ReferenceCounted' for unsafe operations," - + " unclassified error", + "Could not get field 'referenceCount' offset in class 'ReferenceCounted'" + + " for unsafe operations, unclassified error", t); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateExecutor.java new file mode 100644 index 00000000000000..63b0257f8ec183 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateExecutor.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.asyncprocessing; + +import org.apache.flink.annotation.Internal; + +import java.util.concurrent.CompletableFuture; + +/** Executor for executing batch {@link StateRequest}s. */ +@Internal +public interface StateExecutor { + /** + * Execute a batch of state requests. + * + * @param processingRequests the given batch of processing requests + * @return A future can determine whether execution has completed. + */ + CompletableFuture executeBatchRequests( + Iterable> processingRequests); +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateFutureFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateFutureFactory.java new file mode 100644 index 00000000000000..98f1e3e41f1d4f --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateFutureFactory.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.asyncprocessing; + +import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.core.state.InternalStateFuture; + +/** + * An internal factory for {@link InternalStateFuture} that build future with necessary context + * switch and wired with mailbox executor. + */ +public class StateFutureFactory { + + private final AsyncExecutionController asyncExecutionController; + private final MailboxExecutor mailboxExecutor; + + StateFutureFactory( + AsyncExecutionController asyncExecutionController, + MailboxExecutor mailboxExecutor) { + this.asyncExecutionController = asyncExecutionController; + this.mailboxExecutor = mailboxExecutor; + } + + public InternalStateFuture build(RecordContext context) { + return new ContextStateFutureImpl<>( + (runnable) -> + mailboxExecutor.submit( + () -> { + asyncExecutionController.setCurrentContext(context); + runnable.run(); + }, + "State callback."), + context); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequest.java new file mode 100644 index 00000000000000..be4164df22da08 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/asyncprocessing/StateRequest.java @@ -0,0 +1,88 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.asyncprocessing; + +import org.apache.flink.api.common.state.v2.State; +import org.apache.flink.core.state.InternalStateFuture; + +import javax.annotation.Nullable; + +import java.io.Serializable; + +/** + * A request encapsulates the necessary data to perform a state request. + * + * @param Type of partitioned key. + * @param Type of input of this request. + * @param Type of value that request will return. + */ +public class StateRequest implements Serializable { + + /** + * The underlying state to be accessed, can be empty for {@link StateRequestType#SYNC_POINT}. + */ + @Nullable private final State state; + + /** The type of this request. */ + private final StateRequestType type; + + /** The payload(input) of this request. */ + @Nullable private final IN payload; + + /** The future to collect the result of the request. */ + private final InternalStateFuture stateFuture; + + /** The record context of this request. 
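To make the context switch described above concrete, a possible sequence of events for one state access with a chained callback (an illustration of the wiring shown in StateFutureFactory and AsyncExecutionController, not additional behavior):

// 1. Task thread: aec.setCurrentContext(ctx) runs before the user code for a record;
//    the user code then calls e.g. state.asyncValue().
// 2. handleRequest(...) asks StateFutureFactory.build(ctx) for a ContextStateFutureImpl
//    whose CallbackRunner enqueues callbacks as mails ("State callback.").
// 3. A state executor thread completes the future; the registered callback is
//    submitted to the mailbox rather than run in place.
// 4. When the mailbox processes that mail in the task thread, setCurrentContext(ctx)
//    runs first, so any state access made inside the callback is attributed to the
//    same record and key.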
*/ + private final RecordContext context; + + StateRequest( + @Nullable State state, + StateRequestType type, + @Nullable IN payload, + InternalStateFuture stateFuture, + RecordContext context) { + this.state = state; + this.type = type; + this.payload = payload; + this.stateFuture = stateFuture; + this.context = context; + } + + StateRequestType getRequestType() { + return type; + } + + @Nullable + IN getPayload() { + return payload; + } + + @Nullable + State getState() { + return state; + } + + InternalStateFuture getFuture() { + return stateFuture; + } + + RecordContext getRecordContext() { + return context; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskprocessing/AsyncExecutionController.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskprocessing/AsyncExecutionController.java deleted file mode 100644 index 133e6dcd27a986..00000000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskprocessing/AsyncExecutionController.java +++ /dev/null @@ -1,80 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.taskprocessing; - -import org.apache.flink.annotation.Internal; -import org.apache.flink.api.common.operators.MailboxExecutor; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Collections; - -/** - * The Async Execution Controller (AEC) receives processing requests from operators, and put them - * into execution according to some strategies. - * - *

    It is responsible for: - *

  • Preserving the sequence of elements bearing the same key by delaying subsequent requests - * until the processing of preceding ones is finalized. - *
  • Tracking the in-flight data(records) and blocking the input if too much data in flight - * (back-pressure). It invokes {@link MailboxExecutor#yield()} to pause current operations, - * allowing for the execution of callbacks (mails in Mailbox). - * - * @param the type of the record - * @param the type of the key - */ -@Internal -public class AsyncExecutionController { - - private static final Logger LOG = LoggerFactory.getLogger(AsyncExecutionController.class); - - public static final int DEFAULT_MAX_IN_FLIGHT_RECORD_NUM = 6000; - - /** The max allow number of in-flight records. */ - private final int maxInFlightRecordNum; - - /** The key accounting unit which is used to detect the key conflict. */ - private final KeyAccountingUnit keyAccountingUnit; - - /** The mailbox executor, borrowed from {@code StreamTask}. */ - private final MailboxExecutor mailboxExecutor; - - /** The state executor where the {@link ProcessingRequest} is actually executed. */ - private final StateExecutor stateExecutor; - - public AsyncExecutionController(MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) { - this(mailboxExecutor, stateExecutor, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM); - } - - public AsyncExecutionController( - MailboxExecutor mailboxExecutor, StateExecutor stateExecutor, int maxInFlightRecords) { - this.mailboxExecutor = mailboxExecutor; - this.stateExecutor = stateExecutor; - this.maxInFlightRecordNum = maxInFlightRecords; - this.keyAccountingUnit = new KeyAccountingUnit<>(); - LOG.info("Create AsyncExecutionController: maxInFlightRecordsNum {}", maxInFlightRecords); - } - - public void handleProcessingRequest( - ProcessingRequest request, RecordContext recordContext) { - // TODO(implement): preserve key order - stateExecutor.executeBatchRequests(Collections.singleton(request)); - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskprocessing/OrderPreserveMode.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskprocessing/OrderPreserveMode.java deleted file mode 100644 index 48ed767b561adc..00000000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskprocessing/OrderPreserveMode.java +++ /dev/null @@ -1,38 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.taskprocessing; - -import org.apache.flink.annotation.Internal; - -/** - * Enumeration for the execution order under asynchronous state APIs. For synchronous state APIs, - * the execution order is always {@link #ELEMENT_ORDER}. {@link #STATE_ORDER} generally has higher - * performance than {@link #ELEMENT_ORDER}. Note: {@link #STATE_ORDER} is an advance option, please - * make sure you are aware of possible out-of-order situations under asynchronous state APIs. 
- */ -@Internal -public enum OrderPreserveMode { - /** The records with same keys are strictly processed in order of arrival. */ - ELEMENT_ORDER, - /** - * For same-key records, state requests and subsequent callbacks are processed in the order in - * which each record makes its first state request. But the code before the first state request - * for each record can be processed out-of-order with requests from other records. - */ - STATE_ORDER -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskprocessing/ProcessingRequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskprocessing/ProcessingRequest.java deleted file mode 100644 index b74ab5e7d73f5a..00000000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskprocessing/ProcessingRequest.java +++ /dev/null @@ -1,69 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.taskprocessing; - -import org.apache.flink.api.common.state.v2.State; -import org.apache.flink.api.common.state.v2.StateFuture; - -import java.util.Optional; - -/** - * A processing request encapsulates the parameters, the {@link State} to access, and the type of - * access. - * - * @param Type of value that request will return. - */ -public interface ProcessingRequest { - /** The underlying state to be accessed, can be empty. */ - Optional getUnderlyingState(); - - /** The parameter of the request. */ - Parameter getParameter(); - - /** The future to collect the result of the request. */ - StateFuture getFuture(); - - RequestType getRequestType(); - - /** The type of processing request. */ - enum RequestType { - /** Process one record without state access. */ - SYNC, - /** Get from one {@link State}. */ - /** Delete from one {@link State}. */ - DELETE, - GET, - /** Put to one {@link State}. */ - PUT, - /** Merge value to an exist key in {@link State}. Mainly used for listState. */ - MERGE - } - - /** The parameter of the request. */ - interface Parameter { - /** - * The key of one request. Except for requests of {@link RequestType#SYNC}, all other - * requests should provide a key. - */ - Optional getKey(); - - /** The value of one request. */ - Optional getValue(); - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskprocessing/StateExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskprocessing/StateExecutor.java deleted file mode 100644 index 21ee33aaf250e5..00000000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskprocessing/StateExecutor.java +++ /dev/null @@ -1,19 +0,0 @@ -package org.apache.flink.runtime.taskprocessing; - -import java.util.concurrent.CompletableFuture; - -/** - * Executor for executing batch {@link ProcessingRequest}s. 
- * - * @param the type of key. - */ -public interface StateExecutor { - /** - * Execute a batch of state requests. - * - * @param processingRequests the given batch of processing requests - * @return A future can determine whether execution has completed. - */ - CompletableFuture executeBatchRequests( - Iterable> processingRequests); -} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java new file mode 100644 index 00000000000000..8d3eae19cc4792 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/AsyncExecutionControllerTest.java @@ -0,0 +1,300 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.asyncprocessing; + +import org.apache.flink.api.common.operators.MailboxExecutor; +import org.apache.flink.api.common.state.v2.StateFuture; +import org.apache.flink.api.common.state.v2.ValueState; +import org.apache.flink.core.state.StateFutureUtils; +import org.apache.flink.runtime.mailbox.SyncMailboxExecutor; +import org.apache.flink.util.Preconditions; + +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; + +/** Test for {@link AsyncExecutionController}. */ +class AsyncExecutionControllerTest { + + // TODO: this test is not well completed, cause buffering in AEC is not implemented. + // Yet, just for illustrating the interaction between AEC and Async state API. + @Test + void testBasicRun() { + TestAsyncExecutionController aec = + new TestAsyncExecutionController<>( + new SyncMailboxExecutor(), new TestStateExecutor()); + TestUnderlyingState underlyingState = new TestUnderlyingState(); + TestValueState valueState = new TestValueState(aec, underlyingState); + AtomicInteger output = new AtomicInteger(); + Runnable userCode = + () -> { + valueState + .asyncValue() + .thenCompose( + val -> { + int updated = (val == null ? 1 : (val + 1)); + return valueState + .asyncUpdate(updated) + .thenCompose( + o -> + StateFutureUtils.completedFuture( + updated)); + }) + .thenAccept(val -> output.set(val)); + }; + + // ============================ element1 ============================ + String record1 = "key1-r1"; + String key1 = "key1"; + // Simulate the wrapping in {@link RecordProcessorUtils#getRecordProcessor()}, wrapping the + // record and key with RecordContext. 
+ RecordContext recordContext1 = aec.buildContext(record1, key1); + aec.setCurrentContext(recordContext1); + // execute user code + userCode.run(); + + // Single-step run. + // Firstly, the user code generates value get in active buffer. + assertThat(aec.activeBuffer.size()).isEqualTo(1); + assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(1); + aec.triggerIfNeeded(true); + // After running, the value update is in active buffer. + assertThat(aec.activeBuffer.size()).isEqualTo(1); + assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(1); + aec.triggerIfNeeded(true); + // Value update finishes. + assertThat(aec.activeBuffer.size()).isEqualTo(0); + assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(0); + assertThat(output.get()).isEqualTo(1); + assertThat(recordContext1.getReferenceCount()).isEqualTo(0); + + // ============================ element 2 & 3 ============================ + String record2 = "key1-r2"; + String key2 = "key1"; + RecordContext recordContext2 = aec.buildContext(record2, key2); + aec.setCurrentContext(recordContext2); + // execute user code + userCode.run(); + + String record3 = "key1-r3"; + String key3 = "key1"; + RecordContext recordContext3 = aec.buildContext(record3, key3); + aec.setCurrentContext(recordContext3); + // execute user code + userCode.run(); + + // Single-step run. + // Firstly, the user code for record2 generates value get in active buffer, + // while user code for record3 generates value get in blocking buffer. + assertThat(aec.activeBuffer.size()).isEqualTo(1); + assertThat(aec.blockingBuffer.size()).isEqualTo(1); + assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(1); + aec.triggerIfNeeded(true); + // After running, the value update for record2 is in active buffer. + assertThat(aec.activeBuffer.size()).isEqualTo(1); + assertThat(aec.blockingBuffer.size()).isEqualTo(1); + assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(1); + aec.triggerIfNeeded(true); + // Value update for record2 finishes. The value get for record3 is still in blocking status. + assertThat(aec.activeBuffer.size()).isEqualTo(0); + assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(0); + assertThat(output.get()).isEqualTo(2); + assertThat(recordContext2.getReferenceCount()).isEqualTo(0); + assertThat(aec.blockingBuffer.size()).isEqualTo(1); + + aec.migrateBlockingToActive(); + // Value get for record3 is ready for run. + + assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(1); + assertThat(aec.activeBuffer.size()).isEqualTo(1); + assertThat(aec.blockingBuffer.size()).isEqualTo(0); + assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(1); + aec.triggerIfNeeded(true); + // After running, the value update for record3 is in active buffer. + assertThat(aec.activeBuffer.size()).isEqualTo(1); + assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(1); + aec.triggerIfNeeded(true); + // Value update for record3 finishes. + assertThat(aec.activeBuffer.size()).isEqualTo(0); + assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(0); + assertThat(output.get()).isEqualTo(3); + assertThat(recordContext3.getReferenceCount()).isEqualTo(0); + + // ============================ element4 ============================ + String record4 = "key3-r3"; + String key4 = "key3"; + RecordContext recordContext4 = aec.buildContext(record4, key4); + aec.setCurrentContext(recordContext4); + // execute user code + userCode.run(); + + // Single-step run for another key. + // Firstly, the user code generates value get in active buffer. 
+ assertThat(aec.activeBuffer.size()).isEqualTo(1); + assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(1); + aec.triggerIfNeeded(true); + // After running, the value update is in active buffer. + assertThat(aec.activeBuffer.size()).isEqualTo(1); + assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(1); + aec.triggerIfNeeded(true); + // Value update finishes. + assertThat(aec.activeBuffer.size()).isEqualTo(0); + assertThat(aec.keyAccountingUnit.occupiedCount()).isEqualTo(0); + assertThat(output.get()).isEqualTo(1); + assertThat(recordContext4.getReferenceCount()).isEqualTo(0); + } + + /** + * An AsyncExecutionController for testing purpose, which integrates with basic buffer + * mechanism. + */ + class TestAsyncExecutionController extends AsyncExecutionController { + + LinkedList> activeBuffer; + + LinkedList> blockingBuffer; + + public TestAsyncExecutionController( + MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) { + super(mailboxExecutor, stateExecutor); + activeBuffer = new LinkedList<>(); + blockingBuffer = new LinkedList<>(); + } + + @Override + void insertActiveBuffer(StateRequest request) { + activeBuffer.push(request); + } + + void insertBlockingBuffer(StateRequest request) { + blockingBuffer.push(request); + } + + void triggerIfNeeded(boolean force) { + if (!force) { + // Disable normal trigger, to perform single-step debugging and check. + return; + } + LinkedList> toRun = new LinkedList<>(activeBuffer); + activeBuffer.clear(); + stateExecutor.executeBatchRequests(toRun); + } + + void migrateBlockingToActive() { + Iterator> blockingIter = blockingBuffer.iterator(); + while (blockingIter.hasNext()) { + StateRequest request = blockingIter.next(); + if (request.getRecordContext().tryOccupyKey()) { + insertActiveBuffer(request); + blockingIter.remove(); + } + } + } + } + + /** Simulate the underlying state that is actually used to execute the request. */ + class TestUnderlyingState { + + private final HashMap hashMap; + + public TestUnderlyingState() { + this.hashMap = new HashMap<>(); + } + + public Integer get(String key) { + return hashMap.get(key); + } + + public void update(String key, Integer val) { + hashMap.put(key, val); + } + } + + class TestValueState implements ValueState { + + private final AsyncExecutionController asyncExecutionController; + + private final TestUnderlyingState underlyingState; + + public TestValueState( + AsyncExecutionController aec, TestUnderlyingState underlyingState) { + this.asyncExecutionController = aec; + this.underlyingState = underlyingState; + } + + @Override + public StateFuture asyncClear() { + return asyncExecutionController.handleRequest(this, StateRequestType.CLEAR, null); + } + + @Override + public StateFuture asyncValue() { + return asyncExecutionController.handleRequest(this, StateRequestType.VALUE_GET, null); + } + + @Override + public StateFuture asyncUpdate(Integer value) { + return asyncExecutionController.handleRequest( + this, StateRequestType.VALUE_UPDATE, value); + } + } + + /** + * A brief implementation of {@link StateExecutor}, to illustrate the interaction between AEC + * and StateExecutor. 
+ */ + class TestStateExecutor implements StateExecutor { + + public TestStateExecutor() {} + + @Override + @SuppressWarnings({"unchecked", "rawtypes"}) + public CompletableFuture executeBatchRequests( + Iterable> processingRequests) { + CompletableFuture future = new CompletableFuture<>(); + for (StateRequest request : processingRequests) { + if (request.getRequestType() == StateRequestType.VALUE_GET) { + Preconditions.checkState(request.getState() != null); + TestValueState state = (TestValueState) request.getState(); + Integer val = + state.underlyingState.get((String) request.getRecordContext().getKey()); + request.getFuture().complete(val); + } else if (request.getRequestType() == StateRequestType.VALUE_UPDATE) { + Preconditions.checkState(request.getState() != null); + TestValueState state = (TestValueState) request.getState(); + + state.underlyingState.update( + (String) request.getRecordContext().getKey(), + (Integer) request.getPayload()); + request.getFuture().complete(null); + } else { + throw new UnsupportedOperationException("Unsupported request type"); + } + } + future.complete(true); + return future; + } + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskprocessing/keyAccountingUnitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/KeyAccountingUnitTest.java similarity index 63% rename from flink-runtime/src/test/java/org/apache/flink/runtime/taskprocessing/keyAccountingUnitTest.java rename to flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/KeyAccountingUnitTest.java index 87693631dde684..59d2eb2ed13139 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskprocessing/keyAccountingUnitTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/KeyAccountingUnitTest.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.runtime.taskprocessing; +package org.apache.flink.runtime.asyncprocessing; import org.junit.jupiter.api.Test; @@ -24,20 +24,17 @@ import static org.assertj.core.api.AssertionsForClassTypes.assertThat; /** Test for {@link KeyAccountingUnit}. 
*/ -class keyAccountingUnitTest { +class KeyAccountingUnitTest { @Test void testBasic() { KeyAccountingUnit keyAccountingUnit = new KeyAccountingUnit<>(); - assertThat(keyAccountingUnit.available("record1", 1)).isTrue(); - keyAccountingUnit.occupy("record1", 1); - assertThat(keyAccountingUnit.available("record1", 1)).isTrue(); - assertThat(keyAccountingUnit.available("record2", 2)).isTrue(); - assertThat(keyAccountingUnit.available("record3", 1)).isFalse(); - assertThatThrownBy(() -> keyAccountingUnit.occupy("record3", 1)) - .isInstanceOf(IllegalStateException.class) - .hasMessage("The record record3(1) is already occupied."); + assertThat(keyAccountingUnit.occupy("record1", 1)).isTrue(); + assertThat(keyAccountingUnit.occupy("record1", 1)).isFalse(); + assertThat(keyAccountingUnit.occupy("record2", 2)).isTrue(); keyAccountingUnit.release("record1", 1); - assertThat(keyAccountingUnit.available("record2", 1)).isTrue(); + assertThat(keyAccountingUnit.occupy("record2", 1)).isTrue(); + assertThatThrownBy(() -> keyAccountingUnit.release("record1", 1)) + .isInstanceOf(IllegalStateException.class); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskprocessing/ReferenceCountedTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/ReferenceCountedTest.java similarity index 86% rename from flink-runtime/src/test/java/org/apache/flink/runtime/taskprocessing/ReferenceCountedTest.java rename to flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/ReferenceCountedTest.java index 03075759468fa7..1de590cad94f50 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskprocessing/ReferenceCountedTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/asyncprocessing/ReferenceCountedTest.java @@ -15,7 +15,8 @@ * See the License for the specific language governing permissions and * limitations under the License. */ -package org.apache.flink.runtime.taskprocessing; + +package org.apache.flink.runtime.asyncprocessing; import org.junit.jupiter.api.Test; @@ -24,7 +25,7 @@ import static org.assertj.core.api.AssertionsForClassTypes.assertThat; -/** Test for {@link org.apache.flink.runtime.taskprocessing.ReferenceCounted} */ +/** Tests for {@link ReferenceCounted}. 
*/ class ReferenceCountedTest { @Test void testRefCountReachedZero() { @@ -41,12 +42,12 @@ void testConcurrency() throws InterruptedException { TestReferenceCounted referenceCounted = new TestReferenceCounted(); List threads = new ArrayList<>(); for (int i = 0; i < 5; i++) { - Thread thread = new Thread(() -> referenceCounted.retain()); + Thread thread = new Thread(referenceCounted::retain); thread.start(); threads.add(thread); } for (int i = 0; i < 5; i++) { - Thread thread = new Thread(() -> referenceCounted.release()); + Thread thread = new Thread(referenceCounted::release); thread.start(); threads.add(thread); } @@ -56,7 +57,7 @@ void testConcurrency() throws InterruptedException { assertThat(referenceCounted.getReferenceCount()).isEqualTo(0); } - private class TestReferenceCounted extends ReferenceCounted { + private static class TestReferenceCounted extends ReferenceCounted { private boolean reachedZero = false; public TestReferenceCounted() { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskprocessing/AsyncExecutionControllerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskprocessing/AsyncExecutionControllerTest.java deleted file mode 100644 index f64120e7276deb..00000000000000 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskprocessing/AsyncExecutionControllerTest.java +++ /dev/null @@ -1,280 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.taskprocessing; - -import org.apache.flink.api.common.state.v2.StateFuture; -import org.apache.flink.api.common.state.v2.ValueState; -import org.apache.flink.core.state.InternalStateFuture; -import org.apache.flink.core.state.StateFutureImpl; -import org.apache.flink.runtime.mailbox.SyncMailboxExecutor; -import org.apache.flink.runtime.taskprocessing.ProcessingRequest.RequestType; -import org.apache.flink.util.Preconditions; - -import org.junit.jupiter.api.Test; - -import java.util.HashMap; -import java.util.Optional; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.function.Consumer; - -import static org.assertj.core.api.AssertionsForClassTypes.assertThat; - -/** Test for {@link AsyncExecutionController}. */ -class AsyncExecutionControllerTest { - - // todo(20240330): this test is not completed, cause the order preservation is not implemented - // yet, just for illustrating the interaction between AEC and Async state API. 
- @Test - void testStateOrder() { - AsyncExecutionController aec = - new AsyncExecutionController<>( - new SyncMailboxExecutor(), new TestStateExecutor(), 3); - TestUnderlyingState underlyingState = new TestUnderlyingState(); - TestValueState valueState = new TestValueState(aec, underlyingState); - AtomicInteger output = new AtomicInteger(); - Consumer userCode = - empty -> - valueState - .asyncValue() - .thenAccept( - val -> { - if (val == null) { - valueState - .asyncUpdate(1) - .thenAccept(o -> output.set(1)); - } else { - valueState - .asyncUpdate(val + 1) - .thenAccept(o -> output.set(val + 1)); - } - }); - - // ============================ element1 ============================ - String record1 = "key1-r1"; - String key1 = "key1"; - // Simulate the wrapping in {@link RecordProcessorUtils#getRecordProcessor()}, wrapping the - // record and key with RecordContext. - RecordContext recordContext1 = new RecordContext<>(record1, key1); - valueState.setCurrentRecordCtx(recordContext1); - // execute user code - userCode.accept(null); - recordContext1.release(); - assertThat(output.get()).isEqualTo(1); - - // ============================ element2 ============================ - String record2 = "key1-r2"; - String key2 = "key1"; - RecordContext recordContext2 = new RecordContext<>(record2, key2); - valueState.setCurrentRecordCtx(recordContext2); - // execute user code - userCode.accept(null); - recordContext2.release(); - assertThat(output.get()).isEqualTo(2); - - // ============================ element3 ============================ - String record3 = "key3-r3"; - String key3 = "key3"; - RecordContext recordContext3 = new RecordContext<>(record3, key3); - valueState.setCurrentRecordCtx(recordContext3); - // execute user code - userCode.accept(null); - recordContext3.release(); - assertThat(output.get()).isEqualTo(1); - } - - class TestRequestParameter implements ProcessingRequest.Parameter { - private String key; - private Integer value; - - public TestRequestParameter(String key) { - this(key, null); - } - - public TestRequestParameter(String key, Integer value) { - this.key = key; - this.value = value; - } - - @Override - public Optional getKey() { - return key == null ? Optional.empty() : Optional.of(key); - } - - @Override - public Optional getValue() { - return value == null ? Optional.empty() : Optional.of(value); - } - } - - /** Simulate the underlying state that is actually used to execute the request. 
*/ - class TestUnderlyingState { - - private HashMap hashMap; - - public TestUnderlyingState() { - this.hashMap = new HashMap<>(); - } - - public Integer get(String key) { - return hashMap.get(key); - } - - public void update(String key, Integer val) { - hashMap.put(key, val); - } - } - - class TestProcessingRequest implements ProcessingRequest { - - private TestUnderlyingState underlyingState; - - private TestRequestParameter requestParameter; - - private RequestType requestType; - - private InternalStateFuture stateFuture; - - public TestProcessingRequest( - TestUnderlyingState underlyingState, - TestRequestParameter parameter, - RequestType requestType, - InternalStateFuture stateFuture) { - this.underlyingState = underlyingState; - this.requestParameter = parameter; - this.requestType = requestType; - this.stateFuture = stateFuture; - } - - @Override - public Optional getUnderlyingState() { - if (requestType == RequestType.SYNC) { - return Optional.empty(); - } - return Optional.of(underlyingState); - } - - @Override - public Parameter getParameter() { - return requestParameter; - } - - @Override - public StateFuture getFuture() { - return stateFuture; - } - - @Override - public RequestType getRequestType() { - return requestType; - } - } - - class TestValueState implements ValueState { - - private AsyncExecutionController asyncExecutionController; - - private TestUnderlyingState underlyingState; - - private StateFutureImpl.CallbackRunner runner = Runnable::run; - - private RecordContext currentRecordCtx; - - public TestValueState(AsyncExecutionController aec, TestUnderlyingState underlyingState) { - this.asyncExecutionController = aec; - this.underlyingState = underlyingState; - } - - @Override - public StateFuture asyncClear() { - StateFutureImpl stateFuture = new StateFutureImpl<>(runner); - TestRequestParameter parameter = new TestRequestParameter(currentRecordCtx.getKey()); - ProcessingRequest request = - new TestProcessingRequest( - underlyingState, parameter, RequestType.DELETE, stateFuture); - asyncExecutionController.handleProcessingRequest(request, currentRecordCtx); - return stateFuture; - } - - @Override - public StateFuture asyncValue() { - StateFutureImpl stateFuture = new StateFutureImpl<>(runner); - TestRequestParameter parameter = new TestRequestParameter(currentRecordCtx.getKey()); - ProcessingRequest request = - new TestProcessingRequest<>( - underlyingState, parameter, RequestType.GET, stateFuture); - asyncExecutionController.handleProcessingRequest(request, currentRecordCtx); - return stateFuture; - } - - @Override - public StateFuture asyncUpdate(Integer value) { - StateFutureImpl stateFuture = new StateFutureImpl<>(runner); - TestRequestParameter parameter = - new TestRequestParameter(currentRecordCtx.getKey(), value); - ProcessingRequest request = - new TestProcessingRequest( - underlyingState, parameter, RequestType.PUT, stateFuture); - asyncExecutionController.handleProcessingRequest(request, currentRecordCtx); - return stateFuture; - } - - public void setCurrentRecordCtx(RecordContext recordCtx) { - this.currentRecordCtx = recordCtx; - } - } - - /** - * A brief implementation of {@link StateExecutor}, to illustrate the interaction between AEC - * and StateExecutor. 
- */ - class TestStateExecutor implements StateExecutor { - - public TestStateExecutor() {} - - @Override - public CompletableFuture executeBatchRequests( - Iterable> processingRequests) { - CompletableFuture future = new CompletableFuture<>(); - for (ProcessingRequest request : processingRequests) { - if (request.getRequestType() == RequestType.GET) { - Preconditions.checkState(request.getUnderlyingState().isPresent()); - TestUnderlyingState underlyingState = - (TestUnderlyingState) request.getUnderlyingState().get(); - Integer val = - underlyingState.get( - ((TestRequestParameter) request.getParameter()).getKey().get()); - ((StateFutureImpl) request.getFuture()).complete(val); - } else if (request.getRequestType() == RequestType.PUT) { - Preconditions.checkState(request.getUnderlyingState().isPresent()); - TestUnderlyingState underlyingState = - (TestUnderlyingState) request.getUnderlyingState().get(); - TestRequestParameter parameter = (TestRequestParameter) request.getParameter(); - underlyingState.update( - parameter.getKey().get(), (Integer) parameter.getValue().get()); - ((StateFutureImpl) request.getFuture()).complete(null); - } else { - throw new UnsupportedOperationException("Unsupported request type"); - } - } - future.complete(true); - return future; - } - } -}
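Finally, a sketch of an asynchronous StateExecutor, to illustrate that request futures may be completed from a non-task thread while callbacks still run in the task thread via the mailbox-wired CallbackRunner. This is not part of the patch: the class name is hypothetical, the Boolean and wildcard generics are assumed (they are elided in the diff above but match the TestStateExecutor), and it must live in org.apache.flink.runtime.asyncprocessing because the StateRequest accessors are package-private.

package org.apache.flink.runtime.asyncprocessing;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class WorkerThreadStateExecutor implements StateExecutor {

    private final ExecutorService worker = Executors.newSingleThreadExecutor();

    @Override
    @SuppressWarnings({"unchecked", "rawtypes"})
    public CompletableFuture<Boolean> executeBatchRequests(
            Iterable<StateRequest<?, ?, ?>> processingRequests) {
        return CompletableFuture.supplyAsync(
                () -> {
                    for (StateRequest request : processingRequests) {
                        // A real executor would dispatch on getRequestType() and consult
                        // the underlying state; this sketch only completes every request
                        // with a null result to show where completion happens.
                        request.getFuture().complete(null);
                    }
                    return true;
                },
                worker);
    }
}

Because completion happens on the worker thread, any callback chained on the returned future is submitted back to the mailbox and executed in the task thread, which is exactly the hand-off the AEC relies on.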