Hive Read & Write
Using the HiveCatalog and Flink’s connector to Hive, Flink can read from and write to Hive data as an alternative to Hive’s batch engine. Be sure to follow the instructions to include the correct dependencies in your application. Also note that the Hive connector only works with the blink planner.
Reading From Hive
Assume Hive contains a single table in its default database, named mytable, that contains several rows.
hive> show databases;
OK
default
Time taken: 0.841 seconds, Fetched: 1 row(s)
hive> show tables;
OK
Time taken: 0.087 seconds
hive> CREATE TABLE mytable(name string, value double);
OK
Time taken: 0.127 seconds
hive> SELECT * FROM mytable;
OK
Tom 4.72
John 8.0
Tom 24.2
Bob 3.14
Bob 4.72
Tom 34.9
Mary 4.79
Tiff 2.72
Bill 4.33
Mary 77.7
Time taken: 0.097 seconds, Fetched: 10 row(s)
With the data ready, you can connect to an existing Hive installation and begin querying.
Flink SQL> show catalogs;
myhive
default_catalog
# ------ Set the current catalog to be 'myhive' catalog if you haven't set it in the yaml file ------
Flink SQL> use catalog myhive;
# ------ See all registered databases in catalog 'myhive' ------
Flink SQL> show databases;
default
# ------ See the previously registered table 'mytable' ------
Flink SQL> show tables;
mytable
# ------ The table schema that Flink sees is the same as the one we created in Hive: two columns, name as string and value as double ------
Flink SQL> describe mytable;
root
|-- name: name
|-- type: STRING
|-- name: value
|-- type: DOUBLE
# ------ Select from hive table or hive view ------
Flink SQL> SELECT * FROM mytable;
name value
__________ __________
Tom 4.72
John 8.0
Tom 24.2
Bob 3.14
Bob 4.72
Tom 34.9
Mary 4.79
Tiff 2.72
Bill 4.33
Mary 77.7
Querying Hive views
If you need to query Hive views, please note:
- You have to use the Hive catalog as your current catalog before you can query views in that catalog. It can be done by either tableEnv.useCatalog(...) in Table API or USE CATALOG ... in SQL Client.
- Hive and Flink SQL have different syntax, e.g. different reserved keywords and literals. Make sure the view’s query is compatible with Flink grammar.
Writing To Hive
Similarly, data can be written into Hive using an INSERT clause.
Consider an example table named “mytable” with two columns, name and age, of types string and int.
# ------ INSERT INTO will append to the table or partition, keeping the existing data intact ------
Flink SQL> INSERT INTO mytable SELECT 'Tom', 25;
# ------ INSERT OVERWRITE will overwrite any existing data in the table or partition ------
Flink SQL> INSERT OVERWRITE mytable SELECT 'Tom', 25;
Partitioned tables are supported too. Consider a partitioned table named myparttable with four columns: name, age, my_type and my_date, in types …… my_type and my_date are the partition keys.
# ------ Insert with static partition ------
Flink SQL> INSERT OVERWRITE myparttable PARTITION (my_type='type_1', my_date='2019-08-08') SELECT 'Tom', 25;
# ------ Insert with dynamic partition ------
Flink SQL> INSERT OVERWRITE myparttable SELECT 'Tom', 25, 'type_1', '2019-08-08';
# ------ Insert with static(my_type) and dynamic(my_date) partition ------
Flink SQL> INSERT OVERWRITE myparttable PARTITION (my_type='type_1') SELECT 'Tom', 25, '2019-08-08';
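The three INSERT variants above differ only in where the partition values come from. The following Python sketch is a toy model of that mapping, not Flink's implementation: the function names `resolve_partition` and `partition_path` are hypothetical, and it assumes that dynamic partition values are taken, in partition-key order, from the trailing columns of the SELECT row.

```python
def resolve_partition(partition_keys, static_spec, select_row):
    """Split one SELECT row into data columns and a full partition spec.

    Toy model (assumption): static values come from the PARTITION clause;
    the remaining (dynamic) partition keys are filled, in order, from the
    trailing columns of the SELECT row.
    """
    dynamic_keys = [k for k in partition_keys if k not in static_spec]
    # Everything before the trailing dynamic values is regular column data.
    split_at = len(select_row) - len(dynamic_keys)
    data_columns = list(select_row[:split_at])
    dynamic_values = dict(zip(dynamic_keys, select_row[split_at:]))
    spec = {k: static_spec.get(k, dynamic_values.get(k)) for k in partition_keys}
    return data_columns, spec


def partition_path(spec):
    # Hive-style directory layout: key=value segments joined by '/'.
    return "/".join(f"{k}={v}" for k, v in spec.items())


keys = ["my_type", "my_date"]
cases = [
    ({"my_type": "type_1", "my_date": "2019-08-08"}, ("Tom", 25)),  # static
    ({}, ("Tom", 25, "type_1", "2019-08-08")),                      # dynamic
    ({"my_type": "type_1"}, ("Tom", 25, "2019-08-08")),             # mixed
]
for static_spec, row in cases:
    data, spec = resolve_partition(keys, static_spec, row)
    print(data, partition_path(spec))
```

Under these assumptions, all three statements place the row ('Tom', 25) in the same partition, which is why the examples above are interchangeable for a single row.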
Formats
The following table storage formats have been tested: text, csv, SequenceFile, ORC, and Parquet.
Optimizations
Partition Pruning
Flink uses partition pruning as a performance optimization to limit the number of files and partitions that it reads when querying Hive tables. When your data is partitioned and a query filters on the partition columns, Flink reads only the matching subset of partitions.
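To make the idea concrete, here is a minimal Python sketch of partition pruning. It is a toy model rather than Flink's planner logic: the partition list, the file names, and the helper `prune_partitions` are all made up for illustration, and the filter is applied to partition metadata only, before any file is opened.

```python
def prune_partitions(partitions, predicate):
    """Keep only partitions whose partition-key values satisfy the filter."""
    return [p for p in partitions if predicate(p["spec"])]


def files_to_read(partitions):
    """Flatten the surviving partitions into the list of files to scan."""
    return [f for p in partitions for f in p["files"]]


# Hypothetical metadata for a table partitioned by my_type and my_date.
partitions = [
    {"spec": {"my_type": "type_1", "my_date": "2019-08-08"}, "files": ["f1", "f2"]},
    {"spec": {"my_type": "type_1", "my_date": "2019-08-09"}, "files": ["f3"]},
    {"spec": {"my_type": "type_2", "my_date": "2019-08-08"}, "files": ["f4", "f5"]},
]

# Models a query such as: SELECT ... WHERE my_date = '2019-08-08'
kept = prune_partitions(partitions, lambda spec: spec["my_date"] == "2019-08-08")
print(files_to_read(kept))
```

In this sketch the partition for 2019-08-09 is dropped from the scan without reading its file, which is the saving that pruning aims for.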
Projection Pushdown
Flink leverages projection pushdown to minimize data transfer between Flink and Hive tables by omitting unnecessary fields from table scans.
It is especially beneficial when a table contains many columns.
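As a rough illustration, the Python sketch below models a table stored column by column and a scan that touches only the projected columns. It is a toy model under that storage assumption, not how Flink reads Hive files; the `scan` helper is hypothetical, and the data is the first three rows of mytable from the example above.

```python
def scan(columnar_table, projected_columns):
    """Read only the projected columns and return the rows as tuples."""
    selected = [columnar_table[name] for name in projected_columns]
    return list(zip(*selected))


# First three rows of mytable, stored one list per column.
mytable = {
    "name": ["Tom", "John", "Tom"],
    "value": [4.72, 8.0, 24.2],
}

# Models: SELECT name FROM mytable (the 'value' column is never touched).
print(scan(mytable, ["name"]))
```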
Limit Pushdown
For queries with a LIMIT clause, Flink will limit the number of output records wherever possible to minimize the amount of data transferred across the network.
Vectorized Optimization upon Read
Vectorized reading is used automatically when the following conditions are met:
- Format: ORC or Parquet.
- Columns without complex data types, such as the Hive types List, Map, Struct, and Union.
This feature is turned on by default. If you run into a problem, you can disable vectorized reading with this config option:
table.exec.hive.fallback-mapred-reader=true
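The conditions above can be read as a simple predicate. The Python sketch below encodes them as stated in this section; it is an illustration only, the function name `uses_vectorized_read` is hypothetical, and the type names are the category names used in the list above rather than full Hive type strings.

```python
# Complex type categories named in this section (lowercased for comparison).
COMPLEX_TYPES = {"list", "map", "struct", "union"}
VECTORIZED_FORMATS = {"orc", "parquet"}


def uses_vectorized_read(file_format, column_types, fallback_mapred_reader=False):
    """Return True if the conditions listed above for vectorized reading hold.

    fallback_mapred_reader models table.exec.hive.fallback-mapred-reader.
    """
    if fallback_mapred_reader:
        return False  # the option explicitly turns the optimization off
    if file_format.lower() not in VECTORIZED_FORMATS:
        return False
    return not any(t.lower() in COMPLEX_TYPES for t in column_types)


print(uses_vectorized_read("ORC", ["string", "double"]))
print(uses_vectorized_read("Parquet", ["string", "map"]))
print(uses_vectorized_read("ORC", ["string", "double"], fallback_mapred_reader=True))
```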
Source Parallelism Inference
By default, Flink infers the Hive source parallelism from the number of splits, which in turn is based on the number of files and the number of blocks in those files.
Flink allows you to flexibly configure the parallelism inference policy. You can configure the following parameters in TableConfig (note that these parameters affect all sources of the job):
Key | Default | Type | Description |
---|---|---|---|
table.exec.hive.infer-source-parallelism | true | Boolean | If true, source parallelism is inferred from the number of splits. If false, source parallelism is set by config. |
table.exec.hive.infer-source-parallelism.max | 1000 | Integer | Sets the maximum inferred parallelism for the source operator. |
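The interaction of the two options can be sketched in a few lines of Python. This is a simplified model of the behavior described in the table, not Flink's actual code: the function name and the `configured_parallelism` parameter are hypothetical, and the lower bound of 1 is an assumption added so the result is always a usable parallelism.

```python
def infer_source_parallelism(num_splits, infer=True, max_parallelism=1000,
                             configured_parallelism=1):
    """Model of the two options in the table above.

    infer            -> table.exec.hive.infer-source-parallelism
    max_parallelism  -> table.exec.hive.infer-source-parallelism.max
    """
    if not infer:
        # Inference disabled: fall back to the parallelism set by config.
        return configured_parallelism
    # Inference enabled: one task per split, capped by the maximum.
    # The floor of 1 is an assumption of this sketch.
    return max(1, min(num_splits, max_parallelism))


print(infer_source_parallelism(250))
print(infer_source_parallelism(5000))
print(infer_source_parallelism(5000, infer=False, configured_parallelism=8))
```

With the defaults, a table with 250 splits would get a source parallelism of 250, while one with 5000 splits would be capped at 1000.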
Roadmap
We are planning and actively working on supporting features like
- ACID tables
- bucketed tables
- more formats
Please reach out to the community with further feature requests: https://flink.apache.org/community.html#mailing-lists