Skip to content

Commit

Permalink
[FLINK-34986][Runtime/State] Introduce element-control component
Browse files Browse the repository at this point in the history
  • Loading branch information
fredia authored and Zakelly committed Apr 9, 2024
1 parent 68cc61a commit d7776b2
Show file tree
Hide file tree
Showing 11 changed files with 863 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public class StateFutureImpl<T> implements InternalStateFuture<T> {
/** The callback runner. */
CallbackRunner callbackRunner;

StateFutureImpl(CallbackRunner callbackRunner) {
public StateFutureImpl(CallbackRunner callbackRunner) {
this.completableFuture = new CompletableFuture<>();
this.callbackRunner = callbackRunner;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.taskprocessing;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.operators.MailboxExecutor;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Collections;

/**
 * The Async Execution Controller (AEC) receives processing requests from operators, and puts them
 * into execution according to some strategies.
 *
 * <p>It is responsible for:
 *
 * <ul>
 *   <li>Preserving the sequence of elements bearing the same key by delaying subsequent requests
 *       until the processing of preceding ones is finalized.
 *   <li>Tracking the in-flight data (records) and blocking the input if too much data is in
 *       flight (back-pressure). It invokes {@link MailboxExecutor#yield()} to pause current
 *       operations, allowing for the execution of callbacks (mails in Mailbox).
 * </ul>
 *
 * <p>Current state of this class: {@link #handleProcessingRequest} forwards every request straight
 * to the {@link StateExecutor}. The key accounting unit, the mailbox executor and the in-flight
 * limit are stored by the constructor but are not consulted yet.
 *
 * @param <R> the type of the record
 * @param <K> the type of the key
 */
@Internal
public class AsyncExecutionController<R, K> {

    private static final Logger LOG = LoggerFactory.getLogger(AsyncExecutionController.class);

    /** The in-flight record limit used when the caller does not supply one. */
    public static final int DEFAULT_MAX_IN_FLIGHT_RECORD_NUM = 6000;

    /** The maximum allowed number of in-flight records. */
    private final int maxInFlightRecordNum;

    /** The key accounting unit which is used to detect the key conflict. */
    private final KeyAccountingUnit<R, K> keyAccountingUnit;

    /** The mailbox executor, borrowed from {@code StreamTask}. */
    private final MailboxExecutor mailboxExecutor;

    /** The state executor where the {@link ProcessingRequest} is actually executed. */
    private final StateExecutor stateExecutor;

    /**
     * Creates a controller with {@link #DEFAULT_MAX_IN_FLIGHT_RECORD_NUM} as the in-flight limit.
     *
     * @param mailboxExecutor the mailbox executor to keep for later use
     * @param stateExecutor the executor that requests are handed to
     */
    public AsyncExecutionController(MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) {
        this(mailboxExecutor, stateExecutor, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM);
    }

    /**
     * Creates a controller with an explicit in-flight limit and a fresh {@link KeyAccountingUnit}.
     *
     * @param mailboxExecutor the mailbox executor to keep for later use
     * @param stateExecutor the executor that requests are handed to
     * @param maxInFlightRecords the maximum allowed number of in-flight records
     */
    public AsyncExecutionController(
            MailboxExecutor mailboxExecutor, StateExecutor stateExecutor, int maxInFlightRecords) {
        this.mailboxExecutor = mailboxExecutor;
        this.stateExecutor = stateExecutor;
        this.maxInFlightRecordNum = maxInFlightRecords;
        this.keyAccountingUnit = new KeyAccountingUnit<>();
        LOG.info("Create AsyncExecutionController: maxInFlightRecordsNum {}", maxInFlightRecords);
    }

    /**
     * Submits one processing request to the state executor as a single-element batch.
     *
     * @param request the request to execute
     * @param recordContext the context of the record that issued the request; its type arguments
     *     follow the {@code <R, K>} (record, key) order used by this class. Not read yet.
     * @param <OUT> the type of value the request will return
     */
    public <OUT> void handleProcessingRequest(
            ProcessingRequest<OUT> request, RecordContext<R, K> recordContext) {
        // TODO(implement): preserve key order
        stateExecutor.executeBatchRequests(Collections.singleton(request));
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.taskprocessing;

import org.apache.flink.annotation.Internal;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/**
 * Key accounting unit holds the current in-flight key and tracks the corresponding ongoing records,
 * which is used to preserve the ordering of independent chained {@link
 * org.apache.flink.api.common.state.v2.StateFuture}.
 *
 * <p>Ownership of a key is decided by reference identity ({@code ==}) of the record, not by {@code
 * equals}. Neither keys nor records may be {@code null}, because the backing {@link
 * ConcurrentHashMap} rejects them.
 *
 * @param <R> the type of record
 * @param <K> the type of key
 */
@Internal
public class KeyAccountingUnit<R, K> {

    private static final Logger LOG = LoggerFactory.getLogger(KeyAccountingUnit.class);

    /** The in-flight records that are being processed, their keys are different from each other. */
    private final Map<K, R> noConflictInFlightRecords;

    /** Creates an empty unit in which no key is occupied. */
    public KeyAccountingUnit() {
        this.noConflictInFlightRecords = new ConcurrentHashMap<>();
    }

    /**
     * Check if the record is available for processing. This method should be called in main task
     * thread. For the same record, this method can be reentered.
     *
     * @param record the record
     * @param key the key inside the record
     * @return true if the key is free, or already occupied by this very record instance
     */
    public boolean available(R record, K key) {
        // One lookup instead of containsKey + get: the map never holds null values, so null
        // means "no owner", and a concurrent removal cannot slip in between two calls.
        R owner = noConflictInFlightRecords.get(key);
        return owner == null || owner == record;
    }

    /**
     * Occupy a key for processing, the subsequent records with the same key would be blocked until
     * the previous key release. Occupying a key again with the same record instance is a no-op.
     *
     * @param record the record that takes the key
     * @param key the key to occupy
     * @throws IllegalStateException if the key is currently occupied by a different record
     */
    public void occupy(R record, K key) {
        // putIfAbsent performs the availability check and the insertion as one atomic step.
        R owner = noConflictInFlightRecords.putIfAbsent(key, record);
        if (owner != null && owner != record) {
            throw new IllegalStateException(
                    String.format("The record %s(%s) is already occupied.", record, key));
        }
        LOG.trace("occupy key {} for record {}", key, record);
    }

    /**
     * Release a key, which is invoked when a {@link RecordContext} is released.
     *
     * <p>The key is removed unconditionally: the given record is only used for logging and is not
     * compared with the record that currently occupies the key.
     *
     * @param record the record that is releasing the key
     * @param key the key to release
     */
    public void release(R record, K key) {
        R existingRecord = noConflictInFlightRecords.remove(key);
        LOG.trace("release key {} for record {}, existing record {}", key, record, existingRecord);
    }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.taskprocessing;

import org.apache.flink.annotation.Internal;

/**
* Enumeration for the execution order under asynchronous state APIs. For synchronous state APIs,
* the execution order is always {@link #ELEMENT_ORDER}. {@link #STATE_ORDER} generally has higher
* performance than {@link #ELEMENT_ORDER}.
*
* <p>Note: {@link #STATE_ORDER} is an advanced option; please make sure you are aware of possible
* out-of-order situations under asynchronous state APIs before choosing it.
*/
@Internal
public enum OrderPreserveMode {
/** The records with the same key are strictly processed in order of arrival. */
ELEMENT_ORDER,
/**
* For same-key records, state requests and subsequent callbacks are processed in the order in
* which each record makes its first state request. However, the code that runs before a record's
* first state request can be processed out of order with respect to requests from other records.
*/
STATE_ORDER
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.taskprocessing;

import org.apache.flink.api.common.state.v2.State;
import org.apache.flink.api.common.state.v2.StateFuture;

import java.util.Optional;

/**
* A processing request encapsulates the parameters, the {@link State} to access, and the type of
* access.
*
* @param <OUT> Type of value that request will return.
*/
public interface ProcessingRequest<OUT> {
/** The underlying state to be accessed, can be empty. */
Optional<?> getUnderlyingState();

/**
* The parameter of the request.
*
* <p>NOTE(review): {@link Parameter} declares a type parameter but is returned here as a raw
* type; {@code Parameter<?>} looks like the intended signature. Confirm against implementers
* and callers before changing it.
*/
Parameter getParameter();

/** The future to collect the result of the request. */
StateFuture<OUT> getFuture();

/** The type of this request, one of the {@link RequestType} constants. */
RequestType getRequestType();

/** The type of processing request. */
enum RequestType {
/** Process one record without state access. */
SYNC,
/** Delete from one {@link State}. */
DELETE,
/** Get from one {@link State}. */
GET,
/** Put to one {@link State}. */
PUT,
/** Merge value to an existing key in {@link State}. Mainly used for listState. */
MERGE
}

/**
* The parameter of the request.
*
* @param <K> Type of the key carried by the request.
*/
interface Parameter<K> {
/**
* The key of one request. Except for requests of {@link RequestType#SYNC}, all other
* requests should provide a key.
*/
Optional<K> getKey();

/** The value of one request. */
Optional<?> getValue();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.taskprocessing;

import org.apache.flink.annotation.Internal;

import java.util.Objects;

/**
 * Per-record context that carries the variables needed by every operation issued for that record;
 * all operations belonging to one record share a single context instance.
 *
 * <p>Two contexts are equal when they are of the same class and both their record and their key
 * are equal according to {@link Objects#equals(Object, Object)}.
 *
 * @param <R> The type of the record that extends {@link
 *     org.apache.flink.streaming.runtime.streamrecord.StreamElement}. TODO(FLIP-409): move
 *     StreamElement to flink-core or flink-runtime module.
 * @param <K> The type of the key inside the record.
 */
@Internal
public class RecordContext<R, K> extends ReferenceCounted {

    /** The record this context was created for. */
    private final R record;

    /** The key extracted from the record. */
    private final K key;

    /**
     * Builds a context for the given record/key pair.
     *
     * @param record the record to be processed
     * @param key the key inside the record
     */
    public RecordContext(R record, K key) {
        // Passes 0 to the ReferenceCounted constructor (presumably the initial count; confirm).
        super(0);
        this.record = record;
        this.key = key;
    }

    /** Returns the key inside the record. */
    public K getKey() {
        return this.key;
    }

    /** Hook invoked by {@link ReferenceCounted}; currently does nothing. */
    @Override
    protected void referenceCountReachedZero() {
        // TODO: release internal resources that this record context holds.
    }

    @Override
    public int hashCode() {
        return Objects.hash(record, key);
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }
        final RecordContext<?, ?> other = (RecordContext<?, ?>) o;
        // Record is compared first, key second; both comparisons are null-safe.
        return Objects.equals(record, other.record) && Objects.equals(key, other.key);
    }

    @Override
    public String toString() {
        return "RecordContext{record=" + record + ", key=" + key + "}";
    }
}
Loading

0 comments on commit d7776b2

Please sign in to comment.