Hudi

Overview

Apache Hudi (pronounced “hoodie”) is a next-generation streaming data lake platform. Apache Hudi brings core warehouse and database functionality directly into the data lake. Hudi provides tables, transactions, efficient upserts/deletes, advanced indexing, streaming ingestion services, data clustering/compression optimizations, and concurrency while keeping data in an open source file format.

Supported Version

Load NodeVersion
HudiHudi: 0.12+

Dependencies

Introduce sort-connector-hudi through Maven to build your own project. Of course, you can also directly use the jar package provided by INLONG. (sort-connector-hudi)

Maven dependency

  1. <dependency>
  2. <groupId>org.apache.inlong</groupId>
  3. <artifactId>sort-connector-hudi</artifactId>
  4. <version>1.11.0-SNAPSHOT</version>
  5. </dependency>

How to create a Hudi Load Node

Usage for SQL API

The example below shows how to create a Hudi Load Node with Flink SQL Cli :

  1. CREATE TABLE `hudi_table_name` (
  2. id STRING,
  3. name STRING,
  4. uv BIGINT,
  5. pv BIGINT
  6. ) WITH (
  7. 'connector' = 'hudi-inlong',
  8. 'path' = 'hdfs://127.0.0.1:90001/data/warehouse/hudi_db_name.db/hudi_table_name',
  9. 'uri' = 'thrift://127.0.0.1:8091',
  10. 'hoodie.database.name' = 'hudi_db_name',
  11. 'hoodie.table.name' = 'hudi_table_name',
  12. 'hoodie.datasource.write.recordkey.field' = 'id',
  13. 'hoodie.bucket.index.hash.field' = 'id',
  14. -- compaction
  15. 'compaction.tasks' = '10',
  16. 'compaction.async.enabled' = 'true',
  17. 'compaction.schedule.enabled' = 'true',
  18. 'compaction.max_memory' = '3096',
  19. 'compaction.trigger.strategy' = 'num_or_time',
  20. 'compaction.delta_commits' = '5',
  21. 'compaction.max_memory' = '3096',
  22. --
  23. 'hoodie.keep.min.commits' = '1440',
  24. 'hoodie.keep.max.commits' = '2880',
  25. 'clean.async.enabled' = 'true',
  26. --
  27. 'write.operation' = 'upsert',
  28. 'write.bucket_assign.tasks' = '60',
  29. 'write.tasks' = '60',
  30. 'write.log_block.size' = '128',
  31. --
  32. 'index.type' = 'BUCKET',
  33. 'metadata.enabled' = 'false',
  34. 'hoodie.bucket.index.num.buckets' = '20',
  35. 'table.type' = 'MERGE_ON_READ',
  36. 'clean.retain_commits' = '30',
  37. 'hoodie.cleaner.policy' = 'KEEP_LATEST_COMMITS'
  38. );

Usage for InLong Dashboard

Configuration

When creating a data stream, select Hudi for the data stream direction, and click “Add” to configure it.

Hudi Configuration

Config Itemprop in DDL statementremark
DbNamehoodie.database.namethe name of database
TableNamehudi_table_namethe name of table
EnableCreateResource-If the library table already exists and does not need to be modified, select [Do not create],
otherwise select [Create], and the system will automatically create the resource.
Catalog URIuriThe server uri of catalog
Warehouse-The location where the hudi table is stored in HDFS
In the SQL DDL, the path attribute is to splice the warehouse path with the name of db and table
ExtList-The DDL attribute of the hudi table needs to be prefixed with ‘ddl.’
Advanced options>DataConsistency-Consistency semantics of Flink computing engine: EXACTLY_ONCE or AT_LEAST_ONCE
PartitionFieldListhoodie.datasource.write.partitionpath.fieldpartition field list
PrimaryKeyhoodie.datasource.write.recordkey.fieldprimary key

Usage for InLong Manager Client

TODO: It will be supported in the future.

Hudi Load Node Options

OptionRequiredDefaultTypeDescription
connectorrequired(none)StringSpecify what connector to use, here should be ‘hudi-inlong’.
urirequired(none)StringMetastore uris for hive sync
hoodie.database.nameoptional(none)StringDatabase name that will be used for incremental query.If different databases have the same table name during incremental query, we can set it to limit the table name under a specific database
hoodie.table.nameoptional(none)StringTable name that will be used for registering with Hive. Needs to be same across runs.
hoodie.datasource.write.recordkey.fieldrequired(none)StringRecord key field. Value to be used as the recordKey component of HoodieKey. Actual value will be obtained by invoking .toString() on the field value. Nested fields can be specified using the dot notation eg: a.b.c
hoodie.datasource.write.partitionpath.fieldoptional(none)StringPartition path field. Value to be used at the partitionPath component of HoodieKey. Actual value obtained by invoking .toString()
inlong.metric.labelsoptional(none)StringInlong metric label, format of value is groupId=xxgroup&streamId=xxstream&nodeId=xxnode.

Data Type Mapping

Hive typeFlink SQL type
char(p)CHAR(p)
varchar(p)VARCHAR(p)
stringSTRING
booleanBOOLEAN
tinyintTINYINT
smallintSMALLINT
intINT
bigintBIGINT
floatFLOAT
doubleDOUBLE
decimal(p, s)DECIMAL(p, s)
dateDATE
timestamp(9)TIMESTAMP
bytesBINARY
arrayLIST
mapMAP
rowSTRUCT