Angel 中的计算图

1. 什么是计算图

计算图是主流深度学习框架普遍采用的, 如Tensorflow, Caffe和Mxnet等. 事实上, Spark这样的大数据处理工具也是用计算图来调度任务的. 为了更好地支持深度学习算法, Angel也支持了计算图框架. 与Tensorflow等相比, Angel的计算图更轻量, 主要表现在:

  • 粗粒度: Angel的计算图中的节点是层(layer), 而不是操作(operator). Tensorflow等使用操作作为图中的结点, 十分灵活, 适合二次开发(封装), 但也给机器学习算发开发者带来更陡的学习曲线与更大的工作量, 因此老版本的Tensorflow也一直被诟病”API太底层,开发效率低”, 后来的Tensorflow版本才提供基于层(layer)的高级API. 鉴于这一点, Angel只提供粗粒度的计算图.
  • 特征交叉: 对于推荐系统相关算法, 特征Embedding后往往要通过一些交叉(:这里的特征交叉与特征工程中的人工交叉不同, 这里的特征交叉是通过Embedding的输出做特定的操作)处理后再输入DNN. 这些特征交叉在Tensorflow, Caffe, Torch等上实现比较繁锁, 在Angel上则直接提供了这种特征交叉层.
  • 自动生成网络: Angel可以读取Json文件生成深度网络. 这一点是借鉴的Caffe, 用户可以不编写代码而生成自已的网络, 大大地减轻工作量.

需要指出的是, Angel目前不支持CNN, RNN等, 只关注推荐领域的常用算法.

2. 计算图的构建

2.1 层的基本结构

要了解计算图是怎样构建的, 先要了解其组成元素Layer的结构(关于层更详细的信息, 请参考Angel中的层), 如下:

  1. abstract class Layer(val name: String, val outputDim: Int)(implicit val graph: AngelGraph)
  2. extends Serializable {
  3. var status: STATUS.Value = STATUS.Null
  4. val input = new ListBuffer[Layer]()
  5. val consumer = new ListBuffer[Layer]()
  6. def addInput(layer: Layer): Unit = {
  7. input.append(layer)
  8. }
  9. def addConsumer(layer: Layer): Unit = {
  10. consumer.append(layer)
  11. }
  12. def calOutput(): Matrix = ???
  13. def gatherGrad(): Matrix = ???
  14. }

这个抽象类已将层的大部分功能描述清楚, 具体如下:

  • status: Angel计算图中的节点是有状态的, 用一个状态机来处理, 具体在下一节中讲述
  • input: 用以记录本节点/层有输入, 用一个ListBuffer表示, 一个层可以有多个输入层, 可多次调用addInput(layer: Layer)加入
  • outputDim: 在Angel中最多只能有一个输出, outputDim用于指定输出的维度
  • consumer: 层虽然只有一个输出, 但输出结点可以被多次消费, 因此用ListBuffer表示. 在构建图时调用input层的addConsumer(layer: Layer)告诉输出层哪些层消费了它

事实上, 构建图的具体操作在inputlayer/linearlayer/joinlayer的基类中已完成, 用户自定义layer不必关心, 如下:

  1. abstract class InputLayer(name: String, outputDim: Int)(implicit graph: AngelGraph)
  2. extends Layer(name, outputDim)(graph) {
  3. graph.addInput(this)
  4. def calBackward(): Matrix
  5. }
  6. abstract class JoinLayer(name: String, outputDim: Int, val inputLayers: Array[Layer])(implicit graph: AngelGraph)
  7. extends Layer(name, outputDim)(graph) {
  8. inputLayers.foreach { layer =>
  9. layer.addConsumer(this)
  10. this.addInput(layer)
  11. }
  12. def calGradOutput(idx: Int): Matrix
  13. }
  14. abstract class LinearLayer(name: String, outputDim: Int, val inputLayer: Layer)(implicit graph: AngelGraph)
  15. extends Layer(name, outputDim)(graph) {
  16. inputLayer.addConsumer(this)
  17. this.addInput(inputLayer)
  18. def calGradOutput(): Matrix
  19. }

