Pulsar Encryption

Applications can use Pulsar encryption to encrypt messages on the producer side and decrypt messages on the consumer side. You can use the public and private key pair that the application configures to perform encryption. Only the consumers with a valid key can decrypt the encrypted messages.

Asymmetric and symmetric encryption

Pulsar uses a dynamically generated symmetric AES key to encrypt messages(data). You can use the application-provided ECDSA (Elliptic Curve Digital Signature Algorithm) or RSA (Rivest–Shamir–Adleman) key pair to encrypt the AES key(data key), so you do not have to share the secret with everyone.

Key is a public and private key pair used for encryption or decryption. The producer key is the public key of the key pair, and the consumer key is the private key of the key pair.

The application configures the producer with the public key. You can use this key to encrypt the AES data key. The encrypted data key is sent as part of message header. Only entities with the private key (in this case the consumer) are able to decrypt the data key which is used to decrypt the message.

You can encrypt a message with more than one key. Any one of the keys used for encrypting the message is sufficient to decrypt the message.

Pulsar does not store the encryption key anywhere in the Pulsar service. If you lose or delete the private key, your message is irretrievably lost, and is unrecoverable.

Producer

alt text

Consumer

alt text

Get started

  1. Create your ECDSA or RSA public and private key pair by using the following commands.
  • ECDSA(for Java clients only)

    1. openssl ecparam -name secp521r1 -genkey -param_enc explicit -out test_ecdsa_privkey.pem
    2. openssl ec -in test_ecdsa_privkey.pem -pubout -outform pem -out test_ecdsa_pubkey.pem
  • RSA (for C++, Python and Node.js clients)

    1. openssl genrsa -out test_rsa_privkey.pem 2048
    2. openssl rsa -in test_rsa_privkey.pem -pubout -outform pkcs8 -out test_rsa_pubkey.pem
  1. Add the public and private key to the key management and configure your producers to retrieve public keys and consumers clients to retrieve private keys.

  2. Implement the CryptoKeyReader interface, specifically CryptoKeyReader.getPublicKey() for producer and CryptoKeyReader.getPrivateKey() for consumer, which Pulsar client invokes to load the key.

  3. Add the encryption key name to the producer builder: PulsarClient.newProducer().addEncryptionKey(“myapp.key”).

  4. Configure a CryptoKeyReader to a producer, consumer or reader.

  • Java
  • C++
  • Python
  • Node.js
  1. PulsarClient pulsarClient = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
  2. String topic = "persistent://my-tenant/my-ns/my-topic";
  3. // RawFileKeyReader is just an example implementation that's not provided by Pulsar
  4. CryptoKeyReader keyReader = new RawFileKeyReader("test_ecdsa_pubkey.pem", "test_ecdsa_privkey.pem");
  5. Producer<byte[]> producer = pulsarClient.newProducer()
  6. .topic(topic)
  7. .cryptoKeyReader(keyReader)
  8. .addEncryptionKey(“myappkey”)
  9. .create();
  10. Consumer<byte[]> consumer = pulsarClient.newConsumer()
  11. .topic(topic)
  12. .subscriptionName("my-subscriber-name")
  13. .cryptoKeyReader(keyReader)
  14. .subscribe();
  15. Reader<byte[]> reader = pulsarClient.newReader()
  16. .topic(topic)
  17. .startMessageId(MessageId.earliest)
  18. .cryptoKeyReader(keyReader)
  19. .create();
  1. Client client("pulsar://localhost:6650");
  2. std::string topic = "persistent://my-tenant/my-ns/my-topic";
  3. // DefaultCryptoKeyReader is a built-in implementation that reads public key and private key from files
  4. auto keyReader = std::make_shared<DefaultCryptoKeyReader>("test_ecdsa_pubkey.pem", "test_ecdsa_privkey.pem");
  5. Producer producer;
  6. ProducerConfiguration producerConf;
  7. producerConf.setCryptoKeyReader(keyReader);
  8. producerConf.addEncryptionKey("myappkey");
  9. client.createProducer(topic, producerConf, producer);
  10. Consumer consumer;
  11. ConsumerConfiguration consumerConf;
  12. consumerConf.setCryptoKeyReader(keyReader);
  13. client.subscribe(topic, "my-subscriber-name", consumerConf, consumer);
  14. Reader reader;
  15. ReaderConfiguration readerConf;
  16. readerConf.setCryptoKeyReader(keyReader);
  17. client.createReader(topic, MessageId::earliest(), readerConf, reader);
  1. from pulsar import Client, CryptoKeyReader
  2. client = Client('pulsar://localhost:6650')
  3. topic = 'persistent://my-tenant/my-ns/my-topic'
  4. # CryptoKeyReader is a built-in implementation that reads public key and private key from files
  5. key_reader = CryptoKeyReader('test_ecdsa_pubkey.pem', 'test_ecdsa_privkey.pem')
  6. producer = client.create_producer(
  7. topic=topic,
  8. encryption_key='myappkey',
  9. crypto_key_reader=key_reader
  10. )
  11. consumer = client.subscribe(
  12. topic=topic,
  13. subscription_name='my-subscriber-name',
  14. crypto_key_reader=key_reader
  15. )
  16. reader = client.create_reader(
  17. topic=topic,
  18. start_message_id=MessageId.earliest,
  19. crypto_key_reader=key_reader
  20. )
  21. client.close()
  1. const Pulsar = require('pulsar-client');
  2. (async () => {
  3. // Create a client
  4. const client = new Pulsar.Client({
  5. serviceUrl: 'pulsar://localhost:6650',
  6. operationTimeoutSeconds: 30,
  7. });
  8. // Create a producer
  9. const producer = await client.createProducer({
  10. topic: 'persistent://public/default/my-topic',
  11. sendTimeoutMs: 30000,
  12. batchingEnabled: true,
  13. publicKeyPath: "public-key.client-rsa.pem",
  14. encryptionKey: "encryption-key"
  15. });
  16. // Create a consumer
  17. const consumer = await client.subscribe({
  18. topic: 'persistent://public/default/my-topic',
  19. subscription: 'sub1',
  20. subscriptionType: 'Shared',
  21. ackTimeoutMs: 10000,
  22. privateKeyPath: "private-key.client-rsa.pem"
  23. });
  24. // Send messages
  25. for (let i = 0; i < 10; i += 1) {
  26. const msg = `my-message-${i}`;
  27. producer.send({
  28. data: Buffer.from(msg),
  29. });
  30. console.log(`Sent message: ${msg}`);
  31. }
  32. await producer.flush();
  33. // Receive messages
  34. for (let i = 0; i < 10; i += 1) {
  35. const msg = await consumer.receive();
  36. console.log(msg.getData().toString());
  37. consumer.acknowledge(msg);
  38. }
  39. await consumer.close();
  40. await producer.close();
  41. await client.close();
  42. })();
  1. Below is an example of a customized CryptoKeyReader implementation.
  • Java
  • C++
  • Python
  • Node.js
  1. class RawFileKeyReader implements CryptoKeyReader {
  2. String publicKeyFile = "";
  3. String privateKeyFile = "";
  4. RawFileKeyReader(String pubKeyFile, String privKeyFile) {
  5. publicKeyFile = pubKeyFile;
  6. privateKeyFile = privKeyFile;
  7. }
  8. @Override
  9. public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
  10. EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
  11. try {
  12. keyInfo.setKey(Files.readAllBytes(Paths.get(publicKeyFile)));
  13. } catch (IOException e) {
  14. System.out.println("ERROR: Failed to read public key from file " + publicKeyFile);
  15. e.printStackTrace();
  16. }
  17. return keyInfo;
  18. }
  19. @Override
  20. public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) {
  21. EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
  22. try {
  23. keyInfo.setKey(Files.readAllBytes(Paths.get(privateKeyFile)));
  24. } catch (IOException e) {
  25. System.out.println("ERROR: Failed to read private key from file " + privateKeyFile);
  26. e.printStackTrace();
  27. }
  28. return keyInfo;
  29. }
  30. }
  1. class CustomCryptoKeyReader : public CryptoKeyReader {
  2. public:
  3. Result getPublicKey(const std::string& keyName, std::map<std::string, std::string>& metadata,
  4. EncryptionKeyInfo& encKeyInfo) const override {
  5. // TODO:
  6. return ResultOk;
  7. }
  8. Result getPrivateKey(const std::string& keyName, std::map<std::string, std::string>& metadata,
  9. EncryptionKeyInfo& encKeyInfo) const override {
  10. // TODO:
  11. return ResultOk;
  12. }
  13. };
  14. auto keyReader = std::make_shared<CustomCryptoKeyReader>(/* ... */);
  15. // TODO: create producer, consumer or reader based on keyReader here

