Flink StreamingFileSink: Custom Watermark + DateTimeBucket for Accurate Partition Landing

Introduction

Flink's built-in DateTimeBucketAssigner assigns records to partition directories by processing time, i.e. the wall-clock time at execution. This has notable drawbacks:

1. Late data is very common.
2. If the job hits a problem, is paused, and is then restarted, and the downtime happens to span a partition boundary, even more data ends up in the wrong partition.

In most cases what we actually want is event time. Flink's DateTimeBucketAssigner defines the HDFS time path under which data is saved, and it uses currentProcessingTime; the important logic lives in its getBucketId method. The focus of this article is therefore to redefine getBucketId.
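For reference, this is roughly what the stock DateTimeBucketAssigner#getBucketId looks like in Flink 1.10 (a simplified sketch of the built-in class, shown here only to make clear what we are replacing):

@Override
public String getBucketId(IN element, BucketAssigner.Context context) {
    if (dateTimeFormatter == null) {
        dateTimeFormatter = DateTimeFormatter.ofPattern(formatString).withZone(zoneId);
    }
    // The bucket id comes from the wall clock at write time, not from the record,
    // which is exactly why late or replayed data lands in the wrong partition.
    return dateTimeFormatter.format(Instant.ofEpochMilli(context.currentProcessingTime()));
}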

In addition, we need to generate a done (also called success) file to tell downstream jobs that the upstream data is ready.

How to generate the done file

Check whether the current directory still contains in-progress files.
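A minimal sketch of that check, assuming a Hadoop FileSystem handle and StreamingFileSink's default part-file naming, where uncommitted files carry "inprogress" or "pending" in their names. DoneFileUtil and the _SUCCESS file name are conventions chosen here for illustration, not anything StreamingFileSink provides:

import java.io.IOException;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class DoneFileUtil {

    /**
     * Writes an empty _SUCCESS file into bucketDir once no file in the
     * directory looks uncommitted any more. Returns false while the bucket
     * is still being written, so callers can retry later.
     */
    public static boolean markDoneIfFinished(FileSystem fs, Path bucketDir) throws IOException {
        for (FileStatus status : fs.listStatus(bucketDir)) {
            String name = status.getPath().getName();
            if (name.contains("inprogress") || name.contains("pending")) {
                return false;
            }
        }
        fs.create(new Path(bucketDir, "_SUCCESS"), true).close();
        return true;
    }
}

A scheduler, or the job itself on a timer after the hour boundary has passed, can call this repeatedly until it returns true.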

Using event time directly

However, logs always contain plenty of dirty data. With event time used naively, the earlier partitions would never be complete, because stray records claiming old timestamps keep arriving. The code therefore uses BehaviorHistoryMergeUtils.minutesLater(0L, -60): any record more than 60 minutes behind the current time no longer enters its proper partition and falls into an error partition instead.

But the whole advantage of real-time loading is speed, and waiting 60 minutes defeats the purpose. Shrinking the window, say to 5 minutes, does not really help either: if the job is paused for more than 5 minutes, many valid records still end up in the error partition.

Tip: constructing the StreamingFileSink instance has a bug in 1.10.0; one workaround is to write that part in Java. Later versions have fixed it.

/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
import org.apache.flink.util.Preconditions;

import java.time.Instant;
import java.time.ZoneId;
import java.time.format.DateTimeFormatter;

/**
 * A {@link BucketAssigner} that assigns to buckets based on each record's event time,
 * diverting records more than 60 minutes old into an error bucket.
 *
 * <p>The assigner will create directories of the following form:
 * {@code /{basePath}/{dateTimePath}/}. The {@code basePath} is the path
 * that was specified as a base path when creating the
 * {@link org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink}.
 * The {@code dateTimePath} is determined from the record's event time and the
 * user-provided format string.
 *
 * <p>{@link DateTimeFormatter} is used to derive a date string from the event time and
 * the date format string. The default format string is {@code "yyyy-MM-dd--HH"} so the rolling
 * files will have a granularity of hours.
 *
 * <p>Example:
 *
 * <pre>{@code
 * BucketAssigner bucketAssigner = new DateTimeBucketWithEventAssigner("yyyy-MM-dd--HH");
 * }</pre>
 *
 * <p>This will create for example the following bucket path:
 * {@code /base/1976-12-31--14/}
 */
@PublicEvolving
public class DateTimeBucketWithEventAssigner<IN> implements BucketAssigner<IN, String> {

    private static final long serialVersionUID = 1L;

    private static final String DEFAULT_FORMAT_STRING = "yyyy-MM-dd--HH";

    private final String formatString;

    private final ZoneId zoneId;

    private transient DateTimeFormatter dateTimeFormatter;

    /**
     * Creates a new {@code DateTimeBucketWithEventAssigner} with format string {@code "yyyy-MM-dd--HH"}.
     */
    public DateTimeBucketWithEventAssigner() {
        this(DEFAULT_FORMAT_STRING);
    }

    /**
     * Creates a new {@code DateTimeBucketWithEventAssigner} with the given date/time format string.
     *
     * @param formatString The format string that will be given to {@code DateTimeFormatter} to determine
     *                     the bucket id.
     */
    public DateTimeBucketWithEventAssigner(String formatString) {
        this(formatString, ZoneId.systemDefault());
    }

    /**
     * Creates a new {@code DateTimeBucketWithEventAssigner} with format string {@code "yyyy-MM-dd--HH"} using the given timezone.
     *
     * @param zoneId The timezone used to format {@code DateTimeFormatter} for bucket id.
     */
    public DateTimeBucketWithEventAssigner(ZoneId zoneId) {
        this(DEFAULT_FORMAT_STRING, zoneId);
    }

    /**
     * Creates a new {@code DateTimeBucketWithEventAssigner} with the given date/time format string using the given timezone.
     *
     * @param formatString The format string that will be given to {@code DateTimeFormatter} to determine
     *                     the bucket path.
     * @param zoneId       The timezone used to format {@code DateTimeFormatter} for bucket id.
     */
    public DateTimeBucketWithEventAssigner(String formatString, ZoneId zoneId) {
        this.formatString = Preconditions.checkNotNull(formatString);
        this.zoneId = Preconditions.checkNotNull(zoneId);
    }

    @Override
    public String getBucketId(IN element, BucketAssigner.Context context) {
        if (dateTimeFormatter == null) {
            dateTimeFormatter = DateTimeFormatter.ofPattern(formatString).withZone(zoneId);
        }
        if (element instanceof SearchEtl) {
            // Cutoff: 60 minutes before "now"; anything older goes to the error bucket.
            long minutesAgo = BehaviorHistoryMergeUtils.minutesLater(0L, -60);
            Long pingbackTime = ((SearchEtl) element).getStime();
            if (pingbackTime < minutesAgo) {
                return "error_time";
            }
            return dateTimeFormatter.format(Instant.ofEpochMilli(pingbackTime));
        } else {
            return "error_time";
        }
    }

    @Override
    public SimpleVersionedSerializer<String> getSerializer() {
        return SimpleVersionedStringSerializer.INSTANCE;
    }

    @Override
    public String toString() {
        return "DateTimeBucketWithEventAssigner{" +
                "formatString='" + formatString + '\'' +
                ", zoneId=" + zoneId +
                '}';
    }
}

The helper used above:

import org.joda.time.DateTime

def minutesLater(nowDate: Long, plus: Int): Long = {
  if (nowDate <= 0) {
    // nowDate <= 0 means "relative to the current time"
    new DateTime().plusMinutes(plus).getMillis
  } else {
    new DateTime(nowDate).plusMinutes(plus).getMillis
  }
}

Usage:

StreamingFileSink<SearchEtl> sink = StreamingFileSink
    .forBulkFormat(new Path(hdfs), ParquetAvroWriters.forSpecificRecord(SearchEtl.class))
    .withBucketAssigner(new DateTimeBucketWithEventAssigner<>())
    .withRollingPolicy(OnCheckpointRollingPolicy.build())
    .build();

Improvement: using a watermark

Define the watermark before the sink. Different logs call for different watermark strategies; if your logs also contain many records stamped beyond the current time, the approach here is worth borrowing, since it caps each timestamp at the current processing time.

source.map(...).uid(...)
  .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SearchEtl](Time.minutes(3)) {
    override def extractTimestamp(element: SearchEtl): Long = {
      // Cap the timestamp at "now" so future-dated dirty records
      // cannot drag the watermark forward.
      math.min(element.getStime, System.currentTimeMillis())
    }
  })
  .addSink(FlinkConfig.getParquetHdfsSink(hdfs)).uid("sink")

The new getBucketId is defined below. Be careful when reading the watermark: right after the job starts it may still be the initial default (Long.MIN_VALUE). Watermarks are generated every 200 ms by default, and extractTimestamp and getCurrentWatermark are not invoked the moment the job starts; see my separate article on watermarks for details. A record is sent to the error partition when:

(pingbackTime < watermark && pingbackTime < start of the current hour) || (pingbackTime > start of the next hour)

@Override
public String getBucketId(IN element, BucketAssigner.Context context) {
    if (dateTimeFormatter == null) {
        dateTimeFormatter = DateTimeFormatter.ofPattern(formatString).withZone(zoneId);
    }
    if (element instanceof SearchEtl) {
        long watermark = context.currentWatermark();
        if (watermark < 0) {
            // Right after startup the watermark is still Long.MIN_VALUE;
            // fall back to processing time until real watermarks arrive.
            watermark = context.currentProcessingTime();
        }
        Long pingbackTime = ((SearchEtl) element).getStime();
        if ((pingbackTime < watermark && pingbackTime < BehaviorHistoryMergeUtils.getCurrentHourZeroTime())
                || pingbackTime > BehaviorHistoryMergeUtils.getNextHourZeroTime()) {
            // Too old (the bucket is already sealed) or future-dated: divert to an error bucket.
            return dateTimeFormatter.format(Instant.ofEpochMilli(context.currentProcessingTime())) + "error_time";
        }
        return dateTimeFormatter.format(Instant.ofEpochMilli(pingbackTime));
    } else {
        return "error_time";
    }
}

The hour-boundary helpers (Scala):

import java.util.Calendar

def getNextHourZeroTime: Long = {
  getHourZeroTime(1)
}

def getCurrentHourZeroTime: Long = {
  getHourZeroTime(0)
}

// Start of the hour `plus` hours from now, in epoch millis.
def getHourZeroTime(plus: Int): Long = {
  val calendar = Calendar.getInstance()
  calendar.add(Calendar.HOUR, plus)
  calendar.set(Calendar.MINUTE, 0)
  calendar.set(Calendar.SECOND, 0)
  calendar.set(Calendar.MILLISECOND, 0)
  calendar.getTimeInMillis
}

Remaining issues

1. When the job stops abruptly, files are left in the in-progress state.
2. By default, file names under a bucket follow part-{parallel-task}-{count}. When the job restarts, it resumes counting from the maximum count across all previous subtasks plus one.
3. Writing to HDFS produces many small files. This issue is relatively minor and easy to mitigate; see the sketch below.
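
Regarding issue 3: with OnCheckpointRollingPolicy, part files roll on every checkpoint, so lengthening the checkpoint interval directly yields fewer, larger files. A minimal sketch of this mitigation; the 10-minute value is illustrative, not from this job's actual config:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// Fewer checkpoints mean fewer, larger part files per bucket, at the cost
// of data becoming visible to downstream jobs later.
env.enableCheckpointing(10 * 60 * 1000L);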

Author: Lily

Original link: /2020/09/11/Flink-StreamingFileSink%E8%87%AA%E5%AE%9A%E4%B9%89DateTimeBucket/

Copyright: please retain the original link and author attribution when reposting.