1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| Flink的Async I/O允许用户将异步请求客户端与数据流一起使用 操作: 实现AsyncFunction调度请求 一个结果ResultFuture的回调 在数据流上应用异步IO操作作为转换 /** * AsyncFunction的一个实现,它发送请求并设置回调 */ class AsyncDatabaseRequest extends AsyncFunction[String, (String, String)] {
/** 可以发出带有回调的并发请求的特定数据库的客户端 */ lazy val client: DatabaseClient = new DatabaseClient(host, post, credentials)
/** 用于将来回调的上下文 */ implicit lazy val executor: ExecutionContext = ExecutionContext.fromExecutor(Executors.directExecutor())
override def asyncInvoke(str: String, resultFuture: ResultFuture[(String, String)]): Unit = {
// 发出异步请求,接收结果的Future val resultFutureRequested: Future[String] = client.query(str)
// 将回调设置为在客户端请求完成后执行 // 回调只是将结果转发到结果Future resultFutureRequested.onSuccess { case result: String => resultFuture.complete(Iterable((str, result))) } } }
// 创建原始流 val stream: DataStream[String] = ...
// 应用异步I/O转换 val resultStream: DataStream[(String, String)] = AsyncDataStream.unorderedWait(stream, new AsyncDatabaseRequest(), 1000, TimeUnit.MILLISECONDS, 100)
|