Skip to content

Latest commit

 

History

History
2819 lines (2330 loc) · 88.7 KB

StreamSource源解析.md

File metadata and controls

2819 lines (2330 loc) · 88.7 KB

DataStream相关

  • StreamSource根类数据源,包含指定watermark,broadcast,map等操作。

Transformation

  • 算子的实体结构,存储Flink计算过程的各个算子

BufferTimeout

  • 表示处理数据的缓存时间,越低延迟越低,-1表示为默认的缓冲区超时,0的话表示不缓存,但是系统吞吐量会下降
public void setBufferTimeout(long bufferTimeout) {
		checkArgument(bufferTimeout >= -1);
		this.bufferTimeout = bufferTimeout;
	}

ManagedMemoryWeight

  • 托管内存权重,用于算子状态存储
/**
 * 设置托管内存权重,该权重指示此转换在多大程度上依赖于托管内存,
 * 以便转换高度依赖于托管内存将能够在运行时获取更多托管内存(线性关联)。 默认权重值为1。请注意,当前仅在资源未知的情况下才可以设置权重
 */
public void setManagedMemoryWeight(int managedMemoryWeight) {
   OperatorValidationUtils.validateResourceRequirements(minResources, preferredResources, managedMemoryWeight);
   this.managedMemoryWeight = managedMemoryWeight;
}

uidHash

  • 根据用户提供的hash值创建JobVertexID
	public void setUidHash(String uidHash) {

		Preconditions.checkNotNull(uidHash);
		// 校验hashcode为大小写26字母和0到9的数字,并且个数为32个
		Preconditions.checkArgument(uidHash.matches("^[0-9A-Fa-f]{32}$"),
				"Node hash must be a 32 character String that describes a hex code. Found: " + uidHash);

		// 用于提供的hashNode
		this.userProvidedNodeHash = uidHash;
	}

slotSharingGroup

  • 如果没有reblance算子的话,相同group名字的在同个taskmanager的slot下
public String getSlotSharingGroup() {
		return slotSharingGroup;
	}

PhysicalTransformation

  • 一个物理Transformation算子,它开启设置任务链策略设置,包含ALWAYS(尽全力的优化任务链,将相同并行度的算子放在同一个jvm实例中),NEVER(关闭任务链优化),HEAD(运算符不会链接到前任,但是后继者可以链接到此运算符。)
public abstract void setChainingStrategy(ChainingStrategy strategy);

DataStream

  • 用于处理Flink流式算子和环境以及算子操作

ExecutionConfig

  • 执行任务配置
/**
*PIPELINED:以流水线方式(包括shuffle和广播)执行程序,但在流水线操作时容易引起死锁的数据交换除外。 这些数据交换以批处理方式执行。 容易发生死锁的情况(当以流水线方式执行时)的一个例子是分支的数据流(一个数据集被多个操作消耗)并在以后重新加入
*PIPELINED_FORCED:强制流水线方式,PIPELINED是可选配置可以在死锁是执行BATCH模式
*BATCH:此模式以批处理方式执行所有随机播放和广播,同时在仅在一个生产者和一个消费者之间本地交换数据的操作之间对数据进行流水线处理。
BATCH_FORCED:此模式以严格的批处理方式执行程序,包括将数据从一个生产者本地转发到一个消费者的所有点。 与BATCH模式相比,执行此模式通常会更昂贵。 它确实保证不会同时执行任何后续操作。
*/
private ExecutionMode executionMode = ExecutionMode.PIPELINED;
/*RECURSIVE:递归全部字段
* TOP_LEVEL:递归最上层字段
* NONE:不进行clean
*/
private ClosureCleanerLevel closureCleanerLevel = ClosureCleanerLevel.RECURSIVE;
// 默认并行度,parallelism.default
private int parallelism = CoreOptions.DEFAULT_PARALLELISM.defaultValue();
private int maxParallelism = -1;
// 是否强制使用kryo
private boolean forceKryo = false;
// kryo不支持泛型
private boolean disableGenericTypes = false;
// 开启自动生成算子uid
	private boolean enableAutoGeneratedUids = true;
	// 是否开启对象reuse
	private boolean objectReuse = false;
	// 动态类型注册
	private boolean autoTypeRegistrationEnabled = true;
	// 使用avro
	private boolean forceAvro = false;
	private CodeAnalysisMode codeAnalysisMode = CodeAnalysisMode.DISABLE;
	// 自动生成watermark间隔
	private long autoWatermarkInterval = 0;
	/**
	 * Interval in milliseconds for sending latency tracking marks from the sources to the sinks.
	 * 发送任务指标的间隔
	 */
	private long latencyTrackingInterval = MetricOptions.LATENCY_INTERVAL.defaultValue();
//是否采集指标
	private boolean isLatencyTrackingConfigured = false;
// 重启策略
	private RestartStrategies.RestartStrategyConfiguration restartStrategyConfiguration =
		new RestartStrategies.FallbackRestartStrategyConfiguration();
	// 任务取消间隔
	private long taskCancellationIntervalMillis = -1;

	/**
	 * 超时之后,正在进行的任务取消将导致致命的TaskManager错误,通常会杀死JVM。
	 * Timeout after which an ongoing task cancellation will lead to a fatal
	 * TaskManager error, usually killing the JVM.
	 */
	private long taskCancellationTimeoutMillis = -1;

	/** This flag defines if we use compression for the state snapshot data or not. Default: false */
	private boolean useSnapshotCompression = false;

多流算子操作

// union
public final DataStream<T> union(DataStream<T>... streams) {
		List<Transformation<T>> unionedTransforms = new ArrayList<>();
		// 将当前算子将入union算子列表
		unionedTransforms.add(this.transformation);

		for (DataStream<T> newStream : streams) {
			// 每个流输出类型必须一致
			if (!getType().equals(newStream.getType())) {
				throw new IllegalArgumentException("Cannot union streams of different types: "
						+ getType() + " and " + newStream.getType());
			}

			unionedTransforms.add(newStream.getTransformation());
		}
		// 重新包装算子为Uinon算子
		return new DataStream<>(this.environment, new UnionTransformation<>(unionedTransforms));
	}
//split,分割流
public SplitStream<T> split(OutputSelector<T> outputSelector) {
		return new SplitStream<>(this, clean(outputSelector));
	}
// connect
public <R> ConnectedStreams<T, R> connect(DataStream<R> dataStream) {
		return new ConnectedStreams<>(environment, this, dataStream);
	}
// 广播双流操作
public <R> BroadcastConnectedStream<T, R> connect(BroadcastStream<R> broadcastStream) {
		return new BroadcastConnectedStream<>(
				environment,
				this,
				Preconditions.checkNotNull(broadcastStream),
				broadcastStream.getBroadcastStateDescriptor());
	}

keyBy算子

  • KeyedStream
 private KeyedStream<T, Tuple> keyBy(Keys<T> keys) {
        return new KeyedStream<>(
                this,
                clean(KeySelectorUtil.getSelectorForKeys(keys, getType(), getExecutionConfig())));
    }
    public <K> KeyedStream<T, K> keyBy(KeySelector<T, K> key, TypeInformation<K> keyType) {
        Preconditions.checkNotNull(key);
        Preconditions.checkNotNull(keyType);
        return new KeyedStream<>(this, clean(key), keyType);
    }
 public <K> KeyedStream<T, K> keyBy(KeySelector<T, K> key) {
        Preconditions.checkNotNull(key);
        return new KeyedStream<>(this, clean(key));
    }

基于分区器

partitionCustom

  • 使用自定义分区程序,对选择器返回的键上的DataStream进行分区。 此方法使用KeySelector来获取要在其上进行分区的键,以及一个接收键类型的分区程序。 **注意:**此方法仅适用于单个字段键,即选择器无法返回字段元组。
 public <K> DataStream<T> partitionCustom(
            Partitioner<K> partitioner, KeySelector<T, K> keySelector) {
        return setConnectionType(
                new CustomPartitionerWrapper<>(clean(partitioner), clean(keySelector)));
    }

    //	private helper method for custom partitioning
    private <K> DataStream<T> partitionCustom(Partitioner<K> partitioner, Keys<T> keys) {
        KeySelector<T, K> keySelector =
                KeySelectorUtil.getSelectorForOneKey(
                        keys, partitioner, getType(), getExecutionConfig());

        return setConnectionType(
                new CustomPartitionerWrapper<>(clean(partitioner), clean(keySelector)));
    }

broadcast

//设置DataStream的分区,以便将输出元素广播到下一个操作的每个并行实例。
public DataStream<T> broadcast() {
		return setConnectionType(new BroadcastPartitioner<T>());
	}
//设置DataStream的分区,以便将输出元素广播到下一个操作的每个并行实例。 另外,它隐含与指定描述符一样多的broadcast states ,这些描述符可用于存储流的元素
public BroadcastStream<T> broadcast(final MapStateDescriptor<?, ?>... broadcastStateDescriptors) {
		Preconditions.checkNotNull(broadcastStateDescriptors);
		final DataStream<T> broadcastStream = setConnectionType(new BroadcastPartitioner<>());
		// 构造广播状态
		return new BroadcastStream<>(environment, broadcastStream, broadcastStateDescriptors);
	}

shuffle

  • 设置DataStream的分区,以便将输出元素均匀随机地随机混入下一个操作。
public DataStream<T> shuffle() {
		return setConnectionType(new ShufflePartitioner<T>());
	}

public class ShufflePartitioner<T> extends StreamPartitioner<T> {
	private static final long serialVersionUID = 1L;

	private Random random = new Random();

	// 选择管道,随机选择
	@Override
	public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
		return random.nextInt(numberOfChannels);
	}

	@Override
	public StreamPartitioner<T> copy() {
		return new ShufflePartitioner<T>();
	}

	@Override
	public String toString() {
		return "SHUFFLE";
	}
}

forward

  • 设置DataStream的分区,以便将输出元素转发到下一个操作的本地子任务。
public class ForwardPartitioner<T> extends StreamPartitioner<T> {
	private static final long serialVersionUID = 1L;

  // 固定发向channel 0
	@Override
	public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
		return 0;
	}

	public StreamPartitioner<T> copy() {
		return this;
	}

	@Override
	public String toString() {
		return "FORWARD";
	}
}

rebalance

  • 设置DataStream的分区,以便以循环方式将输出元素平均分配给下一个操作的实例。
/**
 * 通过循环通过输出通道来平均分配数据的分区程序
 * Partitioner that distributes the data equally by cycling through the output
 * channels.
 *
 * @param <T> Type of the elements in the Stream being rebalanced
 */
@Internal
public class RebalancePartitioner<T> extends StreamPartitioner<T> {
	private static final long serialVersionUID = 1L;

	private int nextChannelToSendTo;

	@Override
	public void setup(int numberOfChannels) {
		super.setup(numberOfChannels);

		nextChannelToSendTo = ThreadLocalRandom.current().nextInt(numberOfChannels);
	}

	@Override
	public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
		nextChannelToSendTo = (nextChannelToSendTo + 1) % numberOfChannels;
		return nextChannelToSendTo;
	}

	public StreamPartitioner<T> copy() {
		return this;
	}

	@Override
	public String toString() {
		return "REBALANCE";
	}
}

