FlinkSQL修改Calcite解析

从0到1实现修改FlinkSQL解析步骤,确定最终生效方式,入门可以参考之间的简易例子,传送门

源头模块

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
flink-table/flink-sql-parser
复制一份新的
像我这个时间点,用的模块版本是1.14-SNAPSHOT
对应的Calcite版本为1.26.0
复制calcite-core-1.26.0下codegen的templates文件夹到自己项目的codegen下(Parser.jj)

项目结构如下:
.
├── pom.xml
└── src
├── main
│   ├── codegen
│   │   ├── config.fmpp
│   │   ├── data
│   │   │   └── Parser.tdd
│   │   ├── includes
│   │   │   ├── compoundIdentifier.ftl
│   │   │   └── parserImpls.ftl
│   │   └── templates
│   │   └── Parser.jj
│   ├── java
│   │   └── org
│   │   └── apache
│   │   └── flink
│   │   ├── sql
│   │   │   └── parser
│   │   │   ├── ExtendedSqlNode.java
│   │   │   ├── SqlPartitionUtils.java
│   │   │   ├── SqlProperty.java
│   │   │   ├── ddl
│   │   │   │   ├── SqlAddJar.java
│   │   │   │   ├── SqlAddPartitions.java
│   │   │   │   ├── SqlAddReplaceColumns.java
│   │   │   │   ├── SqlAlterDatabase.java
│   │   │   │   ├── SqlAlterFunction.java
│   │   │   │   ├── SqlAlterTable.java
│   │   │   │   ├── SqlAlterTableAddConstraint.java
│   │   │   │   ├── SqlAlterTableDropConstraint.java
│   │   │   │   ├── SqlAlterTableOptions.java
│   │   │   │   ├── SqlAlterTableRename.java
│   │   │   │   ├── SqlAlterTableReset.java
│   │   │   │   ├── SqlAlterView.java
│   │   │   │   ├── SqlAlterViewAs.java
│   │   │   │   ├── SqlAlterViewProperties.java
│   │   │   │   ├── SqlAlterViewRename.java
│   │   │   │   ├── SqlChangeColumn.java
│   │   │   │   ├── SqlCreateCatalog.java
│   │   │   │   ├── SqlCreateDatabase.java
│   │   │   │   ├── SqlCreateFunction.java
│   │   │   │   ├── SqlCreateTable.java
│   │   │   │   ├── SqlCreateView.java
│   │   │   │   ├── SqlDropCatalog.java
│   │   │   │   ├── SqlDropDatabase.java
│   │   │   │   ├── SqlDropFunction.java
│   │   │   │   ├── SqlDropPartitions.java
│   │   │   │   ├── SqlDropTable.java
│   │   │   │   ├── SqlDropView.java
│   │   │   │   ├── SqlRemoveJar.java
│   │   │   │   ├── SqlReset.java
│   │   │   │   ├── SqlSet.java
│   │   │   │   ├── SqlTableColumn.java
│   │   │   │   ├── SqlTableLike.java
│   │   │   │   ├── SqlTableOption.java
│   │   │   │   ├── SqlUseCatalog.java
│   │   │   │   ├── SqlUseDatabase.java
│   │   │   │   ├── SqlUseModules.java
│   │   │   │   ├── SqlWatermark.java
│   │   │   │   └── constraint
│   │   │   │   ├── SqlConstraintEnforcement.java
│   │   │   │   ├── SqlTableConstraint.java
│   │   │   │   └── SqlUniqueSpec.java
│   │   │   ├── dml
│   │   │   │   ├── RichSqlInsert.java
│   │   │   │   ├── RichSqlInsertKeyword.java
│   │   │   │   ├── SqlBeginStatementSet.java
│   │   │   │   └── SqlEndStatementSet.java
│   │   │   ├── dql
│   │   │   │   ├── SqlDescribeCatalog.java
│   │   │   │   ├── SqlDescribeDatabase.java
│   │   │   │   ├── SqlLoadModule.java
│   │   │   │   ├── SqlRichDescribeTable.java
│   │   │   │   ├── SqlRichExplain.java
│   │   │   │   ├── SqlShowCatalogs.java
│   │   │   │   ├── SqlShowCreateTable.java
│   │   │   │   ├── SqlShowCurrentCatalog.java
│   │   │   │   ├── SqlShowCurrentDatabase.java
│   │   │   │   ├── SqlShowDatabases.java
│   │   │   │   ├── SqlShowFunctions.java
│   │   │   │   ├── SqlShowJars.java
│   │   │   │   ├── SqlShowModules.java
│   │   │   │   ├── SqlShowPartitions.java
│   │   │   │   ├── SqlShowTables.java
│   │   │   │   ├── SqlShowViews.java
│   │   │   │   └── SqlUnloadModule.java
│   │   │   ├── error
│   │   │   │   └── SqlValidateException.java
│   │   │   ├── package-info.java
│   │   │   ├── type
│   │   │   │   ├── ExtendedSqlCollectionTypeNameSpec.java
│   │   │   │   ├── ExtendedSqlRowTypeNameSpec.java
│   │   │   │   ├── SqlMapTypeNameSpec.java
│   │   │   │   ├── SqlRawTypeNameSpec.java
│   │   │   │   └── SqlTimestampLtzTypeNameSpec.java
│   │   │   ├── utils
│   │   │   │   └── ParserResource.java
│   │   │   └── validate
│   │   │   └── FlinkSqlConformance.java
│   │   └── table
│   │   └── calcite
│   │   └── ExtendedRelTypeFactory.java
│   └── resources
│   └── org.apache.flink.sql.parser.utils
│   └── ParserResource.properties
└── test
└── java
└── org
└── apache
└── flink
└── sql
└── parser
├── CreateTableLikeTest.java
├── Fixture.java
├── FlinkDDLDataTypeTest.java
├── FlinkSqlParserImplTest.java
├── FlinkSqlUnParserTest.java
├── TableApiIdentifierParsingTest.java
└── TestRelDataTypeFactory.java

