前提条件
- JDK 1.8 或以上
- Maven 2.5 或以上
安装 SDK 依赖库
需要在项目中包含 SDK 库,进行 SDK 的使用。库提供以下两种获取方式:
- 获取源码自行编译并将 SDK 包部署到本地仓库,详见如何编译。
- 直接引用 Apache 仓库里的已有库。
<dependency>
<groupId>org.apache.inlong</groupId>
<artifactId>transform-sdk</artifactId>
<version>1.13.0</version>
</dependency>
具体样例
Transform 需求
- 将shenzhen地区的视频播放开始数据过滤出来,原始字段包括:
- event_time,事件时间
- zone,地区,可选值:[ shenzhen, shanghai, beijing ]
- video_id,视频ID
- username,用户名
- operation_type,操作类型,可选值[ start, end ]
- 原始测试数据,CSV 格式,竖线分隔
2024-05-09 20:00:01|shenzhen|1111|zhangsan|start
2024-05-09 20:00:02|shanghai|1111|lisi|start
2024-05-09 20:00:03|shanghai|1111|lisi|end
2024-05-09 20:00:04|shenzhen|1111|zhangsan|end
2024-05-09 20:00:05|beijing|1111|zhangsan|start
2024-05-09 20:00:06|beijing|1111|zhangsan|end
- 预期结果数据,KV格式
event_time=2024-05-09 20:00:02&zone=shanghai&video_id=1111&username=lisi&operation_type=start
Transform SDK 实现
配置源数据配置
// source
SourceInfo csvSource = new CsvSourceInfo("UTF-8", "|", "\\", null);
配置结果数据配置
// sink
SinkInfo kvSink = new KvSinkInfo("UTF-8", null);
执行转换逻辑
String transformSql = "select $1 event_time,$2 zone,$3 video_id,$4 username,$5 operation_type from source where $2='shenzhen' and $5='start' ";
TransformConfig config = new TransformConfig(csvSource, kvSink, transformSql);
构建SDK对象并执行转换
TransformProcessor processor = new TransformProcessor(config);
String srcString = "2024-05-09 20:00:01|shenzhen|1111|zhangsan|start\n"
+ "2024-05-09 20:00:02|shanghai|1111|lisi|start\n"
+ "2024-05-09 20:00:03|shanghai|1111|lisi|end\n"
+ "2024-05-09 20:00:04|shenzhen|1111|zhangsan|end\n"
+ "2024-05-09 20:00:05|beijing|1111|zhangsan|start\n"
+ "2024-05-09 20:00:06|beijing|1111|zhangsan|end";
List<String> outputs = processor.transform("2024-04-28 00:00:00|ok", new HashMap<>());
// Expected outcome:[event_time=2024-05-09 20:00:02&zone=shanghai&video_id=1111&username=lisi&operation_type=start]
System.out.println(outputs);
更多 Transform 样例
- 请详见 更多样例。