flink数据序列化

Flink 的类型分类

Flink 的类型系统源码位于 org.apache.flink.api.common.typeinfo 包,类的继承关系:

TypeInformation
TypeInformation类是所有类型描述符的基类,它和它的所有子类必须可序列化(Serializable),因为类型信息将会伴随 Flink 的作业提交,被传递给每个执行节点。
如,POJO在Flink内部使用PojoTypeInfo来表示,PojoTypeInfo继承自CompositeType,CompositeType继承自TypeInformation。TypeInformation的一个重要的功能就是创建TypeSerializer序列化器,为该类型的数据做序列化。每种类型都有一个对应的序列化器来进行序列化。

Flink会自动探测传入的数据类型,生成对应的TypeInformation,调用对应的序列化器。比如,Flink的map函数Scala签名为:def map[R: TypeInformation](fun: T => R): DataStream[R],传入map的数据类型是T,生成的数据类型是R,Flink会推测T和R的数据类型,并使用对应的序列化器进行序列化。
以一个字符串String类型为例,Flink首先推断出该类型,并生成对应的TypeInformation,然后在序列化时调用对应的序列化器,将一个内存对象写入内存块。

手动创建typeInformation
import org.apache.flink.streaming.api.scala._ //must import 如果使用flink-sql,引入import org.apache.flink.table.api.scala._
1)TypeInformation.of(classOf[SensorReading]) or TypeInformation.of(new TypeHint[(String, Array[Byte])] {})
2)import org.apache.flink.streaming.api.scala.createTypeInformation createTypeInformation[SensorReading]
3)BasicTypeInfo.STRING_TYPE_INFO

flink会尝试识别出更多的 在分布式计算框架上传递和存储的 数据类型;比如把它当作是一个推断 表元数据 的数据库。大多数情况下,flink可以自己识别到所有信息。
有这些类型信息flink可以做:
1、用POJOs做grouping / joining / aggregating时可以用字段名(like dataSet.keyBy(“username”)).这个可以让flink在执行之前就识别到一些类型的错误。
2、flink可以更好的做序列化和数据布局。这对flink的内存优化非常重要。尽可能处理堆内部/外部的序列化数据,会使序列化非常容易。
3、
经常出现的问题:
1、注册子类。使用继承的类时,需要用StreamExecutionEnvironment or ExecutionEnvironment .registerType(clazz)注册子类,否则flink会多花很多时间来识别这个子类
2、自定义类型。对于用于自定义的数据类型,flink无法透明识别时,会回退到Kyro来识别这个类。而且并不是所有的数据类型Kyro都可以识别(flink同)。比如Google Guava里的集合类collection 都无法被自动识别。可以通过自定义序列化类来使它被识别。getConfig().addDefaultKryoSerializer(clazz, serializer) on the StreamExecutionEnvironment or ExecutionEnvironment.一般情况下很多包里已经自带了序列化类,比如protoBuffer、Thrift。代码可看下面
3、增加Type Hints。这种一般只在Java API中需要。有时候使用了所有的办法都无法识别数据类型,这种时候需要指定一个数据暗示Hints.returns(SomeType.class);
dataSet.map(new MyGenericNonInferrableFunction<Long, SomeType>()).returns(SomeType.class);returns指定生产类型。
returns() 接受三种类型的参数:
(1)字符串描述的类名(例如 “String”)字符串形式的用法即将废弃,如果确实有必要,请使用 Class.forName() 等方法来解决。
(2)TypeHint(接下来会讲到,用于泛型类型参数)
(3)Java 原生 Class(例如 String.class) 等
TypeHint支持:
(1)类,无参
(2)returns(new TypeHint<Tuple2<Integer, SomeType>>(){}).TypeHint类可以捕获通用类型信息并将其保留到运行时。

4、手动创建 TypeInformation。一般在由于Java的通用类型擦除 flink无法识别出 时使用。

protoBuffer、Thrift序列化:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

// register the Google Protobuf serializer with Kryo
env.getConfig().registerTypeWithKryoSerializer(MyCustomType.class, ProtobufSerializer.class);

// register the serializer included with Apache Thrift as the standard serializer
// TBaseSerializer states it should be initialized as a default Kryo serializer
env.getConfig().addDefaultKryoSerializer(MyCustomType.class, TBaseSerializer.class);

<!-- Thrift -->
<dependency>
<groupId>com.twitter</groupId>
<artifactId>chill-thrift</artifactId>
<version>0.7.6</version>
<!-- exclusions for dependency conversion -->
<exclusions>
<exclusion>
<groupId>com.esotericsoftware.kryo</groupId>
<artifactId>kryo</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- libthrift is required by chill-thrift -->
<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
<version>0.11.0</version>
<exclusions>
<exclusion>
<groupId>javax.servlet</groupId>
<artifactId>servlet-api</artifactId>
</exclusion>
<exclusion>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
</exclusion>
</exclusions>
</dependency>

<!-- protoBuf -->
<dependency>
<groupId>com.twitter</groupId>
<artifactId>chill-protobuf</artifactId>
<version>0.7.6</version>
<!-- exclusions for dependency conversion -->
<exclusions>
<exclusion>
<groupId>com.esotericsoftware.kryo</groupId>
<artifactId>kryo</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- We need protobuf for chill-protobuf -->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.7.0</version>
</dependency>

Flink支持的数据类型:
1、Java的基础类型和装箱(Integer等)
2、基础类型数组和Object数组
3、合成类型:
(1)Java 元组Tuples(Flink Java API);最长25个字段,不允许null
(2)Scala case classes (包括scala的元组);不允许null
(3)Row。允许null
(4)POJOs。
4、辅助类型。(Option, Either, Lists, Maps, …)
5、范型和其他类型Generic Kyro
第5个Kyro是最后的备选,尽量优化不要使用Kyro,会有大量的性能损耗。当不满足前四种时,就会被Flink认为是一种泛型(GenericType),使用Kryo来进行序列化和反序列化。但Kryo在有些流处理场景效率非常低,有可能造成流数据的积压。我们可以使用senv.getConfig.disableGenericTypes()来禁用Kryo,禁用后,Flink遇到无法处理的数据类型将抛出异常,这种方法对于调试非常有效。
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/types_serialization.html

注意:scala的元组从_1开始,为Java准备的元组Tuple从0开始。

FLink POJOs比较强大,必须满足的条件:
1、类是个public的独立类(没有非静态内部类)
2、必须有公共的无参构造器
3、类中的所有非静态,非瞬态成员都是公共public的,不能是final,否则有对应的getter、setter方法
通过env.registerType(BaseLog.class);或者env.getConfig().registerPojoType(BaseLog.class)注册
通过System.out.println(TypeInformation.of(BaseLog.class).createSerializer(new ExecutionConfig()));的输出可以判断自定义类是否是POJO,kyro不是POJO。
org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer@fe305308
org.apache.flink.api.java.typeutils.runtime.PojoSerializer@f1db634d

参考:
https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/types_serialization.html (官网,建议)
https://cloud.tencent.com/developer/article/1240444
https://blog.csdn.net/qq_42596142/article/details/103763612
https://blog.csdn.net/shufangreal/article/details/105916100

文章作者:Lily

原始链接:/2020/05/11/flink%E6%95%B0%E6%8D%AE%E5%BA%8F%E5%88%97%E5%8C%96/

版权说明:转载请保留原文链接及作者。