基本概念

下面是开发过程中需要了解的一些基本概念:

  • Transform SQL 函数,包括算术函数(如 abs , power ),时间函数(如 localtime 、 date_format ),字符串函数(如 locate 、 translate )等,函数一般都有一至多个参数,作用是对输入的数据进行某种转换操作,然后输出转换后的结果。
  • Transform SQL 解析器,主要有两类解析器,一种是针对类型的解析器类,作用是将原始数据转换为对应的类型对象,如 DateParser 可以将输入数据转换为 Java 中的 Date 对象,便于实现进一步的转换操作;另一种是针对计算表达式的解析器类,作用是对原始数据转换后再进行一定的计算操作,输出计算结果(类似一种函数),如 AdditionParser 可以解析 SQL 语句中类似 a + b 的部分,输出相应的输出结果。
  • Transform SQL 操作符,主要是一些逻辑操作符,如( and 、 or 、 not )等,实现一些逻辑判断操作,输出结果是一个布尔值。

函数开发

该部分介绍了如何拓展一个新的函数。

创建函数类文件

函数实现类存放在该目录下。确定好您想要拓展的函数后,在该目录下新建一个类,类名由 函数名 + Function 组成,如 AbsFunction

基础代码框架搭建

创建好类后,构建代码的基础框架,以 AbsFunction 为例:

  1. /**
  2. * AbsFunction
  3. * description: abs(numeric)--returns the absolute value of numeric
  4. */
  5. @TransformFunction(names = {"abs"})
  6. public class AbsFunction implements ValueParser {
  7. @Override
  8. public Object parse(SourceData sourceData, int rowIndex, Context context) {
  9. }
  10. }

为函数添加对应的类注释和 @TransformFunction 注解,函数需要实现 ValueParser 接口,重写该接口中的 parse 方法。

添加构造函数和 ValueParser 对象

在函数中添加有参构造函数及相关的 ValueParser 成员变量,在构造函数中解析函数表达式,初始化参数解析器对象,以 AbsFunction 为例:

  1. private ValueParser numberParser;
  2. public AbsFunction(Function expr) {
  3. numberParser = OperatorTools.buildParser(expr.getParameters().getExpressions().get(0));
  4. }

ValueParser 对象数量与函数参数个数相同。

实现函数逻辑

重写 parse 方法,解析参数并实现函数逻辑,计算函数返回值,以 AbsFunction 为例:

  1. @Override
  2. public Object parse(SourceData sourceData, int rowIndex, Context context) {
  3. Object numberObj = numberParser.parse(sourceData, rowIndex, context);
  4. BigDecimal numberValue = OperatorTools.parseBigDecimal(numberObj);
  5. return numberValue.abs();
  6. }

添加单元测试代码

每个函数都需要通过单元测试来验证函数逻辑是否正确,单元测试类位于该目录下。每个函数的所有单元测试函数均放在同一个单元测试类中,单元测试类以 Test + 函数名 + Function 的格式进行命名,以 testAbsFunction() 为例:

  1. @Test
  2. public void testAbsFunction() throws Exception {
  3. String transformSql = "select abs(numeric1) from source";
  4. TransformConfig config = new TransformConfig(transformSql);
  5. // case1: |2|
  6. TransformProcessor<String, String> processor = TransformProcessor
  7. .create(config, SourceDecoderFactory.createCsvDecoder(csvSource),
  8. SinkEncoderFactory.createKvEncoder(kvSink));
  9. List<String> output1 = processor.transform("2|4|6|8", new HashMap<>());
  10. Assert.assertEquals(1, output1.size());
  11. Assert.assertEquals(output1.get(0), "result=2");
  12. // case2: |-4.25|
  13. List<String> output2 = processor.transform("-4.25|4|6|8", new HashMap<>());
  14. Assert.assertEquals(1, output2.size());
  15. Assert.assertEquals(output2.get(0), "result=4.25");
  16. }

经过以上步骤,恭喜您完成了一个新函数的实现,可以向社区提交您的代码。AbsFunction 完整代码可见 代码链接

下面是一些注意事项:

  • 部分函数的参数可以为 NULL,在 parse 函数中注意对 NULL 对象的解析逻辑,防止出现 NullPointerException
  • @TransformFunction 注解中的函数名可以有多个,只要遵循各类数据库的命名规范即可
  • 部分函数的参数个数不固定,在构建 ValueParser 的过程中要防止出现 IndexOutOfBoundsException
  • 单元测试请尽可能覆盖多种情况,如使用不同的参数个数、参数设置为 NULL 等,以确保函数能够在不同情况下都能输出正确结果

