协议开发说明

平台封装了网络通信,但是具体的数据由消息协议进行解析.协议(ProtocolSupport)主要由认证器(Authenticator), 消息编解码器(DeviceMessageCodec),消息发送拦截器(DeviceMessageSenderInterceptor)以及配置元数据(ConfigMetadata)组成.

认证器

认证器(Authenticator)是用于在收到设备请求(例如MQTT)时,对客户端进行认证时使用,不同的网络协议(Transport)使用不同的认证器.

接口定义:

  1. public interface Authenticator {
  2. /**
  3. * 对指定对设备进行认证
  4. *
  5. * @param request 认证请求
  6. * @param device 设备
  7. * @return 认证结果
  8. */
  9. Mono<AuthenticationResponse> authenticate(@Nonnull AuthenticationRequest request,
  10. @Nonnull DeviceOperator device);
  11. /**
  12. * 在MQTT服务网关中指定了认证协议时,将调用此方法进行认证。
  13. * 注意: 认证通过后,需要设置设备ID.{@link AuthenticationResponse#success(String)}
  14. * @param request 认证请求
  15. * @param registry 设备注册中心
  16. * @return 认证结果
  17. */
  18. default Mono<AuthenticationResponse> authenticate(@Nonnull AuthenticationRequest request,
  19. @Nonnull DeviceRegistry registry) {
  20. return Mono.just(AuthenticationResponse.success());
  21. }
  22. }

参数AuthenticationRequest为认证请求参数,不同的网络类型请求类型也不同,请根据实际情况转换为对应的类型,例如: MqttAuthenticationRequest mqttRequest = (MqttAuthenticationRequest)request;

参数DeviceOperator为对应的设备操作接口,可通过此接口获取设备的配置,例如:device.getConfig("mqttUsername").

返回值Mono<AuthenticationResponse>为认证结果.

例:

  1. Authenticator mqttAuthenticator = (request, device) -> {
  2. MqttAuthenticationRequest mqttRequest = ((MqttAuthenticationRequest) request);
  3. return device.getConfigs("username", "password") //获取设备的配置信息,由配置元数据定义,在设备型号中进行配置.
  4. .flatMap(values -> {
  5. String username = values.getValue("username").map(Value::asString).orElse(null);
  6. String password = values.getValue("password").map(Value::asString).orElse(null);
  7. if (mqttRequest.getUsername().equals(username) && mqttRequest.getPassword().equals(password)) {
  8. return Mono.just(AuthenticationResponse.success());
  9. } else {
  10. return Mono.just(AuthenticationResponse.error(400, "密码错误"));
  11. }
  12. });
  13. }

消息编解码器

用于将平台统一的消息(Message)设备端能处理的消息(EncodedMessage)进行相互转换. 设备网关从网络组件中接收到报文后,会调用对应协议包的消息编解码器进行处理.

接口(DeviceMessageCodec)定义:

  1. class DeviceMessageCodec{
  2. //此编解码器支持的网络协议,如: DefaultTransport.MQTT
  3. Transport getSupportTransport();
  4. //将平台发往设备的消息编码为设备端对消息
  5. Publisher<? extends EncodedMessage> encode(MessageEncodeContext context);
  6. //将设备发往平台的消息解码为平台统一的消息
  7. Publisher<? extends Message> decode(MessageDecodeContext context);
  8. }

注意

方法返回值是响应式结果,根据情况返回Mono(单条消息)或者Flux(多条消息).

上下文

编码上下文类结构

  1. class MessageEncodeContext{
  2. //获取当前设备操作接口,可通过此接口获取对应设备的配置等信息
  3. DeviceOperator getDevice();
  4. //平台下发的指令,具体请查看平台统一设备消息定义
  5. Message getMessage();
  6. //强制回复设备消息,在http等场景下,通过调用http api下发指令,然后直接调用此方法回复结果即可.
  7. Mono<Void> reply(Publisher<? extends DeviceMessage> replyMessage);
  8. //获取当前会话,需要将MessageEncodeContext强制转换为ToDeviceMessageContext
  9. DeviceSession getSession();
  10. }

