快速入门

本教程旨在提供一个使用Bigflow Python的快速上手演示。教程首先会通过在Python交互式环境(Interactive Shell)中一步一步展示Bigflow Python的基本概念和API,然后演示如何写一个完整的Bigflow Python程序。对于深入的概念和用法,请参照 编程指南 部分。

在Python shell中使用Bigflow Python

准备

Bigflow Python工作在Python 2.7环境中(其它版本尚未测试)。它使用标准的CPython解释器,同时用户也能够在代码中使用第三方的Python库。

获取 bigflow_python.tar.gz

保证系统环境中已经安装好hadoop client,并设置好HADOOP_HOME,Bigflow Python需要从Hadoop配置文件中读取相应的配置来访问HDFS。 保证系统环境中已经安装好spark client,并设置好SPARK_HOME,Bigflow Python需要从Spark配置文件中读取相应的配置来访问Spark。 保证系统环境中已经安装好了Java,并设置好了JAVA_HOME,需要使用java来运行spark。

用户通过 公有云 可以更方便的是使用Bigflow

运行 bigflow/bin/pyrun 即可在Python交互式环境中使用Bigflow Python。

接下来,我们可以导入相关的module了:

  1. from bigflow import base, transforms, input, output

Bigflow Python任务通过 pipeline 进行定义,它通过一些列对抽象数据集的变换用户计算任务。同时,pipeline也代表任务最终被提交到的计算引擎上。我们可以首先创建一个本地引擎来快速地试用:

  1. >>> pipeline = base.Pipeline.create("LOCAL")

接下来可以读一个文件:

  1. >>> text = pipeline.read(input.TextFile("some input path"))
  2. >>> print text
  3. [...]

pipeline.read() 将本地文件映射为一个 P类型 的实例,更确切的说,是一个 PCollection 。PCollection可以看作是一个分布式的Python list 结构。TextFile表示读取的类型为文本文件,它会将文本的每一行作为PCollection的元素。注意,这时我们还没有真正地读取数据,仅仅记录了一个文本文件到PCollection的映射,因此 print text 显示的内容为省略号(“[…]“)。我们可以使用 get() 方法触发Pipeline计算,将PCollection转化为一个Python的内置list:

  1. >>> print text.get()
  2. >>> ['An apple', 'a banana', ...(文本文件的每一行内容)]

PCollection定义了一系列的变换方法方便我们对其表示的数据进行计算,例如,可以使用 count() 方法统计元素的个数,也就是输入文本的行数:

  1. >>> line_num = text.count()
  2. >>> print line_num
  3. o

记住P类型实例的变换结果是另一个P类型实例,这次我们看到line_num通过一个 o 表示它是另一种P类型,PObject,它对应于单个值。我们可以再次调用 get() 方法看到结果:

  1. >>> print line_num.get()
  2. 54

我们可以再使用一个 distinct() 方法得到text中所有的不重复元素:

  1. >>> distinct_lines = text.distinct()
  2. >>> print distinct_lines
  3. >>> [...]

distinct() 方法的返回结果是一个PCollection,我们仍然可以使用 get() 方法得到其内容,或是使用 Pipeline.write() 方法把结果写到文件中:

  1. >>> pipeline.write(distinct_lines, output.TextFile("some output path"))

与 read() 方法类似,调用 write() 并不会马上触发文件的写操作,而仅仅将这个过程记录下来。这次我们可以使用 Pipeline.run() 触发Pipeline计算:

  1. >>> pipeline.run()

这时,distinct_lines的内容真正被写到了我们指定的输出路径中。

回顾一下,Bigflow Python会仅可能地推迟Pipeline的计算来对整个计算过程进行优化。触发计算的方式只有两种: get() 和 run() 方法。

更多的变换操作

