Theano 循环:scan(详解)

In [1]:

  1. import theano, time
  2. import theano.tensor as T
  3. import numpy as np
  4.  
  5. def floatX(X):
  6. return np.asarray(X, dtype=theano.config.floatX)
  1. Using gpu device 1: Tesla C2075 (CNMeM is disabled)

theano 中可以使用 scan 进行循环,常用的 mapreduce 操作都可以看成是 scan 的特例。

scan 通常作用在一个序列上,每次处理一个输入,并输出一个结果。

sum(x) 函数可以看成是 z + x(i) 函数在给定 z = 0 的情况下,对 x 的一个 scan

通常我们可以将一个 for 循环表示成一个 scan 操作,其好处如下:

  • 迭代次数成为符号图结构的一部分
  • 最小化 GPU 数据传递
  • 序列化梯度计算
  • 速度比 for 稍微快一些
  • 降低内存使用

scan 的使用

函数的用法如下:

  1. theano.scan(fn,
  2. sequences=None,
  3. outputs_info=None,
  4. non_sequences=None,
  5. n_steps=None,
  6. truncate_gradient=-1,
  7. go_backwards=False,
  8. mode=None,
  9. name=None,
  10. profile=False,
  11. allow_gc=None,
  12. strict=False)

主要参数的含义:

  • fn
    • 一步 scan 所进行的操作
  • sequences
    • 输入的序列
  • outputs_info
    • 前一步输出结果的初始状态
  • non_sequences
    • 非序列参数
  • n_steps
    • 迭代步数
  • go_backwards

    • 是否从后向前遍历 输出为一个元组 (outputs, updates)
  • outputs

    • 从初始状态开始,每一步 fn 的输出结果
  • updates
    • 一个字典,用来记录 scan 过程中用到的共享变量更新规则,构造函数的时候,如果需要更新共享变量,将这个变量当作 updates 的参数传入。

scan 和 map

这里实现一个简单的 map 操作,将向量 $\mathbf x$ 中的所有元素变成原来的两倍:

  1. map(lambda t: t * 2, x)

In [2]:

  1. x = T.vector()
  2.  
  3. results, _ = theano.scan(fn = lambda t: t * 2,
  4. sequences = x)
  5. x_double_scan = theano.function([x], results)
  6.  
  7. print x_double_scan(range(10))
  1. [ 0. 2. 4. 6. 8. 10. 12. 14. 16. 18.]

之前我们说到,theano 中的 mapscan 的一个特例,因此 theano.map 的用法其实跟 theano.scan 十分类似。

由于不需要考虑前一步的输出结果,所以 theano.map 的参数中没有 outputs_info 这一部分。

我们用 theano.map 实现相同的效果:

In [3]:

  1. result, _ = theano.map(fn = lambda t: t * 2,
  2. sequences = x)
  3. x_double_map = theano.function([x], result)
  4.  
  5. print x_double_map(range(10))
  1. [ 0. 2. 4. 6. 8. 10. 12. 14. 16. 18.]

scan 和 reduce

这里一个简单的 reduce 操作,求和:

  1. reduce(lambda a, b: a + b, x)

In [4]:

  1. result, _ = theano.scan(fn = lambda t, v: t + v,
  2. sequences = x,
  3. outputs_info = floatX(0.))
  4.  
  5. # 因为每一步的输出值都会被记录到最后的 result 中,所以最后的和是 result 的最后一个元素。
  6. x_sum_scan = theano.function([x], result[-1])
  7.  
  8. # 计算 1 + 2 + ... + 10
  9. print x_sum_scan(range(10))
  1. 45.0

theano.reduce 也是 scan 的一个特例,使用 theano.reduce 实现相同的效果:

In [5]:

  1. result, _ = theano.reduce(fn = lambda t, v: t + v,
  2. sequences = x,
  3. outputs_info = 0.)
  4.  
  5. x_sum_reduce = theano.function([x], result)
  6.  
  7. # 计算 1 + 2 + ... + 10
  8. print x_sum_reduce(range(10))
  1. 45.0

reducescan 不同的地方在于,result 包含的内容并不是每次输出的结果,而是最后一次输出的结果。

scan 的使用

输入与输出

fn 是一个函数句柄,对于这个函数句柄,它每一步接受的参数是由 sequences, outputs_info, non_sequence 这三个参数所决定的,并且按照以下的顺序排列:

  • sequences 中第一个序列的值
  • sequences 中最后一个序列的值
  • outputs_info 中第一个输出之前的值
  • outputs_info 中最后一个输出之前的值
  • non_squences 中的参数 这些序列的顺序与在参数 sequences, outputs_info 中指定的顺序相同。

