EdgeX 源

Kuiper 提供了内置的 EdgeX 源支持,它可以被用来订阅来自于EdgeX 消息总线EdgeX 源 - 图1 (opens new window)的数据,并且将数据放入 Kuiper 数据处理流水线中。

EdgeX 流定义

EdgeX 在 value descriptorsEdgeX 源 - 图2 (opens new window) 已经定义了数据类型,因此在 Kuiper 中建议采用 schema-less 方式的 EdgeX 流式定义,如下所示。

  1. # cd $kuiper_base
  2. # bin/kuiper CREATE STREAM demo'() with(format="json", datasource="demo" type="edgex")'

EdgeX 源会试图取得某个字段的类型,

  • 如果在 value descriptors 中可找到其数据类型,就将其转换为对应类型;
  • 如果在 value descriptors 中可找不到到其数据类型,将保留原值;
  • 如果类型转换失败,该值将被丢弃,并在日志上打印一条告警消息;

在 EdgeX value descriptors 中定义的数据类型,将被转换为 Kuiper 中相应支持的数据类型

Boolean

如果 ValueDescriptorType 的值为 Bool ,那么 Kuiper 会试着将其转换为 boolean 类型,以下的值将被转化为 true

  • “1”, “t”, “T”, “true”, “TRUE”, “True”

以下值将被转换为 false

  • “0”, “f”, “F”, “false”, “FALSE”, “False”

Bigint

如果 ValueDescriptorType 的值为 INT8 , INT16, INT32, INT64 , UINT8 , UINT16 , UINT32 , UINT64 那么 Kuiper 会试着将其转换为 Bigint 类型。

Float

如果 ValueDescriptorType 的值为 FLOAT32, FLOAT64 ,那么 Kuiper 会试着将其转换为 Float 类型。

String

如果 ValueDescriptorType 的值为 String,那么 Kuiper 会试着将其转换为 String 类型。

Boolean 数组

EdgeX 中的 Bool 数组类型会被转换为 boolean 数组。

Bigint 数组

EdgeX 中所有的 INT8 , INT16, INT32, INT64 , UINT8 , UINT16 , UINT32 , UINT64 数组类型会被转换为 Bigint 数组。

Float 数组

EdgeX 中所有的 FLOAT32, FLOAT64 数组类型会被转换为 Float 数组。

全局配置

EdgeX 源配置文件为 $kuiper/etc/sources/edgex.yaml,以下配置文件内容。

  1. #Global Edgex configurations
  2. default:
  3. protocol: tcp
  4. server: localhost
  5. port: 5573
  6. topic: events
  7. serviceServer: http://localhost:48080
  8. # optional:
  9. # ClientId: client1
  10. # Username: user1
  11. # Password: password

用户可以在此指定全局的 EdgeX 配置。在 default 部分中指定的配置将作为所有 EdgeX 源的缺省配置。

protocol

连接到 EdgeX 消息总线的协议,缺省为 tcp

server

EdgeX 消息总线的地址,缺省为 localhost

port

EdgeX 消息总线的端口,缺省为 5573

topic

EdgeX 消息总线上监听的主题名称,缺省为 events

serviceServer

访问 value descriptors 的基础服务地址,配置项 serviceServer 的值与 /api/v1/valuedescriptor 拼接后,用于获取 EdgeX 服务器上定义的所有 value descriptors。

type

EdgeX 消息总线类型,目前支持两种消息总线。如果指定了错误的消息总线类型,那么会使用缺省 zero 类型。

  • zero:使用 ZeroMQ 类型的消息总线
  • mqtt:使用 MQTT 服务器作为消息总线

optional

如果使用了 MQTT 消息总线,还可以指定别的一些可选配置项。请注意,所有在可选的配置项里指定的值都必须为**字符类型**,因此这里出现的所有的配置应该是字符类型的 - 例如 KeepAlive: "5000"。以下为支持的可选的配置列表,您可以参考 MQTT 协议规范来获取更详尽的信息。

  • ClientId

  • Username

  • Password

  • Qos

  • KeepAlive

  • Retained

  • ConnectionPayload

  • CertFile

  • KeyFile

  • CertPEMBlock

  • KeyPEMBlock

  • SkipCertVerify

重载缺省设置

在某些情况下,你可能想消费来自于多个主题的数据。Kuiper 支持指定别的配置,并且在创建流定义的时候使用 CONF_KEY 来指定新的配置。

  1. #覆盖全局配置
  2. demo1: #Conf_key
  3. protocol: tcp
  4. server: 10.211.55.6
  5. port: 5571
  6. topic: events

如果你有个特定的源需要覆盖缺省的设置,你可以定义一个自定义的配置段。在上面的例子中,我们创建了一个新的配置 demo1,然后你在创建流定义的时候可以使用选项 CONF_KEY 来使用新的配置 (参考 流定义规范 获取更多详细信息)。

例子

  1. create stream demo1() WITH (FORMAT="JSON", type="edgex", CONF_KEY="demo1");

在自定义的配置中,能够使用的配置项与 default 部分的是一样的,任何在自定义段中设置的值将覆盖 default 部分里的配置。