应用程序可以使用 Pulsar 加密在生产者侧加密消息,并在消费者侧解密消息。 你可以使用应用程序配置的公钥私钥对进行加密。 只有拥有有效密钥的消费者可以解密加密过的消息。

对称加密与非对称加密

Pulsar使用动态生成的AES秘钥来加密消息(数据)。 你可以使用应用程序提供的 ECDSA/RSA 密钥对来加密 AES 密钥(数据密钥), 所以你不必与大家分享秘密。

Key is a public and private key pair used for encryption or decryption. The producer key is the public key of the key pair, and the consumer key is the private key of the key pair.

应用程序用公钥配置生产者。 你可以使用此密钥来加密 AES 数据密钥。 The encrypted data key is sent as part of message header. 只有拥有私钥的实体 (在这种情况下是消费者) 才能解密用于解密消息的数据密钥。

You can encrypt a message with more than one key. Any one of the keys used for encrypting the message is sufficient to decrypt the message.

Pulsar does not store the encryption key anywhere in the Pulsar service. If you lose or delete the private key, your message is irretrievably lost, and is unrecoverable.

Producer

alt text

Consumer

alt text

Get started

  1. 输入下面的命令来创建你的 ECDSA 或 RSA 公钥私钥对。
  1. openssl ecparam -name secp521r1 -genkey -param_enc explicit -out test_ecdsa_privkey.pem
  2. openssl ec -in test_ecdsa_privkey.pem -pubout -outform pem -out test_ecdsa_pubkey.pem
  1. Add the public and private key to the key management and configure your producers to retrieve public keys and consumers clients to retrieve private keys.

  2. 实现 CryptoKeyReader 接口,尤其是生产者的 CryptoKeyReader.getPublicKey() 和消费者的 CryptoKeyReader.getPrivateKey() ,Pulsar 客户端会调用这两个方法来加载密钥。

  3. 向生产者 builder 添加加密密钥名称: PulsarClient.newProducer().addEncryptionKey(“myapp.key”)。

  4. 将 CryptoKeyReader 实现添加到生产者或消费者 builder: PulsarClient.newProducer().cryptoKeyReader(keyReader) / PulsarClient.newConsumer().cryptoKeyReader(keyReader)。

  5. Sample producer application:

  1. class RawFileKeyReader implements CryptoKeyReader {
  2. String publicKeyFile = "";
  3. String privateKeyFile = "";
  4. RawFileKeyReader(String pubKeyFile, String privKeyFile) {
  5. publicKeyFile = pubKeyFile;
  6. privateKeyFile = privKeyFile;
  7. }
  8. @Override
  9. public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
  10. EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
  11. try {
  12. keyInfo.setKey(Files.readAllBytes(Paths.get(publicKeyFile)));
  13. } catch (IOException e) {
  14. System.out.println("ERROR: Failed to read public key from file " + publicKeyFile);
  15. e.printStackTrace();
  16. }
  17. return keyInfo;
  18. }
  19. @Override
  20. public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) {
  21. EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
  22. try {
  23. keyInfo.setKey(Files.readAllBytes(Paths.get(privateKeyFile)));
  24. } catch (IOException e) {
  25. System.out.println("ERROR: Failed to read private key from file " + privateKeyFile);
  26. e.printStackTrace();
  27. }
  28. return keyInfo;
  29. }
  30. }
  31. PulsarClient pulsarClient = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
  32. Producer producer = pulsarClient.newProducer()
  33. .topic("persistent://my-tenant/my-ns/my-topic")
  34. .addEncryptionKey("myappkey")
  35. .cryptoKeyReader(new RawFileKeyReader("test_ecdsa_pubkey.pem", "test_ecdsa_privkey.pem"))
  36. .create();
  37. for (int i = 0; i < 10; i++) {
  38. producer.send("my-message".getBytes());
  39. }
  40. producer.close();
  41. pulsarClient.close();
  1. 简单的消费者示例
  1. class RawFileKeyReader implements CryptoKeyReader {
  2. String publicKeyFile = "";
  3. String privateKeyFile = "";
  4. RawFileKeyReader(String pubKeyFile, String privKeyFile) {
  5. publicKeyFile = pubKeyFile;
  6. privateKeyFile = privKeyFile;
  7. }
  8. @Override
  9. public EncryptionKeyInfo getPublicKey(String keyName, Map<String, String> keyMeta) {
  10. EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
  11. try {
  12. keyInfo.setKey(Files.readAllBytes(Paths.get(publicKeyFile)));
  13. } catch (IOException e) {
  14. System.out.println("ERROR: Failed to read public key from file " + publicKeyFile);
  15. e.printStackTrace();
  16. }
  17. return keyInfo;
  18. }
  19. @Override
  20. public EncryptionKeyInfo getPrivateKey(String keyName, Map<String, String> keyMeta) {
  21. EncryptionKeyInfo keyInfo = new EncryptionKeyInfo();
  22. try {
  23. keyInfo.setKey(Files.readAllBytes(Paths.get(privateKeyFile)));
  24. } catch (IOException e) {
  25. System.out.println("ERROR: Failed to read private key from file " + privateKeyFile);
  26. e.printStackTrace();
  27. }
  28. return keyInfo;
  29. }
  30. }
  31. PulsarClient pulsarClient = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
  32. Consumer consumer = pulsarClient.newConsumer()
  33. .topic("persistent://my-tenant/my-ns/my-topic")
  34. .subscriptionName("my-subscriber-name")
  35. .cryptoKeyReader(new RawFileKeyReader("test_ecdsa_pubkey.pem", "test_ecdsa_privkey.pem"))
  36. .subscribe();
  37. Message msg = null;
  38. for (int i = 0; i < 10; i++) {
  39. msg = consumer.receive();
  40. // do something
  41. System.out.println("Received: " + new String(msg.getData()));
  42. }
  43. // Acknowledge the consumption of all messages at once
  44. consumer.acknowledgeCumulative(msg);
  45. consumer.close();
  46. pulsarClient.close();