开始修改

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
# 让我们从简单开始入门,实现一个和Order By功能一致的关键词Sorted By
# 从Flink模块代码中可以发现,OrderBy解析并不是Flink自带的,而是复用的Calcite
# 在Parser.jj文件找到SqlNodeList OrderBy(boolean accept),复制OrderBy与OrderItem方法,并改名
SqlNodeList SortedBy(boolean accept) :
{
List<SqlNode> list;
SqlNode e;
final Span s;
}
{
<SORTED> {
s = span();
if (!accept) {
// 此处抛出异常,可以重构CalciteResource抽象类进行异常值定义
throw SqlUtil.newContextException(s.pos(), RESOURCE.illegalOrderBy());
}
}
<BY> e = SortedItem() {
list = startList(e);
}
(
LOOKAHEAD(2) <COMMA> e = SortedItem() { list.add(e); }
)*
{
return new SqlNodeList(list, s.addAll(list).pos());
}
}

SqlNode SortedItem() :
{
SqlNode e;
}
{
e = Expression(ExprContext.ACCEPT_SUB_QUERY)
(
<ASC>
| <DESC> {
e = SqlStdOperatorTable.DESC.createCall(getPos(), e);
}
)?
(
LOOKAHEAD(2)
<NULLS> <FIRST> {
e = SqlStdOperatorTable.NULLS_FIRST.createCall(getPos(), e);
}
|
<NULLS> <LAST> {
e = SqlStdOperatorTable.NULLS_LAST.createCall(getPos(), e);
}
)?
{
return e;
}
}

# 还是Parser.jj文件,找到| < SOME: "SOME" >,在它后面添加
| < SOME: "SOME" >
| < SORTED: "SORTED" >

# 改完重新生成一份试试
mvn clean package

# 查看FlinkSqlParserImpl类是可以发现该函数已经被生成了
# 接下来写个测试实例试下