默认情况下,在第 k 次迭代时,如果 sequencesoutputs_info 中给定的值不是字典(dictionary)或者一个字典列表(list of dictionaries),那么

  • sequences 中的序列 seq 传入 fn 的是 seq[k] 的值
  • outputs_info 中的序列 output 传入 fn 的是 output[k-1] 的值 fn 的返回值有两部分 (outputs_list, update_dictionary),第一部分将作为序列,传入 outputs 中,与 outputs_info 中的初始输入值的维度一致(如果没有给定 outputs_info ,输出值可以任意。)

第二部分则是更新规则的字典,告诉我们如何对 scan 中使用到的一些共享的变量进行更新:

  1. return [y1_t, y2_t], {x:x+1}

这两部分可以任意,即顺序既可以是 (outputs_list, update_dictionary), 也可以是 (update_dictionary, outputs_list)theano 会根据类型自动识别。

两部分只需要有一个存在即可,另一个可以为空。

例子分析

例如,在我们的第一个例子中

  1. theano.scan(fn = lambda t: t * 2,
  2. sequences = x)

在第 k 次迭代的时候,传入参数 t 的值为 x[k]

再如,在我们的第二个例子中:

  1. theano.scan(fn = lambda t, v: t + v,
  2. sequences = x,
  3. outputs_info = floatX(0.))

fn 接受了两个参数,初始迭代时,按照规则,t 接受的参数为 x[0]v 接受的参数为我们传入 outputs_info 的第一个初始值即 0 (认为是 outputs[-1]),他们的结果 t+v 将作为 outputs[0] 的值传入下一次迭代以及最终 scan 输出的 outputs 值中。

输入多个序列

我们可以一次输入多个序列,这些序列会按照顺序传入 fn 的参数中,例如计算多项式\sum_{n=0}^N a_n x^ n时,我们可以将多项式的系数和幂数两个序列放到一个 list 中作为输入参数:

In [6]:

  1. # 变量 x
  2. x = T.scalar("x")
  3.  
  4. # 不为 0 的系数
  5. A = T.vectors("A")
  6.  
  7. # 对应的幂数
  8. N = T.ivectors("N")
  9.  
  10. # a 对应的是 A, n 对应 N,v 对应 x
  11. components, _ = theano.scan(fn = lambda a, n, v: a * (v ** n),
  12. sequences = [A, N],
  13. non_sequences = x)
  14.  
  15. result = components.sum()
  16.  
  17. polynomial = theano.function([x, A, N], result)
  18.  
  19. # 计算 1 + 3 * 10 ^ 2 + 2 * 10^3 = 2301
  20. print polynomial(floatX(10),
  21. floatX([1, 3, 2]),
  22. [0, 2, 3])
  1. 2301.0

使用序列的多个值

默认情况下,我们只能使用输入序列的当前时刻的值,以及前一个输出的输出值。

事实上,theano 会将参数中的序列变成一个有 inputtaps 两个键值的 dict

  • input:输入的序列
  • taps:要传入 fn 的值的列表
    • 对于 sequences 参数中的序列来说,默认值为 [0],表示时间 t 传入 t+0 时刻的序列值,可以为正,可以为负。
    • 对于 outputs_info 参数中的序列来说,默认值为 [-1],表示时间 t 传入 t-1 时刻的序列值,只能为负值,如果值为 None,表示这个输出结果不会作为参数传入 fn 中。 传入 fn 的参数也会按照 taps 中的顺序来排列,我们考虑下面这个例子:
  1. scan(fn, sequences = [ dict(input= Sequence1, taps = [-3,2,-1])
  2. , Sequence2
  3. , dict(input = Sequence3, taps = 3) ]
  4. , outputs_info = [ dict(initial = Output1, taps = [-3,-5])
  5. , dict(initial = Output2, taps = None)
  6. , Output3 ]
  7. , non_sequences = [ Argument1, Argument2])

首先是 Sequence1[-3, 2, -1] 被传入,然后 Sequence2 不是 dict, 所以传入默认值 [0]Sequence3 传入的参数是 3,所以 fn 在第 t 步接受的前几个参数是:

  1. Sequence1[t-3]
  2. Sequence1[t+2]
  3. Sequence1[t-1]
  4. Sequence2[t]
  5. Sequence3[t+3]

然后 Output1 传入的是 [-3, -5]传入的初始值的形状应为 shape (5,)+),Output2 不作为参数传入,Output3 传入的是 [-1],所以接下的参数是:

  1. Output1[t-3]
  2. Output1[t-5]
  3. Output3[t-1]
  4. Argument1
  5. Argument2

