This approach uses Flink's Sink mechanism: every operation against ES is encapsulated in a custom ElasticsearchSinkFunction class.
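Code

Main implementation

    // Addresses of the ES nodes the sink connects to
    val addressList = new java.util.ArrayList[HttpHost]()
    addressList.add(new HttpHost(ES_NAME, ES_PORT))

    // Parse each Kafka record as JSON and emit (value, 1)
    val hbaseDs = kafkaStream.map(x => {
      val result = new JSONObject(x)
      (result.getString("value"), 1)
    })

    // Attach the ES sink; the per-record logic lives in TestElasticsearchSinkFunction
    hbaseDs.addSink(new ElasticsearchSink.Builder[(String, Int)](addressList, new TestElasticsearchSinkFunction).build())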
Custom ElasticsearchSinkFunction class

    package com.dev.flink.stream.es.entry

    import com.dev.flink.stream.es.develop.{ES_INDEX, ES_TYPE}
    import org.apache.flink.api.common.functions.RuntimeContext
    import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
    import org.elasticsearch.action.update.UpdateRequest
    import org.elasticsearch.common.xcontent.json.JsonXContent
    import org.elasticsearch.script.Script

    class TestElasticsearchSinkFunction extends ElasticsearchSinkFunction[(String, Int)] {
      override def process(data: (String, Int), runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
        // The word itself serves as the document id
        val id = data._1
        val content = JsonXContent.contentBuilder().startObject()
          .field("id", id)
          .field("word", data._1)
          .field("count", data._2)
          .endObject()

        // First request: upsert the document (insert if missing, otherwise merge the fields)
        val updateRequest1 = new UpdateRequest().index(ES_INDEX).`type`(ES_TYPE).id(id)
          .docAsUpsert(true).doc(content)

        // Second request: scripted update that removes the "word" field from the same document
        val updateRequest = new UpdateRequest().index(ES_INDEX).`type`(ES_TYPE).id(id)
          .script(new Script("ctx._source.remove(\"word\")")).scriptedUpsert(true)

        requestIndexer.add(updateRequest1)
        requestIndexer.add(updateRequest)
      }
    }
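Summary

As shown above, the create, update, and delete operations against ES are all implemented inside the custom ElasticsearchSinkFunction class. The RequestIndexer passed to process accepts IndexRequest, DeleteRequest, and UpdateRequest. GetRequest is not among them, because the sink only issues bulk write actions; queries have to go through a separate ES client.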
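The class above only exercises UpdateRequest. As a rough illustration of the other two write types, here is a minimal sketch of a sink function that issues either an IndexRequest or a DeleteRequest. Everything in it is an assumption rather than part of the original job: the class name `IndexOrDeleteSinkFunction`, the helper `buildRequest`, the placeholder index and type names, and the convention that a non-positive count means "delete this word". It also assumes an ES 6.x style client, where requests still take a type argument.

```scala
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.elasticsearch.action.delete.DeleteRequest
import org.elasticsearch.action.index.IndexRequest
import org.elasticsearch.common.xcontent.json.JsonXContent

// Hypothetical sink function, not taken from the original post.
class IndexOrDeleteSinkFunction extends ElasticsearchSinkFunction[(String, Int)] {

  // Placeholder values standing in for the ES_INDEX / ES_TYPE constants (assumed).
  private val index = "word_count"
  private val docType = "_doc"

  // Assumed convention: count <= 0 deletes the document, anything else (re)indexes it.
  def buildRequest(data: (String, Int)): Either[DeleteRequest, IndexRequest] = {
    val (word, count) = data
    if (count <= 0) {
      Left(new DeleteRequest(index, docType, word))
    } else {
      val content = JsonXContent.contentBuilder().startObject()
        .field("word", word)
        .field("count", count)
        .endObject()
      // IndexRequest overwrites the whole document with the given source
      Right(new IndexRequest(index, docType, word).source(content))
    }
  }

  override def process(data: (String, Int), runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
    // Dispatch to the matching RequestIndexer.add overload
    buildRequest(data) match {
      case Left(delete)  => requestIndexer.add(delete)
      case Right(insert) => requestIndexer.add(insert)
    }
  }
}
```

It would be plugged in the same way as TestElasticsearchSinkFunction, by passing `new IndexOrDeleteSinkFunction` to `ElasticsearchSink.Builder`. Keeping request construction in a separate method is only a design choice that makes the logic checkable without a running cluster.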