Besides, you can use the default implementation of CryptoKeyReader by specifying the paths of private key and public key.

Currently, customized CryptoKeyReader implementation is not supported in Python. However, you can use the default implementation by specifying the path of private key and public key.

Currently, customized CryptoKeyReader implementation is not supported in Node.js. However, you can use the default implementation by specifying the path of private key and public key.

Key rotation

Pulsar generates a new AES data key every 4 hours or after publishing a certain number of messages. A producer fetches the asymmetric public key every 4 hours by calling CryptoKeyReader.getPublicKey() to retrieve the latest version.

Enable encryption at the producer application

If you produce messages that are consumed across application boundaries, you need to ensure that consumers in other applications have access to one of the private keys that can decrypt the messages. You can do this in two ways:

  1. The consumer application provides you access to their public key, which you add to your producer keys.
  2. You grant access to one of the private keys from the pairs that producer uses.

When producers want to encrypt the messages with multiple keys, producers add all such keys to the config. Consumer can decrypt the message as long as the consumer has access to at least one of the keys.

If you need to encrypt the messages using 2 keys (myapp.messagekey1 and myapp.messagekey2), refer to the following example.

  1. PulsarClient.newProducer().addEncryptionKey("myapp.messagekey1").addEncryptionKey("myapp.messagekey2");

Decrypt encrypted messages at the consumer application

Consumers require to access one of the private keys to decrypt messages that the producer produces. If you want to receive encrypted messages, create a public or private key and give your public key to the producer application to encrypt messages using your public key.

Handle failures

  • Producer/Consumer loses access to the key
    • Producer action fails to indicate the cause of the failure. Application has the option to proceed with sending unencrypted messages in such cases. Call PulsarClient.newProducer().cryptoFailureAction(ProducerCryptoFailureAction) to control the producer behavior. The default behavior is to fail the request.
    • If consumption fails due to decryption failure or missing keys in consumer, the application has the option to consume the encrypted message or discard it. Call PulsarClient.newConsumer().cryptoFailureAction(ConsumerCryptoFailureAction) to control the consumer behavior. The default behavior is to fail the request. Application is never able to decrypt the messages if the private key is permanently lost.
  • Batch messaging
    • If decryption fails and the message contains batch messages, client is not able to retrieve individual messages in the batch, hence message consumption fails even if cryptoFailureAction() is set to ConsumerCryptoFailureAction.CONSUME.
  • If decryption fails, the message consumption stops and the application notices backlog growth in addition to decryption failure messages in the client log. If the application does not have access to the private key to decrypt the message, the only option is to skip or discard backlogged messages.