UpsertStreamTableSink and its dependence on the external table structure

Notes on a few quirks I ran into while using UpsertStreamTableSink.

Background

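Version: Flink 1.14.0
The structure of the target MySQL table is fixed and does not provide the primary key that the data calls for.
The only primary key is an id column, but the data being written has no id field, and the MySQL table structure may not be changed.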

=-= Not being allowed to change the MySQL table structure is rather baffling.

Questions

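1. How does the primary key declared in the Flink DDL relate to the primary key of the external table?
2. What happens when the Flink DDL declares a primary key but the external table only has a composite index?
3. What happens when the external table's primary key is fixed but absent from the data, so a unique value has to be built with a custom hash?
4. What happens when the column order of the composite primary key in the Flink DDL differs from the order in the external table?
5. What happens when the field order in the Flink DDL differs from the column order of the external table?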
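
To make the questions concrete, here is a sketch of what the external MySQL tables might look like. The table names (sink1 to sink4) match the 'table-name' options used in the code further down, but the column types, the NOT NULL constraints and the index name uk_x_m are my own assumptions; the original tables are not shown in this post. Only the key/index layout matters for the experiment.

```scala
object ExternalTableSketch {
  // Assumed column definitions; the real tables may use different types.
  val columns = "x VARCHAR(32) NOT NULL, m VARCHAR(32) NOT NULL, price BIGINT"

  // One MySQL DDL statement per external table, keyed by table name.
  val ddl: Map[String, String] = Map(
    // no primary key and no unique index
    "sink1" -> s"CREATE TABLE sink1 ($columns)",
    // composite primary key (x, m)
    "sink2" -> s"CREATE TABLE sink2 ($columns, PRIMARY KEY (x, m))",
    // composite unique index (x, m), no primary key (index name is hypothetical)
    "sink3" -> s"CREATE TABLE sink3 ($columns, UNIQUE KEY uk_x_m (x, m))",
    // separate id column as the primary key, as in the production table
    "sink4" -> s"CREATE TABLE sink4 (id VARCHAR(32) NOT NULL, $columns, PRIMARY KEY (id))"
  )

  def main(args: Array[String]): Unit =
    ddl.toSeq.sortBy(_._1).foreach { case (_, sql) => println(sql + ";") }
}
```

Running the object prints the four statements so they can be pasted into a MySQL client.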

Conclusions

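1. Primary key
If the external table has no primary key, declaring one in the Flink DDL makes no difference: rows are still inserted and never updated.
If the external table has a primary key:
without a primary key in the Flink DDL, the job fails with: Duplicate entry # for key '#.PRIMARY'
only when the Flink DDL also declares the primary key are existing rows updated.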

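2. Index
A unique index behaves the same as a primary key. Either one on the external table is enough for rows to be updated, but the Flink DDL still has to declare the primary key.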

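3. Custom primary key
Build the primary key value by combining existing fields, and make sure the Flink DDL declares that combined field as its primary key.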

4. Composite primary key order
Does not affect the final result.

5. Field order
Does not affect the final result.
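
These results make sense once you look at the kind of statement a JDBC sink sends to MySQL. As far as I understand the Flink JDBC connector, a sink without a primary key in the DDL writes plain INSERT statements, while a sink with a primary key writes MySQL's INSERT ... ON DUPLICATE KEY UPDATE. The sketch below only imitates that behaviour with string building; the object and function names are hypothetical and this is not the connector's actual code.

```scala
object UpsertStatementSketch {
  // Plain append: what gets sent when the Flink DDL declares no primary key.
  def insertStatement(table: String, fields: Seq[String]): String = {
    val columns = fields.map(f => s"`$f`").mkString(", ")
    val placeholders = fields.map(_ => "?").mkString(", ")
    s"INSERT INTO `$table`($columns) VALUES ($placeholders)"
  }

  // MySQL-style upsert: the key columns are never named in the statement,
  // MySQL decides what counts as a duplicate from its own primary key or unique index.
  def upsertStatement(table: String, fields: Seq[String]): String = {
    val updates = fields.map(f => s"`$f`=VALUES(`$f`)").mkString(", ")
    s"${insertStatement(table, fields)} ON DUPLICATE KEY UPDATE $updates"
  }

  // The Flink-side primary key only selects which kind of statement is used.
  def statementFor(table: String, fields: Seq[String], flinkPrimaryKey: Seq[String]): String =
    if (flinkPrimaryKey.isEmpty) insertStatement(table, fields)
    else upsertStatement(table, fields)

  def main(args: Array[String]): Unit = {
    val fields = Seq("x", "m", "price")
    println(statementFor("sink2", fields, Seq.empty))     // plain INSERT
    println(statementFor("sink2", fields, Seq("x", "m"))) // upsert
    println(statementFor("sink2", fields, Seq("m", "x"))) // same upsert, key order is irrelevant
  }
}
```

Read this way: with no unique constraint in MySQL the upsert never finds a duplicate, so every row is appended (conclusion 1); with a primary key or unique index in MySQL a plain INSERT hits "Duplicate entry" while the upsert updates the row (conclusions 1 and 2); and because columns are listed by name and the key is not part of the statement, neither key order nor field order changes the outcome (conclusions 4 and 5).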

Code example

package org.example.sink

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.table.api.bridge.scala.StreamTableEnvironment

/**
* @Author xz
* @Date 2021/11/26 09:39
* @Description
* How declaring (or not declaring) a primary key in the Flink DDL relates to the structure of the external sink table
*/
object UpsertStreamTableSinkDemo {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)

// Simulated data source
tEnv.executeSql(
s"""
|CREATE TABLE test (
| name1 STRING,
| name2 STRING,
| price BIGINT,
| ctime TIMESTAMP(3)
|) WITH (
| 'connector' = 'datagen',
| 'rows-per-second' = '1'
|)
|""".stripMargin)

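// sink1 and sink2: the external table has no primary key
// sink3 and sink4: the external table has a composite primary key
// sink5 and sink6: the external table has a composite unique index
// sink7: the external table uses an extra column as its primary key, so the Flink DDL adds a matching primary key field
// sink8: the composite primary key order in the Flink DDL differs from the order in the external table
// sink9: the field order in the Flink DDL differs from the column order of the external table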

// No primary key in the Flink DDL, no primary key on the external table
tEnv.executeSql(
s"""
|
|CREATE TABLE sink1 (
| x STRING,
| m STRING,
| price BIGINT
|) WITH (
| 'connector' = 'jdbc',
| 'url' = 'jdbc:mysql://localhost:3306/temp',
| 'username' = 'root',
| 'password' = '123456',
| 'table-name' = 'sink1'
|)
|""".stripMargin)

// Primary key in the Flink DDL, no primary key on the external table
tEnv.executeSql(
s"""
|CREATE TABLE sink2 (
| x STRING,
| m STRING,
| price BIGINT,
| PRIMARY KEY (x,m) NOT ENFORCED
|) WITH (
| 'connector' = 'jdbc',
| 'url' = 'jdbc:mysql://localhost:3306/temp?useSSL=false',
| 'username' = 'root',
| 'password' = '123456',
| 'table-name' = 'sink1'
|)
|""".stripMargin)

// No primary key in the Flink DDL, primary key (x,m) on the external table
tEnv.executeSql(
s"""
|CREATE TABLE sink3 (
| x STRING,
| m STRING,
| price BIGINT
|) WITH (
| 'connector' = 'jdbc',
| 'url' = 'jdbc:mysql://localhost:3306/temp?useSSL=false',
| 'username' = 'root',
| 'password' = '123456',
| 'table-name' = 'sink2'
|)
|""".stripMargin)

// Primary key in the Flink DDL, primary key (x,m) on the external table
tEnv.executeSql(
s"""
|CREATE TABLE sink4 (
| x STRING,
| m STRING,
| price BIGINT,
| PRIMARY KEY (x,m) NOT ENFORCED
|) WITH (
| 'connector' = 'jdbc',
| 'url' = 'jdbc:mysql://localhost:3306/temp?useSSL=false',
| 'username' = 'root',
| 'password' = '123456',
| 'table-name' = 'sink2'
|)
|""".stripMargin)

// No primary key in the Flink DDL, unique index (x,m) on the external table
tEnv.executeSql(
s"""
|CREATE TABLE sink5 (
| x STRING,
| m STRING,
| price BIGINT
|) WITH (
| 'connector' = 'jdbc',
| 'url' = 'jdbc:mysql://localhost:3306/temp?useSSL=false',
| 'username' = 'root',
| 'password' = '123456',
| 'table-name' = 'sink3'
|)
|""".stripMargin)

// Primary key in the Flink DDL, unique index (x,m) on the external table
tEnv.executeSql(
s"""
|CREATE TABLE sink6 (
| x STRING,
| m STRING,
| price BIGINT,
| PRIMARY KEY (x,m) NOT ENFORCED
|) WITH (
| 'connector' = 'jdbc',
| 'url' = 'jdbc:mysql://localhost:3306/temp?useSSL=false',
| 'username' = 'root',
| 'password' = '123456',
| 'table-name' = 'sink3'
|)
|""".stripMargin)

// The primary key value is generated from existing fields
tEnv.executeSql(
s"""
|CREATE TABLE sink7 (
| id STRING,
| x STRING,
| m STRING,
| price BIGINT,
| PRIMARY KEY (id) NOT ENFORCED
|) WITH (
| 'connector' = 'jdbc',
| 'url' = 'jdbc:mysql://localhost:3306/temp?useSSL=false',
| 'username' = 'root',
| 'password' = '123456',
| 'table-name' = 'sink4'
|)
|""".stripMargin)

// Composite primary key order reversed: the Flink DDL declares (m,x), the external table uses (x,m)
tEnv.executeSql(
s"""
|CREATE TABLE sink8 (
| x STRING,
| m STRING,
| price BIGINT,
| PRIMARY KEY (m,x) NOT ENFORCED
|) WITH (
| 'connector' = 'jdbc',
| 'url' = 'jdbc:mysql://localhost:3306/temp?useSSL=false',
| 'username' = 'root',
| 'password' = '123456',
| 'table-name' = 'sink2'
|)
|""".stripMargin)

// Field order differs from the external table
tEnv.executeSql(
s"""
|CREATE TABLE sink9 (
| price BIGINT,
| x STRING,
| m STRING,
| PRIMARY KEY (x,m) NOT ENFORCED
|) WITH (
| 'connector' = 'jdbc',
| 'url' = 'jdbc:mysql://localhost:3306/temp?useSSL=false',
| 'username' = 'root',
| 'password' = '123456',
| 'table-name' = 'sink2'
|)
|""".stripMargin)

tEnv.executeSql(
s"""
|insert into sink9
|select abs(price)%10 price,substring(name1,1,1) x,substring(name2,1,1) m
|from test
|""".stripMargin).print()

}
}
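
The listing above only runs the insert for sink9. For the sink7 case (custom primary key), the id has to be derived from the existing fields. Below is a minimal sketch of one way to do that, assuming the key is an MD5 over x and m joined with a '|' separator. The separator, the object name and the helper derivedId are my own choices for illustration, not something taken from the original job; MD5 and CONCAT_WS are built-in Flink SQL functions. The Scala helper mirrors the same derivation outside of SQL so it can be checked without a cluster.

```scala
import java.nio.charset.StandardCharsets
import java.security.MessageDigest

object DerivedKeySketch {
  // Hex-encoded MD5 of "x|m": the same (x, m) pair always yields the same id,
  // so repeated writes for that pair target the same row.
  def derivedId(x: String, m: String): String = {
    val digest = MessageDigest.getInstance("MD5")
      .digest(s"$x|$m".getBytes(StandardCharsets.UTF_8))
    digest.map(b => f"${b & 0xff}%02x").mkString
  }

  // Flink SQL doing the equivalent derivation while writing to sink7.
  // Could be passed to tEnv.executeSql(...) in the job above.
  val insertIntoSink7: String =
    """
      |INSERT INTO sink7
      |SELECT MD5(CONCAT_WS('|', x, m)) AS id, x, m, price
      |FROM (
      |  SELECT SUBSTRING(name1, 1, 1) AS x,
      |         SUBSTRING(name2, 1, 1) AS m,
      |         ABS(price) % 10 AS price
      |  FROM test
      |)
      |""".stripMargin

  def main(args: Array[String]): Unit = {
    println(derivedId("a", "b"))
    println(insertIntoSink7)
  }
}
```

The separator matters: without it, pairs such as ("ab", "c") and ("a", "bc") would hash to the same id. With single-character x and m, as in this demo, that cannot happen, but it is worth keeping for real data.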