一、累加器
在集群中执行代码时,一个难点是:理解变量和方法的范围、生命周期。下面是一个闭包的例子:
x
counter = 0
rdd = sc.parallelize(data)
def increment_counter(x):
global counter
counter += x
rdd.foreach(increment_counter)
print("Counter value: ", counter)
上述代码的行为是不确定的,并且无法按照预期正常工作。
在执行作业时,
spark
会分解RDD
操作到每个executor
的task
中。在执行之前,spark
计算任务的闭包- 所谓闭包:指的是
executor
要在RDD
上进行计算时,必须对执行节点可见的那些变量和方法 - 闭包被序列化,并被发送到每个
executor
- 所谓闭包:指的是
在上述代码中,闭包的变量的副本被发送给每个
executor
,当counter
被foreach
函数引用时,它已经不再是驱动器节点的counter
了- 虽然驱动器程序中,仍然有一个
counter
在内存中;但是对于executors
,它是不可见的。 executor
看到的只是序列化的闭包的一个副本。所有对counter
的操作都是在executor
的本地进行。- 要想正确实现预期目标,则需要使用累加器
- 虽然驱动器程序中,仍然有一个
1.1 Accumulator
一个累加器(
Accumulator
)变量只支持累加操作工作节点和驱动器程序对它都可以执行
+=
操作,但是只有驱动器程序可以访问它的值。在工作节点上,累加器对象看起来就像是一个只写的变量
工作节点对它执行的任何累加,都将自动的传播到驱动器程序中。
SparkContext
的累加器变量只支持基本的数据类型,如int、float
等。- 你可以通过
AccumulatorParam
来实现自定义的累加器
- 你可以通过
Accumulator
的方法:.add(term)
:向累加器中增加值term
Accumulator
的属性:.value
:获取累加器的值。只可以在驱动器程序中使用
通常使用累加器的流程为:
- 在驱动器程序中调用
SparkContext.accumulator(init_value)
来创建出带有初始值的累加器 - 在执行器的代码中使用累加器的
+=
方法或者.add(term)
方法来增加累加器的值 - 在驱动器程序中使用累加器的
.value
属性来访问累加器的值
示例:
file=sc.textFile('xxx.txt')
acc=sc.accumulator(0)
def xxx(line):
global acc #访问全局变量
if yyy:
acc+=1
return zzz
rdd=file.map(xxx)
- 在驱动器程序中调用
1.2 累加器与容错性
spark
中同一个任务可能被运行多次:- 如果工作节点失败了,则
spark
会在另一个节点上重新运行该任务 - 如果工作节点处理速度比别的节点慢很多,则
spark
也会抢占式的在另一个节点上启动一个投机性的任务副本 - 甚至有时候
spark
需要重新运行任务来获取缓存中被移出内存的数据
- 如果工作节点失败了,则
当
spark
同一个任务被运行多次时,任务中的累加器的处理规则:在行动操作中使用的累加器,
spark
确保每个任务对各累加器修改应用一次- 因此:如果想要一个无论在失败还是重新计算时,都绝对可靠的累加器,我们必须将它放在
foreach()
这样的行动操作中
- 因此:如果想要一个无论在失败还是重新计算时,都绝对可靠的累加器,我们必须将它放在
在转化操作中使用的累加器,无法保证只修改应用一次。
- 转化操作中累加器可能发生不止一次更新
- 在转化操作中,累加器通常只用于调试目的