FlinkSQL的搭建与使用

用于Flink1.9以上版本

环境描述

  • Flink1.9客户端
  • CDH集群(hive-1.1.0)

注意事项

1
2
3
目前FlinkSQL通过测试的Hive版本只有1.2.1和2.3.4
但是其他版本经过我的测试,发现也是可以使用的
jar包需要导入对应版本的

修改配置文件

1
2
3
4
5
6
7
8
9
10
11
12
catalogs:
# catalogs 名称
- name: hive
# catalog连接类型
type: hive
# hive 安装路径下conf目录路径
hive-conf-dir: /etc/hive/conf.cloudera.hive
# hive 版本号
hive-version: 1.2.1
property-version: 1
# use catalog 后 默认连接的数据库名
default-database: default

拷贝依赖包

1
2
3
4
5
6
7
8
9
10
11
12
13
需要以下依赖包-我的是CDH的
flink-connector-hive_2.11-1.9.1.jar
flink-hadoop-compatibility_2.11-1.9.1.jar
flink-shaded-hadoop-2-uber-2.6.5-7.0.jar
hadoop-common-2.6.0-cdh5.15.1.jar
hadoop-mapreduce-client-common-2.6.0-cdh5.15.1.jar
hive-common-1.2.1.jar
hive-exec-1.2.1.jar
hive-metastore-1.2.1.jar
libfb303-0.9.3.jar
libthrift-0.9.3.jar
mysql-connector-java-5.1.48-bin.jar
antlr-runtime-3.4.jar

使用

1
2
3
./bin/sql-client.sh embedded
use catalog hive;
show tables;

TableAPI的使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
# 记录一下坑
1.TableAPI目前不支持HiveStreamTableSink,所以写不进去,可以读
2.CDH集群一定要注意引用的pom是CDH版本的

# 测试代码
object HiveDemoOnTable {
def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
val tEnv = StreamTableEnvironment.create(env)

val ds1 = env.socketTextStream("hadoop01", 9999, '\n')

val hiveCatalog = new HiveCatalog("test", "default",
"hive_conf", "1.2.1")
tEnv.registerCatalog("test", hiveCatalog)
tEnv.useCatalog("test")

val table = tEnv.sqlQuery("select `topic`,`partition`,`offset`,msg,`c_date` from user_test_orc")

table.insertInto("user_test_orc")

env.execute("test")
}

// case class Order(user: Int, product: String, amount: Int)
case class Order(topic: String, partition: Integer, offset: Integer, msg: String, c_date: String)
}

记录出现的问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
问题1:
Caused by: org.apache.flink.table.client.gateway.SqlExecutionException: Could not create execution context.
修改了默认module
解决:
导入flink-connector-hive_2.11-*.jar

问题2:
Caused by: java.lang.ClassNotFoundException: org.apache.hive.common.util.HiveVersionInfo
没有找到Hive版本信息
解决:
导入hive-exec-*.jar

问题3:
Caused by: java.lang.ClassNotFoundException: org.apache.hadoop.io.Writable
没有hadood依赖,导入shaded包或者hadoop依赖包
解决:
导入hadoop-common-*.jar等
或者flink-shaded-hadoop-2-uber-*.jar

问题4:
Caused by: java.lang.ClassNotFoundException: com.facebook.fb303.FacebookService$Iface
缺少libfb303
解决:
导入libfb303-0.9.3.jar