链上信使协议

介绍

链上信使协议AMOP(Advanced Messages Onchain Protocol)系统旨在为联盟链提供一个安全高效的消息信道,联盟链中的各个机构,只要部署了区块链节点,无论是共识节点还是观察节点,均可使用AMOP进行通讯,AMOP有如下优势:

  • 实时:AMOP消息不依赖区块链交易和共识,消息在节点间实时传输,延时在毫秒级。
  • 可靠:AMOP消息传输时,自动寻找区块链网络中所有可行的链路进行通讯,只要收发双方至少有一个链路可用,消息就保证可达。
  • 高效:AMOP消息结构简洁、处理逻辑高效,仅需少量cpu占用,能充分利用网络带宽。
  • 安全:AMOP的所有通讯链路使用SSL加密,加密算法可配置,支持身份认证机制。
  • 易用:使用AMOP时,无需在SDK做任何额外配置。

逻辑架构

../../_images/AMOP.jpg 以银行典型IDC架构为例,各区域概述:

  • 链外区域:机构内部的业务服务区,此区域内的业务子系统使用区块链SDK,连接到区块链节点。
  • 区块链P2P网络:此区域部署各机构的区块链节点,此区域为逻辑区域,区块链节点也可部署在机构内部。

配置

AMOP无需任何额外配置,以下为Web3SDK的配置案例。 SDK配置(Spring Bean):

  1. <?xml version="1.0" encoding="UTF-8" ?>
  2. <beans xmlns="http://www.springframework.org/schema/beans"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"
  4. xmlns:tx="http://www.springframework.org/schema/tx" xmlns:aop="http://www.springframework.org/schema/aop"
  5. xmlns:context="http://www.springframework.org/schema/context"
  6. xsi:schemaLocation="http://www.springframework.org/schema/beans
  7. http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
  8. http://www.springframework.org/schema/tx
  9. http://www.springframework.org/schema/tx/spring-tx-2.5.xsd
  10. http://www.springframework.org/schema/aop
  11. http://www.springframework.org/schema/aop/spring-aop-2.5.xsd">
  12. <!-- AMOP消息处理线程池配置,根据实际需要配置 -->
  13. <bean id="pool" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
  14. <property name="corePoolSize" value="50" />
  15. <property name="maxPoolSize" value="100" />
  16. <property name="queueCapacity" value="500" />
  17. <property name="keepAliveSeconds" value="60" />
  18. <property name="rejectedExecutionHandler">
  19. <bean class="java.util.concurrent.ThreadPoolExecutor.AbortPolicy" />
  20. </property>
  21. </bean>
  22. <!-- 群组信息配置 -->
  23. <bean id="groupChannelConnectionsConfig" class="org.fisco.bcos.channel.handler.GroupChannelConnectionsConfig">
  24. <property name="allChannelConnections">
  25. <list>
  26. <bean id="group1" class="org.fisco.bcos.channel.handler.ChannelConnections">
  27. <property name="groupId" value="1" />
  28. <property name="connectionsStr">
  29. <list>
  30. <value>127.0.0.1:20200</value> <!-- 格式:IP:端口 -->
  31. <value>127.0.0.1:20201</value>
  32. </list>
  33. </property>
  34. </bean>
  35. </list>
  36. </property>
  37. </bean>
  38. <!-- 区块链节点信息配置 -->
  39. <bean id="channelService" class="org.fisco.bcos.channel.client.Service" depends-on="groupChannelConnectionsConfig">
  40. <property name="groupId" value="1" />
  41. <property name="orgID" value="fisco" />
  42. <property name="allChannelConnections" ref="groupChannelConnectionsConfig"></property>
  43. <!-- 如果需要使用topic认证功能,请将下面的注释去除 -->
  44. <!-- <property name="topic2KeyInfo" ref="amopVerifyTopicToKeyInfo"></property>-->
  45. </bean>
  46. <!-- 这里配置的是topic到公私钥配置信息的映射关系,这里只配置了一个topic,可以通过新增entry的方式来新增映射关系。-->
  47. <!--
  48. <bean class="org.fisco.bcos.channel.handler.AMOPVerifyTopicToKeyInfo" id="amopVerifyTopicToKeyInfo">
  49. <property name="topicToKeyInfo">
  50. <map>
  51. <entry key="${topicname}" value-ref="AMOPVerifyKeyInfo_${topicname}" />
  52. </map>
  53. </property>
  54. </bean>
  55. -->
  56. <!-- 在topic的生产者端,请将如下的注释打开,并配置公钥文件,
  57. 每个需要身份验证的消费者都拥有不同的公私钥对,请列出所有需要身份验证的消费者的公钥文件。
  58. -->
  59. <!--
  60. <bean class="org.fisco.bcos.channel.handler.AMOPVerifyKeyInfo" id="AMOPVerifyKeyInfo_${topicname}">
  61. <property name="publicKey">
  62. <list>
  63. <value>classpath:$consumer_public_key_1.pem$</value>
  64. <value>classpath:$consumer_public_key_2.pem$</value>
  65. </list>
  66. </property>
  67. </bean>
  68. -->
  69. <!-- 在topic的消费者端,请将如下的注释打开,并配置私钥文件,程序使用私钥向相应的主题生产者验证您的身份。-->
  70. <!--
  71. <bean class="org.fisco.bcos.channel.handler.AMOPVerifyKeyInfo" id="AMOPVerifyKeyInfo_${topicname}">
  72. <property name="privateKey" value="classpath:$consumer_private_key.pem$"></property>
  73. </bean>
  74. -->

