HBase

概览

HBase Load 节点支持写数据都 HBase 数据库.

支持的版本

Load 节点HBase 版本
HBase2.2.x

依赖

为了设置 HBase Load 节点, 下面提供了使用构建自动化工具(例如 Maven 或 SBT)和带有 Sort Connector JAR 包的 SQL 客户端的两个项目的依赖关系信息。

Maven dependency

  1. <dependency>
  2. <groupId>org.apache.inlong</groupId>
  3. <artifactId>sort-connector-hbase</artifactId>
  4. <version>2.1.0-SNAPSHOT</version>
  5. </dependency>

如何创建 HBase Load 节点

SQL API 用法

所有 HBase 表的列簇必须定义为 ROW 类型,字段名对应列簇名(column family),嵌套的字段名对应列限定符名(column qualifier)。用户只需在表结构中 声明查询中使用的的列簇和列限定符。除了 ROW 类型的列,剩下的原子数据类型字段(比如,STRING, BIGINT)将被识别为 HBase 的 rowkey,一张表中只能声明一个 rowkey。rowkey 字段的名字可以是任意的,如果是保留关键字,需要用反引号。

下面这个例子展示了如何用 Flink SQL 创建一个 HBase Load 节点:

  1. -- Flink SQL 中创建 HBase 'hbase_load_node'
  2. CREATE TABLE hbase_load_node (
  3. rowkey STRING,
  4. family1 ROW<q1 INT>,
  5. family2 ROW<q2 STRING, q3 BIGINT>,
  6. family3 ROW<q4 DOUBLE, q5 BOOLEAN, q6 STRING>,
  7. PRIMARY KEY (rowkey) NOT ENFORCED
  8. ) WITH (
  9. 'connector' = 'hbase-2.2-inlong',
  10. 'table-name' = 'mytable',
  11. 'zookeeper.quorum' = 'localhost:2181'
  12. );
  13. -- 使用 ROW(...) 构造函数构造列族和写数据到 HBase 表。
  14. -- 假设表"T" schema [rowkey, f1q1, f2q2, f2q3, f3q4, f3q5, f3q6]
  15. INSERT INTO hbase_load_node
  16. SELECT rowkey, ROW(f1q1), ROW(f2q2, f2q3), ROW(f3q4, f3q5, f3q6) FROM T;
  17. -- HBase 表中扫描数据
  18. SELECT rowkey, family1, family3.q4, family3.q6 FROM hbase_load_node;
  19. -- HBase 表临时连接为维度表
  20. SELECT * FROM myTopic
  21. LEFT JOIN hbase_load_node FOR SYSTEM_TIME AS OF myTopic.proctime
  22. ON myTopic.key = hbase_load_node.rowkey;

InLong Dashboard 用法

TODO: 将在未来支持此功能。

InLong Manager Client 用法

TODO: 将在未来支持此功能。

HBase Load 节点参数

参数是否必选默认值数据类型描述
connector必选(none)String指定使用的连接器: hbase-2.2-inlong: 连接 HBase 2.2.x 集群
table-name必选(none)String连接的 HBase 表名。
zookeeper.quorum必选(none)StringHBase Zookeeper quorum 信息。
zookeeper.znode.parent可选/hbaseStringHBase 集群的 Zookeeper 根目录。
null-string-literal可选nullString当字符串值为 null 时的存储形式,默认存成 “null” 字符串。HBase 的 source 和 sink 的编解码将所有数据类型(除字符串外)将 null 值以空字节来存储。
sink.buffer-flush.max-size可选2mbMemorySize写入的参数选项。每次写入请求缓存行的最大大小。它能提升写入 HBase 数据库的性能,但是也可能增加延迟。设置为 “0” 关闭此选项。
sink.buffer-flush.max-rows可选1000Integer写入的参数选项。 每次写入请求缓存的最大行数。它能提升写入 HBase 数据库的性能,但是也可能增加延迟。设置为 “0” 关闭此选项。
sink.buffer-flush.interval可选1sDuration写入的参数选项。刷写缓存行的间隔。它能提升写入 HBase 数据库的性能,但是也可能增加延迟。设置为 “0” 关闭此选项。注意:”sink.buffer-flush.max-size” 和 “sink.buffer-flush.max-rows” 同时设置为 “0”,刷写选项整个异步处理缓存行为。
sink.parallelism可选(none)Integer为 HBase sink operator 定义并行度。默认情况下,并行度由框架决定,和链在一起的上游 operator 一样。
lookup.async可选falseBoolean是否启用异步查找。如果为真,查找将是异步的。注意:异步方式只支持 hbase-2.2 连接器
lookup.cache.max-rows可选(none)Integer查找缓存的最大行数,超过这个值,最旧的行将过期。注意:”lookup.cache.max-rows” 和 “lookup.cache.ttl” 必须同时被设置。默认情况下,查找缓存是禁用的。
lookup.cache.ttl可选(none)Duration查找缓存中每一行的最大生存时间,在这段时间内,最老的行将过期。注意:”lookup.cache.max-rows” 和 “lookup.cache.ttl” 必须同时被设置。默认情况下,查找缓存是禁用的。
lookup.max-retries可选3Integer查找数据库失败时的最大重试次数。
properties.*可选(none)String可以设置任意 HBase 的配置项。后缀名必须匹配在 HBase 配置文档 中定义的配置键。Flink 将移除 “properties.” 配置键前缀并将变换后的配置键和值传入底层的 HBase 客户端。 例如您可以设置 ‘properties.hbase.security.authentication’ = ‘kerberos’ 等kerberos认证参数。
inlong.metric.labels可选(none)Stringinlong metric 的标签值,该值的构成为groupId={groupId}&streamId={streamId}&nodeId={nodeId}

数据类型映射

HBase 以字节数组存储所有数据。在读和写过程中要序列化和反序列化数据。

Flink 的 HBase 连接器利用 HBase(Hadoop) 的工具类 org.apache.hadoop.hbase.util.Bytes 进行字节数组和 Flink 数据类型转换。

Flink 的 HBase 连接器将所有数据类型(除字符串外)null 值编码成空字节。对于字符串类型,null 值的字面值由null-string-literal选项值决定。

数据类型映射表如下:

Flink SQL 类型HBase 转换
CHAR
VARCHAR
STRING
byte[] toBytes(String s)
String toString(byte[] b)
BOOLEANbyte[] toBytes(boolean b)
boolean toBoolean(byte[] b)
BINARY
VARBINARY
Returns byte[] as is.
DECIMALbyte[] toBytes(BigDecimal v)
BigDecimal toBigDecimal(byte[] b)
TINYINTnew byte[] { val }
bytes[0] // returns first and only byte from bytes
SMALLINTbyte[] toBytes(short val)
short toShort(byte[] bytes)
INTbyte[] toBytes(int val)
int toInt(byte[] bytes)
BIGINTbyte[] toBytes(long val)
long toLong(byte[] bytes)
FLOATbyte[] toBytes(float val)
float toFloat(byte[] bytes)
DOUBLEbyte[] toBytes(double val)
double toDouble(byte[] bytes)
DATEStores the number of days since epoch as int value.
TIMEStores the number of milliseconds of the day as int value.
TIMESTAMPStores the milliseconds since epoch as long value.
ARRAYNot supported
MAP
MULTISET
Not supported
ROWNot supported