This approach uses Flink's sink mechanism: all operations against Elasticsearch (ES) are encapsulated in a custom ElasticsearchSinkFunction class.
 
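Code

Main implementation

    // kafkaStream is defined elsewhere; each element is expected to be a JSON string.
    // ES_NAME and ES_PORT are the Elasticsearch host and port.
    val addressList = new java.util.ArrayList[HttpHost]()
    addressList.add(new HttpHost(ES_NAME, ES_PORT))

    // Extract the "value" field from each JSON message and pair it with a count of 1
    val hbaseDs = kafkaStream.map(x => {
      val result = new JSONObject(x)
      (result.getString("value"), 1)
    })

    // Attach the ES sink; the write logic lives in TestElasticsearchSinkFunction
    hbaseDs.addSink(new ElasticsearchSink.Builder[(String, Int)](addressList, new TestElasticsearchSinkFunction).build())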
Custom ElasticsearchSinkFunction class

    package com.dev.flink.stream.es.entry

    import com.dev.flink.stream.es.develop.{ES_INDEX, ES_TYPE}
    import org.apache.flink.api.common.functions.RuntimeContext
    import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
    import org.elasticsearch.action.update.UpdateRequest
    import org.elasticsearch.common.xcontent.json.JsonXContent
    import org.elasticsearch.script.Script

    class TestElasticsearchSinkFunction extends ElasticsearchSinkFunction[(String, Int)] {

      override def process(data: (String, Int), runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
        // The word itself is used as the document id
        val id = data._1
        val content = JsonXContent.contentBuilder().startObject()
          .field("id", id)
          .field("word", data._1)
          .field("count", data._2)
          .endObject()

        // Request 1: insert the document if it does not exist, otherwise merge these fields into it
        val updateRequest1 = new UpdateRequest()
          .index(ES_INDEX)
          .`type`(ES_TYPE)
          .id(id)
          .docAsUpsert(true)
          .doc(content)

        // Request 2: scripted update that removes the "word" field from the same document
        val updateRequest = new UpdateRequest()
          .index(ES_INDEX)
          .`type`(ES_TYPE)
          .id(id)
          .script(new Script("ctx._source.remove(\"word\")"))
          .scriptedUpsert(true)

        requestIndexer.add(updateRequest1)
        requestIndexer.add(updateRequest)
      }
    }
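To make the combined effect of the two update requests easier to follow, here is a minimal in-memory sketch in plain Scala. It does not call Elasticsearch or Flink; `UpsertThenRemoveSketch`, `docAsUpsert`, `removeField`, and `applyBoth` are hypothetical names made up for illustration, and the model assumes flat documents and that the two requests are applied in the order they were added.

```scala
object UpsertThenRemoveSketch {
  // A document is modeled as a flat field map; the store maps document id to document.
  type Doc = Map[String, Any]

  // Models docAsUpsert(true).doc(content): insert the document when it is missing,
  // otherwise overwrite the given fields on the existing document.
  def docAsUpsert(store: Map[String, Doc], id: String, partial: Doc): Map[String, Doc] =
    store.updated(id, store.getOrElse(id, Map.empty[String, Any]) ++ partial)

  // Models the script ctx._source.remove("word") on an existing document.
  // The missing-document case is not modeled here; the store is returned unchanged.
  def removeField(store: Map[String, Doc], id: String, field: String): Map[String, Doc] =
    store.get(id) match {
      case Some(doc) => store.updated(id, doc - field)
      case None      => store
    }

  // Applies both steps for one (word, count) element, mirroring the sink function above.
  def applyBoth(store: Map[String, Doc], data: (String, Int)): Map[String, Doc] = {
    val (word, count) = data
    val content: Doc = Map("id" -> word, "word" -> word, "count" -> count)
    removeField(docAsUpsert(store, word, content), word, "word")
  }

  def main(args: Array[String]): Unit = {
    val after = applyBoth(Map.empty, ("flink", 1))
    println(after("flink"))
  }
}
```

Under these assumptions, the stored document ends up with the `id` and `count` fields but no `word` field, and any other fields already present on the document are left alone.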
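Summary

As shown above, the operations against ES are all implemented inside the custom ElasticsearchSinkFunction class. Creating, updating, and deleting documents are done by handing an IndexRequest, UpdateRequest, or DeleteRequest to the RequestIndexer. Note that the sink is a write path: the RequestIndexer batches requests through the bulk API, which only accepts these three request types, so a GetRequest (query) cannot be issued through it.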
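As an illustration of using other request types in the same kind of class, the following sketch indexes an element when its count is positive and deletes the document otherwise. It is a sketch under stated assumptions rather than the author's code: the class name `IndexOrDeleteSinkFunction`, its `index` and `docType` constructor parameters, and the positive-count rule are all hypothetical, and it assumes an Elasticsearch client version that still uses mapping types, as the `type` call in the code above suggests.

```scala
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.streaming.connectors.elasticsearch.{ElasticsearchSinkFunction, RequestIndexer}
import org.elasticsearch.action.delete.DeleteRequest
import org.elasticsearch.action.index.IndexRequest

// Hypothetical sink function: index (word, count) when count > 0, delete the document otherwise.
// index and docType are illustrative constructor parameters, not names from the original code.
class IndexOrDeleteSinkFunction(index: String, docType: String)
    extends ElasticsearchSinkFunction[(String, Int)] {

  override def process(data: (String, Int), runtimeContext: RuntimeContext, requestIndexer: RequestIndexer): Unit = {
    val (word, count) = data
    if (count > 0) {
      // Build the document body as a Java map, with the word used as the document id
      val source = new java.util.HashMap[String, AnyRef]()
      source.put("word", word)
      source.put("count", Integer.valueOf(count))
      val indexRequest = new IndexRequest(index, docType, word).source(source)
      requestIndexer.add(indexRequest)
    } else {
      // Delete the document whose id is the word
      val deleteRequest = new DeleteRequest(index, docType, word)
      requestIndexer.add(deleteRequest)
    }
  }
}
```

It would be plugged in the same way as TestElasticsearchSinkFunction, by passing an instance to ElasticsearchSink.Builder in the main implementation.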