0%
B站Flink教程视频观看
预定义的Source和Sink
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| 基于文件的Source readTextFile(path) readFile(fileInputFormat,path) 基于文件的Sink writeAsText writeAsCsv 基于Socket的Source socketTextStream 基于Socket的Sink writeToSocket 基于Collections,Iterators的Source fromCollection fromElements 标准输出,标准错误 print printToError
|
Bundled Connectors
1 2 3 4 5 6 7 8
| Apache Kafka(source/sink) Apache Cassandra(sink) Amazon Kinesis Streams(source/sink) ElasticSearch(sink) Hadoop FileSystem(sink) RabbitMQ(source/sink) Apache NiFi(source/sink) Twitter Streaming API(source)
|
Apache Bahir中的连接器
1 2 3 4 5
| Apache ActiveMQ(source/sink) Apache Flume(sink) Redis(sink) Akka(sink) Netty(source)
|
Async I/O
1 2
| 使用connector并不是数据输入输出Flink的唯一方式 在Map,FlatMap中使用Async I/O方式读取外部数据库等
|
Flink Kafka Consumer反序列化数据
1 2 3 4 5 6 7 8
| 将kafka中二进制数据转化为具体的java,scala对象 DeserializationSchema,T deserialize(byte[] message) KeyedDeserializationSchema,T deserialize(byte[] messageKey,byte[] message,String topic,int partition,long offset):对于访问kafka key/value
常用 SimpleStringSchema:按字符串方式进行序列化,反序列化 TypeInformationSerializationSchema:基于Flink的TypeInformation来创建schema JsonDeserializationSchema:使用jackson反序列化json格式消息,并返回ObjectNode,可以使用.get("property")方法来访问字段
|
Flink Kafka Consumer消费起始位置
1 2 3 4 5 6 7 8 9 10 11 12 13
| setStartFromGroupOffsets(默认) 从Kafka记录的group.id的位置开始读取,如果没有根据anto.offset.reset设置 setStratFromEarliest 从Kafka最早的位置读取 setStartFromLatest 从Kafka最新数据开始读取 setStartFromTimestamp(long) 从时间戳大于或等于指定时间戳的位置开始读取 setStartFromSpecificOffsets 从指定的分区的offset位置开始读取,如指定的offsets中不存在某个分区,该分区从group offset位置开始读取
注意 作业故障从CK自动恢复,以及手动做SP时,消费的位置从保存状态中恢复,与该配置无关
|
Flink Kafka Consumer Topic Partition自动发现
1 2 3 4 5 6 7 8 9 10 11
| 原理: 内部单独的线程获取kafka meta信息进行更新 flink.partition-discovery.interval-millis:发现时间间隔.默认false,设置为非负值开启
分区发现 消费的Source Kafka Topic进行partition扩容 新发现的分区,从earliest位置开始读取 Topic发现 支持正则表达式描述topic名字 Pattern topicPattern = java.util.regex.Pattern.compile("topic[0-9]"); FlinkKafkaConsumer010<String> consumer = new FlinkKafkaConsumer010(topicPattern,new SimpleStringSchema(),properties);
|
Flink Kafka Consumer Commit Offset方式
1 2 3 4 5 6 7
| CK关闭 依赖kafka客户端的auto commit定期提交offset 需设置enable.anto.commit,auto.commit.interval.ms参数到consumer properties
CK开启 Offset自己在ck state中管理和容错,提交Kafka仅作为外部监视消费进度 通过setCommitOffsetsOnCheckpoints控制,CK成功之后,是否提交offset到kafka
|
Flink Kafka Consumer时间戳提取/水位生成
1 2 3
| per Kafka Partition WaterMark assignTimestampsAndWatermarks,每个partition一个assigner,水位为对个partition对齐后值 不在KafkaSource生成WaterMark,会出现扔掉部分数据情况
|
Flink Kafka Producer Producer分区
1 2 3 4
| Producer分区 FlinkFixedPartitioner(默认):parallellnstanceld % partitions.length Partitioner设置为null:round-robin kafka partitioner,维持过多链接 Custom Partitioner:自定义分区
|
Flink Kafka Producer Producer容错
1 2 3 4 5 6 7
| Kafka 0.9 and 0.10 setLogFailuresOnly:默认false.写失败时,是否打印失败log,不抛异常 setFlushOnCheckpoint:默认true.ck时保证数据写到kafka at-least-once语义:setLogFailuresOnly(false)+setFlushOnCheckPoint(true)
Kafka 0.11 FlinkKafkaProducer011,两阶段提交Sink结合Kafka事务,保证端到端精准一次
|