override def flush(): Long = {
  if (writer == null) {
    throw new IllegalStateException("Writer is not open")
  }
  // writeIntermediateFooter() writes an intermediate ORC footer so the file
  // is readable up to this point; log the raw data size before and after.
  val before = writer.getRawDataSize
  writer.writeIntermediateFooter()
  val after = writer.getRawDataSize
  println(s"###################################$before ==> $after###################################")
  after
}
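This flush() overrides Writer.flush() from the writer interface that BucketingSink uses, but the enclosing class is not shown in the post. A minimal sketch of what the surrounding OrcWriter might look like, assuming Hive's ORC writer API (OrcFile.createWriter plus addRow) and a reflection-based ObjectInspector built from the class handed to the constructor:

import org.apache.flink.streaming.connectors.fs.Writer
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.hadoop.hive.ql.io.orc.{OrcFile, Writer => OrcFileWriter}
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory

// Hypothetical reconstruction of the custom writer used by the sink below.
class OrcWriter[T](clazz: Class[T]) extends Writer[T] {
  @transient private var writer: OrcFileWriter = _

  override def open(fs: FileSystem, path: Path): Unit = {
    // Derive the ORC schema reflectively from the fields of the target class
    val inspector = ObjectInspectorFactory.getReflectionObjectInspector(
      clazz, ObjectInspectorFactory.ObjectInspectorOptions.JAVA)
    writer = OrcFile.createWriter(path,
      OrcFile.writerOptions(fs.getConf).inspector(inspector))
  }

  override def write(element: T): Unit = writer.addRow(element)

  // The method shown above, minus the debug logging
  override def flush(): Long = {
    if (writer == null) throw new IllegalStateException("Writer is not open")
    writer.writeIntermediateFooter()
    writer.getRawDataSize
  }

  override def getPos: Long = if (writer == null) 0L else writer.getRawDataSize

  override def close(): Unit = if (writer != null) { writer.close(); writer = null }

  override def duplicate(): Writer[T] = new OrcWriter[T](clazz)
}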
import java.util.Properties

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
import org.apache.flink.streaming.connectors.fs.bucketing.{BucketingSink, DateTimeBucketer}
import org.apache.hadoop.conf.Configuration
import org.json.JSONObject

object HiveDemoOnSink {
  def main(args: Array[String]): Unit = {
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "hadoop03:9092")
    properties.setProperty("group.id", "test")
    properties.setProperty("auto.offset.reset", "latest")
    // setStartFromEarliest() overrides both the committed group offsets and
    // the auto.offset.reset=latest property set above
    val consumer010 = new FlinkKafkaConsumer010[String](
      "test", // or a list of topics, e.g. List("test1", "test2")
      new JsonDeserializationSchema(),
      properties
    ).setStartFromEarliest()
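JsonDeserializationSchema is a custom class, not one shipped with Flink. The map further down reads "topic", "partition" and "offset" out of each JSON record, so the schema must wrap every message together with its Kafka metadata. A minimal sketch, assuming the KeyedDeserializationSchema interface that FlinkKafkaConsumer010 accepts (it exposes topic, partition and offset per record):

import java.nio.charset.StandardCharsets
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.streaming.util.serialization.KeyedDeserializationSchema
import org.json.JSONObject

class JsonDeserializationSchema extends KeyedDeserializationSchema[String] {
  override def deserialize(messageKey: Array[Byte], message: Array[Byte],
                           topic: String, partition: Int, offset: Long): String = {
    // Wrap the payload together with its Kafka coordinates as one JSON string
    val json = new JSONObject()
    json.put("topic", topic)
    json.put("partition", partition)
    json.put("offset", offset)
    json.put("value", new String(message, StandardCharsets.UTF_8))
    json.toString
  }

  override def isEndOfStream(nextElement: String): Boolean = false

  override def getProducedType: TypeInformation[String] =
    BasicTypeInfo.STRING_TYPE_INFO
}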
    val senv = StreamExecutionEnvironment.getExecutionEnvironment
    // BucketingSink only moves part files from "pending" to their final state
    // when a checkpoint completes, so checkpointing must be enabled
    senv.enableCheckpointing(500)
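A 500 ms interval makes the demo finalize files quickly, but it is aggressive for a real job. A sketch of a more production-minded setup, with purely illustrative values:

import org.apache.flink.streaming.api.CheckpointingMode

// checkpoint once a minute, exactly-once, with breathing room between checkpoints
senv.enableCheckpointing(60 * 1000, CheckpointingMode.EXACTLY_ONCE)
senv.getCheckpointConfig.setMinPauseBetweenCheckpoints(30 * 1000)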
    val dataStream = senv.addSource(consumer010)

    val configuration = new Configuration()
    configuration.set("fs.defaultFS", "hdfs://hadoop01:8020")

    val bucketingSink = new BucketingSink[Message]("/user/hive/warehouse/user_test_orc")
      // yields Hive-style partition directories such as c_date=2019-01-01
      .setBucketer(new DateTimeBucketer[Message]("'c_date='yyyy-MM-dd"))
      .setWriter(new OrcWriter[Message](classOf[Message]))
      .setBatchSize(1024 * 10) // roll a new part file every 10 KB (demo-sized)
      .setFSConfig(configuration)
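Two more BucketingSink knobs are worth knowing about here: a c_date bucket stops receiving data after midnight but stays open until it is rolled, so it helps to close idle buckets explicitly. A sketch with illustrative values:

bucketingSink
  .setInactiveBucketCheckInterval(60 * 1000)   // scan for idle buckets every minute
  .setInactiveBucketThreshold(5 * 60 * 1000)   // close buckets idle for 5 minutes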
    // Write to HDFS: parse each JSON record into a Message
    val ds = dataStream.map(data => {
      val json = new JSONObject(data.toString)
      val topic = json.get("topic").toString
      val partition = json.get("partition").toString.toInt
      // note: Kafka offsets are Long; Int works for a demo but can overflow
      val offset = json.get("offset").toString.toInt
      new Message(topic, partition, offset, json.toString())
    })
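The Message class itself is not shown in the post. Because OrcWriter inspects it reflectively and the Hive table below defines the columns topic, partition, offset and msg, the field names and order should line up with those columns. A minimal sketch (hypothetical):

// Hypothetical Message type matching the Hive columns topic/partition/offset/msg
class Message(var topic: String, var partition: Int, var offset: Int, var msg: String)
  extends Serializable {
  def this() = this(null, 0, 0, null) // no-arg constructor for Flink's POJO rules
}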
ds.print()
    ds.addSink(bucketingSink)
    senv.execute()
  }
}
Hive table creation statement:
CREATE TABLE user_test_orc(
  topic string,
  `partition` int,
  `offset` int,
  msg string)
PARTITIONED BY (c_date string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat';
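(partition and offset are reserved keywords in Hive, hence the backticks.)

One operational detail: BucketingSink only writes the c_date=... directories to HDFS; the Hive metastore does not learn about new partitions by itself, so they must be registered before queries see the data. For example (date purely illustrative):

ALTER TABLE user_test_orc ADD IF NOT EXISTS PARTITION (c_date='2019-01-01');
-- or discover all unregistered partitions at once:
MSCK REPAIR TABLE user_test_orc;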