Bigflow Python提供了众多的变换支持从简单到复杂的计算逻辑。比如说现在我们希望得到之前的文本文件中,单词最多的那一行:

  1. >>> line_with_num = text.map(lambda line: (len(line.split()), line))
  2. >>> line_with_num.reduce(lambda a, b: a if (a[0] > b[0]) else b).get()
  3. (3, "Whatever it uses")

第一行代码使用了 map() 变换,将每一行映射为一个tuple — (单词数, 行内容),映射结果为另一个PCollection;第二行代码使用 reduce() 变换,将PCollection中的每两个元素进行比较,返回单词数更大的元素,最终将所有的元素聚合为一,返回一个PObject。这里的 map() 和 reduce() 用法非常类似于Python内置的 map()reduce() 。 变换的参数是一个方法,例子中是Python的 匿名表达式(lambdas) 。我们也可以显式定义方法并使用:

  1. >>> def max(a, b):
  2. ... if a[0] > b[0]:
  3. ... return a
  4. ... else:
  5. ... return b
  6. ...
  7. >>> line_with_num.reduce(max).get()
  8. (3, "Whatever it uses")

在当前的分布式计算领域,广为人知的范式便是MapReduce。在Bigflow Python中,用户能够轻易地实现一个MapReduce范式,比如以经典的Word Count为例:

  1. >>> words = text.flat_map(lambda line: line.split()) # [...]
  2. >>> groups = words.group_by(lambda word: word, lambda word: 1) # {k0: [...]}
  3. >>> result = groups.apply_values(transforms.sum) # {k0: o}
  4. >>> print result.get()
  5. {"Whatever": 1, "it": 3, "use": 2}

第一行代码将输入的每一行映射为多个单词( flat_map() 变换是一个1到N的映射),第二行使用 group_by() 变换根据单词进行分组。分组的结果是一种新的P类型 — PTable。简单而言,PTable可以看作是分布式的Python dict 类型,其具有key到另一个P类型的映射:

  1. >>> print groups
  2. {k0: [...]}

例子中,groups是一个以单词为key,PCollection为value的PTable,PCollection的包含着多个’1’。我们可以使用 apply_values() 方法应用任何的变换到value,也就是PCollection上。例如,之前用过的 transforms.count()

  1. >>> result = groups.apply_values(transforms.count)
  2. >>> print result
  3. {k0: o}

现在结果是另一个PTable,value变为了PObject(单词数量)。

PTable可以通过 flatten() 变换转换为一个PCollection:

  1. >>> flatten_result = result.flatten()
  2. >>> print flatten_result
  3. [...]

PCollection的元素为(key, value) tuple:

  1. >>> print flatten.get()
  2. [("Whatever", 1), ("it", 3), ("use", 2)]

Bigflow Python中所有的变换可以在 transforms 查看接口说明和用法。

完整的程序

批处理引擎(spark)

之前的例子中,Pipeline运行在本地引擎上,生产环境中我们可以使用spark引擎处理真正的大规模数据。

把所有的代码放到一个py文件中,在创建Pipeline的时候指定”spark”作为Pipeline类型:

  1. """word_cnt.py"""
  2. from bigflow import base, transforms, input, output
  3. def word_cnt(p):
  4. return p.group_by(lambda x: x, lambda x: 1) \
  5. .apply_values(transforms.sum) \
  6. .flatten()
  7. pipeline = base.Pipeline.create("spark", tmp_data_path="some hdfs path") # now the job runs on DCE
  8. input_data = pipeline.read(input.TextFile("hdfs:///some input path"))
  9. result = input_data.flat_map(lambda line: line.split()) \
  10. .apply(word_cnt)
  11. pipeline.write(result, output.TextFile("hdfs:///some output path"))
  12. pipeline.run()

运行”bin/pyrun word_cnt.py”便可以把任务提交到spark上。

更多使用示例

examples

进阶教程

  • 更多的概念和介绍,请参照Bigflow Python 编程指南
  • Bigflow Python API索引 参考所有API的说明。