之前的工作中有涉及过用livy去整合Spark任务,这里做一下记录
简单介绍
1 2 3 4 5 6
| Livy是什么? REST服务架构,简化与Spark集群的交互,避免笨重的Jar包上传
使用的前置条件 Spark 1.6以上 Scala 2.10以上
|
运行Livy
1 2 3 4 5 6 7 8 9 10
| 配置变量 export SPARK_HOME=/usr/lib/spark export HADOOP_CONF_DIR=/etc/hadoop/conf
启动 ./bin/livy-server start 默认服务端口8998
建议 使用YARN集群模式提交应用,保证Livy所在主机不过载
|
REST API
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50
| get /sessions 获取所有活动的交互式会话
post /sessions 在集群中创建新的交互式Scala,Python,R Shell会话
get /sessions/{sessionId} 获取指定会话信息
get /sessions/{sessionId}/state 获取指定会话信息状态
delete /sessions/{sessionId} 结束会话
get /sessions/{sessionId}/log 获取指定会话日志
get /sessions/{sessionId}/statements 获取指定会话的所有执行语句
post /sessions/{sessionId}/statements 提交执行语句
get /sessions/{sessionId}/statements/{statementId} 获取指定执行语句
post /sessions/{sessionId}/statements/{statementId}/cancel 取消执行语句
post /sessions/{sessionId}/completion 开始执行执行语句
get /batches 获取批处理会话
post /batches 创建批处理会话
get /batches/{batchId} 获取批处理会话信息
get /batches/{batchId}/state 获取批处理会话信息状态
delete /batches/{batchId} 停止批处理任务
get /batches/{batchId}/log 获取批处理会话日志
|
编程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67
| 允许应用程序在Spark内运行代码,而不需要维护Spark的上下文
<dependency> <groupId>org.apache.livy</groupId> <artifactId>livy-client-http</artifactId> <version>0.7.0-incubating</version> </dependency>
具体扩展在org.apache.livy.Job中
测试Demo import java.util.*;
import org.apache.spark.api.java.*; import org.apache.spark.api.java.function.*;
import org.apache.livy.*;
public class PiJob implements Job<Double>, Function<Integer, Integer>, Function2<Integer, Integer, Integer> {
private final int samples;
public PiJob(int samples) { this.samples = samples; }
@Override public Double call(JobContext ctx) throws Exception { List<Integer> sampleList = new ArrayList<Integer>(); for (int i = 0; i < samples; i++) { sampleList.add(i + 1); }
return 4.0d * ctx.sc().parallelize(sampleList).map(this).reduce(this) / samples; }
@Override public Integer call(Integer v1) { double x = Math.random(); double y = Math.random(); return (x*x + y*y < 1) ? 1 : 0; }
@Override public Integer call(Integer v1, Integer v2) { return v1 + v2; }
}
利用Livy提交代码 LivyClient client = new LivyClientBuilder() .setURI(new URI(livyUrl)) .build();
try { System.err.printf("Uploading %s to the Spark context...\n", piJar); client.uploadJar(new File(piJar)).get();
System.err.printf("Running PiJob with %d samples...\n", samples); double pi = client.submit(new PiJob(samples)).get();
System.out.println("Pi is roughly: " + pi); } finally { client.stop(true); }
|
未来规划
1 2 3 4
| 希望Livy能出Flink分支
就个人而言,大数据之后对于组件而言其实并不是很重要,而怎么使用一个比较方便的方式去提交任务,并且运行起来 无关乎是否是大数据工程师
|