Greenplum MapReduce规范
该规范描述了文档格式以及用于定义Greenplum MapReduce任务的模式。
注释: 无论是MapReduce工具gpmapreduce)还是其相关的YAML规范均已被弃用,它们将在下一个Greenplum数据库的主发布(6.x)中被移除。作为替代,请使用其他的查询格式,例如SQL或者过程语言扩展。
MapReduce是谷歌开发的一个编程模型,用于在普通服务器阵列上处理和生成大量的数据集。Greenplum MapReduc允许熟悉MapReduce的程序员写map和reduce函数同时提交它们到Greenplum数据库并行引擎上用于处理。
为了能让Greenplum处理MapReduce函数,需要在一个文档中定义函数,然后将该文档传递给Greenplum的MapReduce程序,gpmapreduce,由Greenplum数据库并行引擎执行。Greenplum数据库系统会将输入的数据进行分布、在一系列的机器上执行程序、处理机器故障以及管理所需的内部机器间的通信。
关于 gpmapreduce的信息请见Greenplum数据库工具指南。
上级主题: Greenplum数据库参考指南
Greenplum MapReduce文档格式
这部分解释一些Greenplum MapReduce文档格式的基本知识来帮助用户开始创建自己的Greenplum MapReduce文档。Greenplum使用YAML 1.1文档格式同时对于定义一个MapReduce作业的各个步骤有自己的模式实现。
所有Greenplum MapReduce文件必须首先定义它们使用的YAML规范的版本。在此之后,三个破折号(-—)表示一个文档的是开始以及三个点号 (…) 指明一个不需要启动新文档的文档的结束。只是行以一个英镑符号(#)为前缀。也可以在相同的文件中声明多个Greenplum MapReduce文档:
%YAML 1.1
---
# Begin Document 1
# ...
---
# Begin Document 2
# ...
在一个Greenplum MapReduce 文档中,有三种数据结构或者节点的基本类型:标量、序列 以及映射。
标量是一个用空格缩进的文本串。如果有一个跨越多行的标量的输入,一个前置竖线(|)会表示一种literal样式,在其中所有的换行都是有意义的。或者,用前置的前括号(>)为后续具有相同缩进级别的行把单个换行符折叠为空格。如果一个字符串包含具有保留意义的字符,字符串必须被引用或者特殊字符必须使用反斜线(\)转义。
# 逐字地读取每行
somekey: | this value contains two lines
and each line is read literally
# 将每个新的行看做一个空格
anotherkey: >
this value contains two lines
but is treated as one continuous line
# 该字符串引用包含了一个特殊字符
ThirdKey: "This is a string: not a mapping"
序列是一个列表,列表中每个项在它们自己的行中以一个破折号和一个空格(- )来指示。或者,用户可以指定一个内联序列作为一个方括号内部逗号分隔的列表。一个序列提供了一个数据集合同时给定它们之间的顺序。当用户装载一个列表到Greenplum MapReduce程序中时,该顺序会被保留。
# 列表序列
- this
- is
- a list
- with
- five scalar values
# 内联序列
[this, is, a list, with, five scalar values]
映射被用来打包数据的值和被称为keys的标识符。映射为每个键: 值对使用一个冒号和空格(: ),或者也可以被指定内联为一个花括号内由逗号分隔的列表。键用来作为从映射中获取数据的索引。
# 多个项目的一个映射
title: War and Peace
author: Leo Tolstoy
date: 1865
# 用内联表示该映射
{title: War and Peace, author: Leo Tolstoy, date: 1865}
键被用来连接每个节点和元数据信息以及指定期望的节点类型(scalar、sequence或者mapping)。见Greenplum MapReduce文档模式获取关于Greenplum MapReduce程序期待的键。
Greenplum MapReduce 程序安好走啊顺序处理一个文档的节点同时使用缩进(空格)来决定文档的层级和节点见得关系。空格的使用非常重要。不应该简单的使用空格作为格式化的目的,制表符不应该被使用。
Greenplum MapReduce文档模式
Greenplum MapReduce使用YAML文档架构同时实现了自己的YAML模式。一个Greenplum MapReduce文档的基本结构为:
%YAML 1.1
---
VERSION: 1.0.0.2
DATABASE: dbname
USER: db_username
HOST: master_hostname
PORT: master_port
DEFINE:
- INPUT:
NAME: input_name
FILE:
- hostname:/path/to/file
GPFDIST:
- hostname:port/file_pattern
TABLE: table_name
QUERY: SELECT_statement
EXEC: command_string
COLUMNS:
- field_name data_type
FORMAT: TEXT | CSV
DELIMITER: delimiter_character
ESCAPE: escape_character
NULL: null_string
QUOTE: csv_quote_character
ERROR_LIMIT: integer
ENCODING: database_encoding
- OUTPUT:
NAME: output_name
FILE: file_path_on_client
TABLE: table_name
KEYS:
- column_name
MODE: REPLACE | APPEND
- MAP:
NAME: function_name
FUNCTION: function_definition
LANGUAGE: perl | python | c
LIBRARY: /path/filename.so
PARAMETERS:
- nametype
RETURNS:
- nametype
OPTIMIZE: STRICT IMMUTABLE
MODE: SINGLE | MULTI
- TRANSITION | CONSOLIDATE | FINALIZE:
NAME: function_name
FUNCTION: function_definition
LANGUAGE: perl | python | c
LIBRARY: /path/filename.so
PARAMETERS:
- nametype
RETURNS:
- nametype
OPTIMIZE: STRICT IMMUTABLE
MODE: SINGLE | MULTI
- REDUCE:
NAME: reduce_job_name
TRANSITION: transition_function_name
CONSOLIDATE: consolidate_function_name
FINALIZE: finalize_function_name
INITIALIZE: value
KEYS:
- key_name
- TASK:
NAME: task_name
SOURCE: input_name
MAP: map_function_name
REDUCE: reduce_function_name
EXECUTE
- RUN:
SOURCE: input_or_task_name
TARGET: output_name
MAP: map_function_name
REDUCE: reduce_function_name...
VERSION
要求。Greenplum MapReduce YAML规范的版本。当前版本为1.0.0.1.
DATABASE
可选。指定要连接到Greenplum中哪个数据库。如果没有指定,默认连接到默认的数据库或者$PGDATABASE(如果该值被设置了)。
USER
可选。指定使用哪个数据库角色来连接。如果没有指定,默认使用当前用户,或者$PGUSER(如果该值被设置)。用户必须为Greenplum的超级用户,如果要运行的函数是由不可行的Python或者Perl写的。普通数据库用户能运行由可信Perl写的函数。当用户运行的MapReduce作业包含了FILE、GPFDIST以及EXEC输入类型,那么用户也必须是数据库的超级用户。
HOST
可选。指定Greenplum的Master主机的名称。如果没有指定,默认为localhost 或者$PGHOST(如果该值被设置了)。
PORT
可选。指定Greenplum的Master节点的端口。如果没有指定,默认端口为5432或者$PGPORT(如果该值被设置)。
DEFINE
要求。为该MapReduce文档的定义的一个序列。DEFINE部分必须至少有一个INPUT定义。- INPUT
要求。定义输入数据。每个MapReduce文档必须至少有一个输入数据定义。在一个文档中允许有多个输入定义,但是每个输入定义只能指定一个访问类型:一个文件、一个gpfdist文件分布程序、一个数据库中的表、一个SQL命令或者一个操作系统的命令。见 *Greenplum数据库工具指南*获取关于 gpfdist的信息。- NAME
一个该输入的名称。一个名称对比该MapReduce作业中的其它对象(例如map函数、任务、reduce函数以及输出名称)的名称是唯一的。另外,名称也不能同数据库已经存在的对象(例如,表、函数或者视图)名称冲突。
FILE
一个或者多个输入文件的序列,以格式: seghostname:/path/to/filename。用户必须是一个Greenplum数据库的超级用户来运行带有FILE输入的MapReduce作业。文件必须驻留在Greenplum的segment主机上。
- GPFDIST
一个或者多个运行的gpfdist文件分发程序的序列,以格式: hostname\[:port\]/file\_pattern。用户必须为Greenplum数据库超级用户运行以GPFDIST作为输入的MapReduce作业,除非服务配置参数[服务器配置参数]($6a27cd4413f88e59.md#topic1)被设置为on。
TABLE
数据库中存在的表的名称
QUERY
一个运行在数据库内的SQL命令SELECT。
EXEC
一个运行在Greenplum的Segment主机上的操作系统命令。该命令默认由系统中所有的Segment实例运行。例如,如果在每个Segment主机上有四个Segment实例,那么该命令会在每个主机上执行四次。必须作为Greenplum数据库的超级用户才能执行带有EXEC输入的MapReduce作业,同时服务配置参数[服务器配置参数]($6a27cd4413f88e59.md#topic1)要设置为on。
COLUMNS
可选。列被指定为: column\_name \[data\_type\]。如果没有指定,默认为value text。 The [DELIMITER](#topic3__DELIMITER)字符用来分隔两个数据域(列)。一个行由一个(a line feed character)行的字符(0x0a)决定。
FORMAT
可选。指定数据的格式 - 或者为分隔的文本(TEXT)或者逗号分隔的值(CSV的格式。如果数据格式没有被指定,默认为TEXT。
DELIMITER
对于[FILE](#topic3__FILE)、[GPFDIST](#topic3__GPFDIST)以及[EXEC](#topic3__EXEC)输入是可选的。指定单个字符来分隔数据的值。默认为一个tab字符在TEXT中。分隔字符必须出现在两个数据值的域之间。不用讲分隔符放在一个行的开始或者结束。
ESCAPE
对于[FILE](#topic3__FILE)、 [GPFDIST](#topic3__GPFDIST)以及[EXEC](#topic3__EXEC)输入是可选的。指定被用来作为C的转义序列(例如,\\n、\\t、\\100等)的单个字符同时对于转义字符可能被当作行或者列的分隔符。 确保用户选择的转义字符没有在用户实际的列的数据库中使用。对于文本格式文件的默认的转义字符为一个\\(反斜线符号),对于csv格式的文件的默认转义字符为一个"(双引号),然后也有可能指定其它的字符来代表转义。也可能通过指定'OFF'为转义值来关闭转义。这对于像文本格式的内嵌有反斜线(此处反斜线的目的不是转义)的网络日志数据是非常有用的。
NULL
对于[FILE](#topic3__FILE)、[GPFDIST](#topic3__GPFDIST)以及[EXEC](#topic3__EXEC)输入是可选的。指定代用空值的字符串。在TEXT格式下,默认为\\N在CSV格式下,默认为没有引用(quotations)的空值。 甚至在TEXT模式下,用户可能更喜欢一个空字符串,用户不想从空字符串中区别空值。任何能够匹配该字符串输入数据项将被视为空值。
QUOTE
对于[FILE](#topic3__FILE)、[GPFDIST](#topic3__GPFDIST)和[EXEC](#topic3__EXEC)输入是可选的。为CSV格式文件指定引用字符的。默认是双引号(")。在CSV格式的文件中,如果它们包含有逗号或者内嵌新的行,则数据值域必须包含在双引号中。包含双引号字符的字段必须用双引号保卫,并且内嵌的双引号必须有一对连续的双引号表示。为了数据行的正确解析,总是打开和关闭引号是非常重要的。
ERROR\_LIMIT
在Greenplum的任何Segment实例进行输入处理期间,如果输入行有格式错误,没有达到提供的错误限制计数前,错误都会被丢弃。如果错误限制没有达到,所有好的行将会被处理同时任何错误的行会被丢弃。
- ENCODING
用于数据的字符集编码。指定一个字符串常量(例如,'SQL\_ASCII')、一个整型编码数字、或者DEFAULT来用于默认的客户端编码。见[字符集支持]($308eafc6a4ccb138.md#topic1)获取更多信息。
OUTPUT
可选。定义该MapReduce作业在哪里输出格式数据。如果输出没有定义,默认为STDOUT(客户端的标准输出)。用户可以发送输出到一个客户端主机上的文件中或者数据库当前存在的一张表中。- NAME
该输出的一个名称。默认名为STDOUT。 一个名称对比该MapReduce作业中的其它对象(例如map函数、任务、reduce函数以及输出名称)的名称是唯一的。另外,名称也不能同数据库已经存在的对象(例如,表、函数或者视图)名称冲突。
FILE
指定在MapReduce客户端机器上一个文件的位置,用于输出数据,格式为:/path/to/filename。
TABLE
在数据库中指定一个表的名称用于输出数据。如果该表在执行MapReduce作业前不存在,那么将通过指定 [KEYS](#topic3__KEYS)的分布策略来创建。
KEYS
对于 [TABLE](#topic3__OUTPUTTABLE)输出是可选的。指定列用于Greenplum数据库的分布键。如果[EXECUTE](#topic3__EXECUTE)任务包含了一个[REDUCE](#topic3__REDUCE)定义,那么该键默认将作为表的分布键。否则表的第一列将用作分布键。
MODE
对于 [TABLE](#topic3__OUTPUTTABLE)输出是可选的。如果没有指定,如果表不存在默认行为为创建一个表,如果存在则发出一个错误。声明APPEND添加数据到一张已经存在的表中(被提供的表模式匹配输出格式)而不需要移除任何已经存在的数据。声明 REPLACE,如果表存在将删除表然后重新创建该表。 APPEND和 REPLACE都会在表不存在的时候创建新表。
MAP
要求。每个MAP函数都接受在(key,value)对中构造数据,处理每对同时生成零个或者更多输出的(key,value)对。Greenplum MapReduce 架构然后收集从所有输出列表中收集具有相同键的对并将他们归类到一起。然后将该输出传递给[REDUCE](#topic3__TASKREDUCE)任务,该任务由[TRANSITION | CONSOLIDATE | FINALIZE](#topic3__TCF)函数组成。有一个预定义的命名为IDENTITY的MAP函数,该函数返回没有变化的(key,value)对。尽管(key,value)是默认的参数,用户也能根据需要指定原型。
TRANSITION | CONSOLIDATE | FINALIZE
TRANSITION、CONSOLIDATE以及 FINALIZE是所有组成[REDUCE](#topic3__REDUCE)的片段(pieces)。一个TRANSITION函数是必须的。 CONSOLIDATE 以及 FINALIZE 函数是可选的。默认情况下,所有都将state作为它们的输入[PARAMETERS](#topic3__PARAMETERS)的第一个,但是其它原型也可以这样定义。
一个TRANSITION 函数遍历给定键的每个值同时在一个state变量中累积值。当一个过渡(transition)函数在一个键的第一个值被调用时,state 的值被设置为通过一个a [REDUCE](#topic3__REDUCE)作业的[INITALIZE](#topic3__INITIALIZE)指定的值(或者使用该数据类型默认的state值)。一个过渡函数用两个参数作为输入,当前的键归约的state,之后产生一个新state的下一个值。
如果一个CONSOLIDATE函数被指定,在通过Greenplum的Interconnect为最终聚集(两阶段聚集)重新分布键之前,TRANSITION处理会在Segment级别被执行。只有给定键的结果state被重分布时,才会导致较低的Interconnect流量以及较高的并行性。CONSOLIDATE会像TRANSITION一样被处理,不过它不会用(state + value) => state,而是采用(state + state) => state。
如果一个FINALIZE函数被指定,它接受由CONSOLIDATE(如何呈现的话)或者 TRANSITION函数产生的最终state同时执行任何在发出最终结果前的所有处理。 TRANSITION和CONSOLIDATE 函数不能返回值的集合。如果用户需要一个REDUCE作业返回一个集合,那么一个FINALIZE是有必要的,它能将最后的state转换为一个输出值的集合。- NAME
要求。一个函数的名称。一个名称对比该MapReduce作业中的其它对象(例如map函数、任务、reduce函数以及输出名称)的名称是唯一的。用户也能指定一个Greenplum数据库内建函数的名称。如果使用内建函数,不需提供[LANGUAGE](#topic3__LANGUAGE)或者一个[FUNCTION](#topic3__FUNCTION)体。
FUNCTION
可选。通过使用指定的[LANGUAGE](#topic3__LANGUAGE)指定函数的完整的主体。如果 FUNCTION没有指定,那么一个对应[NAME](#topic3__TCFNAME)的内建数据库函数将会被使用。
LANGUAGE
当 [FUNCTION](#topic3__FUNCTION)使用时,是要求的。 指定用来解释函数的实现语言。该版本的语言支持有perl、 python以及C。如果调用一个内建数据库函数,LANGUAGE不应该被指定。
LIBRARY
当[LANGUAGE](#topic3__LANGUAGE)是C(对于其它语言是不允许的)时,是要求的。为了使用该属性, [VERSION](#topic3__VERSION)必须为1.0.0.2。指定的库文件一定在执行MapReduce作业前安装好,同时在所有的Greenplum主机(master和segment)上都位于相同的文件系统位置。
PARAMETERS
可选。函数输入参数。默认类型为text。
MAP 默认 - key text, value text
TRANSITION 默认 - state text, value text
CONSOLIDATE 默认 - state1 text, state2 text (必须为恰好相同数据类型的两个输入参数)
FINALIZE 默认 - state text (只有一个参数)
RETURNS
可选。默认的返回类型为text。
MAP 默认 - key text, value text
TRANSITION 默认 - state text (只有一个返回值)
CONSOLIDATE 默认 - state text (只有一个返回值)
FINALIZE 默认 - value text
OPTIMIZE
函数可选的优化参数:
STRICT - 函数不受NULL值的影响。
IMMUTABLE - 对于一个给定的输入函数总是返回相同的值。
MODE
可选。指定函数返回的行的行数。
MULTI - 内个输入记录返回0或者更多的行。函数的返回值一定是一个返回的行数组,或者函数在Python中使用yield写成迭代器或者在Perl中用return\_next。MULTI是MAP和FINALIZE函数的默认模式。
SINGLE - 每个输入记录恰好只返回一行。 SINGLE是为一个支持 TRANSITION和CONSOLIDATE函数的模式。当使用MAP和FINALIZE函数时,SINGLE 模式能够适度的性能提升。
REDUCE
要求。一个REDUCE定义命名[TRANSITION | CONSOLIDATE | FINALIZE](#topic3__TCF)函数组成(key, value)归约到最终的结果集。有几个用户能执行的预先定义REDUCE作业,它们所有操作在一个名为value的列上:
IDENTITY - 返回没有改变的(key, value) 对
SUM - 计算数值数据的和
AVG - 计算数值数据的平均值
COUNT - 计算输入数据的计数
MIN - 计算数值数据的最小数值
MAX - 计算数值数据的最大值
- NAME
要求。该REDUCE作业的名称。一个名称对比该MapReduce作业中的其它对象(例如map函数、任务、reduce函数以及输出名称)的名称是唯一的。另外,名称也不能同数据库已经存在的对象(例如,表、函数或者视图)名称冲突。
TRANSITION
要求。 TRANSITION 函数的名称。
CONSOLIDATE
可选。CONSOLIDATE 函数的名称。
FINALIZE
可选。 FINALIZE 函数的名称。
INITIALIZE
对于text 和float数据类型是可选的。对于其它的数据类型是要求的。text的默认值为''。float的默认值为0.0。TRANSITION函数的初始state值为集合。 Sets the initial state value of the TRANSITION function.
KEYS
可选。默认为\[key, \*\]。当使用多个列进行归约,有必要指定哪些列为键(key)列以及哪些列为值(value)列。 默认的,任何不传递给TRANSITION函数的列为键列,以及一个名为key的列总是键列,即使它被传递给了TRANSITION函数。专门的指示符\*表明所有的列都不会传递给TRANSITION函数。如果该指示符在键的列表中没有呈现,那么所有没有匹配的列都将会被丢弃。
TASK
可选。一个TASK定义了一个在Greenplum MapReduce作业流水线内完整的端到端的 INPUT/MAP/REDUCE 阶段。和 [EXECUTE](#topic3__EXECUTE) 很相似,处理它不是立刻执行。一个任务对象能够作为[INPUT](#topic3__INPUT)调用一直到更进一步的处理阶段。- NAME
要求。该任务的名称。一个名称对比该MapReduce作业中的其它对象(例如map函数、任务、reduce函数以及输出名称)的名称是唯一的。另外,名称也不能同数据库已经存在的对象(例如,表、函数或者视图)名称冲突。
SOURCE
一个[INPUT](#topic3__INPUT)或者另一个 TASK的名称。
MAP
可选。一个[MAP](#topic3__MAP)函数的名称。 如果没有指定,默认为IDENTITY。
REDUCE
可选。一个 [REDUCE](#topic3__REDUCE) 函数的名称。如果没有指定,默认为IDENTITY.
EXECUTE
要求。EXECUTE定义在Greenplum MapReduce作业流水线中最后的 INPUT/MAP/REDUCE阶段。- RUN
- SOURCE
要求。一个 [INPUT](#topic3__INPUT)或者 [TASK](#topic3__TASK)的名称。
TARGET
可选。一个[OUTPUT](#topic3__OUTPUT)的名称。默认输出为STDOUT。
MAP
可选。一个 [MAP](#topic3__MAP)函数的名称。 如果没有指定,默认为IDENTITY。
REDUCE
可选。一个[REDUCE](#topic3__REDUCE)函数的名称。默认为 IDENTITY。
关于Greenplum MapReduce文档的示例
# 该MapReduce作业示例处理文档同时查找它们中的关键词。
# 它接受两个数据库的表作为输入:
# - 文档 (doc_id integer, url text, data text)
# - 关键词 (keyword_id integer, keyword text)#
# 在文档数据中搜素关键词的出现,返回url、data、keyword的结果(一个keyword可以为多个单词,例如"high performance # computing")
%YAML 1.1
---
VERSION:1.0.0.1
# 使用该数据库和角色连接到Greenplum数据库
DATABASE:webdata
USER:jsmith
# 开始定义部分
DEFINE:
# 声明输入,从‘documents’和‘keyword’表中选择所有的列和行。
- INPUT:
NAME:doc
TABLE:documents
- INPUT:
NAME:kw
TABLE:keywords
# 定义映射函数从documents和keyword中提取术语
# 该示例简单的利用空格进行分割,但是这里可以
# 利用像python的nltk(the natural language toolkit)库
# 来执行更复杂的单词标记和提取词干。
- MAP:
NAME:doc_map
LANGUAGE:python
FUNCTION:|
i = 0 # 文档中一个单词的索引
terms = {}#文档中术语和他们索引的一个hash
# 变成小写形式同时用空格分割字符串
for term in data.lower().split():
i = i + 1# 增加i(索引)
# 检查在术语列表中的术语Check for the term in the terms list:
# 如果主干词语已经存在,添加i的值到数组的入口
# 对应术语。这里考虑一个词语的多次出现。
# 如果主干词语不存在,添加它到词典的位置i处。
# 例如:
# data: "a computer is a machine that manipulates data"
# "a" [1, 4]
# "computer" [2]
# "machine" [3]
# …
if term in terms:
terms[term] += ','+str(i)
else:
terms[term] = str(i)
# 每个文档返回多个行。每行由
# doc_id, term 以及 出现term的数据的位置
# For example:
# (doc_id => 100, term => "a", [1,4]
# (doc_id => 100, term => "computer", [2]
# …
for term in terms:
yield([doc_id, term, terms[term]])
OPTIMIZE:STRICT IMMUTABLE
PARAMETERS:
- doc_id integer
- data text
RETURNS:
- doc_id integer
- term text
- positions text
# 关键词的关于文档的映射函数差不多为一个The map function for keywords is almost identical to the one for documents
# 但是它也计算了关键词中术语的数目。but it also counts of the number of terms in the keyword.
- MAP:
NAME:kw_map
LANGUAGE:python
FUNCTION:|
i = 0
terms = {}
for term in keyword.lower().split():
i = i + 1
if term in terms:
terms[term] += ','+str(i)
else:
terms[term] = str(i)
# 输出四个值包括i(术语出现的总次数):output 4 values including i (the total count for term in terms):
yield([keyword_id, i, term, terms[term]])
OPTIMIZE:STRICT IMMUTABLE
PARAMETERS:
- keyword_id integer
- keyword text
RETURNS:
- keyword_id integer
- nterms integer
- term text
- positions text
# 一个任务是一个定义了在Greenplum MapReduce流水线上整个INPUT/MAP/REDUCE的阶段。
# 这很像是一个EXECUTION,但是只有在一输入到其他处理阶段被调用时,才会执行。
# 识别一个称为'doc_prep'的任务,该任务接受先前定义的 'doc' INPUT
# 同时执行‘doc_map’ MAP函数返回doc_id, term, [term_position]
- TASK:
NAME:doc_prep
SOURCE:doc
MAP:doc_map
# 识别一个称为'kw_prep'的任务,该任务接受 先前定义的'kw' INPUT
# 同时执行kw_map MAP 函数返回kw_id, term, [term_position]
- TASK:
NAME:kw_prep
SOURCE:kw
MAP:kw_map
# Greenplum MapReduce的一个优势是MapReduce任务可以作为
# SQL操作的输入,同时SQL也能用来执行一个MapReduce任务。
# 该INPUT定义了一个SQL查询,此查询将'doc_prep' TASK的输出同
# 'kw_prep' TASK的进行连接。匹配项是'candidate'列表的输出 Matching terms are output to the 'candidate'
# (任何一个关键词至少共享同文档共享一个term)。any keyword that shares at least one term with the document).
- INPUT:
NAME: term_join
QUERY: |
SELECT doc.doc_id, kw.keyword_id, kw.term, kw.nterms,
doc.positions as doc_positions,
kw.positions as kw_positions
FROM doc_prep doc INNER JOIN kw_prep kw ON (doc.term = kw.term)
# 在Greenplum的MapReduce中,一个REDUCE函数有一个或者多个函数组成。
# 一个REDUCE为每个分组键定义有一个初始的‘state’变量。 that is
# 这是一个用于调节每个键分组状态的过渡函数。
# 如果呈现,一个可选的CONSOLIDATE函数结合了多个
# 'state' 变量。这允许TRANSITION函数能够在segment级别执行在本地,同时值在整个网络中
# 重分布积累的‘state’。如果呈现,一个可选FINALIZE函数被使用来执行在一个状态上最后的计算
# 同时发射从状态来的一个或者多个输出行。
#
# 该REDUCE函数被称为‘term_reducer’带有一个被称为'term_transition'的TRANSITION函数
#以及一个被称为'term_finalizer'的FINALIZE函数。
- REDUCE:
NAME:term_reducer
TRANSITION:term_transition
FINALIZE:term_finalizer
- TRANSITION:
NAME:term_transition
LANGUAGE:python
PARAMETERS:
- state text
- term text
- nterms integer
- doc_positions text
- kw_positions text
FUNCTION: |
# 'state' 有一个''的初始值以及使用冒号分隔的关键词位置的集合。
# 关键词位置是用逗号分隔的整数集。例如,'1,3,2:4:'
# 如果这里存在一个状态,分割它到关键词位置的集合,否则,
# 构建一个'nterms'关键词位置的集合。所有的空
if state:
kw_split = state.split(':')
else:
kw_split = []
for i in range(0,nterms):
kw_split.append('')
# 'kw_positions'是一个整数分隔的逗号域,显示
# 单个在给定关键词出现的位置。
# 基于','分割转换字符串到一个python 列表。
# 为当前term添加doc_positions
for kw_p in kw_positions.split(','):
kw_split[int(kw_p)-1] = doc_positions
# 该部分接受每个在'kw_split'数组中的元素,同时使用':'连接,将它们
# 转换为字符串。
# 例如:对于关键词 "computer software computer hardware",
# 与文档数据"in the business of computer software software engineers" 匹配的
# 'kw_split'数组将为 ['5', '6,7', '5', '']
# 同时输出状态将为:5:6,7:5:
outstate = kw_split[0]
for s in kw_split[1:]:
outstate = outstate + ':' + s
return outstate
- FINALIZE:
NAME: term_finalizer
LANGUAGE: python
RETURNS:
- count integer
MODE:MULTI
FUNCTION:|
if not state:
return 0
kw_split = state.split(':')
# 函数做了下面的事情:
# 1) 以':'分割'kw_split'
# 例如, 1,5,7:2,8 创建了 '1,5,7' 和 '2,8'
# 2) 在'kw_split'中的每个组的位置,以','分割集合,
# 从集合0: 1,5,7创建['1','5','7']
# 以及从集合1: 2,8创建 ['2', '8']
# 3)检查空字符串
# 4) 通过减去集合在'kw_split'中的位置来调整分割集
# ['1','5','7'] - 0(从每个元素中) = ['1','5','7']
# ['2', '8'] - 1 (从每个元素中) = ['1', '7']
# 5)以步长为4的来截取数组后的结果数组被分割,他们的重叠值为:
# ['1','5','7'].intersect['1', '7'] = [1,7]
# 6) 决定分割长度,整个关键词(包括所有的小片)在文档数据中匹配的次数。
previous = None
for i in range(0,len(kw_split)):
isplit = kw_split[i].split(',')
if any(map(lambda(x): x == '', isplit)):
return 0
adjusted = set(map(lambda(x): int(x)-i, isplit))
if (previous):
previous = adjusted.intersection(previous)
else:
previous = adjusted
# 返回最终的计数
if previous:
return len(previous)
# 定义'term_match' 任务,该任务之后作为
# 'final_output'查询的一部分执行。它接受之前定义的输入 (INPUT) 'term_join'
# 同时使用之前定义的归约函数'term_reducer'
- TASK:
NAME:term_match
SOURCE:term_join
REDUCE:term_reducer
- INPUT:
NAME:final_output
QUERY:|
SELECT doc.*, kw.*, tm.count
FROM documents doc, keywords kw, term_match tm
WHERE doc.doc_id = tm.doc_id
AND kw.keyword_id = tm.keyword_id
AND tm.count > 0
# 执行该MapReduce作业,发送结果到STDOUT
EXECUTE:
- RUN:
SOURCE:final_output
TARGET:STDOUT
MapReduce实例的流程图
下面的图显示了在示例中定义的MapReduce作业的流程: