Transforms

定义Bigflow Python中所有的变换

Author: Wang, Cong(bigflow-opensource@baidu.com), panyunhong(bigflow-opensource@baidu.com)

注意:除特殊说明外,所有变换的用户自定义方法(UDF)输入参数都不允许修改

bigflow.transforms.accumulate(pcollection, zero, accumulate_fn, side_inputs, options*)

将给定的PCollection按照一个初始值和方法聚合为PObject

假设输入类型为I,输出类型为O,则zero、accumulate_fn的期望签名为:

zero: O或zero() => O

accumulate_fn: accumulate_fn(O, I) => O (accumulate_fn的第一个参数允许被修改)

参数:
  • pcollection (PCollection) — 输入PCollection
  • zero (value or function) — 初始值,或是一个返回初始值的方法
  • accumulate_fn (function) — 聚合方法
  • side_inputs — 参与运算的SideInputs
  • *options — 可配置选项
返回:

聚合结果

返回类型:

PObject

由于该函数的语义是数据必然按顺序一条条的流过,限制了该函数可以进行的优化工作, 所以如果可以使用aggregate或reduce替换时,尽量使用aggregate/reduce替换该函数。

  1. >>> from bigflow import transforms
  2. >>> _p = _pipeline.parallelize([1, 2, 3])
  3. >>> transforms.accumulate(_p, 0, lambda x, y: x + y).get()
  4. 6

TODO: Another example

bigflow.transforms.aggregate(pcollection, zero, aggregate_fn, combine_fn, side_inputs, options*)

将给定的PCollection按照初始值、初段聚合方法和汇总方法聚合为PObject

假设输入类型I,输出类型为O,则zero、aggregate_fn、combine_fn的期望签名为:

zero: O或zero() => O

aggregate_fn: aggregate_fn(O, I) => O (aggregate_fn的第一个参数允许被修改)

combine_fn: combine_fn(O, O) => O (combine_fn的第一个参数允许被修改)

在执行时aggregate会把输入pcollection先切分成许多个分片,然后对每个分片使用zero生成一个O类型的初始值。

随后,在每个分片上,持续调用aggregate_fn,将分片上全部的数据聚合为一个O类型的值。

最后,会再将所有分片上生成的那些O类型的值汇聚到一起,使用combine_fn最终聚合到一起。

分片的规则用户不应作任何假设。

参数:
  • pcollection (PCollection) — 输入PCollection
  • zero (value or callable) — 初始值,或是一个返回初始值的方法。
  • aggregate_fn (callable) — 初段聚合方法。该方法需要两个参数。
  • combine_fn (callable) — 汇总方法
  • side_inputs — 参与运算的SideInputs
  • *options — 可配置选项
返回:

聚合结果

返回类型:

PObject

  1. >>> from bigflow import transforms
  2. >>> _p = _pipeline.parallelize(["viva", "la", "vida"])
  3. >>> transforms.aggregate(_p, 0, lambda x, y: x + len(y), lambda x, y: x + y).get() # sum words length
  4. 10

bigflow.transforms.cartesian(pcollections, options*)

对多个输入PCollection求笛卡尔积,返回一个PCollection

参数:
  • pcollections — 输入PCollection
  • *options — 可配置选项
返回:

笛卡尔积

返回类型:

PCollection

  1. >>> from bigflow import transforms
  2. >>> p1 = _pipeline.parallelize([1, 2, 3])
  3. >>> p2 = _pipeline.parallelize(['a', 'b', 'c'])
  4. >>> transforms.cartesian(p1, p2)
  5. [(1, 'a'), (1, 'b'), (1, 'c'), (2, 'a'), (2, 'b'), (2, 'c'), (3, 'a'), (3, 'b'), (3, 'c')]
  1. >>> p3 = _pipeline.parallelize(1)
  2. >>> p4 = _pipeline.parallelize(2)
  3. >>> p3.cartesian(p4).get()
  4. [(1, 2)]
  5. >>> p5 = _pipeline.parallelize([3, 4])
  6. >>> p3.cartesian(p5).get()
  7. [(1, 3), (1, 4)]
  8. >>> p3.cartesian(p4, p5).get()
  9. [(1, 2, 3), (1, 2, 4)]

bigflow.transforms.cogroup(pcollections, options*)

对传入的所有pcollection进行协同分组。

cogroup要求所有传入的PCollection的每个元素都是一个(k, v)对, cogroup会用k来作为分组的key,对多个输入PCollection进行协同分组, 返回一个PTable表示分组结果。

这个返回的PTable的每个value为一个tuple,tuple的每个元素是一个PCollection, 其中第n个PCollection表示输入的第n个PCollection在当前key下的全部数据。

如果某个输入PCollection在某个key下无数据,则对应的PCollection为一个空PCollection。

目前不能像group_by指定key_extractor。

group_by_key可以理解成是cogroup只传有一个参数的特殊情况。

参数:
  • pcollections — 输入PCollection
  • *options — 可配置选项
返回:

分组结果

返回类型:

