Pulsar 消费组

Pulsar 消费组使用外部 Pulsar 作为消息队列,可以从 Pulsar 中消费消息转换成为 MQTT 消息发布在 emqx 中。

搭建 Pulsar 环境,以 MacOS X 为例:

  1. $ wget http://apache.mirrors.hoobly.com/pulsar/pulsar-2.3.2/apache-pulsar-2.3.2-bin.tar.gz
  2. $ tar xvfz apache-pulsar-2.3.2-bin.tar.gz
  3. $ cd apache-pulsar-2.3.2
  4. # 启动 Pulsar
  5. $ ./bin/pulsar standalone

创建 Pulsar 的主题:

  1. $ ./bin/pulsar-admin topics create-partitioned-topic -p 5 testTopic

创建模块

打开 EMQ X DashboardPulsar 消费组 - 图1 (opens new window),点击左侧的 “模块” 选项卡,选择添加:

image-20200927213049265

选择 Pulsar 消费组模块:

Pulsar 消费组 - 图3

填写相关参数:

Pulsar 消费组 - 图4

1). Pulsar 服务器地址

2). Pulsar consumer 进程数量

3). Pulsar 的订阅主题

4). MQTT 的消息主题

5). MQTT 的主题服务质量

6). Pulsar 流控阈值 (Pulsar 流控阈值,配置 Pulsar 向消费者发送多少条消息后阻塞 Pulsar Consumer)

7). EMQ X 重置流控阈值百分比 (Pulsar 流控阈值重置百分比。此配置让消费者处理完成一定数量的消息之后,提前重置 Pulsar 流控阈值。 比如,Pulsar 流控阈值 为 1000,阈值重置百分比 为 80%,则重置)

点击添加后,模块添加完成:

Pulsar 消费组 - 图5

资源已经创建完成,现在用Dashboard的websocket工具订阅MQTT的主题 “TestTopic”:

Pulsar 消费组 - 图6

使用pulsar-cli 生产一条消息:

  1. ./bin/pulsar-client produce TestTopic --messages "hello-pulsar"

Pulsar 消费组 - 图7

Dashboard的websocket工具接收到了pulsar 生产的消息”hello-pulsar”:

Pulsar 消费组 - 图8