SDK使用

AMOP的消息收发基于topic(主题)机制,服务端首先设置一个topic,客户端往该topic发送消息,服务端即可收到。

AMOP支持在同一个区块链网络中有多个topic收发消息,topic支持任意数量的服务端和客户端,当有多个服务端关注同一个topic时,该topic的消息将随机下发到其中一个可用的服务端。

服务端代码案例:

  1. package org.fisco.bcos.channel.test.amop;
  2. import org.fisco.bcos.channel.client.Service;
  3. import org.slf4j.Logger;
  4. import org.slf4j.LoggerFactory;
  5. import org.springframework.context.ApplicationContext;
  6. import org.springframework.context.support.ClassPathXmlApplicationContext;
  7. import java.util.HashSet;
  8. import java.util.Set;
  9. public class Channel2Server {
  10. static Logger logger = LoggerFactory.getLogger(Channel2Server.class);
  11. public static void main(String[] args) throws Exception {
  12. if (args.length < 1) {
  13. System.out.println("Param: topic");
  14. return;
  15. }
  16. String topic = args[0];
  17. logger.debug("init Server");
  18. ApplicationContext context = new ClassPathXmlApplicationContext("classpath:applicationContext.xml");
  19. Service service = context.getBean(Service.class);
  20. // 设置topic,支持多个topic
  21. Set<String> topics = new HashSet<String>();
  22. topics.add(topic);
  23. service.setTopics(topics);
  24. // 处理消息的PushCallback类,参见Callback代码
  25. PushCallback cb = new PushCallback();
  26. service.setPushCallback(cb);
  27. System.out.println("3s...");
  28. Thread.sleep(1000);
  29. System.out.println("2s...");
  30. Thread.sleep(1000);
  31. System.out.println("1s...");
  32. Thread.sleep(1000);
  33. System.out.println("start test");
  34. System.out.println("===================================================================");
  35. // 启动服务
  36. service.run();
  37. }
  38. }

服务端的PushCallback类案例:

  1. package org.fisco.bcos.channel.test.amop;
  2. import java.time.LocalDateTime;
  3. import java.time.format.DateTimeFormatter;
  4. import org.slf4j.Logger;
  5. import org.slf4j.LoggerFactory;
  6. import org.fisco.bcos.channel.client.ChannelPushCallback;
  7. import org.fisco.bcos.channel.dto.ChannelPush;
  8. import org.fisco.bcos.channel.dto.ChannelResponse;
  9. class PushCallback extends ChannelPushCallback {
  10. static Logger logger = LoggerFactory.getLogger(PushCallback.class);
  11. // onPush方法,在收到AMOP消息时被调用
  12. @Override
  13. public void onPush(ChannelPush push) {
  14. DateTimeFormatter df = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
  15. logger.debug("push:" + push.getContent());
  16. System.out.println(df.format(LocalDateTime.now()) + "server:push:" + push.getContent());
  17. // 回包消息
  18. ChannelResponse response = new ChannelResponse();
  19. response.setContent("receive request seq:" + String.valueOf(push.getMessageID()));
  20. response.setErrorCode(0);
  21. push.sendResponse(response);
  22. }
  23. }

