HBase

Overview

The HBase Load Node supports to write data into HBase database.

Supported Version

Load NodeHBase version
HBase2.2.x

Dependencies

In order to set up the HBase Load Node, the following provides dependency information for both projects using a build automation tool (such as Maven or SBT) and SQL Client with Sort Connectors JAR bundles.

Maven dependency

  1. <dependency>
  2. <groupId>org.apache.inlong</groupId>
  3. <artifactId>sort-connector-hbase</artifactId>
  4. <version>1.11.0-SNAPSHOT</version>
  5. </dependency>

How to create a HBase Load Node

Usage for SQL API

All the column families in HBase table must be declared as ROW type, the field name maps to the column family name, and the nested field names map to the column qualifier names. There is no need to declare all the families and qualifiers in the schema, users can declare what’s used in the query. Except the ROW type fields, the single atomic type field (e.g. STRING, BIGINT)will be recognized as HBase rowkey. The rowkey field can be arbitrary name, but should be quoted using backticks if it is a reserved keyword.

The example below shows how to create a HBase Load Node with Flink SQL :

  1. -- Create a HBase table 'hbase_load_node' in Flink SQL
  2. CREATE TABLE hbase_load_node (
  3. rowkey STRING,
  4. family1 ROW<q1 INT>,
  5. family2 ROW<q2 STRING, q3 BIGINT>,
  6. family3 ROW<q4 DOUBLE, q5 BOOLEAN, q6 STRING>,
  7. PRIMARY KEY (rowkey) NOT ENFORCED
  8. ) WITH (
  9. 'connector' = 'hbase-2.2-inlong',
  10. 'table-name' = 'mytable',
  11. 'zookeeper.quorum' = 'localhost:2181'
  12. );
  13. -- use ROW(...) construction function construct column families and write data into the HBase table.
  14. -- assuming the schema of "T" is [rowkey, f1q1, f2q2, f2q3, f3q4, f3q5, f3q6]
  15. INSERT INTO hbase_load_node
  16. SELECT rowkey, ROW(f1q1), ROW(f2q2, f2q3), ROW(f3q4, f3q5, f3q6) FROM T;
  17. -- scan data from the HBase table
  18. SELECT rowkey, family1, family3.q4, family3.q6 FROM hbase_load_node;
  19. -- temporal join the HBase table as a dimension table
  20. SELECT * FROM myTopic
  21. LEFT JOIN hbase_load_node FOR SYSTEM_TIME AS OF myTopic.proctime
  22. ON myTopic.key = hbase_load_node.rowkey;

Usage for InLong Dashboard

TODO: It will be supported in the future.

Usage for InLong Manager Client

TODO: It will be supported in the future.

HBase Load Node Options

OptionRequiredDefaultTypeDescription
connectorrequired(none)StringSpecify what connector to use, valid values are: hbase-2.2-inlong: connect to HBase 2.2.x cluster
table-namerequired(none)StringThe name of HBase table to connect.
zookeeper.quorumrequired(none)StringThe HBase Zookeeper quorum.
zookeeper.znode.parentoptional/hbaseStringThe root dir in Zookeeper for HBase cluster.
null-string-literaloptionalnullStringRepresentation for null values for string fields. HBase source and sink encodes/decodes empty bytes as null values for all types except string type.
sink.buffer-flush.max-sizeoptional2mbMemorySizeWriting option, maximum size in memory of buffered rows for each writing request. This can improve performance for writing data to HBase database, but may increase the latency. Can be set to ‘0’ to disable it.
sink.buffer-flush.max-rowsoptional1000IntegerWriting option, maximum number of rows to buffer for each writing request. This can improve performance for writing data to HBase database, but may increase the latency. Can be set to ‘0’ to disable it.
sink.buffer-flush.intervaloptional1sDurationWriting option, the interval to flush any buffered rows. This can improve performance for writing data to HBase database, but may increase the latency. Can be set to ‘0’ to disable it. Note, both ‘sink.buffer-flush.max-size’ and ‘sink.buffer-flush.max-rows’ can be set to ‘0’ with the flush interval set allowing for complete async processing of buffered actions.
sink.parallelismoptional(none)IntegerDefines the parallelism of the HBase sink operator. By default, the parallelism is determined by the framework using the same parallelism of the upstream chained operator.
lookup.asyncoptionalfalseBooleanWhether async lookup are enabled. If true, the lookup will be async. Note, async only supports hbase-2.2 connector.
lookup.cache.max-rowsoptional(none)IntegerThe max number of rows of lookup cache, over this value, the oldest rows will be expired. Note, “lookup.cache.max-rows” and “lookup.cache.ttl” options must all be specified if any of them is specified. Lookup cache is disabled by default.
lookup.cache.ttloptional(none)DurationThe max time to live for each rows in lookup cache, over this time, the oldest rows will be expired. Note, “cache.max-rows” and “cache.ttl” options must all be specified if any of them is specified.Lookup cache is disabled by default.
lookup.max-retriesoptional3IntegerThe max retry times if lookup database failed.
properties.*optional(none)StringThis can set and pass arbitrary HBase configurations. Suffix names must match the configuration key defined in HBase Configuration documentation. Flink will remove the “properties.” key prefix and pass the transformed key and values to the underlying HBaseClient. For example, you can add a kerberos authentication parameter ‘properties.hbase.security.authentication’ = ‘kerberos’.
inlong.metric.labelsoptional(none)StringInlong metric label, format of value is groupId={groupId}&streamId={streamId}&nodeId={nodeId}.

Data Type Mapping

HBase stores all data as byte arrays. The data needs to be serialized and deserialized during read and write operation.

When serializing and de-serializing, Flink HBase connector uses utility class org.apache.hadoop.hbase.util.Bytes provided by HBase (Hadoop) to convert Flink Data Types to and from byte arrays.

Flink HBase connector encodes null values to empty bytes, and decode empty bytes to null values for all data types except string type. For string type, the null literal is determined by null-string-literal option.

The data type mappings are as follows:

Flink SQL typeHBase conversion
CHAR
VARCHAR
STRING
byte[] toBytes(String s)
String toString(byte[] b)
BOOLEANbyte[] toBytes(boolean b)
boolean toBoolean(byte[] b)
BINARY
VARBINARY
Returns byte[] as is.
DECIMALbyte[] toBytes(BigDecimal v)
BigDecimal toBigDecimal(byte[] b)
TINYINTnew byte[] { val }
bytes[0] // returns first and only byte from bytes
SMALLINTbyte[] toBytes(short val)
short toShort(byte[] bytes)
INTbyte[] toBytes(int val)
int toInt(byte[] bytes)
BIGINTbyte[] toBytes(long val)
long toLong(byte[] bytes)
FLOATbyte[] toBytes(float val)
float toFloat(byte[] bytes)
DOUBLEbyte[] toBytes(double val)
double toDouble(byte[] bytes)
DATEStores the number of days since epoch as int value.
TIMEStores the number of milliseconds of the day as int value.
TIMESTAMPStores the milliseconds since epoch as long value.
ARRAYNot supported
MAP
MULTISET
Not supported
ROWNot supported