Hive

Overview

The Hive Load Node writes data to Hive. With the Flink dialect, only the insert operation is currently supported; data arriving in upsert mode is converted into inserts. Manipulating Hive tables with the Hive dialect is not currently supported.

Supported Version

| Load Node | Version |
|-----------|---------|
| Hive | Hive: 1.x, 2.x, 3.x |

Dependencies

Using the Hive Load Node requires adding the dependency below. Alternatively, you can use the jar package provided by InLong (sort-connector-hive).

Maven dependency

    <dependency>
        <groupId>org.apache.inlong</groupId>
        <artifactId>sort-connector-hive</artifactId>
        <version>1.7.0</version>
    </dependency>

How to create a Hive Load Node

Usage for SQL API

The example below shows how to create a Hive Load Node with the Flink SQL CLI:

    CREATE TABLE hiveTableName (
        id STRING,
        name STRING,
        uv BIGINT,
        pv BIGINT
    ) WITH (
        'connector' = 'hive',
        'default-database' = 'default',
        'hive-version' = '3.1.2',
        'hive-conf-dir' = 'hdfs://localhost:9000/user/hive/hive-site.xml'
    );
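Since only inserts are supported, writing to the table is a plain INSERT INTO statement. The following is a minimal sketch, not a definitive setup: the source table `demoSource` is hypothetical, and it assumes Flink's built-in `datagen` connector is available in your environment.

```sql
-- Hypothetical source table for illustration only; 'datagen' is Flink's
-- built-in random-data connector and is not part of the original example.
CREATE TABLE demoSource (
    id STRING,
    name STRING,
    uv BIGINT,
    pv BIGINT
) WITH (
    'connector' = 'datagen',
    'rows-per-second' = '1'
);

-- Append rows to the Hive Load Node table created above.
INSERT INTO hiveTableName
SELECT id, name, uv, pv
FROM demoSource;
```

In practice the source would be whichever Extract Node feeds the data stream; the column list in the SELECT should match the column order of the Hive table.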

Usage for InLong Dashboard

Configuration

When creating a data stream, select Hive for the data stream direction, and click “Add” to configure it.

Hive Configuration

Usage for InLong Manager Client

TODO: It will be supported in the future.

Hive Load Node Options

| Option | Required | Default | Type | Description |
|--------|----------|---------|------|-------------|
| connector | required | (none) | String | Specifies which connector to use; here it should be 'hive'. |
| default-database | required | (none) | String | |
| hive-conf-dir | required | (none) | String | The complete HDFS URL of hive-site.xml. If you do not want to upload hive-site.xml to HDFS, you can put the file on the classpath of the project instead; in that case this option only needs to be non-empty. |
| sink.partition-commit.trigger | optional | (none) | String | If the Hive table is partitioned, sets the commit trigger mode (process-time). |
| partition.time-extractor.timestamp-pattern | optional | (none) | String | If the Hive table is partitioned, sets the timestamp pattern (yyyy-MM-dd…). |
| sink.partition-commit.delay | optional | (none) | String | If the Hive table is partitioned, sets the commit delay (10s, 20s, 1m…). |
| sink.partition-commit.policy.kind | optional | (none) | String | Policy for committing a partition, which notifies downstream applications that the partition has finished writing and is ready to be read. metastore: add the partition to the metastore; only Hive tables support this policy, because file systems manage partitions through the directory structure. success-file: add a '_success' file to the directory. custom: use a policy class to create a commit policy. Multiple policies can be configured at the same time, for example 'metastore,success-file'. |
| inlong.metric.labels | optional | (none) | String | InLong metric label; the value format is groupId=[groupId]&streamId=[streamId]&nodeId=[nodeId]. |
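As an illustration of how the partition commit options fit together, here is a hedged sketch of a partitioned table definition. The table name, the partition column `dt`, and the metric label values are hypothetical, and the `PARTITIONED BY` clause assumes standard Flink SQL DDL applies to this connector. The timestamp pattern option is left out because its exact value depends on your partition layout.

```sql
-- Sketch only: table name, partition column, and label values are made up.
CREATE TABLE hivePartitionedTable (
    id STRING,
    name STRING,
    pv BIGINT,
    dt STRING
) PARTITIONED BY (dt) WITH (
    'connector' = 'hive',
    'default-database' = 'default',
    'hive-version' = '3.1.2',
    'hive-conf-dir' = 'hdfs://localhost:9000/user/hive/hive-site.xml',
    -- Commit a partition based on processing time, 10 seconds after creation.
    'sink.partition-commit.trigger' = 'process-time',
    'sink.partition-commit.delay' = '10s',
    -- Register the partition in the metastore and also write a success file.
    'sink.partition-commit.policy.kind' = 'metastore,success-file',
    -- Follows the groupId=[groupId]&streamId=[streamId]&nodeId=[nodeId] format.
    'inlong.metric.labels' = 'groupId=demo_group&streamId=demo_stream&nodeId=demo_node'
);
```

Combining both commit policies lets downstream jobs detect a finished partition either by querying the metastore or by checking for the success file.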

Data Type Mapping

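| Flink SQL type | Hive type |
|----------------|-----------|
| char(p) | CHAR(p) |
| varchar(p) | VARCHAR(p) |
| string | STRING |
| boolean | BOOLEAN |
| tinyint | TINYINT |
| smallint | SMALLINT |
| int | INT |
| bigint | BIGINT |
| float | FLOAT |
| double | DOUBLE |
| decimal(p, s) | DECIMAL(p, s) |
| date | DATE |
| timestamp(9) | TIMESTAMP |
| bytes | BINARY |
| array | LIST |
| map | MAP |
| row | STRUCT |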
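To show several of these mappings in one place, the sketch below declares a Load Node table with the Flink SQL spellings of the less obvious types (TIMESTAMP(9), BYTES, ARRAY, MAP, ROW). The table and column names are hypothetical, and the connector options simply reuse those from the earlier example.

```sql
-- Sketch only: table and column names are invented for illustration.
CREATE TABLE hiveTypedTable (
    country_code CHAR(2),
    display_name VARCHAR(64),
    is_active BOOLEAN,
    amount DECIMAL(10, 2),
    event_date DATE,
    event_time TIMESTAMP(9),
    payload BYTES,
    tags ARRAY<STRING>,
    counters MAP<STRING, BIGINT>,
    contact ROW<contact_id INT, contact_name STRING>
) WITH (
    'connector' = 'hive',
    'default-database' = 'default',
    'hive-version' = '3.1.2',
    'hive-conf-dir' = 'hdfs://localhost:9000/user/hive/hive-site.xml'
);
```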