STOMP

STOMP是面向流文本的消息传输协议Streaming Text Oriented Messaging Protocol,是 WebSocket 通信标准的一部分,属于业务层的控制协议Wire protocol协议规范。在通常的发布订阅语义之上,它通过 begincommitrollback 事物序列以及ack确认机制来提供消息可靠的投递。由于协议简单且易于实现,几乎所有的编程语言都有 STOMP 的客户端实现。

可以通过STOMP协议访问WeEvent的发布订阅相关功能。

协议说明

  • 支持STOMP协议的1.11.2版本。暂时不支持消息确认ACK和事务Transaction语义。
  • 传输协议方面,同时支持STOMP Over WebSocketSTOMP Over SockJS

JavaScript语言

前端面直接访问WeEvent,推荐使用开源库stompjs,该库支持STOMP协议的1.11.2的版本。使用stompjs +sockjs的组合效果更好。

Java语言

Spring Boot 环境

加入Spring Boot的依赖,以gradle 为例:

  1. implementation("org.springframework.boot:spring-boot-starter-websocket")

Spring4.0开始引入spring-websocket模块,支持STOMP,建议使用Spring Boot 2.0.0以上版本。

代码样例

第一步:创建链接

  1. // standard web socket transport
  2. WebSocketClient webSocketClient = new StandardWebSocketClient();
  3. WebSocketStompClient stompClient = new WebSocketStompClient(webSocketClient);
  4.  
  5. // MappingJackson2MessageConverter
  6. stompClient.setMessageConverter(new StringMessageConverter());
  7. stompClient.setTaskScheduler(taskScheduler); // for heartbeats
  8.  
  9. ListenableFuture<StompSession> f = stompClient.connect("ws://localhost:8080/weevent/stomp", getWebsocketSessionHandlerAdapter());
  10.  
  11. StompSession stompSession = f.get();
  • 心跳说明WeEvent使用单向心跳机制,客户端发送心跳,服务端不发心跳。默认时间间隔为30s

  • 修改心跳方案。

配置心跳时间间隔:修改配置文件./broker/conf/weevent.propertiesstomp.heartbeats=30

  • 传输协议方面STOMP Over WebSocket使用ws://localhost:8080/weevent/stompSTOMP Over SockJS使用ws://localhost:8080/weevent/sockjs

第二步:发布事件

  1. StompHeaders header = new StompHeaders();
  2. header.setDestination("com.weevent.test");
  3. header.set("groupId","1");
  4. header.set("weevent-format", "json")
  5. StompSession.Receiptable receiptable = stompSession.send(header, "{\"hello\":\" wolrd\"}");
  6. log.info("send result, receipt id: {}", receiptable.getReceiptId());

说明:

  • Topiccom.weevent.test。用户可以获取到Receiptable,并且通过receiptable.getReceiptId(),可以获取相应的回执。
  • groupId为群组Idfisco-bcos 2.0+版本支持多群组功能,2.0以下版本不支持该功能可以不传。
  • weevent-format为用户自定义拓展默认以weevent-开头。可选参数。第三步:订阅事件
  1. StompHeaders header = new StompHeaders();
  2. header.setDestination(topic);
  3. header.set("eventId","2cf24dba-59-1124");
  4. header.set("groupId","1");
  5.  
  6. StompSession.Subscription subscription = stompSession.subscribe(header, new StompFrameHandler() {
  7. @Override
  8. public Type getPayloadType(StompHeaders headers) {
  9. return String.class;
  10. }
  11.  
  12. @Override
  13. public void handleFrame(StompHeaders headers, Object payload) {
  14. logger.info("subscribe handleFrame, header: {} payload: {}", headers, payload);
  15. }
  16. });

说明:

  • topic 订阅的主题。支持通配符按层次订阅,参见MQTT通配符
  • 配置eventId,如需要取历史数据,则需要设置。如果不设置,则默认为取最新内容。
  • weevent-format为用户自定义拓展默认以weevent-开头。可选参数。
  • StompFrameHandler ,对StompFrameStompHeaders进行处理的方法。上述样例完整的代码,请参见STOMP代码样例

Spring环境

  • 依赖说明
  1. implementation("org.springframework:spring-messaging:5.1.2.RELEASE")
  2. implementation("org.springframework:spring-websocket:5.1.2.RELEASE")
  • 代码实现和上面spring boot一样

其他语言的适配

各种语言的开源STOMP客户端,参见https://stomp.github.io/implementations.html