数据转发第三方平台

平台提供了四种数据转发的方式,开放OpenApi在线订阅websocket数据规则引擎转发MQTT订阅

使用第三方平台调用OpenApi

基于数据签名的第三方平台模块.用于提供对外开放接口的认证方式.

注意

本功能仅在企业版中提供.

创建第三方平台

进入平台:[系统设置]-[第三方平台]

填写对应的内容保存.

注意

clientIdsecureKey需要提供给客户端开发者. 用户名和密码是系统统一的用户主体,会自动创建到用户管理中.使用此用户名密码也能登录到系统中. 与其他用户相同,可以将用户绑定到机构实现数据权限控制.

赋权

点击操作列中的赋权按钮对第三方平台进行赋权.大部分情况下只需要勾选: 设备操作API设备数据API权限即可.

如果操作栏赋权里 没有任何权限选项,先去权限管理赋予权限,分类选择APi接口

注意

此赋权操作实际上是对第三方平台对应对用户主体进行赋权.

使用签名的方式

验证流程

流程

说明

  1. 图中Signature函数为客户端设置的签名方式,支持MD5Sha256.
  2. 发起请求的签名信息都需要放到请求头中,而不是请求体.
  3. OpenApi对开发是透明的,开发只需要关心权限控制即可.OpenAPI和后台接口使用的是相同的权限控制API. 因此开发一个OpenAPI接口就是写一个WebFlux Controller. 查看使用方式

签名

平台使用签名来校验客户端请求的完整性以及合法性.

例:

ClientId为testId, SecureKey为:testSecure. 客户端请求接口: /api/v1/device/dev0001/log/_query,参数为pageSize=20&pageIndex=0,签名方式为md5.

  1. 将参数key按ascii排序得到: pageIndex=0&pageSize=20
  2. 使用拼接时间戳以及密钥得到: pageIndex=0&pageSize=201574993804802testSecure
  3. 使用md5("pageIndex=0&pageSize=201574993804802testSecure")得到837fe7fa29e7a5e4852d447578269523

示例:

  1. GET /api/device?pageIndex=0&pageSize=20
  2. X-Client-Id: testId
  3. X-Timestamp: 1574993804802
  4. X-Sign: 837fe7fa29e7a5e4852d447578269523

响应结果:

  1. HTTP/1.1 200 OK
  2. X-Timestamp: 1574994269075
  3. X-Sign: c23faa3c46784ada64423a8bba433f25
  4. {"status":200,result:[]}

验签

使用和签名相同的算法(不需要对响应结果排序):

  1. String secureKey = ...; //密钥
  2. String responseBody = ...;//服务端响应结果
  3. String timestampHeader = ...;//响应头: X-Timestamp
  4. String signHeader = ...; //响应头: X-Sign
  5. String sign = DigestUtils.md5Hex(responseBody+timestampHeader+secureKey);
  6. if(sign.equalsIgnoreCase(signHeader)){
  7. //验签通过
  8. }

使用token的方式

通过请求接口/api/v1/token来获取X-Access-Token,之后可以使用此token来发起api请求。

申请token

客户端请求接口/api/v1/token
请求方式: POST

  1. POST /api/v1/token
  2. X-Sign: 932bbe8a39ae03f568f73a507d87afac
  3. X-Timestamp: 1587719082698
  4. X-Client-Id: kF**********HRZ
  5. Content-Type: application/json
  6. {
  7. "expires": 7200 // 过期时间,单位秒.
  8. }
  9. //返回
  10. {
  11. "status":200,
  12. "result":"3bcddb719b01da679b88d07acde2516" //token信息
  13. }

使用token发起请求

此处以获取设备test001详情为例。

  1. GET /api/v1/device/test001/_detail
  2. X-Access-Token: 3bcddb719b01da679b88d07acde2516

