从 Pulsar 消费消息到 EMQ X
搭建 Pulsar 环境,以 MaxOS X 为例:
$ wget http://apache.mirrors.hoobly.com/pulsar/pulsar-2.3.2/apache-pulsar-2.3.2-bin.tar.gz
$ tar xvfz apache-pulsar-2.3.2-bin.tar.gz
$ cd apache-pulsar-2.3.2
# 启动 Pulsar
$ ./bin/pulsar standalone
创建 Pulsar 的主题:
$ ./bin/pulsar-admin topics create-partitioned-topic -p 5 testTopic
创建资源:
打开 EMQ X Dashboard,选择左侧的 “资源” 选项卡。
点击 “新建” 按钮:
选择资源类型 “pulsar 消费组”:
填写资源参数:
1). Pulsar 服务器地址
2). Pulsar consumer 进程数量
3). Pulsar 的订阅主题
4). EMQ X 的消息主题
5). Pulsar 流控阈值 (Pulsar 流控阈值,配置 Pulsar 向消费者发送多少条消息后阻塞 Pulsar Consumer)
6). EMQ X 重置流控阈值百分比 (Pulsar 流控阈值重置百分比。此配置让消费者处理完成一定数量的消息之后,提前重置 Pulsar 流控阈值
。 比如,Pulsar 流控阈值
为 1000,阈值重置百分比
为 80%,则重置)
最后点击 “确认”,资源创建完成:
资源已经创建完成,现在用Dashboard的websocket工具订阅MQTT的主题 “TestTopic”:
使用pulsar-cli 生产一条消息:
./bin/pulsar-client produce TestTopic --messages "hello-pulsar"
Dashboard的websocket工具接收到了pulsar 生产的消息”hello-pulsar”: