命名空间的管理方式:
pulsar-admin
的namespace
命令- REST API 调用
/admin/v2/namespaces
- Java API 中 PulsarAdmin 对象的
namespaces
方法
命名空间资源
创建
可以在给定的租户下创建命名空间。
pulsar-admin
使用 create
命令,指定命名空间的名称:
$ pulsar-admin namespaces create test-tenant/test-namespace
REST API
PUT /admin/v2/namespaces/:tenant/:namespace
Java
admin.namespaces().createNamespace(namespace);
获取策略
用户可以随时获取与命名空间相关的现有策略。
pulsar-admin
使用 policies
子命令,指定命名空间:
$ pulsar-admin namespaces policies test-tenant/test-namespace
{
"auth_policies": {
"namespace_auth": {},
"destination_auth": {}
},
"replication_clusters": [],
"bundles_activated": true,
"bundles": {
"boundaries": [
"0x00000000",
"0xffffffff"
],
"numBundles": 1
},
"backlog_quota_map": {},
"persistence": null,
"latency_stats_sample_rate": {},
"message_ttl_in_seconds": 0,
"retention_policies": null,
"deleted": false
}
REST API
GET /admin/v2/namespaces/:tenant/:namespace
Java
admin.namespaces().getPolicies(namespace);
列出租户下的命名空间
给定 Pulsar 租户,可以列出其中所有的命名空间。
pulsar-admin
使用 list
子命令,指定租户:
$ pulsar-admin namespaces list test-tenant
test-tenant/ns1
test-tenant/ns2
REST API
GET /admin/v2/namespaces/:tenant
Java
admin.namespaces().getNamespaces(tenant);
删除
可以删除租户下已经存在的命名空间。
pulsar-admin
使用 delete
子命令,指定命名空间:
$ pulsar-admin namespaces delete test-tenant/ns1
REST
DELETE /admin/v2/namespaces/:tenant/:namespace
Java
admin.namespaces().deleteNamespace(namespace);
设置复制集群
为命名空间设置复制集群,因此 Pulsar 可以从一个 colo 复制消息到另一个 colo。
CLI
$ pulsar-admin namespaces set-clusters test-tenant/ns1 \
--clusters cl1
REST
{@inject: endpoint|POST|/admin/v2/namespaces/:tenant/:namespace/replication|operation/setNamespaceReplicationClusters}
Java
admin.namespaces().setNamespaceReplicationClusters(namespace, clusters);
获取复制集群
获取给定命名空间复制集群的列表。
CLI
$ pulsar-admin namespaces get-clusters test-tenant/cl1/ns1
cl2
REST
{@inject: endpoint|GET|/admin/v2/namespaces/{tenant}/{namespace}/replication|operation/getNamespaceReplicationClusters}
Java
admin.namespaces().getNamespaceReplicationClusters(namespace)
设置积压配额政策
Backlog quota helps broker to restrict bandwidth/storage of a namespace once it reach certain threshold limit . Admin can set this limit and one of the following action after the limit is reached.
producer_request_hold:broker 暂停运行,并不再持久化生产请求负载
producer_exception:broker 抛出异常,并与客户端断开连接
consumer_backlog_eviction:broker 丢弃积压消息
可以通过定义 backlog-quota-type: destination_storage 来限制积压配额
CLI
$ pulsar-admin namespaces set-backlog-quota --limit 10 --policy producer_request_hold test-tenant/ns1
N/A
REST
{@inject: endpoint|POST|/admin/v2/namespaces/{tenant}/{namespace}/backlogQuota|operation/setBacklogQuota}
Java
admin.namespaces().setBacklogQuota(namespace, new BacklogQuota(limit, policy))
查看积压配额策略
查看给定命名空间已配置的积压配额。
CLI
$ pulsar-admin namespaces get-backlog-quotas test-tenant/ns1
{
"destination_storage": {
"limit": 10,
"policy": "producer_request_hold"
}
}
REST
{@inject: endpoint|GET|/admin/v2/namespaces/{tenant}/{namespace}/backlogQuotaMap|operation/getBacklogQuotaMap}
Java
admin.namespaces().getBacklogQuotaMap(namespace);
移除积压配额策略
移除指定命名空间的积压配额策略
CLI
$ pulsar-admin namespaces remove-backlog-quota test-tenant/ns1
N/A
REST
{@inject: endpoint|DELETE|/admin/v2/namespaces/{tenant}/{namespace}/backlogQuota|operation/removeBacklogQuota}
Java
admin.namespaces().removeBacklogQuota(namespace, backlogQuotaType)
设置持久化策略
持久化策略可以为给定命名空间下 topic 上的所有消息配置持久等级。
Bookkeeper-ack-quorum:每个 entry 在等待的 acks(有保证的副本)数量,默认值:0
Bookkeeper-ensemble:单个 topic 使用的 bookie 数量,默认值:0
Bookkeeper-write-quorum:每个 entry 要写入的次数,默认值:0
Ml-mark-delete-max-rate:标记-删除操作的限制速率(0表示无限制),默认值:0.0
CLI
$ pulsar-admin namespaces set-persistence --bookkeeper-ack-quorum 2 --bookkeeper-ensemble 3 --bookkeeper-write-quorum 2 --ml-mark-delete-max-rate 0 test-tenant/ns1
N/A
REST
{@inject: endpoint|POST|/admin/v2/namespaces/{tenant}/{namespace}/persistence|operation/setPersistence}
Java
admin.namespaces().setPersistence(namespace,new PersistencePolicies(bookkeeperEnsemble, bookkeeperWriteQuorum,bookkeeperAckQuorum,managedLedgerMaxMarkDeleteRate))
获取持久化策略
可通过以下方式查看指定命名空间已配置的持久化策略。
CLI
$ pulsar-admin namespaces get-persistence test-tenant/ns1
{
"bookkeeperEnsemble": 3,
"bookkeeperWriteQuorum": 2,
"bookkeeperAckQuorum": 2,
"managedLedgerMaxMarkDeleteRate": 0
}
REST
{@inject: endpoint|GET|/admin/v2/namespaces/{tenant}/{namespace}/persistence|operation/getPersistence}
Java
admin.namespaces().getPersistence(namespace)
卸载命名空间
命名空间包是虚拟的 topic 组,这些 topic 来自于同一个命名空间。 如果 broker 因为包的数量过多而过载,则可通过以下命令进行卸载,并由其他负载较小的 broker 提供服务。 以起始值和结束值之间的范围来定义命名空间包,如 0x00000000 和 0xffffffff。
CLI
$ pulsar-admin namespaces unload --bundle 0x00000000_0xffffffff test-tenant/ns1
N/A
REST
{@inject: endpoint|PUT|/admin/v2/namespaces/{tenant}/{namespace}/{bundle}/unload|operation/unloadNamespaceBundle}
Java
admin.namespaces().unloadNamespaceBundle(namespace, bundle)
set message-ttl
配置消息存活时间 (以秒为单位)。
CLI
$ pulsar-admin namespaces set-message-ttl --messageTTL 100 test-tenant/ns1
N/A
REST
{@inject: endpoint|POST|/admin/v2/namespaces/{tenant}/{namespace}/messageTTL|operation/setNamespaceMessageTTL}
Java
admin.namespaces().setNamespaceMessageTTL(namespace, messageTTL)
get message-ttl
获取命名空间中消息的存活时间(ttl)。
CLI
$ pulsar-admin namespaces get-message-ttl test-tenant/ns1
100
REST
{@inject: endpoint|GET|/admin/v2/namespaces/{tenant}/{namespace}/messageTTL|operation/getNamespaceMessageTTL}
Java
admin.namespaces().getNamespaceMessageTTL(namespace)
分割包(bundle)
命名空间包可以包含多个 topic,但只能由一个 broker 来提供服务。 如果由于 broker 创建的包中活跃 topic 较多,而使包负载过重,则管理员可以使用以下命令分包,来解决这一问题。
CLI
$ pulsar-admin namespaces split-bundle --bundle 0x00000000_0xffffffff test-tenant/ns1
N/A
REST
{@inject: endpoint|PUT|/admin/v2/namespaces/{tenant}/{namespace}/{bundle}/split|operation/splitNamespaceBundle}
Java
admin.namespaces().splitNamespaceBundle(namespace, bundle)
清除消息堆积
It clears all message backlog for all the topics those belong to specific namespace. You can also clear backlog for a specific subscription as well.
CLI
$ pulsar-admin namespaces clear-backlog --sub my-subscription test-tenant/ns1
N/A
REST
{@inject: endpoint|POST|/admin/v2/namespaces/{tenant}/{namespace}/clearBacklog|operation/clearNamespaceBacklogForSubscription}
Java
admin.namespaces().clearNamespaceBacklogForSubscription(namespace, subscription)
清除包消息堆积
It clears all message backlog for all the topics those belong to specific NamespaceBundle. You can also clear backlog for a specific subscription as well.
CLI
$ pulsar-admin namespaces clear-backlog --bundle 0x00000000_0xffffffff --sub my-subscription test-tenant/ns1
N/A
REST
{@inject: endpoint|POST|/admin/v2/namespaces/{tenant}/{namespace}/{bundle}/clearBacklog|operation/clearNamespaceBundleBacklogForSubscription}
Java
admin.namespaces().clearNamespaceBundleBacklogForSubscription(namespace, bundle, subscription)
设置消息存留参数
命名空间包含多个 topic,每个 topic 的保留大小(存储大小)不应超过特定阈值,否则其存储时间会受到限制。 可通过以下命令配置指定命名空间中 topic 的保留大小和保留时间。
CLI
$ pulsar-admin set-retention --size 10 --time 100 test-tenant/ns1
N/A
REST
{@inject: endpoint|POST|/admin/v2/namespaces/{tenant}/{namespace}/retention|operation/setRetention}
Java
admin.namespaces().setRetention(namespace, new RetentionPolicies(retentionTimeInMin, retentionSizeInMB))
获取消息存留参数
获取指定命名空间的保留参数信息。
CLI
$ pulsar-admin namespaces get-retention test-tenant/ns1
{
"retentionTimeInMinutes": 10,
"retentionSizeInMB": 100
}
REST
{@inject: endpoint|GET|/admin/v2/namespaces/{tenant}/{namespace}/retention|operation/getRetention}
Java
admin.namespaces().getRetention(namespace)
设置消息分发速率
为给定命名空间下所有 topic 设置消息派发速率。 通过每 X 秒消息数(msg-dispatch-rate
))或者每 X 秒消息字节数来限制(byte-dispatch-rate
)派发速率。 派发速率指每秒派发的消息数,可通过 dispatch-rate-period
来配置。 msg-dispatch-rate
和 byte-dispatch-rate
的默认值均为 -1,即禁用配额限制。
Note
- If neither
clusterDispatchRate
nortopicDispatchRate
is configured, dispatch throttling is disabled. >- If
topicDispatchRate
is not configured,clusterDispatchRate
takes effect. >- 如果
topicDispatchRate
配置了,则topicDispatchRate
会生效。
CLI
$ pulsar-admin namespaces set-dispatch-rate test-tenant/ns1 \
--msg-dispatch-rate 1000 \
--byte-dispatch-rate 1048576 \
--dispatch-rate-period 1
REST
{@inject: endpoint|POST|/admin/v2/namespaces/{tenant}/{namespace}/dispatchRate|operation/setDispatchRate}
Java
admin.namespaces().setDispatchRate(namespace, new DispatchRate(1000, 1048576, 1))
获取消息分发速率配置
获取命名空间中已配置的消息派发速率(该命名空间下 topic 发送消息数 / 秒)
CLI
$ pulsar-admin namespaces get-dispatch-rate test-tenant/ns1
{
"dispatchThrottlingRatePerTopicInMsg" : 1000,
"dispatchThrottlingRatePerTopicInByte" : 1048576,
"ratePeriodInSecond" : 1
}
REST
{@inject: endpoint|GET|/admin/v2/namespaces/{tenant}/{namespace}/dispatchRate|operation/getDispatchRate}
Java
admin.namespaces().getDispatchRate(namespace)
为订阅设置分发配额限制
为给定命名空间下 topic 中的所有订阅设置消息派发速率。 通过每 X 秒消息数(msg-dispatch-rate
))或者每 X 秒消息字节数来限制(byte-dispatch-rate
)派发速率。 派发速率指每秒派发的消息数,可通过 dispatch-rate-period
来配置。 msg-dispatch-rate
和 byte-dispatch-rate
的默认值均为 -1,即禁用配额限制。
CLI
$ pulsar-admin namespaces set-subscription-dispatch-rate test-tenant/ns1 \
--msg-dispatch-rate 1000 \
--byte-dispatch-rate 1048576 \
--dispatch-rate-period 1
REST
{@inject: endpoint|POST|/admin/v2/namespaces/{tenant}/{namespace}/subscriptionDispatchRate|operation/setDispatchRate}
Java
admin.namespaces().setSubscriptionDispatchRate(namespace, new DispatchRate(1000, 1048576, 1))
获取消息分发速率配置
获取命名空间中已配置的消息派发速率(该命名空间下 topic 发送消息数 / 秒)
CLI
$ pulsar-admin namespaces get-subscription-dispatch-rate test-tenant/ns1
{
"dispatchThrottlingRatePerTopicInMsg" : 1000,
"dispatchThrottlingRatePerTopicInByte" : 1048576,
"ratePeriodInSecond" : 1
}
REST
{@inject: endpoint|GET|/admin/v2/namespaces/{tenant}/{namespace}/subscriptionDispatchRate|operation/getDispatchRate}
Java
admin.namespaces().getSubscriptionDispatchRate(namespace)
为复制器设置派发速率。
为给定命名空间下复制集群之间的所有复制器设置消息派发速率。 通过每 X 秒消息数(msg-dispatch-rate
))或者每 X 秒消息字节数来限制(byte-dispatch-rate
)派发速率。 派发速率指每秒派发的消息数,可通过 dispatch-rate-period
来配置。 msg-dispatch-rate
和 byte-dispatch-rate
的默认值均为 -1,即禁用配额限制。
CLI
$ pulsar-admin namespaces set-replicator-dispatch-rate test-tenant/ns1 \
--msg-dispatch-rate 1000 \
--byte-dispatch-rate 1048576 \
--dispatch-rate-period 1
REST
{@inject: endpoint|POST|/admin/v2/namespaces/{tenant}/{namespace}/replicatorDispatchRate|operation/setDispatchRate}
Java
admin.namespaces().setReplicatorDispatchRate(namespace, new DispatchRate(1000, 1048576, 1))
获取消息分发速率配置
获取命名空间中已配置的消息派发速率(该命名空间下 topic 发送消息数 / 秒)
CLI
$ pulsar-admin namespaces get-replicator-dispatch-rate test-tenant/ns1
{
"dispatchThrottlingRatePerTopicInMsg" : 1000,
"dispatchThrottlingRatePerTopicInByte" : 1048576,
"ratePeriodInSecond" : 1
}
REST
{@inject: endpoint|GET|/admin/v2/namespaces/{tenant}/{namespace}/replicatorDispatchRate|operation/getDispatchRate}
Java
admin.namespaces().getReplicatorDispatchRate(namespace)
命名空间隔离
敬请期待。
从 broker上卸载命名空间
你能从这Pulsar broker 管理上卸载一个命名空间,或一个 命名空间包。
pulsar-admin
Use the unload
subcommand of the namespaces
command.
CLI
$ pulsar-admin namespaces unload my-tenant/my-ns
REST
{@inject: endpoint|PUT|/admin/v2/namespaces/{tenant}/{namespace}/unload|operation/unloadNamespace}
Java
admin.namespaces().unload(namespace)