Flink Connector Analysis: Upsert-Kafka

Converting between Flink's RowKind and Kafka's changelog messages

Message Types

# Flink (RowKind)
INSERT
UPDATE_BEFORE
UPDATE_AFTER
DELETE

# Kafka
UPSERT
DELETE (tombstone: key with a null value)
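
Conceptually, the two directions of the conversion look like the sketch below. This is illustrative only, with made-up class and method names; the real logic lives in the connector classes walked through in the source-code section further down.

import org.apache.flink.types.RowKind;

// Illustrative helpers (hypothetical names, not connector API).
public final class UpsertKafkaMapping {

    // Sink side: INSERT / UPDATE_AFTER are written as upsert records (key + value);
    // UPDATE_BEFORE / DELETE are written as tombstones (key + null value).
    public static boolean writeAsTombstone(RowKind kind) {
        return kind == RowKind.UPDATE_BEFORE || kind == RowKind.DELETE;
    }

    // Source side: a record whose value is null is read back as a DELETE for its key;
    // any other record is the new latest value for its key, with the row kind coming
    // from the value format (see the deserializer in the source-code section).
    public static RowKind readAs(byte[] kafkaValue, RowKind kindFromValueFormat) {
        return kafkaValue == null ? RowKind.DELETE : kindFromValueFormat;
    }
}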

Ordering

Messages with the same key must be written to the same partition
Inside Flink, records must be distributed by key (a partitioner other than a keyBy-style shuffle is not allowed)
From Flink to Kafka, the partition is chosen as id = mod(hash(key), num), where num is the number of Kafka partitions (see the sketch below)

# Note (changing the parallelism is fine)
Upsert-Kafka does not allow changing the partitioning strategy
Changing the number of partitions is not allowed either
Make sure the key of the query matches the key of the sink table
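
A minimal sketch of the routing rule above. The actual sink hands the serialized key to Kafka's default partitioner (which hashes with murmur2 rather than Java's hash), but the guarantee is the same: equal keys always map to the same partition, which is also why the partition count must not change afterwards.

import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Sketch of "partition = mod(hash(key), numPartitions)"; class and method names are made up.
public final class KeyPartitioning {

    public static int partitionFor(byte[] serializedKey, int numPartitions) {
        // floorMod keeps the result non-negative even for negative hash codes
        return Math.floorMod(Arrays.hashCode(serializedKey), numPartitions);
    }

    public static void main(String[] args) {
        byte[] key = "{\"user_id\":1,\"page_id\":101}".getBytes(StandardCharsets.UTF_8);
        // The same key maps to the same partition on every call; changing
        // numPartitions changes the assignment, which breaks per-key ordering.
        System.out.println(partitionFor(key, 4));
    }
}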

Demo

# https://github.com/fsk119/flink-pageviews-demo
# Add the required dependencies
# Add test table and data to MySQL
CREATE DATABASE flink;
USE flink;

CREATE TABLE users (
user_id BIGINT,
user_name VARCHAR(1000),
region VARCHAR(1000)
);

INSERT INTO users VALUES
(1, 'Timo', 'Berlin'),
(2, 'Tom', 'Beijing'),
(3, 'Apple', 'Beijing');

# Create the tables in the SQL Client
CREATE TABLE users (
user_id BIGINT,
user_name STRING,
region STRING
) WITH (
'connector' = 'mysql-cdc',
'hostname' = 'localhost',
'database-name' = 'flink',
'table-name' = 'users',
'username' = 'root',
'password' = '123456'
);

CREATE TABLE pageviews (
user_id BIGINT,
page_id BIGINT,
view_time TIMESTAMP(3),
proctime AS PROCTIME()
) WITH (
'connector' = 'kafka',
'topic' = 'pageviews',
'properties.bootstrap.servers' = 'localhost:9092',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json'
);

INSERT INTO pageviews VALUES
(1, 101, TO_TIMESTAMP('2020-11-23 15:00:00')),
(2, 104, TO_TIMESTAMP('2020-11-23 15:00:01.00'));


# Case 1: create the sink table and write the joined (enriched) data into it
CREATE TABLE enriched_pageviews (
user_id BIGINT,
user_region STRING,
page_id BIGINT,
view_time TIMESTAMP(3),
WATERMARK FOR view_time as view_time - INTERVAL '5' SECOND,
PRIMARY KEY (user_id, page_id) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'enriched_pageviews',
'properties.bootstrap.servers' = 'localhost:9092',
'key.format' = 'json',
'value.format' = 'json'
);

INSERT INTO enriched_pageviews
SELECT pageviews.user_id, region, pageviews.page_id, pageviews.view_time
FROM pageviews
LEFT JOIN users ON pageviews.user_id = users.user_id;

kafka-console-consumer --bootstrap-server mac:9092 --topic "enriched_pageviews" --from-beginning --property print.key=true

# Case 2: aggregate the data
CREATE TABLE pageviews_per_region (
user_region STRING,
cnt BIGINT,
PRIMARY KEY (user_region) NOT ENFORCED
) WITH (
'connector' = 'upsert-kafka',
'topic' = 'pageviews_per_region',
'properties.bootstrap.servers' = 'localhost:9092',
'key.format' = 'json',
'value.format' = 'json'
);

INSERT INTO pageviews_per_region
SELECT
user_region,
COUNT(*)
FROM enriched_pageviews
WHERE user_region is not null
GROUP BY user_region;

kafka-console-consumer --bootstrap-server mac:9092 --topic "pageviews_per_region" --from-beginning --property print.key=true
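
With print.key=true, the console consumer prints the JSON-encoded key followed by the value for each changelog entry; a DELETE shows up as the key with a literal null value (a Kafka tombstone). For the aggregation in case 2 and the sample data above, the final state looks roughly like this (illustrative only; intermediate updates and their order depend on when records arrive):

{"user_region":"Berlin"}	{"user_region":"Berlin","cnt":1}
{"user_region":"Beijing"}	{"user_region":"Beijing","cnt":1}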

Source Code Notes

Sink

# The core of the sink is collapsing the RowKind before buffering
# BufferedUpsertSinkFunction
invoke() -> addToBuffer() -> changeFlag()
private RowData changeFlag(RowData value) {
    switch (value.getRowKind()) {
        // an insert or the "after" image of an update becomes an upsert
        case INSERT:
        case UPDATE_AFTER:
            value.setRowKind(UPDATE_AFTER);
            break;
        // the "before" image of an update or a delete becomes a delete (tombstone)
        case UPDATE_BEFORE:
        case DELETE:
            value.setRowKind(DELETE);
    }
    return value;
}
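
changeFlag() is applied as rows are added to a per-key buffer (used when buffering is enabled via the sink.buffer-flush.* options): only the latest change per key survives, and the buffer is flushed on an interval and on checkpoints. A minimal sketch of that reduce-by-key idea, with made-up names rather than the actual Flink implementation:

import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.BiConsumer;

import org.apache.flink.table.data.RowData;

// Sketch of reduce-by-key buffering: only the last change per key survives a flush.
public final class UpsertBuffer {

    // assumes the key rows implement equals/hashCode (e.g. GenericRowData)
    private final Map<RowData, RowData> buffer = new LinkedHashMap<>();
    private final BiConsumer<RowData, RowData> emitter;  // e.g. hands (key, value) to the Kafka producer

    public UpsertBuffer(BiConsumer<RowData, RowData> emitter) {
        this.emitter = emitter;
    }

    // key = primary-key projection of the row; changedRow = row with its flag already rewritten
    public void add(RowData key, RowData changedRow) {
        buffer.put(key, changedRow);  // later changes to the same key overwrite earlier ones
    }

    // called on the flush interval and on checkpoints
    public void flush() {
        buffer.forEach(emitter);
        buffer.clear();
    }
}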

Source

# Converts the records read from Kafka into RowData
# DynamicKafkaDeserializationSchema
@Override
public void deserialize(ConsumerRecord<byte[], byte[]> record, Collector<RowData> collector)
        throws Exception {
    // shortcut: no key and no metadata, deserialize the value directly
    if (keyDeserialization == null && !hasMetadata) {
        valueDeserialization.deserialize(record.value(), collector);
        return;
    }

    // deserialize the key, if one is configured
    if (keyDeserialization != null) {
        keyDeserialization.deserialize(record.key(), keyCollector);
    }

    // project output while emitting values
    outputCollector.inputRecord = record;
    outputCollector.physicalKeyRows = keyCollector.buffer;
    outputCollector.outputCollector = collector;
    if (record.value() == null && upsertMode) {
        // Kafka tombstone: the value is null
        outputCollector.collect(null);
    } else {
        valueDeserialization.deserialize(record.value(), outputCollector);
    }
    keyCollector.buffer.clear();
}

# The value is emitted via OutputProjectionCollector
@Override
public void collect(RowData physicalValueRow) {
    // no key defined
    if (keyProjection.length == 0) {
        emitRow(null, (GenericRowData) physicalValueRow);
        return;
    }

    // otherwise emit a value for each key
    for (RowData physicalKeyRow : physicalKeyRows) {
        emitRow((GenericRowData) physicalKeyRow, (GenericRowData) physicalValueRow);
    }
}

private void emitRow(
        @Nullable GenericRowData physicalKeyRow,
        @Nullable GenericRowData physicalValueRow) {
    final RowKind rowKind;
    if (physicalValueRow == null) {
        if (upsertMode) {
            // tombstone in upsert mode: emit a DELETE built from the key fields
            rowKind = RowKind.DELETE;
        } else {
            throw new DeserializationException(
                    "Invalid null value received in non-upsert mode. Could not to set row kind for output record.");
        }
    } else {
        rowKind = physicalValueRow.getRowKind();
    }
    ......
    outputCollector.collect(producedRow);
}