import java.nio.ByteBuffer
import java.util.Properties

import com.alibaba.fastjson.JSON
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.core.fs.Path
import org.apache.flink.orc.writer.OrcBulkWriterFactory
import org.apache.flink.runtime.state.filesystem.FsStateBackend
import org.apache.flink.streaming.api.CheckpointingMode
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
import org.apache.hadoop.conf.Configuration
import org.apache.kafka.clients.producer.ProducerConfig
object OrcFileWriteDemo {
  def main(args: Array[String]): Unit = {
    val READ_TOPIC = "topic"

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Bulk formats such as ORC roll part files only on checkpoints, so this
    // interval also controls how often finished files become visible.
    env.enableCheckpointing(60000L, CheckpointingMode.EXACTLY_ONCE)
    env.setStateBackend(new FsStateBackend("file:///job/flink/ck/Orc"))

    // Kafka consumer configuration.
    val props = new Properties()
    props.put("bootstrap.servers", "hosts:9092")
    props.put("group.id", "xs_test3")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    // Kafka producer configuration; prepared here but not used by any sink in this snippet.
    val producerProps = new Properties()
    producerProps.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "skuldcdhtest1.ktcs:9092")
    producerProps.setProperty(ProducerConfig.RETRIES_CONFIG, "3")
    producerProps.setProperty(ProducerConfig.ACKS_CONFIG, "all")
    // Read JSON strings from Kafka and map each one to a Demo record.
    val student = env
      .addSource(
        new FlinkKafkaConsumer(READ_TOPIC, new SimpleStringSchema, props).setStartFromLatest()
      )
      .map { x =>
        // ... parse the message into a Demo (one plausible sketch below)
        Demo("", "", "")
      }
      .setParallelism(1)
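    // A hedged sketch of the elided parse, assuming each Kafka payload is a JSON
    // object such as {"platform":"ios","event":"click","dt":"2020-09-01"} — the
    // fastjson import suggests this, but the key names here are an assumption:
    //   .map { x =>
    //     val obj = JSON.parseObject(x)
    //     Demo(obj.getString("platform"), obj.getString("event"), obj.getString("dt"))
    //   }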
    // ORC schema matching the Demo case class, plus writer options.
    val schema: String = "struct<platform:string,event:string,dt:string>"
    val writerProperties: Properties = new Properties()
    writerProperties.setProperty("orc.compress", "ZLIB")
    val vectorizer = new DemoVectorizer(schema)
    val writerFactory = new CustomOrcBulkWriterFactory(vectorizer, writerProperties, new Configuration())
    // Note: the output path here is a local Windows directory, not HDFS.
    val sink = StreamingFileSink
      .forBulkFormat(new Path("F:\\test\\Demo\\Flink11\\src\\main\\resources"), writerFactory)
      .build()
    student.addSink(sink).setParallelism(1)
    env.execute("write hdfs")
  }
}
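If no custom behavior is actually needed, the stock OrcBulkWriterFactory that is already imported can stand in for CustomOrcBulkWriterFactory; it has a constructor of the same shape:

    val writerFactory = new OrcBulkWriterFactory(vectorizer, writerProperties, new Configuration())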
case class Demo(platform: String, event: String, dt: String)
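DemoVectorizer is referenced above but not shown. A minimal sketch, assuming the struct<platform:string,event:string,dt:string> schema: it extends Flink's org.apache.flink.orc.vector.Vectorizer and appends one row to the VectorizedRowBatch per element.

import java.nio.charset.StandardCharsets

import org.apache.flink.orc.vector.Vectorizer
import org.apache.hadoop.hive.ql.exec.vector.{BytesColumnVector, VectorizedRowBatch}

class DemoVectorizer(schema: String) extends Vectorizer[Demo](schema) {
  override def vectorize(element: Demo, batch: VectorizedRowBatch): Unit = {
    // Append one row; column order must match the schema string.
    val row = batch.size
    batch.size += 1
    batch.cols(0).asInstanceOf[BytesColumnVector]
      .setVal(row, element.platform.getBytes(StandardCharsets.UTF_8))
    batch.cols(1).asInstanceOf[BytesColumnVector]
      .setVal(row, element.event.getBytes(StandardCharsets.UTF_8))
    batch.cols(2).asInstanceOf[BytesColumnVector]
      .setVal(row, element.dt.getBytes(StandardCharsets.UTF_8))
  }
}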