forked from apache/flink
-
Notifications
You must be signed in to change notification settings - Fork 1
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
[FLINK-34986][Runtime/State] Introduce element-control component
- Loading branch information
Showing
11 changed files
with
863 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
80 changes: 80 additions & 0 deletions
80
...ntime/src/main/java/org/apache/flink/runtime/taskprocessing/AsyncExecutionController.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,80 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.flink.runtime.taskprocessing; | ||
|
||
import org.apache.flink.annotation.Internal; | ||
import org.apache.flink.api.common.operators.MailboxExecutor; | ||
|
||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import java.util.Collections; | ||
|
||
/** | ||
* The Async Execution Controller (AEC) receives processing requests from operators, and put them | ||
* into execution according to some strategies. | ||
* | ||
* <p>It is responsible for: | ||
* <li>Preserving the sequence of elements bearing the same key by delaying subsequent requests | ||
* until the processing of preceding ones is finalized. | ||
* <li>Tracking the in-flight data(records) and blocking the input if too much data in flight | ||
* (back-pressure). It invokes {@link MailboxExecutor#yield()} to pause current operations, | ||
* allowing for the execution of callbacks (mails in Mailbox). | ||
* | ||
* @param <R> the type of the record | ||
* @param <K> the type of the key | ||
*/ | ||
@Internal | ||
public class AsyncExecutionController<R, K> { | ||
|
||
private static final Logger LOG = LoggerFactory.getLogger(AsyncExecutionController.class); | ||
|
||
public static final int DEFAULT_MAX_IN_FLIGHT_RECORD_NUM = 6000; | ||
|
||
/** The max allow number of in-flight records. */ | ||
private final int maxInFlightRecordNum; | ||
|
||
/** The key accounting unit which is used to detect the key conflict. */ | ||
private final KeyAccountingUnit<R, K> keyAccountingUnit; | ||
|
||
/** The mailbox executor, borrowed from {@code StreamTask}. */ | ||
private final MailboxExecutor mailboxExecutor; | ||
|
||
/** The state executor where the {@link ProcessingRequest} is actually executed. */ | ||
private final StateExecutor stateExecutor; | ||
|
||
public AsyncExecutionController(MailboxExecutor mailboxExecutor, StateExecutor stateExecutor) { | ||
this(mailboxExecutor, stateExecutor, DEFAULT_MAX_IN_FLIGHT_RECORD_NUM); | ||
} | ||
|
||
public AsyncExecutionController( | ||
MailboxExecutor mailboxExecutor, StateExecutor stateExecutor, int maxInFlightRecords) { | ||
this.mailboxExecutor = mailboxExecutor; | ||
this.stateExecutor = stateExecutor; | ||
this.maxInFlightRecordNum = maxInFlightRecords; | ||
this.keyAccountingUnit = new KeyAccountingUnit<>(); | ||
LOG.info("Create AsyncExecutionController: maxInFlightRecordsNum {}", maxInFlightRecords); | ||
} | ||
|
||
public <OUT> void handleProcessingRequest( | ||
ProcessingRequest<OUT> request, RecordContext<K, R> recordContext) { | ||
// TODO(implement): preserve key order | ||
stateExecutor.executeBatchRequests(Collections.singleton(request)); | ||
} | ||
} |
82 changes: 82 additions & 0 deletions
82
flink-runtime/src/main/java/org/apache/flink/runtime/taskprocessing/KeyAccountingUnit.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.flink.runtime.taskprocessing; | ||
|
||
import org.apache.flink.annotation.Internal; | ||
|
||
import org.slf4j.Logger; | ||
import org.slf4j.LoggerFactory; | ||
|
||
import java.util.Map; | ||
import java.util.concurrent.ConcurrentHashMap; | ||
|
||
/** | ||
* Key accounting unit holds the current in-flight key and tracks the corresponding ongoing records, | ||
* which is used to preserve the ordering of independent chained {@link | ||
* org.apache.flink.api.common.state.v2.StateFuture}. | ||
* | ||
* @param <R> the type of record | ||
* @param <K> the type of key | ||
*/ | ||
@Internal | ||
public class KeyAccountingUnit<R, K> { | ||
|
||
private static final Logger LOG = LoggerFactory.getLogger(KeyAccountingUnit.class); | ||
|
||
/** The in-flight records that are being processed, their keys are different from each other. */ | ||
private final Map<K, R> noConflictInFlightRecords; | ||
|
||
public KeyAccountingUnit() { | ||
this.noConflictInFlightRecords = new ConcurrentHashMap<>(); | ||
} | ||
|
||
/** | ||
* Check if the record is available for processing. This method should be called in main task | ||
* thread. For the same record, this method can be reentered. | ||
* | ||
* @param record the record | ||
* @param key the key inside the record | ||
* @return true if the key is available | ||
*/ | ||
public boolean available(R record, K key) { | ||
if (noConflictInFlightRecords.containsKey(key)) { | ||
return noConflictInFlightRecords.get(key) == record; | ||
} | ||
return true; | ||
} | ||
|
||
/** | ||
* Occupy a key for processing, the subsequent records with the same key would be blocked until | ||
* the previous key release. | ||
*/ | ||
public void occupy(R record, K key) { | ||
if (!available(record, key)) { | ||
throw new IllegalStateException( | ||
String.format("The record %s(%s) is already occupied.", record, key)); | ||
} | ||
noConflictInFlightRecords.put(key, record); | ||
LOG.trace("occupy key {} for record {}", key, record); | ||
} | ||
|
||
/** Release a key, which is invoked when a {@link RecordContext} is released. */ | ||
public void release(R record, K key) { | ||
R existingRecord = noConflictInFlightRecords.remove(key); | ||
LOG.trace("release key {} for record {}, existing record {}", key, record, existingRecord); | ||
} | ||
} |
38 changes: 38 additions & 0 deletions
38
flink-runtime/src/main/java/org/apache/flink/runtime/taskprocessing/OrderPreserveMode.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,38 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one or more | ||
* contributor license agreements. See the NOTICE file distributed with | ||
* this work for additional information regarding copyright ownership. | ||
* The ASF licenses this file to You under the Apache License, Version 2.0 | ||
* (the "License"); you may not use this file except in compliance with | ||
* the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.flink.runtime.taskprocessing; | ||
|
||
import org.apache.flink.annotation.Internal; | ||
|
||
/** | ||
* Enumeration for the execution order under asynchronous state APIs. For synchronous state APIs, | ||
* the execution order is always {@link #ELEMENT_ORDER}. {@link #STATE_ORDER} generally has higher | ||
* performance than {@link #ELEMENT_ORDER}. Note: {@link #STATE_ORDER} is an advance option, please | ||
* make sure you are aware of possible out-of-order situations under asynchronous state APIs. | ||
*/ | ||
@Internal | ||
public enum OrderPreserveMode { | ||
/** The records with same keys are strictly processed in order of arrival. */ | ||
ELEMENT_ORDER, | ||
/** | ||
* For same-key records, state requests and subsequent callbacks are processed in the order in | ||
* which each record makes its first state request. But the code before the first state request | ||
* for each record can be processed out-of-order with requests from other records. | ||
*/ | ||
STATE_ORDER | ||
} |
69 changes: 69 additions & 0 deletions
69
flink-runtime/src/main/java/org/apache/flink/runtime/taskprocessing/ProcessingRequest.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,69 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.flink.runtime.taskprocessing; | ||
|
||
import org.apache.flink.api.common.state.v2.State; | ||
import org.apache.flink.api.common.state.v2.StateFuture; | ||
|
||
import java.util.Optional; | ||
|
||
/** | ||
* A processing request encapsulates the parameters, the {@link State} to access, and the type of | ||
* access. | ||
* | ||
* @param <OUT> Type of value that request will return. | ||
*/ | ||
public interface ProcessingRequest<OUT> { | ||
/** The underlying state to be accessed, can be empty. */ | ||
Optional<?> getUnderlyingState(); | ||
|
||
/** The parameter of the request. */ | ||
Parameter getParameter(); | ||
|
||
/** The future to collect the result of the request. */ | ||
StateFuture<OUT> getFuture(); | ||
|
||
RequestType getRequestType(); | ||
|
||
/** The type of processing request. */ | ||
enum RequestType { | ||
/** Process one record without state access. */ | ||
SYNC, | ||
/** Get from one {@link State}. */ | ||
/** Delete from one {@link State}. */ | ||
DELETE, | ||
GET, | ||
/** Put to one {@link State}. */ | ||
PUT, | ||
/** Merge value to an exist key in {@link State}. Mainly used for listState. */ | ||
MERGE | ||
} | ||
|
||
/** The parameter of the request. */ | ||
interface Parameter<K> { | ||
/** | ||
* The key of one request. Except for requests of {@link RequestType#SYNC}, all other | ||
* requests should provide a key. | ||
*/ | ||
Optional<K> getKey(); | ||
|
||
/** The value of one request. */ | ||
Optional<?> getValue(); | ||
} | ||
} |
82 changes: 82 additions & 0 deletions
82
flink-runtime/src/main/java/org/apache/flink/runtime/taskprocessing/RecordContext.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,82 @@ | ||
/* | ||
* Licensed to the Apache Software Foundation (ASF) under one | ||
* or more contributor license agreements. See the NOTICE file | ||
* distributed with this work for additional information | ||
* regarding copyright ownership. The ASF licenses this file | ||
* to you under the Apache License, Version 2.0 (the | ||
* "License"); you may not use this file except in compliance | ||
* with the License. You may obtain a copy of the License at | ||
* | ||
* http://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package org.apache.flink.runtime.taskprocessing; | ||
|
||
import org.apache.flink.annotation.Internal; | ||
|
||
import java.util.Objects; | ||
|
||
/** | ||
* A context that preserves the necessary variables required by each operation, all operations for | ||
* one record will share the same element context. | ||
* | ||
* @param <R> The type of the record that extends {@link | ||
* org.apache.flink.streaming.runtime.streamrecord.StreamElement}. TODO(FLIP-409): move | ||
* StreamElement to flink-core or flink-runtime module. | ||
* @param <K> The type of the key inside the record. | ||
*/ | ||
@Internal | ||
public class RecordContext<R, K> extends ReferenceCounted { | ||
|
||
/** The record to be processed. */ | ||
private final R record; | ||
|
||
/** The key inside the record. */ | ||
private final K key; | ||
|
||
public RecordContext(R record, K key) { | ||
super(0); | ||
this.record = record; | ||
this.key = key; | ||
} | ||
|
||
public K getKey() { | ||
return this.key; | ||
} | ||
|
||
@Override | ||
protected void referenceCountReachedZero() { | ||
// TODO: release internal resources that this record context holds. | ||
} | ||
|
||
@Override | ||
public int hashCode() { | ||
return Objects.hash(record, key); | ||
} | ||
|
||
@Override | ||
public boolean equals(Object o) { | ||
if (this == o) { | ||
return true; | ||
} | ||
if (o == null || getClass() != o.getClass()) { | ||
return false; | ||
} | ||
RecordContext<?, ?> that = (RecordContext<?, ?>) o; | ||
if (!Objects.equals(record, that.record)) { | ||
return false; | ||
} | ||
return Objects.equals(key, that.key); | ||
} | ||
|
||
@Override | ||
public String toString() { | ||
return "RecordContext{" + "record=" + record + ", key=" + key + '}'; | ||
} | ||
} |
Oops, something went wrong.