# The parallelism here is a mystery: setting it seems to do nothing, because part of it is hardcoded to 1
FlinkSink.forRowData().*.build()

public DataStreamSink<RowData> build() {
  Preconditions.checkArgument(rowDataInput != null,
      "Please use forRowData() to initialize the input DataStream.");
  Preconditions.checkNotNull(tableLoader, "Table loader shouldn't be null");
  if (table == null) {
    tableLoader.open();
    try (TableLoader loader = tableLoader) {
      this.table = loader.loadTable();
    } catch (IOException e) {
      throw new UncheckedIOException("Failed to load iceberg table from table loader: " + tableLoader, e);
    }
  }
  // Find out the equality field id list based on the user-provided equality field column names.
  List<Integer> equalityFieldIds = Lists.newArrayList();
  if (equalityFieldColumns != null && equalityFieldColumns.size() > 0) {
    for (String column : equalityFieldColumns) {
      org.apache.iceberg.types.Types.NestedField field = table.schema().findField(column);
      Preconditions.checkNotNull(field, "Missing required equality field column '%s' in table schema %s",
          column, table.schema());
      equalityFieldIds.add(field.fieldId());
    }
  }
  // Convert the requested flink table schema to flink row type.
  RowType flinkRowType = toFlinkRowType(table.schema(), tableSchema);
  // Distribute the records from input data stream based on the write.distribution-mode.
  rowDataInput = distributeDataStream(rowDataInput, table.properties(), table.spec(), table.schema(), flinkRowType);
  // Chain the iceberg stream writer and committer operator.
  IcebergStreamWriter<RowData> streamWriter = createStreamWriter(table, flinkRowType, equalityFieldIds);
  IcebergFilesCommitter filesCommitter = new IcebergFilesCommitter(tableLoader, overwrite);
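  // If writeParallelism was not set explicitly on the builder, fall back to
  // the parallelism of the input stream.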
  this.writeParallelism = writeParallelism == null ? rowDataInput.getParallelism() : writeParallelism;
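  // Note: only the stream writer below honors writeParallelism. The committer is pinned to
  // parallelism 1 (and max parallelism 1), so all Iceberg snapshot commits go through a single
  // task. This is why setting a parallelism on the sink appears to have no effect.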
  DataStream<Void> returnStream = rowDataInput
      .transform(ICEBERG_STREAM_WRITER_NAME, TypeInformation.of(WriteResult.class), streamWriter)
      .setParallelism(writeParallelism)
      .transform(ICEBERG_FILES_COMMITTER_NAME, Types.VOID, filesCommitter)
      .setParallelism(1)
      .setMaxParallelism(1);
  return returnStream.addSink(new DiscardingSink())
      .name(String.format("IcebergSink %s", table.name()))
      .setParallelism(1);
}
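For context, a minimal usage sketch (assuming the Iceberg 0.11-era FlinkSink builder API; the table path, the buildRowDataStream helper, and the parallelism value are hypothetical). It illustrates the note at the top: writeParallelism only fans out the IcebergStreamWriter operator, while the committer and the final discarding sink always run with parallelism 1.

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<RowData> rowDataStream = buildRowDataStream(env);  // hypothetical helper producing DataStream<RowData>
TableLoader loader = TableLoader.fromHadoopTable("hdfs://nn:8020/warehouse/db/tbl");  // hypothetical path

FlinkSink.forRowData(rowDataStream)
    .tableLoader(loader)
    .writeParallelism(8)  // applied to IcebergStreamWriter only
    .build();             // IcebergFilesCommitter still runs at parallelism 1 internally

env.execute("iceberg-sink-demo");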