PTable

  1. >>> from bigflow import transforms
  2. >>> _p1 = _pipeline.parallelize([("A", 1), ("A", 2), ("B", 3)])
  3. >>> _p2 = _pipeline.parallelize([("A", 4)])
  4. >>> _p = transforms.cogroup(_p1, _p2)
  5. # _p的值为{"A": ([1, 2], [4]), "B": ([3], [])} ,但由于实现难度较大,PTable的value为tuple of PCollection时的get操作暂不支持。
  6. >>> _p.apply_values(lambda x, y: transforms.union(x, y)).get()
  7. {"A": [1, 2, 4], "B": [3]}
  1. >>> def distinct_and_join(p, q): # 去重,并join
  2. ... return p.cogroup(q) \
  3. ... .apply_values(lambda a, b: (a.distinct(), b.distinct())) \
  4. ... .apply_values(transforms.cartesian) \
  5. ... .flatten()
  6. >>> _p1 = _pipeline.parallelize([("A", 1), ("A", 2), ("A", 1), ("C", 1)])
  7. >>> _p2 = _pipeline.parallelize([("A", 3), ("A", 3), ("B", 2)])
  8. >>> print distinct_and_join(_p1, _p2).get()
  9. [("A", (1, 3)), ("A", (2, 3))]
  10. >>> # 未来bigflow会自动将p.distinct().join(q.distinct())优化成上边的样子(正在进行中)
  1. >>> def semi_join(p, q): # 同key的join结果只输出一条
  2. ... return p.cogroup(q) \
  3. ... .apply_values(lambda a, b: (a.take(1), b.take(1))) \
  4. ... .apply_values(transforms.cartesian) \
  5. ... .flatten()
  6. >>> print semi_join(_p1, _p2).get()
  7. [("A", (1, 3))]

bigflow.transforms.combine(pcollection, fn, **options)

给定一个合并函数,聚合输入PCollection中所有元素,这些元素以迭代器的形式给出

默认情况下,输入类型与输出类型需要一致,假设为O类型,fn的期望签名为 fn([O…]) => O,[]表示输入可遍历

在执行时会把输入pcollection先切分成许多个分片,然后对每个分片的数据组成一个列表,然后调用fn, 将每个分片中的数据合并成一个O类型的变量。 然后,会再将所有分片上生成的那些O类型的值汇聚到一起,组成一个列表,再使用fn最终聚合到一起。

分片的规则用户不应作任何假设。

用户可以显式的指定pre_combine=False,关掉预聚合。如果关掉预聚合,则会直接将全部的数据组成一个列表交给fn, 聚合成一个值。则这种情况下,fn需要的输入类型与fn返回的类型可以是不同类型的。

参数:
  • pcollection (PCollection) — 输入PCollection
  • fn (function) — 合并函数
  • **options — 可配置选项 其中重要配置项为: pre_combine(bool): 是否进行预聚合。默认为True。
返回:

合并结果

返回类型:

PObject

  1. >>> _p = _pipeline.parallelize([2, 4, 6, 10])
  2. >>> transforms.combine(_p, sum).get()
  3. 22
  4. >>> number_list = self._pipeline.parallelize(['2', '3', '4'])
  5. >>> to_lookup = {'2': 2, '3': 3, '4': 4}
  6. >>> def lookup_dict_and_sum(numbers):
  7. ... return sum(map(lambda s: to_lookup[s], numbers))
  8. >>> number_list.combine(lookup_dict_and_sum, pre_combine=False).get()
  9. 9
  10. >>> number_list.combine(lookup_dict_and_sum).get()
  11. Error may occur (because of pre_combine, 1)

bigflow.transforms.count(pcollection, **options)

返回给定PCollection中元素的数量

参数:
  • pcollection (PCollection) — 输入PCollection
  • **options — 可配置选项
返回:

元素数量

返回类型:

PObject

  1. >>> from bigflow import transforms
  2. >>> _p = _pipeline.parallelize([1, 2, 3, 4])
  3. >>> transforms.count(_p).get()
  4. 4

bigflow.transforms.diff(pcollection1, pcollection2)

对于给定的PCollection1和PCollection2,返回两者不相同的元素

参数:
返回:

表示差异的PCollection

返回类型:

PCollection

  1. >>> from bigflow import transforms
  2. >>> a = _pipeline.parallelize([1, 1, 2, 3])
  3. >>> b = _pipeline.parallelize([1, 1, 2, 2])
  4. >>> transforms.diff(a, b).get()
  5. [(2, (1, 2)), (3, (1, 0))]

bigflow.transforms.distinct(pcollection, **options)

返回给定PCollection中所有不重复元素

参数:
  • pcollection (PCollection) — 输入PCollection
  • **options — 可配置选项
返回:

不重复元素,以PCollection给出

返回类型:

PCollection

  1. >>> from bigflow import transforms
  2. >>> _p = _pipeline.parallelize([2, 2, 1, 9, 3, 3])
  3. >>> transforms.distinct(_p).get()
  4. [2, 3, 1, 9]

bigflow.transforms.extract_keys(ptable, **options)

提取给定PTable中所有的key

参数:
  • ptable (PTable) — 输入PTable
  • **options — 可配置选项
返回:

所有的key,以PCollection给出

返回类型:

PCollection

  1. >>> from bigflow import transforms
  2. >>> _p = _pipeline.parallelize({"A": [2, 3], "B": [4, 5]})
  3. >>> transforms.extract_keys(_p).get()
  4. ["A", "B"]

bigflow.transforms.extract_values(ptable, **options)

提取给定PTable中所有的value

参数:
  • ptable (PTable) — 输入PTable
  • **options — 可配置选项
返回:

所有的value,以PCollection给出

返回类型:

PCollection

  1. >>> from bigflow import transforms
  2. >>> _p = _pipeline.parallelize({"A": [2, 3], "B": [4, 5]})
  3. >>> transforms.extract_values(_p).get()
  4. [2, 3, 4, 5]

