!!如果是flink 1.11建议尝试flink-sql的直接写入hive的方式。对生成success文件等都有封装。官网 文章
Flink目前对于外部Exactly-Once写支持提供了两种的sink,一个是Kafka-Sink,另一个是Hdfs-Sink,这两种sink实现的Exactly-Once都是基于Flink checkpoint提供的hook来实现的两阶段提交模式来保证的。
1、Flink 提供了两个分桶策略,分桶策略实现了
org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner 接口:
BasePathBucketAssigner,不分桶,所有文件写到根目录;
DateTimeBucketAssigner,基于系统时间(yyyy-MM-dd–HH)分桶。
除此之外,还可以实现BucketAssigner接口,自定义分桶策略。
2、Flink 提供了两个滚动策略,滚动策略实现了
org.apache.flink.streaming.api.functions.sink.filesystem.RollingPolicy 接口:
DefaultRollingPolicy 当超过最大桶大小(默认为 128 MB),或超过了滚动周期(默认为 60 秒),或未写入数据处于不活跃状态超时(默认为 60 秒)的时候,滚动文件;
OnCheckpointRollingPolicy 当 checkpoint 的时候,滚动文件。
3、StreamingFileSink提供了基于行、列两种文件写入格式。
forRowFormat行写可基于文件大小、滚动时间、不活跃时间进行滚动。
forBulkFormat列写方式只能基于checkpoint机制进行文件滚动,即在执行snapshotState方法时滚动文件。
如果基于大小或者时间滚动文件,那么在任务失败恢复时就必须对处于in-processing状态的文件按照指定的offset进行truncate,我想这是由于列式存储是无法针对文件offset进行truncate的,因此就必须在每次checkpoint使文件滚动,其使用的滚动策略实现是OnCheckpointRollingPolicy。
压缩:自定义ParquetAvroWriters
方法,创建AvroParquetWriter
时传入压缩方式。
以上来自:https://blog.csdn.net/u013516966/article/details/104726234/(压缩与合并小文件)
Hdfs 上有三个状态
第一个状态是 in-progress ,正在进行状态。
第二个状态是 pending 状态,
第三个状态是 finished 状态。
如何使用 Flink 实现二阶段提交协议 2pc
首先,StreamingFileSink 实现两个接口,CheckpointedFunction 和 CheckpointListener。 CheckpointedFunction 实现 initializeState 和 snapshotState 函数。 CheckpointListener 是 notifyCheckpointComplete 的方法实现,因此这两个接口可以实现二阶段提交语义。
snapshotState
触发 CheckPoint 时会将 in-progress 文件转化为 pending state,同时记录数据长度(truncate 方式需要截断长度)。 snapshotState 并非真正将数据写入 HDFS,而是写入 ListState。 Flink 在 Barrier 对齐状态时内部实现 Exactly-Once 语义,但是实现外部端到端的 Exactly-Once 语义比较困难。 Flink 内部实现 Exactly-Once 通过 ListState,将数据全部存入 ListState,等待所有算子 CheckPoint 完成,再将 ListState 中的数据刷到 HDFS 中。
notifyCheckpointComplete
notifyCheckpointComplete 会触发 pending 到 finished state 的数据写入
。 实现方法是 rename,Streaming 不断向 HDFS 写入临时文件,所有动作结束后通过 rename 动作写成正式文件。
使用 Hadoop < 2.7 时,请使用 OnCheckpointRollingPolicy 滚动策略,该策略会在每次checkpoint检查点时进行文件切割。 这样做的原因是如果部分文件的生命周期跨多个检查点,当 StreamingFileSink 从之前的检查点进行恢复时会调用文件系统的 truncate() 方法清理 in-progress 文件中未提交的数据。 Hadoop 2.7 之前的版本不支持这个方法,因此 Flink 会报异常,因此使用OnCheckpointRollingPolicy避免文件跨多个检查点。
part file有三种状态:
- in-progress:表示当前part file正在被写入
- pending:写入完成后等待提交的状态(flink写hdfs的二段提交的体现)
- finished:写入完成状态,只有finished状态下的文件才会保证不会再有修改,下游可安全读取
为防止原文被删,以下全文转载自:https://blog.csdn.net/u013411339/article/details/88937671
大数据业务场景中,经常有一种场景:外部数据发送到kafka中,flink作为中间件消费kafka数据并进行业务处理;处理完成之后的数据可能还需要写入到数据库或者文件系统中,比如写入hdfs中;
目前基于spark进行计算比较主流,需要读取hdfs上的数据,可以通过读取parquet:spark.read.parquet(path)
第一种方式
数据实体:
1 | public class Prti { |
1 | public class FlinkReadKafkaToHdfs { |
1 | public static void readKafkaToHdfsByReflect(StreamExecutionEnvironment environment, Properties properties) throws Exception { |
进入spark shell中,执行:spark.read.parquet(“/test/日期路径”),即可读取;
注意点:
1 | StreamingFileSink<Prti> streamingFileSink = StreamingFileSink. |
上面就是最简单直接的第一种方法:ParquetAvroWriters.forReflectRecord(Prti.class)
第二种方式
这种方式对实体类有很高的要求,需要借助avro的插件编译生成数据实体类即可;ParquetAvroWriters.forSpecificRecord(Prti.class)
编写好一个prti.avsc的文件,内容如下:
1 | {"namespace": "com.xxx.streaming.entity", |
其中:com.xxx.streaming.entity是生成的实体放置的包路径;
在pom中引入插件:
1 | <plugin> |
第三种方式
ParquetAvroWriters.forGenericRecord(“schema”)
传入一个avsc的文件进去即可。