HBase SQL 连接器

Scan Source: Bounded Lookup Source: Sync Mode Sink: Batch Sink: Streaming Upsert Mode

HBase 连接器支持读取和写入 HBase 集群。本文档介绍如何使用 HBase 连接器基于 HBase 进行 SQL 查询。

HBase 连接器在 upsert 模式下运行,可以使用 DDL 中定义的主键与外部系统交换更新操作消息。但是主键只能基于 HBase 的 rowkey 字段定义。如果没有声明主键,HBase 连接器默认取 rowkey 作为主键。

依赖

In order to use the HBase connector the following dependencies are required for both projects using a build automation tool (such as Maven or SBT) and SQL Client with SQL JAR bundles.

HBase versionMaven dependencySQL Client JAR
1.4.x
  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-connector-hbase-1.4</artifactId>
  4. <version>1.15.0</version>
  5. </dependency>
Copied to clipboard!
Download
2.2.x
  1. <dependency>
  2. <groupId>org.apache.flink</groupId>
  3. <artifactId>flink-connector-hbase-2.2</artifactId>
  4. <version>1.15.0</version>
  5. </dependency>
Copied to clipboard!
Download

HBase 连接器不是二进制发行版的一部分,请查阅这里了解如何在集群运行中引用 HBase 连接器。

如何使用 HBase 表

所有 HBase 表的列簇必须定义为 ROW 类型,字段名对应列簇名(column family),嵌套的字段名对应列限定符名(column qualifier)。用户只需在表结构中声明查询中使用的的列簇和列限定符。除了 ROW 类型的列,剩下的原子数据类型字段(比如,STRING, BIGINT)将被识别为 HBase 的 rowkey,一张表中只能声明一个 rowkey。rowkey 字段的名字可以是任意的,如果是保留关键字,需要用反引号。

  1. -- Flink SQL 中注册 HBase "mytable"
  2. CREATE TABLE hTable (
  3. rowkey INT,
  4. family1 ROW<q1 INT>,
  5. family2 ROW<q2 STRING, q3 BIGINT>,
  6. family3 ROW<q4 DOUBLE, q5 BOOLEAN, q6 STRING>,
  7. PRIMARY KEY (rowkey) NOT ENFORCED
  8. ) WITH (
  9. 'connector' = 'hbase-1.4',
  10. 'table-name' = 'mytable',
  11. 'zookeeper.quorum' = 'localhost:2181'
  12. );
  13. -- ROW(...) 构造函数构造列簇,并往 HBase 表写数据。
  14. -- 假设 "T" 的表结构是 [rowkey, f1q1, f2q2, f2q3, f3q4, f3q5, f3q6]
  15. INSERT INTO hTable
  16. SELECT rowkey, ROW(f1q1), ROW(f2q2, f2q3), ROW(f3q4, f3q5, f3q6) FROM T;
  17. -- HBase 表扫描数据
  18. SELECT rowkey, family1, family3.q4, family3.q6 FROM hTable;
  19. -- temporal join HBase 表,将 HBase 表作为维表
  20. SELECT * FROM myTopic
  21. LEFT JOIN hTable FOR SYSTEM_TIME AS OF myTopic.proctime
  22. ON myTopic.key = hTable.rowkey;

连接器参数

参数是否必选默认值数据类型描述
connector
必选(none)String指定使用的连接器, 支持的值如下 :
  • hbase-1.4: 连接 HBase 1.4.x 集群
  • hbase-2.2: 连接 HBase 2.2.x 集群