响应结果:

  1. {
  2. "result": {
  3. "id": "test001",
  4. "name": "温控设备0309",
  5. "protocol": "demo-v1",
  6. "transport": "MQTT",
  7. "orgId": "test",
  8. "productId": "1236859833832701952",
  9. "productName": "智能温控",
  10. "deviceType": {
  11. "text": "网关设备",
  12. "value": "gateway"
  13. },
  14. "state": {
  15. "text": "离线",
  16. "value": "offline"
  17. },
  18. "address": "/127.0.0.1:36982",
  19. "onlineTime": 1586705515429,
  20. "offlineTime": 1586705507734,
  21. "createTime": 1585809343175,
  22. "registerTime": 1583805253659,
  23. "metadata": "{\"events\":[{\"id\":\"fire_alarm\",\"name\":\"火警报警\",\"expands\":{\"eventType\":\"reportData\",\"level\":\"urgent\"},\"valueType\":{\"type\":\"object\",\"properties\":[{\"id\":\"a_name\",\"name\":\"区域名称\",\"valueType\":{\"type\":\"string\"}},{\"id\":\"b_name\",\"name\":\"建筑名称\",\"valueType\":{\"type\":\"string\"}},{\"id\":\"l_name\",\"name\":\"位置名称\",\"valueType\":{\"type\":\"string\"}}]}}],\"properties\":[{\"id\":\"temperature\",\"name\":\"温度\",\"valueType\":{\"type\":\"float\",\"min\":\"0\",\"max\":\"100\",\"step\":\"0.1\",\"unit\":\"celsiusDegrees\"},\"expands\":{\"readOnly\":\"true\"}}],\"functions\":[{\"id\":\"get-log\",\"name\":\"获取日志\",\"isAsync\":true,\"output\":{\"type\":\"string\",\"expands\":{\"maxLength\":\"2048\"}},\"inputs\":[{\"id\":\"start_date\",\"name\":\"开始日期\",\"valueType\":{\"type\":\"date\",\"dateFormat\":\"yyyy-MM-dd HH:mm:ss\"}},{\"id\":\"end_data\",\"name\":\"结束日期\",\"valueType\":{\"type\":\"date\",\"dateFormat\":\"yyyy-MM-dd HH:mm:ss\"}},{\"id\":\"time\",\"name\":\"分组\",\"valueType\":{\"type\":\"string\"}}]}]}",
  24. "configuration": {
  25. "username": "test",
  26. "password": "test"
  27. },
  28. "tags": []
  29. },
  30. "status": 200,
  31. "code": "success"
  32. }

Demo

Demo中测试包org.jetlinks.demo.openapi下的测试类已测试通过平台已有的openApi接口。
Demo中使用签名的方式接入。

前往下载Demo平台设备数据转发 - 图2 (opens new window)

OAuth2.0

平台也支持通过OAuth2.0进行认证,支持: authorization_codeclient_credentials以及refresh_token.

例如:

  1. POST /jetlinks/oauth2/token
  2. Content-Type: application/json
  3. {
  4. "grant_type": "client_credentials", //固定client_credentials
  5. "client_id":"client_id",
  6. "client_secret":"client_secret",
  7. }
  8. 响应:
  9. {
  10. "access_token":"access_token",
  11. "refresh_token":"refresh_token",
  12. "expires_in":7200 //有效期,单位秒
  13. }

使用token请求接口:

  1. POST /device/instance/_query
  2. Authorization: bearer access_token
  3. {}

或者

  1. POST /device/instance/_query?access_token=access_token
  2. {}

使用Websocket订阅实时设备消息

1.1版本后提供websocket方式订阅平台消息的功能. 可以通过websocket来订阅设备,规则引擎,设备告警等相关消息.

接口

  1. websocket统一接口为: `/messaging/{token}`,
  2. `{token}`可通过登录系统或者使用OpenAPI获取.

以前端js为例:

  1. var ws = new WebSocket("ws://localhost:8848/messaging/a872d8e6cf6ccd38deb0c8772f6040e3");
  2. ws.onclose=function(e){console.log(e)};
  3. ws.onmessage=function(e){console.log(e.data)}
  4. // 如果认证失败,会立即返回消息: {"message":"认证失败","type":"authError"},并断开连接.

订阅消息

向websocket发送消息,格式为:

  1. {
  2. "type": "sub", //固定为sub
  3. "topic": "/device/*/*/**", // topic,见topic列表.
  4. "parameter": { //参数,不同的订阅请求,支持的参数不同
  5. },
  6. "id": "request-id" //请求ID, 请求的标识,服务端在推送消息时,会将此标识一并返回.
  7. }

注意

在取消订阅之前,多次传入相同的id是无效的,不会重复订阅.

