Managing Namespaces
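You can manage namespaces in any of the following ways:
- The namespaces command of the pulsar-admin tool
- The /admin/v2/namespaces endpoint of the admin REST API
- The namespaces method of the PulsarAdmin object in the Java API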
Namespace resources
Create
You can create new namespaces under a given tenant.
pulsar-admin
Use the create subcommand and specify the namespace by name:
$ pulsar-admin namespaces create test-tenant/test-namespace
REST API
PUT /admin/v2/namespaces/:tenant/:namespace
Java
admin.namespaces().createNamespace(namespace);
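To make the REST variant concrete, here is a minimal sketch that builds the PUT request with the JDK's own HTTP client classes. The base URL http://localhost:8080 and the class name CreateNamespaceRequest are assumptions for illustration only. The request is built but not sent, and a real call may also need authentication headers or a JSON body with policies.

```java
import java.net.URI;
import java.net.http.HttpRequest;

class CreateNamespaceRequest {
    // Assumption: the broker's admin web service is reachable at this address.
    static final String ADMIN_URL = "http://localhost:8080";

    /** Builds (but does not send) the PUT request that creates tenant/namespace. */
    static HttpRequest build(String tenant, String namespace) {
        URI uri = URI.create(ADMIN_URL + "/admin/v2/namespaces/" + tenant + "/" + namespace);
        return HttpRequest.newBuilder(uri)
                .PUT(HttpRequest.BodyPublishers.noBody())
                .build();
    }

    public static void main(String[] args) {
        HttpRequest request = build("test-tenant", "test-namespace");
        // Print the method and target so the request can be checked before sending it.
        System.out.println(request.method() + " " + request.uri());
    }
}
```

Passing the resulting request to java.net.http.HttpClient.send would perform the call against a running broker.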
Get policies
You can fetch the current policies associated with a namespace at any time.
pulsar-admin
Use the policies subcommand and specify the namespace:
$ pulsar-admin namespaces policies test-tenant/test-namespace
{
  "auth_policies": {
    "namespace_auth": {},
    "destination_auth": {}
  },
  "replication_clusters": [],
  "bundles_activated": true,
  "bundles": {
    "boundaries": [
      "0x00000000",
      "0xffffffff"
    ],
    "numBundles": 1
  },
  "backlog_quota_map": {},
  "persistence": null,
  "latency_stats_sample_rate": {},
  "message_ttl_in_seconds": 0,
  "retention_policies": null,
  "deleted": false
}
REST API
GET /admin/v2/namespaces/:tenant/:namespace
Java
admin.namespaces().getPolicies(namespace);
List namespaces within a tenant
You can list all namespaces within a given Pulsar tenant.
pulsar-admin
Use the list subcommand and specify the tenant:
$ pulsar-admin namespaces list test-tenant
test-tenant/ns1
test-tenant/ns2
REST API
GET /admin/v2/namespaces/:tenant
Java
admin.namespaces().getNamespaces(tenant);
Delete
You can delete existing namespaces from a tenant.
pulsar-admin
Use the delete subcommand and specify the namespace:
$ pulsar-admin namespaces delete test-tenant/ns1
REST API
DELETE /admin/v2/namespaces/:tenant/:namespace
Java
admin.namespaces().deleteNamespace(namespace);
Set replication cluster
Set the replication clusters for a namespace so that Pulsar can replicate messages from one colo to another.
pulsar-admin
$ pulsar-admin namespaces set-clusters test-tenant/ns1 \
  --clusters cl1
REST API
POST /admin/v2/namespaces/:tenant/:namespace/replication
Java
admin.namespaces().setNamespaceReplicationClusters(namespace, clusters);
Get replication cluster
Get the list of replication clusters for a given namespace.
pulsar-admin
$ pulsar-admin namespaces get-clusters test-tenant/ns1
cl2
REST API
GET /admin/v2/namespaces/:tenant/:namespace/replication
Java
admin.namespaces().getNamespaceReplicationClusters(namespace)
Set backlog quota policies
A backlog quota lets the broker restrict the bandwidth or storage of a namespace once it reaches a certain threshold. An admin sets the limit and chooses the action the broker takes after the limit is reached:
- producer_request_hold: the broker holds the produce request and does not persist its payload.
- producer_exception: the broker disconnects from the client by throwing an exception.
- consumer_backlog_eviction: the broker starts discarding backlog messages.
The backlog quota restriction is applied by defining a backlog-quota-type of destination_storage.
pulsar-admin
$ pulsar-admin namespaces set-backlog-quota --limit 10 --policy producer_request_hold test-tenant/ns1
N/A
REST API
POST /admin/v2/namespaces/:tenant/:namespace/backlogQuota
Java
admin.namespaces().setBacklogQuota(namespace, new BacklogQuota(limit, policy))
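The three policies can be read as a simple decision made once the backlog reaches the limit. The following is only an illustrative sketch: the class BacklogQuotaSketch, its enum, and the returned strings are hypothetical and are not part of the Pulsar API, and the exact threshold comparison the broker uses is an assumption.

```java
class BacklogQuotaSketch {
    // Hypothetical enum; the constants mirror the three policy names listed above.
    enum Policy { PRODUCER_REQUEST_HOLD, PRODUCER_EXCEPTION, CONSUMER_BACKLOG_EVICTION }

    /** Describes the action taken for a given backlog size, limit, and policy. */
    static String actionFor(long backlogSize, long limit, Policy policy) {
        if (backlogSize < limit) {
            return "accept message"; // quota not reached, nothing to enforce
        }
        switch (policy) {
            case PRODUCER_REQUEST_HOLD:
                return "hold produce request without persisting it";
            case PRODUCER_EXCEPTION:
                return "disconnect producer with an exception";
            case CONSUMER_BACKLOG_EVICTION:
                return "discard backlog messages";
            default:
                throw new IllegalArgumentException("unknown policy: " + policy);
        }
    }

    public static void main(String[] args) {
        System.out.println(actionFor(12, 10, Policy.PRODUCER_REQUEST_HOLD));
    }
}
```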
Get backlog quota policies
It shows the backlog quota configured for a given namespace.
pulsar-admin
$ pulsar-admin namespaces get-backlog-quotas test-tenant/ns1
{
  "destination_storage": {
    "limit": 10,
    "policy": "producer_request_hold"
  }
}
REST API
GET /admin/v2/namespaces/:tenant/:namespace/backlogQuotaMap
Java
admin.namespaces().getBacklogQuotaMap(namespace);
Remove backlog quota policies
Remove the backlog quota policies of a given namespace.
pulsar-admin
$ pulsar-admin namespaces remove-backlog-quota test-tenant/ns1
N/A
REST API
DELETE /admin/v2/namespaces/:tenant/:namespace/backlogQuota
Java
admin.namespaces().removeBacklogQuota(namespace, backlogQuotaType)
Set persistence policies
Persistence policies let you configure the persistency level for all messages on the topics under a given namespace.
- bookkeeper-ack-quorum: the number of acks (guaranteed copies) to wait for on each entry. Default: 0
- bookkeeper-ensemble: the number of bookies to use for a topic. Default: 0
- bookkeeper-write-quorum: the number of writes to make for each entry. Default: 0
- ml-mark-delete-max-rate: the throttling rate of the mark-delete operation (0 means no throttle). Default: 0.0
pulsar-admin
$ pulsar-admin namespaces set-persistence --bookkeeper-ack-quorum 2 --bookkeeper-ensemble 3 --bookkeeper-write-quorum 2 --ml-mark-delete-max-rate 0 test-tenant/ns1
N/A
REST API
POST /admin/v2/namespaces/:tenant/:namespace/persistence
Java
admin.namespaces().setPersistence(namespace, new PersistencePolicies(bookkeeperEnsemble, bookkeeperWriteQuorum, bookkeeperAckQuorum, managedLedgerMaxMarkDeleteRate))
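The three BookKeeper values are related: the ensemble size should be at least the write quorum, and the write quorum at least the ack quorum. That ordering is not stated in this section, so treat the sketch below as an assumption to verify against your Pulsar version. The class PersistenceCheck is a hypothetical helper for checking values before calling set-persistence.

```java
class PersistenceCheck {
    /**
     * Returns true when ensemble >= writeQuorum >= ackQuorum >= 0.
     * Assumption: this is the ordering the broker expects.
     */
    static boolean isValid(int ensemble, int writeQuorum, int ackQuorum) {
        return ensemble >= writeQuorum
                && writeQuorum >= ackQuorum
                && ackQuorum >= 0;
    }

    public static void main(String[] args) {
        // Values from the pulsar-admin example above: ensemble 3, write quorum 2, ack quorum 2.
        System.out.println(isValid(3, 2, 2));
        // An ack quorum larger than the write quorum cannot be satisfied.
        System.out.println(isValid(3, 2, 3));
    }
}
```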
Get persistence policies
It shows the configured persistence policies of a given namespace.
pulsar-admin
$ pulsar-admin namespaces get-persistence test-tenant/ns1
{
  "bookkeeperEnsemble": 3,
  "bookkeeperWriteQuorum": 2,
  "bookkeeperAckQuorum": 2,
  "managedLedgerMaxMarkDeleteRate": 0
}
REST API
GET /admin/v2/namespaces/:tenant/:namespace/persistence
Java
admin.namespaces().getPersistence(namespace)
Unload namespace bundle
A namespace bundle is a virtual group of topics that belong to the same namespace. If a broker becomes overloaded with bundles, this command unloads a bundle from that broker so that a less-loaded broker can serve it. Namespace bundle IDs range from 0x00000000 to 0xffffffff.
pulsar-admin
$ pulsar-admin namespaces unload --bundle 0x00000000_0xffffffff test-tenant/ns1
N/A
REST API
PUT /admin/v2/namespaces/:tenant/:namespace/{bundle}/unload
Java
admin.namespaces().unloadNamespaceBundle(namespace, bundle)
Set message-ttl
Configure the time to live (TTL) of messages, in seconds.
pulsar-admin
$ pulsar-admin namespaces set-message-ttl --messageTTL 100 test-tenant/ns1
N/A
REST API
POST /admin/v2/namespaces/:tenant/:namespace/messageTTL
Java
admin.namespaces().setNamespaceMessageTTL(namespace, messageTTL)
Get message-ttl
Get the message TTL configured for a namespace.
pulsar-admin
$ pulsar-admin namespaces get-message-ttl test-tenant/ns1
100
REST API
GET /admin/v2/namespaces/:tenant/:namespace/messageTTL
Java
admin.namespaces().getNamespaceMessageTTL(namespace)
Remove message-ttl
Remove the message TTL configured for a namespace.
pulsar-admin
$ pulsar-admin namespaces remove-message-ttl test-tenant/ns1
100
REST API
DELETE /admin/v2/namespaces/:tenant/:namespace/messageTTL
Java
admin.namespaces().removeNamespaceMessageTTL(namespace)
Split bundle
A namespace bundle can contain multiple topics but can be served by only one broker. If a single bundle creates an excessive load on a broker, an admin can split the bundle with this command. One or more of the new bundles can then be unloaded, which spreads the load across the brokers.
pulsar-admin
$ pulsar-admin namespaces split-bundle --bundle 0x00000000_0xffffffff test-tenant/ns1
N/A
REST API
PUT /admin/v2/namespaces/:tenant/:namespace/{bundle}/split
Java
admin.namespaces().splitNamespaceBundle(namespace, bundle)
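To illustrate what splitting a bundle range means, here is a minimal sketch that divides a range such as 0x00000000_0xffffffff at its midpoint. The even split is an assumption made for illustration; the broker's actual split strategy may differ. The class BundleSplit is hypothetical and is not part of the Pulsar API.

```java
class BundleSplit {
    /** Splits a "lower_upper" bundle range into two halves at the midpoint. */
    static String[] splitInHalf(String bundle) {
        String[] bounds = bundle.split("_");
        long lower = Long.decode(bounds[0]); // Long.decode understands the 0x prefix
        long upper = Long.decode(bounds[1]);
        long mid = lower + (upper - lower) / 2;
        return new String[] { format(lower, mid), format(mid, upper) };
    }

    /** Formats two boundaries in the same 0x%08x_0x%08x style used by the CLI examples. */
    static String format(long lower, long upper) {
        return String.format("0x%08x_0x%08x", lower, upper);
    }

    public static void main(String[] args) {
        for (String half : splitInHalf("0x00000000_0xffffffff")) {
            System.out.println(half);
        }
    }
}
```

With this sketch, the full range splits into 0x00000000_0x7fffffff and 0x7fffffff_0xffffffff, and either half could then be passed to the unload command.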
Clear backlog
Clear the message backlog of all topics that belong to a specific namespace. You can also clear the backlog for a specific subscription only.
pulsar-admin
$ pulsar-admin namespaces clear-backlog --sub my-subscription test-tenant/ns1
N/A
REST API
POST /admin/v2/namespaces/:tenant/:namespace/clearBacklog
Java
admin.namespaces().clearNamespaceBacklogForSubscription(namespace, subscription)
Clear bundle backlog
Clear the message backlog of all topics that belong to a specific namespace bundle. You can also clear the backlog for a specific subscription only.
pulsar-admin
$ pulsar-admin namespaces clear-backlog --bundle 0x00000000_0xffffffff --sub my-subscription test-tenant/ns1
N/A
REST API
POST /admin/v2/namespaces/:tenant/:namespace/{bundle}/clearBacklog
Java
admin.namespaces().clearNamespaceBundleBacklogForSubscription(namespace, bundle, subscription)
Set retention
Each namespace contains multiple topics. The retention size (storage size) of each topic should not exceed a specific threshold, or its data should be stored only for a certain period. This command configures the retention size and time of the topics in a given namespace.
pulsar-admin
$ pulsar-admin namespaces set-retention --size 100 --time 10 test-tenant/ns1
N/A
REST API
POST /admin/v2/namespaces/:tenant/:namespace/retention
Java
admin.namespaces().setRetention(namespace, new RetentionPolicies(retentionTimeInMin, retentionSizeInMB))
Get retention
It shows the retention information of a given namespace.
pulsar-admin
$ pulsar-admin namespaces get-retention test-tenant/ns1
{
  "retentionTimeInMinutes": 10,
  "retentionSizeInMB": 100
}
REST API
GET /admin/v2/namespaces/:tenant/:namespace/retention
Java
admin.namespaces().getRetention(namespace)
Set dispatch throttling
Set the message dispatch rate for all topics under a given namespace. The dispatch rate can be restricted by the number of messages per X seconds (msg-dispatch-rate) or by the number of message bytes per X seconds (byte-dispatch-rate). The period X is in seconds and is configured with dispatch-rate-period. The default value of both msg-dispatch-rate and byte-dispatch-rate is -1, which disables throttling.
Note
- If neither clusterDispatchRate nor topicDispatchRate is configured, dispatch throttling is disabled.
- If topicDispatchRate is not configured, clusterDispatchRate takes effect.
- If topicDispatchRate is configured, topicDispatchRate takes effect.
pulsar-admin
$ pulsar-admin namespaces set-dispatch-rate test-tenant/ns1 \
  --msg-dispatch-rate 1000 \
  --byte-dispatch-rate 1048576 \
  --dispatch-rate-period 1
REST API
POST /admin/v2/namespaces/:tenant/:namespace/dispatchRate
Java
admin.namespaces().setDispatchRate(namespace, new DispatchRate(1000, 1048576, 1))
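The precedence between clusterDispatchRate and topicDispatchRate described in the note above can be sketched as a small function. This is an illustration only: the class EffectiveDispatchRate is hypothetical, and representing "not configured" as -1 is an assumption borrowed from the default value of the rate options.

```java
class EffectiveDispatchRate {
    // Assumption: -1 stands for "not configured", matching the default of the rate options.
    static final int NOT_CONFIGURED = -1;

    /**
     * Returns the rate that takes effect. A configured topicDispatchRate wins,
     * otherwise clusterDispatchRate applies, and NOT_CONFIGURED means no throttling.
     */
    static int resolve(int clusterDispatchRate, int topicDispatchRate) {
        if (topicDispatchRate != NOT_CONFIGURED) {
            return topicDispatchRate;
        }
        return clusterDispatchRate; // may itself be NOT_CONFIGURED, i.e. throttling disabled
    }

    public static void main(String[] args) {
        System.out.println(resolve(NOT_CONFIGURED, NOT_CONFIGURED)); // neither set: disabled
        System.out.println(resolve(500, NOT_CONFIGURED));            // cluster rate applies
        System.out.println(resolve(500, 1000));                      // topic rate applies
    }
}
```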
Get configured message-rate
It shows the message dispatch rate configured for the namespace (the number of messages per second that topics under this namespace can dispatch).
pulsar-admin
$ pulsar-admin namespaces get-dispatch-rate test-tenant/ns1
{
  "dispatchThrottlingRatePerTopicInMsg" : 1000,
  "dispatchThrottlingRatePerTopicInByte" : 1048576,
  "ratePeriodInSecond" : 1
}
REST API
GET /admin/v2/namespaces/:tenant/:namespace/dispatchRate
Java
admin.namespaces().getDispatchRate(namespace)
Set dispatch throttling for subscription
Set the message dispatch rate for all subscriptions of the topics under a given namespace. The dispatch rate can be restricted by the number of messages per X seconds (msg-dispatch-rate) or by the number of message bytes per X seconds (byte-dispatch-rate). The period X is in seconds and is configured with dispatch-rate-period. The default value of both msg-dispatch-rate and byte-dispatch-rate is -1, which disables throttling.
pulsar-admin
$ pulsar-admin namespaces set-subscription-dispatch-rate test-tenant/ns1 \
  --msg-dispatch-rate 1000 \
  --byte-dispatch-rate 1048576 \
  --dispatch-rate-period 1
REST API
POST /admin/v2/namespaces/:tenant/:namespace/subscriptionDispatchRate
Java
admin.namespaces().setSubscriptionDispatchRate(namespace, new DispatchRate(1000, 1048576, 1))
Get configured message-rate
It shows the subscription dispatch rate configured for the namespace (the number of messages per second that subscriptions of topics under this namespace can dispatch).
pulsar-admin
$ pulsar-admin namespaces get-subscription-dispatch-rate test-tenant/ns1
{
  "dispatchThrottlingRatePerTopicInMsg" : 1000,
  "dispatchThrottlingRatePerTopicInByte" : 1048576,
  "ratePeriodInSecond" : 1
}
REST API
GET /admin/v2/namespaces/:tenant/:namespace/subscriptionDispatchRate
Java
admin.namespaces().getSubscriptionDispatchRate(namespace)
Set dispatch throttling for replicator
Set the message dispatch rate for all replicators between the replication clusters of a given namespace. The dispatch rate can be restricted by the number of messages per X seconds (msg-dispatch-rate) or by the number of message bytes per X seconds (byte-dispatch-rate). The period X is in seconds and is configured with dispatch-rate-period. The default value of both msg-dispatch-rate and byte-dispatch-rate is -1, which disables throttling.
pulsar-admin
$ pulsar-admin namespaces set-replicator-dispatch-rate test-tenant/ns1 \
  --msg-dispatch-rate 1000 \
  --byte-dispatch-rate 1048576 \
  --dispatch-rate-period 1
REST API
POST /admin/v2/namespaces/:tenant/:namespace/replicatorDispatchRate
Java
admin.namespaces().setReplicatorDispatchRate(namespace, new DispatchRate(1000, 1048576, 1))
Get configured message-rate
It shows the replicator dispatch rate configured for the namespace (the number of messages per second that replicators of topics under this namespace can dispatch).
pulsar-admin
$ pulsar-admin namespaces get-replicator-dispatch-rate test-tenant/ns1
{
  "dispatchThrottlingRatePerTopicInMsg" : 1000,
  "dispatchThrottlingRatePerTopicInByte" : 1048576,
  "ratePeriodInSecond" : 1
}
REST API
GET /admin/v2/namespaces/:tenant/:namespace/replicatorDispatchRate
Java
admin.namespaces().getReplicatorDispatchRate(namespace)
Get deduplication snapshot interval
It shows the deduplicationSnapshotInterval configured for a namespace. Each topic under the namespace takes a deduplication snapshot according to this interval.
pulsar-admin
$ pulsar-admin namespaces get-deduplication-snapshot-interval test-tenant/ns1
REST API
GET /admin/v2/namespaces/:tenant/:namespace/deduplicationSnapshotInterval
Java
admin.namespaces().getDeduplicationSnapshotInterval(namespace)
Set deduplication snapshot interval
Set the deduplicationSnapshotInterval for a namespace. Each topic under the namespace takes a deduplication snapshot according to this interval. brokerDeduplicationEnabled must be set to true for this property to take effect.
pulsar-admin
$ pulsar-admin namespaces set-deduplication-snapshot-interval test-tenant/ns1 --interval 1000
REST API
POST /admin/v2/namespaces/:tenant/:namespace/deduplicationSnapshotInterval
{
  "interval": 1000
}
Java
admin.namespaces().setDeduplicationSnapshotInterval(namespace, 1000)
Remove deduplication snapshot interval
Remove the deduplicationSnapshotInterval configured for a namespace. While the interval is set, each topic under the namespace takes a deduplication snapshot according to it.
pulsar-admin
$ pulsar-admin namespaces remove-deduplication-snapshot-interval test-tenant/ns1
REST API
POST /admin/v2/namespaces/:tenant/:namespace/deduplicationSnapshotInterval
Java
admin.namespaces().removeDeduplicationSnapshotInterval(namespace)
Namespace isolation
Coming soon.
Unload namespaces from a broker
You can unload a namespace, or a namespace bundle, from the Pulsar broker that is responsible for it.
pulsar-admin
Use the unload subcommand of the namespaces command.
$ pulsar-admin namespaces unload my-tenant/my-ns
REST API
PUT /admin/v2/namespaces/:tenant/:namespace/unload
Java
admin.namespaces().unload(namespace)