Flink读取kafka数据并以parquet格式写入HDFS

!!如果是flink 1.11建议尝试flink-sql的直接写入hive的方式。对生成success文件等都有封装。官网 文章

Flink目前对于外部Exactly-Once写支持提供了两种的sink,一个是Kafka-Sink,另一个是Hdfs-Sink,这两种sink实现的Exactly-Once都是基于Flink checkpoint提供的hook来实现的两阶段提交模式来保证的。

1、Flink 提供了两个分桶策略,分桶策略实现了
org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner 接口:
BasePathBucketAssigner,不分桶,所有文件写到根目录;
DateTimeBucketAssigner,基于系统时间(yyyy-MM-dd–HH)分桶。

除此之外,还可以实现BucketAssigner接口,自定义分桶策略。

2、Flink 提供了两个滚动策略,滚动策略实现了
org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy 接口:
DefaultRollingPolicy 当超过最大桶大小(默认为 128 MB),或超过了滚动周期(默认为 60 秒),或未写入数据处于不活跃状态超时(默认为 60 秒)的时候,滚动文件;
OnCheckpointRollingPolicy 当 checkpoint 的时候,滚动文件。

3、StreamingFileSink提供了基于行、列两种文件写入格式。
forRowFormat行写可基于文件大小、滚动时间、不活跃时间进行滚动。
forBulkFormat列写方式只能基于checkpoint机制进行文件滚动,即在执行snapshotState方法时滚动文件。
如果基于大小或者时间滚动文件,那么在任务失败恢复时就必须对处于in-processing状态的文件按照指定的offset进行truncate,我想这是由于列式存储是无法针对文件offset进行truncate的,因此就必须在每次checkpoint使文件滚动,其使用的滚动策略实现是OnCheckpointRollingPolicy。
压缩:自定义ParquetAvroWriters方法,创建AvroParquetWriter时传入压缩方式。
以上来自:https://blog.csdn.net/u013516966/article/details/104726234/(压缩与合并小文件)

Hdfs 上有三个状态
第一个状态是 in-progress ,正在进行状态。
第二个状态是 pending 状态,
第三个状态是 finished 状态。

如何使用 Flink 实现二阶段提交协议 2pc
首先,StreamingFileSink 实现两个接口,CheckpointedFunction 和 CheckpointListener。 CheckpointedFunction 实现 initializeState 和 snapshotState 函数。 CheckpointListener 是 notifyCheckpointComplete 的方法实现,因此这两个接口可以实现二阶段提交语义。
snapshotState
触发 CheckPoint 时会将 in-progress 文件转化为 pending state,同时记录数据长度(truncate 方式需要截断长度)。 snapshotState 并非真正将数据写入 HDFS,而是写入 ListState。 Flink 在 Barrier 对齐状态时内部实现 Exactly-Once 语义,但是实现外部端到端的 Exactly-Once 语义比较困难。 Flink 内部实现 Exactly-Once 通过 ListState,将数据全部存入 ListState,等待所有算子 CheckPoint 完成,再将 ListState 中的数据刷到 HDFS 中。
notifyCheckpointComplete
notifyCheckpointComplete 会触发 pending 到 finished state 的数据写入。 实现方法是 rename,Streaming 不断向 HDFS 写入临时文件,所有动作结束后通过 rename 动作写成正式文件。

使用 Hadoop < 2.7 时,请使用 OnCheckpointRollingPolicy 滚动策略,该策略会在每次checkpoint检查点时进行文件切割。 这样做的原因是如果部分文件的生命周期跨多个检查点,当 StreamingFileSink 从之前的检查点进行恢复时会调用文件系统的 truncate() 方法清理 in-progress 文件中未提交的数据。 Hadoop 2.7 之前的版本不支持这个方法,因此 Flink 会报异常,因此使用OnCheckpointRollingPolicy避免文件跨多个检查点。

part file有三种状态:

  • in-progress:表示当前part file正在被写入
  • pending:写入完成后等待提交的状态(flink写hdfs的二段提交的体现)
  • finished:写入完成状态,只有finished状态下的文件才会保证不会再有修改,下游可安全读取

为防止原文被删,以下全文转载自:https://blog.csdn.net/u013411339/article/details/88937671

大数据业务场景中,经常有一种场景:外部数据发送到kafka中,flink作为中间件消费kafka数据并进行业务处理;处理完成之后的数据可能还需要写入到数据库或者文件系统中,比如写入hdfs中;
目前基于spark进行计算比较主流,需要读取hdfs上的数据,可以通过读取parquet:spark.read.parquet(path)

第一种方式

数据实体:

1
2
3
4
5
6
7
8
9
10
11
public class Prti {

private String passingTime;

private String plateNo;

public Prti() {
}

//gettter and setter 方法....
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
public class FlinkReadKafkaToHdfs {

private final static StreamExecutionEnvironment environment = StreamExecutionEnvironment.getExecutionEnvironment();

private final static Properties properties = new Properties();

/**
* kafka 中发送数据JSON格式:
* {"passingTime":"1546676393000","plateNo":"1"}
*/
public static void main(String[] args) throws Exception {
init();
readKafkaToHdfsByReflect(environment, properties);
}
}

private static void init() {
environment.enableCheckpointing(5000);
environment.setParallelism(1);
environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
//kafka的节点的IP或者hostName,多个使用逗号分隔
properties.setProperty("bootstrap.servers", "192.168.0.10:9092");
//only required for Kafka 0.8;
// properties.setProperty("zookeeper.connect", "192.168.0.10:2181");
//flink consumer flink的消费者的group.id
properties.setProperty("group.id", "test-consumer-group");
//第一种方式:路径写自己代码上的路径
// properties.setProperty("fs.hdfs.hadoopconf", "...\\src\\main\\resources");
//第二种方式:填写一个schema参数即可
properties.setProperty("fs.default-scheme", "hdfs://hostname:8020");

properties.setProperty("kafka.topic", "test");
properties.setProperty("hfds.path", "hdfs://hostname/test");
properties.setProperty("hdfs.path.date.format", "yyyy-MM-dd");
properties.setProperty("hdfs.path.date.zone", "Asia/Shanghai");
properties.setProperty("window.time.second", "60");
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
public static void readKafkaToHdfsByReflect(StreamExecutionEnvironment environment, Properties properties) throws Exception {
String topic = properties.getProperty("kafka.topic");
String path = properties.getProperty("hfds.path");
String pathFormat = properties.getProperty("hdfs.path.date.format");
String zone = properties.getProperty("hdfs.path.date.zone");
Long windowTime = Long.valueOf(properties.getProperty("window.time.second"));
FlinkKafkaConsumer010<String> flinkKafkaConsumer010 = new FlinkKafkaConsumer010<>(topic, new SimpleStringSchema(), properties);
KeyedStream<Prti, String> KeyedStream = environment.addSource(flinkKafkaConsumer010)
.map(FlinkReadKafkaToHdfs::transformData)
.assignTimestampsAndWatermarks(new CustomWatermarks<Prti>())
.keyBy(Prti::getPlateNo);

DataStream<Prti> output = KeyedStream.window(TumblingEventTimeWindows.of(Time.seconds(windowTime)))
.apply(new WindowFunction<Prti, Prti, String, TimeWindow>() {
@Override
public void apply(String key, TimeWindow timeWindow, Iterable<Prti> iterable, Collector<Prti> collector) throws Exception {
System.out.println("keyBy: " + key + ", window: " + timeWindow.toString());
iterable.forEach(collector::collect);
}
});
//写入HDFS,parquet格式
DateTimeBucketAssigner<Prti> bucketAssigner = new DateTimeBucketAssigner<>(pathFormat, ZoneId.of(zone));
StreamingFileSink<Prti> streamingFileSink = StreamingFileSink.
forBulkFormat(new Path(path), ParquetAvroWriters.forReflectRecord(Prti.class))
.withBucketAssigner(bucketAssigner)
.build();
output.addSink(streamingFileSink).name("Hdfs Sink");
environment.execute("PrtiData");
}

private static Prti transformData(String data) {
if (data != null && !data.isEmpty()) {
JSONObject value = JSON.parseObject(data);
Prti prti = new Prti();
prti.setPlateNo(value.getString("plate_no"));
prti.setPassingTime(value.getString("passing_time"));
return prti;
} else {
return new Prti();
}
}

private static class CustomWatermarks<T> implements AssignerWithPunctuatedWatermarks<Prti> {

private Long cuurentTime = 0L;

@Nullable
@Override
public Watermark checkAndGetNextWatermark(Prti prti, long l) {
return new Watermark(cuurentTime);
}

@Override
public long extractTimestamp(Prti prti, long l) {
Long passingTime = Long.valueOf(prti.getPassingTime());
cuurentTime = Math.max(passingTime, cuurentTime);
return passingTime;
}
}

进入spark shell中,执行:spark.read.parquet(“/test/日期路径”),即可读取;
注意点:

1
2
3
4
StreamingFileSink<Prti> streamingFileSink = StreamingFileSink.
forBulkFormat(new Path(path), ParquetAvroWriters.forReflectRecord(Prti.class))
.withBucketAssigner(bucketAssigner)
.build();

上面就是最简单直接的第一种方法:
ParquetAvroWriters.forReflectRecord(Prti.class)

第二种方式

这种方式对实体类有很高的要求,需要借助avro的插件编译生成数据实体类即可;
ParquetAvroWriters.forSpecificRecord(Prti.class)
编写好一个prti.avsc的文件,内容如下:

1
2
3
4
5
6
7
8
{"namespace": "com.xxx.streaming.entity",
"type": "record",
"name": "Prti",
"fields": [
{"name": "passingTime", "type": "string"},
{"name": "plateNo", "type": "string"}
]
}

其中:com.xxx.streaming.entity是生成的实体放置的包路径;
在pom中引入插件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>1.8.2</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
</goals>
<configuration>
<sourceDirectory>${project.basedir}/src/main/resources/</sourceDirectory>
<outputDirectory>${project.basedir}/src/main/java/</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>

第三种方式

ParquetAvroWriters.forGenericRecord(“schema”)
传入一个avsc的文件进去即可。

文章作者:Lily

原始链接:/2020/05/11/Flink%E8%AF%BB%E5%8F%96kafka%E6%95%B0%E6%8D%AE%E5%B9%B6%E4%BB%A5parquet%E6%A0%BC%E5%BC%8F%E5%86%99%E5%85%A5HDFS/

版权说明:转载请保留原文链接及作者。