Set up a standalone Pulsar in Docker
可以在本地机器的 Docker 容器中运行单机模式的 Pulsar,进行本地开发和测试。
如果没有安装 Docker,可以下载社区版本,并按照相应操作系统的说明进行操作。
在 Docker 中启动 Pulsar
MacOS、Linux、Windows 用户:
$ docker run -it \
-p 6650:6650 \
-p 8080:8080 \
--mount source=pulsardata,target=/pulsar/data \
--mount source=pulsarconf,target=/pulsar/conf \
apachepulsar/pulsar:2.7.0 \
bin/pulsar standalone
A few things to note about this command:
- The data, metadata, and configuration are persisted on Docker volumes in order to not start “fresh” every time the container is restarted. For details on the volumes you can use
docker volume inspect <sourcename>
- For Docker on Windows make sure to configure it to use Linux containers
成功启动 Pulsar 后,你将看到如下所示的 INFO
级日志消息:
2017-08-09 22:34:04,030 - INFO - [main:WebService@213] - Web Service started at http://127.0.0.1:8080
2017-08-09 22:34:04,038 - INFO - [main:PulsarService@335] - messaging service is ready, bootstrap service on port=8080, broker url=pulsar://127.0.0.1:6650, cluster=standalone, configs=org.apache.pulsar.broker.ServiceConfiguration@4db60246
...
Tip
启动本地独立集群,将自动创建
public/default
命名空间。 自动创建的命名空间将用于开发用途。 所有Pulsar的topic主题都在命名空间中进行管理。 了解更多信息,参阅 Topics。
在 Docker 中运行 Pulsar
Pulsar 支持多个客户端: Java、Go、<a href =“ client-libraries-python.md“>Python 和 C ++。 如果运行的是本地独立集群,则可以使用以下 URL 中的一个与其交互:
pulsar://localhost:6650
http://localhost:8080
以下示例展示了如何通过 Python 客户端的 API 快速入门 Pulsar。
直接从 PyPI 安装 Pulsar 的 Python 客户端库:
$ pip install pulsar-client
Consume 一条消息
创建 consumer 并订阅 topic:
import pulsar
client = pulsar.Client('pulsar://localhost:6650')
consumer = client.subscribe('my-topic',
subscription_name='my-sub')
while True:
msg = consumer.receive()
print("Received message: '%s'" % msg.data())
consumer.acknowledge(msg)
client.close()
Produce 一条消息
启动 producer,发送测试消息:
import pulsar
client = pulsar.Client('pulsar://localhost:6650')
producer = client.create_producer('my-topic')
for i in range(10):
producer.send(('hello-pulsar-%d' % i).encode('utf-8'))
client.close()
获取 topic 数据
In Pulsar, you can use REST, Java, or command-line tools to control every aspect of the system. For details on APIs, refer to Admin API Overview.
在最简单的示例中,您可以使用curl来获取特定主题的统计信息:
$ curl http://localhost:8080/admin/v2/persistent/public/default/my-topic/stats | python -m json.tool
输出应如下所示:
{
"averageMsgSize": 0.0,
"msgRateIn": 0.0,
"msgRateOut": 0.0,
"msgThroughputIn": 0.0,
"msgThroughputOut": 0.0,
"publishers": [
{
"address": "/172.17.0.1:35048",
"averageMsgSize": 0.0,
"clientVersion": "1.19.0-incubating",
"connectedSince": "2017-08-09 20:59:34.621+0000",
"msgRateIn": 0.0,
"msgThroughputIn": 0.0,
"producerId": 0,
"producerName": "standalone-0-1"
}
],
"replication": {},
"storageSize": 16,
"subscriptions": {
"my-sub": {
"blockedSubscriptionOnUnackedMsgs": false,
"consumers": [
{
"address": "/172.17.0.1:35064",
"availablePermits": 996,
"blockedConsumerOnUnackedMsgs": false,
"clientVersion": "1.19.0-incubating",
"connectedSince": "2017-08-09 21:05:39.222+0000",
"consumerName": "166111",
"msgRateOut": 0.0,
"msgRateRedeliver": 0.0,
"msgThroughputOut": 0.0,
"unackedMessages": 0
}
],
"msgBacklog": 0,
"msgRateExpired": 0.0,
"msgRateOut": 0.0,
"msgRateRedeliver": 0.0,
"msgThroughputOut": 0.0,
"type": "Exclusive",
"unackedMessages": 0
}
}
}