Python编程指南Beta

Flink中的分析程序是实现数据集转换的常规程序(例如,Filter,映射,连接,分组)。数据集最初是从某些来源创建的(例如,通过读取文件或从集合中创建)。结果通过接收器返回,接收器可以例如将数据写入(分布式)文件或标准输出(例如命令行终端)。Flink程序可以在各种环境中运行,独立运行或嵌入其他程序中。执行可以在本地JVM中执行,也可以在许多计算机的集群上执行。

为了创建自己的Flink程序,我们建议您从程序框架开始,逐步添加自己的转换其余部分充当其他 算子操作和高级函数的参考。

示例程序

以下程序是WordCount的完整工作示例。您可以复制并粘贴代码以在本地运行它。

  1. from flink.plan.Environment import get_environment
  2. from flink.functions.GroupReduceFunction import GroupReduceFunction
  3. class Adder(GroupReduceFunction):
  4. def reduce(self, iterator, collector):
  5. count, word = iterator.next()
  6. count += sum([x[0] for x in iterator])
  7. collector.collect((count, word))
  8. env = get_environment()
  9. data = env.from_elements("Who's there?",
  10. "I think I hear them. Stand, ho! Who's there?")
  11. data \
  12. .flat_map(lambda x, c: [(1, word) for word in x.lower().split()]) \
  13. .group_by(1) \
  14. .reduce_group(Adder(), combinable=True) \
  15. .output()
  16. env.execute(local=True)

程序框架

正如我们在示例中看到的那样,Flink程序看起来像普通的python程序。每个程序包含相同的基本部分:

  • 获得一个Environment
  • 加载/创建初始数据,
  • 指定此数据的转换,
  • 指定计算结果的放置位置,和
  • 执行你的程序。我们现在将概述每个步骤,但请参阅相应部分以获取更多详细信息。

Environment是所有Flink计划的基础。你可以在课堂上使用这些静态方法获得一个Environment

  1. get_environment()

为了指定数据源,运行环境有几种从文件中读取的方法。要将文本文件作为一系列行读取,您可以使用:

  1. env = get_environment()
  2. text = env.read_text("file:///path/to/file")

这将为您提供一个DataSet,然后您可以在其上应用转换。有关数据源和输入格式的更多信息,请参阅数据源

拥有DataSet后,您可以应用转换来创建新的DataSet,然后您可以将其写入文件,再次转换或与其他DataSet结合使用。您可以通过使用自己的自定义转换函数调用DataSet上的方法来应用转换。例如,Map转换如下所示:

  1. data.map(lambda x: x*2)

这将通过将原始DataSet中的每个值加倍来创建新的DataSet。有关更多信息和所有转换的列表,请参阅转换

一旦有了需要写入磁盘的DataSet,就可以在DataSet上调用其中一个方法:

  1. data.write_text("<file-path>", WriteMode=Constants.NO_OVERWRITE)
  2. write_csv("<file-path>", line_delimiter='\n', field_delimiter=',', write_mode=Constants.NO_OVERWRITE)
  3. output()

最后一种方法仅对本地机器上的开发/调试有用,它会将DataSet的内容输出到标准输出。(请注意,在集群中,结果将转到集群节点的标准输出流,最终会出现在工作程序的.out文件中)。前两个顾名思义。有关写入文件的更多信息,请参阅数据接收器

一旦您指定的完整程序,你需要调用executeEnvironment这将提交您的程序以在群集上执行。

项目设置

除了设置Flink外,无需额外的工作。python包可以在Flink发行版的/ resource文件夹中找到。flink包以及计划和可选包在运行作业时通过HDFS自动分布到群集中。

Python API在安装了Python 2.7或3.4的Linux / Windows系统上进行了测试。

默认情况下,Flink将通过调用“python”来启动python进程。通过在flink-conf.yaml中设置“python.binary.path”键,您可以修改此行为以使用您选择的二进制文件。

