DolphinScheduler删除库内日志问题

分钟级日志不太好管理,所以直接删除了库内日志

问题原因

1
2
3
4
5
6
7
8
9
10
删除库内日志直接导致ds频繁的打印TASK_STATE_CHANGE错误日志
导致磁盘直接写满

本以为是因为MySQL库或者ZK中内有映射没有进行删除
结果找了一圈没有多余的信息

原因:
WorkflowExecuteThread
代码中使用了ConcurrentLinkedQueue<StateEvent>将状态进行了存储
而我进行删库日志时,代码还未执行remove操作,导致taskStateChangeHandler方法失效

关键代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
WorkflowExecuteThread

// 状态队列
private ConcurrentLinkedQueue<StateEvent> stateEvents = new ConcurrentLinkedQueue<>();

// 查看状态并进行对应操作
private boolean stateEventHandler(StateEvent stateEvent) {
logger.info("process event: {}", stateEvent.toString());

if (!checkProcessInstance(stateEvent)) {
return false;
}

boolean result = false;
switch (stateEvent.getType()) {
case PROCESS_STATE_CHANGE:
result = processStateChangeHandler(stateEvent);
break;
case TASK_STATE_CHANGE:
result = taskStateChangeHandler(stateEvent);
break;
case PROCESS_TIMEOUT:
result = processTimeout();
break;
case TASK_TIMEOUT:
result = taskTimeout(stateEvent);
break;
case WAIT_TASK_GROUP:
result = checkForceStartAndWakeUp(stateEvent);
break;
case TASK_RETRY:
result = taskRetryEventHandler(stateEvent);
break;
default:
break;
}

if (result) {
this.stateEvents.remove(stateEvent);
}
return result;
}

// 进行状态改变操作
private boolean taskStateChangeHandler(StateEvent stateEvent) {
if (!checkTaskInstanceByStateEvent(stateEvent)) {
return true;
}

TaskInstance task = getTaskInstance(stateEvent.getTaskInstanceId());
if (task.getState() == null) {
logger.error("task state is null, state handler error: {}", stateEvent);
return true;
}

if (task.getState().typeIsFinished()) {
if (completeTaskMap.containsKey(task.getTaskCode()) && completeTaskMap.get(task.getTaskCode()) == task.getId()) {
return true;
}
taskFinished(task);
if (task.getTaskGroupId() > 0) {
releaseTaskGroup(task);
}
return true;
}
if (activeTaskProcessorMaps.containsKey(task.getTaskCode())) {
ITaskProcessor iTaskProcessor = activeTaskProcessorMaps.get(task.getTaskCode());
iTaskProcessor.action(TaskAction.RUN);

if (iTaskProcessor.taskInstance().getState().typeIsFinished()) {
taskFinished(task);
}
return true;
}
logger.error("state handler error: {}", stateEvent);

return true;
}