rescale

  • 设置DataStream的分区,以便以循环方式将输出元素均匀地分布到下一操作实例的子集。 上游操作向其发送元素的下游操作的子集取决于上游操作和下游操作两者的并行度。 例如,如果上游操作具有并行度2,而下游操作具有并行度4,则一个上游操作将元素分配给两个下游操作,而另一个上游操作将分配给另外两个下游操作。 另一方面,如果下游操作具有并行性2,而上游操作具有并行性4,则两个上游操作将分配给一个下游操作,而其他两个上游操作将分配给另一个下游操作。 如果不同的并行度不是彼此的倍数,则一个或多个下游操作将具有与上游操作不同的输入数量
public class RescalePartitioner<T> extends StreamPartitioner<T> {
	private static final long serialVersionUID = 1L;
	//下个发送的管道
	private int nextChannelToSendTo = -1;

	@Override
	public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
		// 如果大于则nextChannelToSendTo设置为0
		if (++nextChannelToSendTo >= numberOfChannels) {
			nextChannelToSendTo = 0;
		}
		return nextChannelToSendTo;
	}

	public StreamPartitioner<T> copy() {
		return this;
	}

	@Override
	public String toString() {
		return "RESCALE
	}
}

global

  • 设置DataStream的分区,以便所有输出值都进入下一个处理运算符的第一个实例。 请谨慎使用此设置,因为它可能会导致应用程序出现严重的性能瓶颈。
public class GlobalPartitioner<T> extends StreamPartitioner<T> {
	private static final long serialVersionUID = 1L;

	// 永远为实例1
	@Override
	public int selectChannel(SerializationDelegate<StreamRecord<T>> record) {
		return 0;
	}

	@Override
	public StreamPartitioner<T> copy() {
		return this;
	}

	@Override
	public String toString() {
		return "GLOBAL";
	}
}

单一输出算子

map

  • map算子返回SingleOutputStreamOperator流,oneToOne算子
public <R> SingleOutputStreamOperator<R> map(MapFunction<T, R> mapper, TypeInformation<R> outputType) {
		return transform("Map", outputType, new StreamMap<>(clean(mapper)));
	}

 // 转换算子为对应的stream,并且加入到transformations中
	public <R> SingleOutputStreamOperator<R> transform(
			String operatorName,
			TypeInformation<R> outTypeInfo,
			OneInputStreamOperatorFactory<T, R> operatorFactory) {

		return doTransform(operatorName, outTypeInfo, operatorFactory);
	}

	protected <R> SingleOutputStreamOperator<R> doTransform(
			String operatorName,
			TypeInformation<R> outTypeInfo,
			StreamOperatorFactory<R> operatorFactory) {

		// read the output type of the input Transform to coax out errors about MissingTypeInfo
		transformation.getOutputType();
		// 创建单输入算子
		OneInputTransformation<T, R> resultTransform = new OneInputTransformation<>(
				this.transformation,
				operatorName,
				operatorFactory,
				outTypeInfo,
				environment.getParallelism());

		@SuppressWarnings({"unchecked", "rawtypes"})
		SingleOutputStreamOperator<R> returnStream = new SingleOutputStreamOperator(environment, resultTransform);

		// 添加到dag算子链条
		getExecutionEnvironment().addOperator(resultTransform);

		return returnStream;
	}

flatMap

  • oneToOne算子,单input单output类型算子
public class StreamFlatMap<IN, OUT>
		extends AbstractUdfStreamOperator<OUT, FlatMapFunction<IN, OUT>>
		implements OneInputStreamOperator<IN, OUT> {

	private static final long serialVersionUID = 1L;
	private transient TimestampedCollector<OUT> collector;

	public StreamFlatMap(FlatMapFunction<IN, OUT> flatMapper) {
		super(flatMapper);
		// 算子链策略,尽全力保证任务链优化,任务链运行非shuffle算子能够合并在相同jvm实例的thread,充分避免序列化和反序列化操作
		chainingStrategy = ChainingStrategy.ALWAYS;
	}

	@Override
	public void open() throws Exception {
		super.open();
		collector = new TimestampedCollector<>(output);
	}

	@Override
	public void processElement(StreamRecord<IN> element) throws Exception {
		// 是指处理元素的时间,如果不存在设置时间表示不存在
		collector.setTimestamp(element);
		userFunction.flatMap(element.getValue(), collector);
	}
}
// one to one算子,无reduce、shuffle操作,因此可以进行chain优化
public <R> SingleOutputStreamOperator<R> flatMap(FlatMapFunction<T, R> flatMapper, TypeInformation<R> outputType) {
		return transform("Flat Map", outputType, new StreamFlatMap<>(clean(flatMapper)));
	}

process

public <R> SingleOutputStreamOperator<R> process(
			ProcessFunction<T, R> processFunction,
			TypeInformation<R> outputType) {

		ProcessOperator<T, R> operator = new ProcessOperator<>(clean(processFunction));

		return transform("Process", outputType, operator);
	}

public class ProcessOperator<IN, OUT>
		extends AbstractUdfStreamOperator<OUT, ProcessFunction<IN, OUT>>
		implements OneInputStreamOperator<IN, OUT> {

	private static final long serialVersionUID = 1L;

	private transient TimestampedCollector<OUT> collector;

	private transient ContextImpl context;

	/** We listen to this ourselves because we don't have an {@link InternalTimerService}. */
	private long currentWatermark = Long.MIN_VALUE;

	public ProcessOperator(ProcessFunction<IN, OUT> function) {
		super(function);

		chainingStrategy = ChainingStrategy.ALWAYS;
	}

	@Override
	public void open() throws Exception {
		super.open();
		collector = new TimestampedCollector<>(output);

		context = new ContextImpl(userFunction, getProcessingTimeService());
	}

	@Override
	public void processElement(StreamRecord<IN> element) throws Exception {
		collector.setTimestamp(element);
		context.element = element;
		// 处理元素
		userFunction.processElement(element.getValue(), context, collector);
		context.element = null;
	}

	// 处理watermark,维护processTime的Watermark
	@Override
	public void processWatermark(Watermark mark) throws Exception {
		super.processWatermark(mark);
		// 记录当前currentWatermark
		this.currentWatermark = mark.getTimestamp();
	}

	/**
	 * opeartor算子process函数,仅支持opeartor state,不支持keyedState和EventTime
	 */
	private class ContextImpl extends ProcessFunction<IN, OUT>.Context implements TimerService {
		private StreamRecord<IN> element;

		private final ProcessingTimeService processingTimeService;

		ContextImpl(ProcessFunction<IN, OUT> function, ProcessingTimeService processingTimeService) {
			function.super();
			this.processingTimeService = processingTimeService;
		}

		@Override
		public Long timestamp() {
			checkState(element != null);

			if (element.hasTimestamp()) {
				return element.getTimestamp();
			} else {
				return null;
			}
		}

		// 侧边输出
		@Override
		public <X> void output(OutputTag<X> outputTag, X value) {
			if (outputTag == null) {
				throw new IllegalArgumentException("OutputTag must not be null.");
			}
			// 将记录放入Output
			output.collect(outputTag, new StreamRecord<>(value, element.getTimestamp()));
		}

		@Override
		public long currentProcessingTime() {
			return processingTimeService.getCurrentProcessingTime();
		}

		@Override
		public long currentWatermark() {
			return currentWatermark;
		}

		@Override
		public void registerProcessingTimeTimer(long time) {
			throw new UnsupportedOperationException(UNSUPPORTED_REGISTER_TIMER_MSG);
		}

		@Override
		public void registerEventTimeTimer(long time) {
			throw new UnsupportedOperationException(UNSUPPORTED_REGISTER_TIMER_MSG);
		}

		@Override
		public void deleteProcessingTimeTimer(long time) {
			throw new UnsupportedOperationException(UNSUPPORTED_DELETE_TIMER_MSG);
		}

		@Override
		public void deleteEventTimeTimer(long time) {
			throw new UnsupportedOperationException(UNSUPPORTED_DELETE_TIMER_MSG);
		}

		@Override
		public TimerService timerService() {
			return this;
		}
	}
}

filter

public SingleOutputStreamOperator<T> filter(FilterFunction<T> filter) {
		return transform("Filter", getType(), new StreamFilter<>(clean(filter)));

	}

public class StreamFilter<IN> extends AbstractUdfStreamOperator<IN, FilterFunction<IN>> implements OneInputStreamOperator<IN, IN> {

	private static final long serialVersionUID = 1L;

	public StreamFilter(FilterFunction<IN> filterFunction) {
		super(filterFunction);
		chainingStrategy = ChainingStrategy.ALWAYS;
	}

	@Override
	public void processElement(StreamRecord<IN> element) throws Exception {
		// 调用udf
		if (userFunction.filter(element.getValue())) {
			output.collect(element);
		}
	}
}

project

public <R extends Tuple> SingleOutputStreamOperator<R> project(int... fieldIndexes) {
		return new StreamProjection<>(this, fieldIndexes).projectTupleX();
	}
public class StreamProjection<IN> {

    private DataStream<IN> dataStream;
    // 需要project字段的inedx
    private int[] fieldIndexes;

    protected StreamProjection(DataStream<IN> dataStream, int[] fieldIndexes) {
        // 判断是否是为tuple类型
        if (!dataStream.getType().isTupleType()) {
            throw new RuntimeException("Only Tuple DataStreams can be projected");
        }
        // 判断fieldIndexes长度是否合法
        if (fieldIndexes.length == 0) {
            throw new IllegalArgumentException("project() needs to select at least one (1) field.");
        } else if (fieldIndexes.length > Tuple.MAX_ARITY - 1) {
            throw new IllegalArgumentException(
                    "project() may select only up to (" + (Tuple.MAX_ARITY - 1) + ") fields.");
        }

        // 获取tuple最大字段index
        int maxFieldIndex = (dataStream.getType()).getArity();
        // 校验索引是否大于maxFieldIndex
        for (int i = 0; i < fieldIndexes.length; i++) {
            Preconditions.checkElementIndex(fieldIndexes[i], maxFieldIndex);
        }

        this.dataStream = dataStream;
        this.fieldIndexes = fieldIndexes;
    }

    /**
     * Chooses a projectTupleX according to the length of {@link
     * org.apache.flink.streaming.api.datastream.StreamProjection#fieldIndexes}.
     *
     * @return The projected DataStream.
     * @see org.apache.flink.api.java.operators.ProjectOperator.Projection
     */
    @SuppressWarnings("unchecked")
    public <OUT extends Tuple> SingleOutputStreamOperator<OUT> projectTupleX(){
			// 返回具体的tuplex      
    }
  
  	public <T0> SingleOutputStreamOperator<Tuple1<T0>> projectTuple1() {
        TypeInformation<?>[] fTypes = extractFieldTypes(fieldIndexes, dataStream.getType());
        TupleTypeInfo<Tuple1<T0>> tType = new TupleTypeInfo<Tuple1<T0>>(fTypes);

        return dataStream.transform(
                "Projection",
                tType,
                new StreamProject<IN, Tuple1<T0>>(
                        fieldIndexes, tType.createSerializer(dataStream.getExecutionConfig())));
    }
}