懒惰的评价

所有Flink程序都是懒惰地执行:当执行程序的main方法时,数据加载和转换不会直接发生。而是创建每个 算子操作并将其添加到程序的计划中。execute()在Environment对象上调用其中一个方法时,实际执行这些 算子操作

懒惰的评估使您可以构建Flink作为一个整体计划单元执行的复杂程序。

转换

数据转换将一个或多个DataSet转换为新的DataSet。程序可以将多个转换组合到复杂的程序集中。

本节简要概述了可用的转换。转换文档与示例全部转换的完整描述。

</ tr> </ tr>

转型描述
Map采用一个数据元并生成一个数据元。
  1. data.map(lambda x: x 2)
FlatMap采用一个数据元并生成零个,一个或多个数据元。
  1. data.flat_map( lambda x,c: [(1,word) for word in line.lower().split() for line in x])
MapPartition在单个函数调用中转换并行分区。该函数将分区作为“迭代器”,并可以生成任意数量的结果值。每个分区中的数据元数量取决于并行度和先前的 算子操作。
  1. data.map_partition(lambda x,c: [value 2 for value in x])
Filter计算每个数据元的布尔函数,并保存函数返回true的数据元。
  1. data.filter(lambda x: x > 1000)
Reduce通过将两个数据元重复组合成一个数据元,将一组数据元组合成一个数据元。Reduce可以应用于完整数据集或分组数据集。
  1. data.reduce(lambda x,y : x + y)