客户端案例:

  1. package org.fisco.bcos.channel.test.amop;
  2. import java.time.LocalDateTime;
  3. import java.time.format.DateTimeFormatter;
  4. import org.slf4j.Logger;
  5. import org.slf4j.LoggerFactory;
  6. import org.springframework.context.ApplicationContext;
  7. import org.springframework.context.support.ClassPathXmlApplicationContext;
  8. import org.fisco.bcos.channel.client.Service;
  9. import org.fisco.bcos.channel.dto.ChannelRequest;
  10. import org.fisco.bcos.channel.dto.ChannelResponse;
  11. public class Channel2Client {
  12. static Logger logger = LoggerFactory.getLogger(Channel2Client.class);
  13. public static void main(String[] args) throws Exception {
  14. if(args.length < 2) {
  15. System.out.println("param: target topic total number of request");
  16. return;
  17. }
  18. String topic = args[0];
  19. Integer count = Integer.parseInt(args[1]);
  20. DateTimeFormatter df = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
  21. ApplicationContext context = new ClassPathXmlApplicationContext("classpath:applicationContext.xml");
  22. Service service = context.getBean(Service.class);
  23. service.run();
  24. System.out.println("3s ...");
  25. Thread.sleep(1000);
  26. System.out.println("2s ...");
  27. Thread.sleep(1000);
  28. System.out.println("1s ...");
  29. Thread.sleep(1000);
  30. System.out.println("start test");
  31. System.out.println("===================================================================");
  32. for (Integer i = 0; i < count; ++i) {
  33. Thread.sleep(2000); // 建立连接需要一点时间,如果立即发送消息会失败
  34. ChannelRequest request = new ChannelRequest();
  35. request.setToTopic(topic); // 设置消息topic
  36. request.setMessageID(service.newSeq()); // 消息序列号,唯一标识某条消息,可用newSeq()随机生成
  37. request.setTimeout(5000); // 消息的超时时间
  38. request.setContent("request seq:" + request.getMessageID()); // 发送的消息内容
  39. System.out.println(df.format(LocalDateTime.now()) + " request seq:" + String.valueOf(request.getMessageID())
  40. + ", Content:" + request.getContent());
  41. ChannelResponse response = service.sendChannelMessage2(request); // 发送消息
  42. System.out.println(df.format(LocalDateTime.now()) + "response seq:" + String.valueOf(response.getMessageID())
  43. + ", ErrorCode:" + response.getErrorCode() + ", Content:" + response.getContent());
  44. }
  45. }
  46. }

注解

  • 如果使用 2.5.0 版本的 FISCO BCOS,那么服务端和客户端需要连接不同的节点

Topic认证功能

在普通的配置下,任何一个监听了某topic的接收者都能接受到发送者推送的消息。但在某些场景下,发送者只希望特定的接收者能接收到消息,不希望无关的接收者能任意的监听此topic。在此场景下,需要使用Topic认证功能。 认证功能是指对于特定的topic消息,允许通过认证的接收者接收消息。 2.1.0及之后的sdk和节点版本新增了topic认证功能,默认的配置没有开启认证功能,需要用到认证功能的话请参考配置文件配置配置好公私钥,公私钥的生成方式请参考生成公私钥脚本。使用过程如下:

  • 1:接收者使用生成公私钥脚本生成公私钥文件,私钥保留,公钥给生产者。
  • 2:参考配置案例将配置文件配好。启动接收端和发送端进行收发消息。

假定链外系统1是消息发送者,链外系统2是消息接收者。链外系统2宣称监听topic T1的消息,topic认证流程图如下:

../../_images/AMOP_AUTHOR.jpg

  • 1:链外系统2连接Node2,宣称监听T1,Node2将T1加入到topic列表,并将seq加1。同时每5秒同步seq到其他节点。
  • 2:Node1收到seq之后,对比本地seq和同步过来的seq,不一致从Node2获取topic列表,并将topic列表更新到p2p的topic列表,对于需要认证且还没认证的topic,状态置为待认证。Node1遍历列表。针对每一个待认证的topic,进行如下操作:
    • 2.1:Node1往Node1推送消息(消息类型0x37),请求链外系统1发起topic认证流程。
    • 2.2:链外系统1接收到消息之后,生成随机数,并使用amop消息(消息类型0x30)将消息发送出去,并监听回包。
    • 2.3:消息经过 链外系统1–>Node1–>Node2–>链外系统2的路由,链外系统2接收到消息后解析出随机数并使用私钥签名随机数。
    • 2.4:签名包(消息类型0x31)经过 链外系统2–>Node2–>Node1->链外系统1的路由,链外系统1接收到签名包之后,解析出签名并使用公钥验证签名。
    • 2.5:链外系统1验证签名后,发送消息(消息类型0x38),请求节点更新topic状态(认证成功或者认证失败)。
  • 3:如果认证成功,链外系统的一条消息到达Node1之后,Node1会将这条消息转发给Node2,Node2会将消息推送给链外系统2。