// 具体的project流
public class StreamProject<IN, OUT extends Tuple>
		extends AbstractStreamOperator<OUT>
		implements OneInputStreamOperator<IN, OUT> {

	private static final long serialVersionUID = 1L;

	private TypeSerializer<OUT> outSerializer;
	private int[] fields;
	private int numFields;

	private transient OUT outTuple;

	public StreamProject(int[] fields, TypeSerializer<OUT> outSerializer) {
		this.fields = fields;
		this.numFields = this.fields.length;
		this.outSerializer = outSerializer;

		chainingStrategy = ChainingStrategy.ALWAYS;
	}

	@Override
	public void processElement(StreamRecord<IN> element) throws Exception {
		for (int i = 0; i < this.numFields; i++) {
			outTuple.setField(((Tuple) element.getValue()).getField(fields[i]), i);
		}
		output.collect(element.replace(outTuple));
	}

	@Override
	public void open() throws Exception {
		super.open();
		outTuple = outSerializer.createInstance();
	}
}

多输出算子

coGroup

  • 俩个Datastream group组合成CoGroupStream,apply操作数据为数据集合
	public <T2> CoGroupedStreams<T, T2> coGroup(DataStream<T2> otherStream) {
		return new CoGroupedStreams<>(this, otherStream);
	}

public class CoGroupedStreams<T1, T2> {

	/** The first input stream. */
	private final DataStream<T1> input1;

	/** The second input stream. */
	private final DataStream<T2> input2;

	/**
	 * Creates new CoGrouped data streams, which are the first step towards building a streaming
	 * co-group.
	 *
	 * @param input1 The first data stream.
	 * @param input2 The second data stream.
	 */
	public CoGroupedStreams(DataStream<T1> input1, DataStream<T2> input2) {
		this.input1 = requireNonNull(input1);
		this.input2 = requireNonNull(input2);
	}

	/**
	 * Specifies a {@link KeySelector} for elements from the first input.
	 *
	 * @param keySelector The KeySelector to be used for extracting the first input's key for partitioning.
	 */
	public <KEY> Where<KEY> where(KeySelector<T1, KEY> keySelector)  {
		Preconditions.checkNotNull(keySelector);
		// 校验类型是否一致
		final TypeInformation<KEY> keyType = TypeExtractor.getKeySelectorTypes(keySelector, input1.getType());
		return where(keySelector, keyType);
	}

	/**
	 * Specifies a {@link KeySelector} for elements from the first input with explicit type information.
	 *
	 * @param keySelector The KeySelector to be used for extracting the first input's key for partitioning.
	 * @param keyType The type information describing the key type.
	 */
	public <KEY> Where<KEY> where(KeySelector<T1, KEY> keySelector, TypeInformation<KEY> keyType)  {
		Preconditions.checkNotNull(keySelector);
		Preconditions.checkNotNull(keyType);
		// 指定关联键
		return new Where<>(input1.clean(keySelector), keyType);
	}

	// ------------------------------------------------------------------------

	/**
	 * CoGrouped streams that have the key for one side defined.
	 *
	 * @param <KEY> The type of the key.
	 */
	@Public
	public class Where<KEY> {

		private final KeySelector<T1, KEY> keySelector1;
		private final TypeInformation<KEY> keyType;

		Where(KeySelector<T1, KEY> keySelector1, TypeInformation<KEY> keyType) {
			this.keySelector1 = keySelector1;
			this.keyType = keyType;
		}

		/**
		 * Specifies a {@link KeySelector} for elements from the second input.
		 *
		 * @param keySelector The KeySelector to be used for extracting the second input's key for partitioning.
		 */
		public EqualTo equalTo(KeySelector<T2, KEY> keySelector)  {
			Preconditions.checkNotNull(keySelector);
			final TypeInformation<KEY> otherKey = TypeExtractor.getKeySelectorTypes(keySelector, input2.getType());
			return equalTo(keySelector, otherKey);
		}

		/**
		 * Specifies a {@link KeySelector} for elements from the second input with explicit type information for the key type.
		 *
		 * @param keySelector The KeySelector to be used for extracting the key for partitioning.
		 * @param keyType The type information describing the key type.
		 */
		public EqualTo equalTo(KeySelector<T2, KEY> keySelector, TypeInformation<KEY> keyType)  {
			Preconditions.checkNotNull(keySelector);
			Preconditions.checkNotNull(keyType);

			if (!keyType.equals(this.keyType)) {
				throw new IllegalArgumentException("The keys for the two inputs are not equal: " +
						"first key = " + this.keyType + " , second key = " + keyType);
			}

			return new EqualTo(input2.clean(keySelector));
		}

		// --------------------------------------------------------------------

		/**
		 * A co-group operation that has {@link KeySelector KeySelectors} defined for both inputs.
		 */
		@Public
		public class EqualTo {

			private final KeySelector<T2, KEY> keySelector2;

			EqualTo(KeySelector<T2, KEY> keySelector2) {
				this.keySelector2 = requireNonNull(keySelector2);
			}

			/**
			 * 指定双流窗口函数
			 * Specifies the window on which the co-group operation works.
			 */
			@PublicEvolving
			public <W extends Window> WithWindow<T1, T2, KEY, W> window(WindowAssigner<? super TaggedUnion<T1, T2>, W> assigner) {
				return new WithWindow<>(input1, input2, keySelector1, keySelector2, keyType, assigner, null, null, null);
			}
		}
  }
  

join

  • 底层处理逻辑与Cogroup类似,但是输出函数为一条记录一条记录处理
public <T2> JoinedStreams<T, T2> join(DataStream<T2> otherStream) {
		return new JoinedStreams<>(this, otherStream);
	}

// 对比coGroup和join的区别
 data1.coGroup(data2)
                .where(new KeySelector<Tuple2<Integer, String>, Integer>() {
                    @Override
                    public Integer getKey(Tuple2<Integer, String> value) throws Exception {
                        return value.f0;
                    }
                }).equalTo(new KeySelector<Tuple2<Integer, String>, Integer>() {
                    @Override
                    public Integer getKey(Tuple2<Integer, String> value) throws Exception {
                        return value.f0;
                    }
                }).window(GlobalWindows.create())
                .trigger(CountTrigger.of(1))
                .apply(new CoGroupFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, String>() {
                    @Override
                    public void coGroup(Iterable<Tuple2<Integer, String>> first, Iterable<Tuple2<Integer, String>> second, Collector<String> out) throws Exception {

                    }
                });

  data1.join(data2)
                .where(new KeySelector<Tuple2<Integer, String>, Integer>() {
                    @Override
                    public Integer getKey(Tuple2<Integer, String> value) throws Exception {
                        return value.f0;
                    }
                }).equalTo(new KeySelector<Tuple2<Integer, String>, Integer>() {
                    @Override
                    public Integer getKey(Tuple2<Integer, String> value) throws Exception {
                        return value.f0;
                    }
                }).window(GlobalWindows.create())
                .trigger(CountTrigger.of(1))
                .apply(new JoinFunction<Tuple2<Integer, String>, Tuple2<Integer, String>, String>() {
                    @Override
                    public String join(Tuple2<Integer, String> first, Tuple2<Integer, String> second) throws Exception {
                        return null;
                    }
                });

窗口算子

timeWindowAll

public AllWindowedStream<T, TimeWindow> timeWindowAll(Time size) {
		if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
			//传递ProcessingTime滚动窗口
			return windowAll(TumblingProcessingTimeWindows.of(size));
		} else {
			// 传递EventTime滚动窗口
			return windowAll(TumblingEventTimeWindows.of(size));
		}
	}
	
public AllWindowedStream<T, TimeWindow> timeWindowAll(Time size, Time slide) {
		if (environment.getStreamTimeCharacteristic() == TimeCharacteristic.ProcessingTime) {
			//传递ProcessingTime滑动窗口
			return windowAll(SlidingProcessingTimeWindows.of(size, slide));
		} else {
			// 传递EventTime滑动窗口
			return windowAll(SlidingEventTimeWindows.of(size, slide));
		}
	}

countWindowAll

public AllWindowedStream<T, GlobalWindow> countWindowAll(long size) {
		// 创建全局窗口assinger,设置countTrigger
		return windowAll(GlobalWindows.create()).trigger(PurgingTrigger.of(CountTrigger.of(size)));
	}
public AllWindowedStream<T, GlobalWindow> countWindowAll(long size, long slide) {
		// 滚动count,每slide触发一次窗口操作,然后统计近size的个数
		return windowAll(GlobalWindows.create())
				.evictor(CountEvictor.of(size))
				.trigger(CountTrigger.of(slide));
	}

WindowAssinger

TumblingWindows
  • 包含EventTime和ProcessTime
public class TumblingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {
	private static final long serialVersionUID = 1L;

	// 窗口大小
	private final long size;

	// 全局offset
	private final long globalOffset;

	// 错开窗口offset
	private Long staggerOffset = null;

	// 错开窗口策略
	private final WindowStagger windowStagger;

	/**
	 *
	 * @param size 窗口大小
	 * @param offset 全局offset
	 * @param windowStagger 窗口错开策略
	 */
	private TumblingProcessingTimeWindows(long size, long offset, WindowStagger windowStagger) {
		if (Math.abs(offset) >= size) {
			throw new IllegalArgumentException("TumblingProcessingTimeWindows parameters must satisfy abs(offset) < size");
		}

		this.size = size;
		this.globalOffset = offset;
		this.windowStagger = windowStagger;
	}

	/**
	 * 指定窗口元素到Collection中
	 * @param element The element to which windows should be assigned.
	 * @param timestamp The timestamp of the element.
	 * @param context The {@link WindowAssignerContext} in which the assigner operates.
	 * @return
	 */
	@Override
	public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
		// 获取当前processTime
		final long now = context.getCurrentProcessingTime();
		if (staggerOffset == null) {
			// 获取错开窗口offset,支持三种策略:ALIGNED(从0开始)、RANDOM(窗口大小的任意倍)、NATURAL(在窗口操作符中接收到第一个事件时,取开始窗口和当前处理时间作为偏移量。这样,窗户是交错的基于每个并行运算符接收到第一个事件的时间。)
			staggerOffset = windowStagger.getStaggerOffset(context.getCurrentProcessingTime(), size);
		}
		// 计算开始窗口起始时间
		long start = TimeWindow.getWindowStartWithOffset(now, (globalOffset + staggerOffset) % size, size);
		// 塞入窗口集合
		return Collections.singletonList(new TimeWindow(start, start + size));
	}

	public long getSize() {
		return size;
	}

	@Override
	public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
		// 创建默认trigger
		return ProcessingTimeTrigger.create();
	}

	@Override
	public String toString() {
		return "TumblingProcessingTimeWindows(" + size + ")";
	}

	/**
	 * Creates a new {@code TumblingProcessingTimeWindows} {@link WindowAssigner} that assigns
	 * elements to time windows based on the element timestamp.
	 *
	 * @param size The size of the generated windows.
	 * @return The time policy.
	 */
	public static TumblingProcessingTimeWindows of(Time size) {
		// 创建滚动窗口,size为毫秒,offset设置为0,在全部分区上触发
		return new TumblingProcessingTimeWindows(size.toMilliseconds(), 0, WindowStagger.ALIGNED);
	}

	/**
	 * 创建一个新的TumblingProcessingTimeWindows WindowAssigner ,根据元素的时间戳和偏移量将元素分配给时间窗口。
	 * 例如,如果of(Time.hours(1),Time.minutes(15))小时显示流,但窗口从每小时的第15分钟开始,则可以使用of(Time.hours(1),Time.minutes(15)) ,然后将获得时间窗口开始在0:15:00、1:15:00、2:15:00等
	 * 而不是那样,如果您居住在不使用UTC±00:00时间的地方,例如使用UTC + 08:00的中国,并且您想要一个大小为一天的时间窗口,并且该窗口在每个时间开始当地时间00:00:00,您可以使用of(Time.days(1),Time.hours(-8)) 。 由于UTC + 08:00比UTC时间早8小时,所以offset参数为Time.hours(-8))
	 */
	public static TumblingProcessingTimeWindows of(Time size, Time offset) {
		return new TumblingProcessingTimeWindows(size.toMilliseconds(), offset.toMilliseconds(), WindowStagger.ALIGNED);
	}

	@PublicEvolving
	public static TumblingProcessingTimeWindows of(Time size, Time offset, WindowStagger windowStagger) {
		return new TumblingProcessingTimeWindows(size.toMilliseconds(), offset.toMilliseconds(), windowStagger);
	}

	@Override
	public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
		return new TimeWindow.Serializer();
	}

	@Override
	public boolean isEventTime() {
		return false;
	}
}


