Greenplum MapReduce规范

该规范描述了文档格式以及用于定义Greenplum MapReduce任务的模式。

注释: 无论是MapReduce工具gpmapreduce)还是其相关的YAML规范均已被弃用,它们将在下一个Greenplum数据库的主发布(6.x)中被移除。作为替代,请使用其他的查询格式,例如SQL或者过程语言扩展。

MapReduce是谷歌开发的一个编程模型,用于在普通服务器阵列上处理和生成大量的数据集。Greenplum MapReduc允许熟悉MapReduce的程序员写map和reduce函数同时提交它们到Greenplum数据库并行引擎上用于处理。

为了能让Greenplum处理MapReduce函数,需要在一个文档中定义函数,然后将该文档传递给Greenplum的MapReduce程序,gpmapreduce,由Greenplum数据库并行引擎执行。Greenplum数据库系统会将输入的数据进行分布、在一系列的机器上执行程序、处理机器故障以及管理所需的内部机器间的通信。

关于 gpmapreduce的信息请见Greenplum数据库工具指南

上级主题: Greenplum数据库参考指南

Greenplum MapReduce文档格式

这部分解释一些Greenplum MapReduce文档格式的基本知识来帮助用户开始创建自己的Greenplum MapReduce文档。Greenplum使用YAML 1.1文档格式同时对于定义一个MapReduce作业的各个步骤有自己的模式实现。