总的说来上面的例子中,fn 函数按照以下顺序最多接受这样 10 个参数:

  1. Sequence1[t-3]
  2. Sequence1[t+2]
  3. Sequence1[t-1]
  4. Sequence2[t]
  5. Sequence3[t+3]
  6. Output1[t-3]
  7. Output1[t-5]
  8. Output3[t-1]
  9. Argument1
  10. Argument2

例子,假设 $x$ 是我们的输入,$y$ 是我们的输出,我们需要计算 $y(t) = tanh\left[W{1} y(t-1) + W{2} x(t) + W_{3} x(t-1)\right]$ 的值:

In [7]:

  1. X = T.matrix("X")
  2. Y = T.vector("y")
  3.  
  4. W_1 = T.matrix("W_1")
  5. W_2 = T.matrix("W_2")
  6. W_3 = T.matrix("W_3")
  7.  
  8. # W_yy 和 W_xy 作为不变的参数可以直接使用
  9. results, _ = theano.scan(fn = lambda x, x_pre, y: T.tanh(T.dot(W_1, y) + T.dot(W_2, x) + T.dot(W_3, x_pre)),
  10. # 0 对应 x,-1 对应 x_pre
  11. sequences = dict(input=X, taps=[0, -1]),
  12. outputs_info = Y)
  13.  
  14. Y_seq = theano.function(inputs = [X, Y, W_1, W_2, W_3],
  15. outputs = results)

测试小矩阵计算:

In [8]:

  1. # 测试
  2. t = 1001
  3. x_dim = 10
  4. y_dim = 20
  5.  
  6. x = 2 * floatX(np.random.random([t, x_dim])) - 1
  7. y = 2 * floatX(np.zeros(y_dim)) - 1
  8. w_1 = 2 * floatX(np.random.random([y_dim, y_dim])) - 1
  9. w_2 = 2 * floatX(np.random.random([y_dim, x_dim])) - 1
  10. w_3 = 2 * floatX(np.random.random([y_dim, x_dim])) - 1
  11.  
  12. tic = time.time()
  13.  
  14. y_res_theano = Y_seq(x, y, w_1, w_2, w_3)
  15.  
  16. print "theano running time {:.4f} s".format(time.time() - tic)
  17.  
  18. tic = time.time()
  19. # 与 numpy 的结果进行比较:
  20. y_res_numpy = np.zeros([t, y_dim])
  21. y_res_numpy[0] = y
  22.  
  23. for i in range(1, t):
  24. y_res_numpy[i] = np.tanh(w_1.dot(y_res_numpy[i-1]) + w_2.dot(x[i]) + w_3.dot(x[i-1]))
  25.  
  26. print "numpy running time {:.4f} s".format(time.time() - tic)
  27.  
  28. # 这里要从 1 开始,因为使用了 x(t-1),所以 scan 从第 1 个位置开始计算
  29. print "the max difference of the first 10 results is", np.max(np.abs(y_res_theano[0:10] - y_res_numpy[1:11]))
  1. theano running time 0.0537 s
  2. numpy running time 0.0197 s
  3. the max difference of the first 10 results is 1.25780650354e-06

测试大矩阵运算:

In [9]:

  1. # 测试
  2. t = 1001
  3. x_dim = 100
  4. y_dim = 200
  5.  
  6. x = 2 * floatX(np.random.random([t, x_dim])) - 1
  7. y = 2 * floatX(np.zeros(y_dim)) - 1
  8. w_1 = 2 * floatX(np.random.random([y_dim, y_dim])) - 1
  9. w_2 = 2 * floatX(np.random.random([y_dim, x_dim])) - 1
  10. w_3 = 2 * floatX(np.random.random([y_dim, x_dim])) - 1
  11.  
  12. tic = time.time()
  13.  
  14. y_res_theano = Y_seq(x, y, w_1, w_2, w_3)
  15.  
  16. print "theano running time {:.4f} s".format(time.time() - tic)
  17.  
  18. tic = time.time()
  19. # 与 numpy 的结果进行比较:
  20. y_res_numpy = np.zeros([t, y_dim])
  21. y_res_numpy[0] = y
  22.  
  23. for i in range(1, t):
  24. y_res_numpy[i] = np.tanh(w_1.dot(y_res_numpy[i-1]) + w_2.dot(x[i]) + w_3.dot(x[i-1]))
  25.  
  26. print "numpy running time {:.4f} s".format(time.time() - tic)
  27.  
  28. # 这里要从 1 开始,因为使用了 x(t-1),所以 scan 从第 1 个位置开始计算
  29. print "the max difference of the first 10 results is", np.max(np.abs(y_res_theano[:10] - y_res_numpy[1:11]))
  1. theano running time 0.0754 s
  2. numpy running time 0.1334 s
  3. the max difference of the first 10 results is 0.000656997077348

