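Non-persistent topics are suited to applications that only consume messages published in real time and do not need a durability guarantee. Because messages are never persisted, publishing to a non-persistent topic avoids the persistence overhead and can reduce publish latency.
The topic names used in the instructions and commands below have the following structure: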
non-persistent://tenant/namespace/topic
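The name structure above can be split into its three parts with plain string handling. The following is a minimal sketch rather than anything from the Pulsar client library: the class `TopicNameParts` and its `parse` method are hypothetical names introduced only for illustration, and the validation rules (non-empty parts, fixed `non-persistent://` prefix) are assumptions based on the structure shown here.

```java
import java.util.Objects;

/** Hypothetical helper (not a Pulsar API): splits a non-persistent topic name into its parts. */
public final class TopicNameParts {
    private static final String PREFIX = "non-persistent://";

    public final String tenant;
    public final String namespace;
    public final String topic;

    private TopicNameParts(String tenant, String namespace, String topic) {
        this.tenant = tenant;
        this.namespace = namespace;
        this.topic = topic;
    }

    /** Parses names of the form non-persistent://tenant/namespace/topic. */
    public static TopicNameParts parse(String fullName) {
        Objects.requireNonNull(fullName, "fullName");
        if (!fullName.startsWith(PREFIX)) {
            throw new IllegalArgumentException("Expected prefix " + PREFIX + ": " + fullName);
        }
        // A limit of 3 keeps any further '/' characters inside the topic part.
        String[] parts = fullName.substring(PREFIX.length()).split("/", 3);
        if (parts.length != 3 || parts[0].isEmpty() || parts[1].isEmpty() || parts[2].isEmpty()) {
            throw new IllegalArgumentException(
                    "Expected non-persistent://tenant/namespace/topic: " + fullName);
        }
        return new TopicNameParts(parts[0], parts[1], parts[2]);
    }

    public static void main(String[] args) {
        TopicNameParts p = parse("non-persistent://my-tenant/my-namespace/my-topic");
        System.out.println(p.tenant + " | " + p.namespace + " | " + p.topic);
    }
}
```

A name with a missing separator, such as `non-persistent://my-tenantmy-namespace/my-topic`, yields only two parts and is rejected with an `IllegalArgumentException`, which makes this kind of check a cheap guard before calling the admin API.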
Non-persistent topics resources
Get stats
Retrieves the current statistics of a non-persistent topic.
msgRateIn: The sum of all local and replication publishers’ publish rates in messages per second
msgThroughputIn: Same as above, but in bytes per second instead of messages per second
msgRateOut: The sum of all local and replication consumers’ dispatch rates in messages per second
msgThroughputOut: Same as above, but in bytes per second instead of messages per second
averageMsgSize: The average size in bytes of messages published within the last interval
publishers: The list of all local publishers into the topic. There can be zero or thousands
averageMsgSize: Average message size in bytes from this publisher within the last interval
producerId: Internal identifier for this producer on this topic
producerName: Internal identifier for this producer, generated by the client library
address: IP address and source port for the connection of this producer
connectedSince: Timestamp this producer was created or last reconnected
subscriptions: The list of all local subscriptions to the topic
my-subscription: The name of this subscription (client defined)
type: The type of this subscription
consumers: The list of connected consumers for this subscription
consumerName: Internal identifier for this consumer, generated by the client library
availablePermits: The number of messages this consumer has space for in the client library’s listen queue. A value of 0 means the client library’s queue is full and receive() is not being called. A positive value means the consumer is ready to receive messages.
replication: This section gives the stats for cross-colo replication of this topic
connected: Whether the outbound replicator is connected
inboundConnection: The IP and port of the broker in the remote cluster’s publisher connection to this broker
inboundConnectedSince: The TCP connection being used to publish messages to the remote cluster. If no local publishers are connected, this connection is closed automatically after one minute.
msgDropRate: For publishers, the broker allows only a configured number of in-flight messages per connection and drops any published messages above that threshold. For subscriptions, the broker also drops messages when the limit is unavailable or the connection is not writable.
{
  "msgRateIn": 4641.528542257553,
  "msgThroughputIn": 44663039.74947473,
  "msgRateOut": 0,
  "msgThroughputOut": 0,
  "averageMsgSize": 1232439.816728665,
  "storageSize": 135532389160,
  "msgDropRate": 0.0,
  "publishers": [
    {
      "msgRateIn": 57.855383881403576,
      "msgThroughputIn": 558994.7078932219,
      "averageMsgSize": 613135,
      "producerId": 0,
      "producerName": null,
      "address": null,
      "connectedSince": null,
      "msgDropRate": 0.0
    }
  ],
  "subscriptions": {
    "my-topic_subscription": {
      "msgRateOut": 0,
      "msgThroughputOut": 0,
      "msgBacklog": 116632,
      "type": null,
      "msgRateExpired": 36.98245516804671,
      "consumers": [
        {
          "msgRateOut": 20343.506296021893,
          "msgThroughputOut": 2.0979855364233278E7,
          "msgRateRedeliver": 0.0,
          "consumerName": "fe3c0",
          "availablePermits": 950,
          "unackedMessages": 0,
          "blockedConsumerOnUnackedMsgs": false,
          "address": "/10.73.210.249:60578",
          "connectedSince": "2017-07-26 15:13:48.026-0700",
          "clientVersion": "1.19-incubating-SNAPSHOT"
        }
      ],
      "msgDropRate": 432.2390921571593
    }
  },
  "replication": {}
}
pulsar-admin
Use the stats command to retrieve a topic's statistics.
$ pulsar-admin non-persistent stats \
non-persistent://test-tenant/ns1/tp1
REST API
GET /admin/v2/non-persistent/:tenant/:namespace/:topic/stats
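To make the endpoint above concrete, here is a minimal sketch that builds the corresponding HTTP request with the JDK's `java.net.http` API (Java 11 or later). The class name `NonPersistentStatsRequest`, the `build` method, and the base URL `http://localhost:8080` are assumptions for illustration. The sketch also assumes the tenant, namespace, and topic contain no characters that need URL encoding.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Hypothetical helper: builds the stats request for a non-persistent topic. */
public final class NonPersistentStatsRequest {

    /**
     * @param serviceUrl base URL of the broker's HTTP service (assumed, e.g. http://localhost:8080)
     */
    public static HttpRequest build(String serviceUrl, String tenant, String namespace, String topic) {
        // Fill the :tenant, :namespace and :topic placeholders of the endpoint.
        String path = String.format(
                "/admin/v2/non-persistent/%s/%s/%s/stats", tenant, namespace, topic);
        return HttpRequest.newBuilder(URI.create(serviceUrl + path))
                .header("Accept", "application/json")
                .GET()
                .build();
    }

    public static void main(String[] args) {
        HttpRequest request = build("http://localhost:8080", "my-tenant", "my-namespace", "my-topic");
        // Only prints the request line; nothing is sent.
        System.out.println(request.method() + " " + request.uri());
    }
}
```

Sending the request needs a running broker. With one available, `HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString())` should return the JSON document described in this section as the response body.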
Java
String topic = "non-persistent://my-tenant/my-namespace/my-topic";
admin.nonPersistentTopics().getStats(topic);
Get internal stats
Retrieves the detailed internal statistics of a topic.
pulsar-admin
Use the stats-internal command to retrieve a topic's internal statistics.
$ pulsar-admin non-persistent stats-internal \
non-persistent://test-tenant/ns1/tp1
{
  "entriesAddedCounter": 48834,
  "numberOfEntries": 0,
  "totalSize": 0,
  "cursors": {
    "s1": {
      "waitingReadOp": false,
      "pendingReadOps": 0,
      "messagesConsumedCounter": 0,
      "cursorLedger": 0,
      "cursorLedgerLastEntry": 0
    }
  }
}
REST API
GET /admin/v2/non-persistent/:tenant/:namespace/:topic/internalStats
Java
String topic = "non-persistent://my-tenant/my-namespace/my-topic";
admin.nonPersistentTopics().getInternalStats(topic);
Create a partitioned topic
Partitioned topics in Pulsar must be explicitly created. When creating a new partitioned topic you need to provide a name for the topic as well as the desired number of partitions.
Note
By default, 60 seconds after creation, topics are considered inactive and are deleted automatically to avoid generating trash data.
To disable this feature, set brokerDeleteInactiveTopicsEnabled to false.
To change how often inactive topics are checked, set brokerDeleteInactiveTopicsFrequencySeconds to your desired value.
For more information about these two parameters, see the broker configuration reference.
pulsar-admin
$ bin/pulsar-admin non-persistent create-partitioned-topic \
non-persistent://my-tenant/my-namespace/my-topic \
--partitions 4
REST API
PUT /admin/v2/non-persistent/:tenant/:namespace/:topic/partitions
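The endpoint line does not show how the partition count is passed. The sketch below builds the PUT request with the JDK's `java.net.http` API (Java 11 or later), assuming the count is sent as a bare JSON integer in the request body. That body format, the class name `CreatePartitionedTopicRequest`, the `build` method, and the base URL `http://localhost:8080` are assumptions, not details confirmed by this page.

```java
import java.net.URI;
import java.net.http.HttpRequest;

/** Hypothetical helper: builds the request that creates a partitioned non-persistent topic. */
public final class CreatePartitionedTopicRequest {

    /**
     * @param serviceUrl    base URL of the broker's HTTP service (assumed, e.g. http://localhost:8080)
     * @param numPartitions desired number of partitions, must be positive
     */
    public static HttpRequest build(String serviceUrl, String tenant, String namespace,
                                    String topic, int numPartitions) {
        if (numPartitions <= 0) {
            throw new IllegalArgumentException("numPartitions must be positive: " + numPartitions);
        }
        String path = String.format(
                "/admin/v2/non-persistent/%s/%s/%s/partitions", tenant, namespace, topic);
        return HttpRequest.newBuilder(URI.create(serviceUrl + path))
                .header("Content-Type", "application/json")
                // Assumption: the body is the bare partition count, for example "4".
                .PUT(HttpRequest.BodyPublishers.ofString(Integer.toString(numPartitions)))
                .build();
    }

    public static void main(String[] args) {
        HttpRequest request = build("http://localhost:8080", "my-tenant", "my-namespace", "my-topic", 4);
        // Only prints the request line; nothing is sent.
        System.out.println(request.method() + " " + request.uri());
    }
}
```

Rejecting a non-positive partition count on the client side is a design choice of this sketch; the broker may apply its own validation.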
Java
String topicName = "non-persistent://my-tenant/my-namespace/my-topic";
int numPartitions = 4;
admin.nonPersistentTopics().createPartitionedTopic(topicName, numPartitions);
Get metadata
Partitioned topics have metadata associated with them that you can fetch as a JSON object. The following metadata fields are currently available:
Field | Meaning |
---|---|
partitions | The number of partitions of the topic |
pulsar-admin
$ pulsar-admin non-persistent get-partitioned-topic-metadata \
non-persistent://my-tenant/my-namespace/my-topic
{
  "partitions": 4
}
REST API
GET /admin/v2/non-persistent/:tenant/:namespace/:topic/partitions
Java
String topicName = "non-persistent://my-tenant/my-namespace/my-topic";
admin.nonPersistentTopics().getPartitionedTopicMetadata(topicName);
Unload a topic
You can unload a topic.
pulsar-admin
Use the unload command to unload a topic.
$ pulsar-admin non-persistent unload \
non-persistent://test-tenant/ns1/tp1
REST API
PUT /admin/v2/non-persistent/:tenant/:namespace/:topic/unload
Java
String topic = "non-persistent://my-tenant/my-namespace/my-topic";
admin.nonPersistentTopics().unload(topic);