Table API 教程

Apache Flink 提供 Table API 关系型 API 来统一处理流和批,即查询在无边界的实时流或有边界的批处理数据集上以相同的语义执行,并产生相同的结果。 Flink 的 Table API 易于编写,通常能简化数据分析,数据管道和ETL应用的编码。

概要

在该教程中,我们会从零开始,介绍如何创建一个 Flink Python 项目及运行 Python Table API 作业。该作业读取一个 csv 文件,计算词频,并将结果写到一个结果文件中。

先决条件

本练习假定你对 Python 有一定的了解,但是即使你来自其他编程语言,也应该能够继续学习。 它还假定你熟悉基本的关系操作,例如 SELECTGROUP BY 子句。

如何寻求帮助

如果你遇到问题,可以访问 社区信息页面。 与此同时,Apache Flink 的用户邮件列表 一直被列为 Apache 项目中最活跃的项目邮件列表之一,也是快速获得帮助的好方法。

继续我们的旅程

如果要继续我们的旅程,你需要一台具有以下功能的计算机:

  • Java 11
  • Python 3.6, 3.7, 3.8 or 3.9

使用 Python Table API 需要安装 PyFlink,它已经被发布到 PyPi,你可以通过如下方式安装 PyFlink:

  1. $ python -m pip install apache-flink

安装 PyFlink 后,你便可以编写 Python Table API 作业了。

编写 Flink Python Table API 程序的第一步是创建 TableEnvironment。这是 Python Table API 作业的入口类。

  1. t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
  2. t_env.get_config().set("parallelism.default", "1")

接下来,我们将介绍如何创建源表和结果表。

  1. t_env.create_temporary_table(
  2. 'source',
  3. TableDescriptor.for_connector('filesystem')
  4. .schema(Schema.new_builder()
  5. .column('word', DataTypes.STRING())
  6. .build())
  7. .option('path', input_path)
  8. .format('csv')
  9. .build())
  10. tab = t_env.from_path('source')
  11. t_env.create_temporary_table(
  12. 'sink',
  13. TableDescriptor.for_connector('filesystem')
  14. .schema(Schema.new_builder()
  15. .column('word', DataTypes.STRING())
  16. .column('count', DataTypes.BIGINT())
  17. .build())
  18. .option('path', output_path)
  19. .format(FormatDescriptor.for_format('canal-json')
  20. .build())
  21. .build())

你也可以使用 TableEnvironment.execute_sql() 方法,通过 DDL 语句来注册源表和结果表:

  1. my_source_ddl = """
  2. create table source (
  3. word STRING
  4. ) with (
  5. 'connector' = 'filesystem',
  6. 'format' = 'csv',
  7. 'path' = '{}'
  8. )
  9. """.format(input_path)
  10. my_sink_ddl = """
  11. create table sink (
  12. word STRING,
  13. `count` BIGINT
  14. ) with (
  15. 'connector' = 'filesystem',
  16. 'format' = 'canal-json',
  17. 'path' = '{}'
  18. )
  19. """.format(output_path)
  20. t_env.execute_sql(my_source_ddl)
  21. t_env.execute_sql(my_sink_ddl)

上面的程序展示了如何创建及注册表名分别为 sourcesink 的表。 其中,源表 source 有一列: word,该表代表了从 input_path 所指定的输入文件中读取的单词; 结果表 sink 有两列: word 和 count,该表的结果会输出到 output_path 所指定的输出文件中。

接下来,我们介绍如何创建一个作业:该作业读取表 source 中的数据,进行一些变换,然后将结果写入表 sink

最后,需要做的就是启动 Flink Python Table API 作业。上面所有的操作,比如创建源表 进行变换以及写入结果表的操作都只是构建作业逻辑图,只有当 execute_insert(sink_name) 被调用的时候, 作业才会被真正提交到集群或者本地进行执行。

  1. @udtf(result_types=[DataTypes.STRING()])
  2. def split(line: Row):
  3. for s in line[0].split():
  4. yield Row(s)
  5. # 计算 word count
  6. tab.flat_map(split).alias('word') \
  7. .group_by(col('word')) \
  8. .select(col('word'), lit(1).count) \
  9. .execute_insert('sink') \
  10. .wait()

