# Batch word count built with the PyFlink Table API and descriptor-based connectors.
# `source_file` and `sink_file`, as well as the pyflink names used below, are
# expected to be defined/imported earlier in the script (not visible here).
exec_env = ExecutionEnvironment.get_execution_environment()
t_config = TableConfig()
t_env = BatchTableEnvironment.create(exec_env, t_config)

# Register the input file as table 'mySource' with a single STRING column 'word'.
# The CSV line delimiter is set to ',' instead of the default newline.
# Fluent chains are wrapped in parentheses rather than joined with trailing
# backslashes: a backslash followed by anything but a newline is a syntax error.
(
    t_env.connect(FileSystem().path(source_file))
    .with_format(
        OldCsv()
        .line_delimiter(',')
        .field('word', DataTypes.STRING())
    )
    .with_schema(
        Schema()
        .field('word', DataTypes.STRING())
    )
    .register_table_source('mySource')
)

# Register the output file as table 'mySink' with columns (word STRING, count BIGINT).
# NOTE(review): the sink also sets line_delimiter(','); for a two-column row
# field_delimiter(',') may have been intended -- confirm before changing.
(
    t_env.connect(FileSystem().path(sink_file))
    .with_format(
        OldCsv()
        .line_delimiter(',')
        .field('word', DataTypes.STRING())
        .field('count', DataTypes.BIGINT())
    )
    .with_schema(
        Schema()
        .field('word', DataTypes.STRING())
        .field('count', DataTypes.BIGINT())
    )
    .register_table_sink('mySink')
)

# Group the source rows by 'word', count the rows in each group, and route the
# result into the registered sink table.
(
    t_env.scan('mySource')
    .group_by('word')
    .select('word,count(1)')
    .insert_into('mySink')
)

# Nothing above runs until execute() is called; "wc" is the name passed for the job.
t_env.execute("wc")