Transforms
定义Bigflow Python中所有的变换
Author: Wang, Cong(bigflow-opensource@baidu.com), panyunhong(bigflow-opensource@baidu.com)
注意:除特殊说明外,所有变换的用户自定义方法(UDF)输入参数都不允许修改
bigflow.transforms.accumulate
(pcollection, zero, accumulate_fn, side_inputs, options*)
将给定的PCollection按照一个初始值和方法聚合为PObject
假设输入类型为I,输出类型为O,则zero、accumulate_fn的期望签名为:
zero: O或zero() => O
accumulate_fn: accumulate_fn(O, I) => O (accumulate_fn的第一个参数允许被修改)
参数: |
|
---|---|
返回: | 聚合结果 |
返回类型: |
由于该函数的语义是数据必然按顺序一条条的流过,限制了该函数可以进行的优化工作, 所以如果可以使用aggregate或reduce替换时,尽量使用aggregate/reduce替换该函数。
>>> from bigflow import transforms
>>> _p = _pipeline.parallelize([1, 2, 3])
>>> transforms.accumulate(_p, 0, lambda x, y: x + y).get()
6
TODO: Another example
bigflow.transforms.aggregate
(pcollection, zero, aggregate_fn, combine_fn, side_inputs, options*)
将给定的PCollection按照初始值、初段聚合方法和汇总方法聚合为PObject
假设输入类型I,输出类型为O,则zero、aggregate_fn、combine_fn的期望签名为:
zero: O或zero() => O
aggregate_fn: aggregate_fn(O, I) => O (aggregate_fn的第一个参数允许被修改)
combine_fn: combine_fn(O, O) => O (combine_fn的第一个参数允许被修改)
在执行时aggregate会把输入pcollection先切分成许多个分片,然后对每个分片使用zero生成一个O类型的初始值。
随后,在每个分片上,持续调用aggregate_fn,将分片上全部的数据聚合为一个O类型的值。
最后,会再将所有分片上生成的那些O类型的值汇聚到一起,使用combine_fn最终聚合到一起。
分片的规则用户不应作任何假设。
参数: |
|
---|---|
返回: | 聚合结果 |
返回类型: |
>>> from bigflow import transforms
>>> _p = _pipeline.parallelize(["viva", "la", "vida"])
>>> transforms.aggregate(_p, 0, lambda x, y: x + len(y), lambda x, y: x + y).get() # sum words length
10
bigflow.transforms.cartesian
(pcollections, options*)
对多个输入PCollection求笛卡尔积,返回一个PCollection
参数: |
|
---|---|
返回: | 笛卡尔积 |
返回类型: |
>>> from bigflow import transforms
>>> p1 = _pipeline.parallelize([1, 2, 3])
>>> p2 = _pipeline.parallelize(['a', 'b', 'c'])
>>> transforms.cartesian(p1, p2)
[(1, 'a'), (1, 'b'), (1, 'c'), (2, 'a'), (2, 'b'), (2, 'c'), (3, 'a'), (3, 'b'), (3, 'c')]
>>> p3 = _pipeline.parallelize(1)
>>> p4 = _pipeline.parallelize(2)
>>> p3.cartesian(p4).get()
[(1, 2)]
>>> p5 = _pipeline.parallelize([3, 4])
>>> p3.cartesian(p5).get()
[(1, 3), (1, 4)]
>>> p3.cartesian(p4, p5).get()
[(1, 2, 3), (1, 2, 4)]
bigflow.transforms.cogroup
(pcollections, options*)
对传入的所有pcollection进行协同分组。
cogroup要求所有传入的PCollection的每个元素都是一个(k, v)对, cogroup会用k来作为分组的key,对多个输入PCollection进行协同分组, 返回一个PTable表示分组结果。
这个返回的PTable的每个value为一个tuple,tuple的每个元素是一个PCollection, 其中第n个PCollection表示输入的第n个PCollection在当前key下的全部数据。
如果某个输入PCollection在某个key下无数据,则对应的PCollection为一个空PCollection。
目前不能像group_by指定key_extractor。
group_by_key可以理解成是cogroup只传有一个参数的特殊情况。
参数: |
|
---|---|
返回: | 分组结果 |
返回类型: |
>>> from bigflow import transforms
>>> _p1 = _pipeline.parallelize([("A", 1), ("A", 2), ("B", 3)])
>>> _p2 = _pipeline.parallelize([("A", 4)])
>>> _p = transforms.cogroup(_p1, _p2)
# _p的值为{"A": ([1, 2], [4]), "B": ([3], [])} ,但由于实现难度较大,PTable的value为tuple of PCollection时的get操作暂不支持。
>>> _p.apply_values(lambda x, y: transforms.union(x, y)).get()
{"A": [1, 2, 4], "B": [3]}
>>> def distinct_and_join(p, q): # 去重,并join
... return p.cogroup(q) \
... .apply_values(lambda a, b: (a.distinct(), b.distinct())) \
... .apply_values(transforms.cartesian) \
... .flatten()
>>> _p1 = _pipeline.parallelize([("A", 1), ("A", 2), ("A", 1), ("C", 1)])
>>> _p2 = _pipeline.parallelize([("A", 3), ("A", 3), ("B", 2)])
>>> print distinct_and_join(_p1, _p2).get()
[("A", (1, 3)), ("A", (2, 3))]
>>> # 未来bigflow会自动将p.distinct().join(q.distinct())优化成上边的样子(正在进行中)
>>> def semi_join(p, q): # 同key的join结果只输出一条
... return p.cogroup(q) \
... .apply_values(lambda a, b: (a.take(1), b.take(1))) \
... .apply_values(transforms.cartesian) \
... .flatten()
>>> print semi_join(_p1, _p2).get()
[("A", (1, 3))]
bigflow.transforms.combine
(pcollection, fn, **options)
给定一个合并函数,聚合输入PCollection中所有元素,这些元素以迭代器的形式给出
默认情况下,输入类型与输出类型需要一致,假设为O类型,fn的期望签名为 fn([O…]) => O,[]表示输入可遍历
在执行时会把输入pcollection先切分成许多个分片,然后对每个分片的数据组成一个列表,然后调用fn, 将每个分片中的数据合并成一个O类型的变量。 然后,会再将所有分片上生成的那些O类型的值汇聚到一起,组成一个列表,再使用fn最终聚合到一起。
分片的规则用户不应作任何假设。
用户可以显式的指定pre_combine=False,关掉预聚合。如果关掉预聚合,则会直接将全部的数据组成一个列表交给fn, 聚合成一个值。则这种情况下,fn需要的输入类型与fn返回的类型可以是不同类型的。
参数: |
|
---|---|
返回: | 合并结果 |
返回类型: |
>>> _p = _pipeline.parallelize([2, 4, 6, 10])
>>> transforms.combine(_p, sum).get()
22
>>> number_list = self._pipeline.parallelize(['2', '3', '4'])
>>> to_lookup = {'2': 2, '3': 3, '4': 4}
>>> def lookup_dict_and_sum(numbers):
... return sum(map(lambda s: to_lookup[s], numbers))
>>> number_list.combine(lookup_dict_and_sum, pre_combine=False).get()
9
>>> number_list.combine(lookup_dict_and_sum).get()
Error may occur (because of pre_combine, 1)
bigflow.transforms.count
(pcollection, **options)
返回给定PCollection中元素的数量
参数: |
|
---|---|
返回: | 元素数量 |
返回类型: |
>>> from bigflow import transforms
>>> _p = _pipeline.parallelize([1, 2, 3, 4])
>>> transforms.count(_p).get()
4
bigflow.transforms.diff
(pcollection1, pcollection2)
对于给定的PCollection1和PCollection2,返回两者不相同的元素
参数: |
|
---|---|
返回: | 表示差异的PCollection |
返回类型: |
>>> from bigflow import transforms
>>> a = _pipeline.parallelize([1, 1, 2, 3])
>>> b = _pipeline.parallelize([1, 1, 2, 2])
>>> transforms.diff(a, b).get()
[(2, (1, 2)), (3, (1, 0))]
bigflow.transforms.distinct
(pcollection, **options)
返回给定PCollection中所有不重复元素
参数: |
|
---|---|
返回: | 不重复元素,以PCollection给出 |
返回类型: |
>>> from bigflow import transforms
>>> _p = _pipeline.parallelize([2, 2, 1, 9, 3, 3])
>>> transforms.distinct(_p).get()
[2, 3, 1, 9]
bigflow.transforms.extract_keys
(ptable, **options)
提取给定PTable中所有的key
参数: |
|
---|---|
返回: | 所有的key,以PCollection给出 |
返回类型: |
>>> from bigflow import transforms
>>> _p = _pipeline.parallelize({"A": [2, 3], "B": [4, 5]})
>>> transforms.extract_keys(_p).get()
["A", "B"]
bigflow.transforms.extract_values
(ptable, **options)
提取给定PTable中所有的value
参数: |
|
---|---|
返回: | 所有的value,以PCollection给出 |
返回类型: |
>>> from bigflow import transforms
>>> _p = _pipeline.parallelize({"A": [2, 3], "B": [4, 5]})
>>> transforms.extract_values(_p).get()
[2, 3, 4, 5]
无论PTable为多少层嵌套,都会抽取出最内层的value
>>> _p = _pipeline.parallelize({"A": {"a": [2, 3], "b": [1, 4]}, "B": {"c": [6, 7], "d": [9, 9]}}
>>> print _p
>>> {k0: {k1: [...]}} # 嵌套PTable
>>> transforms.extract_values(_p).get()
>>> [2, 3, 1, 4, 6, 7, 9, 9]
bigflow.transforms.filter
(pcollection, fn, side_inputs, options*)
对于给定的PCollection和一个断言函数,返回只满足断言函数元素的PCollection
假设输入类型为I,fn的期望签名为 fn(I) => bool
参数: |
|
---|---|
返回: | 过滤结果 |
返回类型: |
>>> from bigflow import transforms
>>> _p = _pipeline.parallelize([3, 7, 1, 3, 2, 8])
>>> transforms.filter(_p, lambda x: x % 2 == 0).get()
[2, 8]
bigflow.transforms.first
(pcollection, **options)
取出PCollection中的第一个元素
参数: |
|
---|---|
返回: | 取出的单个元素,以PObject给出 |
返回类型: |
>>> from bigflow import transforms
>>> _p = _pipeline.parallelize([3, 7, 1, 3, 2, 8])
>>> transforms.first(_p).get()
3
bigflow.transforms.flat_map
(pvalue, fn, side_inputs, options*)
对PCollection中的每个元素做一对N映射
对变换函数必须返回一个可遍历变量(即实现了__iter__()方法),将迭代器中的所有元素 构造PCollection
假设输入类型为I,fn的期望签名为 fn(I) => [O…],[]表示返回结果可遍历
参数: |
|
---|
Results:
PCollection: 变换后的PCollection
>>> from bigflow import transforms
>>> _p = _pipeline.parallelize([1, 3, 5, 7])
>>> transforms.flat_map(_p, lambda x: [x, x * 2]).get()
[1, 2, 3, 5, 6, 7, 10, 14]
>>> transforms.flat_map(_p, lambda x: [[x, x * 2]]).get()
[[1, 2], [3, 6], [5, 10], [7, 14]]
>>> transforms.flat_map(_p, lambda x: (x, x * 2)).get()
[1, 2, 3, 6, 5, 10, 7, 14]
>>> transforms.flat_map(_p, lambda x: [(x, x * 2)]).get()
[(1, 2), (3, 6), (5, 10), (7, 14)]
注意返回结果可以为空:
>>> transforms.flat_map(_p, lambda x: [])
[]
如果返回的对象不能被遍历,则运行时会报错。 典型的错误用法包括None或返回一个单个元素。
返回对象只要是可迭代类型即可,不必一定是list。 特别是,需要输出较多数据时, 使用list可能会导致内存占用过大, 用户可以直接利用python的yield语法生成一个generator, 达到不需要占用大量内存的目的。
>>> _p = _pipeline.parallelize([3, 5])
>>> def make_partial_sum(x):
... sum = 0
... for i in xrange(1, x + 1):
... sum += i
... yield sum
>>> transforms.flat_map(_p, make_partial_sum)
[1, 3, 6, 1, 3, 6, 10, 15]
这种用法可以避免产生大list,从而避免内存占用过大的问题。
bigflow.transforms.flatten
(ptable, **options)
对于给定PTable中的key和value中每一个元素,构造(key, value)对,结果保存在PCollection中
参数: |
|
---|---|
返回: | (key, value)对,结果以PCollection表示 |
返回类型: |
>>> from bigflow import transforms
>>> _p = _pipeline.parallelize({"A": [2, 3], "B": [4, 5]})
>>> transforms.flatten(_p).get()
[("A", 2), ("A", 3), ("B", 4), ("B", 5)]
bigflow.transforms.flatten_values
(ptable, **options)
等价于 extract_values(ptable)
参数: |
|
---|---|
返回: | 所有的value,以PCollection给出 |
返回类型: |
>>> from bigflow import transforms
>>> _p = _pipeline.parallelize({"A": [2, 3], "B": [4, 5]})
>>> transforms.flatten_values(_p).get()
[2, 3, 4, 5]
bigflow.transforms.foreach
(pvalue, fn, side_inputs, options*)
对给定的PCollection/PObject中的每个元素应用一个函数,函数并不期望有任何的 返回,而是利用其副作用产生效果。 该函数一般用于产出数据到外部存储,同一条数据可能会被多次调用, 用户需要注意在下游去重,或想办法保证foreach操作具有幂等性质。
假设输入类型为I,fn的期望签名为 fn(I) => object,即返回值类型任意(并被忽略)
参数: |
|
---|
Results:
None
>>> from bigflow import lazy_var
>>> r = lazy_var.declare(lambda: redis.Redis(host='x.x.x.x', port=x, db=0))
>>> x = _pipeline.parallelize([("a", "1"), ("b", "4")])
>>> x.foreach(lambda (k, v): r.get().set(k, v))
>>> _pipeline.run() # all the data will be written into redis
bigflow.transforms.full_join
(pcollections, options*)
对于多个输入PCollection,根据key对PCollection做全连接操作 ,连接结果为(key, (value 1, value 2, …, value n)),若第m个PCollection没有元素, 则value m为None
参数: |
|
---|---|
返回: | 连接结果 |
返回类型: |
>>> x = _pipeline.parallelize([("a", 1)])
>>> y = _pipeline.parallelize([("b", 2)])
>>> transforms.full_join(x, y).get()
[("a", (1, None)), ("b", (None, 2))]
bigflow.transforms.group_by
(pcollection, key_extractor, value_extractor=None, **options)
利用给定的key_extractor和value_extractor对输入PCollection分组,返回一个表示分组结果的PTable
参数: |
|
---|---|
返回: | 分组结果 |
返回类型: |
>>> _p = _pipeline.parallelize([("A", 4), ("A", 3), ("B", 2), ("A", 1)])
>>> transforms.group_by(_p, lambda x: x[0], lambda x: x[1]).get()
{"A": [4, 3, 1], "B": [2]}
bigflow.transforms.group_by_key
(pcollection, **options)
利用给定的PCollection,使用一个默认的key/value提取函数对输入的PCollection分组 ,返回一个表示分组的PTable
参数: |
|
---|---|
返回: | 分组结果 |
返回类型: |
>>> _p = _pipeline.parallelize([("A", 4), ("A", 3), ("B", 2), ("A", 1)])
>>> transforms.group_by_key(_p).get()
{"A": [4, 3, 1], "B": [2]}
bigflow.transforms.idl_to_str
(pcollection, **options)
对于给定的PCollection,对每条数据执行idl解包。并过滤掉idl packet类型为Heartbeat和EOF的数据。
参数: |
|
---|---|
返回: | 处理后的PCollection |
返回类型: |
bigflow.transforms.intersection
(pcollection1, pcollection2, output_duplicated=False)
对于给定的PCollection1和PCollection2,返回所有同时存在于PCollection1和PCollection2 中的元素,即取两者交集
参数: |
|
---|---|
返回: | 相交结果 |
返回类型: |
>>> from bigflow import transforms
>>> a = _pipeline.parallelize([1, 1, 2, 3])
>>> b = _pipeline.parallelize([1, 1, 2, 2, 5])
>>> transforms.intersection(a, b).get()
[1, 2]
>>> transforms.intersection(a, b, output_duplicated = True).get()
[1, 1, 2]
bigflow.transforms.is_empty
(pcollection)
对于输入PCollection,返回其是否为空
参数: | pcollection (PCollection) — 输入PCollection |
---|---|
返回: | 表示返回结果的PObject |
返回类型: | PObject |
>>> from bigflow import transforms
>>> a = _pipeline.parallelize([1, 2, 3, 4])
>>> transforms.is_empty(a).get()
False
>>> b = _pipeline.parallelize([])
>>> transforms.is_empty(b).get()
True
bigflow.transforms.join
(pcollections, options*)
对于多个输入PCollection,根据key对PCollection做内连接操作 ,连接结果为(key, (value1, value2, …, valuen))
参数: |
|
---|---|
返回: | 连接结果 |
返回类型: |
>>> x = _pipeline.parallelize([("a", 1), ("b", 4)])
>>> y = _pipeline.parallelize([("a", 2), ("a", 3)])
>>> transforms.join(x, y).get()
[("a", (1, 2)), ("a", (1, 3))]
bigflow.transforms.left_join
(pcollections, options*)
对于多个输入PCollection,根据key对PCollection做左连接操作 ,连接结果为(key, (value 1, value 2, …, value n)),若第m个PCollection没有元素, 则value m为None
参数: |
|
---|---|
返回: | 连接结果 |
返回类型: |
>>> x = _pipeline.parallelize([("a", 1), ("b", 4)])
>>> y = _pipeline.parallelize([("a", 2)])
>>> transforms.left_join(x, y).get()
[("a", (1, 2)), ("b", (4, None))]
bigflow.transforms.make_tuple
(pobjects, options*)
将所有输入的PObject合并成一个PObject(tuple)。
除返回值类型及输入类型都全是PObject外, 结果与 bigflow.transforms.cartesian(self, *pvalues, **options)
相同。
Args: *pobjects (PObject) 待操作PObjects。所有输入都必须是PObject. :returns: 返回一个PObject(tuple), tuple中的第n个元素是第n个输入PObject对应的值。 :rtype: PObject
>>> p1 = _pipeline.parallelize(1)
>>> p2 = _pipeline.parallelize(2)
>>> transforms.make_tuple(p1, p2).get()
(1, 2)
>>> p3 = _pipeline.parallelize([3, 4])
>>> transforms.make_tuple(p1, p3).get()
!!! AssertionError
>>> transforms.make_tuple(p1, p1, p2).get()
(1, 1, 2)
bigflow.transforms.map
(pvalue, fn, side_inputs, options*)
对PCollection中的每个元素做一对一映射
对给定的PCollection/PObject中的每个元素应用一个变换函数,以函数的返回结果 构造PCollection/PObject
假设输入类型为I,fn的期望签名为 fn(I) => O
参数: |
|
---|
Results:
PType: 变换后的PCollection/PObject,与输入类型一致
>>> from bigflow import transforms
>>> _p = _pipeline.parallelize([1, 3, 5, 7])
>>> transforms.map(_p, lambda x: x + 1).get()
[2, 4, 6, 8]
>>> transforms.map(_p, lambda x: [x, x * 2]).get()
[[1, 2], [3, 6], [5, 10], [7, 14]]
bigflow.transforms.max
(pcollection, key=None, **options)
得到输入PCollection中最大的元素
参数: |
|
---|---|
返回: | 包含最大元素的PObject |
返回类型: |
>>> from bigflow import transforms
>>> _p = _pipeline.parallelize([3, 7, 1, 3, 2, 8])
>>> transforms.max(_p).get()
8
>>> transforms.max(_p, lambda val: -val).get()
1
bigflow.transforms.max_elements
(pcollection, n, key=None, **options)
得到输入PCollection中前n大的元素
参数: |
|
---|---|
返回: | 包含前n大元素的PCollection,注意对于n=1,这里仍然返回PCollection 而非PObject |
返回类型: |
>>> from bigflow import transforms
>>> _p = _pipeline.parallelize([3, 7, 1, 3, 2, 8])
>>> transforms.max_elements(_p, 2).get()
[8, 7]
>>> n = _p.count().map(lambda x: x - 1) # n is PObject(5)
>>> transforms.max_elements(_p, n, lambda val: -val).get() # key is -val
>>> [1, 3, 3, 2, 7]
bigflow.transforms.min
(pcollection, key=None, **options)
得到输入PCollection中最小的元素
参数: |
|
---|---|
返回: | 包含最小元素的PObject |
返回类型: |
>>> from bigflow import transforms
>>> _p = _pipeline.parallelize([3, 7, 1, 3, 2, 8])
>>> transforms.min(_p).get()
1
>>> transforms.min(_p, lambda val: -val).get()
8
bigflow.transforms.min_elements
(pcollection, n, key=None, **options)
得到输入PCollection中前n小的元素
参数: |
|
---|---|
返回: | 包含前n小元素的PCollection,注意对于n=1,这里仍然返回PCollection 而非PObject |
返回类型: |
>>> from bigflow import transforms
>>> _p = _pipeline.parallelize([3, 7, 1, 3, 2, 8])
>>> transforms.min_elements(_p, 2).get()
[1, 2]
>>> n = _p.count().map(lambda x: x - 1) # n is PObject(5)
>>> transforms.min_elements(_p, n, lambda val: -val).get() # key is -val
>>> [8, 3, 3, 2, 7]
bigflow.transforms.pipe
(pvalue, command, **options)
对于给定的PCollection/PTable,返回通过command处理后的PCollection
参数: |
|
---|---|
返回: | 处理后的PCollection |
返回类型: |
注解
- pipe作用于PCollection上,pipe会将数据直接发送到管道中,框架对数据如何划分不做任何保证;
2. pipe作用于PTable上,pipe会将PTable的Key和数据一起发送到管道中(支持嵌套), 并保证相同Key的数据会顺序发送到管道中,例如下列代码:
>>> from bigflow import transforms
>>> p = _pipeline.parallelize({
>>> 'key1_a': {
>>> 'key2_a': ['value1', 'value2'],
>>> 'key2_b': ['value3', 'value4']
>>> },
>>> 'key2_b': {
>>> 'key2_c': ['value5', 'value6']
>>> }
>>> })
>>> transforms.pipe(p, 'cat').get()
用户程序(cat)接收到的数据为,column间默认使用制表符(tab)作为分割符:
key1_a key2_a value1
key1_a key2_a value2
key1_a key2_b value3
key1_a key2_b value4
key1_b key2_c value5
key1_b key2_c value6
3. 尽量不要在PTable上通过apply_values中使用pipe(应该使用apply), 不仅性能极差而且发送给管道的数据不包含Key;
>>> from bigflow import transforms
>>> p = _pipeline.parallelize([1, 1, 2, 3])
>>> transforms.pipe(p, 'cat').get()
['1', '1', '2', '3']
>>> from bigflow import transforms
>>> p = _pipeline.parallelize({'A': [1, 2], 'B': [2, 3]})
>>> transforms.pipe(p, 'cat').get()
['A 1', 'A 2', 'B 2', 'B 3']
>>> from bigflow import transforms
>>> p = _pipeline.parallelize([(1, 'a'), (1, 'a'), (2, 'b'), (3, 'c')])
>>> transforms.pipe(p, 'cat', type='bistreaming', input_fields_num=2, output_fields_num=2).get()
[('1', 'a'), ('1', 'a'), ('2', 'b'), ('3', 'c')]
bigflow.transforms.reduce
(pcollection, fn, side_inputs, options*)
对于属于PCollection,使用给定的fn将所有元素规约为单个元素
假设输入类型为I,fn的期望签名为 fn(I1, I2) => I,即输出的类型必须与输入相同 (fn的第一个参数允许被修改)
参数: |
|
---|---|
返回: | 规约结果 |
返回类型: |
>>> from bigflow import transforms
>>> _p = _pipeline.parallelize([1, 2, 3, 4])
>>> transforms.reduce(_p, lambda x, y: x + y).get()
10
bigflow.transforms.right_join
(pcollections, options*)
对于多个输入PCollection,根据key对PCollection做右连接操作 ,连接结果为(key, (value 1, value 2, …, value n)),若第m个PCollection没有元素, 则value m为None
参数: |
|
---|---|
返回: | 连接结果 |
返回类型: |
>>> x = _pipeline.parallelize([("a", 1), ("b", 4)])
>>> y = _pipeline.parallelize([("a", 2), ("a", 3)])
>>> transforms.right_join(x, y).get()
[("a", (1, 2)), ("a", (1, 3))]
bigflow.transforms.sort
(pcollection, reverse=False)
对于输入PCollection,将其进行排序
参数: |
|
---|---|
返回: | 排序结果 |
返回类型: |
>>> from bigflow import transforms
>>> _p = _pipeline.parallelize([3, 1, 2, 8])
>>> transforms.sort(_p).get()
[1, 2, 3, 8]
bigflow.transforms.sort_by
(pcollection, key, reverse=False)
对于输入PCollection,使用给定的key将其进行排序
参数: |
|
---|---|
返回: | 排序结果 |
返回类型: |
>>> from bigflow import transforms
>>> from bigflow.transform_impls import sort
>>> _p = _pipeline.parallelize([3, 1, 2, 8])
>>> p2 = _pipeline.parallelize([(1, 2), (3, 4), (3, 5), (2, 6), (2, 4)]
>>> transforms.sort_by(_p).get()
>>> transforms.sort_by(p2, lambda rec:[sort.ASC(rec[0]), sort.DESC(rec[1])]).get()
[1, 2, 3, 8]
[(1, 2), (2, 6), (2, 4), (3, 5), (3, 4)]
注解
sort时所有元素类型必须相同,否则可能出现结果不正确。例如,元素1与元素2.0可能排序结果不正确。 但作为排序的key的多列可以类型不同。
sort/sort_by后的数据集只有以下操作可以保证顺序:
- accumulate,transform操作
- aggregate的第一个聚合函数(即签名为O+I=>O的那个函数)。
- first 和 take (暂不保证语义,未来会支持)
由于操作性质,其它操作保序也无意义(或语义不明),故不保证顺序。
3. sort后调用write并不保证顺序。如果想保证输出有序,可以参考此文档: http://bigflow.cloud/zh/rst/bigflow.output.html
bigflow.transforms.str_to_idl
(pcollection, **options)
对于给定的PCollection,对每条数据执行idl打包。要求输入的数据类型为str。
参数: |
|
---|---|
返回: | 处理后的PCollection |
返回类型: |
bigflow.transforms.substract
(pcollection1, pcollection2)
此接口已废弃。请使用subtract。
bigflow.transforms.subtract
(pcollection1, pcollection2)
对于给定的PCollection1和PCollection2,返回所有存在于PCollection1但不在PCollection2 中的元素,相当于做容器减法
参数: |
|
---|---|
返回: | 表示结果的PCollection |
返回类型: |
>>> from bigflow import transforms
>>> a = _pipeline.parallelize([1, 2, 3, 4])
>>> b = _pipeline.parallelize([1, 2, 5])
>>> transforms.subtract(a, b).get()
[3, 4]
bigflow.transforms.sum
(pcollection, **options)
对于输入PCollection,求其所有包含元素相加的结果
参数: |
|
---|---|
返回: | 表示结果的PObject |
返回类型: |
>>> from bigflow import transforms
>>> _p = _pipeline.parallelize([1, 2, 3, 4])
>>> transforms.sum(_p).get()
10
bigflow.transforms.take
(pcollection, n, **options)
取给定PCollection中的任意n个元素。 (如果总元素数量不足n,则返回输入pcollection)
参数: |
|
---|---|
返回: | 表示结果的PCollection |
返回类型: |
>>> from bigflow import transforms
>>> _p = _pipeline.parallelize([1, 2, 3, 4])
>>> transforms.take(_p, 3).get()
[1, 2, 3]
>>> _n = _pipeline.parallelize(2)
>>> transforms.take(_p, _n).get()
[1, 2]
>>> _n = _pipeline.parallelize([1, 2, 3, 4])
>>> transforms.take(_p, 10).get()
[1, 2, 3, 4]
bigflow.transforms.to_list_pobject
(pvalue, **options)
对于给定的PCollection,聚合为PObject,PObject的内容为list
参数: |
|
---|---|
返回: | 聚合后的list |
返回类型: |
>>> from bigflow import transforms
>>> a = _pipeline.parallelize([1, 1, 2, 3])
>>> transforms.to_list_pobject(a).get()
[1, 1, 2, 3]
>>> type(a)
bigflow.pcollection.PCollection
>>> a.map(lambda x: str(type(x))).get()
["<type 'int'>", "<type 'int'>", "<type 'int'>", "<type 'int'>"]
>>> b = transforms.to_list_pobject(a)
>>> type(b)
bigflow.pobject.PObject
>>> b.map(lambda x: str(type(x))).get()
"<type 'list'>"
注解
这个是最易被滥用的一个transform。 它可以使一个PCollection转化为一个元素为list的PObject, 用户可以在后续的map操作中拿到这个list进行任意的单机操作, 在一些场景下,如复用现有单机代码时,比较有用。 但是,该变换将使得Bigflow许多优化无法执行,导致运行效率下降, 另外,在apply_values中使用时, 由于每个分组的数据必须转化为一个list, 则导致在一个分组内数据过多时,会占用大量内存资源, 甚至可能引起作业因内存占用过多而Out-Of-Memory失败。
故,使用该变换前,请三思,尽量使用以下其它算子替换掉此方法, 一些较为通用的替代方案如下(下列替换方案,按顺序前边的比后边的效率高):
bigflow.transforms.to_pobject
(pvalue, **options)
对于给定的PCollection/PTable,聚合为PObject,PObject的内容为list/dict
参数: |
|
---|---|
返回: | 聚合后的list/dict |
返回类型: |
>>> from bigflow import transforms
>>> a = _pipeline.parallelize([1, 1, 2, 3])
>>> transforms.to_pobject(a).get()
[1, 1, 2, 3]
>>> from bigflow import transforms
>>> b = _pipeline.parallelize({'e': 'f', 'g': 'h'})
>>> transforms.to_pobject(b).get()
{'e': 'f', 'g': 'h'}
bigflow.transforms.transform
(pcollection, first_arg, other_args, options*)
对给定PCollection进行任意的变换,结果为另一个PCollection
transform有两种形式,形式一:
基本原型为`transform(pcollection, initializer, transformer, finalizer, *side_inputs, **options)`
transform将PCollection的处理分为3个阶段: 初始化,遍历及结束,分别对应于 initializer, transformer和finalizer三个处理函数。三个函数之间有一个状态 status(也可以理解为上下文context),同时有一个emitter参数可以向输出PCollection发送数据
假定输入数据类型为I,输出数据类型为O,initializer, transformer, finalizer各自的期望签名为:
initializer(emitter, *side_inputs) => status(object)
transformer(status, emitter, I, *side_inputs) => status(object) (transformer的第一个参数允许被修改)
finalizer(status, emitter, *side_inputs) => None (finalizer的第一个参数允许被修改)
emitter.emit(O)
参数: |
|
---|---|
返回: | 表示返回结果的PCollection |
返回类型: |
>>> from bigflow import transforms
>>> import copy
>>> def initializer(emitter):
>>> return []
>>>
>>> def transformer(status, emitter, inp):
>>> status.append(copy.deepcopy(inp)) #如果要缓存一个数据,最好复制一份。
>>> return status
>>>
>>> def finalizer(status, emitter):
>>> emitter.emit(status)
>>>
>>> _p = _pipeline.parallelize([1, 2, 3])
>>> _plist = transforms.transform(_p, initializer, transformer, finalizer)
>>> print _plist.count().get() # 只有一个元素,元素的内容是[1, 2, 3]这样一个列表。
1
>>> print _plist.get()
[[1, 2, 3]]
形式二:
基本原型为`transform(pcollection, transformer, *side_inputs, **options)` 其中transformer应为 bigflow.base.Transformer
类的子类, Transformer.begin_process在数据开始处理前会被调用。 Transformer.process在数据开始处理时,每条数据调用一次,传入需要的数据。 Transformer.end_process在数据处理完成后被调用。 用户需要输出的数据以列表或其它可迭代对象的形式返回,其中所有元素都会被作为输出PCollection中的一个元素。 (注意,如果不需要输出请返回一个空的[],而不要返回None)
>>> class SumTransformer(base.Transformer):
...
... def begin_process(self):
... self._sum = 0
... return []
...
... def process(self, record):
... self._sum += record
... return []
...
... def end_process(self):
... yield self._sum
...
>>> p1 = _pipeline.parallelize([1, 2, 3])
>>> transforms.transform(p1, SumTransformer).get()
6
>>> class PartialSumTransformer(base.Transformer):
...
... def begin_process(self):
... self._sum = 0
... return []
...
... def process(self, record):
... self._sum += record
... yield self._sum
...
>>> transforms.transform(p1, PartialSumTransformer()),get()
[1, 3, 6]
>>> class ZipTransformer(base.Transformer):
...
... def begin_process(self, *si):
... self.index = 0
... lens = map(len, si)
... self.min_len = min(lens)
... return []
...
... def process(self, inp, *si):
... if self.index < self.min_len:
... yield (inp, ) + tuple(map(lambda x: x[self.index], si))
... self.index += 1
...
>>> p2 = _pipeline.parallelize([4, 5]).sort()
>>> transforms.transform(p1, ZipTransformer(), p2).get()
[(1, 4), (2, 5)]
本方法为Bigflow所提供的最底层和最复杂的变换方法,它可以表达对PCollection 的任意变换。
在有其它函数(如aggregate)能完成同样功能时,尽量不要使用该函数,框架无法了解该函数内部实现, 无法进行许多深层次的优化工作。
bigflow.transforms.union
(pvalues, options*)
对于多个输入PCollection/PObject,返回包含它们所有元素的PCollection
输入PCollection必须为同类型
参数: | *pvalues — 输入PCollection/PObject |
---|---|
返回: | 表示结果的PCollection |
返回类型: | PCollection |
>>> from bigflow import transforms
>>> _p1 = _pipeline.parallelize([1, 2, 3, 4])
>>> _p2 = _pipeline.parallelize([5, 6, 7, 8])
>>> transforms.union(_p1, _p2).get()
[1, 2, 3, 4, 5, 6, 7, 8]
bigflow.transforms.window_into
(pcollection, window, **options)
利用给定的window对输入PCollection分组,返回一个表示分组结果的PTable
参数: |
|
---|---|
返回: | 分组结果 |
返回类型: |