public class TumblingEventTimeWindows extends WindowAssigner<Object, TimeWindow> {
	private static final long serialVersionUID = 1L;

	private final long size;

	// 全局偏移
	private final long globalOffset;

	// 错位偏移
	private Long staggerOffset = null;

	private final WindowStagger windowStagger;

	protected TumblingEventTimeWindows(long size, long offset, WindowStagger windowStagger) {
		if (Math.abs(offset) >= size) {
			throw new IllegalArgumentException("TumblingEventTimeWindows parameters must satisfy abs(offset) < size");
		}

		this.size = size;
		this.globalOffset = offset;
		this.windowStagger = windowStagger;
	}

	/**
	 * 分配窗口
	 * @param element The element to which windows should be assigned.
	 * @param timestamp The timestamp of the element.
	 * @param context The {@link WindowAssignerContext} in which the assigner operates.
	 * @return
	 */
	@Override
	public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
		// 如果传入的eventTime大于最小值
		if (timestamp > Long.MIN_VALUE) {
			//
			if (staggerOffset == null) {
				staggerOffset = windowStagger.getStaggerOffset(context.getCurrentProcessingTime(), size);
			}
			// Long.MIN_VALUE is currently assigned when no timestamp is present
			long start = TimeWindow.getWindowStartWithOffset(timestamp, (globalOffset + staggerOffset) % size, size);
			return Collections.singletonList(new TimeWindow(start, start + size));
		} else {
			throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no timestamp marker). " +
					"Is the time characteristic set to 'ProcessingTime', or did you forget to call " +
					"'DataStream.assignTimestampsAndWatermarks(...)'?");
		}
	}

	@Override
	public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
		return EventTimeTrigger.create();
	}

	@Override
	public String toString() {
		return "TumblingEventTimeWindows(" + size + ")";
	}

	/**
	 * Creates a new {@code TumblingEventTimeWindows} {@link WindowAssigner} that assigns
	 * elements to time windows based on the element timestamp.
	 *
	 * @param size The size of the generated windows.
	 * @return The time policy.
	 */
	public static TumblingEventTimeWindows of(Time size) {
		return new TumblingEventTimeWindows(size.toMilliseconds(), 0, WindowStagger.ALIGNED);
	}

	/**
	 * Creates a new {@code TumblingEventTimeWindows} {@link WindowAssigner} that assigns
	 * elements to time windows based on the element timestamp and offset.
	 *
	 * <p>For example, if you want window a stream by hour,but window begins at the 15th minutes
	 * of each hour, you can use {@code of(Time.hours(1),Time.minutes(15))},then you will get
	 * time windows start at 0:15:00,1:15:00,2:15:00,etc.
	 *
	 * <p>Rather than that,if you are living in somewhere which is not using UTC±00:00 time,
	 * such as China which is using UTC+08:00,and you want a time window with size of one day,
	 * and window begins at every 00:00:00 of local time,you may use {@code of(Time.days(1),Time.hours(-8))}.
	 * The parameter of offset is {@code Time.hours(-8))} since UTC+08:00 is 8 hours earlier than UTC time.
	 *
	 * @param size The size of the generated windows.
	 * @param offset The offset which window start would be shifted by.
	 */
	public static TumblingEventTimeWindows of(Time size, Time offset) {
		return new TumblingEventTimeWindows(size.toMilliseconds(), offset.toMilliseconds(), WindowStagger.ALIGNED);
	}


	/**
	 * Creates a new {@code TumblingEventTimeWindows} {@link WindowAssigner} that assigns
	 * elements to time windows based on the element timestamp, offset and a staggering offset,
	 * depending on the staggering policy.
	 *
	 * @param size The size of the generated windows.
	 * @param offset The globalOffset which window start would be shifted by.
	 * @param windowStagger The utility that produces staggering offset in runtime.
	 */
	@PublicEvolving
	public static TumblingEventTimeWindows of(Time size, Time offset, WindowStagger windowStagger) {
		return new TumblingEventTimeWindows(size.toMilliseconds(), offset.toMilliseconds(), windowStagger);
	}

	@Override
	public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
		return new TimeWindow.Serializer();
	}

	@Override
	public boolean isEventTime() {
		return true;
	}
}
SlidingWindows
  • 滑动窗口分为ProcessTime和EventTime
public class SlidingProcessingTimeWindows extends WindowAssigner<Object, TimeWindow> {
	private static final long serialVersionUID = 1L;
	// 窗口大小
	private final long size;
	// offset
	private final long offset;
	// 滑动距离
	private final long slide;

	private SlidingProcessingTimeWindows(long size, long slide, long offset) {
		if (Math.abs(offset) >= slide || size <= 0) {
			throw new IllegalArgumentException("SlidingProcessingTimeWindows parameters must satisfy " +
				"abs(offset) < slide and size > 0");
		}

		this.size = size;
		this.slide = slide;
		this.offset = offset;
	}

	@Override
	public Collection<TimeWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
		timestamp = context.getCurrentProcessingTime();
		// 窗口大小/滑动补长 计算一个window size下存在几个window
		List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
    // 计算起始位置
		long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, slide);
		// 从start开始,在size范围内,每次滑动slide,比如start为0,size为5 slide为3,windows存储为0~5,3~8,6~11.
		for (long start = lastStart;
			start > timestamp - size;
			start -= slide) {
			// 计算的窗口放入list
			windows.add(new TimeWindow(start, start + size));
		}
		return windows;
	}

	public long getSize() {
		return size;
	}

	public long getSlide() {
		return slide;
	}

	@Override
	public Trigger<Object, TimeWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
		// 只会到窗口时间触发计算,累加的方式
		return ProcessingTimeTrigger.create();
	}

	@Override
	public String toString() {
		return "SlidingProcessingTimeWindows(" + size + ", " + slide + ")";
	}

	/**
	 * Creates a new {@code SlidingProcessingTimeWindows} {@link WindowAssigner} that assigns
	 * elements to sliding time windows based on the element timestamp.
	 *
	 * @param size The size of the generated windows.
	 * @param slide The slide interval of the generated windows.
	 * @return The time policy.
	 */
	public static SlidingProcessingTimeWindows of(Time size, Time slide) {
		return new SlidingProcessingTimeWindows(size.toMilliseconds(), slide.toMilliseconds(), 0);
	}

	/**
	 * 用于控制数据丢失时区问题
	 * Creates a new {@code SlidingProcessingTimeWindows} {@link WindowAssigner} that assigns
	 * elements to time windows based on the element timestamp and offset.
	 *
	 * <p>For example, if you want window a stream by hour,but window begins at the 15th minutes
	 * of each hour, you can use {@code of(Time.hours(1),Time.minutes(15))},then you will get
	 * time windows start at 0:15:00,1:15:00,2:15:00,etc.
	 *
	 * <p>Rather than that,if you are living in somewhere which is not using UTC±00:00 time,
	 * such as China which is using UTC+08:00,and you want a time window with size of one day,
	 * and window begins at every 00:00:00 of local time,you may use {@code of(Time.days(1),Time.hours(-8))}.
	 * The parameter of offset is {@code Time.hours(-8))} since UTC+08:00 is 8 hours earlier than UTC time.
	 *
	 * @param size The size of the generated windows.
	 * @param slide  The slide interval of the generated windows.
	 * @param offset The offset which window start would be shifted by.
	 * @return The time policy.
	 */
	public static SlidingProcessingTimeWindows of(Time size, Time slide, Time offset) {
		return new SlidingProcessingTimeWindows(size.toMilliseconds(), slide.toMilliseconds(), offset.toMilliseconds());
	}

	@Override
	public TypeSerializer<TimeWindow> getWindowSerializer(ExecutionConfig executionConfig) {
		return new TimeWindow.Serializer();
	}

	@Override
	public boolean isEventTime() {
		return false;
	}
}



// eventTime核心逻辑
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
		// 如果窗口的最大时间,小于等于当前watermarker,则将窗口数据发出,但是不会清空数据
		if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
			// if the watermark is already past the window fire immediately
			return TriggerResult.FIRE;
		} else {
			// 如果时间超过watermark,则注册计时器,并且统计数据
			ctx.registerEventTimeTimer(window.maxTimestamp());
			return TriggerResult.CONTINUE;
		}
	}
GlobalWindows
private GlobalWindow() { }

	public static GlobalWindow get() {
		return INSTANCE;
	}

	@Override
	public long maxTimestamp() {
		return Long.MAX_VALUE;
	}


public class GlobalWindows extends WindowAssigner<Object, GlobalWindow> {
	private static final long serialVersionUID = 1L;

	private GlobalWindows() {}

	@Override
	public Collection<GlobalWindow> assignWindows(Object element, long timestamp, WindowAssignerContext context) {
		return Collections.singletonList(GlobalWindow.get());
	}

	/**
	 * 默认触发器为:直接跳过
	 * @param env
	 * @return
	 */
	@Override
	public Trigger<Object, GlobalWindow> getDefaultTrigger(StreamExecutionEnvironment env) {
		return new NeverTrigger();
	}

	@Override
	public String toString() {
		return "GlobalWindows()";
	}

	/**
	 * Creates a new {@code GlobalWindows} {@link WindowAssigner} that assigns
	 * all elements to the same {@link GlobalWindow}.
	 *
	 * @return The global window policy.
	 */
	public static GlobalWindows create() {
		return new GlobalWindows();
	}

	/**
	 * A trigger that never fires, as default Trigger for GlobalWindows.
	 */
	@Internal
	public static class NeverTrigger extends Trigger<Object, GlobalWindow> {
		private static final long serialVersionUID = 1L;

		@Override
		public TriggerResult onElement(Object element, long timestamp, GlobalWindow window, TriggerContext ctx) {
			return TriggerResult.CONTINUE;
		}

		@Override
		public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
			return TriggerResult.CONTINUE;
		}

		@Override
		public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) {
			return TriggerResult.CONTINUE;
		}

		@Override
		public void clear(GlobalWindow window, TriggerContext ctx) throws Exception {}

		@Override
		public void onMerge(GlobalWindow window, OnMergeContext ctx) {
		}
	}

	@Override
	public TypeSerializer<GlobalWindow> getWindowSerializer(ExecutionConfig executionConfig) {
		return new GlobalWindow.Serializer();
	}

	@Override
	public boolean isEventTime() {
		return false;
	}
}

