Hive

Overview

The Hive Load Node writes data to Hive. With the Flink dialect, only the insert operation is currently supported, and data arriving in upsert mode is converted into inserts. Manipulating Hive tables with the Hive dialect is not currently supported.

Supported Version

Load Node | Version
Hive | Hive: 1.x, 2.x, 3.x

Dependencies

Using the Hive Load Node requires the dependency below. Alternatively, you can use the jar package provided by InLong (sort-connector-hive).

Maven dependency

  <dependency>
    <groupId>org.apache.inlong</groupId>
    <artifactId>sort-connector-hive</artifactId>
    <version>1.8.0</version>
  </dependency>

How to create a Hive Load Node

Usage for SQL API

The example below shows how to create a Hive Load Node with the Flink SQL CLI:

  CREATE TABLE hiveTableName (
    id STRING,
    name STRING,
    uv BIGINT,
    pv BIGINT
  ) WITH (
    'connector' = 'hive',
    'default-database' = 'default',
    'hive-version' = '3.1.2',
    'hive-conf-dir' = 'hdfs://localhost:9000/user/hive/hive-site.xml'
  );
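
Once the table is declared, data is written to it with an ordinary INSERT statement, which is the only operation the node currently supports. The following is a minimal sketch rather than a reference job. It assumes the hiveTableName table from the example above, and demoSource is a hypothetical source table backed by Flink's built-in datagen connector, chosen only so that the statement has something to read from.

```sql
-- Hypothetical source table; 'datagen' is Flink's built-in random data generator.
CREATE TABLE demoSource (
  id STRING,
  name STRING,
  uv BIGINT,
  pv BIGINT
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1'
);

-- Append the generated rows to the Hive Load Node table declared above.
INSERT INTO hiveTableName
SELECT id, name, uv, pv
FROM demoSource;
```

In a real job the source would typically be an Extract Node table rather than datagen; the column order and types in the SELECT need to match the Hive table definition.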

Usage for InLong Dashboard

Configuration

When creating a data stream, select Hive for the data stream direction, and click “Add” to configure it.

[Screenshot: Hive Configuration]

Usage for InLong Manager Client

TODO: It will be supported in the future.

Hive Load Node Options

Option | Required | Default | Type | Description
connector | required | (none) | String | Specifies which connector to use; here it should be 'hive'.
default-database | required | (none) | String |
hive-conf-dir | required | (none) | String | If you do not want to upload hive-site.xml to HDFS, you can put this configuration file on the project classpath, in which case this option only needs to be non-empty. Otherwise, you must provide the complete HDFS URL.
sink.partition-commit.trigger | optional | (none) | String | If the Hive table is partitioned, sets the partition commit trigger mode (process-time).
partition.time-extractor.timestamp-pattern | optional | (none) | String | If the Hive table is partitioned, sets the timestamp pattern (yyyy-MM-dd…).
sink.partition-commit.delay | optional | (none) | String | If the Hive table is partitioned, sets the partition commit delay (10s, 20s, 1m…).
sink.partition-commit.policy.kind | optional | (none) | String | The policy for committing a partition, which notifies downstream applications that the partition has finished writing and is ready to be read. metastore: adds the partition to the metastore; only Hive tables support the metastore policy, while file systems manage partitions through the directory structure. success-file: adds a '_success' file to the directory. custom: uses a policy class to create a commit policy. Multiple policies can be configured at the same time, for example 'metastore,success-file'.
inlong.metric.labels | optional | (none) | String | The InLong metric label, in the format groupId=[groupId]&streamId=[streamId]&nodeId=[nodeId].
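
To show how the partition commit options fit together, here is a hedged sketch of a partitioned table definition. The table name, the dt partition column, and the label values are hypothetical, and the use of PARTITIONED BY (standard Flink SQL syntax) with this connector is an assumption rather than something this page confirms. The option keys and sample values come from the table above.

```sql
-- Sketch only: table name, partition column, and label values are hypothetical.
CREATE TABLE hivePartitionedTable (
  id STRING,
  name STRING,
  uv BIGINT,
  pv BIGINT,
  dt STRING
) PARTITIONED BY (dt) WITH (
  'connector' = 'hive',
  'default-database' = 'default',
  'hive-version' = '3.1.2',
  'hive-conf-dir' = 'hdfs://localhost:9000/user/hive/hive-site.xml',
  -- Trigger partition commits by processing time, with a 10 second delay.
  'sink.partition-commit.trigger' = 'process-time',
  'sink.partition-commit.delay' = '10s',
  -- Register the partition in the metastore and also write a _success file.
  'sink.partition-commit.policy.kind' = 'metastore,success-file',
  -- Metric labels in the groupId=...&streamId=...&nodeId=... format.
  'inlong.metric.labels' = 'groupId=demo_group&streamId=demo_stream&nodeId=demo_node'
);
```

The timestamp-pattern option is left out here because the sketch commits by processing time; it would presumably matter only when partition time is extracted from partition values.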

Data Type Mapping

Flink SQL type | Hive type
char(p) | CHAR(p)
varchar(p) | VARCHAR(p)
string | STRING
boolean | BOOLEAN
tinyint | TINYINT
smallint | SMALLINT
int | INT
bigint | BIGINT
float | FLOAT
double | DOUBLE
decimal(p, s) | DECIMAL(p, s)
date | DATE
timestamp(9) | TIMESTAMP
bytes | BINARY
array | LIST
map | MAP
row | STRUCT
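
As an illustration of the mapping, the sketch below declares a table whose columns use several of the less obvious Flink SQL types. The comments note the Hive type each one pairs with in the table above. The table and column names are hypothetical, and the connector options simply repeat the earlier example.

```sql
-- Sketch only: table and column names are hypothetical.
CREATE TABLE hiveTypedTable (
  id BIGINT,                          -- Hive: BIGINT
  price DECIMAL(10, 2),               -- Hive: DECIMAL(10, 2)
  event_time TIMESTAMP(9),            -- Hive: TIMESTAMP
  payload BYTES,                      -- Hive: BINARY
  tags ARRAY<STRING>,                 -- Hive: LIST
  attrs MAP<STRING, BIGINT>,          -- Hive: MAP
  profile ROW<name STRING, age INT>   -- Hive: STRUCT
) WITH (
  'connector' = 'hive',
  'default-database' = 'default',
  'hive-version' = '3.1.2',
  'hive-conf-dir' = 'hdfs://localhost:9000/user/hive/hive-site.xml'
);
```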