解析器开发

该部分介绍了如何拓展一个新的解析器类。

创建解析器类文件

解析器存放在该目录下。确定好您想要拓展的解析器后,在该目录下新建一个类,类名由 类型 + Parser 组成,如 AdditionParser

基础代码框架搭建

创建好类后,构建代码的基础框架,以 AdditionParser 为例:

  1. /**
  2. * description: calcute a + b
  3. */
  4. @TransformParser(values = Addition.class)
  5. public class AdditionParser implements ValueParser {
  6. @Override
  7. public Object parse(SourceData sourceData, int rowIndex, Context context) {
  8. }
  9. }

为解析器类添加对应的 @TransformParser 注解,类型解析器类需要实现 ValueParser 接口,重写该接口中的 parse 方法。

添加构造函数和相关成员变量

在解析器类中添加有参构造函数及相关的成员变量,在构造函数中解析输入的表达式,将其转换为对应的类型对象,以 AdditionParser 为例:

  1. private final ValueParser left;
  2. private final ValueParser right;
  3. public AdditionParser(Addition expr) {
  4. this.left = OperatorTools.buildParser(expr.getLeftExpression());
  5. this.right = OperatorTools.buildParser(expr.getRightExpression());
  6. }

实现解析逻辑

重写 parse 方法,如果解析器需要对上一步中解析得到的类型对象进行进一步处理,可以在此方法中实现对应的处理逻辑,否则直接返回上一步中解析得到的类型对象即可,以 AdditionParser 为例:

  1. @Override
  2. public Object parse(SourceData sourceData, int rowIndex, Context context) {
  3. if (this.left instanceof IntervalParser && this.right instanceof IntervalParser) {
  4. return null;
  5. } else if (this.left instanceof IntervalParser || this.right instanceof IntervalParser) {
  6. IntervalParser intervalParser = null;
  7. ValueParser dateParser = null;
  8. if (this.left instanceof IntervalParser) {
  9. intervalParser = (IntervalParser) this.left;
  10. dateParser = this.right;
  11. } else {
  12. intervalParser = (IntervalParser) this.right;
  13. dateParser = this.left;
  14. }
  15. Object intervalPairObj = intervalParser.parse(sourceData, rowIndex, context);
  16. Object dateObj = dateParser.parse(sourceData, rowIndex, context);
  17. if (intervalPairObj == null || dateObj == null) {
  18. return null;
  19. }
  20. return DateUtil.dateAdd(OperatorTools.parseString(dateObj),
  21. (Pair<Integer, Map<ChronoField, Long>>) intervalPairObj, 1);
  22. } else {
  23. return numericalOperation(sourceData, rowIndex, context);
  24. }
  25. }

添加单元测试代码

每个解析器类都需要通过单元测试来验证逻辑是否正确,单元测试类位于该目录下。每个解析器的所有单元测试函数均放在同一个单元测试类中,单元测试类以 Test + 解析器名 + Parser 的格式进行命名,以 TestAdditionParser 为例:

  1. @Test
  2. public void testAdditionParser() throws Exception {
  3. String transformSql = null;
  4. TransformConfig config = null;
  5. TransformProcessor<String, String> processor = null;
  6. List<String> output = null;
  7. transformSql = "select numeric1 + numeric2 from source";
  8. config = new TransformConfig(transformSql);
  9. processor = TransformProcessor
  10. .create(config, SourceDecoderFactory.createCsvDecoder(csvSource),
  11. SinkEncoderFactory.createKvEncoder(kvSink));
  12. // case1: 1 + 10
  13. output = processor.transform("1|10||||", new HashMap<>());
  14. Assert.assertEquals(1, output.size());
  15. Assert.assertEquals("result=11", output.get(0));
  16. }

经过以上步骤,恭喜您完成了一个新解析器类的实现,可以向社区提交您的代码。AdditionParser 完整代码可见 代码链接

逻辑操作符开发规范

该部分介绍了如何拓展一个新的逻辑操作符类。

创建逻辑操作符类文件

逻辑操作符类存放在该目录下。确定好您想要拓展的逻辑操作符后,在该目录下新建一个类,类名由 逻辑操作符名 + Parser 组成,如 AndOperator

