背景

在介绍调度模型之前,首先了解一下otter系统要解决的异地机房的网络环境.

  • 中美网络延迟 (平均200ms)
  • 中美传输速度 (2~6MB/s)
    网络因素是一个很重要的问题,

    a. 比如中美网络延迟RTT,平均为200ms,这直接会影响整个系统的架构设计。

试想一下,发送一条binlog一次RTT 200ms,那是否意味着单线程1秒钟只能发送5条。不过tcp在解决这类问题时,有自己的一套优化算法,叫做滑动窗口,它发送数据可能一次性发了10条,然后一起等返回结果,这样可以提升传输效率,但始终无法满足1秒传输1w+记录的需求。这也就决定了,需要对发送的binlog做批处理,一次性发送尽可能多的数据,然后一起等结果.

  1. b . 比如中美单socket的带宽只有26MB,这直接会影响整个系统的架构设计。

试想一下,假定一binlog平均1kb,那6MB最多只有6000条数据,也就意味着最大的同步tps只有6000? 而且很多业务的mysql都是共享,也就是6000个binlog对象中,可能只有三分之一或者四分之一是某个业务的,这肯定是无法满足要求的。所以,基于带宽的问题,决定otter架构必须是双节点部署,在杭州一个节点,美国一个节点,杭州这边对数据做加速同步处理,然后快速传递到美国.

基于这两个因素,决定了: batch处理 + 双节点部署的架构.

调度模型

Otter调度模型 - 图1

在正式介绍otter调度模型之前,我们首先得了解TCP/IP协议在解决此类"差网络"环境的一些处理方案,从中借鉴相应的方案.

Nagle算法

通过Canal解决Nagle算法,Canal之前是做为otter的一个子项目,为解决otter的数据增量获取的机制,并为otter项目的特点而量身打造了几个feature.

Canal的处理:

a. 构建RingBuffer (可以基于内存控制模式/数量控制模式)

b. 允许客户端指定batchSize获取

  1. i. 内存大小
  2. ii. 记录数

c. 指定定batchSize + timeout获取

  1. i. timeout = -1 ,即时获取,有多少取多少
  2. ii. timeout = 0,阻塞至满足batchSize条件
  3. iii. timeout > 0,阻塞指定的时间或者满足batchSize.

建议值:batchSize=4000(约4M) , timeout=500,内存控制模式

滑动窗口

Otter调度模型 - 图2

说明:

  • otter通过select模块串行获取canal的批数据,注意是串行获取,每批次获取到的数据,就会有一个全局标识,otter里称之为processId.
  • select模块获取到数据后,将其传递给后续的ETL模型. 这里E和T模块会是一个并行处理
  • 将数据最后传递到Load时,会根据每批数据对应的processId,按照顺序进行串行加载。 ( 比如有一个processId=2的数据先到了Load模块,但会阻塞等processId=1的数据Load完成后才会被执行)
    简单一点说,Select/Load模块会是一个串行机制来保证binlog处理的顺序性,Extract/Transform会是一个并行,加速传输效率.

并行度

类似于tcp滑动窗口大小,比如整个滑动窗口设置了并行度为5时,只有等第一个processId Load完成后,第6个Select才会去获取数据。

数据可靠性

  • 如何保证数据不丢:2pc. (get/ack)
  • 如何处理重传协议:get/ack/rollback
  • 如何支持并行化:多get cursor+ack curosr (可以参看Canal的异步ACK模型)
    Otter调度模型 - 图3

编程模型抽象(SEDA模型)

Otter调度模型 - 图4

说明: 将并行化调度的串行/并行处理,进行隐藏,抽象了await/single的接口,整个调度称之为仲裁器。(有了这层抽象,不同的仲裁器实现可以解决同机房,异地机房的同步需求)

模型接口:

  • await模拟object获取锁操作
  • notify被唤醒后提交任务到thread pools
  • single模拟object释放锁操作,触发下一个stage
    这里使用了SEDA模型的优势:
  • 共享thread pool,解决流控机制
  • 划分多stage,提升资源利用率
  • 统一编程模型,支持同机房,跨机房不同的调度算法

仲裁器算法

主要包括: 令牌生成(processId) + 事件通知.

令牌生成:

  • 基于AtomicLong.inc()机制,(纯内存机制,解决同机房,单节点同步需求,不需要多节点交互)
  • 基于zookeeper的自增id机制,(解决异地机房,多节点协作同步需求)
    事件通知: (简单原理: 每个stage都会有个block queue,接收上一个stage的single信号通知,当前stage会阻塞在该block queue上,直到有信号通知)

  • block queue + put/take方法,(纯内存机制)

  • block queue + rpc + put/take方法 (两个stage对应的node不同,需要rpc调用,需要依赖负载均衡算法解决node节点的选择问题)
  • block queue + zookeeper watcher ()
    负载均衡算法:

  • Stick : 类似于session stick技术,一旦第一次选择了node,下一次选择会继续使用该node. (有一个好处,资源上下文缓存命中率高)

  • Random : 随机算法
  • RoundRbin : 轮询算法
    注意点:每个node节点,都会在zookeeper中生成Ephemeral节点,每个node都会缓存住当前存活的node列表,node节点消失,通过zookeeper watcher机制刷新每个node机器的内存。然后针对每次负载均衡选择时只针对当前存活的节点,保证调度的可靠性。

调度算法成本估算

中美网络RTT = 200ms , zookeeper一次写入=10ms

调度成本估算:

  1. a. zookeeper + zookeeper watch (完全分布式)
  2. 10 * 4 + 200 * 2 + 200 = 640ms
  3. b. zookeeper + rpc (sticky分布式,尽可能选择同节点)
  4. 10 + 100 + 200 = 310ms

c. memory + memory (内存调度,单机房)

  1. 0ms

d. memory + rpc (跨机房调度,最优实现,待完成??)

  1. 0 + 100 + 100 = 200ms

数据传输

有了一层SEDA调度模型的抽象,S/E/T/L模块之间互不感知,那几个模块之间的数据传递,需要有一个机制来处理,这里抽象了一个pipe(管道)的概念.

原理:

stage | pipe | stage

基于pipe实现:

  • in memory (两个stage经过仲裁器调度算法选择为同一个node时,直接使用内存传输)
  • rpc call (<1MB)
  • file(gzip) + http多线程下载
    在pipe中,通过对数据进行TTL控制,解决TCP协议中的丢包问题控制.

原文: https://github.com/alibaba/otter/wiki/Otter%E8%B0%83%E5%BA%A6%E6%A8%A1%E5%9E%8B