Evictor

  • CountEvictor:窗口计算时,只保留最近N条element
  • TimeEvictor:窗口计算时,只保留最近N段时间范围的element
  • DeltaEvictor:窗口计算时,最新的一条element与其他element做delta计算,保留delta在threshold内的element
CountEvictor
public class CountEvictor<W extends Window> implements Evictor<Object, W> {
	private static final long serialVersionUID = 1L;
	// 最大的读取count
	private final long maxCount;
	// 是否之后处理
	private final boolean doEvictAfter;

	private CountEvictor(long count, boolean doEvictAfter) {
		this.maxCount = count;
		this.doEvictAfter = doEvictAfter;
	}

	private CountEvictor(long count) {
		this.maxCount = count;
		this.doEvictAfter = false;
	}

	@Override
	public void evictBefore(Iterable<TimestampedValue<Object>> elements, int size, W window, EvictorContext ctx) {
		if (!doEvictAfter) {
			evict(elements, size, ctx);
		}
	}

	@Override
	public void evictAfter(Iterable<TimestampedValue<Object>> elements, int size, W window, EvictorContext ctx) {
		if (doEvictAfter) {
			evict(elements, size, ctx);
		}
	}

	/**
	 * 处理前n条元素
	 * @param elements
	 * @param size
	 * @param ctx
	 */
	private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) {
		// 如果如果count大于等于size,直接可以在窗口内处理,否则去要处理elements迭代器
		if (size <= maxCount) {
			return;
		} else {
      // 记录evicted的数据数量
			int evictedCount = 0;
			for (Iterator<TimestampedValue<Object>> iterator = elements.iterator(); iterator.hasNext();){
				iterator.next();
				evictedCount++;
        // 如果删除的数量大于阈值终止
				if (evictedCount > size - maxCount) {
					break;
				} else {
					iterator.remove();
				}
			}
		}
	}

	/**
	 * Creates a {@code CountEvictor} that keeps the given number of elements.
	 * Eviction is done before the window function.
	 *
	 * @param maxCount The number of elements to keep in the pane.
	 */
	public static <W extends Window> CountEvictor<W> of(long maxCount) {
		return new CountEvictor<>(maxCount);
	}

	/**
	 * Creates a {@code CountEvictor} that keeps the given number of elements in the pane
	 * Eviction is done before/after the window function based on the value of doEvictAfter.
	 *
	 * @param maxCount The number of elements to keep in the pane.
	 * @param doEvictAfter Whether to do eviction after the window function.
     */
	public static <W extends Window> CountEvictor<W> of(long maxCount, boolean doEvictAfter) {
		return new CountEvictor<>(maxCount, doEvictAfter);
	}
}
TimeEvictor
public class TimeEvictor<W extends Window> implements Evictor<Object, W> {
	private static final long serialVersionUID = 1L;

	// 窗口大小
	private final long windowSize;
	private final boolean doEvictAfter;

	public TimeEvictor(long windowSize) {
		this.windowSize = windowSize;
		this.doEvictAfter = false;
	}

	public TimeEvictor(long windowSize, boolean doEvictAfter) {
		this.windowSize = windowSize;
		this.doEvictAfter = doEvictAfter;
	}

	@Override
	public void evictBefore(Iterable<TimestampedValue<Object>> elements, int size, W window, EvictorContext ctx) {
		if (!doEvictAfter) {
			evict(elements, size, ctx);
		}
	}

	@Override
	public void evictAfter(Iterable<TimestampedValue<Object>> elements, int size, W window, EvictorContext ctx) {
		if (doEvictAfter) {
			evict(elements, size, ctx);
		}
	}

	private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) {
		// 如果记录没有时间戳直接终止
		if (!hasTimestamp(elements)) {
			return;
		}

		// 获取记录中最大时间
		long currentTime = getMaxTimestamp(elements);
		// 获取可以读取的timestamp
		long evictCutoff = currentTime - windowSize;

		// 遍历删除小于等于evictCutoff的记录
		for (Iterator<TimestampedValue<Object>> iterator = elements.iterator(); iterator.hasNext(); ) {
			TimestampedValue<Object> record = iterator.next();
			if (record.getTimestamp() <= evictCutoff) {
				iterator.remove();
			}
		}
	}

	/**
	 * Returns true if the first element in the Iterable of {@link TimestampedValue} has a timestamp.
     */
	private boolean hasTimestamp(Iterable<TimestampedValue<Object>> elements) {
		Iterator<TimestampedValue<Object>> it = elements.iterator();
		if (it.hasNext()) {
			return it.next().hasTimestamp();
		}
		return false;
	}

	/**
	 * @param elements The elements currently in the pane.
	 * @return The maximum value of timestamp among the elements.
     */
	private long getMaxTimestamp(Iterable<TimestampedValue<Object>> elements) {
		long currentTime = Long.MIN_VALUE;
		for (Iterator<TimestampedValue<Object>> iterator = elements.iterator(); iterator.hasNext();){
			TimestampedValue<Object> record = iterator.next();
			currentTime = Math.max(currentTime, record.getTimestamp());
		}
		return currentTime;
	}

	@Override
	public String toString() {
		return "TimeEvictor(" + windowSize + ")";
	}

	@VisibleForTesting
	public long getWindowSize() {
		return windowSize;
	}

	/**
	 * Creates a {@code TimeEvictor} that keeps the given number of elements.
	 * Eviction is done before the window function.
	 *
	 * @param windowSize The amount of time for which to keep elements.
	 */
	public static <W extends Window> TimeEvictor<W> of(Time windowSize) {
		return new TimeEvictor<>(windowSize.toMilliseconds());
	}

	/**
	 * Creates a {@code TimeEvictor} that keeps the given number of elements.
	 * Eviction is done before/after the window function based on the value of doEvictAfter.
	 *
	 * @param windowSize The amount of time for which to keep elements.
	 * @param doEvictAfter Whether eviction is done after window function.
     */
	public static <W extends Window> TimeEvictor<W> of(Time windowSize, boolean doEvictAfter) {
		return new TimeEvictor<>(windowSize.toMilliseconds(), doEvictAfter);
	}
}
DeltaEvictor
public class DeltaEvictor<T, W extends Window> implements Evictor<T, W> {
	private static final long serialVersionUID = 1L;

	// delta函数,新老数据做操作的函数
	DeltaFunction<T> deltaFunction;
	// 阈值 在阈值范围内的元素保留
	private double threshold;
	private final boolean doEvictAfter;

	private DeltaEvictor(double threshold, DeltaFunction<T> deltaFunction) {
		this.deltaFunction = deltaFunction;
		this.threshold = threshold;
		this.doEvictAfter = false;
	}

	private DeltaEvictor(double threshold, DeltaFunction<T> deltaFunction, boolean doEvictAfter) {
		this.deltaFunction = deltaFunction;
		this.threshold = threshold;
		this.doEvictAfter = doEvictAfter;
	}

	@Override
	public void evictBefore(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext ctx) {
		if (!doEvictAfter) {
			evict(elements, size, ctx);
		}
	}

	@Override
	public void evictAfter(Iterable<TimestampedValue<T>> elements, int size, W window, EvictorContext ctx) {
		if (doEvictAfter) {
			evict(elements, size, ctx);
		}
	}

	private void evict(Iterable<TimestampedValue<T>> elements, int size, EvictorContext ctx) {
		// 获取最后元素,作为新元素
		TimestampedValue<T> lastElement = Iterables.getLast(elements);
		for (Iterator<TimestampedValue<T>> iterator = elements.iterator(); iterator.hasNext();){
			TimestampedValue<T> element = iterator.next();
			// 前面的旧元素和新元素放入deltaFunction中计算,大于等于阈值则移除
			if (deltaFunction.getDelta(element.getValue(), lastElement.getValue()) >= this.threshold) {
				iterator.remove();
			}
		}
	}

	@Override
	public String toString() {
		return "DeltaEvictor(" +  deltaFunction + ", " + threshold + ")";
	}

	/**
	 * Creates a {@code DeltaEvictor} from the given threshold and {@code DeltaFunction}.
	 * Eviction is done before the window function.
	 *
	 * @param threshold The threshold
	 * @param deltaFunction The {@code DeltaFunction}
	 */
	public static <T, W extends Window> DeltaEvictor<T, W> of(double threshold, DeltaFunction<T> deltaFunction) {
		return new DeltaEvictor<>(threshold, deltaFunction);
	}

	/**
	 * Creates a {@code DeltaEvictor} from the given threshold, {@code DeltaFunction}.
	 * Eviction is done before/after the window function based on the value of doEvictAfter.
	 *
	 * @param threshold The threshold
	 * @param deltaFunction The {@code DeltaFunction}
	 * @param doEvictAfter Whether eviction should be done after window function
     */
	public static <T, W extends Window> DeltaEvictor<T, W> of(double threshold, DeltaFunction<T> deltaFunction, boolean doEvictAfter) {
		return new DeltaEvictor<>(threshold, deltaFunction, doEvictAfter);
	}
}

Trigger

  • 窗口触发器机制,当满足阈值触发对应动作
CountTrigger
public class CountTrigger<W extends Window> extends Trigger<Object, W> {
	private static final long serialVersionUID = 1L;

	// 最大触发条数
	private final long maxCount;

	// 创建Reducing状态,传入Sum reduce函数,计算放入stateDesc的数据之和
	private final ReducingStateDescriptor<Long> stateDesc =
			new ReducingStateDescriptor<>("count", new Sum(), LongSerializer.INSTANCE);

	private CountTrigger(long maxCount) {
		this.maxCount = maxCount;
	}

	@Override
	public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
		// 获取ReducingState状态
		ReducingState<Long> count = ctx.getPartitionedState(stateDesc);
		// 添加1个元素
		count.add(1L);
		// 如果窗口中以及满足count数,则清空状态并且输出窗口结果
		if (count.get() >= maxCount) {
			count.clear();
			return TriggerResult.FIRE;
		}
		return TriggerResult.CONTINUE;
	}

	@Override
	public TriggerResult onEventTime(long time, W window, TriggerContext ctx) {
		return TriggerResult.CONTINUE;
	}

	@Override
	public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
		return TriggerResult.CONTINUE;
	}

	@Override
	public void clear(W window, TriggerContext ctx) throws Exception {
		ctx.getPartitionedState(stateDesc).clear();
	}

	@Override
	public boolean canMerge() {
		return true;
	}

	@Override
	public void onMerge(W window, OnMergeContext ctx) throws Exception {
		ctx.mergePartitionedState(stateDesc);
	}

	@Override
	public String toString() {
		return "CountTrigger(" +  maxCount + ")";
	}

	/**
	 * Creates a trigger that fires once the number of elements in a pane reaches the given count.
	 *
	 * @param maxCount The count of elements at which to fire.
	 * @param <W> The type of {@link Window Windows} on which this trigger can operate.
	 */
	public static <W extends Window> CountTrigger<W> of(long maxCount) {
		return new CountTrigger<>(maxCount);
	}

	private static class Sum implements ReduceFunction<Long> {
		private static final long serialVersionUID = 1L;

		@Override
		public Long reduce(Long value1, Long value2) throws Exception {
			return value1 + value2;
		}

	}
}
ProcessingTimeTrigger
  • 每次到窗口的end时触发窗口计算
public class ProcessingTimeTrigger extends Trigger<Object, TimeWindow> {
	private static final long serialVersionUID = 1L;

	private ProcessingTimeTrigger() {}