topic认证功能配置

默认提供的配置文件不包括认证功能,需要使用认证功能,请参考如下配置文件

  1. <?xml version="1.0" encoding="UTF-8" ?>
  2. <beans xmlns="http://www.springframework.org/schema/beans"
  3. xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:p="http://www.springframework.org/schema/p"
  4. xmlns:tx="http://www.springframework.org/schema/tx" xmlns:aop="http://www.springframework.org/schema/aop"
  5. xmlns:context="http://www.springframework.org/schema/context"
  6. xsi:schemaLocation="http://www.springframework.org/schema/beans
  7. http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
  8. http://www.springframework.org/schema/tx
  9. http://www.springframework.org/schema/tx/spring-tx-2.5.xsd
  10. http://www.springframework.org/schema/aop
  11. http://www.springframework.org/schema/aop/spring-aop-2.5.xsd">
  12. <!-- AMOP消息处理线程池配置,根据实际需要配置 -->
  13. <bean id="pool" class="org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor">
  14. <property name="corePoolSize" value="50" />
  15. <property name="maxPoolSize" value="100" />
  16. <property name="queueCapacity" value="500" />
  17. <property name="keepAliveSeconds" value="60" />
  18. <property name="rejectedExecutionHandler">
  19. <bean class="java.util.concurrent.ThreadPoolExecutor.AbortPolicy" />
  20. </property>
  21. </bean>
  22. <!-- 群组信息配置 -->
  23. <bean id="groupChannelConnectionsConfig" class="org.fisco.bcos.channel.handler.GroupChannelConnectionsConfig">
  24. <property name="allChannelConnections">
  25. <list>
  26. <bean id="group1" class="org.fisco.bcos.channel.handler.ChannelConnections">
  27. <property name="groupId" value="1" />
  28. <property name="connectionsStr">
  29. <list>
  30. <value>127.0.0.1:20200</value> <!-- 格式:IP:端口 -->
  31. <value>127.0.0.1:20201</value>
  32. </list>
  33. </property>
  34. </bean>
  35. </list>
  36. </property>
  37. </bean>
  38. <!-- 区块链节点信息配置 -->
  39. <bean id="channelService" class="org.fisco.bcos.channel.client.Service" depends-on="groupChannelConnectionsConfig">
  40. <property name="groupId" value="1" />
  41. <property name="orgID" value="fisco" />
  42. <property name="allChannelConnections" ref="groupChannelConnectionsConfig"></property>
  43. <!-- topic认证功能的配置项 -->
  44. <property name="topic2KeyInfo" ref="amopVerifyTopicToKeyInfo"></property>
  45. </bean>
  46. <!-- 这里配置的是topic到公私钥配置信息的映射关系,这里只配置了一个topic,可以通过新增entry的方式来新增映射关系。-->
  47. <bean class="org.fisco.bcos.channel.handler.AMOPVerifyTopicToKeyInfo" id="amopVerifyTopicToKeyInfo">
  48. <property name="topicToKeyInfo">
  49. <map>
  50. <entry key="${topicname}" value-ref="AMOPVerifyKeyInfo_${topicname}" />
  51. </map>
  52. </property>
  53. </bean>
  54. <!-- 在topic的生产者端,请在这里配置公钥文件,每个需要身份验证的消费者 都拥有不同的公私钥对,
  55. 请列出所有需要身份验证的消费者的公钥文件。 程序启动前请确保所有的公钥文件都存在于web3sdk的conf目录下,
  56. 文件名分别为$consumer_public_key_1.pem$,$consumer_public_key_2.pem$(请将这2个变量替换为实际文件名),如果不需要两个公钥文件,请将其中一行删除并替换变量名,可以通过新增行的方式来增加公钥文件配置。-->
  57. <bean class="org.fisco.bcos.channel.handler.AMOPVerifyKeyInfo" id="AMOPVerifyKeyInfo_${topicname}">
  58. <property name="publicKey">
  59. <list>
  60. <value>classpath:$consumer_public_key_1.pem$</value>
  61. <value>classpath:$consumer_public_key_2.pem$</value>
  62. </list>
  63. </property>
  64. </bean>
  65. <!-- 在topic的消费者端,请在这里配置私钥文件,程序使用私钥向相应的主题生产者验证您的身份。
  66. 程序启动前请确保私钥文件存在于web3sdk的conf目录下,文件名为$consumer_private_key.pem$(请将变量替换为实际文件名)。-->
  67. <bean class="org.fisco.bcos.channel.handler.AMOPVerifyKeyInfo" id="AMOPVerifyKeyInfo_${topicname}">
  68. <property name="privateKey" value="classpath:$consumer_private_key.pem$"></property>
  69. </bean>

