This walkthrough is based on Flink 1.10.x and no longer matches 1.11.x in quite a few places, so the approach here is to start from concrete problems and trace them back through the source code.
Overview

```
// Let the SqlNode accept a visitor
FlinkPlannerImpl
    validate()

// Fetch the table information
PreValidateReWriter
    visit()
    appendPartitionProjects()

// Convert the table into a CatalogTable
FlinkCalciteCatalogReader
    getTable()
    toPreparingTable()
    convertCatalogTable()

// Look up and create the TableSource
CatalogSourceTable
    findAndCreateTableSource()

// Create the TableSource
HiveTableFactory <- TableSourceFactory
    createTableSource()
    createHiveTableSource()

// Obtain the DataStream from the StreamExecutionEnvironment and build the HiveTableInputFormat
HiveTableSource
    getDataStream()
    getInputFormat()

// Create the InputSplits and open the matching reader for each HiveTableInputSplit
HiveTableInputFormat
    createInputSplits()
    open()
    nextRecord()

// Fill the field values into the BaseRow, producing a GenericRow
HiveMapredSplitReader / HiveVectorizedOrcSplitReader
    nextRecord()

// Actual execution: fetches InputSplits and calls the format's nextRecord() in a loop
DataSourceTask
    invoke()
```
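For context, the call chain above is driven by a job roughly like the hypothetical sketch below. The catalog name, database, table, Hive version and hive-conf directory are placeholders I introduce for illustration, not values from the original walkthrough.

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.table.catalog.hive.HiveCatalog;
import org.apache.flink.types.Row;

public class HiveReadDemo {
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		EnvironmentSettings settings = EnvironmentSettings.newInstance()
				.useBlinkPlanner().inStreamingMode().build();
		StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env, settings);

		// Register a HiveCatalog (placeholder values) and make it the current catalog.
		HiveCatalog hive = new HiveCatalog("myhive", "default", "/etc/hive/conf", "2.3.4");
		tableEnv.registerCatalog("myhive", hive);
		tableEnv.useCatalog("myhive");

		// This SELECT is what triggers the chain in the overview:
		// validate() -> getTable()/convertCatalogTable() -> findAndCreateTableSource()
		// -> createHiveTableSource() -> getDataStream() -> createInputSplits()/nextRecord().
		Table t = tableEnv.sqlQuery("SELECT * FROM some_db.some_table");
		tableEnv.toAppendStream(t, Row.class).print();
		env.execute("hive-read-demo");
	}
}
```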
Problems

Why is the source parallelism so high, and why can't it be controlled from inside the program?

- TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER (default false)
  - false: read ORC files with Flink's native vectorized reader
  - true: read ORC files with Hadoop's MapredRecordReader
- TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM (default true)
  - false: the source parallelism is taken from the configuration
  - true: the source parallelism is the number of input splits
- TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX (default 1000)
  - the maximum source parallelism

```java
@Override
public DataStream<BaseRow> getDataStream(StreamExecutionEnvironment execEnv) {
	List<HiveTablePartition> allHivePartitions = initAllPartitions();
	@SuppressWarnings("unchecked")
	TypeInformation<BaseRow> typeInfo =
			(TypeInformation<BaseRow>) TypeInfoDataTypeConverter.fromDataTypeToTypeInfo(getProducedDataType());
	// Options are loaded from flink-conf.yaml, not from the job's TableConfig.
	Configuration conf = GlobalConfiguration.loadConfiguration();
	HiveTableInputFormat inputFormat = getInputFormat(
			allHivePartitions, conf.getBoolean(HiveOptions.TABLE_EXEC_HIVE_FALLBACK_MAPRED_READER));
	DataStreamSource<BaseRow> source = execEnv.createInput(inputFormat, typeInfo);

	int parallelism = conf.get(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM);
	// Default true: infer the parallelism from the number of input splits.
	if (conf.getBoolean(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM)) {
		int max = conf.getInteger(HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX);
		if (max < 1) {
			throw new IllegalConfigurationException(
					HiveOptions.TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM_MAX.key() +
							" cannot be less than 1");
		}

		int splitNum;
		try {
			long nano1 = System.nanoTime();
			splitNum = inputFormat.createInputSplits(0).length;
			long nano2 = System.nanoTime();
			LOG.info("Hive source({}}) createInputSplits use time: {} ms",
					tablePath, (nano2 - nano1) / 1_000_000);
		} catch (IOException e) {
			throw new FlinkHiveException(e);
		}
		parallelism = Math.min(splitNum, max);
	}
	parallelism = limit > 0 ? Math.min(parallelism, (int) limit / 1000) : parallelism;
	parallelism = Math.max(1, parallelism);
	source.setParallelism(parallelism);
	return source.name(explainSource());
}
```

Because TABLE_EXEC_HIVE_INFER_SOURCE_PARALLELISM in HiveOptions defaults to true, and conf is obtained by loading the configuration file (GlobalConfiguration.loadConfiguration()), setting the parallelism inside the program has no effect.
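A practical consequence, sketched below under the 1.10 behavior just described: overriding these options on the job's TableConfig (reusing the tableEnv from the earlier sketch) is never seen by getDataStream(), so the keys have to go into flink-conf.yaml instead. The string keys used here are the ones behind the HiveOptions / ExecutionConfigOptions constants quoted above.

```java
// Continuing the hypothetical job sketch from the overview (same tableEnv).
// On 1.10 this has NO effect on the Hive source parallelism, because getDataStream()
// calls GlobalConfiguration.loadConfiguration() and never looks at the TableConfig:
tableEnv.getConfig().getConfiguration()
		.setBoolean("table.exec.hive.infer-source-parallelism", false);
tableEnv.getConfig().getConfiguration()
		.setInteger("table.exec.resource.default-parallelism", 4);

// What does take effect on 1.10 is putting the same keys into flink-conf.yaml, e.g.:
//   table.exec.hive.infer-source-parallelism: false
//   table.exec.resource.default-parallelism: 4
// or capping the inferred value:
//   table.exec.hive.infer-source-parallelism.max: 200
```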
Why can the Parquet files that were written not be read back, and why do the fields come out scrambled?

```java
@Override
@SuppressWarnings("unchecked")
public BaseRow nextRecord(BaseRow reuse) throws IOException {
	if (reachedEnd()) {
		return null;
	}
	final GenericRow row = reuse instanceof GenericRow ?
			(GenericRow) reuse : new GenericRow(selectedFields.length);
	try {
		// Deserialize the current record into Hive's row struct and copy every selected
		// non-partition column into the GenericRow via the field's ObjectInspector.
		Object hiveRowStruct = deserializer.deserialize(value);
		for (int i = 0; i < selectedFields.length; i++) {
			if (selectedFields[i] < structFields.size()) {
				StructField structField = structFields.get(selectedFields[i]);
				Object object = HiveInspectors.toFlinkObject(
						structField.getFieldObjectInspector(),
						structObjectInspector.getStructFieldData(hiveRowStruct, structField),
						hiveShim);
				row.setField(i, converters[i].toInternal(object));
			}
		}
	} catch (Exception e) {
		LOG.error("Error happens when converting hive data type to flink data type.");
		throw new FlinkHiveException(e);
	}
	if (!rowReused) {
		// Partition columns are not stored in the file; fill them once from the partition spec.
		if (!partitionKeys.isEmpty()) {
			for (int i = 0; i < selectedFields.length; i++) {
				if (selectedFields[i] >= structFields.size()) {
					String partition = partitionKeys.get(selectedFields[i] - structFields.size());
					row.setField(i, converters[i].toInternal(
							hiveTablePartition.getPartitionSpec().get(partition)));
				}
			}
		}
		rowReused = true;
	}
	this.fetched = false;
	return row;
}

// Hive's ArrayWritableObjectInspector (Parquet SerDe): a struct field is resolved purely
// by its index into the underlying ArrayWritable; an out-of-range index silently yields null.
public Object getStructFieldData(Object data, StructField fieldRef) {
	if (data == null) {
		return null;
	} else if (data instanceof ArrayWritable) {
		ArrayWritable arr = (ArrayWritable) data;
		ArrayWritableObjectInspector.StructFieldImpl structField =
				(ArrayWritableObjectInspector.StructFieldImpl) fieldRef;
		return structField.getIndex() < arr.get().length ? arr.get()[structField.getIndex()] : null;
	} else if (data instanceof List) {
		return ((List) data).get(((ArrayWritableObjectInspector.StructFieldImpl) fieldRef).getIndex());
	} else {
		throw new UnsupportedOperationException("Cannot inspect " + data.getClass().getCanonicalName());
	}
}
```
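The detail to notice in getStructFieldData() is that the lookup is strictly positional. The standalone sketch below (hypothetical column names and values, not from the original post) only illustrates that behavior: if the physical column order in the file differs from the order the reader expects, index i returns whatever value happens to sit at position i, and an index past the end of the array silently becomes null, which is how scrambled or missing fields would surface.

```java
import org.apache.hadoop.io.ArrayWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

// Standalone sketch of the positional lookup performed by getStructFieldData() above:
// the row is just an array of Writables, and a struct field is fetched by its index.
public class PositionalLookupDemo {
	public static void main(String[] args) {
		// The writer produced columns in the order (name, age) ...
		ArrayWritable row = new ArrayWritable(Writable.class,
				new Writable[]{new Text("alice"), new IntWritable(30)});
		// ... but the reader's schema expects (age, name): index 0 now yields "alice".
		int expectedAgeIndex = 0;
		Writable[] fields = row.get();
		Writable age = expectedAgeIndex < fields.length ? fields[expectedAgeIndex] : null;
		System.out.println("age = " + age); // prints "age = alice"
	}
}
```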