无论PTable为多少层嵌套,都会抽取出最内层的value

  1. >>> _p = _pipeline.parallelize({"A": {"a": [2, 3], "b": [1, 4]}, "B": {"c": [6, 7], "d": [9, 9]}}
  2. >>> print _p
  3. >>> {k0: {k1: [...]}} # 嵌套PTable
  4. >>> transforms.extract_values(_p).get()
  5. >>> [2, 3, 1, 4, 6, 7, 9, 9]

bigflow.transforms.filter(pcollection, fn, side_inputs, options*)

对于给定的PCollection和一个断言函数,返回只满足断言函数元素的PCollection

假设输入类型为I,fn的期望签名为 fn(I) => bool

参数:
  • pcollection (PCollection) — 输入PCollection
  • fn (function) — 断言函数
  • **options — 可配置选项
返回:

过滤结果

返回类型:

PCollection

  1. >>> from bigflow import transforms
  2. >>> _p = _pipeline.parallelize([3, 7, 1, 3, 2, 8])
  3. >>> transforms.filter(_p, lambda x: x % 2 == 0).get()
  4. [2, 8]

bigflow.transforms.first(pcollection, **options)

取出PCollection中的第一个元素

参数:
  • pcollection (PCollection) — 输入PCollection
  • **options — 可配置选项
返回:

取出的单个元素,以PObject给出

返回类型:

PObject

  1. >>> from bigflow import transforms
  2. >>> _p = _pipeline.parallelize([3, 7, 1, 3, 2, 8])
  3. >>> transforms.first(_p).get()
  4. 3

bigflow.transforms.flat_map(pvalue, fn, side_inputs, options*)

对PCollection中的每个元素做一对N映射

对变换函数必须返回一个可遍历变量(即实现了__iter__()方法),将迭代器中的所有元素 构造PCollection

假设输入类型为I,fn的期望签名为 fn(I) => [O…],[]表示返回结果可遍历

参数:
  • pvalue (PCollection or PObject) — 输入P类型
  • fn (function) — 变换函数
  • side_inputs — 参与运算的SideInputs
  • *options — 可配置选项
  • Results:

    PCollection: 变换后的PCollection

  1. >>> from bigflow import transforms
  2. >>> _p = _pipeline.parallelize([1, 3, 5, 7])
  3. >>> transforms.flat_map(_p, lambda x: [x, x * 2]).get()
  4. [1, 2, 3, 5, 6, 7, 10, 14]
  5. >>> transforms.flat_map(_p, lambda x: [[x, x * 2]]).get()
  6. [[1, 2], [3, 6], [5, 10], [7, 14]]
  7. >>> transforms.flat_map(_p, lambda x: (x, x * 2)).get()
  8. [1, 2, 3, 6, 5, 10, 7, 14]
  9. >>> transforms.flat_map(_p, lambda x: [(x, x * 2)]).get()
  10. [(1, 2), (3, 6), (5, 10), (7, 14)]

注意返回结果可以为空:

  1. >>> transforms.flat_map(_p, lambda x: [])
  2. []

如果返回的对象不能被遍历,则运行时会报错。 典型的错误用法包括None或返回一个单个元素。

返回对象只要是可迭代类型即可,不必一定是list。 特别是,需要输出较多数据时, 使用list可能会导致内存占用过大, 用户可以直接利用python的yield语法生成一个generator, 达到不需要占用大量内存的目的。

  1. >>> _p = _pipeline.parallelize([3, 5])
  2. >>> def make_partial_sum(x):
  3. ... sum = 0
  4. ... for i in xrange(1, x + 1):
  5. ... sum += i
  6. ... yield sum
  1. >>> transforms.flat_map(_p, make_partial_sum)
  2. [1, 3, 6, 1, 3, 6, 10, 15]

这种用法可以避免产生大list,从而避免内存占用过大的问题。

bigflow.transforms.flatten(ptable, **options)

对于给定PTable中的key和value中每一个元素,构造(key, value)对,结果保存在PCollection中

参数:
  • ptable (PTable) — 输入PTable
  • **options — 可配置选项
返回:

(key, value)对,结果以PCollection表示

返回类型:

PCollection

  1. >>> from bigflow import transforms
  2. >>> _p = _pipeline.parallelize({"A": [2, 3], "B": [4, 5]})
  3. >>> transforms.flatten(_p).get()
  4. [("A", 2), ("A", 3), ("B", 4), ("B", 5)]

bigflow.transforms.flatten_values(ptable, **options)

等价于 extract_values(ptable)

参数:
  • ptable (PTable) — 输入PTable
  • **options — 可配置选项
返回:

所有的value,以PCollection给出

返回类型:

PCollection

  1. >>> from bigflow import transforms
  2. >>> _p = _pipeline.parallelize({"A": [2, 3], "B": [4, 5]})
  3. >>> transforms.flatten_values(_p).get()
  4. [2, 3, 4, 5]

bigflow.transforms.foreach(pvalue, fn, side_inputs, options*)

对给定的PCollection/PObject中的每个元素应用一个函数,函数并不期望有任何的 返回,而是利用其副作用产生效果。 该函数一般用于产出数据到外部存储,同一条数据可能会被多次调用, 用户需要注意在下游去重,或想办法保证foreach操作具有幂等性质。

假设输入类型为I,fn的期望签名为 fn(I) => object,即返回值类型任意(并被忽略)

参数:
  • pvalue (PCollection or PObject) — 输入P类型
  • fn (function) — 变换函数
  • side_inputs — 参与运算的SideInputs
  • *options — 可配置选项
  • Results:

    None

  1. >>> from bigflow import lazy_var
  2. >>> r = lazy_var.declare(lambda: redis.Redis(host='x.x.x.x', port=x, db=0))
  3. >>> x = _pipeline.parallelize([("a", "1"), ("b", "4")])
  4. >>> x.foreach(lambda (k, v): r.get().set(k, v))
  5. >>> _pipeline.run() # all the data will be written into redis

bigflow.transforms.full_join(pcollections, options*)

对于多个输入PCollection,根据key对PCollection做全连接操作 ,连接结果为(key, (value 1, value 2, …, value n)),若第m个PCollection没有元素, 则value m为None

参数:
  • pcollections — 输入PCollection
  • *options — 可配置选项
返回:

连接结果

返回类型:

PCollection

  1. >>> x = _pipeline.parallelize([("a", 1)])
  2. >>> y = _pipeline.parallelize([("b", 2)])
  3. >>> transforms.full_join(x, y).get()
  4. [("a", (1, None)), ("b", (None, 2))]

bigflow.transforms.group_by(pcollection, key_extractor, value_extractor=None, **options)

利用给定的key_extractor和value_extractor对输入PCollection分组,返回一个表示分组结果的PTable

参数:
  • pcollection (PCollection) — 输入PCollection
  • key_extractor (function) — 用于提取key的函数
  • value_extractor (function, optional) — 用于提取value的函数
  • **options — 可配置选项
返回:

分组结果

返回类型:

PTable

  1. >>> _p = _pipeline.parallelize([("A", 4), ("A", 3), ("B", 2), ("A", 1)])
  2. >>> transforms.group_by(_p, lambda x: x[0], lambda x: x[1]).get()
  3. {"A": [4, 3, 1], "B": [2]}

bigflow.transforms.group_by_key(pcollection, **options)

利用给定的PCollection,使用一个默认的key/value提取函数对输入的PCollection分组 ,返回一个表示分组的PTable

参数:
  • pcollection (PCollection) — 输入PCollection
  • **options — 可配置选项
返回:

分组结果

返回类型:

PTable

  1. >>> _p = _pipeline.parallelize([("A", 4), ("A", 3), ("B", 2), ("A", 1)])
  2. >>> transforms.group_by_key(_p).get()
  3. {"A": [4, 3, 1], "B": [2]}

bigflow.transforms.idl_to_str(pcollection, **options)

对于给定的PCollection,对每条数据执行idl解包。并过滤掉idl packet类型为Heartbeat和EOF的数据。

参数:
  • pcollection (PCollection) — 输入
  • **options

    可配置选项

    log_type: idl数据类型,目前支持log_text和log_bin,默认为log_text

返回:

处理后的PCollection

返回类型:

PCollection

bigflow.transforms.intersection(pcollection1, pcollection2, output_duplicated=False)

对于给定的PCollection1和PCollection2,返回所有同时存在于PCollection1和PCollection2 中的元素,即取两者交集

参数:
返回:

相交结果

返回类型:

PCollection

  1. >>> from bigflow import transforms
  2. >>> a = _pipeline.parallelize([1, 1, 2, 3])
  3. >>> b = _pipeline.parallelize([1, 1, 2, 2, 5])
  4. >>> transforms.intersection(a, b).get()
  5. [1, 2]
  6. >>> transforms.intersection(a, b, output_duplicated = True).get()
  7. [1, 1, 2]

bigflow.transforms.is_empty(pcollection)

对于输入PCollection,返回其是否为空

参数:pcollection (PCollection) — 输入PCollection
返回:表示返回结果的PObject
返回类型:PObject
  1. >>> from bigflow import transforms
  2. >>> a = _pipeline.parallelize([1, 2, 3, 4])
  3. >>> transforms.is_empty(a).get()
  4. False
  5. >>> b = _pipeline.parallelize([])
  6. >>> transforms.is_empty(b).get()
  7. True

bigflow.transforms.join(pcollections, options*)

对于多个输入PCollection,根据key对PCollection做内连接操作 ,连接结果为(key, (value1, value2, …, valuen))

参数:
  • pcollections — 输入PCollection
  • *options — 可配置选项
返回:

连接结果

返回类型:

PCollection

  1. >>> x = _pipeline.parallelize([("a", 1), ("b", 4)])
  2. >>> y = _pipeline.parallelize([("a", 2), ("a", 3)])
  3. >>> transforms.join(x, y).get()
  4. [("a", (1, 2)), ("a", (1, 3))]

bigflow.transforms.left_join(pcollections, options*)

对于多个输入PCollection,根据key对PCollection做左连接操作 ,连接结果为(key, (value 1, value 2, …, value n)),若第m个PCollection没有元素, 则value m为None

参数:
  • pcollections — 输入PCollection
  • *options — 可配置选项
返回:

连接结果

返回类型:

PCollection

  1. >>> x = _pipeline.parallelize([("a", 1), ("b", 4)])
  2. >>> y = _pipeline.parallelize([("a", 2)])
  3. >>> transforms.left_join(x, y).get()
  4. [("a", (1, 2)), ("b", (4, None))]

bigflow.transforms.make_tuple(pobjects, options*)

将所有输入的PObject合并成一个PObject(tuple)。

除返回值类型及输入类型都全是PObject外, 结果与 bigflow.transforms.cartesian(self, *pvalues, **options) 相同。

Args: *pobjects (PObject) 待操作PObjects。所有输入都必须是PObject. :returns: 返回一个PObject(tuple), tuple中的第n个元素是第n个输入PObject对应的值。 :rtype: PObject

  1. >>> p1 = _pipeline.parallelize(1)
  2. >>> p2 = _pipeline.parallelize(2)
  3. >>> transforms.make_tuple(p1, p2).get()
  4. (1, 2)
  5. >>> p3 = _pipeline.parallelize([3, 4])
  6. >>> transforms.make_tuple(p1, p3).get()
  7. !!! AssertionError
  8. >>> transforms.make_tuple(p1, p1, p2).get()
  9. (1, 1, 2)

bigflow.transforms.map(pvalue, fn, side_inputs, options*)

对PCollection中的每个元素做一对一映射

对给定的PCollection/PObject中的每个元素应用一个变换函数,以函数的返回结果 构造PCollection/PObject

假设输入类型为I,fn的期望签名为 fn(I) => O

参数:
  • pvalue (PCollection or PObject) — 输入P类型
  • fn (function) — 变换函数
  • side_inputs — 参与运算的SideInputs
  • *options — 可配置选项
  • Results:

    PType: 变换后的PCollection/PObject,与输入类型一致

  1. >>> from bigflow import transforms
  2. >>> _p = _pipeline.parallelize([1, 3, 5, 7])
  3. >>> transforms.map(_p, lambda x: x + 1).get()
  4. [2, 4, 6, 8]
  5. >>> transforms.map(_p, lambda x: [x, x * 2]).get()
  6. [[1, 2], [3, 6], [5, 10], [7, 14]]

bigflow.transforms.max(pcollection, key=None, **options)

得到输入PCollection中最大的元素

参数:
  • pcollection (PCollection) — 输入PCollection
  • key (function, optional) — 用于提取key的函数,与Python内置max()中的 key 参数相同
  • **options — 可配置选项
返回:

包含最大元素的PObject

返回类型:

PObject

  1. >>> from bigflow import transforms
  2. >>> _p = _pipeline.parallelize([3, 7, 1, 3, 2, 8])
  3. >>> transforms.max(_p).get()
  4. 8
  5. >>> transforms.max(_p, lambda val: -val).get()
  6. 1

bigflow.transforms.max_elements(pcollection, n, key=None, **options)

得到输入PCollection中前n大的元素

参数:
  • pcollection (PCollection) — 输入PCollection
  • n (int/PObject) — 必须大于0
  • key (function, optional) — 用于提取key的函数,与Python内置max()中的 key 参数相同
  • **options — 可配置选项
返回:

包含前n大元素的PCollection,注意对于n=1,这里仍然返回PCollection 而非PObject

返回类型:

PCollection

  1. >>> from bigflow import transforms
  2. >>> _p = _pipeline.parallelize([3, 7, 1, 3, 2, 8])
  3. >>> transforms.max_elements(_p, 2).get()
  4. [8, 7]
  5. >>> n = _p.count().map(lambda x: x - 1) # n is PObject(5)
  6. >>> transforms.max_elements(_p, n, lambda val: -val).get() # key is -val
  7. >>> [1, 3, 3, 2, 7]

bigflow.transforms.min(pcollection, key=None, **options)

得到输入PCollection中最小的元素

参数:
  • pcollection (PCollection) — 输入PCollection
  • key (function, optional) — 用于提取key的函数,与Python内置min()中的 key 参数相同
  • **options — 可配置选项
返回:

包含最小元素的PObject

返回类型:

PObject

  1. >>> from bigflow import transforms
  2. >>> _p = _pipeline.parallelize([3, 7, 1, 3, 2, 8])
  3. >>> transforms.min(_p).get()
  4. 1
  5. >>> transforms.min(_p, lambda val: -val).get()
  6. 8

bigflow.transforms.min_elements(pcollection, n, key=None, **options)

得到输入PCollection中前n小的元素

参数:
  • pcollection (PCollection) — 输入PCollection
  • n (int/PObject) — 必须大于0
  • key (function, optional) — 用于提取key的函数,与Python内置max()中的 key 参数相同
  • **options — 可配置选项
返回:

包含前n小元素的PCollection,注意对于n=1,这里仍然返回PCollection 而非PObject

返回类型:

PCollection

  1. >>> from bigflow import transforms
  2. >>> _p = _pipeline.parallelize([3, 7, 1, 3, 2, 8])
  3. >>> transforms.min_elements(_p, 2).get()
  4. [1, 2]
  5. >>> n = _p.count().map(lambda x: x - 1) # n is PObject(5)
  6. >>> transforms.min_elements(_p, n, lambda val: -val).get() # key is -val
  7. >>> [8, 3, 3, 2, 7]

bigflow.transforms.pipe(pvalue, command, **options)

对于给定的PCollection/PTable,返回通过command处理后的PCollection

参数:
  • pvalue (PCollection/PTable) — 输入
  • command — 命令行
  • **options

    可配置选项

    type: pipe类型,目前支持streaming和bistreaming,默认为streaming

    buffer_size: 缓存大小(单条数据),默认64MB

    input_fields_num: 输入command的一条数据有几个field,默认为1。 PTable上调用pipe不需要指定;如果PCollection上调用pipe需要输入多个field,则要指定改配置,并且PCollection的元素类型需为tuple

    output_fields_num: command输出的数据有几个field,默认为1

    field_delimiter: streaming模式下field的分割符,默认为tab(制表符)

返回:

处理后的PCollection

返回类型:

PCollection

注解

  1. pipe作用于PCollection上,pipe会将数据直接发送到管道中,框架对数据如何划分不做任何保证;

2. pipe作用于PTable上,pipe会将PTable的Key和数据一起发送到管道中(支持嵌套), 并保证相同Key的数据会顺序发送到管道中,例如下列代码:

  1. >>> from bigflow import transforms
  2. >>> p = _pipeline.parallelize({
  3. >>> 'key1_a': {
  4. >>> 'key2_a': ['value1', 'value2'],
  5. >>> 'key2_b': ['value3', 'value4']
  6. >>> },
  7. >>> 'key2_b': {
  8. >>> 'key2_c': ['value5', 'value6']
  9. >>> }
  10. >>> })
  11. >>> transforms.pipe(p, 'cat').get()

用户程序(cat)接收到的数据为,column间默认使用制表符(tab)作为分割符:

key1_a key2_a value1

key1_a key2_a value2

key1_a key2_b value3

key1_a key2_b value4

key1_b key2_c value5

key1_b key2_c value6

3. 尽量不要在PTable上通过apply_values中使用pipe(应该使用apply), 不仅性能极差而且发送给管道的数据不包含Key;

  1. >>> from bigflow import transforms
  2. >>> p = _pipeline.parallelize([1, 1, 2, 3])
  3. >>> transforms.pipe(p, 'cat').get()
  4. ['1', '1', '2', '3']
  1. >>> from bigflow import transforms
  2. >>> p = _pipeline.parallelize({'A': [1, 2], 'B': [2, 3]})
  3. >>> transforms.pipe(p, 'cat').get()
  4. ['A 1', 'A 2', 'B 2', 'B 3']
  1. >>> from bigflow import transforms
  2. >>> p = _pipeline.parallelize([(1, 'a'), (1, 'a'), (2, 'b'), (3, 'c')])
  3. >>> transforms.pipe(p, 'cat', type='bistreaming', input_fields_num=2, output_fields_num=2).get()
  4. [('1', 'a'), ('1', 'a'), ('2', 'b'), ('3', 'c')]

bigflow.transforms.reduce(pcollection, fn, side_inputs, options*)

对于属于PCollection,使用给定的fn将所有元素规约为单个元素

假设输入类型为I,fn的期望签名为 fn(I1, I2) => I,即输出的类型必须与输入相同 (fn的第一个参数允许被修改)

参数:
  • pcollection (PCollection) — 输入PCollection
  • fn (function) — 规约函数
  • side_inputs — 参与运算的SideInputs
  • *options — 可配置参数
返回:

规约结果

返回类型:

PObject

  1. >>> from bigflow import transforms
  2. >>> _p = _pipeline.parallelize([1, 2, 3, 4])
  3. >>> transforms.reduce(_p, lambda x, y: x + y).get()
  4. 10

bigflow.transforms.right_join(pcollections, options*)

对于多个输入PCollection,根据key对PCollection做右连接操作 ,连接结果为(key, (value 1, value 2, …, value n)),若第m个PCollection没有元素, 则value m为None

参数:
  • pcollections — 输入PCollection
  • *options — 可配置选项
返回:

连接结果

返回类型:

PCollection

  1. >>> x = _pipeline.parallelize([("a", 1), ("b", 4)])
  2. >>> y = _pipeline.parallelize([("a", 2), ("a", 3)])
  3. >>> transforms.right_join(x, y).get()
  4. [("a", (1, 2)), ("a", (1, 3))]

bigflow.transforms.sort(pcollection, reverse=False)

对于输入PCollection,将其进行排序

参数:
  • pcollection (PCollection) — 输入PCollection
  • reverse (bool) — 若True则降序排列,否则为升序排列
返回:

排序结果

返回类型:

PCollection

  1. >>> from bigflow import transforms
  2. >>> _p = _pipeline.parallelize([3, 1, 2, 8])
  3. >>> transforms.sort(_p).get()
  4. [1, 2, 3, 8]

bigflow.transforms.sort_by(pcollection, key, reverse=False)

对于输入PCollection,使用给定的key将其进行排序

参数:
  • pcollection (PCollection) — 输入PCollection
  • key (function, optional) — 用于提取key的函数,与Python内置sort()中的 key 参数相同。提取的key不能为None。可以返回一个key的列表,每个key都可以分别按照升序或者降序排
  • reverse (bool) — 若True则降序排列,否则为升序排列
返回:

排序结果

返回类型:

PCollection

  1. >>> from bigflow import transforms
  2. >>> from bigflow.transform_impls import sort
  3. >>> _p = _pipeline.parallelize([3, 1, 2, 8])
  4. >>> p2 = _pipeline.parallelize([(1, 2), (3, 4), (3, 5), (2, 6), (2, 4)]
  5. >>> transforms.sort_by(_p).get()
  6. >>> transforms.sort_by(p2, lambda rec:[sort.ASC(rec[0]), sort.DESC(rec[1])]).get()
  7. [1, 2, 3, 8]
  8. [(1, 2), (2, 6), (2, 4), (3, 5), (3, 4)]

注解

  1. sort时所有元素类型必须相同,否则可能出现结果不正确。例如,元素1与元素2.0可能排序结果不正确。 但作为排序的key的多列可以类型不同。

  2. sort/sort_by后的数据集只有以下操作可以保证顺序:

    • accumulate,transform操作
    • aggregate的第一个聚合函数(即签名为O+I=>O的那个函数)。
    • first 和 take (暂不保证语义,未来会支持)

    由于操作性质,其它操作保序也无意义(或语义不明),故不保证顺序。

3. sort后调用write并不保证顺序。如果想保证输出有序,可以参考此文档: http://bigflow.cloud/zh/rst/bigflow.output.html

bigflow.transforms.str_to_idl(pcollection, **options)

对于给定的PCollection,对每条数据执行idl打包。要求输入的数据类型为str。

参数:
  • pcollection (PCollection) — 输入
  • **options

    可配置选项

    log_type: idl数据类型,目前支持log_text和log_bin,默认为log_text

返回:

处理后的PCollection

返回类型:

PCollection

bigflow.transforms.substract(pcollection1, pcollection2)

此接口已废弃。请使用subtract。

bigflow.transforms.subtract(pcollection1, pcollection2)

对于给定的PCollection1和PCollection2,返回所有存在于PCollection1但不在PCollection2 中的元素,相当于做容器减法

参数:
  • pcollection1 (PCollection) — 作为被减数的PCollection
  • pcollection2 (PCollection) — 作为减数的PCollection
返回:

表示结果的PCollection

返回类型:

PCollection

  1. >>> from bigflow import transforms
  2. >>> a = _pipeline.parallelize([1, 2, 3, 4])
  3. >>> b = _pipeline.parallelize([1, 2, 5])
  4. >>> transforms.subtract(a, b).get()
  5. [3, 4]

bigflow.transforms.sum(pcollection, **options)

对于输入PCollection,求其所有包含元素相加的结果

参数:
  • pcollection (PCollection) — 输入PCollection
  • **options — 可配置参数
返回:

表示结果的PObject

返回类型:

PObject

  1. >>> from bigflow import transforms
  2. >>> _p = _pipeline.parallelize([1, 2, 3, 4])
  3. >>> transforms.sum(_p).get()
  4. 10

bigflow.transforms.take(pcollection, n, **options)

取给定PCollection中的任意n个元素。 (如果总元素数量不足n,则返回输入pcollection)

参数:
  • pcollection (PCollection) — 输入PCollection
  • n (int or PObject) — 元素数量
  • **options — 可配置参数
返回:

表示结果的PCollection

返回类型:

PCollection

  1. >>> from bigflow import transforms
  2. >>> _p = _pipeline.parallelize([1, 2, 3, 4])
  3. >>> transforms.take(_p, 3).get()
  4. [1, 2, 3]
  5. >>> _n = _pipeline.parallelize(2)
  6. >>> transforms.take(_p, _n).get()
  7. [1, 2]
  1. >>> _n = _pipeline.parallelize([1, 2, 3, 4])
  2. >>> transforms.take(_p, 10).get()
  3. [1, 2, 3, 4]

bigflow.transforms.to_list_pobject(pvalue, **options)

对于给定的PCollection,聚合为PObject,PObject的内容为list

参数:
  • pvalue (PCollection) — 输入
  • **options — 可配置选项
返回:

聚合后的list

返回类型:

PObject

  1. >>> from bigflow import transforms
  2. >>> a = _pipeline.parallelize([1, 1, 2, 3])
  3. >>> transforms.to_list_pobject(a).get()
  4. [1, 1, 2, 3]
  1. >>> type(a)
  2. bigflow.pcollection.PCollection
  1. >>> a.map(lambda x: str(type(x))).get()
  2. ["<type 'int'>", "<type 'int'>", "<type 'int'>", "<type 'int'>"]
  1. >>> b = transforms.to_list_pobject(a)
  2. >>> type(b)
  3. bigflow.pobject.PObject
  1. >>> b.map(lambda x: str(type(x))).get()
  2. "<type 'list'>"

注解

这个是最易被滥用的一个transform。 它可以使一个PCollection转化为一个元素为list的PObject, 用户可以在后续的map操作中拿到这个list进行任意的单机操作, 在一些场景下,如复用现有单机代码时,比较有用。 但是,该变换将使得Bigflow许多优化无法执行,导致运行效率下降, 另外,在apply_values中使用时, 由于每个分组的数据必须转化为一个list, 则导致在一个分组内数据过多时,会占用大量内存资源, 甚至可能引起作业因内存占用过多而Out-Of-Memory失败。

故,使用该变换前,请三思,尽量使用以下其它算子替换掉此方法, 一些较为通用的替代方案如下(下列替换方案,按顺序前边的比后边的效率高):

  1. bigflow.transforms.aggregate
  2. bigflow.transforms.transform

bigflow.transforms.to_pobject(pvalue, **options)

对于给定的PCollection/PTable,聚合为PObject,PObject的内容为list/dict

参数:
  • pvalue (PCollection/PTable) — 输入
  • **options — 可配置选项
返回:

聚合后的list/dict

返回类型:

PObject

  1. >>> from bigflow import transforms
  2. >>> a = _pipeline.parallelize([1, 1, 2, 3])
  3. >>> transforms.to_pobject(a).get()
  4. [1, 1, 2, 3]
  1. >>> from bigflow import transforms
  2. >>> b = _pipeline.parallelize({'e': 'f', 'g': 'h'})
  3. >>> transforms.to_pobject(b).get()
  4. {'e': 'f', 'g': 'h'}

bigflow.transforms.transform(pcollection, first_arg, other_args, options*)

对给定PCollection进行任意的变换,结果为另一个PCollection

transform有两种形式,形式一:

基本原型为`transform(pcollection, initializer, transformer, finalizer, *side_inputs, **options)`

transform将PCollection的处理分为3个阶段: 初始化,遍历及结束,分别对应于 initializer, transformer和finalizer三个处理函数。三个函数之间有一个状态 status(也可以理解为上下文context),同时有一个emitter参数可以向输出PCollection发送数据

假定输入数据类型为I,输出数据类型为O,initializer, transformer, finalizer各自的期望签名为:

initializer(emitter, *side_inputs) => status(object)

transformer(status, emitter, I, *side_inputs) => status(object) (transformer的第一个参数允许被修改)

finalizer(status, emitter, *side_inputs) => None (finalizer的第一个参数允许被修改)

emitter.emit(O)

参数:
  • pcollection (PCollection) — 输入PCollection
  • initializer (callable) — 初始化函数
  • transformer (callable) — 变换函数
  • finalizer (callable) — 结束函数
  • side_inputs — 参与计算的SideInputs
  • *options — 可配置选项
返回:

表示返回结果的PCollection

返回类型:

PCollection

  1. >>> from bigflow import transforms
  2. >>> import copy
  3. >>> def initializer(emitter):
  4. >>> return []
  5. >>>
  6. >>> def transformer(status, emitter, inp):
  7. >>> status.append(copy.deepcopy(inp)) #如果要缓存一个数据,最好复制一份。
  8. >>> return status
  9. >>>
  10. >>> def finalizer(status, emitter):
  11. >>> emitter.emit(status)
  12. >>>
  13. >>> _p = _pipeline.parallelize([1, 2, 3])
  14. >>> _plist = transforms.transform(_p, initializer, transformer, finalizer)
  15. >>> print _plist.count().get() # 只有一个元素,元素的内容是[1, 2, 3]这样一个列表。
  16. 1
  17. >>> print _plist.get()
  18. [[1, 2, 3]]

形式二:

基本原型为`transform(pcollection, transformer, *side_inputs, **options)` 其中transformer应为 bigflow.base.Transformer 类的子类, Transformer.begin_process在数据开始处理前会被调用。 Transformer.process在数据开始处理时,每条数据调用一次,传入需要的数据。 Transformer.end_process在数据处理完成后被调用。 用户需要输出的数据以列表或其它可迭代对象的形式返回,其中所有元素都会被作为输出PCollection中的一个元素。 (注意,如果不需要输出请返回一个空的[],而不要返回None)

  1. >>> class SumTransformer(base.Transformer):
  2. ...
  3. ... def begin_process(self):
  4. ... self._sum = 0
  5. ... return []
  6. ...
  7. ... def process(self, record):
  8. ... self._sum += record
  9. ... return []
  10. ...
  11. ... def end_process(self):
  12. ... yield self._sum
  13. ...
  14. >>> p1 = _pipeline.parallelize([1, 2, 3])
  15. >>> transforms.transform(p1, SumTransformer).get()
  16. 6
  1. >>> class PartialSumTransformer(base.Transformer):
  2. ...
  3. ... def begin_process(self):
  4. ... self._sum = 0
  5. ... return []
  6. ...
  7. ... def process(self, record):
  8. ... self._sum += record
  9. ... yield self._sum
  10. ...
  11. >>> transforms.transform(p1, PartialSumTransformer()),get()
  12. [1, 3, 6]
  1. >>> class ZipTransformer(base.Transformer):
  2. ...
  3. ... def begin_process(self, *si):
  4. ... self.index = 0
  5. ... lens = map(len, si)
  6. ... self.min_len = min(lens)
  7. ... return []
  8. ...
  9. ... def process(self, inp, *si):
  10. ... if self.index < self.min_len:
  11. ... yield (inp, ) + tuple(map(lambda x: x[self.index], si))
  12. ... self.index += 1
  13. ...
  14. >>> p2 = _pipeline.parallelize([4, 5]).sort()
  15. >>> transforms.transform(p1, ZipTransformer(), p2).get()
  16. [(1, 4), (2, 5)]

本方法为Bigflow所提供的最底层和最复杂的变换方法,它可以表达对PCollection 的任意变换。

在有其它函数(如aggregate)能完成同样功能时,尽量不要使用该函数,框架无法了解该函数内部实现, 无法进行许多深层次的优化工作。

bigflow.transforms.union(pvalues, options*)

对于多个输入PCollection/PObject,返回包含它们所有元素的PCollection

输入PCollection必须为同类型

参数:*pvalues — 输入PCollection/PObject
返回:表示结果的PCollection
返回类型:PCollection
  1. >>> from bigflow import transforms
  2. >>> _p1 = _pipeline.parallelize([1, 2, 3, 4])
  3. >>> _p2 = _pipeline.parallelize([5, 6, 7, 8])
  4. >>> transforms.union(_p1, _p2).get()
  5. [1, 2, 3, 4, 5, 6, 7, 8]

bigflow.transforms.window_into(pcollection, window, **options)

利用给定的window对输入PCollection分组,返回一个表示分组结果的PTable

参数:
  • pcollection (PCollection) — 输入PCollection
  • window (Window) — 输入Window
返回:

分组结果

返回类型:

PTable