配置需要重启才可以生效,配置修改完成后,请重启基于web3sdk的应用程序。

测试

按上述说明配置好后,用户指定一个主题:topic,执行以下两个命令可以进行测试。

单播文本

启动AMOP服务端:

  1. java -cp 'conf/:apps/*:lib/*' org.fisco.bcos.channel.test.amop.Channel2Server [topic]

启动AMOP客户端:

  1. java -cp 'conf/:apps/*:lib/*' org.fisco.bcos.channel.test.amop.Channel2Client [topic] [消息条数]

AMOP除了支持单播文本,还支持发送二进制,多播以及身份认证机制。相应的测试命令如下:

单播二进制,多播文本,多播二进制

启动AMOP服务端:

  1. java -cp 'conf/:apps/*:lib/*' org.fisco.bcos.channel.test.amop.Channel2Server [topic]

启动AMOP客户端:

  1. #单播二进制
  2. java -cp 'conf/:apps/*:lib/*' org.fisco.bcos.channel.test.amop.Channel2ClientBin [topic] [filename]
  3. #多播文本
  4. java -cp 'conf/:lib/*:apps/*' org.fisco.bcos.channel.test.amop.Channel2ClientMulti [topic] [消息条数]
  5. #多播二进制
  6. java -cp 'conf/:lib/*:apps/*' org.fisco.bcos.channel.test.amop.Channel2ClientMultiBin [topic] [filename]

带认证机制的单播文本,单播二进制,多播文本,多播二进制

在使用认证机制前,请确保参考配置文件配置好了用于认证的公私钥对。 启动AMOP服务端:

  1. java -cp 'conf/:apps/*:lib/*' org.fisco.bcos.channel.test.amop.Channel2ServerNeedVerify [topic]

启动AMOP客户端:

  1. #带认证机制的单播文本
  2. java -cp 'conf/:apps/*:lib/*' org.fisco.bcos.channel.test.amop.Channel2ClientNeedVerify [topic] [消息条数]
  3. #带认证机制的单播二进制
  4. java -cp 'conf/:apps/*:lib/*' org.fisco.bcos.channel.test.amop.Channel2ClientBinNeedVerify [topic] [filename]
  5. #带认证机制的多播文本
  6. java -cp 'conf/:lib/*:apps/*' org.fisco.bcos.channel.test.amop.Channel2ClientMultiNeedVerify [topic] [消息条数]
  7. #带认证机制的多播二进制
  8. java -cp 'conf/:lib/*:apps/*' org.fisco.bcos.channel.test.amop.Channel2ClientMultiBinNeedVerify [topic] [filename]

错误码

  • 99:发送消息失败,AMOP经由所有链路的尝试后,消息未能发到服务端,建议使用发送时生成的seq,检查链路上各个节点的处理情况。
  • 100:区块链节点之间经由所有链路的尝试后,消息未能发送到可以接收该消息的节点,和错误码‘99’一样,建议使用发送时生成的‘seq’,检查链路上各个节点的处理情况。
  • 101:区块链节点往Sdk推送消息,经由所有链路的尝试后,未能到达Sdk端,和错误码‘99’一样,建议使用发送时生成的‘seq’,检查链路上各个节点以及Sdk的处理情况。
  • 102:消息超时,建议检查服务端是否正确处理了消息,带宽是否足够。
  • 103:因节点出带宽限制,SDK发到节点的AMOP请求被拒绝。