	@Override
	public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) {
		// 注册定时器
		ctx.registerProcessingTimeTimer(window.maxTimestamp());
		return TriggerResult.CONTINUE;
	}

	@Override
	public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext ctx) throws Exception {
		return TriggerResult.CONTINUE;
	}

	/**
	 * timer触发是调用
	 * @param time The timestamp at which the timer fired.
	 * @param window The window for which the timer fired.
	 * @param ctx A context object that can be used to register timer callbacks.
	 * @return
	 */
	@Override
	public TriggerResult onProcessingTime(long time, TimeWindow window, TriggerContext ctx) {
		return TriggerResult.FIRE;
	}

	@Override
	public void clear(TimeWindow window, TriggerContext ctx) throws Exception {
		ctx.deleteProcessingTimeTimer(window.maxTimestamp());
	}

	@Override
	public boolean canMerge() {
		return true;
	}

	@Override
	public void onMerge(TimeWindow window,
			OnMergeContext ctx) {
		// only register a timer if the time is not yet past the end of the merged window
		// this is in line with the logic in onElement(). If the time is past the end of
		// the window onElement() will fire and setting a timer here would fire the window twice.
		long windowMaxTimestamp = window.maxTimestamp();
		if (windowMaxTimestamp > ctx.getCurrentProcessingTime()) {
			ctx.registerProcessingTimeTimer(windowMaxTimestamp);
		}
	}

	@Override
	public String toString() {
		return "ProcessingTimeTrigger()";
	}

	/**
	 * Creates a new trigger that fires once system time passes the end of the window.
	 */
	public static ProcessingTimeTrigger create() {
		return new ProcessingTimeTrigger();
	}

}
ProcessingTimeoutTrigger
public class ProcessingTimeoutTrigger<T, W extends Window> extends Trigger<T, W> {

	private static final long serialVersionUID = 1L;

	// 内嵌的触发器
	private final Trigger<T, W> nestedTrigger;
	// Trigger触发的间隔
	private final long interval;
	// 新纪录是否重设timer
	private final boolean resetTimerOnNewRecord;
	// 超时窗口是否清理
	private final boolean shouldClearOnTimeout;

	private final ValueStateDescriptor<Long> timeoutStateDesc;

	private ProcessingTimeoutTrigger(
			Trigger<T, W> nestedTrigger,
			long interval,
			boolean resetTimerOnNewRecord,
			boolean shouldClearOnTimeout) {
		this.nestedTrigger = nestedTrigger;
		this.interval = interval;
		this.resetTimerOnNewRecord = resetTimerOnNewRecord;
		this.shouldClearOnTimeout = shouldClearOnTimeout;
		this.timeoutStateDesc = new ValueStateDescriptor<>("timeout", LongSerializer.INSTANCE);
	}

	@Override
	public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx)
			throws Exception {
		// 记录塞入内嵌触发器
		TriggerResult triggerResult = this.nestedTrigger.onElement(element, timestamp, window, ctx);
		// 如果可以输出则输出
		if (triggerResult.isFire()) {
			this.clear(window, ctx);
			return triggerResult;
		}

		ValueState<Long> timeoutState = ctx.getPartitionedState(this.timeoutStateDesc);
		// 下次触发的时间
		long nextFireTimestamp = ctx.getCurrentProcessingTime() + this.interval;
		Long timeoutTimestamp = timeoutState.value();
		// 如果新纪录就清空计时器和超时
		if (timeoutTimestamp != null && resetTimerOnNewRecord) {
			ctx.deleteProcessingTimeTimer(timeoutTimestamp);
			timeoutState.clear();
			timeoutTimestamp = null;
		}
		// 第一次没有获取
		if (timeoutTimestamp == null) {
			// 将下次窗口触发时间放入timeoutState
			timeoutState.update(nextFireTimestamp);
			// 注册定时器
			ctx.registerProcessingTimeTimer(nextFireTimestamp);
		}

		return triggerResult;
	}

	@Override
	public TriggerResult onProcessingTime(long timestamp, W window, TriggerContext ctx)
			throws Exception {
		//调用内嵌trigger的onProcessingTime方法
		TriggerResult triggerResult = this.nestedTrigger.onProcessingTime(timestamp, window, ctx);
		// 如果需要清空,则清空
		if (shouldClearOnTimeout) {
			this.clear(window, ctx);
		}
		return triggerResult.isPurge() ? TriggerResult.FIRE_AND_PURGE : TriggerResult.FIRE;
	}

	@Override
	public TriggerResult onEventTime(long timestamp, W window, TriggerContext ctx)
			throws Exception {
		TriggerResult triggerResult = this.nestedTrigger.onEventTime(timestamp, window, ctx);
		if (shouldClearOnTimeout) {
			this.clear(window, ctx);
		}
		return triggerResult.isPurge() ? TriggerResult.FIRE_AND_PURGE : TriggerResult.FIRE;
	}

	@Override
	public void clear(W window, TriggerContext ctx) throws Exception {
		ValueState<Long> timeoutTimestampState = ctx.getPartitionedState(this.timeoutStateDesc);
		Long timeoutTimestamp = timeoutTimestampState.value();
		if (timeoutTimestamp != null) {
			ctx.deleteProcessingTimeTimer(timeoutTimestamp);
			timeoutTimestampState.clear();
		}
		this.nestedTrigger.clear(window, ctx);
	}

	@Override
	public String toString() {
		return "TimeoutTrigger(" + this.nestedTrigger.toString() + ")";
	}

	/**
	 * Creates a new {@link ProcessingTimeoutTrigger} that fires when the inner trigger is fired or
	 * when the timeout timer fires.
	 *
	 * <p>For example:
	 * {@code ProcessingTimeoutTrigger.of(CountTrigger.of(3), 100)}, will create a CountTrigger with
	 * timeout of 100 millis. So, if the first record arrives at time {@code t}, and the second
	 * record arrives at time {@code t+50 }, the trigger will fire when the third record arrives or
	 * when the time is {code t+100} (timeout).
	 *
	 * @param nestedTrigger the nested {@link Trigger}
	 * @param timeout the timeout interval
	 *
	 * @return {@link ProcessingTimeoutTrigger} with the above configuration.
	 */
	public static <T, W extends Window> ProcessingTimeoutTrigger<T, W> of(
			Trigger<T, W> nestedTrigger,
			Duration timeout) {
		return new ProcessingTimeoutTrigger<>(nestedTrigger, timeout.toMillis(), false, true);
	}

	/**
	 * Creates a new {@link ProcessingTimeoutTrigger} that fires when the inner trigger is fired or
	 * when the timeout timer fires.
	 *
	 * <p>For example:
	 * {@code ProcessingTimeoutTrigger.of(CountTrigger.of(3), 100, false, true)}, will create a
	 * CountTrigger with timeout of 100 millis. So, if the first record arrives at time {@code t},
	 * and the second record arrives at time {@code t+50 }, the trigger will fire when the third
	 * record arrives or when the time is {code t+100} (timeout).
	 *
	 * @param nestedTrigger the nested {@link Trigger}
	 * @param timeout the timeout interval
	 * @param resetTimerOnNewRecord each time a new element arrives, reset the timer and start a
	 * 		new one
	 * @param shouldClearOnTimeout whether to call {@link Trigger#clear(Window, TriggerContext)}
	 * 		when the processing-time timer fires
	 * @param <T> The type of the element.
	 * @param <W> The type of {@link Window Windows} on which this trigger can operate.
	 *
	 * @return {@link ProcessingTimeoutTrigger} with the above configuration.
	 */
	public static <T, W extends Window> ProcessingTimeoutTrigger<T, W> of(
			Trigger<T, W> nestedTrigger,
			Duration timeout,
			boolean resetTimerOnNewRecord,
			boolean shouldClearOnTimeout) {
		return new ProcessingTimeoutTrigger<>(
				nestedTrigger, timeout.toMillis(), resetTimerOnNewRecord, shouldClearOnTimeout);
	}

}
PurgingTrigger
  • 满足窗口清空后清楚窗口状态,清空触发器
public class PurgingTrigger<T, W extends Window> extends Trigger<T, W> {
	private static final long serialVersionUID = 1L;

	private Trigger<T, W> nestedTrigger;

	private  PurgingTrigger(Trigger<T, W> nestedTrigger) {
		this.nestedTrigger = nestedTrigger;
	}

	/**
	 * 如果为fire则为fire_purge触发结果
	 * @param element The element that arrived.
	 * @param timestamp The timestamp of the element that arrived.
	 * @param window The window to which the element is being added.
	 * @param ctx A context object that can be used to register timer callbacks.
	 * @return
	 * @throws Exception
	 */
	@Override
	public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
		TriggerResult triggerResult = nestedTrigger.onElement(element, timestamp, window, ctx);
		return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult;
	}

	@Override
	public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {
		TriggerResult triggerResult = nestedTrigger.onEventTime(time, window, ctx);
		return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult;
	}
	
	@Override
	public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
		TriggerResult triggerResult = nestedTrigger.onProcessingTime(time, window, ctx);
		return triggerResult.isFire() ? TriggerResult.FIRE_AND_PURGE : triggerResult;
	}

	@Override
	public void clear(W window, TriggerContext ctx) throws Exception {
		nestedTrigger.clear(window, ctx);
	}

	@Override
	public boolean canMerge() {
		return nestedTrigger.canMerge();
	}

	@Override
	public void onMerge(W window, OnMergeContext ctx) throws Exception {
		nestedTrigger.onMerge(window, ctx);
	}

	@Override
	public String toString() {
		return "PurgingTrigger(" + nestedTrigger.toString() + ")";
	}

	/**
	 * Creates a new purging trigger from the given {@code Trigger}.
	 *
	 * @param nestedTrigger The trigger that is wrapped by this purging trigger
	 */
	public static <T, W extends Window> PurgingTrigger<T, W> of(Trigger<T, W> nestedTrigger) {
		return new PurgingTrigger<>(nestedTrigger);
	}

	@VisibleForTesting
	public Trigger<T, W> getNestedTrigger() {
		return nestedTrigger;
	}
}
ContinuousEventTimeTrigger
public class ContinuousEventTimeTrigger<W extends Window> extends Trigger<Object, W> {
	private static final long serialVersionUID = 1L;

	// Trigger触发的间隔
	private final long interval;

	/** When merging we take the lowest of all fire timestamps as the new fire timestamp. */
	// 计算最小的时间
	private final ReducingStateDescriptor<Long> stateDesc =
			new ReducingStateDescriptor<>("fire-time", new Min(), LongSerializer.INSTANCE);

	private ContinuousEventTimeTrigger(long interval) {
		this.interval = interval;
	}

	@Override
	public TriggerResult onElement(Object element, long timestamp, W window, TriggerContext ctx) throws Exception {
		// 如果watermark大于等于窗口结束时间,则输出当前窗口记录
		if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
			// if the watermark is already past the window fire immediately
			return TriggerResult.FIRE;
		} else {
			// 注册event定时器
			ctx.registerEventTimeTimer(window.maxTimestamp());
		}
		// 获取输出记录时间
		ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
		if (fireTimestamp.get() == null) {
			// 计算开始时间
			long start = timestamp - (timestamp % interval);
			long nextFireTimestamp = start + interval;
			// 注册计时器
			ctx.registerEventTimeTimer(nextFireTimestamp);
			//放入ReducingState
			fireTimestamp.add(nextFireTimestamp);
		}