解码上下文类结构

  1. class class MessageDecodeContext{
  2. //获取当前设备操作接口,可通过此接口获取对应设备的配置等信息
  3. DeviceOperator getDevice();
  4. //从网络组件中接收到的消息,不同的网络组件消息类型不同,
  5. //使用时根据网络方式强制转换为对应的类型.
  6. EncodedMessage getMessage();
  7. }

注意

不同的网络协议需要转换为不同的EncodedMessage类型.比如,MQTT需要转换为MqttMessage.

大部分情况下:MessageDecodeContext可转为FromDeviceMessageContext,可获取到当前设备的连接会话DeviceSession,通过会话可以直接发送消息到设备.

EncodedMessage

从网络组件中接收到的消息,不同的网络组件消息类型不同。 公共方法:

  1. class EncodedMessage{
  2. //获取原始报文
  3. ByteBuf getPayload();
  4. //报文转为字符串
  5. String payloadAsString();
  6. //报文转为JSON对象
  7. JSONObject payloadAsJson();
  8. //报文转为JSON数组
  9. JSONArray payloadAsJsonArray();
  10. // 报文转为字节数组
  11. byte[] payloadAsBytes()
  12. }

MQTT消息

  1. class MqttMessage extends EncodedMessage{
  2. String getTopic();
  3. int getQos();
  4. }

HTTP消息

如果是POST,PUT,PATCH等请求,EncodedMessage.getPayload即为请求体.

  1. class HttpExchangeMessage{
  2. String getUrl();
  3. String getPath();
  4. HttpMethod getMethod();
  5. MediaType getContentType();
  6. //请求头
  7. List<Header> getHeaders();
  8. //url上的查询参数
  9. Map<String, String> getQueryParameters();
  10. //POST application/x-www-form-urlencoded时的请求参数
  11. Map<String, String> getRequestParam();
  12. //响应成功
  13. Mono<Void> ok(String msg);
  14. //响应失败
  15. Mono<Void> error(int status,String msg);
  16. }

CoAP消息

  1. CoapExchangeMessage{
  2. String getPath();
  3. CoAP.Code getCode();
  4. List<Option> getOptions();
  5. //响应请求
  6. void response(CoapResponseMessage message);
  7. //since 1.5 release
  8. void response(CoAP.ResponseCode code);
  9. //since 1.5 release
  10. void response(CoAP.ResponseCode code,byte[] body);
  11. }

TCP,UDP消息

TCP和UDP 直接操作EncodedMessage中的方法即可

消息发送拦截器

使用拦截器可以拦截消息发送和返回的动作,通过修改参数等操作实现自定义逻辑,如: 当设备离线时,将消息缓存到设备配置中,等设备上线时再重发.

  1. DeviceMessageSenderInterceptor{
  2. //发送前
  3. Mono<DeviceMessage> preSend(DeviceOperator device, DeviceMessage message);
  4. //发送后
  5. <R extends DeviceMessage> Flux<R> afterSent(DeviceOperator device, DeviceMessage message, Flux<R> reply);
  6. }

在发送前,可以将参数DeviceMessage转为其他消息.

发送后,会将返回结果流Flux<R>传入,通过对该数据流对操作以实现自定义行为,如:忽略错误等.

配置元数据

配置元数据用于告诉平台,在使用此协议的时候,需要添加一些自定义配置到设备配置(DeviceOperator.setConfig)中. 在其他地方可以通过DeviceOperator.getConfig获取这些配置.

例如:

  1. CompositeProtocolSupport support = new CompositeProtocolSupport();
  2. support.setId("demo-v1");
  3. support.setName("演示协议v1");
  4. support.setDescription("演示协议");
  5. support.setMetadataCodec(new JetLinksDeviceMetadataCodec()); //固定为JetLinksDeviceMetadataCodec,请勿修改.
  6. DefaultConfigMetadata mqttConfig = new DefaultConfigMetadata(
  7. "MQTT认证配置"
  8. , "")
  9. .add("username", "username", "MQTT用户名", new StringType())
  10. .add("password", "password", "MQTT密码", new PasswordType());
  11. //设置MQTT所需要到配置
  12. support.addConfigMetadata(DefaultTransport.MQTT, mqttConfig);

完整例子

演示协议协议开发说明 - 图1