Hudi

Overview

Apache Hudi (pronounced “hoodie”) is a next-generation streaming data lake platform that brings core warehouse and database functionality directly to the data lake. Hudi provides tables, transactions, efficient upserts/deletes, advanced indexing, streaming ingestion services, data clustering/compaction optimizations, and concurrency, all while keeping data in open source file formats.

Supported Version

| Extract Node | Version     |
|--------------|-------------|
| Hudi         | Hudi: 0.12+ |

Dependencies

Add the sort-connector-hudi dependency to your own project through Maven, or use the prebuilt JAR package provided by Apache InLong directly. (sort-connector-hudi)

Maven dependency

    <dependency>
        <groupId>org.apache.inlong</groupId>
        <artifactId>sort-connector-hudi</artifactId>
        <version>1.11.0-SNAPSHOT</version>
    </dependency>

How to create a Hudi Extract Node

Usage for SQL API

The example below shows how to create a Hudi Extract Node with Flink SQL CLI:

    CREATE TABLE `hudi_table_name` (
        id STRING,
        name STRING,
        uv BIGINT,
        pv BIGINT
    ) WITH (
        'connector' = 'hudi-inlong',
        'path' = 'hdfs://127.0.0.1:9000/data/warehouse/hudi_db_name.db/hudi_table_name',
        'uri' = 'thrift://127.0.0.1:8091',
        'hoodie.database.name' = 'hudi_db_name',
        'hoodie.table.name' = 'hudi_table_name',
        -- streaming read
        'read.streaming.check-interval' = '1',
        'read.streaming.enabled' = 'true',
        'read.streaming.skip_compaction' = 'true',
        'read.start-commit' = '20221220121000',
        -- bucket index
        'hoodie.bucket.index.hash.field' = 'id',
        -- compaction
        'compaction.tasks' = '10',
        'compaction.async.enabled' = 'true',
        'compaction.schedule.enabled' = 'true',
        'compaction.max_memory' = '3096',
        'compaction.trigger.strategy' = 'num_or_time',
        'compaction.delta_commits' = '5',
        -- cleaning and archiving
        'hoodie.keep.min.commits' = '1440',
        'hoodie.keep.max.commits' = '2880',
        'clean.async.enabled' = 'true',
        -- write
        'write.operation' = 'upsert',
        'write.bucket_assign.tasks' = '60',
        'write.tasks' = '60',
        'write.log_block.size' = '128',
        -- table and index
        'index.type' = 'BUCKET',
        'metadata.enabled' = 'false',
        'hoodie.bucket.index.num.buckets' = '20',
        'table.type' = 'MERGE_ON_READ',
        'clean.retain_commits' = '30',
        'hoodie.cleaner.policy' = 'KEEP_LATEST_COMMITS'
    );
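Once the table is defined, the extract node can be queried like any other Flink source table. A minimal sketch, assuming the hudi_table_name table created above and a running Flink SQL session:

    -- Continuously read new commits from the Hudi table;
    -- 'read.streaming.enabled' = 'true' above makes this an unbounded streaming scan.
    SELECT id, name, uv, pv FROM hudi_table_name;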

Usage for Dashboard

Configuration

When creating a data stream, select Hudi for the data stream direction, and click “Add” to configure it.

Hudi Configuration

| Config Item | Prop in DDL statement | Remark |
|---|---|---|
| DbName | hoodie.database.name | The name of the database. |
| TableName | hoodie.table.name | The name of the table. |
| EnableCreateResource | - | If the database and table already exist and do not need to be modified, select [Do not create]; otherwise select [Create], and the system will automatically create the resource. |
| Catalog URI | uri | The server URI of the catalog. |
| Warehouse | - | The location where the Hudi table is stored on HDFS. In the SQL DDL, the path attribute is formed by splicing the warehouse path with the database and table names, as shown below. |
| StartCommit | read.start-commit | Start commit instant for reading; the commit time format should be 'yyyyMMddHHmmss'. By default, streaming reads start from the latest instant. |
| SkipCompaction | read.streaming.skip_compaction | Whether to skip compaction instants for streaming read. This option can be used to avoid reading duplicates in two cases: 1) you are sure that the consumer reads faster than any compaction instant, usually with a delta time compaction strategy that is long enough (e.g., one week); 2) changelog mode is enabled, in which case this option keeps data integrity. |
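For instance, with the example values from the SQL DDL above, the warehouse directory hdfs://127.0.0.1:9000/data/warehouse, database hudi_db_name, and table hudi_table_name are spliced into the path attribute as follows (host, port, and warehouse directory here are illustrative placeholders):

    -- path = <Warehouse>/<DbName>.db/<TableName>
    'path' = 'hdfs://127.0.0.1:9000/data/warehouse/hudi_db_name.db/hudi_table_name'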

Usage for InLong Manager Client

TODO

Hudi Extract Node Options

| Option | Required | Default | Type | Description |
|---|---|---|---|---|
| connector | required | (none) | String | Specify which connector to use; here it should be 'hudi-inlong'. |
| uri | required | (none) | String | Metastore URIs for Hive sync. |
| hoodie.database.name | optional | (none) | String | Database name that will be used for incremental query. If different databases have a table with the same name during incremental query, this option limits the query to the table under the specified database. |
| hoodie.table.name | optional | (none) | String | Table name that will be used for registering with Hive. Needs to be the same across runs. |
| read.start-commit | optional | newest commit id | String | Start commit instant for reading; the commit time format should be 'yyyyMMddHHmmss'. By default, streaming reads start from the latest instant. |
| read.streaming.skip_compaction | optional | false | String | Whether to skip compaction instants for streaming read. This option can be used to avoid reading duplicates in two cases: 1) you are sure that the consumer reads faster than any compaction instant, usually with a delta time compaction strategy that is long enough (e.g., one week); 2) changelog mode is enabled, in which case this option keeps data integrity. |
| inlong.metric.labels | optional | (none) | String | InLong metric label; the value format is groupId=xxgroup&streamId=xxstream&nodeId=xxnode. |
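As a minimal sketch, an extract node only needs the required options plus whichever tuning options you care about; the table, database, path, and metric label values below are placeholders, not defaults:

    CREATE TABLE `hudi_source` (
        id STRING,
        name STRING
    ) WITH (
        'connector' = 'hudi-inlong',
        'uri' = 'thrift://127.0.0.1:8091',
        'path' = 'hdfs://127.0.0.1:9000/data/warehouse/hudi_db_name.db/hudi_source',
        'hoodie.database.name' = 'hudi_db_name',
        'hoodie.table.name' = 'hudi_source',
        'read.streaming.enabled' = 'true',
        'inlong.metric.labels' = 'groupId=xxgroup&streamId=xxstream&nodeId=xxnode'
    );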

Data Type Mapping

| Hive type | Flink SQL type |
|---|---|
| char(p) | CHAR(p) |
| varchar(p) | VARCHAR(p) |
| string | STRING |
| boolean | BOOLEAN |
| tinyint | TINYINT |
| smallint | SMALLINT |
| int | INT |
| bigint | BIGINT |
| float | FLOAT |
| double | DOUBLE |
| decimal(p, s) | DECIMAL(p, s) |
| date | DATE |
| timestamp(9) | TIMESTAMP |
| bytes | BINARY |
| array | LIST |
| map | MAP |
| row | STRUCT |
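For illustration, a Flink SQL schema for a Hudi table whose Hive columns use several of the types above might look like the sketch below; the table and column names are hypothetical, and the Hive types are noted in comments:

    CREATE TABLE `hudi_typed_example` (
        order_id STRING,                       -- Hive: string
        amount DECIMAL(10, 2),                 -- Hive: decimal(10, 2)
        order_date DATE,                       -- Hive: date
        created_at TIMESTAMP(3),               -- Hive: timestamp
        tags ARRAY<STRING>,                    -- Hive: array<string>
        attrs MAP<STRING, STRING>,             -- Hive: map<string, string>
        address ROW<city STRING, zip STRING>   -- Hive: struct<city: string, zip: string>
    ) WITH (
        'connector' = 'hudi-inlong',
        'path' = 'hdfs://127.0.0.1:9000/data/warehouse/hudi_db_name.db/hudi_typed_example',
        'uri' = 'thrift://127.0.0.1:8091',
        'hoodie.database.name' = 'hudi_db_name',
        'hoodie.table.name' = 'hudi_typed_example'
    );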