基础代码框架搭建

创建好类后,构建代码的基础框架,以 AndOperator 为例:

  1. @TransformOperator(values = AndExpression.class)
  2. public class AndOperator implements ExpressionOperator {
  3. @Override
  4. public boolean check(SourceData sourceData, int rowIndex, Context context) {
  5. }
  6. }

为逻辑操作符类添加对应的 @TransformOperator 注解,操作符类需要实现 ExpressionOperator 接口,重写该接口中的 check 方法。

添加构造函数和相关成员变量

在类中添加带参数的构造函数及相关的成员变量,在构造函数中解析输入的表达式,将其构建 check 方法中判断逻辑所需的对象,以 AndOperator 为例:

  1. private final ExpressionOperator left;
  2. private final ExpressionOperator right;
  3. public AndOperator(AndExpression expr) {
  4. this.left = OperatorTools.buildOperator(expr.getLeftExpression());
  5. this.right = OperatorTools.buildOperator(expr.getRightExpression());
  6. }

实现判断逻辑

重写 check 方法,根据逻辑操作符的定义与上一步中解析得到的数据实现判断逻辑,输出判断结果(true或者false),以 AndOperator 为例:

  1. @Override
  2. public boolean check(SourceData sourceData, int rowIndex, Context context) {
  3. return OperatorTools.compareValue((Comparable) this.left.parse(sourceData, rowIndex, context),
  4. (Comparable) this.right.parse(sourceData, rowIndex, context)) > 0;
  5. }

添加单元测试代码

每个逻辑操作符类都需要通过单元测试来验证逻辑是否正确,单元测试类位于该目录下。每个逻辑操作符的所有单元测试函数均放在同一个单元测试类中,单元测试类以 Test + 逻辑操作符名 + Operator 的格式进行命名,以 TestAndOperator 为例:

  1. public void testAndOperator() throws Exception {
  2. String transformSql = "select if((string2 < 4) and (numeric4 > 5),1,0) from source";
  3. TransformConfig config = new TransformConfig(transformSql);
  4. // case1: "3.14159265358979323846|3a|4|4"
  5. TransformProcessor<String, String> processor = TransformProcessor
  6. .create(config, SourceDecoderFactory.createCsvDecoder(csvSource),
  7. SinkEncoderFactory.createKvEncoder(kvSink));
  8. List<String> output1 = processor.transform("3.14159265358979323846|3a|4|4");
  9. Assert.assertEquals(1, output1.size());
  10. Assert.assertEquals(output1.get(0), "result=0");
  11. // case2: "3.14159265358979323846|5|4|8"
  12. List<String> output2 = processor.transform("3.14159265358979323846|5|4|8");
  13. Assert.assertEquals(1, output1.size());
  14. Assert.assertEquals(output2.get(0), "result=0");
  15. // case3: "3.14159265358979323846|3|4|8"
  16. List<String> output3 = processor.transform("3.14159265358979323846|3|4|8");
  17. Assert.assertEquals(1, output1.size());
  18. Assert.assertEquals(output3.get(0), "result=1");
  19. transformSql = "select if((numeric3 < 4) and (numeric4 > 5),1,0) from source";
  20. config = new TransformConfig(transformSql);
  21. // case4: "3.14159265358979323846|4|4|8"
  22. processor = TransformProcessor
  23. .create(config, SourceDecoderFactory.createCsvDecoder(csvSource),
  24. SinkEncoderFactory.createKvEncoder(kvSink));
  25. List<String> output4 = processor.transform("3.14159265358979323846|4|4|8");
  26. Assert.assertEquals(1, output1.size());
  27. Assert.assertEquals(output4.get(0), "result=0");
  28. // case5: "3.14159265358979323846|4|3.2|4"
  29. List<String> output5 = processor.transform("3.14159265358979323846|4|3.2|4");
  30. Assert.assertEquals(1, output1.size());
  31. Assert.assertEquals(output5.get(0), "result=0");
  32. // case6: "3.14159265358979323846|4|3.2|8"
  33. List<String> output6 = processor.transform("3.14159265358979323846|4|3.2|8");
  34. Assert.assertEquals(1, output1.size());
  35. Assert.assertEquals(output6.get(0), "result=1");
  36. }

经过以上步骤,恭喜您完成了一个新逻辑操作符类的实现,可以向社区提交您的代码。AndOperator 完整代码可见 代码链接