Flink-Watermark

官网指路

https://ci.apache.org/projects/flink/flink-docs-release-1.10/zh/dev/event_timestamps_watermarks.html

前言

!!! 经实际使用发现任务刚刚启动时watermark没有调用extractTimestamp方法,因此会是初始值。。。。

Flink中测量进度使用的是水印(Watermark)in event time。水印流作为数据流的一部分,携带了一个时间戳t。
一个Watermark(t)标志数据流中“事件时间”已经到了t,即不再会有时间戳小于等于t数据过来。

Watermark:算子通过Watermark推断当前的事件时间。Watermark用于通知算子没有比水位更小的时间戳的事件会发生了。

一般来讲Watermark经常和Window一起被用来处理乱序事件。解决窗口的触发时机以及对于乱序数据的数据保障(或者说数据乱序情况下依然分配进属于它的窗口中)。

WaterMark绝大部分时候是和eventTime配合使用,最大程度上确保了Flink窗口处理中数据的顺序(注意是watermark只是最大程度上确保了数据顺序即一定程度上缓解了数据乱序的问题,某些情况下,数据延迟非常严重即watermark机制也无法确保数据全部进入窗口,关于延迟数据参考allowed lateness机制)。

watermark是一个全局的值,不是某一个key下的值,所以即使不是同一个key的数据,其warmark也会增加.currentMaxTimestamp也增加了。
1.窗口的触发时机问题: (有两个关键词 1:Partition Watermark , 2:事件时间时钟)
– 任务会对flink的输入分区维护相应的Partition Watermark.
当检测到某一分区有输入后,系统会提取该输入事件的watermark与当前分区的watermark 进行比较
并将大的watermark更新当前Partition watermark(相当于是一个更新操作);
更新后会将 Partition Watermark (分区) 中最小的那个watermark 作为事件时间时钟 向下游发送 后续就是trigger的操作了

传递

Watermark在定义处随数据流一同往下传递,当一个operate有多个输入流或者或者channel时,采用最小的watermark。
水印是单调递增,新的来了会替换旧的。输出的水印是所有输入最小的水印。

与window一起使用时

