Flink Windows

Keyed Windows

stream
.keyBy(...) <- keyed versus non-keyed windows
.window(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
.reduce/aggregate/fold/apply() <- required: "function"
[.getSideOutput(...)] <- optional: "output tag"

Non-Keyed Windows

stream
.windowAll(...) <- required: "assigner"
[.trigger(...)] <- optional: "trigger" (else default trigger)
[.evictor(...)] <- optional: "evictor" (else no evictor)
[.allowedLateness(...)] <- optional: "lateness" (else zero)
[.sideOutputLateData(...)] <- optional: "output tag" (else no side output for late data)
.reduce/aggregate/fold/apply() <- required: "function"
[.getSideOutput(...)] <- optional: "output tag"

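Non-incremental window functions (such as apply() with a WindowFunction) are less efficient than the incremental reduce/aggregate/fold, because Flink must buffer all elements of a window internally before invoking the function.
AllWindowedStream warning: all data is sent to a single downstream instance; in other words, the windowed operator runs with a parallelism of 1.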
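
The buffering cost mentioned above can be illustrated with a minimal plain-Java sketch. It has no Flink dependency, and the class and method names are hypothetical; it only contrasts how much state each style has to keep, and is not how Flink stores window state internally.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration only: this is not Flink's internal state handling.
final class WindowStateSketch {

    // Incremental style: the state is a single running value.
    static final class IncrementalSum {
        private long sum = 0;

        void add(long value) { sum += value; }

        long result() { return sum; }

        int storedElements() { return 1; } // only the accumulator is kept
    }

    // Buffered style: every element is kept until the window is evaluated.
    static final class BufferedSum {
        private final List<Long> buffer = new ArrayList<>();

        void add(long value) { buffer.add(value); }

        long result() {
            long sum = 0;
            for (long v : buffer) {
                sum += v;
            }
            return sum;
        }

        int storedElements() { return buffer.size(); }
    }
}
```

Both variants produce the same sum, but the buffered one holds as many values as the window has elements, which is why it costs more memory and work at evaluation time.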

Tumbling Windows

Special usage: shifting the window alignment with an offset:

// daily tumbling event-time windows offset by -8 hours.
input
.keyBy(<key selector>)
.window(TumblingEventTimeWindows.of(Time.days(1), Time.hours(-8)))
.<windowed transformation>(<window function>)

For example, without offsets hourly tumbling windows are aligned with epoch, that is you will get windows such as 1:00:00.000 - 1:59:59.999, 2:00:00.000 - 2:59:59.999 and so on. If you want to change that you can give an offset. With an offset of 15 minutes you would, for example, get 1:15:00.000 - 2:14:59.999, 2:15:00.000 - 3:14:59.999 etc. An important use case for offsets is to adjust windows to timezones other than UTC-0. For example, in China you would have to specify an offset of Time.hours(-8).
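
The alignment described above comes down to a small piece of modular arithmetic. The following plain-Java sketch shows one way to compute the start of the tumbling window that contains a timestamp; the class and method names are hypothetical, and this is an illustration of the arithmetic rather than Flink's own code.

```java
// Sketch of the alignment arithmetic only; names are hypothetical, not Flink's API.
final class TumblingAlignmentSketch {

    // Start of the tumbling window containing `timestamp`.
    // All three arguments are in milliseconds since the epoch.
    static long windowStart(long timestamp, long offset, long size) {
        // floorMod keeps the remainder non-negative, which also covers negative offsets.
        return timestamp - Math.floorMod(timestamp - offset, size);
    }
}
```

With hourly windows, a timestamp at 1:20:00.000 falls into the window starting at 1:00:00.000 when the offset is 0, and into the window starting at 1:15:00.000 when the offset is 15 minutes. With daily windows and an offset of -8 hours, windows start at 16:00 UTC, which is midnight in China.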

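Sliding Windows

If size > slide interval, the windows overlap and an element is assigned to more than one window; this is the usual sliding window.
If size < slide interval, some data is lost. For example, counting the cars that passed an intersection during the last 3 seconds once every 5 seconds misses 2 seconds of data in each interval.
Special usage: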

// sliding processing-time windows offset by -8 hours
input
.keyBy(<key selector>)
.window(SlidingProcessingTimeWindows.of(Time.hours(12), Time.hours(1), Time.hours(-8)))
.<windowed transformation>(<window function>)

As shown in the last example, sliding window assigners also take an optional offset parameter that can be used to change the alignment of windows. For example, without offsets hourly windows sliding by 30 minutes are aligned with epoch, that is you will get windows such as 1:00:00.000 - 1:59:59.999, 1:30:00.000 - 2:29:59.999 and so on. If you want to change that you can give an offset. With an offset of 15 minutes you would, for example, get 1:15:00.000 - 2:14:59.999, 1:45:00.000 - 2:44:59.999 etc.
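
To see how size, slide interval, and offset interact, here is a minimal plain-Java sketch that lists the windows a single timestamp falls into. The class and method names are hypothetical and the code has no Flink dependency; it is meant as an illustration of the assignment rule, not as the library's implementation.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of sliding-window assignment; names are hypothetical, not Flink's API.
final class SlidingAssignmentSketch {

    // Returns the start timestamps of all windows [start, start + size)
    // that contain `timestamp`, newest window first.
    static List<Long> windowStarts(long timestamp, long size, long slide, long offset) {
        List<Long> starts = new ArrayList<>();
        // Latest window start that is not after the timestamp.
        long lastStart = timestamp - Math.floorMod(timestamp - offset, slide);
        // Walk backwards one slide at a time while the window still covers the timestamp.
        for (long start = lastStart; start > timestamp - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }
}
```

Using minutes as the unit, windowStarts(80, 60, 30, 0) yields the starts 60 and 30 (the windows beginning at 1:00 and 0:30), and an offset of 15 shifts them to 75 and 45. Using seconds, a 3-second window that slides every 5 seconds returns an empty list for timestamps 3 and 4, which is the data loss described above.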

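Session Windows

Session windows group elements by sessions of activity. The window length is variable, and the start and end times of each window are not fixed in advance. The session gap can be a fixed length, or it can be determined dynamically with a SessionWindowTimeGapExtractor.
A window closes when it has received no new data for longer than the session gap.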

val input: DataStream[T] = ...

// event-time session windows with static gap
input
    .keyBy(<key selector>)
    .window(EventTimeSessionWindows.withGap(Time.minutes(10)))
    .<windowed transformation>(<window function>)

// event-time session windows with dynamic gap
input
    .keyBy(<key selector>)
    .window(EventTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[String] {
      override def extract(element: String): Long = {
        // determine and return session gap
      }
    }))
    .<windowed transformation>(<window function>)

// processing-time session windows with static gap
input
    .keyBy(<key selector>)
    .window(ProcessingTimeSessionWindows.withGap(Time.minutes(10)))
    .<windowed transformation>(<window function>)


// processing-time session windows with dynamic gap
input
    .keyBy(<key selector>)
    .window(DynamicProcessingTimeSessionWindows.withDynamicGap(new SessionWindowTimeGapExtractor[String] {
      override def extract(element: String): Long = {
        // determine and return session gap
      }
    }))
    .<windowed transformation>(<window function>)
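
The closing rule for session windows with a static gap can be sketched in plain Java as follows. The class and method names are hypothetical, the input is assumed to be sorted in ascending order, and the code only illustrates the grouping rule; Flink itself works on unordered streams and merges windows as elements arrive.

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of static-gap session grouping; names are hypothetical, not Flink's API.
final class SessionGapSketch {

    // Groups ascending timestamps into sessions. A new session starts when the
    // distance to the previous element is greater than `gap`.
    // Each result entry is {sessionStart, sessionEnd}, where sessionEnd is the
    // last timestamp of the session plus the gap.
    static List<long[]> sessions(long[] sortedTimestamps, long gap) {
        List<long[]> result = new ArrayList<>();
        if (sortedTimestamps.length == 0) {
            return result;
        }
        long start = sortedTimestamps[0];
        long last = start;
        for (int i = 1; i < sortedTimestamps.length; i++) {
            long t = sortedTimestamps[i];
            if (t - last > gap) {
                // The previous session saw no data for longer than the gap: close it.
                result.add(new long[] {start, last + gap});
                start = t;
            }
            last = t;
        }
        result.add(new long[] {start, last + gap});
        return result;
    }
}
```

For the timestamps 0, 3, 5, 20, 22 and a gap of 10, this produces two sessions: one from 0 to 15 and one from 20 to 32.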

Other Operations

https://www.cnblogs.com/bjwu/p/10393146.html

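Triggers

A trigger determines when a window is ready to be processed by the window function. Every window assigner comes with a default trigger; if the default does not meet your needs, you can specify a custom one with trigger(...).
The default triggers fit many use cases. For example, all event-time window assigners use EventTimeTrigger as their default trigger, which fires once the watermark passes the end of the window.
Note: the default trigger of GlobalWindow is NeverTrigger, which never fires, so you must always define a custom trigger when using GlobalWindow.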
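
The two default behaviours described above can be written down as a small plain-Java sketch. The Decision enum and the method names are hypothetical stand-ins, not Flink's trigger interface, and the sketch assumes that a window covers the half-open range [start, end), so its last timestamp is end - 1.

```java
// Sketch of the two default firing rules; names are hypothetical, not Flink's API.
final class TriggerSketch {

    enum Decision { CONTINUE, FIRE }

    // Event-time default as described above: fire once the watermark has
    // reached the last timestamp of the window (the end is exclusive).
    static Decision eventTime(long windowEnd, long watermark) {
        return watermark >= windowEnd - 1 ? Decision.FIRE : Decision.CONTINUE;
    }

    // Global-window default as described above: never fire, whatever the watermark is.
    static Decision never(long watermark) {
        return Decision.CONTINUE;
    }
}
```

Because never(...) returns CONTINUE for every watermark, a global window with this rule produces no output, which is why a custom trigger is required there.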

Allowed Lateness

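When working with event-time windows, elements can arrive late, that is, the watermark that Flink uses to track event-time progress is already past the end timestamp of the window to which the element belongs.
By default, late elements are dropped once the watermark has passed the end of the window. Flink also lets you specify a maximum allowed lateness for window operators: it is the time by which elements may be late before they are dropped, and its default value is 0.
To support this, Flink keeps the state of a window until its allowed lateness expires. Once it expires, Flink removes the window and deletes its state.
Late elements can be emitted through a side output for further use.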
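
The rules above can be summarised as a three-way decision for an element that belongs to a given window. The plain-Java sketch below uses hypothetical names and has no Flink dependency; the exact boundary conditions (a window ending at end - 1, and an element being dropped once the watermark reaches that timestamp plus the allowed lateness) are an assumption made for the illustration.

```java
// Sketch of the lateness decision; names are hypothetical, not Flink's API.
final class LatenessSketch {

    enum Outcome { ON_TIME, LATE_BUT_ACCEPTED, DROPPED_OR_SIDE_OUTPUT }

    // Classifies an element of the window ending at `windowEnd` (exclusive),
    // given the current watermark and the configured allowed lateness.
    static Outcome classify(long windowEnd, long allowedLateness, long watermark) {
        long maxTimestamp = windowEnd - 1; // last timestamp that belongs to the window
        if (watermark < maxTimestamp) {
            return Outcome.ON_TIME; // the window has not fired yet
        }
        if (watermark < maxTimestamp + allowedLateness) {
            return Outcome.LATE_BUT_ACCEPTED; // window state is still kept
        }
        return Outcome.DROPPED_OR_SIDE_OUTPUT; // window state has been cleaned up
    }
}
```

With the default allowed lateness of 0, the middle case can never occur, so every element that arrives after the window has fired is dropped or sent to the side output.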

Author: Lily

Original link: /2020/05/11/Flink-%E7%AA%97%E5%8F%A3/

Copyright notice: please keep the original link and author when reposting.