背景
因为alibaba的特殊业务,比如:
- 同步数据同时,需要同步数据关联的文件 (需要做数据join)
- 同步会员数据,敏感字段信息不能同步到美国站点. (需要做数据过滤)
- 两地机房的数据库可能为异构数据库,(需要做字段类型,名字等转化.)
为解决这些业务,otter引入了映射规则这一概念,用于描述这样的一种同步业务的关系,其粒度可以精确到一张表,或者是一整个库.
映射规则
表映射
otter中每个pipeline可以设置多个映射规则,每个映射规则描述一个数据库同步的内容,比如源表是哪张,同步到哪张目标表。
权重的概念
可以先看下:Otter数据入库算法 , 因为otter采用了pk hash的并行载入算法,会将原先binlog中的事务进行打散做并行处理提升同步性能。原先事务被拆散后,通过这个权重概念,来提供“业务上类事务功能".
举个实际点的例子来看:
- 比如有两张表,product(记录产品的属性信息)和product_detail(记录产品的详情信息),product_detail上有个字段为product_id与product进行关联. 目前为1:1的关系
- 业务上插入数据可以通过事务,先插入product,然后插入product_detail,最后一起提交到数据库. 正常,页面上通过查询用户的product表的数据,发现有产品记录,然后通过product_id去查询product_detail表,查到后才显示产品页面
- 假如同步到目标库后,打散事务后,同步过程如果先插入了product表,后插入product_detail表,这时如果有用户访问该产品记录,可能就会遇到product_detail不存在的情况,从而导致页面出错.
- 所以,我们通过权重定义,比如将product_detail权重定义为5,将product定义为10。 otter会优先同步权重低的表数据,最终可以保证查询product有数据后,product_detail一定有数据,避免业务出错.
视图映射
如何进入视图编辑:
点击下一步后,进入视图编辑页面:
说明:
- 映射规则配置页面,可以选择视图模式为:include或exclude,代表正向匹配或者逆向排除.
- 视图配置页面,只支持存在的数据表(因为要获取数据表结构,所以.*等正则的数据表不支持配置该功能)
视图配置列表,左右选中列表会按照顺序进行对应,做映射时需按照顺序进行选择.
举个例子:如果要排除表字段A的同步,则只需要选择为exclude模式,然后视图编辑页面选择左右皆选择A字段即可,点击保存.
字段组映射
首先解释一下,需要字段组同步的需求.
- 文件同步. 一条记录对应的图片,可能会有一个或者多个字段,比如会有image_path,image_version来决定图片,所以我们可以定义这两个字段为一组,只要满足组内任意一个字段的变更,就会认为需要文件同步.
- 数据上的组同步,比如国家,省份,城市,可能在数据库为三个字段. 如果是双A同步,两地同时修改这些字段,但业务上可能在A地修改了国家为美国,在B地修改为省份为浙江,然后一同步,最终就会变成美国,浙江这样的情况. 这种情况可以通过group来解决,将国家,省份,城市做一个group,组内任何一个字段发生了变更,其余字段会做为整体一起变更.
再来看一下配置:(点击视图编辑页面的下一步,即可进入)
说明:
- 也可不配置视图,单独配置字段组,此时可选择的字段即为当前所有字段(映射规则按照同名映射).
高级映射
主要分为两类:
- 文件同步
自定义数据同步
具体代码扩展方式和配置可参见: Otter扩展性配置方式:
文件同步
首先解释一下文件同步的需求,阿里巴巴国际站业务,主要模式为对外贸易,卖家基本在国内,买家在国外. 所以,目前我们的机房部署为杭州和美国各一个,卖家访问杭州机房,买家访问美国机房。卖家在国内发布产品和图片,通过otter同步到美国,同步产品数据记录的同时,同样需要把图片同步过去,保证买家用户的访问体验. 所以,基于这个业务场景,衍生出了文件同步的需求.
所以,做文件同步的几个前提:
- 异地同步 (需要部署为两个node,S/E和T/L分为两地. )
- 数据触发文件同步 (数据库记录做为类似文件索引信息,不支持单独同步文件)
本地文件同步 (需要同步的文件需要和node部署在一台机器上或者通过nfs mount,如果要同步 公司自带的分布式文件系统的数据,otter需要做额外扩展)
文件同步准备工作:准备两台机器,分别部署上两个node
- 配置channel/pipeline同步,配置映射规则
- 编写FileResolver解析类,根据binlog中的变更数据,转化为文件的路径信息. 例子:TestFileResolver
- public class TestFileResolver extends AbstractFileResolver {
public FileInfo[] getFileInfo(Map<String, String> rowMap) {
// 基本步骤:
// 1. 获取binlog中的变更字段,比如组成文件有多个字段组成version+path
// 2. 基于字段内容,构造一个文件路径,目前开源版本只支持本地文件的同步.(如果是网络文件,建议进行NFS mount到ndde机器).
// 3. 返回FileInfo数组,(目前不支持目录同步,如果是目录需要展开为多个FileInfo的子文件),如果不需要同步,则返回null.
String path = rowMap.get("FIELD"); //注意为大写
FileInfo fileInfo = null;
if (StringUtils.isNotEmpty(path)) {
fileInfo = new FileInfo(path);
return new FileInfo[] { fileInfo };
} else {
return null;
}
}
}
自定义数据同步
通过前面的字段视图映射,或许可以解决80%的需求,但总会有一小撮的特殊用户,希望可以自定义自己的同步数据内容,所以otter引入了自定义数据同步为EventProcessor,允许你任意改变整个同步过程中的数据内容.
可以支持的需求:
- 根据字段内容,判断是否需要屏蔽本记录同步
- 动态增加/减少字段
- 动态修改字段内容
动态改变事件类型(Insert/Update/Delete)
几点注意:EventProcessor主要是在E模块进行数据处理,也就是EventProcessor处理后的数据,会再次经过视图配置,字段组映射,文件同步,最后进入Transform处理.
- EventProcessor修改数据中的schema/table name需要谨慎,因为会继续后续的E/T/L流程,所以需要保证修改后的name在映射规则列表里有配置,否则同步会出错.
一个例子:(比如我想将源库的每条binlog变更,记录到一个日志表binlog,映射规则配置为.*所有表的同步)
- public class TestEventProcessor extends AbstractEventProcessor {
public boolean process(EventData eventData) {
// 基本步骤:
// 1. 获取binlog中的变更字段
// 2. 根据业务逻辑进行判断,如果需要忽略本条数据同步,直接返回false,否则返回true
// 3. 根据业务逻辑进行逻辑转化,比如可以修改整个EventData数据.
// 本文例子:源库的每条binlog变更,记录到一个日志表binlog
// create table test.binlog(
// id bigint(20) auto_increment,
// oschema varchar(256),
// otable varchar(256),
// gtime varchar(32)
// ovalue text,
// primary key(id);
// )
// 在process处理中,可以修改EventData的任何数据,达到数据转换的效果, just have fun.
JSONObject col = new JSONObject();
JSONArray array = new JSONArray();
for (EventColumn column : eventData.getColumns()) {
JSONObject obj = this.doColumn(column);
array.add(obj);
}
for (EventColumn key : eventData.getKeys()) {
JSONObject obj = this.doColumn(key);
array.add(obj);
}
// 记录原始的表信息
col.put("schema", eventData.getSchemaName());
col.put("table", eventData.getTableName());
col.put("columns", array);
col.put("dml", eventData.getEventType());
col.put("exectime", eventData.getExecuteTime());
// 构造新的主键
EventColumn id = new EventColumn();
id.setColumnValue(eventData.getSchemaName());
id.setColumnType(Types.BIGINT);
id.setColumnName("id");
// 构造新的字段
EventColumn schema = new EventColumn();
schema.setColumnValue(eventData.getSchemaName());
schema.setColumnType(Types.VARCHAR);
schema.setColumnName("oschema");
EventColumn table = new EventColumn();
table.setColumnValue(eventData.getTableName());
table.setColumnType(Types.VARCHAR);
table.setColumnName("otable");
EventColumn ovalue = new EventColumn();
ovalue.setColumnValue(col.toJSONString());
ovalue.setColumnType(Types.VARCHAR);
ovalue.setColumnName("ovalue");
EventColumn gtime = new EventColumn();
gtime.setColumnValue(eventData.getExecuteTime() + "");
gtime.setColumnType(Types.VARCHAR);
gtime.setColumnName("gtime");
// 替换为新的字段和主键信息
List<EventColumn> cols = new ArrayList<EventColumn>();
cols.add(schema);
cols.add(table);
cols.add(gtime);
cols.add(ovalue);
eventData.setColumns(cols);
List<EventColumn> keys = new ArrayList<EventColumn>();
keys.add(id);
eventData.setKeys(keys);
//修改数据meta信息
eventData.setEventType(EventType.INSERT);
eventData.setSchemaName("test");
eventData.setTableName("binlog");
return true;
}
private JSONObject doColumn(EventColumn column) {
JSONObject obj = new JSONObject();
obj.put("name", column.getColumnName());
obj.put("update", column.isUpdate());
obj.put("key", column.isKey());
if (column.getColumnType() != Types.BLOB && column.getColumnType() != Types.CLOB) {
obj.put("value", column.getColumnValue());
} else {
obj.put("value", "");
}
return obj;
}
}
原文: https://github.com/alibaba/otter/wiki/%E6%98%A0%E5%B0%84%E8%A7%84%E5%88%99%E9%85%8D%E7%BD%AE