Hive Read & Write

Using the HiveCatalog, Apache Flink can be used for unified BATCH and STREAM processing of Apache Hive Tables. This means Flink can be used as a more performant alternative to Hive’s batch engine, or to continuously read and write data into and out of Hive tables to power real-time data warehousing applications.

Reading

Flink supports reading data from Hive in both BATCH and STREAMING modes. When run as a BATCH application, Flink will execute its query over the state of the table at the point in time when the query is executed. STREAMING reads will continuously monitor the table and incrementally fetch new data as it is made available. Flink will read tables as bounded by default.

STREAMING reads support consuming both partitioned and non-partitioned tables. For partitioned tables, Flink will monitor the generation of new partitions, and read them incrementally when available. For non-partitioned tables, Flink will monitor the generation of new files in the folder and read new files incrementally.

KeyDefaultTypeDescription
streaming-source.enable
falseBooleanEnable streaming source or not. NOTES: Please make sure that each partition/file should be written atomically, otherwise the reader may get incomplete data.
streaming-source.partition.include
allStringOption to set the partitions to read, the supported option are all and latest, the all means read all partitions; the latest means read latest partition in order of ‘streaming-source.partition.order’, the latest only workswhen the streaming hive source table used as temporal table. By default the option isall`. Flink supports temporal join the latest hive partition by enabling ‘streaming-source.enable’ and setting ‘streaming-source.partition.include’ to ‘latest’, at the same time, user can assign the partition compare order and data update interval by configuring following partition-related options.
streaming-source.monitor-interval
NoneDurationTime interval for consecutively monitoring partition/file. Notes: The default interval for hive streaming reading is ‘1 min’, the default interval for hive streaming temporal join is ‘60 min’, this is because there’s one framework limitation that every TM will visit the Hive metaStore in current hive streaming temporal join implementation which may produce pressure to metaStore, this will improve in the future.
streaming-source.partition-order
partition-nameStringThe partition order of streaming source, support create-time, partition-time and partition-name. create-time compares partition/file creation time, this is not the partition create time in Hive metaStore, but the folder/file modification time in filesystem, if the partition folder somehow gets updated, e.g. add new file into folder, it can affect how the data is consumed. partition-time compares the time extracted from partition name. partition-name compares partition name’s alphabetical order. For non-partition table, this value should always be ‘create-time’. By default the value is partition-name. The option is equality with deprecated option ‘streaming-source.consume-order’.
streaming-source.consume-start-offset
NoneStringStart offset for streaming consuming. How to parse and compare offsets depends on your order. For create-time and partition-time, should be a timestamp string (yyyy-[m]m-[d]d [hh:mm:ss]). For partition-time, will use partition time extractor to extract time from partition. For partition-name, is the partition name string (e.g. pt_year=2020/pt_mon=10/pt_day=01).

SQL Hints can be used to apply configurations to a Hive table without changing its definition in the Hive metastore.

  1. SELECT *
  2. FROM hive_table
  3. /*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.consume-start-offset'='2020-05-20') */;

Notes

  • Monitor strategy is to scan all directories/files currently in the location path. Many partitions may cause performance degradation.
  • Streaming reads for non-partitioned tables requires that each file be written atomically into the target directory.
  • Streaming reading for partitioned tables requires that each partition should be added atomically in the view of hive metastore. If not, new data added to an existing partition will be consumed.
  • Streaming reads do not support watermark grammar in Flink DDL. These tables cannot be used for window operators.

Reading Hive Views

Flink is able to read from Hive defined views, but some limitations apply:

  1. The Hive catalog must be set as the current catalog before you can query the view. This can be done by either tableEnv.useCatalog(...) in Table API or USE CATALOG ... in SQL Client.

  2. Hive and Flink SQL have different syntax, e.g. different reserved keywords and literals. Make sure the view’s query is compatible with Flink grammar.

Vectorized Optimization upon Read

Flink will automatically used vectorized reads of Hive tables when the following conditions are met:

  • Format: ORC or Parquet.
  • Columns without complex data type, like hive types: List, Map, Struct, Union.

This feature is enabled by default. It may be disabled with the following configuration.

  1. table.exec.hive.fallback-mapred-reader=true

Source Parallelism Inference

By default, Flink will infer the optimal parallelism for its Hive readers based on the number of files, and number of blocks in each file.

Flink allows you to flexibly configure the policy of parallelism inference. You can configure the following parameters in TableConfig (note that these parameters affect all sources of the job):

KeyDefaultTypeDescription
table.exec.hive.infer-source-parallelism
trueBooleanIf is true, source parallelism is inferred according to splits number. If is false, parallelism of source are set by config.
table.exec.hive.infer-source-parallelism.max
1000IntegerSets max infer parallelism for source operator.

Load Partition Splits

Multi-thread is used to split hive’s partitions. You can use table.exec.hive.load-partition-splits.thread-num to configure the thread number. The default value is 3 and the configured value should be bigger than 0.

Temporal Table Join

You can use a Hive table as a temporal table, and then a stream can correlate the Hive table by temporal join. Please see temporal join for more information about the temporal join.

Flink supports processing-time temporal join Hive Table, the processing-time temporal join always joins the latest version of temporal table. Flink supports temporal join both partitioned table and Hive non-partitioned table, for partitioned table, Flink supports tracking the latest partition of Hive table automatically.

NOTE: Flink does not support event-time temporal join Hive table yet.

Temporal Join The Latest Partition

For a partitioned table which is changing over time, we can read it out as an unbounded stream, the partition can be acted as a version of the temporal table if every partition contains complete data of a version, the version of temporal table keeps the data of the partition.

Flink supports tracking the latest partition (version) of temporal table automatically in processing time temporal join, the latest partition (version) is defined by ‘streaming-source.partition-order’ option, This is the most common user cases that use Hive table as dimension table in a Flink stream application job.

NOTE: This feature is only supported in Flink STREAMING Mode.

The following demo shows a classical business pipeline, the dimension table comes from Hive and it’s updated once every day by a batch pipeline job or a Flink job, the kafka stream comes from real time online business data or log and need to join with the dimension table to enrich stream.

  1. -- Assume the data in hive table is updated per day, every day contains the latest and complete dimension data
  2. SET table.sql-dialect=hive;
  3. CREATE TABLE dimension_table (
  4. product_id STRING,
  5. product_name STRING,
  6. unit_price DECIMAL(10, 4),
  7. pv_count BIGINT,
  8. like_count BIGINT,
  9. comment_count BIGINT,
  10. update_time TIMESTAMP(3),
  11. update_user STRING,
  12. ...
  13. ) PARTITIONED BY (pt_year STRING, pt_month STRING, pt_day STRING) TBLPROPERTIES (
  14. -- using default partition-name order to load the latest partition every 12h (the most recommended and convenient way)
  15. 'streaming-source.enable' = 'true',
  16. 'streaming-source.partition.include' = 'latest',
  17. 'streaming-source.monitor-interval' = '12 h',
  18. 'streaming-source.partition-order' = 'partition-name', -- option with default value, can be ignored.
  19. -- using partition file create-time order to load the latest partition every 12h
  20. 'streaming-source.enable' = 'true',
  21. 'streaming-source.partition.include' = 'latest',
  22. 'streaming-source.partition-order' = 'create-time',
  23. 'streaming-source.monitor-interval' = '12 h'
  24. -- using partition-time order to load the latest partition every 12h
  25. 'streaming-source.enable' = 'true',
  26. 'streaming-source.partition.include' = 'latest',
  27. 'streaming-source.monitor-interval' = '12 h',
  28. 'streaming-source.partition-order' = 'partition-time',
  29. 'partition.time-extractor.kind' = 'default',
  30. 'partition.time-extractor.timestamp-pattern' = '$pt_year-$pt_month-$pt_day 00:00:00'
  31. );
  32. SET table.sql-dialect=default;
  33. CREATE TABLE orders_table (
  34. order_id STRING,
  35. order_amount DOUBLE,
  36. product_id STRING,
  37. log_ts TIMESTAMP(3),
  38. proctime as PROCTIME()
  39. ) WITH (...);
  40. -- streaming sql, kafka temporal join a hive dimension table. Flink will automatically reload data from the
  41. -- configured latest partition in the interval of 'streaming-source.monitor-interval'.
  42. SELECT * FROM orders_table AS o
  43. JOIN dimension_table FOR SYSTEM_TIME AS OF o.proctime AS dim
  44. ON o.product_id = dim.product_id;

Temporal Join The Latest Table

For a Hive table, we can read it out as a bounded stream. In this case, the Hive table can only track its latest version at the time when we query. The latest version of table keep all data of the Hive table.

When performing the temporal join the latest Hive table, the Hive table will be cached in Slot memory and each record from the stream is joined against the table by key to decide whether a match is found. Using the latest Hive table as a temporal table does not require any additional configuration. Optionally, you can configure the TTL of the Hive table cache with the following property. After the cache expires, the Hive table will be scanned again to load the latest data.

KeyDefaultTypeDescription
lookup.join.cache.ttl
60 minDurationThe cache TTL (e.g. 10min) for the build table in lookup join. By default the TTL is 60 minutes. NOTES: The option only works when lookup bounded hive table source, if you’re using streaming hive source as temporal table, please use ‘streaming-source.monitor-interval’ to configure the interval of data update.

The following demo shows load all data of hive table as a temporal table.

  1. -- Assume the data in hive table is overwrite by batch pipeline.
  2. SET table.sql-dialect=hive;
  3. CREATE TABLE dimension_table (
  4. product_id STRING,
  5. product_name STRING,
  6. unit_price DECIMAL(10, 4),
  7. pv_count BIGINT,
  8. like_count BIGINT,
  9. comment_count BIGINT,
  10. update_time TIMESTAMP(3),
  11. update_user STRING,
  12. ...
  13. ) TBLPROPERTIES (
  14. 'streaming-source.enable' = 'false', -- option with default value, can be ignored.
  15. 'streaming-source.partition.include' = 'all', -- option with default value, can be ignored.
  16. 'lookup.join.cache.ttl' = '12 h'
  17. );
  18. SET table.sql-dialect=default;
  19. CREATE TABLE orders_table (
  20. order_id STRING,
  21. order_amount DOUBLE,
  22. product_id STRING,
  23. log_ts TIMESTAMP(3),
  24. proctime as PROCTIME()
  25. ) WITH (...);
  26. -- streaming sql, kafka join a hive dimension table. Flink will reload all data from dimension_table after cache ttl is expired.
  27. SELECT * FROM orders_table AS o
  28. JOIN dimension_table FOR SYSTEM_TIME AS OF o.proctime AS dim
  29. ON o.product_id = dim.product_id;

Note:

  1. Each joining subtask needs to keep its own cache of the Hive table. Please make sure the Hive table can fit into the memory of a TM task slot.
  2. It is encouraged to set a relatively large value both for streaming-source.monitor-interval(latest partition as temporal table) or lookup.join.cache.ttl(all partitions as temporal table). Otherwise, Jobs are prone to performance issues as the table needs to be updated and reloaded too frequently.
  3. Currently we simply load the whole Hive table whenever the cache needs refreshing. There’s no way to differentiate new data from the old.

Writing

Flink supports writing data from Hive in both BATCH and STREAMING modes. When run as a BATCH application, Flink will write to a Hive table only making those records visible when the Job finishes. BATCH writes support both appending to and overwriting existing tables.

  1. # ------ INSERT INTO will append to the table or partition, keeping the existing data intact ------
  2. Flink SQL> INSERT INTO mytable SELECT 'Tom', 25;
  3. # ------ INSERT OVERWRITE will overwrite any existing data in the table or partition ------
  4. Flink SQL> INSERT OVERWRITE mytable SELECT 'Tom', 25;

Data can also be inserted into particular partitions.

  1. # ------ Insert with static partition ------
  2. Flink SQL> INSERT OVERWRITE myparttable PARTITION (my_type='type_1', my_date='2019-08-08') SELECT 'Tom', 25;
  3. # ------ Insert with dynamic partition ------
  4. Flink SQL> INSERT OVERWRITE myparttable SELECT 'Tom', 25, 'type_1', '2019-08-08';
  5. # ------ Insert with static(my_type) and dynamic(my_date) partition ------
  6. Flink SQL> INSERT OVERWRITE myparttable PARTITION (my_type='type_1') SELECT 'Tom', 25, '2019-08-08';

STREAMING writes continuously adding new data to Hive, committing records - making them visible - incrementally. Users control when/how to trigger commits with several properties. Insert overwrite is not supported for streaming write.

The below examples show how the streaming sink can be used to write a streaming query to write data from Kafka into a Hive table with partition-commit, and runs a batch query to read that data back out.

Please see the streaming sink for a full list of available configurations.

  1. SET table.sql-dialect=hive;
  2. CREATE TABLE hive_table (
  3. user_id STRING,
  4. order_amount DOUBLE
  5. ) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
  6. 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
  7. 'sink.partition-commit.trigger'='partition-time',
  8. 'sink.partition-commit.delay'='1 h',
  9. 'sink.partition-commit.policy.kind'='metastore,success-file'
  10. );
  11. SET table.sql-dialect=default;
  12. CREATE TABLE kafka_table (
  13. user_id STRING,
  14. order_amount DOUBLE,
  15. log_ts TIMESTAMP(3),
  16. WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND -- Define watermark on TIMESTAMP column
  17. ) WITH (...);
  18. -- streaming sql, insert into hive table
  19. INSERT INTO TABLE hive_table
  20. SELECT user_id, order_amount, DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH')
  21. FROM kafka_table;
  22. -- batch sql, select with partition pruning
  23. SELECT * FROM hive_table WHERE dt='2020-05-20' and hr='12';

If the watermark is defined on TIMESTAMP_LTZ column and used partition-time to commit, the sink.partition-commit.watermark-time-zone is required to set to the session time zone, otherwise the partition committed may happen after a few hours.

  1. SET table.sql-dialect=hive;
  2. CREATE TABLE hive_table (
  3. user_id STRING,
  4. order_amount DOUBLE
  5. ) PARTITIONED BY (dt STRING, hr STRING) STORED AS parquet TBLPROPERTIES (
  6. 'partition.time-extractor.timestamp-pattern'='$dt $hr:00:00',
  7. 'sink.partition-commit.trigger'='partition-time',
  8. 'sink.partition-commit.delay'='1 h',
  9. 'sink.partition-commit.watermark-time-zone'='Asia/Shanghai', -- Assume user configured time zone is 'Asia/Shanghai'
  10. 'sink.partition-commit.policy.kind'='metastore,success-file'
  11. );
  12. SET table.sql-dialect=default;
  13. CREATE TABLE kafka_table (
  14. user_id STRING,
  15. order_amount DOUBLE,
  16. ts BIGINT, -- time in epoch milliseconds
  17. ts_ltz AS TO_TIMESTAMP_LTZ(ts, 3),
  18. WATERMARK FOR ts_ltz AS ts_ltz - INTERVAL '5' SECOND -- Define watermark on TIMESTAMP_LTZ column
  19. ) WITH (...);
  20. -- streaming sql, insert into hive table
  21. INSERT INTO TABLE hive_table
  22. SELECT user_id, order_amount, DATE_FORMAT(ts_ltz, 'yyyy-MM-dd'), DATE_FORMAT(ts_ltz, 'HH')
  23. FROM kafka_table;
  24. -- batch sql, select with partition pruning
  25. SELECT * FROM hive_table WHERE dt='2020-05-20' and hr='12';

By default, for streaming writes, Flink only supports renaming committers, meaning the S3 filesystem cannot support exactly-once streaming writes. Exactly-once writes to S3 can be achieved by configuring the following parameter to false. This will instruct the sink to use Flink’s native writers but only works for parquet and orc file types. This configuration is set in the TableConfig and will affect all sinks of the job.

KeyDefaultTypeDescription
table.exec.hive.fallback-mapred-writer
trueBooleanIf it is false, using flink native writer to write parquet and orc files; if it is true, using hadoop mapred record writer to write parquet and orc files.

Formats

Flink’s Hive integration has been tested against the following file formats:

  • Text
  • CSV
  • SequenceFile
  • ORC
  • Parquet