值得注意的是,由于 theanonumpy 在某些计算的实现上存在一定的差异,随着序列长度的增加,这些差异将被放大:

In [10]:

  1. for i in xrange(20):
  2. print "iter {:03d}, max diff:{:.6f}".format(i + 1,
  3. np.max(np.abs(y_res_numpy[i + 1,:] - y_res_theano[i,:])))
  1. iter 001, max diff:0.000002
  2. iter 002, max diff:0.000005
  3. iter 003, max diff:0.000007
  4. iter 004, max diff:0.000010
  5. iter 005, max diff:0.000024
  6. iter 006, max diff:0.000049
  7. iter 007, max diff:0.000113
  8. iter 008, max diff:0.000145
  9. iter 009, max diff:0.000334
  10. iter 010, max diff:0.000657
  11. iter 011, max diff:0.001195
  12. iter 012, max diff:0.002778
  13. iter 013, max diff:0.004561
  14. iter 014, max diff:0.004748
  15. iter 015, max diff:0.014849
  16. iter 016, max diff:0.012696
  17. iter 017, max diff:0.043639
  18. iter 018, max diff:0.046540
  19. iter 019, max diff:0.083032
  20. iter 020, max diff:0.123678

控制循环次数

假设我们要计算方阵$A$的$A^k$,$k$ 是一个未知变量,我们可以这样通过 n_steps 参数来控制循环计算的次数:

In [11]:

  1. A = T.matrix("A")
  2. k = T.iscalar("k")
  3.  
  4. results, _ = theano.scan(fn = lambda P, A: P.dot(A),
  5. # 初始值设为单位矩阵
  6. outputs_info = T.eye(A.shape[0]),
  7. # 乘 k 次
  8. non_sequences = A,
  9. n_steps = k)
  10.  
  11. A_k = theano.function(inputs = [A, k], outputs = results[-1])
  12.  
  13. test_a = floatX([[2, -2], [-1, 2]])
  14.  
  15. print A_k(test_a, 10)
  16.  
  17. # 使用 numpy 进行验证
  18. a_k = np.eye(2)
  19. for i in range(10):
  20. a_k = a_k.dot(test_a)
  21.  
  22. print a_k
  1. [[ 107616. -152192.]
  2. [ -76096. 107616.]]
  3. [[ 107616. -152192.]
  4. [ -76096. 107616.]]

使用共享变量

可以在 scan 中使用并更新共享变量,例如,利用共享变量 n,我们可以实现这样一个迭代 k 步的简单计数器:

In [12]:

  1. n = theano.shared(floatX(0))
  2. k = T.iscalar("k")
  3.  
  4. # 这里 lambda 的返回值是一个 dict,因此这个值会被传入 updates 中
  5. _, updates = theano.scan(fn = lambda n: {n:n+1},
  6. non_sequences = n,
  7. n_steps = k)
  8.  
  9. counter = theano.function(inputs = [k],
  10. outputs = [],
  11. updates = updates)
  12.  
  13. print n.get_value()
  14. counter(10)
  15. print n.get_value()
  16. counter(10)
  17. print n.get_value()
  1. 0.0
  2. 10.0
  3. 20.0

之前说到,fn 函数的返回值应该是 (outputs_list, update_dictionary) 或者 (update_dictionary, outputs_list) 或者两者之一。

这里 fn 函数返回的是一个字典,因此自动被放入了 update_dictionary 中,然后传入 functionupdates 参数中进行迭代。

使用条件语句结束循环

我们可以将 scan 设计为 loop-until 的模式,具体方法是在 scan 中,将 fn 的返回值增加一个参数,使用 theano.scan_module 来设置停止条件。

假设我们要计算所有不小于某个值的 2 的幂,我们可以这样定义:

In [13]:

  1. max_value = T.scalar()
  2.  
  3. results, _ = theano.scan(fn = lambda v_pre, max_v: (v_pre * 2, theano.scan_module.until(v_pre * 2 > max_v)),
  4. outputs_info = T.constant(1.),
  5. non_sequences = max_value,
  6. n_steps = 1000)
  7.  
  8. # 注意,这里不能取 results 的全部
  9. # 例如在输入值为 40 时,最后的输出可以看成 (64, False)
  10. # scan 发现停止条件满足,停止循环,但是不影响 64 被输出到 results 中,因此要将 64 去掉
  11. power_of_2 = theano.function(inputs = [max_value], outputs = results[:-1])
  12.  
  13. print power_of_2(40)
  1. [ 2. 4. 8. 16. 32.]

原文: https://nbviewer.jupyter.org/github/lijin-THU/notes-python/blob/master/09-theano/09.07-loop-with-scan.ipynb