You signed in with another tab or window. Reload to refresh your session. You signed out in another tab or window. Reload to refresh your session. You switched accounts on another tab or window. Reload to refresh your session. Dismiss alert
publicvoidsetUidHash(StringuidHash) {
Preconditions.checkNotNull(uidHash);
// 校验hashcode为大小写26字母和0到9的数字,并且个数为32个Preconditions.checkArgument(uidHash.matches("^[0-9A-Fa-f]{32}$"),
"Node hash must be a 32 character String that describes a hex code. Found: " + uidHash);
// 用于提供的hashNodethis.userProvidedNodeHash = uidHash;
}
/***PIPELINED:以流水线方式(包括shuffle和广播)执行程序,但在流水线操作时容易引起死锁的数据交换除外。 这些数据交换以批处理方式执行。 容易发生死锁的情况(当以流水线方式执行时)的一个例子是分支的数据流(一个数据集被多个操作消耗)并在以后重新加入*PIPELINED_FORCED:强制流水线方式,PIPELINED是可选配置可以在死锁是执行BATCH模式*BATCH:此模式以批处理方式执行所有随机播放和广播,同时在仅在一个生产者和一个消费者之间本地交换数据的操作之间对数据进行流水线处理。BATCH_FORCED:此模式以严格的批处理方式执行程序,包括将数据从一个生产者本地转发到一个消费者的所有点。 与BATCH模式相比,执行此模式通常会更昂贵。 它确实保证不会同时执行任何后续操作。*/privateExecutionModeexecutionMode = ExecutionMode.PIPELINED;
/*RECURSIVE:递归全部字段* TOP_LEVEL:递归最上层字段* NONE:不进行clean*/privateClosureCleanerLevelclosureCleanerLevel = ClosureCleanerLevel.RECURSIVE;
// 默认并行度,parallelism.defaultprivateintparallelism = CoreOptions.DEFAULT_PARALLELISM.defaultValue();
privateintmaxParallelism = -1;
// 是否强制使用kryoprivatebooleanforceKryo = false;
// kryo不支持泛型privatebooleandisableGenericTypes = false;
// 开启自动生成算子uidprivatebooleanenableAutoGeneratedUids = true;
// 是否开启对象reuseprivatebooleanobjectReuse = false;
// 动态类型注册privatebooleanautoTypeRegistrationEnabled = true;
// 使用avroprivatebooleanforceAvro = false;
privateCodeAnalysisModecodeAnalysisMode = CodeAnalysisMode.DISABLE;
// 自动生成watermark间隔privatelongautoWatermarkInterval = 0;
/** * Interval in milliseconds for sending latency tracking marks from the sources to the sinks. * 发送任务指标的间隔 */privatelonglatencyTrackingInterval = MetricOptions.LATENCY_INTERVAL.defaultValue();
//是否采集指标privatebooleanisLatencyTrackingConfigured = false;
// 重启策略privateRestartStrategies.RestartStrategyConfigurationrestartStrategyConfiguration =
newRestartStrategies.FallbackRestartStrategyConfiguration();
// 任务取消间隔privatelongtaskCancellationIntervalMillis = -1;
/** * 超时之后,正在进行的任务取消将导致致命的TaskManager错误,通常会杀死JVM。 * Timeout after which an ongoing task cancellation will lead to a fatal * TaskManager error, usually killing the JVM. */privatelongtaskCancellationTimeoutMillis = -1;
/** This flag defines if we use compression for the state snapshot data or not. Default: false */privatebooleanuseSnapshotCompression = false;
public <RextendsTuple> SingleOutputStreamOperator<R> project(int... fieldIndexes) {
returnnewStreamProjection<>(this, fieldIndexes).projectTupleX();
}
publicclassStreamProjection<IN> {
privateDataStream<IN> dataStream;
// 需要project字段的inedxprivateint[] fieldIndexes;
protectedStreamProjection(DataStream<IN> dataStream, int[] fieldIndexes) {
// 判断是否是为tuple类型if (!dataStream.getType().isTupleType()) {
thrownewRuntimeException("Only Tuple DataStreams can be projected");
}
// 判断fieldIndexes长度是否合法if (fieldIndexes.length == 0) {
thrownewIllegalArgumentException("project() needs to select at least one (1) field.");
} elseif (fieldIndexes.length > Tuple.MAX_ARITY - 1) {
thrownewIllegalArgumentException(
"project() may select only up to (" + (Tuple.MAX_ARITY - 1) + ") fields.");
}
// 获取tuple最大字段indexintmaxFieldIndex = (dataStream.getType()).getArity();
// 校验索引是否大于maxFieldIndexfor (inti = 0; i < fieldIndexes.length; i++) {
Preconditions.checkElementIndex(fieldIndexes[i], maxFieldIndex);
}
this.dataStream = dataStream;
this.fieldIndexes = fieldIndexes;
}
/** * Chooses a projectTupleX according to the length of {@link * org.apache.flink.streaming.api.datastream.StreamProjection#fieldIndexes}. * * @return The projected DataStream. * @see org.apache.flink.api.java.operators.ProjectOperator.Projection */@SuppressWarnings("unchecked")
public <OUTextendsTuple> SingleOutputStreamOperator<OUT> projectTupleX(){
// 返回具体的tuplex
}
public <T0> SingleOutputStreamOperator<Tuple1<T0>> projectTuple1() {
TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
TupleTypeInfo<Tuple1<T0>> tType = newTupleTypeInfo<Tuple1<T0>>(fTypes);
returndataStream.transform(
"Projection",
tType,
newStreamProject<IN, Tuple1<T0>>(
fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
}
}
// 具体的project流publicclassStreamProject<IN, OUTextendsTuple>
extendsAbstractStreamOperator<OUT>
implementsOneInputStreamOperator<IN, OUT> {
privatestaticfinallongserialVersionUID = 1L;
privateTypeSerializer<OUT> outSerializer;
privateint[] fields;
privateintnumFields;
privatetransientOUToutTuple;
publicStreamProject(int[] fields, TypeSerializer<OUT> outSerializer) {
this.fields = fields;
this.numFields = this.fields.length;
this.outSerializer = outSerializer;
chainingStrategy = ChainingStrategy.ALWAYS;
}
@OverridepublicvoidprocessElement(StreamRecord<IN> element) throwsException {
for (inti = 0; i < this.numFields; i++) {
outTuple.setField(((Tuple) element.getValue()).getField(fields[i]), i);
}
output.collect(element.replace(outTuple));
}
@Overridepublicvoidopen() throwsException {
super.open();
outTuple = outSerializer.createInstance();
}
}
多输出算子
coGroup
两个 DataStream 通过 coGroup 组合成 CoGroupedStreams,apply 操作处理的数据为数据集合
public <T2> CoGroupedStreams<T, T2> coGroup(DataStream<T2> otherStream) {
returnnewCoGroupedStreams<>(this, otherStream);
}
publicclassCoGroupedStreams<T1, T2> {
/** The first input stream. */privatefinalDataStream<T1> input1;
/** The second input stream. */privatefinalDataStream<T2> input2;
/** * Creates new CoGrouped data streams, which are the first step towards building a streaming * co-group. * * @param input1 The first data stream. * @param input2 The second data stream. */publicCoGroupedStreams(DataStream<T1> input1, DataStream<T2> input2) {
this.input1 = requireNonNull(input1);
this.input2 = requireNonNull(input2);
}
/** * Specifies a {@link KeySelector} for elements from the first input. * * @param keySelector The KeySelector to be used for extracting the first input's key for partitioning. */public <KEY> Where<KEY> where(KeySelector<T1, KEY> keySelector) {
Preconditions.checkNotNull(keySelector);
// 校验类型是否一致finalTypeInformation<KEY> keyType = TypeExtractor.getKeySelectorTypes(keySelector, input1.getType());
returnwhere(keySelector, keyType);
}
/** * Specifies a {@link KeySelector} for elements from the first input with explicit type information. * * @param keySelector The KeySelector to be used for extracting the first input's key for partitioning. * @param keyType The type information describing the key type. */public <KEY> Where<KEY> where(KeySelector<T1, KEY> keySelector, TypeInformation<KEY> keyType) {
Preconditions.checkNotNull(keySelector);
Preconditions.checkNotNull(keyType);
// 指定关联键returnnewWhere<>(input1.clean(keySelector), keyType);
}
// ------------------------------------------------------------------------/** * CoGrouped streams that have the key for one side defined. * * @param <KEY> The type of the key. */@PublicpublicclassWhere<KEY> {
privatefinalKeySelector<T1, KEY> keySelector1;
privatefinalTypeInformation<KEY> keyType;
Where(KeySelector<T1, KEY> keySelector1, TypeInformation<KEY> keyType) {
this.keySelector1 = keySelector1;
this.keyType = keyType;
}
/** * Specifies a {@link KeySelector} for elements from the second input. * * @param keySelector The KeySelector to be used for extracting the second input's key for partitioning. */publicEqualToequalTo(KeySelector<T2, KEY> keySelector) {
Preconditions.checkNotNull(keySelector);
finalTypeInformation<KEY> otherKey = TypeExtractor.getKeySelectorTypes(keySelector, input2.getType());
returnequalTo(keySelector, otherKey);
}
/** * Specifies a {@link KeySelector} for elements from the second input with explicit type information for the key type. * * @param keySelector The KeySelector to be used for extracting the key for partitioning. * @param keyType The type information describing the key type. */publicEqualToequalTo(KeySelector<T2, KEY> keySelector, TypeInformation<KEY> keyType) {
Preconditions.checkNotNull(keySelector);
Preconditions.checkNotNull(keyType);
if (!keyType.equals(this.keyType)) {
thrownewIllegalArgumentException("The keys for the two inputs are not equal: " +
"first key = " + this.keyType + " , second key = " + keyType);
}
returnnewEqualTo(input2.clean(keySelector));
}
// --------------------------------------------------------------------/** * A co-group operation that has {@link KeySelector KeySelectors} defined for both inputs. */@PublicpublicclassEqualTo {
privatefinalKeySelector<T2, KEY> keySelector2;
EqualTo(KeySelector<T2, KEY> keySelector2) {
this.keySelector2 = requireNonNull(keySelector2);
}
/** * 指定双流窗口函数 * Specifies the window on which the co-group operation works. */@PublicEvolvingpublic <WextendsWindow> WithWindow<T1, T2, KEY, W> window(WindowAssigner<? superTaggedUnion<T1, T2>, W> assigner) {
returnnewWithWindow<>(input1, input2, keySelector1, keySelector2, keyType, assigner, null, null, null);
}
}
}
publicclassTumblingProcessingTimeWindowsextendsWindowAssigner<Object, TimeWindow> {
privatestaticfinallongserialVersionUID = 1L;
// 窗口大小privatefinallongsize;
// 全局offsetprivatefinallongglobalOffset;
// 错开窗口offsetprivateLongstaggerOffset = null;
// 错开窗口策略privatefinalWindowStaggerwindowStagger;
/** * * @param size 窗口大小 * @param offset 全局offset * @param windowStagger 窗口错开策略 */privateTumblingProcessingTimeWindows(longsize, longoffset, WindowStaggerwindowStagger) {
if (Math.abs(offset) >= size) {
thrownewIllegalArgumentException("TumblingProcessingTimeWindows parameters must satisfy abs(offset) < size");
}
this.size = size;
this.globalOffset = offset;
this.windowStagger = windowStagger;
}
/** * 指定窗口元素到Collection中 * @param element The element to which windows should be assigned. * @param timestamp The timestamp of the element. * @param context The {@link WindowAssignerContext} in which the assigner operates. * @return */@OverridepublicCollection<TimeWindow> assignWindows(Objectelement, longtimestamp, WindowAssignerContextcontext) {
// 获取当前processTimefinallongnow = context.getCurrentProcessingTime();
if (staggerOffset == null) {
// 获取错开窗口offset,支持三种策略:ALIGNED(从0开始)、RANDOM(窗口大小的任意倍)、NATURAL(在窗口操作符中接收到第一个事件时,取开始窗口和当前处理时间作为偏移量。这样,窗户是交错的基于每个并行运算符接收到第一个事件的时间。)staggerOffset = windowStagger.getStaggerOffset(context.getCurrentProcessingTime(), size);
}
// 计算开始窗口起始时间longstart = TimeWindow.getWindowStartWithOffset(now, (globalOffset + staggerOffset) % size, size);
// 塞入窗口集合returnCollections.singletonList(newTimeWindow(start, start + size));
}
publiclonggetSize() {
returnsize;
}
@OverridepublicTrigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironmentenv) {
// 创建默认triggerreturnProcessingTimeTrigger.create();
}
@OverridepublicStringtoString() {
return"TumblingProcessingTimeWindows(" + size + ")";
}
/** * Creates a new {@code TumblingProcessingTimeWindows} {@link WindowAssigner} that assigns * elements to time windows based on the element timestamp. * * @param size The size of the generated windows. * @return The time policy. */publicstaticTumblingProcessingTimeWindowsof(Timesize) {
// 创建滚动窗口,size为毫秒,offset设置为0,在全部分区上触发returnnewTumblingProcessingTimeWindows(size.toMilliseconds(), 0, WindowStagger.ALIGNED);
}
/** * 创建一个新的TumblingProcessingTimeWindows WindowAssigner ,根据元素的时间戳和偏移量将元素分配给时间窗口。 * 例如,如果of(Time.hours(1),Time.minutes(15))小时显示流,但窗口从每小时的第15分钟开始,则可以使用of(Time.hours(1),Time.minutes(15)) ,然后将获得时间窗口开始在0:15:00、1:15:00、2:15:00等 * 而不是那样,如果您居住在不使用UTC±00:00时间的地方,例如使用UTC + 08:00的中国,并且您想要一个大小为一天的时间窗口,并且该窗口在每个时间开始当地时间00:00:00,您可以使用of(Time.days(1),Time.hours(-8)) 。 由于UTC + 08:00比UTC时间早8小时,所以offset参数为Time.hours(-8)) */publicstaticTumblingProcessingTimeWindowsof(Timesize, Timeoffset) {
returnnewTumblingProcessingTimeWindows(size.toMilliseconds(), offset.toMilliseconds(), WindowStagger.ALIGNED);
}
@PublicEvolvingpublicstaticTumblingProcessingTimeWindowsof(Timesize, Timeoffset, WindowStaggerwindowStagger) {
returnnewTumblingProcessingTimeWindows(size.toMilliseconds(), offset.toMilliseconds(), windowStagger);
}
@OverridepublicTypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfigexecutionConfig) {
returnnewTimeWindow.Serializer();
}
@OverridepublicbooleanisEventTime() {
returnfalse;
}
}
publicclassTumblingEventTimeWindowsextendsWindowAssigner<Object, TimeWindow> {
privatestaticfinallongserialVersionUID = 1L;
privatefinallongsize;
// 全局偏移privatefinallongglobalOffset;
// 错位偏移privateLongstaggerOffset = null;
privatefinalWindowStaggerwindowStagger;
protectedTumblingEventTimeWindows(longsize, longoffset, WindowStaggerwindowStagger) {
if (Math.abs(offset) >= size) {
thrownewIllegalArgumentException("TumblingEventTimeWindows parameters must satisfy abs(offset) < size");
}
this.size = size;
this.globalOffset = offset;
this.windowStagger = windowStagger;
}
/** * 分配窗口 * @param element The element to which windows should be assigned. * @param timestamp The timestamp of the element. * @param context The {@link WindowAssignerContext} in which the assigner operates. * @return */@OverridepublicCollection<TimeWindow> assignWindows(Objectelement, longtimestamp, WindowAssignerContextcontext) {
// 如果传入的eventTime大于最小值if (timestamp > Long.MIN_VALUE) {
//if (staggerOffset == null) {
staggerOffset = windowStagger.getStaggerOffset(context.getCurrentProcessingTime(), size);
}
// Long.MIN_VALUE is currently assigned when no timestamp is presentlongstart = TimeWindow.getWindowStartWithOffset(timestamp, (globalOffset + staggerOffset) % size, size);
returnCollections.singletonList(newTimeWindow(start, start + size));
} else {
thrownewRuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
"Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
"'DataStream.assignTimestampsAndWatermarks(...)'?");
}
}
@OverridepublicTrigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironmentenv) {
returnEventTimeTrigger.create();
}
@OverridepublicStringtoString() {
return"TumblingEventTimeWindows(" + size + ")";
}
/** * Creates a new {@code TumblingEventTimeWindows} {@link WindowAssigner} that assigns * elements to time windows based on the element timestamp. * * @param size The size of the generated windows. * @return The time policy. */publicstaticTumblingEventTimeWindowsof(Timesize) {
returnnewTumblingEventTimeWindows(size.toMilliseconds(), 0, WindowStagger.ALIGNED);
}
/** * Creates a new {@code TumblingEventTimeWindows} {@link WindowAssigner} that assigns * elements to time windows based on the element timestamp and offset. * * <p>For example, if you want window a stream by hour,but window begins at the 15th minutes * of each hour, you can use {@code of(Time.hours(1),Time.minutes(15))},then you will get * time windows start at 0:15:00,1:15:00,2:15:00,etc. * * <p>Rather than that,if you are living in somewhere which is not using UTC±00:00 time, * such as China which is using UTC+08:00,and you want a time window with size of one day, * and window begins at every 00:00:00 of local time,you may use {@code of(Time.days(1),Time.hours(-8))}. * The parameter of offset is {@code Time.hours(-8))} since UTC+08:00 is 8 hours earlier than UTC time. * * @param size The size of the generated windows. * @param offset The offset which window start would be shifted by. */publicstaticTumblingEventTimeWindowsof(Timesize, Timeoffset) {
returnnewTumblingEventTimeWindows(size.toMilliseconds(), offset.toMilliseconds(), WindowStagger.ALIGNED);
}
/** * Creates a new {@code TumblingEventTimeWindows} {@link WindowAssigner} that assigns * elements to time windows based on the element timestamp, offset and a staggering offset, * depending on the staggering policy. * * @param size The size of the generated windows. * @param offset The globalOffset which window start would be shifted by. * @param windowStagger The utility that produces staggering offset in runtime. */@PublicEvolvingpublicstaticTumblingEventTimeWindowsof(Timesize, Timeoffset, WindowStaggerwindowStagger) {
returnnewTumblingEventTimeWindows(size.toMilliseconds(), offset.toMilliseconds(), windowStagger);
}
@OverridepublicTypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfigexecutionConfig) {
returnnewTimeWindow.Serializer();
}
@OverridepublicbooleanisEventTime() {
returntrue;
}
}
SlidingWindows
滑动窗口分为 ProcessingTime 和 EventTime 两种
publicclassSlidingProcessingTimeWindowsextendsWindowAssigner<Object, TimeWindow> {
privatestaticfinallongserialVersionUID = 1L;
// 窗口大小privatefinallongsize;
// offsetprivatefinallongoffset;
// 滑动距离privatefinallongslide;
privateSlidingProcessingTimeWindows(longsize, longslide, longoffset) {
if (Math.abs(offset) >= slide || size <= 0) {
thrownewIllegalArgumentException("SlidingProcessingTimeWindows parameters must satisfy " +
"abs(offset) < slide and size > 0");
}
this.size = size;
this.slide = slide;
this.offset = offset;
}
@OverridepublicCollection<TimeWindow> assignWindows(Objectelement, longtimestamp, WindowAssignerContextcontext) {
timestamp = context.getCurrentProcessingTime();
// 窗口大小/滑动补长 计算一个window size下存在几个windowList<TimeWindow> windows = newArrayList<>((int) (size / slide));
// 计算起始位置longlastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);
// 从start开始,在size范围内,每次滑动slide,比如start为0,size为5 slide为3,windows存储为0~5,3~8,6~11.for (longstart = lastStart;
start > timestamp - size;
start -= slide) {
// 计算的窗口放入listwindows.add(newTimeWindow(start, start + size));
}
returnwindows;
}
publiclonggetSize() {
returnsize;
}
publiclonggetSlide() {
returnslide;
}
@OverridepublicTrigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironmentenv) {
// 只会到窗口时间触发计算,累加的方式returnProcessingTimeTrigger.create();
}
@OverridepublicStringtoString() {
return"SlidingProcessingTimeWindows(" + size + ", " + slide + ")";
}
/** * Creates a new {@code SlidingProcessingTimeWindows} {@link WindowAssigner} that assigns * elements to sliding time windows based on the element timestamp. * * @param size The size of the generated windows. * @param slide The slide interval of the generated windows. * @return The time policy. */publicstaticSlidingProcessingTimeWindowsof(Timesize, Timeslide) {
returnnewSlidingProcessingTimeWindows(size.toMilliseconds(), slide.toMilliseconds(), 0);
}
/** * 用于控制数据丢失时区问题 * Creates a new {@code SlidingProcessingTimeWindows} {@link WindowAssigner} that assigns * elements to time windows based on the element timestamp and offset. * * <p>For example, if you want window a stream by hour,but window begins at the 15th minutes * of each hour, you can use {@code of(Time.hours(1),Time.minutes(15))},then you will get * time windows start at 0:15:00,1:15:00,2:15:00,etc. * * <p>Rather than that,if you are living in somewhere which is not using UTC±00:00 time, * such as China which is using UTC+08:00,and you want a time window with size of one day, * and window begins at every 00:00:00 of local time,you may use {@code of(Time.days(1),Time.hours(-8))}. * The parameter of offset is {@code Time.hours(-8))} since UTC+08:00 is 8 hours earlier than UTC time. * * @param size The size of the generated windows. * @param slide The slide interval of the generated windows. * @param offset The offset which window start would be shifted by. * @return The time policy. */publicstaticSlidingProcessingTimeWindowsof(Timesize, Timeslide, Timeoffset) {
returnnewSlidingProcessingTimeWindows(size.toMilliseconds(), slide.toMilliseconds(), offset.toMilliseconds());
}
@OverridepublicTypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfigexecutionConfig) {
returnnewTimeWindow.Serializer();
}
@OverridepublicbooleanisEventTime() {
returnfalse;
}
}
// eventTime核心逻辑publicTriggerResultonElement(Objectelement, longtimestamp, TimeWindowwindow, TriggerContextctx) throwsException {
// 如果窗口的最大时间,小于等于当前watermarker,则将窗口数据发出,但是不会清空数据if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
// if the watermark is already past the window fire immediatelyreturnTriggerResult.FIRE;
} else {
// 如果时间超过watermark,则注册计时器,并且统计数据ctx.registerEventTimeTimer(window.maxTimestamp());
returnTriggerResult.CONTINUE;
}
}
GlobalWindows
/** Private constructor; callers obtain the instance through {@code get()}. */
private GlobalWindow() { }
publicstaticGlobalWindowget() {
returnINSTANCE;
}
@OverridepubliclongmaxTimestamp() {
returnLong.MAX_VALUE;
}
publicclassGlobalWindowsextendsWindowAssigner<Object, GlobalWindow> {
privatestaticfinallongserialVersionUID = 1L;
privateGlobalWindows() {}
@OverridepublicCollection<GlobalWindow> assignWindows(Objectelement, longtimestamp, WindowAssignerContextcontext) {
returnCollections.singletonList(GlobalWindow.get());
}
/** * 默认触发器为:直接跳过 * @param env * @return */@OverridepublicTrigger<Object, GlobalWindow> getDefaultTrigger(StreamExecutionEnvironmentenv) {
returnnewNeverTrigger();
}
@OverridepublicStringtoString() {
return"GlobalWindows()";
}
/** * Creates a new {@code GlobalWindows} {@link WindowAssigner} that assigns * all elements to the same {@link GlobalWindow}. * * @return The global window policy. */publicstaticGlobalWindowscreate() {
returnnewGlobalWindows();
}
/** * A trigger that never fires, as default Trigger for GlobalWindows. */@InternalpublicstaticclassNeverTriggerextendsTrigger<Object, GlobalWindow> {
privatestaticfinallongserialVersionUID = 1L;
@OverridepublicTriggerResultonElement(Objectelement, longtimestamp, GlobalWindowwindow, TriggerContextctx) {
returnTriggerResult.CONTINUE;
}
@OverridepublicTriggerResultonEventTime(longtime, GlobalWindowwindow, TriggerContextctx) {
returnTriggerResult.CONTINUE;
}
@OverridepublicTriggerResultonProcessingTime(longtime, GlobalWindowwindow, TriggerContextctx) {
returnTriggerResult.CONTINUE;
}
@Overridepublicvoidclear(GlobalWindowwindow, TriggerContextctx) throwsException {}
@OverridepublicvoidonMerge(GlobalWindowwindow, OnMergeContextctx) {
}
}
@OverridepublicTypeSerializer<GlobalWindow> getWindowSerializer(ExecutionConfigexecutionConfig) {
returnnewGlobalWindow.Serializer();
}
@OverridepublicbooleanisEventTime() {
returnfalse;
}
}
publicclassCountEvictor<WextendsWindow> implementsEvictor<Object, W> {
privatestaticfinallongserialVersionUID = 1L;
// 最大的读取countprivatefinallongmaxCount;
// 是否之后处理privatefinalbooleandoEvictAfter;
privateCountEvictor(longcount, booleandoEvictAfter) {
this.maxCount = count;
this.doEvictAfter = doEvictAfter;
}
privateCountEvictor(longcount) {
this.maxCount = count;
this.doEvictAfter = false;
}
@OverridepublicvoidevictBefore(Iterable<TimestampedValue<Object>> elements, intsize, Wwindow, EvictorContextctx) {
if (!doEvictAfter) {
evict(elements, size, ctx);
}
}
@OverridepublicvoidevictAfter(Iterable<TimestampedValue<Object>> elements, intsize, Wwindow, EvictorContextctx) {
if (doEvictAfter) {
evict(elements, size, ctx);
}
}
/** * 处理前n条元素 * @param elements * @param size * @param ctx */privatevoidevict(Iterable<TimestampedValue<Object>> elements, intsize, EvictorContextctx) {
// 如果如果count大于等于size,直接可以在窗口内处理,否则去要处理elements迭代器if (size <= maxCount) {
return;
} else {
// 记录evicted的数据数量intevictedCount = 0;
for (Iterator<TimestampedValue<Object>> iterator = elements.iterator(); iterator.hasNext();){
iterator.next();
evictedCount++;
// 如果删除的数量大于阈值终止if (evictedCount > size - maxCount) {
break;
} else {
iterator.remove();
}
}
}
}
/** * Creates a {@code CountEvictor} that keeps the given number of elements. * Eviction is done before the window function. * * @param maxCount The number of elements to keep in the pane. */publicstatic <WextendsWindow> CountEvictor<W> of(longmaxCount) {
returnnewCountEvictor<>(maxCount);
}
/** * Creates a {@code CountEvictor} that keeps the given number of elements in the pane * Eviction is done before/after the window function based on the value of doEvictAfter. * * @param maxCount The number of elements to keep in the pane. * @param doEvictAfter Whether to do eviction after the window function. */publicstatic <WextendsWindow> CountEvictor<W> of(longmaxCount, booleandoEvictAfter) {
returnnewCountEvictor<>(maxCount, doEvictAfter);
}
}
TimeEvictor
publicclassTimeEvictor<WextendsWindow> implementsEvictor<Object, W> {
privatestaticfinallongserialVersionUID = 1L;
// 窗口大小privatefinallongwindowSize;
privatefinalbooleandoEvictAfter;
publicTimeEvictor(longwindowSize) {
this.windowSize = windowSize;
this.doEvictAfter = false;
}
publicTimeEvictor(longwindowSize, booleandoEvictAfter) {
this.windowSize = windowSize;
this.doEvictAfter = doEvictAfter;
}
@OverridepublicvoidevictBefore(Iterable<TimestampedValue<Object>> elements, intsize, Wwindow, EvictorContextctx) {
if (!doEvictAfter) {
evict(elements, size, ctx);
}
}
@OverridepublicvoidevictAfter(Iterable<TimestampedValue<Object>> elements, intsize, Wwindow, EvictorContextctx) {
if (doEvictAfter) {
evict(elements, size, ctx);
}
}
privatevoidevict(Iterable<TimestampedValue<Object>> elements, intsize, EvictorContextctx) {
// 如果记录没有时间戳直接终止if (!hasTimestamp(elements)) {
return;
}
// 获取记录中最大时间longcurrentTime = getMaxTimestamp(elements);
// 获取可以读取的timestamplongevictCutoff = currentTime - windowSize;
// 遍历删除小于等于evictCutoff的记录for (Iterator<TimestampedValue<Object>> iterator = elements.iterator(); iterator.hasNext(); ) {
TimestampedValue<Object> record = iterator.next();
if (record.getTimestamp() <= evictCutoff) {
iterator.remove();
}
}
}
/** * Returns true if the first element in the Iterable of {@link TimestampedValue} has a timestamp. */privatebooleanhasTimestamp(Iterable<TimestampedValue<Object>> elements) {
Iterator<TimestampedValue<Object>> it = elements.iterator();
if (it.hasNext()) {
returnit.next().hasTimestamp();
}
returnfalse;
}
/** * @param elements The elements currently in the pane. * @return The maximum value of timestamp among the elements. */privatelonggetMaxTimestamp(Iterable<TimestampedValue<Object>> elements) {
longcurrentTime = Long.MIN_VALUE;
for (Iterator<TimestampedValue<Object>> iterator = elements.iterator(); iterator.hasNext();){
TimestampedValue<Object> record = iterator.next();
currentTime = Math.max(currentTime, record.getTimestamp());
}
returncurrentTime;
}
@OverridepublicStringtoString() {
return"TimeEvictor(" + windowSize + ")";
}
@VisibleForTestingpubliclonggetWindowSize() {
returnwindowSize;
}
/** * Creates a {@code TimeEvictor} that keeps the given number of elements. * Eviction is done before the window function. * * @param windowSize The amount of time for which to keep elements. */publicstatic <WextendsWindow> TimeEvictor<W> of(TimewindowSize) {
returnnewTimeEvictor<>(windowSize.toMilliseconds());
}
/** * Creates a {@code TimeEvictor} that keeps the given number of elements. * Eviction is done before/after the window function based on the value of doEvictAfter. * * @param windowSize The amount of time for which to keep elements. * @param doEvictAfter Whether eviction is done after window function. */publicstatic <WextendsWindow> TimeEvictor<W> of(TimewindowSize, booleandoEvictAfter) {
returnnewTimeEvictor<>(windowSize.toMilliseconds(), doEvictAfter);
}
}
DeltaEvictor
publicclassDeltaEvictor<T, WextendsWindow> implementsEvictor<T, W> {
privatestaticfinallongserialVersionUID = 1L;
// delta函数,新老数据做操作的函数DeltaFunction<T> deltaFunction;
// 阈值 在阈值范围内的元素保留privatedoublethreshold;
privatefinalbooleandoEvictAfter;
privateDeltaEvictor(doublethreshold, DeltaFunction<T> deltaFunction) {
this.deltaFunction = deltaFunction;
this.threshold = threshold;
this.doEvictAfter = false;
}
privateDeltaEvictor(doublethreshold, DeltaFunction<T> deltaFunction, booleandoEvictAfter) {
this.deltaFunction = deltaFunction;
this.threshold = threshold;
this.doEvictAfter = doEvictAfter;
}
@OverridepublicvoidevictBefore(Iterable<TimestampedValue<T>> elements, intsize, Wwindow, EvictorContextctx) {
if (!doEvictAfter) {
evict(elements, size, ctx);
}
}
@OverridepublicvoidevictAfter(Iterable<TimestampedValue<T>> elements, intsize, Wwindow, EvictorContextctx) {
if (doEvictAfter) {
evict(elements, size, ctx);
}
}
privatevoidevict(Iterable<TimestampedValue<T>> elements, intsize, EvictorContextctx) {
// 获取最后元素,作为新元素TimestampedValue<T> lastElement = Iterables.getLast(elements);
for (Iterator<TimestampedValue<T>> iterator = elements.iterator(); iterator.hasNext();){
TimestampedValue<T> element = iterator.next();
// 前面的旧元素和新元素放入deltaFunction中计算,大于等于阈值则移除if (deltaFunction.getDelta(element.getValue(), lastElement.getValue()) >= this.threshold) {
iterator.remove();
}
}
}
@OverridepublicStringtoString() {
return"DeltaEvictor(" + deltaFunction + ", " + threshold + ")";
}
/** * Creates a {@code DeltaEvictor} from the given threshold and {@code DeltaFunction}. * Eviction is done before the window function. * * @param threshold The threshold * @param deltaFunction The {@code DeltaFunction} */publicstatic <T, WextendsWindow> DeltaEvictor<T, W> of(doublethreshold, DeltaFunction<T> deltaFunction) {
returnnewDeltaEvictor<>(threshold, deltaFunction);
}
/** * Creates a {@code DeltaEvictor} from the given threshold, {@code DeltaFunction}. * Eviction is done before/after the window function based on the value of doEvictAfter. * * @param threshold The threshold * @param deltaFunction The {@code DeltaFunction} * @param doEvictAfter Whether eviction should be done after window function */publicstatic <T, WextendsWindow> DeltaEvictor<T, W> of(doublethreshold, DeltaFunction<T> deltaFunction, booleandoEvictAfter) {
returnnewDeltaEvictor<>(threshold, deltaFunction, doEvictAfter);
}
}
Trigger
窗口触发器机制:当满足触发条件(阈值)时,触发对应的动作
CountTrigger
publicclassCountTrigger<WextendsWindow> extendsTrigger<Object, W> {
privatestaticfinallongserialVersionUID = 1L;
// 最大触发条数privatefinallongmaxCount;
// 创建Reducing状态,传入Sum reduce函数,计算放入stateDesc的数据之和privatefinalReducingStateDescriptor<Long> stateDesc =
newReducingStateDescriptor<>("count", newSum(), LongSerializer.INSTANCE);
privateCountTrigger(longmaxCount) {
this.maxCount = maxCount;
}
@OverridepublicTriggerResultonElement(Objectelement, longtimestamp, Wwindow, TriggerContextctx) throwsException {
// 获取ReducingState状态ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
// 添加1个元素count.add(1L);
// 如果窗口中以及满足count数,则清空状态并且输出窗口结果if (count.get() >= maxCount) {
count.clear();
returnTriggerResult.FIRE;
}
returnTriggerResult.CONTINUE;
}
@OverridepublicTriggerResultonEventTime(longtime, Wwindow, TriggerContextctx) {
returnTriggerResult.CONTINUE;
}
@OverridepublicTriggerResultonProcessingTime(longtime, Wwindow, TriggerContextctx) throwsException {
returnTriggerResult.CONTINUE;
}
@Overridepublicvoidclear(Wwindow, TriggerContextctx) throwsException {
ctx.getPartitionedState(stateDesc).clear();
}
@OverridepublicbooleancanMerge() {
returntrue;
}
@OverridepublicvoidonMerge(Wwindow, OnMergeContextctx) throwsException {
ctx.mergePartitionedState(stateDesc);
}
@OverridepublicStringtoString() {
return"CountTrigger(" + maxCount + ")";
}
/** * Creates a trigger that fires once the number of elements in a pane reaches the given count. * * @param maxCount The count of elements at which to fire. * @param <W> The type of {@link Window Windows} on which this trigger can operate. */publicstatic <WextendsWindow> CountTrigger<W> of(longmaxCount) {
returnnewCountTrigger<>(maxCount);
}
privatestaticclassSumimplementsReduceFunction<Long> {
privatestaticfinallongserialVersionUID = 1L;
@OverridepublicLongreduce(Longvalue1, Longvalue2) throwsException {
returnvalue1 + value2;
}
}
}
ProcessingTimeTrigger
每次到达窗口的结束时间(end)时触发窗口计算
publicclassProcessingTimeTriggerextendsTrigger<Object, TimeWindow> {
privatestaticfinallongserialVersionUID = 1L;
privateProcessingTimeTrigger() {}
@OverridepublicTriggerResultonElement(Objectelement, longtimestamp, TimeWindowwindow, TriggerContextctx) {
// 注册定时器ctx.registerProcessingTimeTimer(window.maxTimestamp());
returnTriggerResult.CONTINUE;
}
@OverridepublicTriggerResultonEventTime(longtime, TimeWindowwindow, TriggerContextctx) throwsException {
returnTriggerResult.CONTINUE;
}
/** * timer触发是调用 * @param time The timestamp at which the timer fired. * @param window The window for which the timer fired. * @param ctx A context object that can be used to register timer callbacks. * @return */@OverridepublicTriggerResultonProcessingTime(longtime, TimeWindowwindow, TriggerContextctx) {
returnTriggerResult.FIRE;
}
@Overridepublicvoidclear(TimeWindowwindow, TriggerContextctx) throwsException {
ctx.deleteProcessingTimeTimer(window.maxTimestamp());
}
@OverridepublicbooleancanMerge() {
returntrue;
}
@OverridepublicvoidonMerge(TimeWindowwindow,
OnMergeContextctx) {
// only register a timer if the time is not yet past the end of the merged window// this is in line with the logic in onElement(). If the time is past the end of// the window onElement() will fire and setting a timer here would fire the window twice.longwindowMaxTimestamp = window.maxTimestamp();
if (windowMaxTimestamp > ctx.getCurrentProcessingTime()) {
ctx.registerProcessingTimeTimer(windowMaxTimestamp);
}
}
@OverridepublicStringtoString() {
return"ProcessingTimeTrigger()";
}
/** * Creates a new trigger that fires once system time passes the end of the window. */publicstaticProcessingTimeTriggercreate() {
returnnewProcessingTimeTrigger();
}
}
ProcessingTimeoutTrigger
publicclassProcessingTimeoutTrigger<T, WextendsWindow> extendsTrigger<T, W> {
privatestaticfinallongserialVersionUID = 1L;
// 内嵌的触发器privatefinalTrigger<T, W> nestedTrigger;
// Trigger触发的间隔privatefinallonginterval;
// 新纪录是否重设timerprivatefinalbooleanresetTimerOnNewRecord;
// 超时窗口是否清理privatefinalbooleanshouldClearOnTimeout;
privatefinalValueStateDescriptor<Long> timeoutStateDesc;
privateProcessingTimeoutTrigger(
Trigger<T, W> nestedTrigger,
longinterval,
booleanresetTimerOnNewRecord,
booleanshouldClearOnTimeout) {
this.nestedTrigger = nestedTrigger;
this.interval = interval;
this.resetTimerOnNewRecord = resetTimerOnNewRecord;
this.shouldClearOnTimeout = shouldClearOnTimeout;
this.timeoutStateDesc = newValueStateDescriptor<>("timeout", LongSerializer.INSTANCE);
}
@OverridepublicTriggerResultonElement(Telement, longtimestamp, Wwindow, TriggerContextctx)
throwsException {
// 记录塞入内嵌触发器TriggerResulttriggerResult = this.nestedTrigger.onElement(element, timestamp, window, ctx);
// 如果可以输出则输出if (triggerResult.isFire()) {
this.clear(window, ctx);
returntriggerResult;
}
ValueState<Long> timeoutState = ctx.getPartitionedState(this.timeoutStateDesc);
// 下次触发的时间longnextFireTimestamp = ctx.getCurrentProcessingTime() + this.interval;
LongtimeoutTimestamp = timeoutState.value();
// 如果新纪录就清空计时器和超时if (timeoutTimestamp != null && resetTimerOnNewRecord) {
ctx.deleteProcessingTimeTimer(timeoutTimestamp);
timeoutState.clear();
timeoutTimestamp = null;
}
// 第一次没有获取if (timeoutTimestamp == null) {
// 将下次窗口触发时间放入timeoutStatetimeoutState.update(nextFireTimestamp);
// 注册定时器ctx.registerProcessingTimeTimer(nextFireTimestamp);
}
returntriggerResult;
}
@OverridepublicTriggerResultonProcessingTime(longtimestamp, Wwindow, TriggerContextctx)
throwsException {
//调用内嵌trigger的onProcessingTime方法TriggerResulttriggerResult = this.nestedTrigger.onProcessingTime(timestamp, window, ctx);
// 如果需要清空,则清空if (shouldClearOnTimeout) {
this.clear(window, ctx);
}
returntriggerResult.isPurge() ? TriggerResult.FIRE_AND_PURGE : TriggerResult.FIRE;
}
@OverridepublicTriggerResultonEventTime(longtimestamp, Wwindow, TriggerContextctx)
throwsException {
TriggerResulttriggerResult = this.nestedTrigger.onEventTime(timestamp, window, ctx);
if (shouldClearOnTimeout) {
this.clear(window, ctx);
}
returntriggerResult.isPurge() ? TriggerResult.FIRE_AND_PURGE : TriggerResult.FIRE;
}
@Overridepublicvoidclear(Wwindow, TriggerContextctx) throwsException {
ValueState<Long> timeoutTimestampState = ctx.getPartitionedState(this.timeoutStateDesc);
LongtimeoutTimestamp = timeoutTimestampState.value();
if (timeoutTimestamp != null) {
ctx.deleteProcessingTimeTimer(timeoutTimestamp);
timeoutTimestampState.clear();
}
this.nestedTrigger.clear(window, ctx);
}
@OverridepublicStringtoString() {
return"TimeoutTrigger(" + this.nestedTrigger.toString() + ")";
}
/** * Creates a new {@link ProcessingTimeoutTrigger} that fires when the inner trigger is fired or * when the timeout timer fires. * * <p>For example: * {@code ProcessingTimeoutTrigger.of(CountTrigger.of(3), 100)}, will create a CountTrigger with * timeout of 100 millis. So, if the first record arrives at time {@code t}, and the second * record arrives at time {@code t+50 }, the trigger will fire when the third record arrives or * when the time is {code t+100} (timeout). * * @param nestedTrigger the nested {@link Trigger} * @param timeout the timeout interval * * @return {@link ProcessingTimeoutTrigger} with the above configuration. */publicstatic <T, WextendsWindow> ProcessingTimeoutTrigger<T, W> of(
Trigger<T, W> nestedTrigger,
Durationtimeout) {
returnnewProcessingTimeoutTrigger<>(nestedTrigger, timeout.toMillis(), false, true);
}
/** * Creates a new {@link ProcessingTimeoutTrigger} that fires when the inner trigger is fired or * when the timeout timer fires. * * <p>For example: * {@code ProcessingTimeoutTrigger.of(CountTrigger.of(3), 100, false, true)}, will create a * CountTrigger with timeout of 100 millis. So, if the first record arrives at time {@code t}, * and the second record arrives at time {@code t+50 }, the trigger will fire when the third * record arrives or when the time is {code t+100} (timeout). * * @param nestedTrigger the nested {@link Trigger} * @param timeout the timeout interval * @param resetTimerOnNewRecord each time a new element arrives, reset the timer and start a * new one * @param shouldClearOnTimeout whether to call {@link Trigger#clear(Window, TriggerContext)} * when the processing-time timer fires * @param <T> The type of the element. * @param <W> The type of {@link Window Windows} on which this trigger can operate. * * @return {@link ProcessingTimeoutTrigger} with the above configuration. */publicstatic <T, WextendsWindow> ProcessingTimeoutTrigger<T, W> of(
Trigger<T, W> nestedTrigger,
Durationtimeout,
booleanresetTimerOnNewRecord,
booleanshouldClearOnTimeout) {
returnnewProcessingTimeoutTrigger<>(
nestedTrigger, timeout.toMillis(), resetTimerOnNewRecord, shouldClearOnTimeout);
}
}
PurgingTrigger
满足窗口触发条件后清除窗口状态,并清空触发器
publicclassPurgingTrigger<T, WextendsWindow> extendsTrigger<T, W> {
privatestaticfinallongserialVersionUID = 1L;
privateTrigger<T, W> nestedTrigger;
privatePurgingTrigger(Trigger<T, W> nestedTrigger) {
this.nestedTrigger = nestedTrigger;
}
/** * 如果为fire则为fire_purge触发结果 * @param element The element that arrived. * @param timestamp The timestamp of the element that arrived. * @param window The window to which the element is being added. * @param ctx A context object that can be used to register timer callbacks. * @return * @throws Exception */@OverridepublicTriggerResultonElement(Telement, longtimestamp, Wwindow, TriggerContextctx) throwsException {
TriggerResulttriggerResult = nestedTrigger.onElement(element, timestamp, window, ctx);
returntriggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult;
}
@OverridepublicTriggerResultonEventTime(longtime, Wwindow, TriggerContextctx) throwsException {
TriggerResulttriggerResult = nestedTrigger.onEventTime(time, window, ctx);
returntriggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult;
}
@OverridepublicTriggerResultonProcessingTime(longtime, Wwindow, TriggerContextctx) throwsException {
TriggerResulttriggerResult = nestedTrigger.onProcessingTime(time, window, ctx);
returntriggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult;
}
@Overridepublicvoidclear(Wwindow, TriggerContextctx) throwsException {
nestedTrigger.clear(window, ctx);
}
@OverridepublicbooleancanMerge() {
returnnestedTrigger.canMerge();
}
@OverridepublicvoidonMerge(Wwindow, OnMergeContextctx) throwsException {
nestedTrigger.onMerge(window, ctx);
}
@OverridepublicStringtoString() {
return"PurgingTrigger(" + nestedTrigger.toString() + ")";
}
/** * Creates a new purging trigger from the given {@code Trigger}. * * @param nestedTrigger The trigger that is wrapped by this purging trigger */publicstatic <T, WextendsWindow> PurgingTrigger<T, W> of(Trigger<T, W> nestedTrigger) {
returnnewPurgingTrigger<>(nestedTrigger);
}
@VisibleForTestingpublicTrigger<T, W> getNestedTrigger() {
returnnestedTrigger;
}
}
ContinuousEventTimeTrigger
publicclassContinuousEventTimeTrigger<WextendsWindow> extendsTrigger<Object, W> {
privatestaticfinallongserialVersionUID = 1L;
// Trigger触发的间隔privatefinallonginterval;
/** When merging we take the lowest of all fire timestamps as the new fire timestamp. */// 计算最小的时间privatefinalReducingStateDescriptor<Long> stateDesc =
newReducingStateDescriptor<>("fire-time", newMin(), LongSerializer.INSTANCE);
privateContinuousEventTimeTrigger(longinterval) {
this.interval = interval;
}
@OverridepublicTriggerResultonElement(Objectelement, longtimestamp, Wwindow, TriggerContextctx) throwsException {
// 如果watermark大于等于窗口结束时间,则输出当前窗口记录if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
// if the watermark is already past the window fire immediatelyreturnTriggerResult.FIRE;
} else {
// 注册event定时器ctx.registerEventTimeTimer(window.maxTimestamp());
}
// 获取输出记录时间ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
if (fireTimestamp.get() == null) {
// 计算开始时间longstart = timestamp - (timestamp % interval);
longnextFireTimestamp = start + interval;
// 注册计时器ctx.registerEventTimeTimer(nextFireTimestamp);
//放入ReducingStatefireTimestamp.add(nextFireTimestamp);
}
returnTriggerResult.CONTINUE;
}
@OverridepublicTriggerResultonEventTime(longtime, Wwindow, TriggerContextctx) throwsException {
if (time == window.maxTimestamp()){
returnTriggerResult.FIRE;
}
ReducingState<Long> fireTimestampState = ctx.getPartitionedState(stateDesc);
LongfireTimestamp = fireTimestampState.get();
if (fireTimestamp != null && fireTimestamp == time) {
fireTimestampState.clear();
fireTimestampState.add(time + interval);
ctx.registerEventTimeTimer(time + interval);
returnTriggerResult.FIRE;
}
returnTriggerResult.CONTINUE;
}
@OverridepublicTriggerResultonProcessingTime(longtime, Wwindow, TriggerContextctx) throwsException {
returnTriggerResult.CONTINUE;
}
@Overridepublicvoidclear(Wwindow, TriggerContextctx) throwsException {
ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
Longtimestamp = fireTimestamp.get();
if (timestamp != null) {
ctx.deleteEventTimeTimer(timestamp);
fireTimestamp.clear();
}
}
@OverridepublicbooleancanMerge() {
returntrue;
}
@OverridepublicvoidonMerge(Wwindow, OnMergeContextctx) throwsException {
ctx.mergePartitionedState(stateDesc);
LongnextFireTimestamp = ctx.getPartitionedState(stateDesc).get();
if (nextFireTimestamp != null) {
ctx.registerEventTimeTimer(nextFireTimestamp);
}
}
@OverridepublicStringtoString() {
return"ContinuousEventTimeTrigger(" + interval + ")";
}
@VisibleForTestingpubliclonggetInterval() {
returninterval;
}
/** * Creates a trigger that continuously fires based on the given interval. * * @param interval The time interval at which to fire. * @param <W> The type of {@link Window Windows} on which this trigger can operate. */publicstatic <WextendsWindow> ContinuousEventTimeTrigger<W> of(Timeinterval) {
returnnewContinuousEventTimeTrigger<>(interval.toMilliseconds());
}
privatestaticclassMinimplementsReduceFunction<Long> {
privatestaticfinallongserialVersionUID = 1L;
@OverridepublicLongreduce(Longvalue1, Longvalue2) throwsException {
returnMath.min(value1, value2);
}
}
}
publicDataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {
// read the output type of the input Transform to coax out errors about MissingTypeInfotransformation.getOutputType();
// configure the type if neededif (sinkFunctioninstanceofInputTypeConfigurable) {
((InputTypeConfigurable) sinkFunction).setInputType(getType(), getExecutionConfig());
}
// 包装成StreamSinkStreamSink<T> sinkOperator = newStreamSink<>(clean(sinkFunction));
DataStreamSink<T> sink = newDataStreamSink<>(this, sinkOperator);
// 添加到算子链中getExecutionEnvironment().addOperator(sink.getTransformation());
returnsink;
}
DataStreamSink
publicclassDataStreamSink<T> {
privatefinalSinkTransformation<T> transformation;
@SuppressWarnings("unchecked")
protectedDataStreamSink(DataStream<T> inputStream, StreamSink<T> operator) {
// 封装成SinkTransformationthis.transformation = newSinkTransformation<T>(inputStream.getTransformation(), "Unnamed", operator, inputStream.getExecutionEnvironment().getParallelism());
}
/** * Returns the transformation that contains the actual sink operator of this sink. */@InternalpublicSinkTransformation<T> getTransformation() {
returntransformation;
}
/** * Sets the name of this sink. This name is * used by the visualization and logging during runtime. * * @return The named sink. */publicDataStreamSink<T> name(Stringname) {
transformation.setName(name);
returnthis;
}
/** * Sets an ID for this operator. * * <p>The specified ID is used to assign the same operator ID across job * submissions (for example when starting a job from a savepoint). * * <p><strong>Important</strong>: this ID needs to be unique per * transformation and job. Otherwise, job submission will fail. * * @param uid The unique user-specified ID of this transformation. * @return The operator with the specified ID. */@PublicEvolvingpublicDataStreamSink<T> uid(Stringuid) {
transformation.setUid(uid);
returnthis;
}
/** * Sets an user provided hash for this operator. This will be used AS IS the create the JobVertexID. * * <p>The user provided hash is an alternative to the generated hashes, that is considered when identifying an * operator through the default hash mechanics fails (e.g. because of changes between Flink versions). * * <p><strong>Important</strong>: this should be used as a workaround or for trouble shooting. The provided hash * needs to be unique per transformation and job. Otherwise, job submission will fail. Furthermore, you cannot * assign user-specified hash to intermediate nodes in an operator chain and trying so will let your job fail. * * <p>A use case for this is in migration between Flink versions or changing the jobs in a way that changes the * automatically generated hashes. In this case, providing the previous hashes directly through this method (e.g. * obtained from old logs) can help to reestablish a lost mapping from states to their target operator. * * @param uidHash The user provided hash for this operator. This will become the JobVertexID, which is shown in the * logs and web ui. * @return The operator with the user provided hash. */@PublicEvolvingpublicDataStreamSink<T> setUidHash(StringuidHash) {
transformation.setUidHash(uidHash);
returnthis;
}
/** * Sets the parallelism for this sink. The degree must be higher than zero. * * @param parallelism The parallelism for this sink. * @return The sink with set parallelism. */publicDataStreamSink<T> setParallelism(intparallelism) {
transformation.setParallelism(parallelism);
returnthis;
}
// ---------------------------------------------------------------------------// Fine-grained resource profiles are an incomplete work-in-progress feature// The setters are hence private at this point.// ---------------------------------------------------------------------------/** * Sets the minimum and preferred resources for this sink, and the lower and upper resource limits will * be considered in resource resize feature for future plan. * * @param minResources The minimum resources for this sink. * @param preferredResources The preferred resources for this sink * @return The sink with set minimum and preferred resources. */privateDataStreamSink<T> setResources(ResourceSpecminResources, ResourceSpecpreferredResources) {
transformation.setResources(minResources, preferredResources);
returnthis;
}
/** * Sets the resources for this sink, the minimum and preferred resources are the same by default. * * @param resources The resources for this sink. * @return The sink with set minimum and preferred resources. */privateDataStreamSink<T> setResources(ResourceSpecresources) {
transformation.setResources(resources, resources);
returnthis;
}
/** * Turns off chaining for this operator so thread co-location will not be * used as an optimization. * * <p>Chaining can be turned off for the whole * job by {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#disableOperatorChaining()} * however it is not advised for performance considerations. * * @return The sink with chaining disabled */@PublicEvolvingpublicDataStreamSink<T> disableChaining() {
this.transformation.setChainingStrategy(ChainingStrategy.NEVER);
returnthis;
}
/** * Sets the slot sharing group of this operation. Parallel instances of * operations that are in the same slot sharing group will be co-located in the same * TaskManager slot, if possible. * * <p>Operations inherit the slot sharing group of input operations if all input operations * are in the same slot sharing group and no slot sharing group was explicitly specified. * * <p>Initially an operation is in the default slot sharing group. An operation can be put into * the default group explicitly by setting the slot sharing group to {@code "default"}. * * @param slotSharingGroup The slot sharing group name. */@PublicEvolvingpublicDataStreamSink<T> slotSharingGroup(StringslotSharingGroup) {
transformation.setSlotSharingGroup(slotSharingGroup);
returnthis;
}
}