Pulsar provides a way to use custom authentication and authorization mechanisms

Authentication

Pulsar support mutual TLS and Athenz authentication plugins, and these can be used as described in Security.

It is possible to use a custom authentication mechanism by providing the implementation in the form of two plugins one for the Client library and the other for the Pulsar Broker to validate the credentials.

Client authentication plugin

For client library, you will need to implement org.apache.pulsar.client.api.Authentication. This class can then be passed when creating a Pulsar client:

  1. PulsarClient client = PulsarClient.builder()
  2. .serviceUrl("pulsar://localhost:6650")
  3. .authentication(new MyAuthentication())
  4. .build();

For reference, there are 2 interfaces to implement on the client side: * Authentication -> http://pulsar.apache.org/api/client/2.6.0-SNAPSHOT/org/apache/pulsar/client/api/Authentication.html * AuthenticationDataProvider -> http://pulsar.apache.org/api/client/2.6.0-SNAPSHOT/org/apache/pulsar/client/api/AuthenticationDataProvider.html

This in turn will need to provide the client credentials in the form of org.apache.pulsar.client.api.AuthenticationDataProvider. This will leave the chance to return different kinds of authentication token for different type of connection or by passing a certificate chain to use for TLS.

Examples for client authentication providers can be found at:

Broker authentication plugin

On broker side, we need the corresponding plugin to validate the credentials passed by the client. Broker can support multiple authentication providers at the same time.

In conf/broker.conf it’s possible to specify a list of valid providers:

  1. # Autentication provider name list, which is comma separated list of class names
  2. authenticationProviders=

There is one single interface to implement org.apache.pulsar.broker.authentication.AuthenticationProvider:

  1. /**
  2. * Provider of authentication mechanism
  3. */
  4. public interface AuthenticationProvider extends Closeable {
  5. /**
  6. * Perform initialization for the authentication provider
  7. *
  8. * @param config
  9. * broker config object
  10. * @throws IOException
  11. * if the initialization fails
  12. */
  13. void initialize(ServiceConfiguration config) throws IOException;
  14. /**
  15. * @return the authentication method name supported by this provider
  16. */
  17. String getAuthMethodName();
  18. /**
  19. * Validate the authentication for the given credentials with the specified authentication data
  20. *
  21. * @param authData
  22. * provider specific authentication data
  23. * @return the "role" string for the authenticated connection, if the authentication was successful
  24. * @throws AuthenticationException
  25. * if the credentials are not valid
  26. */
  27. String authenticate(AuthenticationDataSource authData) throws AuthenticationException;
  28. }

Example for Broker authentication plugins:

Authorization

Authorization is the operation that checks whether a particular “role” or “principal” is allowed to perform a certain operation.

By default, Pulsar provides an embedded authorization, though it’s possible to configure a different one through a plugin.

To provide a custom provider, one needs to implement the org.apache.pulsar.broker.authorization.AuthorizationProvider interface, have this class in the Pulsar broker classpath and configure it in conf/broker.conf:

  1. properties
  2. # Authorization provider fully qualified class-name
  3. authorizationProvider=org.apache.pulsar.broker.authorization.PulsarAuthorizationProvider
  1. /**
  2. * Provider of authorization mechanism
  3. */
  4. public interface AuthorizationProvider extends Closeable {
  5. /**
  6. * Perform initialization for the authorization provider
  7. *
  8. * @param config
  9. * broker config object
  10. * @param configCache
  11. * pulsar zk configuration cache service
  12. * @throws IOException
  13. * if the initialization fails
  14. */
  15. void initialize(ServiceConfiguration conf, ConfigurationCacheService configCache) throws IOException;
  16. /**
  17. * Check if the specified role has permission to send messages to the specified fully qualified topic name.
  18. *
  19. * @param topicName
  20. * the fully qualified topic name associated with the topic.
  21. * @param role
  22. * the app id used to send messages to the topic.
  23. */
  24. CompletableFuture<Boolean> canProduceAsync(TopicName topicName, String role,
  25. AuthenticationDataSource authenticationData);
  26. /**
  27. * Check if the specified role has permission to receive messages from the specified fully qualified topic name.
  28. *
  29. * @param topicName
  30. * the fully qualified topic name associated with the topic.
  31. * @param role
  32. * the app id used to receive messages from the topic.
  33. * @param subscription
  34. * the subscription name defined by the client
  35. */
  36. CompletableFuture<Boolean> canConsumeAsync(TopicName topicName, String role,
  37. AuthenticationDataSource authenticationData, String subscription);
  38. /**
  39. * Check whether the specified role can perform a lookup for the specified topic.
  40. *
  41. * For that the caller needs to have producer or consumer permission.
  42. *
  43. * @param topicName
  44. * @param role
  45. * @return
  46. * @throws Exception
  47. */
  48. CompletableFuture<Boolean> canLookupAsync(TopicName topicName, String role,
  49. AuthenticationDataSource authenticationData);
  50. /**
  51. *
  52. * Grant authorization-action permission on a namespace to the given client
  53. *
  54. * @param namespace
  55. * @param actions
  56. * @param role
  57. * @param authDataJson
  58. * additional authdata in json format
  59. * @return CompletableFuture
  60. * @completesWith <br/>
  61. * IllegalArgumentException when namespace not found<br/>
  62. * IllegalStateException when failed to grant permission
  63. */
  64. CompletableFuture<Void> grantPermissionAsync(NamespaceName namespace, Set<AuthAction> actions, String role,
  65. String authDataJson);
  66. /**
  67. * Grant authorization-action permission on a topic to the given client
  68. *
  69. * @param topicName
  70. * @param role
  71. * @param authDataJson
  72. * additional authdata in json format
  73. * @return CompletableFuture
  74. * @completesWith <br/>
  75. * IllegalArgumentException when namespace not found<br/>
  76. * IllegalStateException when failed to grant permission
  77. */
  78. CompletableFuture<Void> grantPermissionAsync(TopicName topicName, Set<AuthAction> actions, String role,
  79. String authDataJson);
  80. }