使用 Kudu 开发应用程序

原文链接 : http://kudu.apache.org/docs/developing.html

译文链接 : http://cwiki.apachecn.org/pages/viewpage.action?pageId=10813629

贡献者 : 小瑶 ApacheCN Apache中文网

使用 Kudu 开发应用程序

Kudu 提供 C ++JavaPython 客户端 API 以及参考示例来说明它们的用途。

注意

不支持使用服务器端或专用接口,不属于公共 API 的接口没有稳定性保证。

查看 API 文档

C++ API 文档

您可以在线查看 C ++ 客户端 API 文档。或者,在从源代码构建 Kudu 后,您还可以另外构建 doxygen 目标(例如,如果使用 make ,则运行 make doxygen ),并通过在您最喜欢的 Web 中打开 docs/doxygen/client_api/html/index.html 文件来使用本地生成的 API 文档浏览器。

重要

为了构建 doxygen 目标,有必要在您的构建机器上安装带有 Dot( graphviz )支持的 doxygen 。如果您从源代码构建 Kudu 后安装了 doxygen ,则需要再次运行 cmake 以获取 doxygen 位置并生成适当的目标。

Java API 文档

您可以在线查看 Java API 文档 。或者,在构建 Java 客户端之后,Java API 文档可以在 java/kudu-client/target/apidocs/index.html 中找到。

工作实例

kudu 示例 Github 仓库中提供了几个示例应用程序。每个示例包括一个自述文件,显示如何编译和运行它。这些例子说明了 Kudu API 的正确使用,以及如何设置虚拟机来运行 Kudu 。以下列表包括今天可用的一些示例。检查存储库本身,以防此列表过期。

java/java-example

连接到 Kudu 实例的简单 Java 应用程序创建一个表,向其中写入数据,然后丢弃该表。

java/collectl

侦听 TCP 套接字的小型 Java 应用程序,用于与 Collectl 有线协议相对应的时间序列数据。常用的 collectl 工具可用于将示例数据发送到服务器。

java/insert-loadgen

生成随机插入负载的 Java 应用程序。

python/dstat-kudu

一个示例程序,显示如何使用 Kudu Python API 将数据加载到由外部程序生成的新的/现有的 Kudu 表中, dstat 在这种情况下。

python/graphite-kudu

使用 graphite-web ( 石墨网 ) 与 Kudu 作为后端的实验插件。

demo-vm-setup

脚本下载并运行带有 KuduVirtualBox 虚拟机已安装。有关详细信息,请参阅 快速入门

这些例子应该是您自己的 Kudu 应用程序和集成的有用起点。

Maven Artifacts ( Maven 工件 )

以下 Maven <dependency> 元素对于 Apache Kudu 公开版本(从1.0.0 开始)是有效的:

  1. <dependency>
  2. <groupId>org.apache.kudu</groupId>
  3. <artifactId>kudu-client</artifactId>
  4. <version>1.1.0</version>
  5. </dependency>

Java 客户端和各种 Java 集成(例如 SparkFlume )的便利二进制文件现在也可以通过 ASF Maven 存储库 Maven Central 存储库获得。

Impala命令使用 Kudu 的例子

请参阅 使用 ImpalaKudu 进行有关使用 Kudu 安装和使用 Impala 的指导,其中包括几个 impala-shell 示例。

Kudu 与 Spark 集成

Kudu1.0.0 版开始,通过 Data Source APISpark 集成。使用 —packages 选项包括 kudu-spark 依赖关系:

如果使用 SparkScala 2.10 ,请使用 kudu-spark_2.10 插件:

  1. spark-shell --packages org.apache.kudu:kudu-spark_2.10:1.1.0

如果在 Scala 2.11 中使用 Spark 2 ,请使用 kudu-spark2_2.11 插件:

  1. spark-shell --packages org.apache.kudu:kudu-spark2_2.11:1.1.0

