The parallelism here is a puzzle: whatever you set, it has little effect, because the committer stage is hardcoded to 1. The relevant code is `FlinkSink.forRowData()...build()`:

```java
public DataStreamSink<RowData> build() {
  Preconditions.checkArgument(rowDataInput != null,
      "Please use forRowData() to initialize the input DataStream.");
  Preconditions.checkNotNull(tableLoader, "Table loader shouldn't be null");

  if (table == null) {
    tableLoader.open();
    try (TableLoader loader = tableLoader) {
      this.table = loader.loadTable();
    } catch (IOException e) {
      throw new UncheckedIOException("Failed to load iceberg table from table loader: " + tableLoader, e);
    }
  }

  // Find out the equality field id list based on the user-provided equality field column names.
  List<Integer> equalityFieldIds = Lists.newArrayList();
  if (equalityFieldColumns != null && equalityFieldColumns.size() > 0) {
    for (String column : equalityFieldColumns) {
      org.apache.iceberg.types.Types.NestedField field = table.schema().findField(column);
      Preconditions.checkNotNull(field, "Missing required equality field column '%s' in table schema %s",
          column, table.schema());
      equalityFieldIds.add(field.fieldId());
    }
  }

  // Convert the requested flink table schema to flink row type.
  RowType flinkRowType = toFlinkRowType(table.schema(), tableSchema);

  // Distribute the records from input data stream based on the write.distribution-mode.
  rowDataInput = distributeDataStream(rowDataInput, table.properties(), table.spec(), table.schema(), flinkRowType);

  // Chain the iceberg stream writer and committer operator.
  IcebergStreamWriter<RowData> streamWriter = createStreamWriter(table, flinkRowType, equalityFieldIds);
  IcebergFilesCommitter filesCommitter = new IcebergFilesCommitter(tableLoader, overwrite);

  // Writer parallelism falls back to the input stream's parallelism unless set explicitly.
  this.writeParallelism = writeParallelism == null ? rowDataInput.getParallelism() : writeParallelism;

  DataStream<Void> returnStream = rowDataInput
      .transform(ICEBERG_STREAM_WRITER_NAME, TypeInformation.of(WriteResult.class), streamWriter)
      .setParallelism(writeParallelism)
      .transform(ICEBERG_FILES_COMMITTER_NAME, Types.VOID, filesCommitter)
      .setParallelism(1)      // the committer always runs with parallelism 1, regardless of writeParallelism
      .setMaxParallelism(1);

  return returnStream.addSink(new DiscardingSink())
      .name(String.format("IcebergSink %s", table.name()))
      .setParallelism(1);
}
```
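For context, here is a minimal sketch of how this builder is typically wired up. It assumes a `DataStream<RowData>` source already exists; `MyRowDataSource` and the HDFS path below are placeholders for illustration, not part of the Iceberg API:

```java
import java.util.Arrays;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;

public class IcebergSinkDemo {
  public static void main(String[] args) throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // Placeholder source; substitute any DataStream<RowData> you already have.
    DataStream<RowData> rows = env.addSource(new MyRowDataSource());

    // Placeholder warehouse path; fromHadoopTable/fromCatalog are the stock loaders.
    TableLoader tableLoader = TableLoader.fromHadoopTable("hdfs://namenode:8020/warehouse/db/tbl");

    FlinkSink.forRowData(rows)
        .tableLoader(tableLoader)
        .equalityFieldColumns(Arrays.asList("id"))
        .writeParallelism(4)   // only scales the IcebergStreamWriter; the committer stays at 1
        .build();

    env.execute("iceberg-sink-demo");
  }
}
```

The hardcoded `setParallelism(1)` on the committer appears deliberate rather than a bug: every Iceberg commit produces a new table snapshot, so funneling all `WriteResult`s through a single `IcebergFilesCommitter` task serializes the commits and avoids concurrent-commit conflicts on the same table. `writeParallelism` therefore only scales the writer stage.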