Key rotation

Pulsar 每隔 4 小时或在发布一定数量的消息后生成一个新的 AES 数据密钥。 生产者通过调用 CryptoKeyReader.getPublicKey() 获取最新版本,以此来每隔 4 小时获取一次不对称公钥。

在生产者应用程序中启用加密

If you produce messages that are consumed across application boundaries, you need to ensure that consumers in other applications have access to one of the private keys that can decrypt the messages. You can do this in two ways:

  1. 消费者应用程序提供了访问他们公钥的权限,你可以将公钥添加到你的生产者密钥上。
  2. 你授予访问权限给生产者使用的密钥对中的其中一个私钥。

When producers want to encrypt the messages with multiple keys, producers add all such keys to the config. Consumer can decrypt the message as long as the consumer has access to at least one of the keys.

如果你需要使用两个键(myapp.messagekey1和myapp.messagekey2)加密消息,请参阅下面的例子。

  1. PulsarClient.newProducer().addEncryptionKey("myapp.messagekey1").addEncryptionKey("myapp.messagekey2");

解密消费者应用程序中的加密消息

消费者需要访问其中一个私钥才能解密生产者所产生的信息。 如果你想要接收加密信件, 创建一个公钥或私钥,并将你的公钥交给生产者应用程序来使用你的公钥加密消息。

处理失败

  • Producer/ Consumer loses access to the key
    • 生产者操作失败指明失败的原因。 Application has the option to proceed with sending unencrypted message in such cases. 调用 PulsarClient.newProducer().cryptoFailureAction(ProducerCryptoFailureAction) 控制生产者的行为。 The default behavior is to fail the request.
    • 如果消费因解密失败或消费者缺失密钥而失败,应用程序可以选择消费加密的消息或丢弃它。 调用 PulsarClient.newConsumer().cryptoFailureAction(ConsumerCryptoFailureAction) 控制消费者的行为。 The default behavior is to fail the request. 如果私钥永久丢失,应用程序永远无法解密消息。
  • Batch messaging
    • 如果解密失败且消息包含批量消息,客户端会无法在批次中检索单独的消息, 因此,即使将 cryptoFailureAction() 设置为 ConsumerCryptoFailureAction.CONSUME,消息消费还是会失败。
  • 如果解密失败,消息消费会停止,应用程序除了在客户端日志中记录解密失败消息外还会通知积压增加。 如果应用程序不能访问私钥来解密消息,唯一的选项是跳过或丢弃已经积压的消息。