nifi-iotdb-bundle

Apache NiFi简介

Apache NiFi 是一个易用的、功能强大的、可靠的数据处理和分发系统。

Apache NiFi 支持强大的、可伸缩的数据路由、转换和系统中介逻辑的有向图。

Apache NiFi 包含以下功能:

  • 基于浏览器的用户接口:
    • 设计、控制、反馈和监控的无缝体验
  • 数据起源跟踪
    • 从头到尾完整的信息族谱
  • 丰富的配置
    • 丢失容忍和保证交付
    • 低延迟和高吞吐
    • 动态优先级策略
    • 运行时可以修改流配置
    • 反向压力控制
  • 扩展设计
    • 用于定制 processors 和 services 的组件体系结构
    • 快速开发和迭代测试
  • 安全会话
    • 带有可配置认证策略的 HTTPS 协议
    • 多租户授权和策略管理
    • 包括TLS和SSH的加密通信的标准协议

PutIoTDBRecord

这是一个用于数据写入的处理器。它使用配置的 Record Reader 将传入 FlowFile 的内容读取为单独的记录,并使用本机接口将它们写入 Apache IoTDB。

PutIoTDBRecord的配置项

配置项描述默认值是否必填
HostIoTDB 的主机名nulltrue
PortIoTDB 的端口6667true
UsernameIoTDB 的用户名nulltrue
PasswordIoTDB 的密码nulltrue
Prefix将被写入IoTDB的数据的tsName前缀 以root. 开头
可以使用Nifi expression language做动态替换.
nulltrue
Time时间字段名nulltrue
Record Reader指定一个 Record Reader controller service 来解析数据,并且推断数据格式。nulltrue
SchemaIoTDB 需要的 schema 不能很好的被 NiFi 支持,因此你可以在这里自定义 schema。
除此之外,你可以通过这个方式设置编码和压缩类型。如果你没有设置这个配置,就会使用 Record Reader 推断的 schema。
这个配置可以通过 Attributes 的表达式来更新。
nullfalse
Aligned是否使用 aligned 接口?
这个配置可以通过 Attributes 的表达式来更新。
falsefalse
MaxRowNumber指定 tablet 的最大行数。
这个配置可以通过 Attributes 的表达式来更新。
1024false

Flowfile 的推断数据类型

如果要使用推断类型,需要注意以下几点:

  1. 输入的 flowfile 需要能被 Record Reader 读取。
  2. flowfile的 schema 中必须包含以时间字段名属性命名的字段
  3. Time的数据类型只能是 STRING 或者 LONG
  4. Time 以外的列必须以 root. 开头。
  5. 支持的数据类型有: INTLONGFLOATDOUBLEBOOLEANTEXT

通过配置项自定义 schema

如上所述,通过配置项来自定义 schema 比起推断的 schema来说,是一种更加灵活和强大的方式。

Schema 配置项的解构如下:

  1. {
  2. "fields": [{
  3. "tsName": "s1",
  4. "dataType": "INT32",
  5. "encoding": "RLE",
  6. "compressionType": "GZIP"
  7. }, {
  8. "tsName": "s2",
  9. "dataType": "INT64",
  10. "encoding": "RLE",
  11. "compressionType": "GZIP"
  12. }]
  13. }

注意

  1. flowfile 的第一列数据必须为 Time。剩下的必须与 fields 配置中保持一样的顺序。
  2. 定义 shema 的 JSON 中必须包含 timeType and fields 这两项。
  3. timeType 只支持 LONGSTRING 这两个选项。
  4. tsNamedataType 这两项必须被设置。
  5. 当数据插入IoTDB时,Prefix属性会被添加到 tsName以作为插入的字段名。
  6. 支持的 dataTypes 有:INT32INT64FLOATDOUBLEBOOLEANTEXT
  7. 支持的 encoding 有: PLAINDICTIONARYRLEDIFFTS_2DIFFBITMAPGORILLA_V1REGULARGORILLAZIGZAGCHIMPSPRINTZRLBE
  8. 支持的 compressionType 有: UNCOMPRESSEDSNAPPYGZIPLZOSDTPAAPLALZ4ZSTDLZMA2

Relationships

relationship描述
success数据能被正确的写入。
failureschema 或者数据有异常。

QueryIoTDBRecord

这是一个用于数据读取的处理器。它通过读取 FlowFile 的内容中的SQL 查询来对IoTDB的原生接口进行访问,并将查询结果用Record Writer写入 flowfile。

QueryIoTDBRecord的配置项

配置项描述默认值是否必填
HostIoTDB 的主机名nulltrue
PortIoTDB 的端口6667true
UsernameIoTDB 的用户名nulltrue
PasswordIoTDB 的密码nulltrue
Record Writer指定一个 Record Writer controller service 来写入数据。nulltrue
iotdb-query需要执行的IoTDB query
。 Note: 如果有连入侧的连接那么查询会从FlowFile的内容中提取,否则使用当前配置的属性
nullfalse
iotdb-query-chunk-size返回的结果可以进行分块,数据流中会返回一批按设置大小切分的数据,而不是一个单一的响应. 分块查询可以返回无限量的行。 注意: 数据分块只有在设置不为0时启用0false

Relationships

relationship描述
success数据能被正确的写入。
failureschema 或者数据有异常。