注: LossLayer是一种特殊的LinearLayer, 所以这里没有给出.

2.2 AngelGraph的基本结构

通过input/consumer构建起了一个复杂的图, 虽然可以从图中的任意节点对图进行遍历, 但是为了方便, 在AngelGraph中还是存储verge节点, 便于对图的操作, 如下:

  1. class AngelGraph(val placeHolder: PlaceHolder, val conf: SharedConf) extends Serializable {
  2. def this(placeHolder: PlaceHolder) = this(placeHolder, SharedConf.get())
  3. private val inputLayers = new ListBuffer[InputLayer]()
  4. private var lossLayer: LossLayer = _
  5. private val trainableLayer = new ListBuffer[Trainable]()
  6. def addInput(layer: InputLayer): Unit = {
  7. inputLayers.append(layer)
  8. }
  9. def setOutput(layer: LossLayer): Unit = {
  10. lossLayer = layer
  11. }
  12. def getOutputLayer: LossLayer = {
  13. lossLayer
  14. }
  15. def addTrainable(layer: Trainable): Unit = {
  16. trainableLayer.append(layer)
  17. }
  18. def getTrainable: ListBuffer[Trainable] = {
  19. trainableLayer
  20. }

verge有两大类:

  • inputLayer: 这类节点的输入是数据, AngelGraph中存储这类节点是方便反向计算, 只要依次调用inputlayer的calBackward. 为了加入inputLayer, Angel要求所有的inputLayer中都调用AngelGraph的addInput方法将自已加入AngelGraph中. 事实上, 在InputLayer的基类中已完成这一操作, 用户新增inputLayer不必关心这一点
  • lossLayer: 目前Angel不支持多任务学习, 所以只有一个lossLayer, 这类节点主要方便前向计算, 只要调用它的predictcalOutput即可. 由于losslayer是linearlayer的子类, 所以用户自定义lossLayer可手动调用setOutput(layer: LossLayer), 但用户新增losslayer的机会不多, 更多的是增加lossfunc.

有了inputLayers, lossLayer后, 从AngelGraph中遍历图十分方便, 正向计算只要调用losslayer的predict方法, 反向计算只要调用inputlayer的calBackward. 但是梯度计算, 参数更新不方便, 为了方便参数更新, AngelGraph中增加了一个trainableLayer的变量, 用以保存带参数的层.

2.3 数据入口PlaceHolder

通过layer的input/consumer构建起了图的边(节点的关系), 在AngelGraph中保存特殊节点(inputlayer/losslayer/trainablelayer)方便前向与后向计算与参数更新. 最后数据是怎样输入的呢? — 通过PlaceHolder

Angel中的PlaceHolder在构建AngelGraph中传给Graph, 而Graph又作为隐式参数传给Layer, 所以在所有的Layer中都可以访问placeholder(即数据).

目前, Angel中只允许有一个PlaceHolder, 以后会去除这一限制, 允许多种数据输入. PlaceHolder只存放一个mini-batch的数据, 主要方法如下:

  1. class PlaceHolder(val conf: SharedConf) extends Serializable {
  2. def feedData(data: Array[LabeledData]): Unit
  3. def getFeats: Matrix
  4. def getLabel: Matrix
  5. def getBatchSize: Int
  6. def getFeatDim: Long
  7. def getIndices: Vector
  8. }

通过feedData, 将Array[LabeledData]类型的数据给placeholder后, 便可以从其中获得:

  • 特征
  • 特征维度
  • 标签
  • batchSize
  • 特征索引

3. Angel中计算图的运行原理

上一节中构建起了计算图的拓朴结构, 这一节要讲述它是怎样运行的

3.1 运行状态机

Angel的状态机有如下几个状态:

  • Null: 初始状态, 每次feedData后都会将Graph置于这一状态
  • Forward: 这一状态表示前向计算已完成
  • Backward: 这一状态表示后向计算已完成, 但还没有计算参数的梯度
  • Gradient: 这一状态表示梯度已计算完成, 并且梯度已推送到PS上了
  • Update: 这一状态表示模型更新已完成

这些状态是依次进行的, 如下图所示:

状态机

状态机的引入主要是保证运算的顺序进行, 减少重复计算. 例如有多个层消费同一层的输出, 在计算时, 可以根所据状态进行判断, 只要计算一次. 状态机在代码中的体现为:

  1. def feedData(data: Array[LabeledData]): Unit = {
  2. deepFirstDown(lossLayer.asInstanceOf[Layer])(
  3. (lay: Layer) => lay.status != STATUS.Null,
  4. (lay: Layer) => lay.status = STATUS.Null
  5. )
  6. placeHolder.feedData(data)
  7. }
  8. override def calOutput(): Matrix = {
  9. status match {
  10. case STATUS.Null =>
  11. // do come forward calculation
  12. status = STATUS.Forward
  13. case _ =>
  14. }
  15. output
  16. }
  17. override def calBackward(): Matrix = {
  18. status match {
  19. case STATUS.Forward =>
  20. val gradTemp = gatherGrad()
  21. // do backward calculation
  22. status = STATUS.Backward
  23. case _ =>
  24. }
  25. backward
  26. }
  27. override def pushGradient(): Unit = {
  28. status match {
  29. case STATUS.Backward =>
  30. // calculate gradient and push to PS
  31. status = STATUS.Gradient
  32. case _ =>
  33. }
  34. }
  35. override def update(epoch: Int = 0): Unit = {
  36. status match {
  37. case STATUS.Gradient =>
  38. optimizer.update(weightId, 1, epoch)
  39. status = STATUS.Update
  40. case _ =>
  41. throw new AngelException("STATUS Error, please calculate Gradient frist!")
  42. }
  43. }

3.2 Angel中Graph的训练过程

具体的代码在GraphLearner, 这理给出框架,

  1. def trainOneEpoch(epoch: Int, iter: Iterator[Array[LabeledData]], numBatch: Int): Double = {
  2. var batchCount: Int = 0
  3. var loss: Double = 0.0
  4. while (iter.hasNext) {
  5. graph.feedData(iter.next())
  6. graph.pullParams()
  7. loss = graph.calLoss() // forward
  8. graph.calBackward() // backward
  9. graph.pushGradient() // pushgrad
  10. PSAgentContext.get().barrier(ctx.getTaskId.getIndex)
  11. if (ctx.getTaskId.getIndex == 0) {
  12. graph.update(epoch * numBatch + batchCount) // update parameters on PS
  13. }
  14. PSAgentContext.get().barrier(ctx.getTaskId.getIndex)
  15. batchCount += 1
  16. LOG.info(s"epoch $epoch batch $batchCount is finished!")
  17. }
  18. loss
  19. }

步骤如下:

  • feedData: 这个过程会将Graph的状态设为Null
  • 拉取参数: 会根据数据, 只拉取当前mini-batch计算所需要的参数, 所以Angel可以训练非常高维的模型
  • 前向计算: 从Losslayer开始, 级联地调用它的inputlayer的calOutput方法, 依次计算output, 计算完后将它的状态设为forward. 对于状态已是forward的情况, 则直接返回上一次计算的结果, 这样避免重复计算
  • 后向计算: 依次调用Graph的inputlayer, 这样会级联调用第一层的CalGradOutput方法, 完成后向计算. 计算完后将它的状态设为backward. 对于状态已是backward的情况, 则直接返回上一次计算的结果, 这样避免重复计算
  • 梯度计算与更新: 计算backward只计算了网络结点的梯度, 并没有计算参数的梯度. 这一步计算参数的梯度, 只需调用trainablepushGradient即可. 这个方法会先计算梯度, 然后再将梯度推送到PS上, 最后将状态设为gradient
  • 梯度更新: 梯度更新是在PS上进行的, 只要发送一个梯度更新的PSF即可, 因此只需一个Workor发送(在Spark on Angel中是通过Driver发送). 不同的优化器的更新方式不一样, 在Angel中, 优化器的核心本质是一个PSF. 参数更新前要做一次同步, 保证所有的梯度都推送完成, 参数更新完成也要做一次同步, 保证所有worker拉取的参数是最新的. 参数更新完成状态被设成update