Extending Authentication and Authorization in Pulsar
Pulsar 提供了实现自定义认证和授权的机制。
认证
Pulsar supports mutual TLS and Athenz authentication plugins. For how to use these authentication plugins, you can refer to the description in Security.
你能够使用自定义的认证机制,只要实现以下两个插件的具体实现。 一个插件用于客户端库,另一个插件用于Pulsar Proxy 和 Pulsar Broker 来验证凭据。
客户端认证插件
For the client library, you need to implement org.apache.pulsar.client.api.Authentication
. By entering the command below you can pass this class when you create a Pulsar client:
PulsarClient client = PulsarClient.builder()
.serviceUrl("pulsar://localhost:6650")
.authentication(new MyAuthentication())
.build();
You can use 2 interfaces to implement on the client side:
Authentication
-> http://pulsar.apache.org/api/client/2.7.0-SNAPSHOT/org/apache/pulsar/client/api/Authentication.htmlAuthenticationDataProvider
-> http://pulsar.apache.org/api/client/2.7.0-SNAPSHOT/org/apache/pulsar/client/api/AuthenticationDataProvider.html
这反过来需要以 org.apache.pulsar.client.api.AuthentationDataProvider
的形式提供客户端凭据。 这样就有机会为不同类型的连接返回不同类型的身份验证令牌,或者通过传递证书链来为TLS使用。
你可以找到客户端认证实现的一些例子:
- Mutual TLS Auth — https://github.com/apache/pulsar/tree/master/pulsar-client/src/main/java/org/apache/pulsar/client/impl/auth
- Athenz — https://github.com/apache/pulsar/tree/master/pulsar-client-auth-athenz/src/main/java/org/apache/pulsar/client/impl/auth
Proxy/Broker 认证插件
在 Proxy/Broker 这一侧,你需要配置相应的插件去检验客户端发送过来的凭据信息。 Proxy 和 Broker 可以同时支持多个身份认证提供者。
在配置文件conf/broker.conf
中,你能够指定一个有效提供者列表:
# 认证提供者名字列表, 用逗号分隔类名
authenticationProviders=
在单个接口实现org.apache.pulsar.broker.authentication.AuthenticationProvider
:
/**
* 认证机制提供者
*/
public interface AuthenticationProvider extends Closeable {
/**
* 初始化认证提供者
*
* @param config
* Broker 配置对象
* @throws IOException
* 如果初始化失败,则抛出异常
*/
void initialize(ServiceConfiguration config) throws IOException;
/**
* @return 返回本认证提供者的名称
*/
String getAuthMethodName();
/**
* 使用指定的身份验证数据验证给定凭据
*
* @param authData
* provider specific authentication data
* @return the "role" string for the authenticated connection, if the authentication was successful
* @throws AuthenticationException
* if the credentials are not valid
*/
String authenticate(AuthenticationDataSource authData) throws AuthenticationException;
}
以下是 Broker 认证插件的示例:
- Mutual TLS — https://github.com/apache/pulsar/blob/master/pulsar-broker-common/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderTls.java
- Athenz — https://github.com/apache/pulsar/blob/master/pulsar-broker-auth-athenz/src/main/java/org/apache/pulsar/broker/authentication/AuthenticationProviderAthenz.java
授权
授权是检查某个“角色”或“主体”是否具有执行某个操作权限的操作。
默认情况下,你能够使用 Pulsar 自带的授权提供商。 你也可以通过插件的形式,配置不同的授权提供商。 注意,尽管认证插件被设计成可以在 Proxy 和 Broker 中使用。但是授权插件却被设计为只能在 Broker 中使用。所以,如果启用了授权机制,Proxy 只会执行一些简单的针对角色的授权检查。
要提供自定义提供者,你必须实现org.apache.pulsar.broker.authorization.AuthorizationProvider
接口。并将该类放到 Pulsar 所用的 classpath中,并将类名配置到conf/broker.conf
中。
# Authorization provider fully qualified class-name
authorizationProvider=org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider
/**
* 授权策略提供者接口
*/
public interface AuthorizationProvider extends Closeable {
/**
* 初始化授权提供者
*
* @param conf
* broker config object
* @param configCache
* pulsar zk configuration cache service
* @throws IOException
* if the initialization fails
*/
void initialize(ServiceConfiguration conf, ConfigurationCacheService configCache) throws IOException;
/**
* 检查指定角色是否具有向指定的完全限定主题名称发送消息的权限
*
* @param topicName
* the fully qualified topic name associated with the topic.
* @param role
* the app id used to send messages to the topic.
*/
CompletableFuture<Boolean> canProduceAsync(TopicName topicName, String role,
AuthenticationDataSource authenticationData);
/**
* 检查指定角色是否具有从指定的完全限定主题接收消息的权限.
*
* @param topicName
* the fully qualified topic name associated with the topic.
* @param role
* the app id used to receive messages from the topic.
* @param subscription
* the subscription name defined by the client
*/
CompletableFuture<Boolean> canConsumeAsync(TopicName topicName, String role,
AuthenticationDataSource authenticationData, String subscription);
/**
* 检查指定角色是否可以执行指定主题的查找。
*
* 要调用它必须拥有生产者或消费者的权限.
*
* @param topicName
* @param role
* @return
* @throws Exception
*/
CompletableFuture<Boolean> canLookupAsync(TopicName topicName, String role,
AuthenticationDataSource authenticationData);
/**
*
* 在命名空间级别授予客户端授权操作的权限。
*
* @param namespace
* @param actions
* @param role
* @param authDataJson
* additional authdata in json format
* @return CompletableFuture
* @completesWith <br/>
* IllegalArgumentException when namespace not found<br/>
* IllegalStateException when failed to grant permission
*/
CompletableFuture<Void> grantPermissionAsync(NamespaceName namespace, Set<AuthAction> actions, String role,
String authDataJson);
/**
* 在主题上授予客户端授权操作的权限。
*
* @param topicName
* @param role
* @param authDataJson
* additional authdata in json format
* @return CompletableFuture
* @completesWith <br/>
* IllegalArgumentException when namespace not found<br/>
* IllegalStateException when failed to grant permission
*/
CompletableFuture<Void> grantPermissionAsync(TopicName topicName, Set<AuthAction> actions, String role,
String authDataJson);
}