Hudi

概览

Apache Hudi (发音为”hoodie”)是下一代流式数据湖平台。 Apache Hudi 将核心仓库和数据库功能直接带到数据湖中。 Hudi 提供表、事务、高效的 upserts/delete、高级索引、流摄入服务、数据聚类/压缩优化和并发,同时保持数据的开源文件格式。

支持的版本

Load NodeVersion
HudiHudi: 0.12+

依赖

通过 Maven 引入 sort-connector-hudi 构建自己的项目。 当然,你也可以直接使用 INLONG 提供的 jar 包。(sort-connector-hudi)

Maven 依赖

  1. <dependency>
  2. <groupId>org.apache.inlong</groupId>
  3. <artifactId>sort-connector-hudi</artifactId>
  4. <version>2.1.0-SNAPSHOT</version>
  5. </dependency>

如何配置 Hudi 数据加载节点

SQL API 的使用

使用 Flink SQL Cli :

  1. CREATE TABLE `hudi_table_name` (
  2. id STRING,
  3. name STRING,
  4. uv BIGINT,
  5. pv BIGINT
  6. ) WITH (
  7. 'connector' = 'hudi-inlong',
  8. 'path' = 'hdfs://127.0.0.1:90001/data/warehouse/hudi_db_name.db/hudi_table_name',
  9. 'uri' = 'thrift://127.0.0.1:8091',
  10. 'hoodie.database.name' = 'hudi_db_name',
  11. 'hoodie.table.name' = 'hudi_table_name',
  12. 'hoodie.datasource.write.recordkey.field' = 'id',
  13. 'hoodie.bucket.index.hash.field' = 'id',
  14. -- compaction
  15. 'compaction.tasks' = '10',
  16. 'compaction.async.enabled' = 'true',
  17. 'compaction.schedule.enabled' = 'true',
  18. 'compaction.max_memory' = '3096',
  19. 'compaction.trigger.strategy' = 'num_or_time',
  20. 'compaction.delta_commits' = '5',
  21. 'compaction.max_memory' = '3096',
  22. --
  23. 'hoodie.keep.min.commits' = '1440',
  24. 'hoodie.keep.max.commits' = '2880',
  25. 'clean.async.enabled' = 'true',
  26. --
  27. 'write.operation' = 'upsert',
  28. 'write.bucket_assign.tasks' = '60',
  29. 'write.tasks' = '60',
  30. 'write.log_block.size' = '128',
  31. --
  32. 'index.type' = 'BUCKET',
  33. 'metadata.enabled' = 'false',
  34. 'hoodie.bucket.index.num.buckets' = '20',
  35. 'table.type' = 'MERGE_ON_READ',
  36. 'clean.retain_commits' = '30',
  37. 'hoodie.cleaner.policy' = 'KEEP_LATEST_COMMITS'
  38. );

InLong Dashboard 方式

配置

在创建数据流时,选择数据落地为 ‘Hudi’ 然后点击 ‘Add’ 来配置 Hudi 的相关信息。

Hudi Configuration

配置项对应SQL DDL中的属性备注
DB名称hoodie.database.name库名称
表名hudi_table_namehudi表名
是否创建资源-如果库表已经存在,且无需修改,则选【不创建】;
否则请选择【创建】,由系统自动创建资源。
Catalog URIuri元数据服务地址
仓库路径-hudi表存储在HDFS中的位置
在SQL DDL中path属性是将仓库路径与库、表名称拼接在一起
属性-hudi表的DDL属性需带前缀’ddl.’
高级选项>数据一致性-Flink计算引擎的一致性语义: EXACTLY_ONCEAT_LEAST_ONCE
分区字段hoodie.datasource.write.partitionpath.field分区字段
主键字段hoodie.datasource.write.recordkey.field主键字段

InLong Manager Client 方式

TODO: 未来版本支持

Hudi 加载节点参数信息

选项必填类型描述
connector必填String指定要使用的Connector,这里应该是’hudi-inlong’。
uri必填String用于配置单元同步的 Metastore uris
hoodie.database.name可选String将用于增量查询的数据库名称。如果不同数据库在增量查询时有相同的表名,我们可以设置它来限制特定数据库下的表名
hoodie.table.name可选String将用于向 Hive 注册的表名。 需要在运行中保持一致。
hoodie.datasource.write.recordkey.field必填String记录的主键字段。 用作“HoodieKey”的“recordKey”组件的值。 实际值将通过在字段值上调用 .toString() 来获得。 可以使用点符号指定嵌套字段,例如:a.b.c
hoodie.datasource.write.partitionpath.field可选String分区路径字段。 在 HoodieKey 的 partitionPath 组件中使用的值。 通过调用 .toString() 获得的实际值
inlong.metric.labels可选String在long metric label中,value的格式为groupId=xxgroup&streamId=xxstream&nodeId=xxnode。

数据类型映射

Hive typeFlink SQL type
char(p)CHAR(p)
varchar(p)VARCHAR(p)
stringSTRING
booleanBOOLEAN
tinyintTINYINT
smallintSMALLINT
intINT
bigintBIGINT
floatFLOAT
doubleDOUBLE
decimal(p, s)DECIMAL(p, s)
dateDATE
timestamp(9)TIMESTAMP
bytesBINARY
arrayLIST
mapMAP
rowSTRUCT