该教程的完整代码如下:

  1. import argparse
  2. import logging
  3. import sys
  4. from pyflink.common import Row
  5. from pyflink.table import (EnvironmentSettings, TableEnvironment, TableDescriptor, Schema,
  6. DataTypes, FormatDescriptor)
  7. from pyflink.table.expressions import lit, col
  8. from pyflink.table.udf import udtf
  9. word_count_data = ["To be, or not to be,--that is the question:--",
  10. "Whether 'tis nobler in the mind to suffer",
  11. "The slings and arrows of outrageous fortune",
  12. "Or to take arms against a sea of troubles,",
  13. "And by opposing end them?--To die,--to sleep,--",
  14. "No more; and by a sleep to say we end",
  15. "The heartache, and the thousand natural shocks",
  16. "That flesh is heir to,--'tis a consummation",
  17. "Devoutly to be wish'd. To die,--to sleep;--",
  18. "To sleep! perchance to dream:--ay, there's the rub;",
  19. "For in that sleep of death what dreams may come,",
  20. "When we have shuffled off this mortal coil,",
  21. "Must give us pause: there's the respect",
  22. "That makes calamity of so long life;",
  23. "For who would bear the whips and scorns of time,",
  24. "The oppressor's wrong, the proud man's contumely,",
  25. "The pangs of despis'd love, the law's delay,",
  26. "The insolence of office, and the spurns",
  27. "That patient merit of the unworthy takes,",
  28. "When he himself might his quietus make",
  29. "With a bare bodkin? who would these fardels bear,",
  30. "To grunt and sweat under a weary life,",
  31. "But that the dread of something after death,--",
  32. "The undiscover'd country, from whose bourn",
  33. "No traveller returns,--puzzles the will,",
  34. "And makes us rather bear those ills we have",
  35. "Than fly to others that we know not of?",
  36. "Thus conscience does make cowards of us all;",
  37. "And thus the native hue of resolution",
  38. "Is sicklied o'er with the pale cast of thought;",
  39. "And enterprises of great pith and moment,",
  40. "With this regard, their currents turn awry,",
  41. "And lose the name of action.--Soft you now!",
  42. "The fair Ophelia!--Nymph, in thy orisons",
  43. "Be all my sins remember'd."]
  44. def word_count(input_path, output_path):
  45. t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
  46. # write all the data to one file
  47. t_env.get_config().set("parallelism.default", "1")
  48. # define the source
  49. if input_path is not None:
  50. t_env.create_temporary_table(
  51. 'source',
  52. TableDescriptor.for_connector('filesystem')
  53. .schema(Schema.new_builder()
  54. .column('word', DataTypes.STRING())
  55. .build())
  56. .option('path', input_path)
  57. .format('csv')
  58. .build())
  59. tab = t_env.from_path('source')
  60. else:
  61. print("Executing word_count example with default input data set.")
  62. print("Use --input to specify file input.")
  63. tab = t_env.from_elements(map(lambda i: (i,), word_count_data),
  64. DataTypes.ROW([DataTypes.FIELD('line', DataTypes.STRING())]))
  65. # define the sink
  66. if output_path is not None:
  67. t_env.create_temporary_table(
  68. 'sink',
  69. TableDescriptor.for_connector('filesystem')
  70. .schema(Schema.new_builder()
  71. .column('word', DataTypes.STRING())
  72. .column('count', DataTypes.BIGINT())
  73. .build())
  74. .option('path', output_path)
  75. .format(FormatDescriptor.for_format('canal-json')
  76. .build())
  77. .build())
  78. else:
  79. print("Printing result to stdout. Use --output to specify output path.")
  80. t_env.create_temporary_table(
  81. 'sink',
  82. TableDescriptor.for_connector('print')
  83. .schema(Schema.new_builder()
  84. .column('word', DataTypes.STRING())
  85. .column('count', DataTypes.BIGINT())
  86. .build())
  87. .build())
  88. @udtf(result_types=[DataTypes.STRING()])
  89. def split(line: Row):
  90. for s in line[0].split():
  91. yield Row(s)
  92. # compute word count
  93. tab.flat_map(split).alias('word') \
  94. .group_by(col('word')) \
  95. .select(col('word'), lit(1).count) \
  96. .execute_insert('sink') \
  97. .wait()
  98. # remove .wait if submitting to a remote cluster, refer to
  99. # https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/python/faq/#wait-for-jobs-to-finish-when-executing-jobs-in-mini-cluster
  100. # for more details
  101. if __name__ == '__main__':
  102. logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
  103. parser = argparse.ArgumentParser()
  104. parser.add_argument(
  105. '--input',
  106. dest='input',
  107. required=False,
  108. help='Input file to process.')
  109. parser.add_argument(
  110. '--output',
  111. dest='output',
  112. required=False,
  113. help='Output file to write results to.')
  114. argv = sys.argv[1:]
  115. known_args, _ = parser.parse_known_args(argv)
  116. word_count(known_args.input, known_args.output)

接下来,可以在命令行中运行作业(假设作业名为 word_count.py):

  1. $ python word_count.py

上述命令会构建 Python Table API 程序,并在本地 mini cluster 中运行。如果想将作业提交到远端集群执行, 可以参考作业提交示例

最后,你可以得到如下运行结果:

  1. +I[To, 1]
  2. +I[be,, 1]
  3. +I[or, 1]
  4. +I[not, 1]
  5. ...

上述教程介绍了如何编写并运行一个 Flink Python Table API 程序,你也可以访问 PyFlink 示例 ,了解更多关于 PyFlink 的示例。 如果想了解 Flink Python Table API 的更多信息,可以参考 Flink Python API 文档