A Small Flink Extension: A Custom SQL Parser with Calcite

Flink SQL's underlying SQL parsing is built on the Apache Calcite framework; see the reference link for background.

Project setup

Maven project coordinates

<groupId>org.example</groupId>
<artifactId>calcite_test</artifactId>
<version>1.0</version>

pom.xml

<dependencies>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.25</version>
    </dependency>
    <dependency>
        <groupId>net.java.dev.javacc</groupId>
        <artifactId>javacc</artifactId>
        <version>7.0.9</version>
    </dependency>
    <dependency>
        <groupId>org.freemarker</groupId>
        <artifactId>freemarker</artifactId>
        <version>2.3.30</version>
    </dependency>
    <dependency>
        <groupId>org.apache.calcite</groupId>
        <artifactId>calcite-core</artifactId>
        <version>1.23.0</version>
    </dependency>
</dependencies>

<build>
    <plugins>
        <!-- Compile with Java 8 -->
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.2</version>
            <configuration>
                <source>1.8</source>
                <target>1.8</target>
            </configuration>
        </plugin>
        <!-- Generate the parser from the FMPP output (Parser.jj) with JavaCC -->
        <plugin>
            <groupId>org.codehaus.mojo</groupId>
            <artifactId>javacc-maven-plugin</artifactId>
            <executions>
                <execution>
                    <id>javacc</id>
                    <goals>
                        <goal>javacc</goal>
                    </goals>
                    <configuration>
                        <sourceDirectory>${project.build.directory}/generated-sources/fmpp</sourceDirectory>
                        <includes>
                            <include>**/Parser.jj</include>
                        </includes>
                        <lookAhead>2</lookAhead>
                        <isStatic>false</isStatic>
                    </configuration>
                </execution>
                <execution>
                    <id>javacc-test</id>
                    <phase>generate-test-sources</phase>
                    <goals>
                        <goal>javacc</goal>
                    </goals>
                    <configuration>
                        <sourceDirectory>${project.build.directory}/generated-test-sources/fmpp</sourceDirectory>
                        <outputDirectory>${project.build.directory}/generated-test-sources/javacc</outputDirectory>
                        <includes>
                            <include>**/Parser.jj</include>
                        </includes>
                        <lookAhead>2</lookAhead>
                        <isStatic>false</isStatic>
                    </configuration>
                </execution>
            </executions>
        </plugin>
        <!-- Expand the FreeMarker templates (config.fmpp + templates) into Parser.jj -->
        <plugin>
            <groupId>org.apache.drill.tools</groupId>
            <artifactId>drill-fmpp-maven-plugin</artifactId>
            <executions>
                <execution>
                    <configuration>
                        <config>src/main/codegen/config.fmpp</config>
                        <output>${project.build.directory}/generated-sources/fmpp</output>
                        <templates>src/main/codegen/templates</templates>
                    </configuration>
                    <id>generate-fmpp-sources</id>
                    <phase>validate</phase>
                    <goals>
                        <goal>generate</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

Template files

# Clone the Calcite source code
git clone https://github.com/apache/calcite.git
# Copy the template files into your own project
mv calcite/core/src/main/codegen calcite_test/src/main/

# Template files worth getting familiar with (layout sketched below):
compoundIdentifier.ftl
parserImpls.ftl
Parser.jj
config.fmpp
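
After copying, the project should contain the codegen directory referenced by the pom above. A rough sketch of the expected layout (sub-directory names follow the Calcite source tree and may differ slightly between Calcite versions):

calcite_test/src/main/codegen/
├── config.fmpp              # FMPP data model: parser package/class name, keywords, imports
├── templates/
│   └── Parser.jj            # FreeMarker template that FMPP expands into the JavaCC grammar
└── includes/
    ├── compoundIdentifier.ftl
    └── parserImpls.ftl      # hook for custom parser methods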

Writing the code

Custom SqlNode

// This class must live in the org.apache.calcite.sql package: SqlNode has no public constructor.
package org.apache.calcite.sql;

import org.apache.calcite.sql.parser.SqlParserPos;
import org.apache.calcite.sql.util.SqlVisitor;
import org.apache.calcite.sql.validate.SqlValidator;
import org.apache.calcite.sql.validate.SqlValidatorScope;
import org.apache.calcite.util.Litmus;

/**
 * SQL parse tree node for the custom RUN EXAMPLE statement.
 *
 * @author XiaShuai on 2020/7/3.
 */
public class SqlExample extends SqlNode {
    private final String exampleString;

    public SqlExample(SqlParserPos pos, String exampleString) {
        super(pos);
        this.exampleString = exampleString;
    }

    public String getExampleString() {
        System.out.println("getExampleString");
        return this.exampleString;
    }

    @Override
    public SqlNode clone(SqlParserPos sqlParserPos) {
        System.out.println("clone");
        return new SqlExample(sqlParserPos, exampleString);
    }

    @Override
    public void unparse(SqlWriter sqlWriter, int i, int i1) {
        sqlWriter.keyword("run");
        sqlWriter.keyword("example");
        sqlWriter.print("\n");
        sqlWriter.keyword(exampleString);
    }

    @Override
    public void validate(SqlValidator sqlValidator, SqlValidatorScope sqlValidatorScope) {
        System.out.println("validate");
    }

    @Override
    public <R> R accept(SqlVisitor<R> sqlVisitor) {
        System.out.println("accept");
        return null;
    }

    @Override
    public boolean equalsDeep(SqlNode sqlNode, Litmus litmus) {
        System.out.println("equalsDeep");
        if (sqlNode instanceof SqlExample
                && exampleString.equals(((SqlExample) sqlNode).exampleString)) {
            return true;
        }
        return litmus.fail("{} != {}", this, sqlNode);
    }
}

Modify config.fmpp

Find the line package: "org.apache.calcite.sql.parser.impl".
Change the class entry right below it to your own class name, ExampleSqlParserImpl
(Flink does the same with its FlinkSqlParserImpl), so it reads:
class: "ExampleSqlParserImpl"
See the sketch below.

Modify Parser.jj

In the import section, add an import for the custom SqlNode class:
import org.apache.calcite.sql.SqlExample;

Add the parsing logic to the grammar; I placed it after SqlStmtEof():
SqlNode SqlExample() :
{
    SqlNode stringNode;
}
{
    <RUN> <EXAMPLE>
    stringNode = StringLiteral()
    {
        return new SqlExample(getPos(), token.image);
    }
}
(Note that JavaCC's token.image is the raw matched text, so the single quotes around the string literal are kept.)

Find the statement production SqlNode SqlStmt() :
and add an alternative in a suitable place:
|
stmt = SqlExample()

Add the new keywords in the <DEFAULT, DQID, BTID> TOKEN : section:
| < RUN: "RUN">
| < EXAMPLE: "EXAMPLE">

Compile

mvn clean compile
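
If the build succeeds, the fmpp plugin first expands the templates into target/generated-sources/fmpp (validate phase), and the javacc plugin then generates the parser from the resulting Parser.jj. Assuming the javacc goal's default output directory, the generated class should land roughly at:

target/generated-sources/javacc/org/apache/calcite/sql/parser/impl/ExampleSqlParserImpl.java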

Writing the test code

package org.example.devlop;

import org.apache.calcite.avatica.util.Casing;
import org.apache.calcite.avatica.util.Quoting;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.parser.SqlParser;
import org.apache.calcite.sql.parser.impl.ExampleSqlParserImpl;
import org.apache.calcite.tools.FrameworkConfig;
import org.apache.calcite.tools.Frameworks;

/**
 * @author XiaShuai on 2020/7/3.
 */
public class ExampleParser {
    public static void main(String[] args) {
        FrameworkConfig config = Frameworks.newConfigBuilder()
                .parserConfig(SqlParser.configBuilder()
                        // Use the generated parser instead of Calcite's default one
                        .setParserFactory(ExampleSqlParserImpl.FACTORY)
                        // Whether identifiers are case sensitive
                        .setCaseSensitive(false)
                        // Quoting character for identifiers; MySQL uses backticks (``)
                        .setQuoting(Quoting.BACK_TICK)
                        // Casing applied to quoted identifiers: unchanged, upper or lower case
                        .setQuotedCasing(Casing.TO_UPPER)
                        // Casing applied to unquoted identifiers
                        .setUnquotedCasing(Casing.TO_UPPER)
                        .build())
                .build();
        String sql = "run example 'select ids, name from test where id < 5'";
        SqlParser parser = SqlParser.create(sql, config.getParserConfig());
        try {
            SqlNode sqlNode = parser.parseStmt();
            System.out.println(sqlNode.toString());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
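
If parsing succeeds, sqlNode.toString() prints the statement through the unparse() method defined earlier (roughly RUN EXAMPLE followed by the quoted string). To actually do something with the inner query, the node can be cast back to the custom class. A minimal sketch, continuing inside the try block above; it strips the surrounding quotes kept by token.image and re-parses the inner SQL with a default Calcite parser:

// Continuing inside the try block above: consume the custom node.
if (sqlNode instanceof org.apache.calcite.sql.SqlExample) {
    org.apache.calcite.sql.SqlExample example = (org.apache.calcite.sql.SqlExample) sqlNode;

    // token.image kept the quotes, e.g. 'select ids, name from test where id < 5'
    String quoted = example.getExampleString();
    String innerSql = quoted.substring(1, quoted.length() - 1);

    // Hand the inner query to a regular Calcite parser (default parser config here).
    SqlNode innerNode = SqlParser.create(innerSql).parseQuery();
    System.out.println(innerNode.toString());
}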