简介
flink自带的DateTimeBucket按照process time执行时间来落分区目录。
但有很多缺点:
1、数据延迟很常见 2、或者任务出问题暂停后再启动如果恰好跨分区,数据出问题的会更多。
因此很多时候我们希望的是 event time,本文的办法是
flink自带的DateTimeBucketAssigner
定义了数据保存的hdfs时间路径,选用的是currentProcessingTime
,重要的代码在方法中getBucketId
。
因此,本文的重点是重新定义getBucketId
方法。
另外,我们还要生成done(又称succed)文件来通知下游任务,上游数据已经好了。
如何生成done
判断当前目录下是否还有progress文件
直接使用event time
但是日志中的脏数据本来就很多,如果单纯的使用event time,前面的分区将永远无法完成。
因此在代码里用了BehaviorHistoryMergeUtils.minutesLater(0L, -60)
判断小于当前时间60的时间不再进入正确的分区,落入错误分区。
但是实时落仓本来的优势就是快,现在要等60min,不符合原意。如果改的更小如5min还是会有如果代码暂停了超过5分钟,还是会有很多问题。
Tips: StreamingFileSink的实例1.10.0有bug可以选择用java写,up版本已修复。
1 | /* |
其他:
1 | def minutesLater(nowDate: Long, plus: Int): Long = { |
使用:
1 | StreamingFileSink build = StreamingFileSink |
改进 使用watermark
在sink之前定义watermark。
不同的日志不同的定义方式,如果日志里还会有很多超过当前时间的日志,就可以参考这里的办法。
1 | source.map(...).uid(...) |
getBucketId
方法的定义。watermark使用的时候需要注意,任务刚刚启动的时候watermark可能是个默认值。
watermark的生成是每200ms,而且不是任务启动就会调用extractTimestamp
再调用getCurrentWatermark
.具体可以看另一个讲解watermark的文章。
(pingbackTime < watermark && pingbackTime < 前一小时的零点)||(pingbackTime > 下一小时的零点)时进入错误分区
1 | @Override |
问题
1、当程序突然停止时,文件仍处于inprogress状态。
2、默认桶下的文件名是 part-{parallel-task}-{count}。当程序重启时,选用上次所有subtask最大的编号值count加1继续开始。
3、写入HDFS时,会产生大量的小文件。此问题比较轻微可简单解决。