Flink's built-in format.type currently supports three formats: json, avro, and csv. The related source code will be covered in a separate chapter.
Requirement

Flink SQL creates a Kafka source table whose data format is JSON. If the stream contains some dirty data, the job fails immediately with:

Caused by: java.io.IOException: Failed to deserialize JSON object.

```scala
def createKafkaTable(): String = {
  """
    |CREATE TABLE demo1 (
    |  uid VARCHAR COMMENT 'uid',
    |  rid VARCHAR COMMENT 'rid'
    |)
    |WITH (
    |  'connector.type' = 'kafka',                          -- use the kafka connector
    |  'connector.version' = 'universal',                   -- kafka version
    |  'connector.topic' = 'test',                          -- kafka topic
    |  'connector.properties.0.key' = 'zookeeper.connect',  -- zk connection info
    |  'connector.properties.0.value' = 'hosts:2181',       -- zk connection info
    |  'connector.properties.1.key' = 'bootstrap.servers',  -- broker connection info
    |  'connector.properties.1.value' = 'hosts:9092',       -- broker connection info
    |  'connector.sink-partitioner' = 'fixed',
    |  'update-mode' = 'append',
    |  'format.type' = 'json',           -- source data format is json
    |  'format.derive-schema' = 'true'   -- derive the json parsing rules from the DDL schema
    |)
  """.stripMargin
}
```
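"Dirty data" here means any record that the JSON deserializer cannot parse, for example (an illustrative case, not from the article) a truncated payload such as {"uid":"u1","ri. A single such record makes JsonRowDeserializationSchema throw, and the whole job fails.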
Solution

Define a custom Factory:

```java
package org.apache.flink.formats.custom;

import com.test.flink.CustomJsonRowDeserializationSchema;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.formats.json.JsonRowSchemaConverter;
import org.apache.flink.formats.json.JsonRowSerializationSchema;
import org.apache.flink.table.descriptors.DescriptorProperties;
import org.apache.flink.table.descriptors.JsonValidator;
import org.apache.flink.table.factories.DeserializationSchemaFactory;
import org.apache.flink.table.factories.SerializationSchemaFactory;
import org.apache.flink.table.factories.TableFormatFactoryBase;
import org.apache.flink.types.Row;

import java.util.Map;

public class CustomJsonRowFormatFactory extends TableFormatFactoryBase<Row>
        implements SerializationSchemaFactory<Row>, DeserializationSchemaFactory<Row> {

    public CustomJsonRowFormatFactory() {
        super("custom", 1, true);
    }

    private static DescriptorProperties getValidatedProperties(Map<String, String> propertiesMap) {
        final DescriptorProperties descriptorProperties = new DescriptorProperties();
        descriptorProperties.putProperties(propertiesMap);
        new JsonValidator().validate(descriptorProperties);
        return descriptorProperties;
    }

    @Override
    public DeserializationSchema<Row> createDeserializationSchema(Map<String, String> properties) {
        final DescriptorProperties descriptorProperties = getValidatedProperties(properties);
        final CustomJsonRowDeserializationSchema.Builder schema =
                new CustomJsonRowDeserializationSchema.Builder(createTypeInformation(descriptorProperties));
        return schema.build();
    }

    @Override
    public SerializationSchema<Row> createSerializationSchema(Map<String, String> properties) {
        final DescriptorProperties descriptorProperties = getValidatedProperties(properties);
        return new JsonRowSerializationSchema.Builder(createTypeInformation(descriptorProperties)).build();
    }

    private TypeInformation<Row> createTypeInformation(DescriptorProperties descriptorProperties) {
        if (descriptorProperties.containsKey(JsonValidator.FORMAT_SCHEMA)) {
            return (RowTypeInfo) descriptorProperties.getType(JsonValidator.FORMAT_SCHEMA);
        } else if (descriptorProperties.containsKey(JsonValidator.FORMAT_JSON_SCHEMA)) {
            return JsonRowSchemaConverter.convert(descriptorProperties.getString(JsonValidator.FORMAT_JSON_SCHEMA));
        } else {
            return deriveSchema(descriptorProperties.asMap()).toRowType();
        }
    }
}
```
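A note on the constructor: super("custom", 1, true) registers this factory for format.type = 'custom' with format property version 1 and declares that it supports schema derivation; that last flag is what lets 'format.derive-schema' = 'true' (and the deriveSchema(...) branch in createTypeInformation) keep working for the custom format.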
Note

Because this is a custom Factory class, it has to be registered for service discovery. Do the following under the resources folder:

Create the directory META-INF/services.
Create a file in it whose name is exactly this string: org.apache.flink.table.factories.TableFactory
File content (the fully qualified name of the custom Factory class): org.apache.flink.formats.custom.CustomJsonRowFormatFactory
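For reference, in a standard Maven/Gradle project layout (an assumption, not stated in the article) the result looks like this:

```
src/main/resources/
└── META-INF/
    └── services/
        └── org.apache.flink.table.factories.TableFactory
            (file content: org.apache.flink.formats.custom.CustomJsonRowFormatFactory)
```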
Create the custom DeserializationSchema/SerializationSchema

For this part, the following implementations can be used as references; the details are not repeated here:

org.apache.flink.formats.json.JsonRowSerializationSchema
org.apache.flink.formats.json.JsonRowDeserializationSchema
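Since the article leaves the schema implementation out, here is a minimal sketch of what a dirty-data-tolerant CustomJsonRowDeserializationSchema could look like. The package and class name follow the import used in the factory above; everything else (plain Jackson ObjectMapper, handling only VARCHAR/String fields, emitting an all-null Row instead of throwing on a parse failure) is an illustrative assumption, not the author's actual implementation.

```java
package com.test.flink;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.types.Row;

import java.io.IOException;

public class CustomJsonRowDeserializationSchema implements DeserializationSchema<Row> {

    private final RowTypeInfo typeInfo;
    private final ObjectMapper mapper = new ObjectMapper();

    public CustomJsonRowDeserializationSchema(TypeInformation<Row> typeInfo) {
        this.typeInfo = (RowTypeInfo) typeInfo;
    }

    @Override
    public Row deserialize(byte[] message) throws IOException {
        String[] fieldNames = typeInfo.getFieldNames();
        Row row = new Row(fieldNames.length);
        try {
            JsonNode root = mapper.readTree(message);
            for (int i = 0; i < fieldNames.length; i++) {
                JsonNode field = root.get(fieldNames[i]);
                // this sketch only handles VARCHAR (String) fields, as in the demo1 DDL
                row.setField(i, field == null ? null : field.asText());
            }
        } catch (Exception e) {
            // dirty data: swallow the parse error and emit an all-null row
            // instead of failing the job (the built-in JsonRowDeserializationSchema throws here)
        }
        return row;
    }

    @Override
    public boolean isEndOfStream(Row nextElement) {
        return false;
    }

    @Override
    public TypeInformation<Row> getProducedType() {
        return typeInfo;
    }

    /** Builder shaped to match how the factory above constructs the schema. */
    public static class Builder {
        private final TypeInformation<Row> typeInfo;

        public Builder(TypeInformation<Row> typeInfo) {
            this.typeInfo = typeInfo;
        }

        public CustomJsonRowDeserializationSchema build() {
            return new CustomJsonRowDeserializationSchema(typeInfo);
        }
    }
}
```

Emitting a row with null fields (rather than returning null or throwing) keeps the downstream SQL running; dirty records can then be filtered out, for example with WHERE uid IS NOT NULL.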
Usage

```scala
def createKafkaTable(): String = {
  """
    |CREATE TABLE demo1 (
    |  uid VARCHAR COMMENT 'uid',
    |  rid VARCHAR COMMENT 'rid'
    |)
    |WITH (
    |  'connector.type' = 'kafka',                          -- use the kafka connector
    |  'connector.version' = 'universal',                   -- kafka version
    |  'connector.topic' = 'test',                          -- kafka topic
    |  'connector.properties.0.key' = 'zookeeper.connect',  -- zk connection info
    |  'connector.properties.0.value' = 'hosts:2181',       -- zk connection info
    |  'connector.properties.1.key' = 'bootstrap.servers',  -- broker connection info
    |  'connector.properties.1.value' = 'hosts:9092',       -- broker connection info
    |  'connector.sink-partitioner' = 'fixed',
    |  'update-mode' = 'append',
    |  'format.type' = 'custom',         -- parse the source data with the custom format
    |  'format.derive-schema' = 'true'   -- derive the json parsing rules from the DDL schema
    |)
  """.stripMargin
}
```
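Compared with the original DDL, the only change is 'format.type' = 'custom', which matches the string passed to super("custom", 1, true) in CustomJsonRowFormatFactory. The planner therefore discovers the custom format factory through the SPI file created above and uses the dirty-data-tolerant deserialization schema instead of the built-in JSON one.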