平台推送消息:

  1. {
  2. "payload": //消息内容, topic不同,内容不同,
  3. "requestId": "request-id", //与订阅请求的id一致
  4. "topic": "/device/demo-device/test0/offline", //topic,实际产生数据的topic
  5. "type": "result" //类型 result:订阅结果 complete:结束订阅 error:发生错误
  6. }

提示

type为complete时标识本此订阅已结束,通常是订阅有限数据流时(比如发送设备指令),或者取消订阅时会返回此消息.

取消订阅

向websocket发送消息,格式为:

  1. {
  2. "type":"unsub",//固定为unsub
  3. "id": "request-id" //与订阅请求ID一致
  4. }

订阅设备消息

与消息网关中的设备topic一致,查看topic列表. 消息负载(payload)将与设备消息类型一致.

发送设备指令

发送消息到websocket

  1. {
  2. "type": "sub", //固定为sub
  3. "topic": "/device-message-sender/demo-device/test0,test1", // 发送消息给demo-device型号下的test0和test1设备
  4. "parameter": {
  5. // 消息类型,支持: READ_PROPERTY (读取属性),WRITE_PROPERTY (修改属性),INVOKE_FUNCTION (调用功能)
  6. "messageType":"READ_PROPERTY"
  7. //根据不同的消息,参数也不同. 具体见: 平台统一消息定义
  8. "properties":["temperature"],
  9. //头信息
  10. "headers":{
  11. "async":false // 是否异步,异步时,平台不等待设备返回指令结果.
  12. }
  13. },
  14. "id": "request-id" //请求ID, 请求的标识,服务端在推送消息时,会将此标识一并返回.
  15. }

平台将推送设备返的结果:

  1. {
  2. "payload": { //请求消息类型不同,结果不同
  3. "deviceId": "test0",
  4. "messageType": "READ_PROPERTY_REPLY",
  5. "success":true, //指令是否成功
  6. "properties": {
  7. "temperature": 28.21
  8. },
  9. "timestamp": 1588148129787
  10. },
  11. "requestId": "request-id", //订阅请求的ID
  12. "topic": "/device/demo-device/test7/offline",
  13. "type": "result"
  14. }

提示

deviceId支持*和逗号,分割,批量发送消息到设备.如: /device-message-sender/{productId}/{deviceId}. 如果要终止发送,直接取消订阅即可.

批量同步设备状态

发送消息到websocket

  1. {
  2. "type": "sub", //固定为sub
  3. "topic": "/device-batch/state-sync",
  4. "parameter": {
  5. "query":{"where":"productId is test-device"}//查询条件为动态查询条件
  6. },
  7. "id": "request-id" //请求ID, 请求的标识,服务端在推送消息时,会将此标识一并返回.
  8. }

平台推送:

  1. {
  2. "payload": { //请求消息类型不同,结果不同
  3. "deviceId": "test0",
  4. "state": {
  5. "value":"offline",
  6. "text":"离线"
  7. }
  8. },
  9. "requestId": "request-id", //订阅请求的ID
  10. "topic": "/device-batch/state-sync",
  11. "type": "result" //为comlete是则表示同步完成.
  12. }

dashboard仪表盘

订阅仪表盘数据:

topic: /dashboard/{dashboard}/{object}/{measurement}/{dimension}

  1. {
  2. "type": "sub", //固定为sub
  3. "topic": "/dashboard/device/demo-device/property/agg", //聚合查询属性
  4. "parameter": {
  5. "deviceId":"test0", //
  6. "limit":"30",
  7. "time":"1d",
  8. "agg":"avg",
  9. "from":"now-30d",
  10. "to":"now",
  11. "format":"MM月dd日"
  12. },
  13. "id": "request-id" //请求ID, 请求的标识,服务端在推送消息时,会将此标识一并返回.
  14. }

TIP

详细使用见Dashboard说明

订阅引擎事件数据

发送消息到websocket

  1. {
  2. "type": "sub", //固定为sub
  3. "topic": "/rule-engine/{instanceId}/{nodeId}/event/{event}",
  4. "parameter": {},
  5. "id": "request-id" //请求ID, 请求的标识,服务端在推送消息时,会将此标识一并返回.
  6. }

平台推送:

  1. {
  2. "payload": {
  3. //规则数据,不同的节点和事件类型数据不同
  4. },
  5. "requestId": "request-id", //订阅请求的ID
  6. "topic": "/rule-engine/{instanceId}/{nodeId}/event/{event}",
  7. "type": "result" //为comlete是则表示订阅结束.
  8. }

