Skip to content

Commit

Permalink
[FLINK-34986][Runtime/State] Basic implementation of AEC, RecordConte…
Browse files Browse the repository at this point in the history
…xt and reference counting
  • Loading branch information
Zakelly committed Apr 7, 2024
1 parent 5db76ee commit c1cbad7
Show file tree
Hide file tree
Showing 19 changed files with 815 additions and 556 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ public <U, V> StateFuture<V> thenCombine(
});
}

@Override
public void complete(T result) {
throw new UnsupportedOperationException("This state future has already been completed.");
}

@Override
public void thenSyncAccept(Consumer<? super T> action) {
action.accept(result);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@
@Internal
public interface InternalStateFuture<T> extends StateFuture<T> {

/** Complete this future. */
void complete(T result);

/**
* Accept the action in the same thread with the one of complete (or current thread if it has
* been completed).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,11 @@
@Internal
public class StateFutureImpl<T> implements InternalStateFuture<T> {

/** The future holds the result. The completes in async threads. */
CompletableFuture<T> completableFuture;
/** The future holds the result. This may complete in async threads. */
private final CompletableFuture<T> completableFuture;

/** The callback runner. */
CallbackRunner callbackRunner;
protected final CallbackRunner callbackRunner;

public StateFutureImpl(CallbackRunner callbackRunner) {
this.completableFuture = new CompletableFuture<>();
Expand All @@ -53,17 +53,20 @@ public StateFutureImpl(CallbackRunner callbackRunner) {

@Override
public <U> StateFuture<U> thenApply(Function<? super T, ? extends U> fn) {
callbackRegistered();
try {
if (completableFuture.isDone()) {
U r = fn.apply(completableFuture.get());
callbackFinished();
return StateFutureUtils.completedFuture(r);
} else {
StateFutureImpl<U> ret = new StateFutureImpl<>(callbackRunner);
StateFutureImpl<U> ret = makeNewStateFuture();
completableFuture.thenAccept(
(t) -> {
callbackRunner.submit(
() -> {
ret.complete(fn.apply(t));
callbackFinished();
});
});
return ret;
Expand All @@ -75,18 +78,21 @@ public <U> StateFuture<U> thenApply(Function<? super T, ? extends U> fn) {

@Override
public StateFuture<Void> thenAccept(Consumer<? super T> action) {
callbackRegistered();
try {
if (completableFuture.isDone()) {
action.accept(completableFuture.get());
callbackFinished();
return StateFutureUtils.completedVoidFuture();
} else {
StateFutureImpl<Void> ret = new StateFutureImpl<>(callbackRunner);
StateFutureImpl<Void> ret = makeNewStateFuture();
completableFuture.thenAccept(
(t) -> {
callbackRunner.submit(
() -> {
action.accept(t);
ret.complete(null);
callbackFinished();
});
});
return ret;
Expand All @@ -98,17 +104,20 @@ public StateFuture<Void> thenAccept(Consumer<? super T> action) {

@Override
public <U> StateFuture<U> thenCompose(Function<? super T, ? extends StateFuture<U>> action) {
callbackRegistered();
try {
if (completableFuture.isDone()) {
callbackFinished();
return action.apply(completableFuture.get());
} else {
StateFutureImpl<U> ret = new StateFutureImpl<>(callbackRunner);
StateFutureImpl<U> ret = makeNewStateFuture();
completableFuture.thenAccept(
(t) -> {
callbackRunner.submit(
() -> {
StateFuture<U> su = action.apply(t);
su.thenAccept(ret::complete);
callbackFinished();
});
});
return ret;
Expand All @@ -121,20 +130,22 @@ public <U> StateFuture<U> thenCompose(Function<? super T, ? extends StateFuture<
@Override
public <U, V> StateFuture<V> thenCombine(
StateFuture<? extends U> other, BiFunction<? super T, ? super U, ? extends V> fn) {
callbackRegistered();
try {
if (completableFuture.isDone()) {
return other.thenCompose(
(u) -> {
try {
V v = fn.apply(completableFuture.get(), u);
callbackFinished();
return StateFutureUtils.completedFuture(v);
} catch (Throwable e) {
throw new FlinkRuntimeException(
"Error binding or executing callback", e);
}
});
} else {
StateFutureImpl<V> ret = new StateFutureImpl<>(callbackRunner);
StateFutureImpl<V> ret = makeNewStateFuture();
((InternalStateFuture<? extends U>) other)
.thenSyncAccept(
(u) -> {
Expand All @@ -143,6 +154,7 @@ public <U, V> StateFuture<V> thenCombine(
callbackRunner.submit(
() -> {
ret.complete(fn.apply(t, u));
callbackFinished();
});
});
});
Expand All @@ -153,12 +165,34 @@ public <U, V> StateFuture<V> thenCombine(
}
}

/**
* Make a new future based on context of this future.
*
* @return the new created future.
*/
public <A> StateFutureImpl<A> makeNewStateFuture() {
return new StateFutureImpl<>(callbackRunner);
}

@Override
public void complete(T result) {
completableFuture.complete(result);
postComplete();
}

/** Will be triggered when a callback is registered. */
public void callbackRegistered() {
// does nothing by default.
}

/** Will be triggered when this future completes. */
public void postComplete() {
// does nothing by default.
}

/** Will be triggered when a callback finishes processing. */
public void callbackFinished() {
// does nothing by default.
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.asyncprocessing;

import org.apache.flink.api.common.operators.MailboxExecutor;
import org.apache.flink.core.state.InternalStateFuture;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* The Async Execution Controller (AEC) receives processing requests from operators, and put them
* into execution according to some strategies.
*
* <p>It is responsible for:
* <li>Preserving the sequence of elements bearing the same key by delaying subsequent requests
* until the processing of preceding ones is finalized.
* <li>Tracking the in-flight data(records) and blocking the input if too much data in flight
* (back-pressure). It invokes {@link MailboxExecutor#yield()} to pause current operations,
* allowing for the execution of callbacks (mails in Mailbox).
*
* @param <R> the type of the record
* @param <K> the type of the key
*/
public class AsyncExecutionController<R, K> {

private static final Logger LOG = LoggerFactory.getLogger(AsyncExecutionController.class);

public static final int DEFAULT_MAX_IN_FLIGHT_RECORD_NUM = 6000;

/** The max allow number of in-flight records. */
private final int maxInFlightRecordNum;

/** The key accounting unit which is used to detect the key conflict. */
final KeyAccountingUnit<R, K> keyAccountingUnit;

/**
* A factory to build {@link org.apache.flink.core.state.InternalStateFuture}, this will auto
* wire the created future with mailbox executor. Also conducting the context switch.
*/
private final StateFutureFactory<R, K> stateFutureFactory;

/** The state executor where the {@link StateRequest} is actually executed. */
final StateExecutor stateExecutor;

/** The corresponding context that currently runs in task thread. */
RecordContext<R, K> currentContext;

public AsyncExecutionController(MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) {
this(mailboxExecutor, stateExecutor, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
}

public AsyncExecutionController(
MailboxExecutor mailboxExecutor, StateExecutor stateExecutor, int maxInFlightRecords) {
this.keyAccountingUnit = new KeyAccountingUnit<>();
this.stateFutureFactory = new StateFutureFactory<>(this, mailboxExecutor);
this.stateExecutor = stateExecutor;
this.maxInFlightRecordNum = maxInFlightRecords;
LOG.info("Create AsyncExecutionController: maxInFlightRecordsNum {}", maxInFlightRecords);
}

/**
* Build a new context based on record and key. Also wired with internal {@link
* KeyAccountingUnit}.
*
* @param record the given record.
* @param key the given key.
* @return the built record context.
*/
public RecordContext<R, K> buildContext(R record, K key) {
return new RecordContext<>(keyAccountingUnit, record, key);
}

/**
* Each time before a code segment (callback) is about to run in mailbox (task thread), this
* method should be called to switch a context in AEC.
*
* @param switchingContext the context to switch.
*/
public void setCurrentContext(RecordContext<R, K> switchingContext) {
currentContext = switchingContext;
}

/**
* Submit a {@link StateRequest} to this AEC and trigger if needed.
*
* @param request the request to handle.
* @return the future for this request.
*/
public <IN, OUT> InternalStateFuture<OUT> handleRequest(StateRequest<K, IN, OUT> request) {
// Step 1: build state future & assign context.
InternalStateFuture<OUT> stateFuture = stateFutureFactory.build(currentContext);
request.setRecordContext(currentContext);
request.setFuture(stateFuture);
// Step 2: try to occupy the key and place it into right buffer.
if (currentContext.tryOccupyKey()) {
insertActiveBuffer(request);
} else {
insertBlockingBuffer(request);
}
// Step 3: trigger the (active) buffer if needed.
triggerIfNeeded(false);
return stateFuture;
}

<IN, OUT> void insertActiveBuffer(StateRequest<K, IN, OUT> request) {
// TODO: implement the active buffer.
}

<IN, OUT> void insertBlockingBuffer(StateRequest<K, IN, OUT> request) {
// TODO: implement the blocking buffer.
}

/**
* Trigger a batch of requests.
*
* @param force whether to trigger requests in force.
*/
void triggerIfNeeded(boolean force) {
// TODO: implement the trigger logic.
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.asyncprocessing;

import org.apache.flink.core.state.StateFutureImpl;

/**
* A state future that holds the {@link RecordContext} and maintains the reference count of it. The
* reason why we maintain the reference here is that the ContextStateFutureImpl can be created
* multiple times since user may chain their code wildly, some of which are only for internal usage
* (See {@link StateFutureImpl}). So maintaining reference counting by the lifecycle of state future
* is relatively simple and less error-prone.
*
* <p>Reference counting added on {@link RecordContext} follows:
* <li>1. +1 when this future created.
* <li>2. -1 when future completed.
* <li>3. +1 when callback registered.
* <li>4. -1 when callback finished.
*/
public class ContextStateFutureImpl<T> extends StateFutureImpl<T> {

private final RecordContext<?, ?> recordContext;

ContextStateFutureImpl(CallbackRunner callbackRunner, RecordContext<?, ?> recordContext) {
super(callbackRunner);
this.recordContext = recordContext;
// When state request submitted, ref count +1, as described in FLIP-425:
// To cover the statements without a callback, in addition to the reference count marked
// in Fig.5, each state request itself is also protected by a paired reference count.
recordContext.retain();
}

@Override
public <A> StateFutureImpl<A> makeNewStateFuture() {
return new ContextStateFutureImpl<>(callbackRunner, recordContext);
}

@Override
public void callbackRegistered() {
// When a callback registered, as shown in Fig.5 of FLIP-425, at the point of 3 and 5, the
// ref count -1.
recordContext.retain();
}

@Override
public void postComplete() {
// When a state request completes, ref count +1, as described in FLIP-425:
// To cover the statements without a callback, in addition to the reference count marked
// in Fig.5, each state request itself is also protected by a paired reference count.
recordContext.release();
}

@Override
public void callbackFinished() {
// When a callback ends, as shown in Fig.5 of FLIP-425, at the
// point of 2,4 and 6, the ref count -1.
recordContext.release();
}
}
Loading

0 comments on commit c1cbad7

Please sign in to comment.