所有Greenplum MapReduce文件必须首先定义它们使用的YAML规范的版本。在此之后,三个破折号(-—)表示一个文档的是开始以及三个点号 (…) 指明一个不需要启动新文档的文档的结束。只是行以一个英镑符号(#)为前缀。也可以在相同的文件中声明多个Greenplum MapReduce文档:

  1. %YAML 1.1
  2. ---
  3. # Begin Document 1
  4. # ...
  5. ---
  6. # Begin Document 2
  7. # ...

在一个Greenplum MapReduce 文档中,有三种数据结构或者节点的基本类型:标量序列 以及映射

标量是一个用空格缩进的文本串。如果有一个跨越多行的标量的输入,一个前置竖线(|)会表示一种literal样式,在其中所有的换行都是有意义的。或者,用前置的前括号(>)为后续具有相同缩进级别的行把单个换行符折叠为空格。如果一个字符串包含具有保留意义的字符,字符串必须被引用或者特殊字符必须使用反斜线(\)转义。

  1. # 逐字地读取每行
  2. somekey: | this value contains two lines
  3. and each line is read literally
  4. # 将每个新的行看做一个空格
  5. anotherkey: >
  6. this value contains two lines
  7. but is treated as one continuous line
  8. # 该字符串引用包含了一个特殊字符
  9. ThirdKey: "This is a string: not a mapping"

序列是一个列表,列表中每个项在它们自己的行中以一个破折号和一个空格(- )来指示。或者,用户可以指定一个内联序列作为一个方括号内部逗号分隔的列表。一个序列提供了一个数据集合同时给定它们之间的顺序。当用户装载一个列表到Greenplum MapReduce程序中时,该顺序会被保留。

  1. # 列表序列
  2. - this
  3. - is
  4. - a list
  5. - with
  6. - five scalar values
  7. # 内联序列
  8. [this, is, a list, with, five scalar values]

映射被用来打包数据的值和被称为keys的标识符。映射为每个键: 值对使用一个冒号和空格(: ),或者也可以被指定内联为一个花括号内由逗号分隔的列表。用来作为从映射中获取数据的索引。

  1. # 多个项目的一个映射
  2. title: War and Peace
  3. author: Leo Tolstoy
  4. date: 1865
  5. # 用内联表示该映射
  6. {title: War and Peace, author: Leo Tolstoy, date: 1865}

键被用来连接每个节点和元数据信息以及指定期望的节点类型(scalarsequence或者mapping)。见Greenplum MapReduce文档模式获取关于Greenplum MapReduce程序期待的键。

Greenplum MapReduce 程序安好走啊顺序处理一个文档的节点同时使用缩进(空格)来决定文档的层级和节点见得关系。空格的使用非常重要。不应该简单的使用空格作为格式化的目的,制表符不应该被使用。

Greenplum MapReduce文档模式

Greenplum MapReduce使用YAML文档架构同时实现了自己的YAML模式。一个Greenplum MapReduce文档的基本结构为:

  1. %YAML 1.1
  2. ---
  3. VERSION: 1.0.0.2
  4. DATABASE: dbname
  5. USER: db_username
  6. HOST: master_hostname
  7. PORT: master_port
  1. DEFINE:
  2. - INPUT:
  3. NAME: input_name
  4. FILE:
  5. - hostname:/path/to/file
  6. GPFDIST:
  7. - hostname:port/file_pattern
  8. TABLE: table_name
  9. QUERY: SELECT_statement
  10. EXEC: command_string
  11. COLUMNS:
  12. - field_name data_type
  13. FORMAT: TEXT | CSV
  14. DELIMITER: delimiter_character
  15. ESCAPE: escape_character
  16. NULL: null_string
  17. QUOTE: csv_quote_character
  18. ERROR_LIMIT: integer
  19. ENCODING: database_encoding
  1. - OUTPUT:
  2. NAME: output_name
  3. FILE: file_path_on_client
  4. TABLE: table_name
  5. KEYS:
  6. - column_name
  7. MODE: REPLACE | APPEND
  1. - MAP:
  2. NAME: function_name
  3. FUNCTION: function_definition
  4. LANGUAGE: perl | python | c
  5. LIBRARY: /path/filename.so
  6. PARAMETERS:
  7. - nametype
  8. RETURNS:
  9. - nametype
  10. OPTIMIZE: STRICT IMMUTABLE
  11. MODE: SINGLE | MULTI
  1. - TRANSITION | CONSOLIDATE | FINALIZE:
  2. NAME: function_name
  3. FUNCTION: function_definition
  4. LANGUAGE: perl | python | c
  5. LIBRARY: /path/filename.so
  6. PARAMETERS:
  7. - nametype
  8. RETURNS:
  9. - nametype
  10. OPTIMIZE: STRICT IMMUTABLE
  11. MODE: SINGLE | MULTI
  1. - REDUCE:
  2. NAME: reduce_job_name
  3. TRANSITION: transition_function_name
  4. CONSOLIDATE: consolidate_function_name
  5. FINALIZE: finalize_function_name
  6. INITIALIZE: value
  7. KEYS:
  8. - key_name
  1. - TASK:
  2. NAME: task_name
  3. SOURCE: input_name
  4. MAP: map_function_name
  5. REDUCE: reduce_function_name
  6. EXECUTE
  1. - RUN:
  2. SOURCE: input_or_task_name
  3. TARGET: output_name
  4. MAP: map_function_name
  5. REDUCE: reduce_function_name...

VERSION

要求。Greenplum MapReduce YAML规范的版本。当前版本为1.0.0.1.

DATABASE

可选。指定要连接到Greenplum中哪个数据库。如果没有指定,默认连接到默认的数据库或者$PGDATABASE(如果该值被设置了)。

USER

可选。指定使用哪个数据库角色来连接。如果没有指定,默认使用当前用户,或者$PGUSER(如果该值被设置)。用户必须为Greenplum的超级用户,如果要运行的函数是由不可行的Python或者Perl写的。普通数据库用户能运行由可信Perl写的函数。当用户运行的MapReduce作业包含了FILEGPFDIST以及EXEC输入类型,那么用户也必须是数据库的超级用户。

HOST

可选。指定Greenplum的Master主机的名称。如果没有指定,默认为localhost 或者$PGHOST(如果该值被设置了)。

PORT

可选。指定Greenplum的Master节点的端口。如果没有指定,默认端口为5432或者$PGPORT(如果该值被设置)。

DEFINE

要求。为该MapReduce文档的定义的一个序列。DEFINE部分必须至少有一个INPUT定义。- INPUT

  1. 要求。定义输入数据。每个MapReduce文档必须至少有一个输入数据定义。在一个文档中允许有多个输入定义,但是每个输入定义只能指定一个访问类型:一个文件、一个gpfdist文件分布程序、一个数据库中的表、一个SQL命令或者一个操作系统的命令。见 *Greenplum数据库工具指南*获取关于 gpfdist的信息。- NAME
  2. 一个该输入的名称。一个名称对比该MapReduce作业中的其它对象(例如map函数、任务、reduce函数以及输出名称)的名称是唯一的。另外,名称也不能同数据库已经存在的对象(例如,表、函数或者视图)名称冲突。
  3. FILE
  4. 一个或者多个输入文件的序列,以格式: seghostname:/path/to/filename。用户必须是一个Greenplum数据库的超级用户来运行带有FILE输入的MapReduce作业。文件必须驻留在Greenplumsegment主机上。
  5. - GPFDIST
  6. 一个或者多个运行的gpfdist文件分发程序的序列,以格式: hostname\[:port\]/file\_pattern。用户必须为Greenplum数据库超级用户运行以GPFDIST作为输入的MapReduce作业,除非服务配置参数[服务器配置参数]($6a27cd4413f88e59.md#topic1)被设置为on。
  7. TABLE
  8. 数据库中存在的表的名称
  9. QUERY
  10. 一个运行在数据库内的SQL命令SELECT
  11. EXEC
  12. 一个运行在GreenplumSegment主机上的操作系统命令。该命令默认由系统中所有的Segment实例运行。例如,如果在每个Segment主机上有四个Segment实例,那么该命令会在每个主机上执行四次。必须作为Greenplum数据库的超级用户才能执行带有EXEC输入的MapReduce作业,同时服务配置参数[服务器配置参数]($6a27cd4413f88e59.md#topic1)要设置为on。
  13. COLUMNS
  14. 可选。列被指定为: column\_name \[data\_type\]。如果没有指定,默认为value text The [DELIMITER](#topic3__DELIMITER)字符用来分隔两个数据域(列)。一个行由一个(a line feed character)行的字符(0x0a)决定。
  15. FORMAT
  16. 可选。指定数据的格式 - 或者为分隔的文本(TEXT)或者逗号分隔的值(CSV的格式。如果数据格式没有被指定,默认为TEXT
  17. DELIMITER
  18. 对于[FILE](#topic3__FILE)、[GPFDIST](#topic3__GPFDIST)以及[EXEC](#topic3__EXEC)输入是可选的。指定单个字符来分隔数据的值。默认为一个tab字符在TEXT中。分隔字符必须出现在两个数据值的域之间。不用讲分隔符放在一个行的开始或者结束。
  19. ESCAPE
  20. 对于[FILE](#topic3__FILE)、 [GPFDIST](#topic3__GPFDIST)以及[EXEC](#topic3__EXEC)输入是可选的。指定被用来作为C的转义序列(例如,\\n\\t\\100等)的单个字符同时对于转义字符可能被当作行或者列的分隔符。 确保用户选择的转义字符没有在用户实际的列的数据库中使用。对于文本格式文件的默认的转义字符为一个\\(反斜线符号),对于csv格式的文件的默认转义字符为一个"(双引号),然后也有可能指定其它的字符来代表转义。也可能通过指定'OFF'为转义值来关闭转义。这对于像文本格式的内嵌有反斜线(此处反斜线的目的不是转义)的网络日志数据是非常有用的。
  21. NULL
  22. 对于[FILE](#topic3__FILE)、[GPFDIST](#topic3__GPFDIST)以及[EXEC](#topic3__EXEC)输入是可选的。指定代用空值的字符串。在TEXT格式下,默认为\\N在CSV格式下,默认为没有引用(quotations)的空值。 甚至在TEXT模式下,用户可能更喜欢一个空字符串,用户不想从空字符串中区别空值。任何能够匹配该字符串输入数据项将被视为空值。
  23. QUOTE
  24. 对于[FILE](#topic3__FILE)、[GPFDIST](#topic3__GPFDIST)和[EXEC](#topic3__EXEC)输入是可选的。为CSV格式文件指定引用字符的。默认是双引号(")。在CSV格式的文件中,如果它们包含有逗号或者内嵌新的行,则数据值域必须包含在双引号中。包含双引号字符的字段必须用双引号保卫,并且内嵌的双引号必须有一对连续的双引号表示。为了数据行的正确解析,总是打开和关闭引号是非常重要的。
  25. ERROR\_LIMIT
  26. Greenplum的任何Segment实例进行输入处理期间,如果输入行有格式错误,没有达到提供的错误限制计数前,错误都会被丢弃。如果错误限制没有达到,所有好的行将会被处理同时任何错误的行会被丢弃。
  27. - ENCODING
  28. 用于数据的字符集编码。指定一个字符串常量(例如,'SQL\_ASCII')、一个整型编码数字、或者DEFAULT来用于默认的客户端编码。见[字符集支持]($308eafc6a4ccb138.md#topic1)获取更多信息。
  29. OUTPUT
  30. 可选。定义该MapReduce作业在哪里输出格式数据。如果输出没有定义,默认为STDOUT(客户端的标准输出)。用户可以发送输出到一个客户端主机上的文件中或者数据库当前存在的一张表中。- NAME
  31. 该输出的一个名称。默认名为STDOUT 一个名称对比该MapReduce作业中的其它对象(例如map函数、任务、reduce函数以及输出名称)的名称是唯一的。另外,名称也不能同数据库已经存在的对象(例如,表、函数或者视图)名称冲突。
  32. FILE
  33. 指定在MapReduce客户端机器上一个文件的位置,用于输出数据,格式为:/path/to/filename
  34. TABLE
  35. 在数据库中指定一个表的名称用于输出数据。如果该表在执行MapReduce作业前不存在,那么将通过指定 [KEYS](#topic3__KEYS)的分布策略来创建。
  36. KEYS
  37. 对于 [TABLE](#topic3__OUTPUTTABLE)输出是可选的。指定列用于Greenplum数据库的分布键。如果[EXECUTE](#topic3__EXECUTE)任务包含了一个[REDUCE](#topic3__REDUCE)定义,那么该键默认将作为表的分布键。否则表的第一列将用作分布键。
  38. MODE
  39. 对于 [TABLE](#topic3__OUTPUTTABLE)输出是可选的。如果没有指定,如果表不存在默认行为为创建一个表,如果存在则发出一个错误。声明APPEND添加数据到一张已经存在的表中(被提供的表模式匹配输出格式)而不需要移除任何已经存在的数据。声明 REPLACE,如果表存在将删除表然后重新创建该表。 APPEND REPLACE都会在表不存在的时候创建新表。
  40. MAP
  41. 要求。每个MAP函数都接受在(keyvalue)对中构造数据,处理每对同时生成零个或者更多输出的(keyvalue)对。Greenplum MapReduce 架构然后收集从所有输出列表中收集具有相同键的对并将他们归类到一起。然后将该输出传递给[REDUCE](#topic3__TASKREDUCE)任务,该任务由[TRANSITION | CONSOLIDATE | FINALIZE](#topic3__TCF)函数组成。有一个预定义的命名为IDENTITYMAP函数,该函数返回没有变化的(keyvalue)对。尽管(keyvalue)是默认的参数,用户也能根据需要指定原型。
  42. TRANSITION | CONSOLIDATE | FINALIZE
  43. TRANSITIONCONSOLIDATE以及 FINALIZE是所有组成[REDUCE](#topic3__REDUCE)的片段(pieces)。一个TRANSITION函数是必须的。 CONSOLIDATE 以及 FINALIZE 函数是可选的。默认情况下,所有都将state作为它们的输入[PARAMETERS](#topic3__PARAMETERS)的第一个,但是其它原型也可以这样定义。
  44. 一个TRANSITION 函数遍历给定键的每个值同时在一个state变量中累积值。当一个过渡(transition)函数在一个键的第一个值被调用时,state 的值被设置为通过一个a [REDUCE](#topic3__REDUCE)作业的[INITALIZE](#topic3__INITIALIZE)指定的值(或者使用该数据类型默认的state值)。一个过渡函数用两个参数作为输入,当前的键归约的state,之后产生一个新state的下一个值。
  45. 如果一个CONSOLIDATE函数被指定,在通过GreenplumInterconnect为最终聚集(两阶段聚集)重新分布键之前,TRANSITION处理会在Segment级别被执行。只有给定键的结果state被重分布时,才会导致较低的Interconnect流量以及较高的并行性。CONSOLIDATE会像TRANSITION一样被处理,不过它不会用(state + value) => state,而是采用(state + state) => state
  46. 如果一个FINALIZE函数被指定,它接受由CONSOLIDATE(如何呈现的话)或者 TRANSITION函数产生的最终state同时执行任何在发出最终结果前的所有处理。 TRANSITIONCONSOLIDATE 函数不能返回值的集合。如果用户需要一个REDUCE作业返回一个集合,那么一个FINALIZE是有必要的,它能将最后的state转换为一个输出值的集合。- NAME
  47. 要求。一个函数的名称。一个名称对比该MapReduce作业中的其它对象(例如map函数、任务、reduce函数以及输出名称)的名称是唯一的。用户也能指定一个Greenplum数据库内建函数的名称。如果使用内建函数,不需提供[LANGUAGE](#topic3__LANGUAGE)或者一个[FUNCTION](#topic3__FUNCTION)体。
  48. FUNCTION
  49. 可选。通过使用指定的[LANGUAGE](#topic3__LANGUAGE)指定函数的完整的主体。如果 FUNCTION没有指定,那么一个对应[NAME](#topic3__TCFNAME)的内建数据库函数将会被使用。
  50. LANGUAGE
  51. [FUNCTION](#topic3__FUNCTION)使用时,是要求的。 指定用来解释函数的实现语言。该版本的语言支持有perl python以及C。如果调用一个内建数据库函数,LANGUAGE不应该被指定。
  52. LIBRARY
  53. 当[LANGUAGE](#topic3__LANGUAGE)是C(对于其它语言是不允许的)时,是要求的。为了使用该属性, [VERSION](#topic3__VERSION)必须为1.0.0.2。指定的库文件一定在执行MapReduce作业前安装好,同时在所有的Greenplum主机(mastersegment)上都位于相同的文件系统位置。
  54. PARAMETERS
  55. 可选。函数输入参数。默认类型为text
  56. MAP 默认 - key text value text
  57. TRANSITION 默认 - state text value text
  58. CONSOLIDATE 默认 - state1 text state2 text (必须为恰好相同数据类型的两个输入参数)
  59. FINALIZE 默认 - state text (只有一个参数)
  60. RETURNS
  61. 可选。默认的返回类型为text
  62. MAP 默认 - key text value text
  63. TRANSITION 默认 - state text (只有一个返回值)
  64. CONSOLIDATE 默认 - state text (只有一个返回值)
  65. FINALIZE 默认 - value text
  66. OPTIMIZE
  67. 函数可选的优化参数:
  68. STRICT - 函数不受NULL值的影响。
  69. IMMUTABLE - 对于一个给定的输入函数总是返回相同的值。
  70. MODE
  71. 可选。指定函数返回的行的行数。
  72. MULTI - 内个输入记录返回0或者更多的行。函数的返回值一定是一个返回的行数组,或者函数在Python中使用yield写成迭代器或者在Perl中用return\_nextMULTIMAPFINALIZE函数的默认模式。
  73. SINGLE - 每个输入记录恰好只返回一行。 SINGLE是为一个支持 TRANSITIONCONSOLIDATE函数的模式。当使用MAPFINALIZE函数时,SINGLE 模式能够适度的性能提升。
  74. REDUCE
  75. 要求。一个REDUCE定义命名[TRANSITION | CONSOLIDATE | FINALIZE](#topic3__TCF)函数组成(key value)归约到最终的结果集。有几个用户能执行的预先定义REDUCE作业,它们所有操作在一个名为value的列上:
  76. IDENTITY - 返回没有改变的(key, value)
  77. SUM - 计算数值数据的和
  78. AVG - 计算数值数据的平均值
  79. COUNT - 计算输入数据的计数
  80. MIN - 计算数值数据的最小数值
  81. MAX - 计算数值数据的最大值
  82. - NAME
  83. 要求。该REDUCE作业的名称。一个名称对比该MapReduce作业中的其它对象(例如map函数、任务、reduce函数以及输出名称)的名称是唯一的。另外,名称也不能同数据库已经存在的对象(例如,表、函数或者视图)名称冲突。
  84. TRANSITION
  85. 要求。 TRANSITION 函数的名称。
  86. CONSOLIDATE
  87. 可选。CONSOLIDATE 函数的名称。
  88. FINALIZE
  89. 可选。 FINALIZE 函数的名称。
  90. INITIALIZE
  91. 对于text float数据类型是可选的。对于其它的数据类型是要求的。text的默认值为''float的默认值为0.0TRANSITION函数的初始state值为集合。 Sets the initial state value of the TRANSITION function.
  92. KEYS
  93. 可选。默认为\[key, \*\]。当使用多个列进行归约,有必要指定哪些列为键(key)列以及哪些列为值(value)列。 默认的,任何不传递给TRANSITION函数的列为键列,以及一个名为key的列总是键列,即使它被传递给了TRANSITION函数。专门的指示符\*表明所有的列都不会传递给TRANSITION函数。如果该指示符在键的列表中没有呈现,那么所有没有匹配的列都将会被丢弃。
  94. TASK
  95. 可选。一个TASK定义了一个在Greenplum MapReduce作业流水线内完整的端到端的 INPUT/MAP/REDUCE 阶段。和 [EXECUTE](#topic3__EXECUTE) 很相似,处理它不是立刻执行。一个任务对象能够作为[INPUT](#topic3__INPUT)调用一直到更进一步的处理阶段。- NAME
  96. 要求。该任务的名称。一个名称对比该MapReduce作业中的其它对象(例如map函数、任务、reduce函数以及输出名称)的名称是唯一的。另外,名称也不能同数据库已经存在的对象(例如,表、函数或者视图)名称冲突。
  97. SOURCE
  98. 一个[INPUT](#topic3__INPUT)或者另一个 TASK的名称。
  99. MAP
  100. 可选。一个[MAP](#topic3__MAP)函数的名称。 如果没有指定,默认为IDENTITY
  101. REDUCE
  102. 可选。一个 [REDUCE](#topic3__REDUCE) 函数的名称。如果没有指定,默认为IDENTITY.

EXECUTE

要求。EXECUTE定义在Greenplum MapReduce作业流水线中最后的 INPUT/MAP/REDUCE阶段。- RUN

  1. - SOURCE
  2. 要求。一个 [INPUT](#topic3__INPUT)或者 [TASK](#topic3__TASK)的名称。
  3. TARGET
  4. 可选。一个[OUTPUT](#topic3__OUTPUT)的名称。默认输出为STDOUT
  5. MAP
  6. 可选。一个 [MAP](#topic3__MAP)函数的名称。 如果没有指定,默认为IDENTITY
  7. REDUCE
  8. 可选。一个[REDUCE](#topic3__REDUCE)函数的名称。默认为 IDENTITY

关于Greenplum MapReduce文档的示例

  1. # 该MapReduce作业示例处理文档同时查找它们中的关键词。
  2. # 它接受两个数据库的表作为输入:
  3. # - 文档 (doc_id integer, url text, data text)
  4. # - 关键词 (keyword_id integer, keyword text)#
  5. # 在文档数据中搜素关键词的出现,返回url、data、keyword的结果(一个keyword可以为多个单词,例如"high performance # computing")
  6. %YAML 1.1
  7. ---
  8. VERSION:1.0.0.1
  9. # 使用该数据库和角色连接到Greenplum数据库
  10. DATABASE:webdata
  11. USER:jsmith
  12. # 开始定义部分
  13. DEFINE:
  14. # 声明输入,从‘documents’和‘keyword’表中选择所有的列和行。
  15. - INPUT:
  16. NAME:doc
  17. TABLE:documents
  18. - INPUT:
  19. NAME:kw
  20. TABLE:keywords
  21. # 定义映射函数从documents和keyword中提取术语
  22. # 该示例简单的利用空格进行分割,但是这里可以
  23. # 利用像python的nltk(the natural language toolkit)库
  24. # 来执行更复杂的单词标记和提取词干。
  25. - MAP:
  26. NAME:doc_map
  27. LANGUAGE:python
  28. FUNCTION:|
  29. i = 0 # 文档中一个单词的索引
  30. terms = {}#文档中术语和他们索引的一个hash
  31. # 变成小写形式同时用空格分割字符串
  32. for term in data.lower().split():
  33. i = i + 1# 增加i(索引)
  34. # 检查在术语列表中的术语Check for the term in the terms list:
  35. # 如果主干词语已经存在,添加i的值到数组的入口
  36. # 对应术语。这里考虑一个词语的多次出现。
  37. # 如果主干词语不存在,添加它到词典的位置i处。
  38. # 例如:
  39. # data: "a computer is a machine that manipulates data"
  40. # "a" [1, 4]
  41. # "computer" [2]
  42. # "machine" [3]
  43. # …
  44. if term in terms:
  45. terms[term] += ','+str(i)
  46. else:
  47. terms[term] = str(i)
  48. # 每个文档返回多个行。每行由
  49. # doc_id, term 以及 出现term的数据的位置
  50. # For example:
  51. # (doc_id => 100, term => "a", [1,4]
  52. # (doc_id => 100, term => "computer", [2]
  53. # …
  54. for term in terms:
  55. yield([doc_id, term, terms[term]])
  56. OPTIMIZE:STRICT IMMUTABLE
  57. PARAMETERS:
  58. - doc_id integer
  59. - data text
  60. RETURNS:
  61. - doc_id integer
  62. - term text
  63. - positions text
  64. # 关键词的关于文档的映射函数差不多为一个The map function for keywords is almost identical to the one for documents
  65. # 但是它也计算了关键词中术语的数目。but it also counts of the number of terms in the keyword.
  66. - MAP:
  67. NAME:kw_map
  68. LANGUAGE:python
  69. FUNCTION:|
  70. i = 0
  71. terms = {}
  72. for term in keyword.lower().split():
  73. i = i + 1
  74. if term in terms:
  75. terms[term] += ','+str(i)
  76. else:
  77. terms[term] = str(i)
  78. # 输出四个值包括i(术语出现的总次数):output 4 values including i (the total count for term in terms):
  79. yield([keyword_id, i, term, terms[term]])
  80. OPTIMIZE:STRICT IMMUTABLE
  81. PARAMETERS:
  82. - keyword_id integer
  83. - keyword text
  84. RETURNS:
  85. - keyword_id integer
  86. - nterms integer
  87. - term text
  88. - positions text
  89. # 一个任务是一个定义了在Greenplum MapReduce流水线上整个INPUT/MAP/REDUCE的阶段。
  90. # 这很像是一个EXECUTION,但是只有在一输入到其他处理阶段被调用时,才会执行。
  91. # 识别一个称为'doc_prep'的任务,该任务接受先前定义的 'doc' INPUT
  92. # 同时执行‘doc_map’ MAP函数返回doc_id, term, [term_position]
  93. - TASK:
  94. NAME:doc_prep
  95. SOURCE:doc
  96. MAP:doc_map
  97. # 识别一个称为'kw_prep'的任务,该任务接受 先前定义的'kw' INPUT
  98. # 同时执行kw_map MAP 函数返回kw_id, term, [term_position]
  99. - TASK:
  100. NAME:kw_prep
  101. SOURCE:kw
  102. MAP:kw_map
  103. # Greenplum MapReduce的一个优势是MapReduce任务可以作为
  104. # SQL操作的输入,同时SQL也能用来执行一个MapReduce任务。
  105. # 该INPUT定义了一个SQL查询,此查询将'doc_prep' TASK的输出同
  106. # 'kw_prep' TASK的进行连接。匹配项是'candidate'列表的输出 Matching terms are output to the 'candidate'
  107. # (任何一个关键词至少共享同文档共享一个term)。any keyword that shares at least one term with the document).
  108. - INPUT:
  109. NAME: term_join
  110. QUERY: |
  111. SELECT doc.doc_id, kw.keyword_id, kw.term, kw.nterms,
  112. doc.positions as doc_positions,
  113. kw.positions as kw_positions
  114. FROM doc_prep doc INNER JOIN kw_prep kw ON (doc.term = kw.term)
  115. # 在Greenplum的MapReduce中,一个REDUCE函数有一个或者多个函数组成。
  116. # 一个REDUCE为每个分组键定义有一个初始的‘state’变量。 that is
  117. # 这是一个用于调节每个键分组状态的过渡函数。
  118. # 如果呈现,一个可选的CONSOLIDATE函数结合了多个
  119. # 'state' 变量。这允许TRANSITION函数能够在segment级别执行在本地,同时值在整个网络中
  120. # 重分布积累的‘state’。如果呈现,一个可选FINALIZE函数被使用来执行在一个状态上最后的计算
  121. # 同时发射从状态来的一个或者多个输出行。
  122. #
  123. # 该REDUCE函数被称为‘term_reducer’带有一个被称为'term_transition'的TRANSITION函数
  124. #以及一个被称为'term_finalizer'的FINALIZE函数。
  125. - REDUCE:
  126. NAME:term_reducer
  127. TRANSITION:term_transition
  128. FINALIZE:term_finalizer
  129. - TRANSITION:
  130. NAME:term_transition
  131. LANGUAGE:python
  132. PARAMETERS:
  133. - state text
  134. - term text
  135. - nterms integer
  136. - doc_positions text
  137. - kw_positions text
  138. FUNCTION: |
  139. # 'state' 有一个''的初始值以及使用冒号分隔的关键词位置的集合。
  140. # 关键词位置是用逗号分隔的整数集。例如,'1,3,2:4:'
  141. # 如果这里存在一个状态,分割它到关键词位置的集合,否则,
  142. # 构建一个'nterms'关键词位置的集合。所有的空
  143. if state:
  144. kw_split = state.split(':')
  145. else:
  146. kw_split = []
  147. for i in range(0,nterms):
  148. kw_split.append('')
  149. # 'kw_positions'是一个整数分隔的逗号域,显示
  150. # 单个在给定关键词出现的位置。
  151. # 基于','分割转换字符串到一个python 列表。
  152. # 为当前term添加doc_positions
  153. for kw_p in kw_positions.split(','):
  154. kw_split[int(kw_p)-1] = doc_positions
  155. # 该部分接受每个在'kw_split'数组中的元素,同时使用':'连接,将它们
  156. # 转换为字符串。
  157. # 例如:对于关键词 "computer software computer hardware",
  158. # 与文档数据"in the business of computer software software engineers" 匹配的
  159. # 'kw_split'数组将为 ['5', '6,7', '5', '']
  160. # 同时输出状态将为:5:6,7:5:
  161. outstate = kw_split[0]
  162. for s in kw_split[1:]:
  163. outstate = outstate + ':' + s
  164. return outstate
  165. - FINALIZE:
  166. NAME: term_finalizer
  167. LANGUAGE: python
  168. RETURNS:
  169. - count integer
  170. MODE:MULTI
  171. FUNCTION:|
  172. if not state:
  173. return 0
  174. kw_split = state.split(':')
  175. # 函数做了下面的事情:
  176. # 1) 以':'分割'kw_split'
  177. # 例如, 1,5,7:2,8 创建了 '1,5,7' 和 '2,8'
  178. # 2) 在'kw_split'中的每个组的位置,以','分割集合,
  179. # 从集合0: 1,5,7创建['1','5','7']
  180. # 以及从集合1: 2,8创建 ['2', '8']
  181. # 3)检查空字符串
  182. # 4) 通过减去集合在'kw_split'中的位置来调整分割集
  183. # ['1','5','7'] - 0(从每个元素中) = ['1','5','7']
  184. # ['2', '8'] - 1 (从每个元素中) = ['1', '7']
  185. # 5)以步长为4的来截取数组后的结果数组被分割,他们的重叠值为:
  186. # ['1','5','7'].intersect['1', '7'] = [1,7]
  187. # 6) 决定分割长度,整个关键词(包括所有的小片)在文档数据中匹配的次数。
  188. previous = None
  189. for i in range(0,len(kw_split)):
  190. isplit = kw_split[i].split(',')
  191. if any(map(lambda(x): x == '', isplit)):
  192. return 0
  193. adjusted = set(map(lambda(x): int(x)-i, isplit))
  194. if (previous):
  195. previous = adjusted.intersection(previous)
  196. else:
  197. previous = adjusted
  198. # 返回最终的计数
  199. if previous:
  200. return len(previous)
  201. # 定义'term_match' 任务,该任务之后作为
  202. # 'final_output'查询的一部分执行。它接受之前定义的输入 (INPUT) 'term_join'
  203. # 同时使用之前定义的归约函数'term_reducer'
  204. - TASK:
  205. NAME:term_match
  206. SOURCE:term_join
  207. REDUCE:term_reducer
  208. - INPUT:
  209. NAME:final_output
  210. QUERY:|
  211. SELECT doc.*, kw.*, tm.count
  212. FROM documents doc, keywords kw, term_match tm
  213. WHERE doc.doc_id = tm.doc_id
  214. AND kw.keyword_id = tm.keyword_id
  215. AND tm.count > 0
  216. # 执行该MapReduce作业,发送结果到STDOUT
  217. EXECUTE:
  218. - RUN:
  219. SOURCE:final_output
  220. TARGET:STDOUT

MapReduce实例的流程图

下面的图显示了在示例中定义的MapReduce作业的流程:

Greenplum MapReduce规范 - 图1