Task Scheduling Implemented with a Graph Data Structure

Different jobs read and write different tables, and the jobs depend on one another, so we want to derive the execution order from the relationship between each job and its input/output tables. (In fact, if there are no special requirements, this kind of scheduling is more conveniently handled by a tool such as Azkaban.)

What we already have

  • the jar for each Spark job
  • a Neo4j graph of the relationships between jobs and tables
  • jobs that must run in a certain order

Requirements

  • run the jobs semi- or fully automatically
  • only the job-to-input/output-table relationships need to be specified to generate the execution plan

Implementation flow

1. In Neo4j, build the graph of jobs and their input/output tables.
2. Derive the job execution order from the graph <the key step>
eg: given 5 tables ta,tb,tc,td,te, each written by one Spark job j1,j2,j3,j4,j5,
where tc depends on (ta,tb) and te depends on (tc,td), we get a layered task tree
Table layers:
Layer 1: ta,tb,td
Layer 2: tc
Layer 3: te
Corresponding jobs:
Layer 1: j1,j2,j4
Layer 2: j3
Layer 3: j5
3. Create a job state table job_state (state kept in MySQL)
eg: from the above, the depth of the tree is not known in advance, so one table per layer is impractical; instead, all jobs can be stored in a single table linked by parent/child ids
Each job record should contain:
job id (generated daily from the layered task tree of step 2)
parent job id (ids of the jobs it depends on; 0 if none; there may be several)
state (0 not started, 1 running, 2 failed, 6 succeeded)
command (spark-submit ...jar)
start time
end time
4. Create a script and schedule it daily (either queue-style <run once via Celery> or repeated <run every N minutes via crontab>)
5. Script logic (a sketch follows this list):
fetch today's job records from the table <loop over them>
check each job's state and dependencies:
state 0 and no parent dependency:
launch the Spark job from the console
state 0 with parent dependencies:
look up the parents: select * from job_state where id = <parent job id> and state != 6
run only if every parent is in state 6; if any parent is not, skip
state not 0:
skip
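
Below is a minimal sketch of the step-5 polling script in Scala. The schema details are illustrative assumptions, not fixed: a c_date column marking today's jobs, parent_ids stored as a comma-separated list with "0" meaning no parents, and a placeholder JDBC URL and credentials.

import java.sql.DriverManager

import scala.sys.process._

object JobPoller {
  def main(args: Array[String]): Unit = {
    // hypothetical connection and job_state layout; adjust to the real schema
    val conn = DriverManager.getConnection("jdbc:mysql://localhost:3306/sched", "user", "pass")
    val jobs = conn.createStatement().executeQuery(
      "select id, parent_ids, state, cmd from job_state where c_date = curdate()")
    while (jobs.next()) {
      val id = jobs.getLong(1)
      val parents = jobs.getString(2) // comma-separated parent ids, "0" = none
      val state = jobs.getInt(3)
      val cmd = jobs.getString(4)     // e.g. "spark-submit ... .jar"
      if (state == 0) {               // 0 = not started
        val blocked = parents != "0" && {
          val rs = conn.createStatement().executeQuery(
            s"select count(*) from job_state where id in ($parents) and state != 6")
          rs.next()
          rs.getInt(1) > 0            // true if any parent has not succeeded yet
        }
        if (!blocked) {
          conn.createStatement().executeUpdate(
            s"update job_state set state = 1, start_time = now() where id = $id")
          val rtn = cmd.!             // run the command through the shell
          conn.createStatement().executeUpdate(
            s"update job_state set state = ${if (rtn == 0) 6 else 2}, end_time = now() where id = $id")
        }
      }                               // state != 0: running/failed/succeeded, skip
    }
    conn.close()
  }
}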

Graph code implementation

Point(TablePoint,ProjectPoint)

import java.util.HashSet;

public abstract class Point {
    // every node keeps the names of its direct parents and children
    HashSet<String> parentPoints = new HashSet<>();
    HashSet<String> childPoints = new HashSet<>();
}

// ---

public class ProjectPoint extends Point {
    @Override
    public String toString() {
        return "job node";
    }
}

// ---

public class TablePoint extends Point {
    @Override
    public String toString() {
        return "table node";
    }
}

Graph

import java.util.HashMap;
import java.util.HashSet;

class Graph {
    private HashMap<String, Point> pointMap = new HashMap<>();
    private HashMap<String, HashSet<String>> edgeMap = new HashMap<>();

    /**
     * Add a node. A Point carries two sets holding the names of its
     * parent nodes and child nodes.
     *
     * @param name
     * @param point
     */
    void addPoint(String name, Point point) {
        pointMap.put(name, point);
    }

    /**
     * Add an edge: register the child on the from-node and the parent on the
     * to-node. Both endpoints must have been added via addPoint first.
     *
     * @param from
     * @param to
     */
    void addEdge(String from, String to) {
        HashSet<String> nextPoint = edgeMap.get(from);
        if (nextPoint != null) {
            nextPoint.add(to);
        } else {
            HashSet<String> strings = new HashSet<>();
            strings.add(to);
            edgeMap.put(from, strings);
        }
        Point fromPoint = pointMap.get(from);
        Point toPoint = pointMap.get(to);
        fromPoint.childPoints.add(to);
        toPoint.parentPoints.add(from);
    }

    HashMap<String, HashSet<String>> getEdgeMap() {
        return edgeMap;
    }

    HashMap<String, Point> getPointMap() {
        return pointMap;
    }

    /**
     * Out-degree per node (only nodes with outgoing edges appear).
     *
     * @return
     */
    HashMap<String, Integer> getOutDegree() {
        HashMap<String, Integer> outDegree = new HashMap<>();
        for (String key : edgeMap.keySet()) {
            outDegree.put(key, edgeMap.get(key).size());
        }
        return outDegree;
    }

    /**
     * In-degree per node (only nodes with incoming edges appear).
     *
     * @return
     */
    HashMap<String, Integer> getInDegree() {
        HashMap<String, Integer> inDegree = new HashMap<>();
        for (String key : edgeMap.keySet()) {
            HashSet<String> strings = edgeMap.get(key);
            for (String s : strings) {
                inDegree.merge(s, 1, (a, b) -> a + b);
            }
        }
        return inDegree;
    }

    /**
     * Resolve each of the given nodes to its nearest ancestor ProjectPoint
     * nodes; the HashSet deduplicates. For now only table -> project -> table
     * graphs are supported.
     *
     * @param points
     * @return
     */
    HashSet<String> getParentNodes(HashSet<String> points) {
        HashSet<String> result = new HashSet<>();
        for (String point : points) {
            if (!(pointMap.get(point) instanceof ProjectPoint)) {
                result.addAll(getParentNodes(pointMap.get(point).parentPoints));
            } else {
                result.add(point);
            }
        }
        return result;
    }
}

GraphDemo

import java.util.HashMap;
import java.util.HashSet;

public class GraphDemo {
    public static void main(String[] args) {
        Graph graph = new Graph();
        // build the graph
        graph.addPoint("a", new TablePoint());
        graph.addPoint("b", new TablePoint());
        graph.addPoint("c", new TablePoint());
        graph.addPoint("d", new ProjectPoint());
        graph.addPoint("e", new TablePoint());
        graph.addPoint("f", new ProjectPoint());
        graph.addPoint("g", new TablePoint());
        graph.addPoint("h", new TablePoint());
        graph.addPoint("i", new TablePoint());
        graph.addPoint("j", new ProjectPoint());
        graph.addEdge("a", "d");
        graph.addEdge("b", "d");
        graph.addEdge("c", "d");
        graph.addEdge("d", "e");
        graph.addEdge("e", "f");
        graph.addEdge("f", "g");
        graph.addEdge("f", "h");
        graph.addEdge("g", "j");
        graph.addEdge("h", "j");
        graph.addEdge("i", "j");

        System.out.println(graph.getPointMap());
        System.out.println(graph.getEdgeMap());

        HashMap<String, Point> pointMap = graph.getPointMap();

        // out-degrees
        System.out.println(graph.getOutDegree());

        // in-degrees
        System.out.println(graph.getInDegree());

        // for every ProjectPoint, print its nearest upstream ProjectPoint nodes
        for (String key : pointMap.keySet()) {
            // System.out.println(key + ":" + pointMap.get(key).parentPoints);
            if (pointMap.get(key) instanceof ProjectPoint) {
                HashSet<String> parentNodes = graph.getParentNodes(pointMap.get(key).parentPoints);
                System.out.println(key + ":" + parentNodes);
            }
        }
    }
}
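
For this demo graph, the final loop resolves each job to its nearest upstream jobs: d prints an empty set (its input tables a, b, c are not produced by any job), f prints [d], and j prints [f]; the HashMap iteration order of the output may vary.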

Upgraded graph implementation <with approach and code>

Approach

We already have the graph, and every node knows its parents and children.
So: collect all nodes with no in-degree; once they have run, delete them and their outgoing edges, then collect the next set of nodes with no in-degree; repeat until the graph is empty.

Node set: holds the node information
Edge set: holds the (from, to) pairs

Picking runnable nodes:
while a node is running, keep it in a set of running nodes <runningNode>;
a runnable node is a node with no in-degree that is not in <runningNode>;
nodes with no in-degree: walk the "to" endpoints of the edge set; any node in the node set that never appears there has no in-degree.
Deleting before or after execution both work; the order looks like this:
[1,2,3,4,5,6] -> [(1,2),(1,3),(4,5),(3,6),(5,6)]
to(2,3,5,6): run 1, 4
[2,3,5,6] -> [(3,6),(5,6)]
to(6): run 2, 3, 5
[6] -> []
to(): run 6

The actual implementation runs the jobs on multiple threads.
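
A single-threaded sketch of this loop, run on the example above (the real implementation below executes each wave on worker threads):

object PeelDemo {
  def main(args: Array[String]): Unit = {
    var nodes = Set(1, 2, 3, 4, 5, 6)
    var edges = Set((1, 2), (1, 3), (4, 5), (3, 6), (5, 6))
    while (nodes.nonEmpty) {
      val targets = edges.map(_._2)                      // every "to" endpoint
      val heads = nodes.filterNot(targets.contains)      // nodes with no in-degree
      if (heads.isEmpty) sys.error("cycle detected")     // otherwise we would spin forever
      heads.foreach(n => println(s"run $n"))             // run this wave (order within a wave is arbitrary)
      nodes --= heads                                    // delete the executed nodes...
      edges = edges.filterNot(e => heads.contains(e._1)) // ...and their outgoing edges
    }
  }
}

This prints the three waves from the walkthrough: 1 and 4, then 2, 3 and 5, then 6.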

Edge contraction

The scheduling graph links two kinds of nodes, tables and jobs.
What we ultimately need is a job-flow graph made up of job nodes only.
Implementation:
a. collect the edges that start at a job node -> (job node, table node/job node)
b. collect the edges of type (job node, job node)
c. for each table endpoint found in a, collect the edges that start at that table node -> (table node, job node)
d. contract the results of a and c, turning each table node into its downstream job node
eg:
p1,p2,p3 are job nodes
t1,t2,t3,t4 are table nodes
dependencies: (t1,t2) -> p1 -> t3 -> p2 -> t4 -> p3
a yields [(p1,t3),(p2,t4)]
b is empty
c, from t3 and t4 in a, yields [(t3,p2),(t4,p3)]
d yields [(p1,p2),(p2,p3)]

Note:
graphs that contain table node -> table node edges are not supported
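
A compact sketch of steps a-d on this example (the production version is generate() in the code below; flag "p" marks a job node, "t" a table node):

object ContractDemo {
  def main(args: Array[String]): Unit = {
    val flag = Map("p1" -> "p", "p2" -> "p", "p3" -> "p",
                   "t1" -> "t", "t2" -> "t", "t3" -> "t", "t4" -> "t")
    val edges = Set(("t1", "p1"), ("t2", "p1"), ("p1", "t3"),
                    ("t3", "p2"), ("p2", "t4"), ("t4", "p3"))
    val a = edges.filter(e => flag(e._1) == "p")          // (job, table/job) edges
    val b = a.filter(e => flag(e._2) == "p")              // (job, job) edges, kept as-is
    val c = edges.filter(e => a.map(_._2).contains(e._1)) // (table, job) edges after a
    val d = a.flatMap { case (job, mid) =>                // contract the table hop
      c.collect { case (`mid`, next) => (job, next) }
    } ++ b
    println(d) // Set((p1,p2), (p2,p3))
  }
}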

Code

package graph

import java.io.FileInputStream
import java.time.{Instant, LocalDateTime, ZoneOffset}
import java.time.format.DateTimeFormatter
import java.util.Properties

import scala.collection.mutable.ListBuffer

object Graph {

  // flag "t" = table node, "p" = job node; cost = estimated runtime (seconds); path = command to run
  case class Node(name: String, flag: String, cost: Long, path: String)

  case class Edge(from: String, to: String)

  var mysqlData: MySQLData = null

  var nodeSet: Set[Node] = Set[Node]()
  var edgeSet: Set[Edge] = Set[Edge]()
  var eSet = Set[Edge]()            // working copy of the edges while running
  var nSet = Set[Node]()            // working copy of the nodes while running
  var recoverSet = Set[Node]()      // failed nodes to re-run in recovery mode
  var pickedHeadSet = Set[Node]()   // nodes currently executing

  var nodeMap: Map[String, Node] = Map[String, Node]()

  var THREAD_COUNT = 3
  val formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss")
  val LOCK = new Object             // dedicated lock object; synchronizing on a boxed primitive is fragile
  var recover = true

  /**
   * Initialization.
   */
  def init(): Unit = {
    // demo data; replace with the MySQL queries below in production
    nodeSet += Node("a", "t", 0, "cmd")
    nodeSet += Node("b", "t", 0, "cmd")
    nodeSet += Node("c", "t", 0, "cmd")
    nodeSet += Node("d", "p", 7, "cmd d")
    nodeSet += Node("x", "t", 0, "cmd")
    nodeSet += Node("e", "t", 0, "cmd")
    nodeSet += Node("f", "p", 6, "cmd f")
    nodeSet += Node("g", "t", 0, "cmd")
    nodeSet += Node("h", "t", 0, "cmd")
    nodeSet += Node("i", "t", 0, "cmd")
    nodeSet += Node("j", "p", 5, "cmd j")
    nodeSet += Node("k", "p", 4, "cmd k")

    edgeSet += Edge("a", "d")
    edgeSet += Edge("b", "d")
    edgeSet += Edge("c", "d")
    edgeSet += Edge("d", "e")
    edgeSet += Edge("e", "f")
    edgeSet += Edge("f", "g")
    edgeSet += Edge("f", "h")
    edgeSet += Edge("g", "j")
    edgeSet += Edge("h", "j")
    edgeSet += Edge("h", "k")
    edgeSet += Edge("i", "j")
    // i -> j together with j -> i forms a cycle; after contraction it becomes
    // a self-loop on j, which generate() drops
    edgeSet += Edge("j", "i")

    // val url = prop.getProperty("url")
    // val username = prop.getProperty("username")
    // val password = prop.getProperty("password")
    //
    // mysqlData = MySQLData(url, username, password)
    //
    // val resultSet = mysqlData.executeQuery("select name, flag, cost, path from job_node")
    // while (resultSet.next()) {
    //   val name = resultSet.getString(1)
    //   val flag = resultSet.getString(2)
    //   val cost = resultSet.getLong(3)
    //   val path = resultSet.getString(4)
    //
    //   nodeSet += Node(name, flag, cost, path)
    // }
    // resultSet.close()
    //
    // // job edges
    // val edgeResultSet = mysqlData.executeQuery("select before_job, after_job from job_edge")
    // while (edgeResultSet.next()) {
    //   val from = edgeResultSet.getString(1)
    //   val to = edgeResultSet.getString(2)
    //
    //   edgeSet += Edge(from, to)
    // }
    // edgeResultSet.close()
    //
    // nodeMap = nodeSet.map(x => (x.name, x)).toMap[String, Node]
    //
    // // jobs that failed today and need recovery
    // val failedResultSet = mysqlData.executeQuery(
    //   s"select name from job_failed_node where c_time > '${LocalDate.now().toString}'"
    // )
    // while (failedResultSet.next()) {
    //   val name = failedResultSet.getString(1)
    //
    //   if (nodeMap.contains(name)) {
    //     recoverSet += nodeMap(name)
    //   }
    // }
    // failedResultSet.close()
  }

  /**
   * Edge contraction: rebuild the node and edge sets so that only job nodes remain.
   */
  def generate(): Unit = {
    nodeMap = nodeSet.map(x => (x.name, x)).toMap[String, Node]

    // edges starting at a job node -> <job node, table node/job node>
    val edge1 = edgeSet.filter(x => nodeMap.filter(_._2.flag == "p").keys.toList.contains(x.from)).map(x => (x.from, x.to)).toList

    // edges of type <job node, job node>, carried over unchanged
    val p2p = edge1.filter(x => {
      nodeMap(x._1).flag.equals("p") && nodeMap(x._2).flag.equals("p")
    })
    // edges <table node, job node> reached from edge1; note that the initial
    // graph must not contain <table node, table node> edges
    val edge2 = edgeSet.filter(x => edge1.map(_._2).contains(x.from)).map(x => (x.from, x.to)).toList

    // rebuild the node and edge sets
    nodeSet = nodeSet.filter(_.flag == "p")
    nodeMap = nodeSet.map(x => (x.name, x)).toMap[String, Node]
    edgeSet = edge1.map(x => (x, edge2.filter(_._1 == x._2).map(_._2))).map(x => {
      x._2.map(y => {
        (x._1._1, y)
      })
    }).flatMap(_.toList).++(p2p)
      // drop self-loops produced by cycles such as i <-> j in init()
      .filter(x => x._1 != x._2)
      .map(x => Edge(x._1, x._2)).toSet
    println(edge1)
    println(edge2)
    println(nodeSet)
    println(edgeSet)
  }

  /**
   * Validate the graph: drop edges whose endpoints are missing,
   * then seed the working copies.
   */
  def validate(): Unit = {
    edgeSet = edgeSet.filter(x => nodeMap.get(x.from).isDefined && nodeMap.get(x.to).isDefined)
    eSet = edgeSet
    nSet = nodeSet
  }

  /**
   * Load the nodes that need recovery: the failed nodes plus everything downstream of them.
   *
   * @param rSet
   * @return
   */
  def getReset(rSet: Set[Node]): Set[Node] = {
    val set = if (rSet.size < 1) Set[Node]() else getReset(edgeSet.filter(x => {
      rSet.map(_.name).contains(x.from)
    }).map(x => nodeMap(x.to)))
    set ++ rSet
  }


  def main(args: Array[String]): Unit = {
    val prop = new Properties()
    prop.load(new FileInputStream("D:\\Projects\\IdeaProjects\\Demo\\zsd-test\\src\\main\\resource\\prop.properties"))
    THREAD_COUNT = prop.getProperty("worker_count").toInt
    if (args.length > 1) {
      recover = true
    }

    // initialize the (table, job) graph
    init()
    // contract it into the job-only graph
    generate()

    // seed eSet/nSet; without this the workers would see an empty graph and exit at once
    validate()

    // if (recover && recoverSet != null && recoverSet.nonEmpty) { // recovery run
    //   println("############################# Recover job #############################")
    //   val rSet = getReset(recoverSet) // load the nodes that need recovery
    //   nodeSet = rSet
    //   nSet = nodeSet
    //   nodeMap = nodeSet.map(x => (x.name, x)).toMap[String, Node]
    //   validate() // re-validate the nodes and edges
    // }
    //
    for (_ <- 0 until THREAD_COUNT) { // start the workers
      val thread = new Worker()
      thread.start()
    }
  }

  class Worker extends Thread {
    val runtime: Runtime = Runtime.getRuntime
    var history: ListBuffer[Node] = ListBuffer[Node]()
    var cost = 0d
    var enable = true

    override def run(): Unit = {
      while (nSet.nonEmpty && enable) {
        var pickedNode: Node = null
        LOCK.synchronized {
          val headSet = getHead // the currently runnable head nodes
          println(headSet)
          if (headSet.nonEmpty) { // if a head is available, pick one to run
            val (pn, _) = pickNode(headSet) // pick the head with the largest weighted depth
            history += pn // record this thread's run history
            pickedHeadSet += pn // mark the node as running
            eSet = eSet.filter(_.from != pn.name) // remove its outgoing edges
            nSet = nSet.filter(_ != pn) // remove the node itself
            pickedNode = pn
          }
        }
        if (pickedNode == null) { // no runnable head: wait 10s outside the lock
          // (sleeping while holding LOCK would stall every other worker)
          println(s"Thread ${Thread.currentThread().getId} Sleeping !!")
          Thread.sleep(10000)
        }

        // run the job
        if (pickedNode != null) {
          println(s"Thread ${Thread.currentThread().getId} ==> ${pickedNode.name}")
          val t1 = System.currentTimeMillis()
          var t2 = t1
          var rtn = 0
          try {
            println(s"Exec ${pickedNode.path}")
            // val path = pickedNode.path.substring(0, pickedNode.path.lastIndexOf("/"))
            Thread.sleep(pickedNode.cost * 1000) // simulate the job; replace with the exec below

            // val cmd: Array[String] = Array("/bin/sh", "-c", s"nohup ${pickedNode.path} >> $path/msg.log 2>&1")
            // val process = runtime.exec(cmd)
            // rtn = process.waitFor()
            println(pickedNode.path)
            t2 = System.currentTimeMillis()
            if (rtn != 0) {
              throw new RuntimeException(s"Return Code $rtn !!")
            }
            else {
              println(s"Run Success in Code $rtn !! Cost ${(t2 - t1) / 1000} Seconds")
            }

            LOCK.synchronized {
              pickedHeadSet -= pickedNode // on success, remove the node from the running set
            }
          } catch {
            case e: RuntimeException => {
              LOCK.synchronized {
                println(s"### Node ${pickedNode.name} # is error !!")
                printChildren(pickedNode, pickedNode) // on failure, record the node and drop its children
                println(e)
              }
            }
          } finally { // finally, record the node's last run time and cost
            val time = formatter.format(LocalDateTime.ofInstant(Instant.ofEpochMilli(t1), ZoneOffset.ofHours(8)))
            val sql = new StringBuilder(s"update job_node set last_run='$time'")
            if (rtn == 0) {
              cost += (t2 - t1) / 1000
              sql.append(s", cost = ${(t2 - t1) / 1000}")
            }
            sql.append(s" where name='${pickedNode.name}'")
            // val count = mysqlData.executeUpdate(sql.toString())
            // println(s"Infect Count $count")
          }
        }
      }
      println(s"@@ Thread ${Thread.currentThread().getId} Cost: $cost ==> $history")
    }

    // record a failed node and recursively drop all of its downstream nodes
    def printChildren(node: Node, error_root: Node): Unit = {
      // mysqlData.executeUpdate(
      //   s"insert into job_failed_node(name, error_root) values('${node.name}', '${error_root.name}')"
      // )
      println(s"Record the Error Node ${node.name}!!")
      nSet -= node
      edgeSet.filter(_.from == node.name).foreach(x => {
        printChildren(nodeMap(x.to), error_root)
      })
    }

    // weighted depth: the costliest path that starts at the given node
    def getDepth(node: Node, nodeSeq: Seq[Node], depth: Double = 0): (Seq[Node], Double) = {
      val subEdgeSet = edgeSet.filter(_.from == node.name)
      if (subEdgeSet.isEmpty) {
        (nodeSeq :+ node, node.cost + depth)
      }
      else
        subEdgeSet.map(x => {
          getDepth(nodeMap(x.to), nodeSeq :+ node, node.cost + depth)
        }).maxBy(_._2)
    }

    def pickNode(headSet: Set[Node]): (Node, Double) = {
      val maxNode = headSet.map(node => (node, getDepth(node, Seq[Node]()))).maxBy(_._2._2) // prefer the head with the longest estimated path ahead of it
      (maxNode._1, maxNode._2._2)
    }

    def getHead: Set[Node] = {
      nSet.filterNot(node => { // skip these cases:
        eSet.map(_.to).contains(node.name) || // nodes that still have an in-degree
          pickedHeadSet.map(_.name).contains(node.name) || // nodes that are currently running
          edgeSet.filter(x => pickedHeadSet.map(_.name).contains(x.from)).map(_.to).contains(node.name) // children of running nodes
      })
    }
  }
}

---

package graph

import java.sql.{Connection, DriverManager, ResultSet}

class MySQLData(url: String, username: String, password: String) {
  val lock = new Object // dedicated lock object guarding lazy connection setup
  var conn: Connection = null

  def initConn(): Unit = {
    lock.synchronized {
      if (conn == null) {
        Class.forName("com.mysql.jdbc.Driver")
        conn = DriverManager.getConnection(url, username, password)
      }
    }
  }

  def executeQuery(sql: String): ResultSet = {
    initConn()

    val statement = conn.createStatement()
    statement.executeQuery(sql)
  }

  def executeUpdate(sql: String): Int = {
    initConn()

    val statement = conn.createStatement()
    statement.executeUpdate(sql)
  }
}

object MySQLData {
  def apply(
    url: String,
    username: String,
    password: String
  ): MySQLData = new MySQLData(url, username, password)
}