在只消费实时发布的消息,而不需要保证持久性的应用程序中可以使用非持久化 topic。由于没有持久化消息的开销,使用非持久化 topic 可以减少消息发布的延迟。
以下说明和命令中所使用 topic 的名称结构如下:
non-persistent://tenant/namespace/topic
非持久化 topic 资源
获取统计信息
获取非持久化 topic 的统计数据。
msgRateIn:本地发布者和副本发布者每秒发布消息速率和
msgThroughputIn:同上,但不是每秒发布消息数,而是每秒发布消息的字节数
msgRateOut:本地 consumer 和副本 consumer 每秒分发消息数量和
msgThroughputOut:同上,但不是每秒分发消息数,而是每秒分发消息的字节数
averageMsgSize:最后一次间隔内发布消息的平均字节大小
publishers: The list of all local publishers into the topic. There can be zero or thousands
averageMsgSize:此发布者在最后一个间隔内发布消息的平均字节大小
producerId:此 topic 内 producer 的内部标识符
producerName:客户端库为 producer 生成的内部标识符
address:连接到 producer 的 IP 地址和 source 端口
connectedSince:Producer 创建时或上次连接到此 producer 时的时间戳
subscriptions:Topic 所有本地订阅的列表
my-subscription:订阅名称(客户端定义)
type:订阅类型
consumers:连接到此订阅的 consumer 列表
consumerName:客户端库为 consumer 生成的内部标识符
availablePermits:Consumer 在客户端库监听队列中可以容纳的消息数量。 值小于 1 意味着客户端库队列已满,不能继续调用 receive()。 非负整数值意味着 consumer 可以随时接收消息。
replication:Topic 交叉副本的统计信息。
connected:外部复制器是否已经连接
inboundConnection:连接到此 broker 的远程集群发布者 broker 的 IP 和端口。
inboundConnectedSince:正在发布消息到远程集群的 TCP 连接。 如果没有连接到本地发布者,一分钟后连接将自动关闭。
msgDropRate:每次连接中,允许发布者 broker 配置的消息数量不能超过此参数值。若超过阈值,则丢弃所有阈值外的已发布消息。 若限制不可用或连接不可写入,broker 也会丢弃订阅的消息。
{
"msgRateIn": 4641.528542257553,
"msgThroughputIn": 44663039.74947473,
"msgRateOut": 0,
"msgThroughputOut": 0,
"averageMsgSize": 1232439.816728665,
"storageSize": 135532389160,
"msgDropRate" : 0.0,
"publishers": [
{
"msgRateIn": 57.855383881403576,
"msgThroughputIn": 558994.7078932219,
"averageMsgSize": 613135,
"producerId": 0,
"producerName": null,
"address": null,
"connectedSince": null,
"msgDropRate" : 0.0
}
],
"subscriptions": {
"my-topic_subscription": {
"msgRateOut": 0,
"msgThroughputOut": 0,
"msgBacklog": 116632,
"type": null,
"msgRateExpired": 36.98245516804671,
"consumers" : [ {
"msgRateOut" : 20343.506296021893,
"msgThroughputOut" : 2.0979855364233278E7,
"msgRateRedeliver" : 0.0,
"consumerName" : "fe3c0",
"availablePermits" : 950,
"unackedMessages" : 0,
"blockedConsumerOnUnackedMsgs" : false,
"address" : "/10.73.210.249:60578",
"connectedSince" : "2017-07-26 15:13:48.026-0700",
"clientVersion" : "1.19-incubating-SNAPSHOT"
} ],
"msgDropRate" : 432.2390921571593
}
},
"replication": {}
}
pulsar-admin
可以使用 stats
命令来获取 topic 的统计信息。
$ pulsar-admin non-persistent stats \
non-persistent://test-tenant/ns1/tp1 \
REST API
GET /admin/v2/non-persistent/:tenant/:namespace/:topic/stats
Java
String topic = "non-persistent://my-tenant/my-namespace/my-topic";
admin.nonPersistentTopics().getStats(topic);
获取内部统计信息
获取 topic 的详细统计信息。
pulsar-admin
可以使用 stats-internal
命令来获取 topic 的内部统计信息。
$ pulsar-admin non-persistent stats-internal \
non-persistent://test-tenant/ns1/tp1 \
{
"entriesAddedCounter" : 48834,
"numberOfEntries" : 0,
"totalSize" : 0,
"cursors" : {
"s1" : {
"waitingReadOp" : false,
"pendingReadOps" : 0,
"messagesConsumedCounter" : 0,
"cursorLedger" : 0,
"cursorLedgerLastEntry" : 0
}
}
}
REST API
GET /admin/v2/non-persistent/:tenant/:namespace/:topic/internalStats
Java
String topic = "non-persistent://my-tenant/my-namespace/my-topic";
admin.nonPersistentTopics().getInternalStats(topic);
创建分区 topic
Partitioned topics in Pulsar must be explicitly created. When creating a new partitioned topic you need to provide a name for the topic as well as the desired number of partitions.
pulsar-admin
$ bin/pulsar-admin non-persistent create-partitioned-topic \
non-persistent://my-tenant/my-namespace/my-topic \
--partitions 4
REST API
PUT /admin/v2/non-persistent/:tenant/:namespace/:topic/partitions
Java
String topicName = "non-persistent://my-tenant/my-namespace/my-topic";
int numPartitions = 4;
admin.nonPersistentTopics().createPartitionedTopic(topicName, numPartitions);
获取元数据
Partitioned topics have metadata associated with them that you can fetch as a JSON object. The following metadata fields are currently available:
字段 | 含义 |
---|---|
partitions | Topic 分区的数量 |
pulsar-admin
$ pulsar-admin non-persistent get-partitioned-topic-metadata \
non-persistent://my-tenant/my-namespace/my-topic
{
"partitions": 4
}
REST API
GET /admin/v2/non-persistent/:tenant/:namespace/:topic/partitions
Java
String topicName = "non-persistent://my-tenant/my-namespace/my-topic";
admin.nonPersistentTopics().getPartitionedTopicMetadata(topicName);
卸载 topic
可以卸载 topic。
pulsar-admin
可以使用 unload
命令卸载 topic。
$ pulsar-admin non-persistent unload \
non-persistent://test-tenant/ns1/tp1 \
REST API
PUT /admin/v2/non-persistent/:tenant/:namespace/:topic/unload
Java
String topic = "non-persistent://my-tenantmy-namespace/my-topic";
admin.nonPersistentTopics().unload(topic);