并没有涉及Flink的本身使用的序列化器的改动,只是flink使用的kryo依赖等级过低
前因
1 2 3 4 5 6
| 着手将Spark的实时项目重构成Flink项目 在对于数据的清洗时,原项目是将Json格式的数据转换为实体类 最后对其进行Kryo序列化发送给Kafka 在重构时,为了新老项目能够使用同一个数据源 新项目也采用同样的方式 结果发现Kryo使用的一些冲突
|
引用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| <dependency> <groupId>com.esotericsoftware</groupId> <artifactId>kryo-shaded</artifactId> <version>4.0.2</version> </dependency> <dependency> <groupId>org.apache.flink</groupId> <artifactId>flink-scala_${scala.2.11.version}</artifactId> <exclusions> <exclusion> <groupId>com.esotericsoftware.kryo</groupId> <artifactId>kryo</artifactId> </exclusion> </exclusions> </dependency>
|
冲突
1 2 3
| 在使用Kryo包时,其实在引用flink-scala-2.11包时,其内部含有Kryo包(等级过低,需要排除) 如果直接引用Kryo包会导致项目编译通过,运行时报错,最终可能导致数据丢失 建议使用kryo-shaded包,很好的兼容了两者的异同
|
注意
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| 1.代码区别 由于我是纯scala实现的实体类,并且其中有Map数据结构 为了使其能够兼顾老项目的Java实体类 需要应用util.HashMap[lang.String,lang.String] 2.Kryo使用 setRegistrationRequired(false) setReferences(false) register(Class) 序列化分为两种 一种是序列化后带有ID信息,那么反序列化时就需要提供对应的信息才能进行反序列化(默认这种模式是关闭的) 一种是不带ID信息,直接进行序列化 3.实体类 实体类需要保证字段顺序与Hive表中的顺序保持一致 该bug在flink-1.11.0中有得到修复
|
代码
还有线程池的实现但是因为版本问题,这里没有使用-传送门
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
| package com.skuld.util
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import com.esotericsoftware.kryo.Kryo import com.esotericsoftware.kryo.io.{Input, Output} import java.util
import com.skuld.entry.EventGame
object KryoUtils {
private val OUTPUT_BUFFER_SIZE = 1024 * 30
private val OUTPUT_MAX_BUFFER_SIZE = 1024 * 60
private lazy val kryo = create
def create: Kryo = { val kryo = new Kryo kryo.setRegistrationRequired(false) kryo.setReferences(false) kryo.register(classOf[util.HashMap[_, _]]) kryo.register(classOf[Array[_]]) kryo }
def serialize[T](`object`: T): Array[Byte] = { try { val baos = new ByteArrayOutputStream() val output = new Output(OUTPUT_BUFFER_SIZE, OUTPUT_MAX_BUFFER_SIZE) output.setOutputStream(baos) try { kryo.writeObject(output, `object`) output.flush() output.getBuffer } finally { if (output != null) output.close() } } }
def deserialize[T](pvBytes: Array[Byte], tClass: Class[T]): T = { try { val byteArrayInputStream = new ByteArrayInputStream(pvBytes) val input = new Input(byteArrayInputStream) try { input.setBuffer(pvBytes) kryo.readObject(input, tClass) } finally { if (input != null) input.close() } } } }
|