然后导入 kudu-spark 并创建一个数据框:

  1. import org.apache.kudu.spark.kudu._
  2. import org.apache.kudu.client._
  3. import collection.JavaConverters._
  4. // Read a table from Kudu
  5. val df = sqlContext.read.options(Map("kudu.master" -> "kudu.master:7051","kudu.table" -> "kudu_table")).kudu
  6. // Query using the Spark API...
  7. df.select("id").filter("id" >= 5).show()
  8. // ...or register a temporary table and use SQL
  9. df.registerTempTable("kudu_table")
  10. val filteredDF = sqlContext.sql("select id from kudu_table where id >= 5").show()
  11. // Use KuduContext to create, delete, or write to Kudu tables
  12. val kuduContext = new KuduContext("kudu.master:7051", sqlContext.sparkContext)
  13. // Create a new Kudu table from a dataframe schema
  14. // NB: No rows from the dataframe are inserted into the table
  15. kuduContext.createTable(
  16. "test_table", df.schema, Seq("key"),
  17. new CreateTableOptions()
  18. .setNumReplicas(1)
  19. .addHashPartitions(List("key").asJava, 3))
  20. // Insert data
  21. kuduContext.insertRows(df, "test_table")
  22. // Delete data
  23. kuduContext.deleteRows(filteredDF, "test_table")
  24. // Upsert data
  25. kuduContext.upsertRows(df, "test_table")
  26. // Update data
  27. val alteredDF = df.select("id", $"count" + 1)
  28. kuduContext.updateRows(filteredRows, "test_table"
  29. // Data can also be inserted into the Kudu table using the data source, though the methods on KuduContext are preferred
  30. // NB: The default is to upsert rows; to perform standard inserts instead, set operation = insert in the options map
  31. // NB: Only mode Append is supported
  32. df.write.options(Map("kudu.master"-> "kudu.master:7051", "kudu.table"-> "test_table")).mode("append").kudu
  33. // Check for the existence of a Kudu table
  34. kuduContext.tableExists("another_table")
  35. // Delete a Kudu table
  36. kuduContext.deleteTable("unwanted_table")
  37. <> and OR predicates are not pushed to Kudu, and instead will be evaluated by the Spark task. Only LIKE predicates with a suffix wildcard are pushed to Kudu, meaning that LIKE "FOO%" is pushed down but LIKE "FOO%BAR" isnt.

Spark 集成已知问题和限制

  • 注册为临时表时,必须为具有大写字母或非 ASCII 字符的名称的 Kudu 表分配备用名称。
  • 具有包含大写字母或非 ASCII 字符的列名称的 Kudu 表可能不与 SparkSQL 一起使用。 Columns 可能在 Kudu 更名为解决这个问题。
  • <>OR 谓词不被推送到 Kudu ,而是将在 Spark 的evaluate过程中执行, 只有具有后缀通配符的 LIKE 谓词才被推送到 Kudu 执行,这意味着 LIKE“FOO%”语句被下推到Kudu,而 “FOO%BAR” 这样的语句不会。
  • Kudu 不支持 Spark SQL 支持的所有类型,例如 DateDecimal 和复杂类型。
  • Kudu 表只能在 SparkSQL 中注册为临时表。 可能不会使用 HiveContext 查询 Kudu 表。

Kudu Python 客户端

Kudu Python 客户端为 C ++ 客户端 API 提供了一个 Python 友好的界面。 下面的示例演示了如何使用部分 Python 客户端。

  1. import kudu
  2. from kudu.client import Partitioning
  3. from datetime import datetime
  4. # Connect to Kudu master server
  5. client = kudu.connect(host='kudu.master', port=7051)
  6. # Define a schema for a new table
  7. builder = kudu.schema_builder()
  8. builder.add_column('key').type(kudu.int64).nullable(False).primary_key()
  9. builder.add_column('ts_val', type_=kudu.unixtime_micros, nullable=False, compression='lz4')
  10. schema = builder.build()
  11. # Define partitioning schema
  12. partitioning = Partitioning().add_hash_partitions(column_names=['key'], num_buckets=3)
  13. # Create new table
  14. client.create_table('python-example', schema, partitioning)
  15. # Open a table
  16. table = client.table('python-example')
  17. # Create a new session so that we can apply write operations
  18. session = client.new_session()
  19. # Insert a row
  20. op = table.new_insert({'key': 1, 'ts_val': datetime.utcnow()})
  21. session.apply(op)
  22. # Upsert a row
  23. op = table.new_upsert({'key': 2, 'ts_val': "2016-01-01T00:00:00.000000"})
  24. session.apply(op)
  25. # Updating a row
  26. op = table.new_update({'key': 1, 'ts_val': ("2017-01-01", "%Y-%m-%d")})
  27. session.apply(op)
  28. # Delete a row
  29. op = table.new_delete({'key': 2})
  30. session.apply(op)
  31. # Flush write operations, if failures occur, capture print them.
  32. try:
  33. session.flush()
  34. except kudu.KuduBadStatus as e:
  35. print(session.get_pending_errors())
  36. # Create a scanner and add a predicate
  37. scanner = table.scanner()
  38. scanner.add_predicate(table['ts_val'] == datetime(2017, 1, 1))
  39. # Open Scanner and read all tuples
  40. # Note: This doesn't scale for large scans
  41. result = scanner.open().read_all_tuples()

与 MapReduce , YARN 和其他框架集成

Kudu 旨在与 Hadoop 生态系统中的 MapReduceYARNSpark 和其他框架集成。 请参阅 RowCounter.java ImportCsv.java ,了解可以对自己的集成进行建模的示例。 请继续关注未来使用 YARNSpark 的更多示例。