从0到1实现修改FlinkSQL解析步骤,确定最终生效方式,入门可以参考之间的简易例子,传送门
源头模块
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127
| flink-table/flink-sql-parser 复制一份新的 像我这个时间点,用的模块版本是1.14-SNAPSHOT 对应的Calcite版本为1.26.0 复制calcite-core-1.26.0下codegen的templates文件夹到自己项目的codegen下(Parser.jj)
项目结构如下: . ├── pom.xml └── src ├── main │ ├── codegen │ │ ├── config.fmpp │ │ ├── data │ │ │ └── Parser.tdd │ │ ├── includes │ │ │ ├── compoundIdentifier.ftl │ │ │ └── parserImpls.ftl │ │ └── templates │ │ └── Parser.jj │ ├── java │ │ └── org │ │ └── apache │ │ └── flink │ │ ├── sql │ │ │ └── parser │ │ │ ├── ExtendedSqlNode.java │ │ │ ├── SqlPartitionUtils.java │ │ │ ├── SqlProperty.java │ │ │ ├── ddl │ │ │ │ ├── SqlAddJar.java │ │ │ │ ├── SqlAddPartitions.java │ │ │ │ ├── SqlAddReplaceColumns.java │ │ │ │ ├── SqlAlterDatabase.java │ │ │ │ ├── SqlAlterFunction.java │ │ │ │ ├── SqlAlterTable.java │ │ │ │ ├── SqlAlterTableAddConstraint.java │ │ │ │ ├── SqlAlterTableDropConstraint.java │ │ │ │ ├── SqlAlterTableOptions.java │ │ │ │ ├── SqlAlterTableRename.java │ │ │ │ ├── SqlAlterTableReset.java │ │ │ │ ├── SqlAlterView.java │ │ │ │ ├── SqlAlterViewAs.java │ │ │ │ ├── SqlAlterViewProperties.java │ │ │ │ ├── SqlAlterViewRename.java │ │ │ │ ├── SqlChangeColumn.java │ │ │ │ ├── SqlCreateCatalog.java │ │ │ │ ├── SqlCreateDatabase.java │ │ │ │ ├── SqlCreateFunction.java │ │ │ │ ├── SqlCreateTable.java │ │ │ │ ├── SqlCreateView.java │ │ │ │ ├── SqlDropCatalog.java │ │ │ │ ├── SqlDropDatabase.java │ │ │ │ ├── SqlDropFunction.java │ │ │ │ ├── SqlDropPartitions.java │ │ │ │ ├── SqlDropTable.java │ │ │ │ ├── SqlDropView.java │ │ │ │ ├── SqlRemoveJar.java │ │ │ │ ├── SqlReset.java │ │ │ │ ├── SqlSet.java │ │ │ │ ├── SqlTableColumn.java │ │ │ │ ├── SqlTableLike.java │ │ │ │ ├── SqlTableOption.java │ │ │ │ ├── SqlUseCatalog.java │ │ │ │ ├── SqlUseDatabase.java │ │ │ │ ├── SqlUseModules.java │ │ │ │ ├── SqlWatermark.java │ │ │ │ └── constraint │ │ │ │ ├── SqlConstraintEnforcement.java │ │ │ │ ├── SqlTableConstraint.java │ │ │ │ └── SqlUniqueSpec.java │ │ │ ├── dml │ │ │ │ ├── RichSqlInsert.java │ │ │ │ ├── RichSqlInsertKeyword.java │ │ │ │ ├── SqlBeginStatementSet.java │ │ │ │ └── SqlEndStatementSet.java │ │ │ ├── dql │ │ │ │ ├── SqlDescribeCatalog.java │ │ │ │ ├── SqlDescribeDatabase.java │ │ │ │ ├── SqlLoadModule.java │ │ │ │ ├── SqlRichDescribeTable.java │ │ │ │ ├── SqlRichExplain.java │ │ │ │ ├── SqlShowCatalogs.java │ │ │ │ ├── SqlShowCreateTable.java │ │ │ │ ├── SqlShowCurrentCatalog.java │ │ │ │ ├── SqlShowCurrentDatabase.java │ │ │ │ ├── SqlShowDatabases.java │ │ │ │ ├── SqlShowFunctions.java │ │ │ │ ├── SqlShowJars.java │ │ │ │ ├── SqlShowModules.java │ │ │ │ ├── SqlShowPartitions.java │ │ │ │ ├── SqlShowTables.java │ │ │ │ ├── SqlShowViews.java │ │ │ │ └── SqlUnloadModule.java │ │ │ ├── error │ │ │ │ └── SqlValidateException.java │ │ │ ├── package-info.java │ │ │ ├── type │ │ │ │ ├── ExtendedSqlCollectionTypeNameSpec.java │ │ │ │ ├── ExtendedSqlRowTypeNameSpec.java │ │ │ │ ├── SqlMapTypeNameSpec.java │ │ │ │ ├── SqlRawTypeNameSpec.java │ │ │ │ └── SqlTimestampLtzTypeNameSpec.java │ │ │ ├── utils │ │ │ │ └── ParserResource.java │ │ │ └── validate │ │ │ └── FlinkSqlConformance.java │ │ └── table │ │ └── calcite │ │ └── ExtendedRelTypeFactory.java │ └── resources │ └── org.apache.flink.sql.parser.utils │ └── ParserResource.properties └── test └── java └── org └── apache └── flink └── sql └── parser ├── CreateTableLikeTest.java ├── Fixture.java ├── FlinkDDLDataTypeTest.java ├── FlinkSqlParserImplTest.java ├── FlinkSqlUnParserTest.java ├── TableApiIdentifierParsingTest.java └── TestRelDataTypeFactory.java
|
开始修改
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65
| # 让我们从简单开始入门,实现一个和Order By功能一致的关键词Sorted By # 从Flink模块代码中可以发现,OrderBy解析并不是Flink自带的,而是复用的Calcite # 在Parser.jj文件找到SqlNodeList OrderBy(boolean accept),复制OrderBy与OrderItem方法,并改名 SqlNodeList SortedBy(boolean accept) : { List<SqlNode> list; SqlNode e; final Span s; } { <SORTED> { s = span(); if (!accept) { // 此处抛出异常,可以重构CalciteResource抽象类进行异常值定义 throw SqlUtil.newContextException(s.pos(), RESOURCE.illegalOrderBy()); } } <BY> e = SortedItem() { list = startList(e); } ( LOOKAHEAD(2) <COMMA> e = SortedItem() { list.add(e); } )* { return new SqlNodeList(list, s.addAll(list).pos()); } }
SqlNode SortedItem() : { SqlNode e; } { e = Expression(ExprContext.ACCEPT_SUB_QUERY) ( <ASC> | <DESC> { e = SqlStdOperatorTable.DESC.createCall(getPos(), e); } )? ( LOOKAHEAD(2) <NULLS> <FIRST> { e = SqlStdOperatorTable.NULLS_FIRST.createCall(getPos(), e); } | <NULLS> <LAST> { e = SqlStdOperatorTable.NULLS_LAST.createCall(getPos(), e); } )? { return e; } }
# 还是Parser.jj文件,找到| < SOME: "SOME" >,在它后面添加 | < SOME: "SOME" > | < SORTED: "SORTED" >
# 改完重新生成一份试试 mvn clean package
# 查看FlinkSqlParserImpl类是可以发现该函数已经被生成了 # 接下来写个测试实例试下
|