diff --git a/flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java b/flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java index 3cdc873e789c17..dfae92c845fbea 100644 --- a/flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java +++ b/flink-core/src/main/java/org/apache/flink/core/state/StateFutureImpl.java @@ -46,7 +46,7 @@ public class StateFutureImpl implements InternalStateFuture { /** The callback runner. */ CallbackRunner callbackRunner; - StateFutureImpl(CallbackRunner callbackRunner) { + public StateFutureImpl(CallbackRunner callbackRunner) { this.completableFuture = new CompletableFuture<>(); this.callbackRunner = callbackRunner; } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskprocessing/AsyncExecutionController.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskprocessing/AsyncExecutionController.java new file mode 100644 index 00000000000000..133e6dcd27a986 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskprocessing/AsyncExecutionController.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskprocessing; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.api.common.operators.MailboxExecutor; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collections; + +/** + * The Async Execution Controller (AEC) receives processing requests from operators, and put them + * into execution according to some strategies. + * + *

It is responsible for: + *

  • Preserving the sequence of elements bearing the same key by delaying subsequent requests + * until the processing of preceding ones is finalized. + *
  • Tracking the in-flight data(records) and blocking the input if too much data in flight + * (back-pressure). It invokes {@link MailboxExecutor#yield()} to pause current operations, + * allowing for the execution of callbacks (mails in Mailbox). + * + * @param the type of the record + * @param the type of the key + */ +@Internal +public class AsyncExecutionController { + + private static final Logger LOG = LoggerFactory.getLogger(AsyncExecutionController.class); + + public static final int DEFAULT_MAX_IN_FLIGHT_RECORD_NUM = 6000; + + /** The max allow number of in-flight records. */ + private final int maxInFlightRecordNum; + + /** The key accounting unit which is used to detect the key conflict. */ + private final KeyAccountingUnit keyAccountingUnit; + + /** The mailbox executor, borrowed from {@code StreamTask}. */ + private final MailboxExecutor mailboxExecutor; + + /** The state executor where the {@link ProcessingRequest} is actually executed. */ + private final StateExecutor stateExecutor; + + public AsyncExecutionController(MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) { + this(mailboxExecutor, stateExecutor, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM); + } + + public AsyncExecutionController( + MailboxExecutor mailboxExecutor, StateExecutor stateExecutor, int maxInFlightRecords) { + this.mailboxExecutor = mailboxExecutor; + this.stateExecutor = stateExecutor; + this.maxInFlightRecordNum = maxInFlightRecords; + this.keyAccountingUnit = new KeyAccountingUnit<>(); + LOG.info("Create AsyncExecutionController: maxInFlightRecordsNum {}", maxInFlightRecords); + } + + public void handleProcessingRequest( + ProcessingRequest request, RecordContext recordContext) { + // TODO(implement): preserve key order + stateExecutor.executeBatchRequests(Collections.singleton(request)); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskprocessing/KeyAccountingUnit.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskprocessing/KeyAccountingUnit.java new file mode 100644 index 00000000000000..984c53aacb1227 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskprocessing/KeyAccountingUnit.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskprocessing; + +import org.apache.flink.annotation.Internal; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * Key accounting unit holds the current in-flight key and tracks the corresponding ongoing records, + * which is used to preserve the ordering of independent chained {@link + * org.apache.flink.api.common.state.v2.StateFuture}. + * + * @param the type of record + * @param the type of key + */ +@Internal +public class KeyAccountingUnit { + + private static final Logger LOG = LoggerFactory.getLogger(KeyAccountingUnit.class); + + /** The in-flight records that are being processed, their keys are different from each other. */ + private final Map noConflictInFlightRecords; + + public KeyAccountingUnit() { + this.noConflictInFlightRecords = new ConcurrentHashMap<>(); + } + + /** + * Check if the record is available for processing. This method should be called in main task + * thread. For the same record, this method can be reentered. + * + * @param record the record + * @param key the key inside the record + * @return true if the key is available + */ + public boolean available(R record, K key) { + if (noConflictInFlightRecords.containsKey(key)) { + return noConflictInFlightRecords.get(key) == record; + } + return true; + } + + /** + * Occupy a key for processing, the subsequent records with the same key would be blocked until + * the previous key release. + */ + public void occupy(R record, K key) { + if (!available(record, key)) { + throw new IllegalStateException( + String.format("The record %s(%s) is already occupied.", record, key)); + } + noConflictInFlightRecords.put(key, record); + LOG.trace("occupy key {} for record {}", key, record); + } + + /** Release a key, which is invoked when a {@link RecordContext} is released. */ + public void release(R record, K key) { + R existingRecord = noConflictInFlightRecords.remove(key); + LOG.trace("release key {} for record {}, existing record {}", key, record, existingRecord); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskprocessing/OrderPreserveMode.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskprocessing/OrderPreserveMode.java new file mode 100644 index 00000000000000..48ed767b561adc --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskprocessing/OrderPreserveMode.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskprocessing; + +import org.apache.flink.annotation.Internal; + +/** + * Enumeration for the execution order under asynchronous state APIs. For synchronous state APIs, + * the execution order is always {@link #ELEMENT_ORDER}. {@link #STATE_ORDER} generally has higher + * performance than {@link #ELEMENT_ORDER}. Note: {@link #STATE_ORDER} is an advance option, please + * make sure you are aware of possible out-of-order situations under asynchronous state APIs. + */ +@Internal +public enum OrderPreserveMode { + /** The records with same keys are strictly processed in order of arrival. */ + ELEMENT_ORDER, + /** + * For same-key records, state requests and subsequent callbacks are processed in the order in + * which each record makes its first state request. But the code before the first state request + * for each record can be processed out-of-order with requests from other records. + */ + STATE_ORDER +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskprocessing/ProcessingRequest.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskprocessing/ProcessingRequest.java new file mode 100644 index 00000000000000..b74ab5e7d73f5a --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskprocessing/ProcessingRequest.java @@ -0,0 +1,69 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskprocessing; + +import org.apache.flink.api.common.state.v2.State; +import org.apache.flink.api.common.state.v2.StateFuture; + +import java.util.Optional; + +/** + * A processing request encapsulates the parameters, the {@link State} to access, and the type of + * access. + * + * @param Type of value that request will return. + */ +public interface ProcessingRequest { + /** The underlying state to be accessed, can be empty. */ + Optional getUnderlyingState(); + + /** The parameter of the request. */ + Parameter getParameter(); + + /** The future to collect the result of the request. */ + StateFuture getFuture(); + + RequestType getRequestType(); + + /** The type of processing request. */ + enum RequestType { + /** Process one record without state access. */ + SYNC, + /** Get from one {@link State}. */ + /** Delete from one {@link State}. */ + DELETE, + GET, + /** Put to one {@link State}. */ + PUT, + /** Merge value to an exist key in {@link State}. Mainly used for listState. */ + MERGE + } + + /** The parameter of the request. */ + interface Parameter { + /** + * The key of one request. Except for requests of {@link RequestType#SYNC}, all other + * requests should provide a key. + */ + Optional getKey(); + + /** The value of one request. */ + Optional getValue(); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskprocessing/RecordContext.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskprocessing/RecordContext.java new file mode 100644 index 00000000000000..0375930db7b141 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskprocessing/RecordContext.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskprocessing; + +import org.apache.flink.annotation.Internal; + +import java.util.Objects; + +/** + * A context that preserves the necessary variables required by each operation, all operations for + * one record will share the same element context. + * + * @param The type of the record that extends {@link + * org.apache.flink.streaming.runtime.streamrecord.StreamElement}. TODO(FLIP-409): move + * StreamElement to flink-core or flink-runtime module. + * @param The type of the key inside the record. + */ +@Internal +public class RecordContext extends ReferenceCounted { + + /** The record to be processed. */ + private final R record; + + /** The key inside the record. */ + private final K key; + + public RecordContext(R record, K key) { + super(0); + this.record = record; + this.key = key; + } + + public K getKey() { + return this.key; + } + + @Override + protected void referenceCountReachedZero() { + // TODO: release internal resources that this record context holds. + } + + @Override + public int hashCode() { + return Objects.hash(record, key); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + RecordContext that = (RecordContext) o; + if (!Objects.equals(record, that.record)) { + return false; + } + return Objects.equals(key, that.key); + } + + @Override + public String toString() { + return "RecordContext{" + "record=" + record + ", key=" + key + '}'; + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskprocessing/ReferenceCounted.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskprocessing/ReferenceCounted.java new file mode 100644 index 00000000000000..059b826c55dfec --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskprocessing/ReferenceCounted.java @@ -0,0 +1,98 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskprocessing; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.core.memory.MemoryUtils; + +import sun.misc.Unsafe; + +/** + * An object that can be reference counted, the internal resource would be released when the + * reference count reaches zero. + */ +@Internal +public abstract class ReferenceCounted { + + /** The "unsafe", which can be used to perform native memory accesses. */ + @SuppressWarnings({"restriction", "UseOfSunClasses"}) + private static final Unsafe unsafe = MemoryUtils.UNSAFE; + + private static final long referenceOffset; + + static { + try { + referenceOffset = + unsafe.objectFieldOffset( + ReferenceCounted.class.getDeclaredField("referenceCount")); + } catch (SecurityException e) { + throw new Error( + "Could not get field 'referenceCount' offset in class 'ReferenceCounted' for unsafe operations, " + + "permission denied by security manager.", + e); + } catch (NoSuchFieldException e) { + throw new Error( + "Could not get field 'referenceCount' offset in class 'ReferenceCounted' for unsafe operations", + e); + } catch (Throwable t) { + throw new Error( + "Could not get field 'referenceCount' offset in class 'ReferenceCounted' for unsafe operations," + + " unclassified error", + t); + } + } + + private volatile int referenceCount; + + public ReferenceCounted(int initReference) { + this.referenceCount = initReference; + } + + public int retain() { + return unsafe.getAndAddInt(this, referenceOffset, 1) + 1; + } + + /** + * Try to retain this object. Fail if reference count is already zero. + * + * @return zero if failed, otherwise current reference count. + */ + public int tryRetain() { + int v; + do { + v = unsafe.getIntVolatile(this, referenceOffset); + } while (v != 0 && !unsafe.compareAndSwapInt(this, referenceOffset, v, v + 1)); + return v == 0 ? 0 : v + 1; + } + + public int release() { + int r = unsafe.getAndAddInt(this, referenceOffset, -1) - 1; + if (r == 0) { + referenceCountReachedZero(); + } + return r; + } + + public int getReferenceCount() { + return referenceCount; + } + + /** A method called when the reference count reaches zero. */ + protected abstract void referenceCountReachedZero(); +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskprocessing/StateExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskprocessing/StateExecutor.java new file mode 100644 index 00000000000000..21ee33aaf250e5 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskprocessing/StateExecutor.java @@ -0,0 +1,19 @@ +package org.apache.flink.runtime.taskprocessing; + +import java.util.concurrent.CompletableFuture; + +/** + * Executor for executing batch {@link ProcessingRequest}s. + * + * @param the type of key. + */ +public interface StateExecutor { + /** + * Execute a batch of state requests. + * + * @param processingRequests the given batch of processing requests + * @return A future can determine whether execution has completed. + */ + CompletableFuture executeBatchRequests( + Iterable> processingRequests); +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskprocessing/AsyncExecutionControllerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskprocessing/AsyncExecutionControllerTest.java new file mode 100644 index 00000000000000..f64120e7276deb --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskprocessing/AsyncExecutionControllerTest.java @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskprocessing; + +import org.apache.flink.api.common.state.v2.StateFuture; +import org.apache.flink.api.common.state.v2.ValueState; +import org.apache.flink.core.state.InternalStateFuture; +import org.apache.flink.core.state.StateFutureImpl; +import org.apache.flink.runtime.mailbox.SyncMailboxExecutor; +import org.apache.flink.runtime.taskprocessing.ProcessingRequest.RequestType; +import org.apache.flink.util.Preconditions; + +import org.junit.jupiter.api.Test; + +import java.util.HashMap; +import java.util.Optional; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; + +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; + +/** Test for {@link AsyncExecutionController}. */ +class AsyncExecutionControllerTest { + + // todo(20240330): this test is not completed, cause the order preservation is not implemented + // yet, just for illustrating the interaction between AEC and Async state API. + @Test + void testStateOrder() { + AsyncExecutionController aec = + new AsyncExecutionController<>( + new SyncMailboxExecutor(), new TestStateExecutor(), 3); + TestUnderlyingState underlyingState = new TestUnderlyingState(); + TestValueState valueState = new TestValueState(aec, underlyingState); + AtomicInteger output = new AtomicInteger(); + Consumer userCode = + empty -> + valueState + .asyncValue() + .thenAccept( + val -> { + if (val == null) { + valueState + .asyncUpdate(1) + .thenAccept(o -> output.set(1)); + } else { + valueState + .asyncUpdate(val + 1) + .thenAccept(o -> output.set(val + 1)); + } + }); + + // ============================ element1 ============================ + String record1 = "key1-r1"; + String key1 = "key1"; + // Simulate the wrapping in {@link RecordProcessorUtils#getRecordProcessor()}, wrapping the + // record and key with RecordContext. + RecordContext recordContext1 = new RecordContext<>(record1, key1); + valueState.setCurrentRecordCtx(recordContext1); + // execute user code + userCode.accept(null); + recordContext1.release(); + assertThat(output.get()).isEqualTo(1); + + // ============================ element2 ============================ + String record2 = "key1-r2"; + String key2 = "key1"; + RecordContext recordContext2 = new RecordContext<>(record2, key2); + valueState.setCurrentRecordCtx(recordContext2); + // execute user code + userCode.accept(null); + recordContext2.release(); + assertThat(output.get()).isEqualTo(2); + + // ============================ element3 ============================ + String record3 = "key3-r3"; + String key3 = "key3"; + RecordContext recordContext3 = new RecordContext<>(record3, key3); + valueState.setCurrentRecordCtx(recordContext3); + // execute user code + userCode.accept(null); + recordContext3.release(); + assertThat(output.get()).isEqualTo(1); + } + + class TestRequestParameter implements ProcessingRequest.Parameter { + private String key; + private Integer value; + + public TestRequestParameter(String key) { + this(key, null); + } + + public TestRequestParameter(String key, Integer value) { + this.key = key; + this.value = value; + } + + @Override + public Optional getKey() { + return key == null ? Optional.empty() : Optional.of(key); + } + + @Override + public Optional getValue() { + return value == null ? Optional.empty() : Optional.of(value); + } + } + + /** Simulate the underlying state that is actually used to execute the request. */ + class TestUnderlyingState { + + private HashMap hashMap; + + public TestUnderlyingState() { + this.hashMap = new HashMap<>(); + } + + public Integer get(String key) { + return hashMap.get(key); + } + + public void update(String key, Integer val) { + hashMap.put(key, val); + } + } + + class TestProcessingRequest implements ProcessingRequest { + + private TestUnderlyingState underlyingState; + + private TestRequestParameter requestParameter; + + private RequestType requestType; + + private InternalStateFuture stateFuture; + + public TestProcessingRequest( + TestUnderlyingState underlyingState, + TestRequestParameter parameter, + RequestType requestType, + InternalStateFuture stateFuture) { + this.underlyingState = underlyingState; + this.requestParameter = parameter; + this.requestType = requestType; + this.stateFuture = stateFuture; + } + + @Override + public Optional getUnderlyingState() { + if (requestType == RequestType.SYNC) { + return Optional.empty(); + } + return Optional.of(underlyingState); + } + + @Override + public Parameter getParameter() { + return requestParameter; + } + + @Override + public StateFuture getFuture() { + return stateFuture; + } + + @Override + public RequestType getRequestType() { + return requestType; + } + } + + class TestValueState implements ValueState { + + private AsyncExecutionController asyncExecutionController; + + private TestUnderlyingState underlyingState; + + private StateFutureImpl.CallbackRunner runner = Runnable::run; + + private RecordContext currentRecordCtx; + + public TestValueState(AsyncExecutionController aec, TestUnderlyingState underlyingState) { + this.asyncExecutionController = aec; + this.underlyingState = underlyingState; + } + + @Override + public StateFuture asyncClear() { + StateFutureImpl stateFuture = new StateFutureImpl<>(runner); + TestRequestParameter parameter = new TestRequestParameter(currentRecordCtx.getKey()); + ProcessingRequest request = + new TestProcessingRequest( + underlyingState, parameter, RequestType.DELETE, stateFuture); + asyncExecutionController.handleProcessingRequest(request, currentRecordCtx); + return stateFuture; + } + + @Override + public StateFuture asyncValue() { + StateFutureImpl stateFuture = new StateFutureImpl<>(runner); + TestRequestParameter parameter = new TestRequestParameter(currentRecordCtx.getKey()); + ProcessingRequest request = + new TestProcessingRequest<>( + underlyingState, parameter, RequestType.GET, stateFuture); + asyncExecutionController.handleProcessingRequest(request, currentRecordCtx); + return stateFuture; + } + + @Override + public StateFuture asyncUpdate(Integer value) { + StateFutureImpl stateFuture = new StateFutureImpl<>(runner); + TestRequestParameter parameter = + new TestRequestParameter(currentRecordCtx.getKey(), value); + ProcessingRequest request = + new TestProcessingRequest( + underlyingState, parameter, RequestType.PUT, stateFuture); + asyncExecutionController.handleProcessingRequest(request, currentRecordCtx); + return stateFuture; + } + + public void setCurrentRecordCtx(RecordContext recordCtx) { + this.currentRecordCtx = recordCtx; + } + } + + /** + * A brief implementation of {@link StateExecutor}, to illustrate the interaction between AEC + * and StateExecutor. + */ + class TestStateExecutor implements StateExecutor { + + public TestStateExecutor() {} + + @Override + public CompletableFuture executeBatchRequests( + Iterable> processingRequests) { + CompletableFuture future = new CompletableFuture<>(); + for (ProcessingRequest request : processingRequests) { + if (request.getRequestType() == RequestType.GET) { + Preconditions.checkState(request.getUnderlyingState().isPresent()); + TestUnderlyingState underlyingState = + (TestUnderlyingState) request.getUnderlyingState().get(); + Integer val = + underlyingState.get( + ((TestRequestParameter) request.getParameter()).getKey().get()); + ((StateFutureImpl) request.getFuture()).complete(val); + } else if (request.getRequestType() == RequestType.PUT) { + Preconditions.checkState(request.getUnderlyingState().isPresent()); + TestUnderlyingState underlyingState = + (TestUnderlyingState) request.getUnderlyingState().get(); + TestRequestParameter parameter = (TestRequestParameter) request.getParameter(); + underlyingState.update( + parameter.getKey().get(), (Integer) parameter.getValue().get()); + ((StateFutureImpl) request.getFuture()).complete(null); + } else { + throw new UnsupportedOperationException("Unsupported request type"); + } + } + future.complete(true); + return future; + } + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskprocessing/ReferenceCountedTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskprocessing/ReferenceCountedTest.java new file mode 100644 index 00000000000000..03075759468fa7 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskprocessing/ReferenceCountedTest.java @@ -0,0 +1,71 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.flink.runtime.taskprocessing; + +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.List; + +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; + +/** Test for {@link org.apache.flink.runtime.taskprocessing.ReferenceCounted} */ +class ReferenceCountedTest { + @Test + void testRefCountReachedZero() { + TestReferenceCounted referenceCounted = new TestReferenceCounted(); + referenceCounted.retain(); + assertThat(referenceCounted.getReferenceCount()).isEqualTo(1); + referenceCounted.release(); + assertThat(referenceCounted.getReferenceCount()).isEqualTo(0); + assertThat(referenceCounted.reachedZero).isTrue(); + } + + @Test + void testConcurrency() throws InterruptedException { + TestReferenceCounted referenceCounted = new TestReferenceCounted(); + List threads = new ArrayList<>(); + for (int i = 0; i < 5; i++) { + Thread thread = new Thread(() -> referenceCounted.retain()); + thread.start(); + threads.add(thread); + } + for (int i = 0; i < 5; i++) { + Thread thread = new Thread(() -> referenceCounted.release()); + thread.start(); + threads.add(thread); + } + for (Thread thread : threads) { + thread.join(); + } + assertThat(referenceCounted.getReferenceCount()).isEqualTo(0); + } + + private class TestReferenceCounted extends ReferenceCounted { + private boolean reachedZero = false; + + public TestReferenceCounted() { + super(0); + } + + @Override + protected void referenceCountReachedZero() { + reachedZero = true; + } + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskprocessing/keyAccountingUnitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskprocessing/keyAccountingUnitTest.java new file mode 100644 index 00000000000000..87693631dde684 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskprocessing/keyAccountingUnitTest.java @@ -0,0 +1,43 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.taskprocessing; + +import org.junit.jupiter.api.Test; + +import static org.assertj.core.api.Assertions.assertThatThrownBy; +import static org.assertj.core.api.AssertionsForClassTypes.assertThat; + +/** Test for {@link KeyAccountingUnit}. */ +class keyAccountingUnitTest { + + @Test + void testBasic() { + KeyAccountingUnit keyAccountingUnit = new KeyAccountingUnit<>(); + assertThat(keyAccountingUnit.available("record1", 1)).isTrue(); + keyAccountingUnit.occupy("record1", 1); + assertThat(keyAccountingUnit.available("record1", 1)).isTrue(); + assertThat(keyAccountingUnit.available("record2", 2)).isTrue(); + assertThat(keyAccountingUnit.available("record3", 1)).isFalse(); + assertThatThrownBy(() -> keyAccountingUnit.occupy("record3", 1)) + .isInstanceOf(IllegalStateException.class) + .hasMessage("The record record3(1) is already occupied."); + keyAccountingUnit.release("record1", 1); + assertThat(keyAccountingUnit.available("record2", 1)).isTrue(); + } +}