event说明

error: 执行节点错误

  1. {
  2. message:"错误消息",
  3. stack:"异常栈信息",
  4. type::"错误类型"
  5. }

result: 节点数据输出 complete: 执行节点完成

订阅设备告警数据

发送消息到websocket

  1. {
  2. "type": "sub", //固定为sub
  3. "topic": "/rule-engine/device/alarm/{productId}/{deviceId}/{alarmId}",
  4. "parameter": {},
  5. "id": "request-id" //请求ID, 请求的标识,服务端在推送消息时,会将此标识一并返回.
  6. }

平台推送:

  1. {
  2. "payload": { //告警相关数据
  3. "deviceId": "设备ID",
  4. "deviceName": "设备名称",
  5. "alarmId": "告警ID",
  6. "alarmName": "告警名称"
  7. //...其他告警数据
  8. },
  9. "requestId": "request-id", //订阅请求的ID
  10. "topic": "/rule-engine/device/alarm/{productId}/{deviceId}/{alarmId}",
  11. "type": "result" //为comlete是则表示订阅结束.
  12. }

订阅场景联动

发送消息到websocket

  1. {
  2. "type": "sub", //固定为sub
  3. "topic": "/scene/{alarmId}",
  4. "parameter": {},
  5. "id": "request-id" //请求ID, 请求的标识,服务端在推送消息时,会将此标识一并返回.
  6. }

平台推送:

  1. {
  2. "payload": { //触发场景的数据内容,触发方式不同,数据格式不同
  3. },
  4. "requestId": "request-id", //订阅请求的ID
  5. "topic": "/scene/{alarmId}",
  6. "type": "result" //为comlete是则表示同步完成.
  7. }

使用MQTT订阅平台消息

tip 在1.5企业版本后提供mqtt方式订阅平台消息的功能.可以通过mqtt来订阅设备,规则引擎,设备告警等相关消息.

修改配置文件

通过配置:

  1. messaging:
  2. mqtt:
  3. enabled: true #开启mqtt支持
  4. port: 11883 # 端口
  5. host: 0.0.0.0 #绑定网卡

认证

默认使用token(可以使用OpenAPI申请token)作为clientId,usernamepassword可以不填写.

可通过实现接口MqttAuthenticationHandler来自定义认证策略.

注意

平台topic使用的通配符为*,在使用MQTT订阅时需要将通配符转换为mqtt的通配符: *转为+,**转为#.

订阅设备消息

与消息网关中的设备topic一致,查看topic列表平台设备数据转发 - 图3 (opens new window). 消息负载(payload)将与设备消息类型平台设备数据转发 - 图4 (opens new window)一致.

提示

1.6版本后支持分组订阅:同一个用户订阅相同的topic,只有其中一个订阅者收到消息.

在topic前增加$shared即可,如: $shared/device/+/+/#

规则引擎转发

四种转发方式

四种转发

配置实时订阅平台设备数据

实时订阅平台设备数据

配置转发

TIP

函数的配置需要取决于下游节点接收参数是什么? 下游节点即与函数连接的下一个node节点。 举例:在四种转发方式内,函数的下游节点分别是订阅MQTTHTTP请求写入Kafka写入数据库

查看下游节点接收参数

订阅MQTT

选择服务端,该服务端需要在网络组件内配置MQTT客户端

WARNING

配置客户端的原因是,此处平台创建一个MQTT客户端将平台消息总线内的实时数据通过客户端推送给EMQ服务,由EMQ来做数据分发,达到数据转发的目的。此时其他MQTT客户端订阅平台推送时填写的{topic}即可收到消息

实时订阅平台设备数据

可接收的参数为上图红框圈出内容,则需要在函数节点内配置为

实时订阅平台设备数据

HTTP请求

订阅实时数据同上。

查看http接收输入参数

实时订阅平台设备数据 实时订阅平台设备数据

配置http请求查看

写入Kafka

订阅实时数据同上。

实时订阅平台设备数据

函数配置同MQTT订阅一致

写入数据库

订阅实时数据同上。

配置数据源

实时订阅平台设备数据 实时订阅平台设备数据

配置规则引擎函数对应数据表结构 实时订阅平台设备数据 效果图 实时订阅平台设备数据