Time to get hands-on. I first heard 之信 talk about Iceberg last year, but back then I assumed it was just a thin data-orchestration layer on top of the data, which turned out to be a rather shallow understanding.
Preparation
```bash
# Flink and Zeppelin need to be deployed and running beforehand.

# Download the Iceberg source
git clone https://github.com/apache/iceberg.git
cd iceberg

# Build, skipping tests
./gradlew build -x test

# The runtime dependency jars end up under
spark-runtime/build/libs
flink-runtime/build/libs
hive-runtime/build/libs

# Register the Iceberg dependency in Zeppelin
%flink.conf
flink.execution.jars /opt/iceberg/flink-runtime/build/libs/iceberg-flink-runtime-dae6c49.jar
```
Creating a New Catalog
HiveCatalog
```sql
%flink.ssql

CREATE CATALOG hive_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hive',
  'uri'='thrift://mac:9083',
  'clients'='5',
  'property-version'='1',
  'warehouse'='hdfs://user/hive/warehouse'
);
```
HadoopCatalog
```sql
CREATE CATALOG hadoop_catalog WITH (
  'type'='iceberg',
  'catalog-type'='hadoop',
  'warehouse'='hdfs://mac:9000/warehouse/test',
  'property-version'='1'
);
```
CustomCatalog
```sql
CREATE CATALOG my_catalog WITH (
  'type'='iceberg',
  'catalog-impl'='com.my.custom.CatalogImpl',
  'my-additional-catalog-config'='my-value'
);
```
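For context, the `catalog-impl` option essentially asks Flink to instantiate the named class through Iceberg's `CatalogUtil`. Below is a minimal sketch of doing the same thing by hand; the class name and extra property are the placeholders from the DDL above, and the exact `loadCatalog` signature may differ slightly between Iceberg versions:

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.CatalogUtil;
import org.apache.iceberg.catalog.Catalog;

// Hypothetical: load a custom Catalog implementation by class name,
// roughly what the 'catalog-impl' option does behind the scenes.
Map<String, String> props = new HashMap<>();
props.put("my-additional-catalog-config", "my-value");

Catalog catalog = CatalogUtil.loadCatalog(
    "com.my.custom.CatalogImpl",  // must implement org.apache.iceberg.catalog.Catalog
    "my_catalog",
    props,
    new Configuration());
```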
Currently Supported SQL Operations
DDL Operations
```sql
CREATE DATABASE iceberg_db;
USE iceberg_db;

CREATE TABLE hive_catalog.default.sample (
  id BIGINT COMMENT 'unique id',
  data STRING
);

CREATE TABLE hive_catalog.default.sample (
  id BIGINT COMMENT 'unique id',
  data STRING
) PARTITIONED BY (data);

CREATE TABLE hive_catalog.default.sample (
  id BIGINT COMMENT 'unique id',
  data STRING
);
CREATE TABLE hive_catalog.default.sample_like LIKE hive_catalog.default.sample;

ALTER TABLE hive_catalog.default.sample SET ('write.format.default'='avro');

ALTER TABLE hive_catalog.default.sample RENAME TO hive_catalog.default.new_sample;

DROP TABLE hive_catalog.default.sample;
```
Query Operations
```sql
-- Streaming read: the table hints require dynamic-table-options to be enabled
SET execution.type = streaming ;
SET table.dynamic-table-options.enabled=true;

SELECT * FROM sample /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s')*/ ;
SELECT * FROM sample /*+ OPTIONS('streaming'='true', 'monitor-interval'='1s', 'start-snapshot-id'='3821550127947089987')*/ ;

-- Batch read
SET execution.type = batch ;
SELECT * FROM sample;
```
Insert Operations
```sql
INSERT INTO hive_catalog.default.sample VALUES (1, 'a');
INSERT INTO hive_catalog.default.sample SELECT id, data FROM other_kafka_table;

INSERT OVERWRITE sample VALUES (1, 'a');
INSERT OVERWRITE hive_catalog.default.sample PARTITION(data='a') SELECT 6;
```
API Operations
Reading Data
```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.source.FlinkSource;

// --- Batch read
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");

DataStream<RowData> batch = FlinkSource.forRowData()
    .env(env)
    .tableLoader(tableLoader)
    .streaming(false)
    .build();

batch.print();
env.execute("Test Iceberg Batch Read");

// --- Streaming read, starting from the given snapshot
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");

DataStream<RowData> stream = FlinkSource.forRowData()
    .env(env)
    .tableLoader(tableLoader)
    .streaming(true)
    .startSnapshotId(3821550127947089987L)
    .build();

stream.print();
env.execute("Test Iceberg Streaming Read");
```
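The snippets above load the table straight from an HDFS path, i.e. a HadoopCatalog-style table. If the table lives in the hive_catalog created earlier, a CatalogLoader can be plugged into the TableLoader instead. A rough sketch, assuming the hive_catalog properties from above (the exact `CatalogLoader.hive` signature may vary by Iceberg version):

```java
import java.util.HashMap;
import java.util.Map;

import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.flink.CatalogLoader;
import org.apache.iceberg.flink.TableLoader;

// Hypothetical: point a TableLoader at hive_catalog.default.sample
Map<String, String> props = new HashMap<>();
props.put("uri", "thrift://mac:9083");
props.put("warehouse", "hdfs://user/hive/warehouse");

CatalogLoader catalogLoader = CatalogLoader.hive("hive_catalog", new Configuration(), props);
TableLoader tableLoader = TableLoader.fromCatalog(
    catalogLoader, TableIdentifier.of("default", "sample"));
```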
Writing Data
```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;

// --- Append
StreamExecutionEnvironment env = ...;
DataStream<RowData> input = ... ;

Configuration hadoopConf = new Configuration();
TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");

FlinkSink.forRowData(input)
    .tableLoader(tableLoader)
    .hadoopConf(hadoopConf)
    .build();

env.execute("Test Iceberg DataStream");

// --- Overwrite: see the sketch after this block
```
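The overwrite case was left empty above. Judging from the FlinkSink builder, overwrite semantics are switched on with `overwrite(true)`; a minimal sketch reusing `input`, `tableLoader` and `env` from the append example:

```java
// --- Overwrite: replaces the data in the partitions touched by this write
// (sketch; input, tableLoader and env are the same as in the append example)
FlinkSink.forRowData(input)
    .tableLoader(tableLoader)
    .overwrite(true)
    .build();

env.execute("Test Iceberg DataStream Overwrite");
```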
Compacting Small Files
```java
import org.apache.iceberg.Table;
import org.apache.iceberg.actions.RewriteDataFilesActionResult;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.actions.Actions;

TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/path");
tableLoader.open();
Table table = tableLoader.loadTable();

// Rewrite (compact) the table's small data files
RewriteDataFilesActionResult result = Actions.forTable(table)
    .rewriteDataFiles()
    .execute();
```
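Depending on the Iceberg release, the rewrite action also exposes a few tuning knobs, such as a target file size and a partition filter. A rough sketch (the method names below are my assumption and may differ across versions):

```java
import org.apache.iceberg.actions.RewriteDataFilesActionResult;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.flink.actions.Actions;

// Hypothetical tuning: only compact partition data='a', targeting ~128 MB files
RewriteDataFilesActionResult result = Actions.forTable(table)
    .rewriteDataFiles()
    .filter(Expressions.equal("data", "a"))
    .targetSizeInBytes(128 * 1024 * 1024L)
    .execute();
```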
Problems
```
# My stack is on fairly new versions, while Iceberg at this point only seems to support
# up to flink-1.11.*, hadoop-2.7.3 and hive-2.3.8.
# After building, it fails with the error below and refuses the cast, which is confusing
# after reading the source: HadoopCatalog clearly extends the BaseMetastoreCatalog abstract
# class, and BaseMetastoreCatalog implements the Catalog interface.
Cannot initialize Catalog, org.apache.iceberg.hadoop.HadoopCatalog does not implement Catalog.
# Downgrading Flink to 1.11.3 made it work.
```