		return TriggerResult.CONTINUE;
	}

	@Override
	public TriggerResult onEventTime(long time, W window, TriggerContext ctx) throws Exception {

		if (time == window.maxTimestamp()){
			return TriggerResult.FIRE;
		}

		ReducingState<Long> fireTimestampState = ctx.getPartitionedState(stateDesc);

		Long fireTimestamp = fireTimestampState.get();

		if (fireTimestamp != null && fireTimestamp == time) {
			fireTimestampState.clear();
			fireTimestampState.add(time + interval);
			ctx.registerEventTimeTimer(time + interval);
			return TriggerResult.FIRE;
		}

		return TriggerResult.CONTINUE;
	}

	@Override
	public TriggerResult onProcessingTime(long time, W window, TriggerContext ctx) throws Exception {
		return TriggerResult.CONTINUE;
	}

	@Override
	public void clear(W window, TriggerContext ctx) throws Exception {
		ReducingState<Long> fireTimestamp = ctx.getPartitionedState(stateDesc);
		Long timestamp = fireTimestamp.get();
		if (timestamp != null) {
			ctx.deleteEventTimeTimer(timestamp);
			fireTimestamp.clear();
		}
	}

	@Override
	public boolean canMerge() {
		return true;
	}

	@Override
	public void onMerge(W window, OnMergeContext ctx) throws Exception {
		ctx.mergePartitionedState(stateDesc);
		Long nextFireTimestamp = ctx.getPartitionedState(stateDesc).get();
		if (nextFireTimestamp != null) {
			ctx.registerEventTimeTimer(nextFireTimestamp);
		}
	}

	@Override
	public String toString() {
		return "ContinuousEventTimeTrigger(" + interval + ")";
	}

	@VisibleForTesting
	public long getInterval() {
		return interval;
	}

	/**
	 * Creates a trigger that continuously fires based on the given interval.
	 *
	 * @param interval The time interval at which to fire.
	 * @param <W> The type of {@link Window Windows} on which this trigger can operate.
	 */
	public static <W extends Window> ContinuousEventTimeTrigger<W> of(Time interval) {
		return new ContinuousEventTimeTrigger<>(interval.toMilliseconds());
	}

	private static class Min implements ReduceFunction<Long> {
		private static final long serialVersionUID = 1L;

		@Override
		public Long reduce(Long value1, Long value2) throws Exception {
			return Math.min(value1, value2);
		}
	}
}
DeltaTrigger
  • 新元素和老元素做delta操作,大于给定的阈值则输出
private static final long serialVersionUID = 1L;
	// delta函数
	private final DeltaFunction<T> deltaFunction;
	// 阈值
	private final double threshold;
	// 状态描述
	private final ValueStateDescriptor<T> stateDesc;

	private DeltaTrigger(double threshold, DeltaFunction<T> deltaFunction, TypeSerializer<T> stateSerializer) {
		this.deltaFunction = deltaFunction;
		this.threshold = threshold;
		stateDesc = new ValueStateDescriptor<>("last-element", stateSerializer);

	}

	@Override
	public TriggerResult onElement(T element, long timestamp, W window, TriggerContext ctx) throws Exception {
		// 获取最后元素状态
		ValueState<T> lastElementState = ctx.getPartitionedState(stateDesc);
		// 收拾维护状态
		if (lastElementState.value() == null) {
			// 修改状态,并且跳过输出
			lastElementState.update(element);
			return TriggerResult.CONTINUE;
		}
		// 如果上一次元素和新元素的delta操作大于阈值,则输出并且修改元素
		if (deltaFunction.getDelta(lastElementState.value(), element) > this.threshold) {
			lastElementState.update(element);
			return TriggerResult.FIRE;
		}
		return TriggerResult.CONTINUE;
	}

windowAll

  • 所有windowAll操作算子的底层算子

assignTimestampsAndWatermarks

public class TimestampsAndWatermarksOperator<T>
		extends AbstractStreamOperator<T>
		implements OneInputStreamOperator<T, T>, ProcessingTimeCallback {

	private static final long serialVersionUID = 1L;

	// watermark生成策略
	private final WatermarkStrategy<T> watermarkStrategy;

	/** The timestamp assigner. */
	private transient TimestampAssigner<T> timestampAssigner;

	/** The watermark generator, initialized during runtime. */
	private transient WatermarkGenerator<T> watermarkGenerator;

	/** The watermark output gateway, initialized during runtime. */
	private transient WatermarkOutput wmOutput;

	/** The interval (in milliseconds) for periodic watermark probes. Initialized during runtime. */
	private transient long watermarkInterval;

	public TimestampsAndWatermarksOperator(
			WatermarkStrategy<T> watermarkStrategy) {

		this.watermarkStrategy = checkNotNull(watermarkStrategy);
		this.chainingStrategy = ChainingStrategy.ALWAYS;
	}

	@Override
	public void open() throws Exception {
		super.open();

		timestampAssigner = watermarkStrategy.createTimestampAssigner(this::getMetricGroup);
		watermarkGenerator = watermarkStrategy.createWatermarkGenerator(this::getMetricGroup);

		wmOutput = new WatermarkEmitter(output, getContainingTask().getStreamStatusMaintainer());
		//watermark生成间隔,从配置中获取
		watermarkInterval = getExecutionConfig().getAutoWatermarkInterval();
		if (watermarkInterval > 0) {
			final long now = getProcessingTimeService().getCurrentProcessingTime();
			getProcessingTimeService().registerTimer(now + watermarkInterval, this);
		}
	}

	/**
	 * 每来一条记录触发一次
	 * @param element
	 * @throws Exception
	 */
	@Override
	public void processElement(final StreamRecord<T> element) throws Exception {
		final T event = element.getValue();
		final long previousTimestamp = element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE;
		// 提出eventTimestamp
		final long newTimestamp = timestampAssigner.extractTimestamp(event, previousTimestamp);

		element.setTimestamp(newTimestamp);
		output.collect(element);
		// 传入watermark生成器中,newTimestamp为timestampAssigner#extractTimestamp出来的时间
		watermarkGenerator.onEvent(event, newTimestamp, wmOutput);
	}

	/**
	 * 生成watermark,processing触发器触发,定期生产wm
	 * @param timestamp The timestamp for which the trigger event was scheduled.
	 * @throws Exception
	 */
	@Override
	public void onProcessingTime(long timestamp) throws Exception {
		watermarkGenerator.onPeriodicEmit(wmOutput);
		// 拿到现在的时间,注册下一次watermarker生成的时间
		final long now = getProcessingTimeService().getCurrentProcessingTime();
		getProcessingTimeService().registerTimer(now + watermarkInterval, this);
	}

	/**
	 * Override the base implementation to completely ignore watermarks propagated from
	 * upstream, except for the "end of time" watermark.
	 */
	@Override
	public void processWatermark(org.apache.flink.streaming.api.watermark.Watermark mark) throws Exception {
		// if we receive a Long.MAX_VALUE watermark we forward it since it is used
		// to signal the end of input and to not block watermark progress downstream
		if (mark.getTimestamp() == Long.MAX_VALUE) {
			wmOutput.emitWatermark(Watermark.MAX_WATERMARK);
		}
	}

	@Override
	public void close() throws Exception {
		super.close();
		// 关闭之前再生成一次watermark
		watermarkGenerator.onPeriodicEmit(wmOutput);
	}

	// ------------------------------------------------------------------------

	/**
	 * Implementation of the {@code WatermarkEmitter}, based on the components
	 * that are available inside a stream operator.
	 */
	private static final class WatermarkEmitter implements WatermarkOutput {

		private final Output<?> output;

		private final StreamStatusMaintainer statusMaintainer;

		private long currentWatermark;

		private boolean idle;

		WatermarkEmitter(Output<?> output, StreamStatusMaintainer statusMaintainer) {
			this.output = output;
			this.statusMaintainer = statusMaintainer;
			this.currentWatermark = Long.MIN_VALUE;
		}

		@Override
		public void emitWatermark(Watermark watermark) {
			final long ts = watermark.getTimestamp();

			if (ts <= currentWatermark) {
				return;
			}

			currentWatermark = ts;

			if (idle) {
				idle = false;
				statusMaintainer.toggleStreamStatus(StreamStatus.ACTIVE);
			}

			output.emitWatermark(new org.apache.flink.streaming.api.watermark.Watermark(ts));
		}

		@Override
		public void markIdle() {
			idle = true;
			statusMaintainer.toggleStreamStatus(StreamStatus.IDLE);
		}
	}
}

sink算子

print

	public DataStreamSink<T> print() {
		// print函数
		PrintSinkFunction<T> printFunction = new PrintSinkFunction<>();
		return addSink(printFunction).name("Print to Std. Out");
	}
	// 带标示的print
	public DataStreamSink<T> print(String sinkIdentifier) {
		PrintSinkFunction<T> printFunction = new PrintSinkFunction<>(sinkIdentifier, false);
		return addSink(printFunction).name("Print to Std. Out");
	}

writeAsText

public DataStreamSink<T> writeAsText(String path) {
		// 按照Text格式写入对应path,根据path解析出不同的FileSystem,然后将二进制数据写入Fs
		return writeUsingOutputFormat(new TextOutputFormat<T>(new Path(path)));
	}

OutputFormatSinkFunction

public class OutputFormatSinkFunction<IN> extends RichSinkFunction<IN> implements InputTypeConfigurable {

	private static final long serialVersionUID = 1L;

	private static final Logger LOG = LoggerFactory.getLogger(OutputFormatSinkFunction.class);
	
	// 对应的output格式
	private OutputFormat<IN> format;
	private boolean cleanupCalled = false;

	public OutputFormatSinkFunction(OutputFormat<IN> format) {
		this.format = format;
	}

	@Override
	public void open(Configuration parameters) throws Exception {
		// 获取运行上下文
		RuntimeContext context = getRuntimeContext();
		// 配置放入对应outputFormat中
		format.configure(parameters);
		// 获取subTask的index
		int indexInSubtaskGroup = context.getIndexOfThisSubtask();
		// 获取并行subTask的个数
		int currentNumberOfSubtasks = context.getNumberOfParallelSubtasks();
		// 用于并行写
		format.open(indexInSubtaskGroup, currentNumberOfSubtasks);
	}

	@Override
	public void setRuntimeContext(RuntimeContext context) {
		super.setRuntimeContext(context);
		if (format instanceof RichOutputFormat) {
			((RichOutputFormat) format).setRuntimeContext(context);
		}
	}

	@Override
	public void setInputType(TypeInformation<?> type, ExecutionConfig executionConfig) {
		if (format instanceof InputTypeConfigurable) {
			InputTypeConfigurable itc = (InputTypeConfigurable) format;
			itc.setInputType(type, executionConfig);
		}
	}

	@Override
	public void invoke(IN record) throws Exception {
		try {
			format.writeRecord(record);
		} catch (Exception ex) {
			cleanup();
			throw ex;
		}
	}

	@Override
	public void close() throws IOException {
		try {
			format.close();
		} catch (Exception ex) {
			cleanup();
			throw ex;
		}
	}

	private void cleanup() {
		try {
			if (!cleanupCalled && format instanceof CleanupWhenUnsuccessful) {
				cleanupCalled = true;
				((CleanupWhenUnsuccessful) format).tryCleanupOnError();
			}
		} catch (Throwable t) {
			LOG.error("Cleanup on error failed.", t);
		}
	}