table-name
必选(none)String连接的 HBase 表名。默认该表在 “default” 命名空间下,指定命名空间下的表需要使用 “namespace:table”。
zookeeper.quorum
必选(none)StringHBase Zookeeper quorum 信息。
zookeeper.znode.parent
可选/hbaseStringHBase 集群的 Zookeeper 根目录。
null-string-literal
可选nullString当字符串值为 null 时的存储形式,默认存成 “null” 字符串。HBase 的 source 和 sink 的编解码将所有数据类型(除字符串外)将 null 值以空字节来存储。
sink.buffer-flush.max-size
可选2mbMemorySize写入的参数选项。每次写入请求缓存行的最大大小。它能提升写入 HBase 数据库的性能,但是也可能增加延迟。设置为 “0” 关闭此选项。
sink.buffer-flush.max-rows
可选1000Integer写入的参数选项。 每次写入请求缓存的最大行数。它能提升写入 HBase 数据库的性能,但是也可能增加延迟。设置为 “0” 关闭此选项。
sink.buffer-flush.interval
可选1sDuration写入的参数选项。刷写缓存行的间隔。它能提升写入 HBase 数据库的性能,但是也可能增加延迟。设置为 “0” 关闭此选项。注意:”sink.buffer-flush.max-size” 和 “sink.buffer-flush.max-rows” 同时设置为 “0”,刷写选项整个异步处理缓存行为。
sink.parallelism
可选(none)Integer为 HBase sink operator 定义并行度。默认情况下,并行度由框架决定,和链在一起的上游 operator 一样。
lookup.async
可选falseBoolean是否启用异步查找。如果为真,查找将是异步的。注意:异步方式只支持 hbase-2.2 连接器
lookup.cache.max-rows
可选-1Long查找缓存的最大行数,超过这个值,最旧的行将过期。注意:”lookup.cache.max-rows” 和 “lookup.cache.ttl” 必须同时被设置。默认情况下,查找缓存是禁用的。
lookup.cache.ttl
可选0 sDuration查找缓存中每一行的最大生存时间,在这段时间内,最老的行将过期。注意:”lookup.cache.max-rows” 和 “lookup.cache.ttl” 必须同时被设置。默认情况下,查找缓存是禁用的。
lookup.max-retries
可选3Integer查找数据库失败时的最大重试次数。
properties.*
可选(无)String可以设置任意 HBase 的配置项。后缀名必须匹配在 HBase 配置文档 中定义的配置键。Flink 将移除 “properties.” 配置键前缀并将变换后的配置键和值传入底层的 HBase 客户端。 例如您可以设置 ‘properties.hbase.security.authentication’ = ‘kerberos’ 等kerberos认证参数。

数据类型映射表

HBase 以字节数组存储所有数据。在读和写过程中要序列化和反序列化数据。

Flink 的 HBase 连接器利用 HBase(Hadoop) 的工具类 org.apache.hadoop.hbase.util.Bytes 进行字节数组和 Flink 数据类型转换。

Flink 的 HBase 连接器将所有数据类型(除字符串外)null 值编码成空字节。对于字符串类型,null 值的字面值由null-string-literal选项值决定。

数据类型映射表如下:

Flink 数据类型HBase 转换
CHAR / VARCHAR / STRING
  1. byte[] toBytes(String s)
  2. String toString(byte[] b)
BOOLEAN
  1. byte[] toBytes(boolean b)
  2. boolean toBoolean(byte[] b)
BINARY / VARBINARY返回 byte[]
DECIMAL
  1. byte[] toBytes(BigDecimal v)
  2. BigDecimal toBigDecimal(byte[] b)
TINYINT
  1. new byte[] { val }
  2. bytes[0] // returns first and only byte from bytes
SMALLINT
  1. byte[] toBytes(short val)
  2. short toShort(byte[] bytes)
INT
  1. byte[] toBytes(int val)
  2. int toInt(byte[] bytes)
BIGINT
  1. byte[] toBytes(long val)
  2. long toLong(byte[] bytes)
FLOAT
  1. byte[] toBytes(float val)
  2. float toFloat(byte[] bytes)
DOUBLE
  1. byte[] toBytes(double val)
  2. double toDouble(byte[] bytes)
DATE从 1970-01-01 00:00:00 UTC 开始的天数,int 值。
TIME从 1970-01-01 00:00:00 UTC 开始天的毫秒数,int 值。
TIMESTAMP从 1970-01-01 00:00:00 UTC 开始的毫秒数,long 值。
ARRAY不支持
MAP / MULTISET不支持
ROW不支持