Skip to content

Commit

Permalink
[FLINK-36120][Runtime] Declare context variables without the ability …
Browse files Browse the repository at this point in the history
…of serialization
  • Loading branch information
Zakelly committed Dec 25, 2024
1 parent 6b3bca9 commit f6b3f57
Show file tree
Hide file tree
Showing 6 changed files with 108 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ public class RecordContext<K> extends ReferenceCounted<RecordContext.DisposerRun
private Map<InternalPartitionedState<?>, Object> namespaces = null;

/** User-defined variables. */
private final AtomicReferenceArray<Object> declaredVariables;
private final AtomicReferenceArray<Object> contextVariables;

/**
* The extra context info which is used to hold customized data defined by state backend. The
Expand Down Expand Up @@ -100,7 +100,7 @@ public RecordContext(
this.disposer = disposer;
this.keyGroup = keyGroup;
this.epoch = epoch;
this.declaredVariables = variables;
this.contextVariables = variables;
}

public Object getRecord() {
Expand Down Expand Up @@ -152,24 +152,24 @@ public <N> void setNamespace(InternalPartitionedState<N> state, N namespace) {
@SuppressWarnings("unchecked")
public <T> T getVariable(int i) {
checkVariableIndex(i);
return (T) declaredVariables.get(i);
return (T) contextVariables.get(i);
}

public <T> void setVariable(int i, T value) {
checkVariableIndex(i);
declaredVariables.set(i, value);
contextVariables.set(i, value);
}

private void checkVariableIndex(int i) {
if (i >= declaredVariables.length()) {
if (i >= contextVariables.length()) {
throw new UnsupportedOperationException(
"Variable index out of bounds. Maybe you are accessing "
+ "a variable that have not been declared.");
}
}

AtomicReferenceArray<Object> getVariablesReference() {
return declaredVariables;
return contextVariables;
}

public void setExtra(Object extra) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.runtime.asyncprocessing.declare;

import javax.annotation.Nullable;

import java.util.function.Supplier;

/** A value that will have different values across different contexts. */
public class ContextVariable<T> {

final DeclarationManager manager;

final int ordinal;

@Nullable final Supplier<T> initializer;

boolean initialized = false;

ContextVariable(DeclarationManager manager, int ordinal, Supplier<T> initializer) {
this.manager = manager;
this.ordinal = ordinal;
this.initializer = initializer;
}

public T get() {
if (!initialized && initializer != null) {
manager.setVariableValue(ordinal, initializer.get());
initialized = true;
}
return manager.getVariableValue(ordinal);
}

public void set(T newValue) {
manager.setVariableValue(ordinal, newValue);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,19 @@ public <T, U, V> NamedBiFunction<T, U, V> declare(
public <T> DeclaredVariable<T> declareVariable(
TypeSerializer<T> serializer, String name, @Nullable Supplier<T> initialValue)
throws DeclarationException {
return manager.register(serializer, name, initialValue);
return manager.registerVariable(serializer, name, initialValue);
}

/**
* Declare a variable that will keep value across callback with same context. This value cannot
* be serialized into checkpoint.
*
* @param initializer the initializer of variable. Can be null if no need to initialize.
* @param <T> The type of value.
*/
public <T> ContextVariable<T> declareVariable(@Nullable Supplier<T> initializer)
throws DeclarationException {
return manager.registerVariable(initializer);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ public class DeclarationManager {

private int nextValidNameSequence = 0;

private int contextVariableCount = 0;

public DeclarationManager() {
this.knownCallbacks = new HashMap<>();
this.knownVariables = new HashMap<>();
Expand All @@ -50,14 +52,19 @@ <T extends NamedCallback> T register(T knownCallback) throws DeclarationExceptio
return knownCallback;
}

<T> DeclaredVariable<T> register(
<T> ContextVariable<T> registerVariable(@Nullable Supplier<T> initializer)
throws DeclarationException {
return new ContextVariable<>(this, contextVariableCount++, initializer);
}

<T> DeclaredVariable<T> registerVariable(
TypeSerializer<T> serializer, String name, @Nullable Supplier<T> initializer)
throws DeclarationException {
if (knownVariables.containsKey(name)) {
throw new DeclarationException("Duplicated variable key " + name);
}
DeclaredVariable<T> variable =
new DeclaredVariable<>(this, knownVariables.size(), serializer, name, initializer);
new DeclaredVariable<>(this, contextVariableCount++, serializer, name, initializer);
knownVariables.put(name, variable);
return variable;
}
Expand All @@ -81,7 +88,7 @@ public <T> void setVariableValue(int ordinal, T value) {
}

public int variableCount() {
return knownVariables.size();
return contextVariableCount;
}

String nextAssignedName(String prefix) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,41 +25,20 @@
import java.util.function.Supplier;

/** A variable declared in async state processing. The value could be persisted in checkpoint. */
public class DeclaredVariable<T> {

final DeclarationManager manager;

final int ordinal;
public class DeclaredVariable<T> extends ContextVariable<T> {

final TypeSerializer<T> typeSerializer;

final String name;

@Nullable final Supplier<T> initializer;

DeclaredVariable(
DeclarationManager manager,
int ordinal,
TypeSerializer<T> typeSerializer,
String name,
@Nullable Supplier<T> initializer) {
this.manager = manager;
this.ordinal = ordinal;
super(manager, ordinal, initializer);
this.typeSerializer = typeSerializer;
this.name = name;
this.initializer = initializer;
}

public T get() {
T t = manager.getVariableValue(ordinal);
if (t == null && initializer != null) {
t = initializer.get();
manager.setVariableValue(ordinal, t);
}
return t;
}

public void set(T newValue) {
manager.setVariableValue(ordinal, newValue);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.core.state.StateFutureUtils;
import org.apache.flink.runtime.asyncprocessing.declare.ContextVariable;
import org.apache.flink.runtime.asyncprocessing.declare.DeclarationContext;
import org.apache.flink.runtime.asyncprocessing.declare.DeclarationException;
import org.apache.flink.runtime.asyncprocessing.declare.NamedCallback;
Expand Down Expand Up @@ -61,11 +62,11 @@ public void testNormalProcessor(boolean chained) throws Exception {
testOperator, (e) -> e.f0, TypeInformation.of(Integer.class))) {
testHarness.open();
testHarness.processElement(new StreamRecord<>(Tuple2.of(5, "5")));
expectedOutput.add(new StreamRecord<>("12"));
assertThat(function.getValue()).isEqualTo(12);
expectedOutput.add(new StreamRecord<>("11"));
assertThat(function.getValue()).isEqualTo(11);
testHarness.processElement(new StreamRecord<>(Tuple2.of(6, "6")));
expectedOutput.add(new StreamRecord<>("38"));
assertThat(function.getValue()).isEqualTo(38);
expectedOutput.add(new StreamRecord<>("24"));
assertThat(function.getValue()).isEqualTo(24);
assertThat(testHarness.getOutput()).containsExactly(expectedOutput.toArray());
}
}
Expand All @@ -86,13 +87,13 @@ public void testTimerProcessor() throws Exception {
testHarness.processElement(new StreamRecord<>(Tuple2.of(6, "5")));
assertThat(function.getValue()).isEqualTo(0);
testHarness.processWatermark(5L);
expectedOutput.add(new StreamRecord<>("12", 5L));
expectedOutput.add(new StreamRecord<>("11", 5L));
expectedOutput.add(new Watermark(5L));
assertThat(function.getValue()).isEqualTo(12);
assertThat(function.getValue()).isEqualTo(11);
testHarness.processWatermark(6L);
expectedOutput.add(new StreamRecord<>("38", 6L));
expectedOutput.add(new StreamRecord<>("24", 6L));
expectedOutput.add(new Watermark(6L));
assertThat(function.getValue()).isEqualTo(38);
assertThat(function.getValue()).isEqualTo(24);
assertThat(testHarness.getOutput()).containsExactly(expectedOutput.toArray());
}
}
Expand All @@ -112,6 +113,7 @@ private static class TestNormalDeclarationFunction extends TestDeclarationFuncti
public ThrowingConsumer<Tuple2<Integer, String>, Exception> declareProcess(
DeclarationContext context, Context ctx, Collector<String> out)
throws DeclarationException {
ContextVariable<Integer> inputValue = context.declareVariable(null);
NamedFunction<Void, StateFuture<Integer>> adder =
context.declare(
"adder",
Expand All @@ -122,12 +124,15 @@ public ThrowingConsumer<Tuple2<Integer, String>, Exception> declareProcess(
context.declare(
"doubler",
(v) -> {
value.addAndGet(v);
value.addAndGet(inputValue.get());
out.collect(String.valueOf(value.get()));
});
assertThat(adder).isInstanceOf(NamedCallback.class);
assertThat(doubler).isInstanceOf(NamedCallback.class);
return (e) -> {
if (inputValue.get() == null) {
inputValue.set(e.f0);
}
value.addAndGet(e.f0);
StateFutureUtils.<Void>completedVoidFuture().thenCompose(adder).thenAccept(doubler);
};
Expand All @@ -140,17 +145,21 @@ private static class TestChainDeclarationFunction extends TestDeclarationFunctio
public ThrowingConsumer<Tuple2<Integer, String>, Exception> declareProcess(
DeclarationContext context, Context ctx, Collector<String> out)
throws DeclarationException {
ContextVariable<Integer> inputValue = context.declareVariable(null);
return context.<Tuple2<Integer, String>>declareChain()
.thenCompose(
e -> {
if (inputValue.get() == null) {
inputValue.set(e.f0);
}
value.addAndGet(e.f0);
return StateFutureUtils.completedVoidFuture();
})
.thenCompose(v -> StateFutureUtils.completedFuture(value.incrementAndGet()))
.withName("adder")
.thenAccept(
(v) -> {
value.addAndGet(v);
value.addAndGet(inputValue.get());
out.collect(String.valueOf(value.get()));
})
.withName("doubler")
Expand All @@ -176,17 +185,21 @@ public ThrowingConsumer<Tuple2<Integer, String>, Exception> declareProcess(
public ThrowingConsumer<Long, Exception> declareOnTimer(
DeclarationContext context, OnTimerContext ctx, Collector<String> out)
throws DeclarationException {
ContextVariable<Integer> inputValue = context.declareVariable(null);
return context.<Long>declareChain()
.thenCompose(
e -> {
if (inputValue.get() == null) {
inputValue.set(e.intValue());
}
value.addAndGet(e.intValue());
return StateFutureUtils.completedVoidFuture();
})
.thenCompose(v -> StateFutureUtils.completedFuture(value.incrementAndGet()))
.withName("adder")
.thenAccept(
(v) -> {
value.addAndGet(v);
value.addAndGet(inputValue.get());
out.collect(String.valueOf(value.get()));
})
.withName("doubler")
Expand Down

0 comments on commit f6b3f57

Please sign in to comment.