查看源码EventTimeTrigger

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
@Override
public TriggerResult onElement(Object element, long timestamp, TimeWindow window, TriggerContext ctx) throws Exception {
if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
// if the watermark is already past the window fire immediately
return TriggerResult.FIRE;
} else {
ctx.registerEventTimeTimer(window.maxTimestamp());
return TriggerResult.CONTINUE;
}
}
```
`watermark >= end_of_window + allowedLateness(默认是0) `时窗口关闭. 但窗口里真实计算的是`event time<end_of_window + allowedLateness`

Flink何时触发window?

`1、watermark时间 > Event Time(对于late element太多的数据而言)`
或者
`1、watermark时间 >= window_end_time(对于out-of-order以及正常的数据而言)`
`2、在[window_start_time, window_end_time)中有数据存在`

参考:https://blog.csdn.net/xorxos/article/details/80715113
### 使用流程
1、指定使用event time
`env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);`
2、定义获取event time的字段和watermark
方式1: 在source中定义,使数据刚进去flink时就分配event time和watermark。具体使用可以参看`SourceFunction`,但如果是使用的不是自定义的source,那就无法使用此方法。

@Override
public void run(SourceContext ctx) throws Exception {
while (/* condition */) {
MyType next = getNext();
ctx.collectWithTimestamp(next, next.getEventTimestamp());

    if (next.hasWatermarkTime()) {
        ctx.emitWatermark(new Watermark(next.getWatermarkTime()));
    }
}

}

方式2: 在要使用watermark之前的operate后定义`assignTimestampsAndWatermarks`
两种方法同时使用,第2中会覆盖第一种。
3、如果在上面选用第二种方式,timestamp 和 watermark 的生成器主要有两类,`AssignerWithPeriodicWatermarks` and `AssignerWithPunctuatedWatermarks`。或者可以直接实现抽象类`BoundedOutOfOrdernessTimestampExtractor`,传入允许延迟的最大时间,和实现获取event time的方法。

### Watermarks提取器类型
1、periodic watermark根据设定周期性的生成watermarks,默认是每200ms发一次,通过这个配置修改`env.getConfig().setAutoWatermarkInterval(200);`。每次会调用`getCurrentWatermark()`方法然后发射出一个新的watemark(如果这个watermark不为null && 这个新的watermark大于当前的watermark)
2、puncuated watermarks 几乎每条数据都会生成水印。
![](1.png)

#### periodic watermarks
通常使用的watermarks生成选用方式。Periodic Watermark分为两种,升序模式和固定时间间隔模式。
(1) 升序模式
Ascending Timestamp分配器会指定数据中的Timestamp字段,并用当前的Timestamp作为最小的watermark。这个模式比较适合于事件按顺序生成,没有乱序事件的情况。
升序模式通过实现抽象类`AscendingTimestampExtractor`定义。`AscendingTimestampExtractor`的抽象方法`extractAscendingTimestamp`定义了如何从元素中抽取Timestamp。
(2) 固定时间间隔
该模式是通过设置固定的时间来指定watermark落后于Timestamp的区间长度。该固定时间就是最长容忍迟到的时间,也即系统最大容忍迟到多长时间内的数据到达系统。
通过创建抽象类`BoundedOutOfOrdernessTimestampExtractor`的子类实现Timestamp Assigner。`BoundedOutOfOrdernessTimestampExtractor`定义了`extractTimestamp`抽象方法用于从元素中抽取Timestamp,其子类要在该方法实现Timestamp抽取策略;实现抽象类时传入参数`maxOutOfOrderness`,是再大容忍的延迟时间。 Watermarks创建是根据Timestamp减去固定时间长度生成。如果当前数据中时间小于等于watermarks时间,则认为是迟到时间。

#### puncuated watermarks
间断性调用getCurrentWatermark,它会根据一个条件发送watermark,这个条件可以自己去定义。
数据流中每一个递增的EventTime都会产生一个Watermark,在实际生产中TPS很高的场景下会产生大量的Watermark在一定程度上对下游算子造成压力,所以只有在实时性要求非常高的场景才会选择Punctuated的方式进行Watermark的生成。
实现接口`AssignerWithPunctuatedWatermarks`.

### allowedLateness && 什么情况下数据会被丢弃或者说不会被计算?
https://www.cnblogs.com/gouhaiping/p/12556802.html
默认情况下,如果不指定allowedLateness,其值是0
allowedLateness就是针对event time而言,对于watermark超过end-of-window之后,还允许有一段时间(也是以event time来衡量)来等待之前的数据到达,以便再次处理这些数据。
注意:对于trigger是默认的EventTimeTrigger的情况下,allowedLateness会再次触发窗口的计算,而之前触发的数据,会buffer起来,直到watermark超过end-of-window + allowedLateness()的时间,窗口的数据及元数据信息才会被删除。再次计算就是DataFlow模型中的Accumulating(积累)的情况。


两种情况:
a.未设置allowedLateness情况下,某条数据属于某个窗口,但是watermark超过了窗口的结束时间,则该条数据会被丢弃;
b.设置allowedLateness情况下,某条数据属于某个窗口,但是watermark超过了窗口的结束时间+延迟时间,则该条数据会被丢弃;

 也就是说如果一个key下面的某条数据如果延迟到来太多,就会被丢弃,这个问题无法解决的

题外话:非要用Processing Time,只要加上env.getConfig().setAutoWatermarkInterval(200);这句话就可以了,是没有任何效果。(https://blog.csdn.net/lvwenyuan_1/article/details/91389124?utm_medium=distribute.pc_relevant_t0.none-task-blog-BlogCommendFromMachineLearnPai2-1.add_param_isCf&depth_1-utm_source=distribute.pc_relevant_t0.none-task-blog-BlogCommendFromMachineLearnPai2-1.add_param_isCf)

参考:https://blog.csdn.net/xorxos/article/details/80715113  (!!!有事例做演示)
https://blog.csdn.net/weixin_42261489/article/details/105756384
https://blog.csdn.net/oTengYue/article/details/102689538?depth_1-utm_source=distribute.pc_relevant.none-task-blog-BlogCommendFromBaidu-1&utm_source=distribute.pc_relevant.none-task-blog-BlogCommendFromBaidu-1

文章作者:Lily

原始链接:/2020/05/11/Flink-Watermark/

版权说明:转载请保留原文链接及作者。