	public OutputFormat<IN> getFormat() {
		return format;
	}
}

addSink

public DataStreamSink<T> addSink(SinkFunction<T> sinkFunction) {

		// read the output type of the input Transform to coax out errors about MissingTypeInfo
		transformation.getOutputType();

		// configure the type if needed
		if (sinkFunction instanceof InputTypeConfigurable) {
			((InputTypeConfigurable) sinkFunction).setInputType(getType(), getExecutionConfig());
		}
		// 包装成StreamSink
		StreamSink<T> sinkOperator = new StreamSink<>(clean(sinkFunction));

		DataStreamSink<T> sink = new DataStreamSink<>(this, sinkOperator);
		// 添加到算子链中
		getExecutionEnvironment().addOperator(sink.getTransformation());
		return sink;
	}
  • DataStreamSink
public class DataStreamSink<T> {

	private final SinkTransformation<T> transformation;

	@SuppressWarnings("unchecked")
	protected DataStreamSink(DataStream<T> inputStream, StreamSink<T> operator) {
		// 封装成SinkTransformation
		this.transformation = new SinkTransformation<T>(inputStream.getTransformation(), "Unnamed", operator, inputStream.getExecutionEnvironment().getParallelism());
	}

	/**
	 * Returns the transformation that contains the actual sink operator of this sink.
	 */
	@Internal
	public SinkTransformation<T> getTransformation() {
		return transformation;
	}

	/**
	 * Sets the name of this sink. This name is
	 * used by the visualization and logging during runtime.
	 *
	 * @return The named sink.
	 */
	public DataStreamSink<T> name(String name) {
		transformation.setName(name);
		return this;
	}

	/**
	 * Sets an ID for this operator.
	 *
	 * <p>The specified ID is used to assign the same operator ID across job
	 * submissions (for example when starting a job from a savepoint).
	 *
	 * <p><strong>Important</strong>: this ID needs to be unique per
	 * transformation and job. Otherwise, job submission will fail.
	 *
	 * @param uid The unique user-specified ID of this transformation.
	 * @return The operator with the specified ID.
	 */
	@PublicEvolving
	public DataStreamSink<T> uid(String uid) {
		transformation.setUid(uid);
		return this;
	}

	/**
	 * Sets an user provided hash for this operator. This will be used AS IS the create the JobVertexID.
	 *
	 * <p>The user provided hash is an alternative to the generated hashes, that is considered when identifying an
	 * operator through the default hash mechanics fails (e.g. because of changes between Flink versions).
	 *
	 * <p><strong>Important</strong>: this should be used as a workaround or for trouble shooting. The provided hash
	 * needs to be unique per transformation and job. Otherwise, job submission will fail. Furthermore, you cannot
	 * assign user-specified hash to intermediate nodes in an operator chain and trying so will let your job fail.
	 *
	 * <p>A use case for this is in migration between Flink versions or changing the jobs in a way that changes the
	 * automatically generated hashes. In this case, providing the previous hashes directly through this method (e.g.
	 * obtained from old logs) can help to reestablish a lost mapping from states to their target operator.
	 *
	 * @param uidHash The user provided hash for this operator. This will become the JobVertexID, which is shown in the
	 *                 logs and web ui.
	 * @return The operator with the user provided hash.
	 */
	@PublicEvolving
	public DataStreamSink<T> setUidHash(String uidHash) {
		transformation.setUidHash(uidHash);
		return this;
	}

	/**
	 * Sets the parallelism for this sink. The degree must be higher than zero.
	 *
	 * @param parallelism The parallelism for this sink.
	 * @return The sink with set parallelism.
	 */
	public DataStreamSink<T> setParallelism(int parallelism) {
		transformation.setParallelism(parallelism);
		return this;
	}

	//	---------------------------------------------------------------------------
	//	 Fine-grained resource profiles are an incomplete work-in-progress feature
	//	 The setters are hence private at this point.
	//	---------------------------------------------------------------------------

	/**
	 * Sets the minimum and preferred resources for this sink, and the lower and upper resource limits will
	 * be considered in resource resize feature for future plan.
	 *
	 * @param minResources The minimum resources for this sink.
	 * @param preferredResources The preferred resources for this sink
	 * @return The sink with set minimum and preferred resources.
	 */
	private DataStreamSink<T> setResources(ResourceSpec minResources, ResourceSpec preferredResources) {
		transformation.setResources(minResources, preferredResources);
		return this;
	}

	/**
	 * Sets the resources for this sink, the minimum and preferred resources are the same by default.
	 *
	 * @param resources The resources for this sink.
	 * @return The sink with set minimum and preferred resources.
	 */
	private DataStreamSink<T> setResources(ResourceSpec resources) {
		transformation.setResources(resources, resources);

		return this;
	}

	/**
	 * Turns off chaining for this operator so thread co-location will not be
	 * used as an optimization.
	 *
	 * <p>Chaining can be turned off for the whole
	 * job by {@link org.apache.flink.streaming.api.environment.StreamExecutionEnvironment#disableOperatorChaining()}
	 * however it is not advised for performance considerations.
	 *
	 * @return The sink with chaining disabled
	 */
	@PublicEvolving
	public DataStreamSink<T> disableChaining() {
		this.transformation.setChainingStrategy(ChainingStrategy.NEVER);
		return this;
	}

	/**
	 * Sets the slot sharing group of this operation. Parallel instances of
	 * operations that are in the same slot sharing group will be co-located in the same
	 * TaskManager slot, if possible.
	 *
	 * <p>Operations inherit the slot sharing group of input operations if all input operations
	 * are in the same slot sharing group and no slot sharing group was explicitly specified.
	 *
	 * <p>Initially an operation is in the default slot sharing group. An operation can be put into
	 * the default group explicitly by setting the slot sharing group to {@code "default"}.
	 *
	 * @param slotSharingGroup The slot sharing group name.
	 */
	@PublicEvolving
	public DataStreamSink<T> slotSharingGroup(String slotSharingGroup) {
		transformation.setSlotSharingGroup(slotSharingGroup);
		return this;
	}
}

KeyedStream

  • 用于keyby之后返回的数据流,支持keyedState,keyedWindowFunction等

process

  • ProcessFunction 过期API
	public <R> SingleOutputStreamOperator<R> process(
			ProcessFunction<T, R> processFunction,
			TypeInformation<R> outputType) {

		LegacyKeyedProcessOperator<KEY, T, R> operator = new LegacyKeyedProcessOperator<>(clean(processFunction));
		// 将创建的算子传入transform,最终加入transforms dag图中
		return transform("Process", outputType, operator);
	}

public class LegacyKeyedProcessOperator<K, IN, OUT>
		extends AbstractUdfStreamOperator<OUT, ProcessFunction<IN, OUT>>
		implements OneInputStreamOperator<IN, OUT>, Triggerable<K, VoidNamespace> {

	private static final long serialVersionUID = 1L;

	private transient TimestampedCollector<OUT> collector;

	private transient ContextImpl context;

	private transient OnTimerContextImpl onTimerContext;

	public LegacyKeyedProcessOperator(ProcessFunction<IN, OUT> function) {
		super(function);

		chainingStrategy = ChainingStrategy.ALWAYS;
	}

	@Override
	public void open() throws Exception {
		super.open();
		//创建时间戳收集器
		collector = new TimestampedCollector<>(output);
		//创建内部时间服务描述
		InternalTimerService<VoidNamespace> internalTimerService =
				getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, this);

		TimerService timerService = new SimpleTimerService(internalTimerService);

		context = new ContextImpl(userFunction, timerService);
		onTimerContext = new OnTimerContextImpl(userFunction, timerService);
	}

	@Override
	public void onEventTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
		collector.setAbsoluteTimestamp(timer.getTimestamp());
		// 触发processFunction
		invokeUserFunction(TimeDomain.EVENT_TIME, timer);
	}

	@Override
	public void onProcessingTime(InternalTimer<K, VoidNamespace> timer) throws Exception {
		collector.eraseTimestamp();
		// 触发processFunction
		invokeUserFunction(TimeDomain.PROCESSING_TIME, timer);
	}

	@Override
	public void processElement(StreamRecord<IN> element) throws Exception {
		// 设置处理数据的时间
		collector.setTimestamp(element);
		// 数据传递给context
		context.element = element;
		// 执行processFunction#processElement
		userFunction.processElement(element.getValue(), context, collector);
		context.element = null;
	}

  // 底层timer执行原理
	private void invokeUserFunction(
			TimeDomain timeDomain,
			InternalTimer<K, VoidNamespace> timer) throws Exception {
		onTimerContext.timeDomain = timeDomain;
		onTimerContext.timer = timer;
    // 触发process函数的onTimer逻辑
		userFunction.onTimer(timer.getTimestamp(), onTimerContext, collector);
		onTimerContext.timeDomain = null;
		onTimerContext.timer = null;
	}

	....
}
  • 传入keyedProcessFunction函数
public <R> SingleOutputStreamOperator<R> process(
			KeyedProcessFunction<KEY, T, R> keyedProcessFunction,
			TypeInformation<R> outputType) {

		KeyedProcessOperator<KEY, T, R> operator = new KeyedProcessOperator<>(clean(keyedProcessFunction));
		return transform("KeyedProcess", outputType, operator);
	}

intervalJoin

  • 双流在一段间隔内join
	public <T1> IntervalJoin<T, T1, KEY> intervalJoin(KeyedStream<T1, KEY> otherStream) {
		return new IntervalJoin<>(this, otherStream);
	}
	
	
		public IntervalJoined<T1, T2, KEY> between(Time lowerBound, Time upperBound) {

			TimeCharacteristic timeCharacteristic =
				streamOne.getExecutionEnvironment().getStreamTimeCharacteristic();
			// IntervalJoin不支持eventTime时间语义
			if (timeCharacteristic != TimeCharacteristic.EventTime) {
				throw new UnsupportedTimeCharacteristicException("Time-bounded stream joins are only supported in event time");
			}

			checkNotNull(lowerBound, "A lower bound needs to be provided for a time-bounded join");
			checkNotNull(upperBound, "An upper bound needs to be provided for a time-bounded join");

			return new IntervalJoined<>(
				streamOne,
				streamTwo,
				lowerBound.toMilliseconds(),
				upperBound.toMilliseconds(),
				true,
				true
			);
		}
		
		
		
		public <OUT> SingleOutputStreamOperator<OUT> process(
				ProcessJoinFunction<IN1, IN2, OUT> processJoinFunction,
				TypeInformation<OUT> outputType) {
			Preconditions.checkNotNull(processJoinFunction);
			Preconditions.checkNotNull(outputType);
			// 清理process函数
			final ProcessJoinFunction<IN1, IN2, OUT> cleanedUdf = left.getExecutionEnvironment().clean(processJoinFunction);
			// 创建IntervalJoinOperator
			final IntervalJoinOperator<KEY, IN1, IN2, OUT> operator =
				new IntervalJoinOperator<>(
					lowerBound,
					upperBound,
					lowerBoundInclusive,
					upperBoundInclusive,
					left.getType().createSerializer(left.getExecutionConfig()),
					right.getType().createSerializer(right.getExecutionConfig()),
					cleanedUdf
				);
			return left
				.connect(right)
				.keyBy(keySelector1, keySelector2)
				.transform("Interval Join", outputType, operator);
		}