ReduceGroup将一组数据元组合成一个或多个数据元。ReduceGroup可以应用于完整数据集或分组数据集。
  1. class Adder(GroupReduceFunction): def reduce(self, iterator, collector): count, word = iterator.next() count += sum([x[0] for x in iterator) collector.collect((count, word))data.reduce_group(Adder())
骨料在数据集或数据集的每个组中的所有元组的一个字段上执行内置 算子操作(sum,min,max)。聚合可以应用于完整数据集或分组数据集。
  1. # This code finds the sum of all of the values in the first field and the maximum of all of the values in the second fielddata.aggregate(Aggregation.Sum, 0).and_agg(Aggregation.Max, 1)# min(), max(), and sum() syntactic sugar functions are also availabledata.sum(0).and_agg(Aggregation.Max, 1)
Join通过创建在其键上相等的所有数据元对来连接两个数据集。(可选)使用JoinFunction将数据元对转换为单个数据元。如何定义连接Keys。
  1. # In this case tuple fields are used as keys.# "0" is the join field on the first tuple# "1" is the join field on the second tuple.result = input1.join(input2).where(0).equal_to(1)
CoGroupreduce 算子操作的二维变体。将一个或多个字段上的每个输入分组,然后关联组。每对组调用转换函数。如何定义CoGroup键。
  1. data1.co_group(data2).where(0).equal_to(1)
交叉构建两个输入的笛卡尔积(交叉乘积),创建所有数据元对。可选择使用CrossFunction将数据元对转换为单个数据元。
  1. result = data1.cross(data2)
Union生成两个数据集的并集。
  1. data.union(data2)
ZipWithIndex为每个数据元分配连续索引。有关详细信息,请参阅[Zip数据元指南](zip_elements_guide.html#zip-with-a-dense-index)。
  1. data.zip_with_index()

指定Keys

某些转换(如Join或CoGroup)要求在其参数DataSets上定义键,而其他转换(Reduce,GroupReduce)允许DataSet在应用之前在键上进行分组。

DataSet被分组为

  1. reduced = data \
  2. .group_by(<define key here>) \
  3. .reduce_group(<do something>)

Flink的数据模型不基于键值对。因此,您无需将数据集类型物理打包到键和值中。键是“虚拟的”:它们被定义为实际数据上的函数,以指导分组算子。

定义元组的键

最简单的情况是在元组的一个或多个字段上对元组的数据集进行分组:

  1. reduced = data \
  2. .group_by(0) \
  3. .reduce_group(<do something>)

数据集分组在元组的第一个字段中。因此,group-reduce函数将接收第一个字段中具有相同值的元组。

  1. grouped = data \
  2. .group_by(0,1) \
  3. .reduce(/*do something*/)

数据集在由第一个和第二个字段组成的复合键上分组,因此reduce函数将接收两个字段具有相同值的组。

关于嵌套元组的注释:如果你有一个带有嵌套元组的DataSet指定group_by(<index of tuple>)将导致系统使用完整的元组作为键。

某些 算子操作需要用户定义的函数,而所有 算子操作都接受lambda函数和丰富的函数作为参数。

  1. data.filter(lambda x: x > 5)
  1. class Filter(FilterFunction):
  2. def filter(self, value):
  3. return value > 5
  4. data.filter(Filter())

丰富的函数允许使用导入的函数,提供对广播变量的访问,可以使用init()进行参数化,并且是复杂函数的首选。它们也是combine为reduce 算子操作定义可选函数的唯一方法

Lambda函数允许轻松插入单线。请注意,如果 算子操作可以返回多个值,则lambda函数必须返回iterable。(所有接收收集器参数的函数)

数据类型

Flink的Python API目前仅提供对原始python类型(int,float,bool,string)和字节数组的本机支持。

可以通过将序列化程序,反序列化程序和类型类传递给环境来扩展类型支持。

  1. class MyObj(object):
  2. def __init__(self, i):
  3. self.value = i
  4. class MySerializer(object):
  5. def serialize(self, value):
  6. return struct.pack(">i", value.value)
  7. class MyDeserializer(object):
  8. def _deserialize(self, read):
  9. i = struct.unpack(">i", read(4))[0]
  10. return MyObj(i)
  11. env.register_custom_type(MyObj, MySerializer(), MyDeserializer())

元组/列表

您可以将元组(或列表)用于复合类型。Python元组映射到Flink Tuple类型,它包含各种类型的固定数量的字段(最多25个)。元组的每个字段都可以是原始类型 - 包括更多元组,从而产生嵌套元组。

  1. word_counts = env.from_elements(("hello", 1), ("world",2))
  2. counts = word_counts.map(lambda x: x[1])

使用需要Key进行分组或匹配记录的 算子时,使用元组可以简单地指定要用作键的字段的位置。您可以指定多个位置以使用复合键(请参见章节数据转换)。

  1. wordCounts \
  2. .group_by(0) \
  3. .reduce(MyReduceFunction())

数据源

数据源创建初始数据集,例如来自文件或集合。

基于文件的:

  • read_text(path) - 按行读取文件并将其作为字符串返回。
  • read_csv(path, type) - 解析逗号(或其他字符)分隔字段的文件。返回元组的DataSet。支持基本java类型及其Value对应作为字段类型。基于集合:

  • from_elements(*args) - 从Seq创建数据集。所有数据元

  • generate_sequence(from, to) - 并行生成给定间隔中的数字序列。例子
  1. env = get_environment
  2. \# read text file from local files system
  3. localLiens = env.read_text("file:#/path/to/my/textfile")
  4. \# read text file from a HDFS running at nnHost:nnPort
  5. hdfsLines = env.read_text("hdfs://nnHost:nnPort/path/to/my/textfile")
  6. \# read a CSV file with three fields, schema defined using constants defined in flink.plan.Constants
  7. csvInput = env.read_csv("hdfs:///the/CSV/file", (INT, STRING, DOUBLE))
  8. \# create a set from some given elements
  9. values = env.from_elements("Foo", "bar", "foobar", "fubar")
  10. \# generate a number sequence
  11. numbers = env.generate_sequence(1, 10000000)

数据接收

数据接收器使用DataSet并用于存储或返回它们:

  • writetext() - 按字符串顺序写入数据元。通过调用每个数据元的_str()方法获得字符串。
  • writecsv(…) - 将元组写为逗号分隔值文件。行和字段分隔符是可配置的。每个字段的值来自对象的_str()方法。
  • output()- 打印标准输出上每个数据元的str()值。可以将DataSet输入到多个 算子操作。程序可以编写或打印数据集,同时对它们执行其他转换。

例子

标准数据接收方法:

  1. write DataSet to a file on the local file system
  2. textData.write_text("file:///my/result/on/localFS")
  3. write DataSet to a file on a HDFS with a namenode running at nnHost:nnPort
  4. textData.write_text("hdfs://nnHost:nnPort/my/result/on/localFS")
  5. write DataSet to a file and overwrite the file if it exists
  6. textData.write_text("file:///my/result/on/localFS", WriteMode.OVERWRITE)
  7. tuples as lines with pipe as the separator "a|b|c"
  8. values.write_csv("file:///path/to/the/result/file", line_delimiter="\n", field_delimiter="|")
  9. this writes tuples in the text formatting "(a, b, c)", rather than as CSV lines
  10. values.write_text("file:///path/to/the/result/file")

广播变量

除了常规的 算子操作输入之外,广播变量还允许您为 算子操作的所有并行实例提供数据集。这对于辅助数据集或与数据相关的参数化非常有用。然后,算子可以将数据集作为集合访问。

  • 广播:广播集通过名称注册with_broadcast_set(DataSet, String)
  • 访问:可通过self.context.get_broadcast_variable(String)目标算子访问
  1. class MapperBcv(MapFunction):
  2. def map(self, value):
  3. factor = self.context.get_broadcast_variable("bcv")[0][0]
  4. return value * factor
  5. # 1. The DataSet to be broadcast
  6. toBroadcast = env.from_elements(1, 2, 3)
  7. data = env.from_elements("a", "b")
  8. # 2. Broadcast the DataSet
  9. data.map(MapperBcv()).with_broadcast_set("bcv", toBroadcast)

bcv注册和访问广播数据集时,请确保名称(在前面的示例中)匹配。

注意:由于广播变量的内容保存在每个节点的内存中,因此不应该变得太大。对于像标量值这样的简单事物,您可以简单地参数化丰富的函数。

并行执行

本节介绍如何在Flink中配置程序的并行执行。Flink程序由多个任务( 算子,数据源和接收器)组成。任务被分成几个并行实例以供执行,每个并行实例处理任务输入数据的子集。任务的并行实例数称为并行并行__度(DOP)

可以在不同级别的Flink中指定任务的并行度。

运行环境级别

Flink程序在运行环境的上下文中执行运行环境为其执行的所有算子,数据源和数据接收器定义默认并行性。可以通过显式配置 算子的并行性来覆盖运行环境并行性。

可以通过调用set_parallelism()方法来指定运行环境的默认并行性要以并行方式执行WordCount示例程序的所有 算子,数据源和数据接收器,请3按如下方式设置运行环境的默认并行度:

  1. env = get_environment()
  2. env.set_parallelism(3)
  3. text.flat_map(lambda x,c: x.lower().split()) \
  4. .group_by(1) \
  5. .reduce_group(Adder(), combinable=True) \
  6. .output()
  7. env.execute()

系统级别

可以通过设置parallelism.default属性来定义所有运行环境的系统范围默认并行度./conf/flink-conf.yaml有关详细信息,请参阅配置文档

执行计划

要使用Flink运行计划,请转到Flink分发,然后从/ bin文件夹运行pyflink.sh脚本。包含该计划的脚本必须作为第一个参数传递,然后是一些额外的python包,最后由 - 将被提供给脚本的其他参数分隔。

  1. ./bin/pyflink.sh <Script>[ <pathToPackage1>[ <pathToPackageX]][ - <param1>[ <paramX>]]