ApacheLivy使用

之前的工作中有涉及过用livy去整合Spark任务,这里做一下记录

简单介绍

1
2
3
4
5
6
Livy是什么?
REST服务架构,简化与Spark集群的交互,避免笨重的Jar包上传

使用的前置条件
Spark 1.6以上
Scala 2.10以上

运行Livy

1
2
3
4
5
6
7
8
9
10
配置变量
export SPARK_HOME=/usr/lib/spark
export HADOOP_CONF_DIR=/etc/hadoop/conf

启动
./bin/livy-server start
默认服务端口8998

建议
使用YARN集群模式提交应用,保证Livy所在主机不过载

REST API

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
get /sessions
获取所有活动的交互式会话

post /sessions
在集群中创建新的交互式Scala,Python,R Shell会话

get /sessions/{sessionId}
获取指定会话信息

get /sessions/{sessionId}/state
获取指定会话信息状态

delete /sessions/{sessionId}
结束会话

get /sessions/{sessionId}/log
获取指定会话日志

get /sessions/{sessionId}/statements
获取指定会话的所有执行语句

post /sessions/{sessionId}/statements
提交执行语句

get /sessions/{sessionId}/statements/{statementId}
获取指定执行语句

post /sessions/{sessionId}/statements/{statementId}/cancel
取消执行语句

post /sessions/{sessionId}/completion
开始执行执行语句

get /batches
获取批处理会话

post /batches
创建批处理会话

get /batches/{batchId}
获取批处理会话信息

get /batches/{batchId}/state
获取批处理会话信息状态

delete /batches/{batchId}
停止批处理任务

get /batches/{batchId}/log
获取批处理会话日志

编程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
允许应用程序在Spark内运行代码,而不需要维护Spark的上下文

<dependency>
<groupId>org.apache.livy</groupId>
<artifactId>livy-client-http</artifactId>
<version>0.7.0-incubating</version>
</dependency>

具体扩展在org.apache.livy.Job中

测试Demo
import java.util.*;

import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.*;

import org.apache.livy.*;

public class PiJob implements Job<Double>, Function<Integer, Integer>,
Function2<Integer, Integer, Integer> {

private final int samples;

public PiJob(int samples) {
this.samples = samples;
}

@Override
public Double call(JobContext ctx) throws Exception {
List<Integer> sampleList = new ArrayList<Integer>();
for (int i = 0; i < samples; i++) {
sampleList.add(i + 1);
}

return 4.0d * ctx.sc().parallelize(sampleList).map(this).reduce(this) / samples;
}

@Override
public Integer call(Integer v1) {
double x = Math.random();
double y = Math.random();
return (x*x + y*y < 1) ? 1 : 0;
}

@Override
public Integer call(Integer v1, Integer v2) {
return v1 + v2;
}

}

利用Livy提交代码
LivyClient client = new LivyClientBuilder()
.setURI(new URI(livyUrl))
.build();

try {
System.err.printf("Uploading %s to the Spark context...\n", piJar);
client.uploadJar(new File(piJar)).get();

System.err.printf("Running PiJob with %d samples...\n", samples);
double pi = client.submit(new PiJob(samples)).get();

System.out.println("Pi is roughly: " + pi);
} finally {
client.stop(true);
}

未来规划

1
2
3
4
希望Livy能出Flink分支

就个人而言,大数据之后对于组件而言其实并不是很重要,而怎么使用一个比较方便的方式去提交任务,并且运行起来
无关乎是否是大数据工程师