Pulsar C# client

您可以使用 Pulsar C# 客户端 (DotPulsar) 在 C# 中创建 Pulsar 生产者和消费者。 C# 客户端中生产者、消费者和读取器的所有方法都是线程安全的。 DotPulsar 官方文档可以参考 这里

安装

你可以通过 dotnet CLI 或者 Visual Studio 来安装 Pulsar C# 客户端。 本章节描述了如何通过 dotnet CLI 来安装 Pulsar C# 客户端类库。 关于如何通过 Visual Studio 安装 Pulsar C# 客户端,请参阅 这里

先决条件

安装 .NET Core SDK,它提供了 dotnet 命令行工具。 从 Visual Studio 2017 开始,dotnet CLI 将自动随任何与 .NET Core 相关的工作负载一起安装。

操作步骤

按照以下步骤安装 Pulsar C# 客户端库:

  1. 创建一个新项目。

    1. 为项目创建一个文件夹。

    2. 打开终端窗口并切换到新文件夹。

    3. 使用以下命令创建项目:

      1. dotnet new console
    4. 使用 dotnet run 命令来测试应用程序是否已经创建成功。

  2. 添加 DotPulsar Nuget 包。

    1. 使用以下命令安装 DotPulsar 包:

      1. dotnet add package DotPulsar
    2. 在命令完成后,打开 .csproj 文件来查看添加的引用:

      1. <ItemGroup>
      2. <PackageReference Include="DotPulsar" Version="0.11.0" />
      3. </ItemGroup>

Client

本节描述了Pulsar C# 客户端的一些配置示例。

创建客户端

本示例展示如何创建连接到本机的 Pulsar C# 客户端。

  1. var client = PulsarClient.Builder().Build();

要使用 Builder 创建一个 Pulsar C# 客户端,你需要指定以下参数:

选项说明默认值
ServiceUrl设置 Pulsar 集群的服务URL。pulsar://localhost:6650
RetryInterval设置重试操作或重新连接之前的等待时间。3s

创建生产者

本节介绍如何创建生产者。

  • 使用 Builder 创建生产者。

    1. var producer = client.NewProducer()
    2. .Topic("persistent://public/default/mytopic")
    3. .Create();
  • 不使用 Builder 创建生产者。

    1. var options = new ProducerOptions("persistent://public/default/mytopic");
    2. var producer = client.CreateProducer(options);

创建消费者

本节介绍如何创消费者。

  • 使用 Builder 创建消费者。

    1. var consumer = client.NewConsumer()
    2. .SubscriptionName("MySubscription")
    3. .Topic("persistent://public/default/mytopic")
    4. .Create();
  • 不使用 Builder 创建消费者。

    1. var options = new ConsumerOptions("MySubscription", "persistent://public/default/mytopic");
    2. var consumer = client.CreateConsumer(options);

创建 Reader

本节介绍如何创建 Reader。

  • 使用 Builder 创建 Reader。

    1. var reader = client.NewReader()
    2. .StartMessageId(MessageId.Earliest)
    3. .Topic("persistent://public/default/mytopic")
    4. .Create();
  • 不使用 Builder 创建 Reader。

    1. var options = new ReaderOptions(MessageId.Earliest, "persistent://public/default/mytopic");
    2. var reader = client.CreateReader(options);

配置加密策略

Pulsar C# 客户端支持四种加密策略:

  • EnforceUnencrypted:总是使用未加密的连接。
  • EnforceEncrypted:总是使用加密的连接。
  • PreferUnencrypted:如果可以,优先使用未加密的连接。
  • PreferEncrypted:如果可以,优先使用加密连接。

此示例显示如何设置 EnforceUnencrypted 加密策略。

  1. var client = PulsarClient.Builder()
  2. .ConnectionSecurity(EncryptionPolicy.EnforceEncrypted)
  3. .Build();

配置身份验证

目前,Pulsar C# 客户端支持 TLS (Transport Layer Security) 和 JWT (JSON Web Token) 认证。

如果你使用了 基于 TLS 的身份验证,你需要准备好当时获得的证书和密钥。 从 Pulsar C# 客户端使用它们,请参考以下步骤:

  1. 创建一个未加密且无密码的 pfx 文件。

    1. openssl pkcs12 -export -keypbe NONE -certpbe NONE -out admin.pfx -inkey admin.key.pem -in admin.cert.pem -passout pass:
  2. 使用 admin.pfx 文件创建一个 X509Certificate2 对象,并传递到 Pulsar C# 客户端。

    1. var clientCertificate = new X509Certificate2("admin.pfx");
    2. var client = PulsarClient.Builder()
    3. .AuthenticateUsingClientCertificate(clientCertificate)
    4. .Build();

生产者(Producer)

生产者是一个关联到具体主题的进程,它会源源不断地将消息发布到 Pulsar broker 上。 本节描述了生产者的一些配置示例。

发送数据

此示例显示如何发送数据。

  1. var data = Encoding.UTF8.GetBytes("Hello World");
  2. await producer.Send(data);

发送带有自定义元数据的消息

  • 使用生成器发送带有自定义元数据的消息。

    1. var data = Encoding.UTF8.GetBytes("Hello World");
    2. var messageId = await producer.NewMessage()
    3. .Property("SomeKey", "SomeValue")
    4. .Send(data);
  • 不使用生成器发送带有自定义元数据的消息。

    1. var data = Encoding.UTF8.GetBytes("Hello World");
    2. var metadata = new MessageMetadata();
    3. metadata["SomeKey"] = "SomeValue";
    4. var messageId = await producer.Send(metadata, data));

消费者(Consumer)

消费者是一个通过订阅关联到某一主题,并接收消息的程序。 本节介绍一些消费者配置示例。

接收消息

此示例显示消费者如何从主题接收消息。

  1. await foreach (var message in consumer.Messages())
  2. {
  3. Console.WriteLine("Received: " + Encoding.UTF8.GetString(message.Data.ToArray()));
  4. }

确认消息

消息可以是逐个确认,也可以累积一起确认。 关于消息确认的详情,请参阅 消息确认 章节。

  • 逐条确认消息。

    1. await foreach (var message in consumer.Messages())
    2. {
    3. Console.WriteLine("Received: " + Encoding.UTF8.GetString(message.Data.ToArray()));
    4. }
  • 累积确认消息。

    1. await consumer.AcknowledgeCumulative(message);

取消订阅主题

此示例显示消费者如何取消订阅某主题。

  1. await consumer.Unsubscribe();

Note

消费者一旦取消订阅了主题,就不能使用了。

Reader

Reader 实际上只是一个没有游标的消费者。 这意味着 Pulsar 不会跟踪消息的消费进度,也不会去做消息确认。

此示例展示 Reader 如何接收消息。

  1. await foreach (var message in reader.Messages())
  2. {
  3. Console.WriteLine("Received: " + Encoding.UTF8.GetString(message.Data.ToArray()));
  4. }

Monitoring

本节介绍如何监测生产者、消费者和 reader 的状态。

监控生产者

下表列出了可监测的生产者状态。

状态说明
Closed生产者或 Pulsar 客户端已经关闭。
Connected一切正常。
Disconnected连接已丢失,正在尝试重新连接。
Faulted发生了不可恢复的错误。

此示例显示如何监测生产者状态。

  1. private static async ValueTask Monitor(IProducer producer, CancellationToken cancellationToken)
  2. {
  3. var state = ProducerState.Disconnected;
  4. while (!cancellationToken.IsCancellationRequested)
  5. {
  6. state = await producer.StateChangedFrom(state, cancellationToken);
  7. var stateMessage = state switch
  8. {
  9. ProducerState.Connected => $"The producer is connected",
  10. ProducerState.Disconnected => $"The producer is disconnected",
  11. ProducerState.Closed => $"The producer has closed",
  12. ProducerState.Faulted => $"The producer has faulted",
  13. _ => $"The producer has an unknown state '{state}'"
  14. };
  15. Console.WriteLine(stateMessage);
  16. if (producer.IsFinalState(state))
  17. return;
  18. }
  19. }

监测消费者状态

下表列出了可监测的消费者状态。

状态说明
Active一切正常。
Inactive一切正常。 订阅类型是 Failover,表示你已经不是活跃的消费者。
Closed消费者或 Pulsar 客户端已经关闭。
Disconnected连接已丢失,正在尝试重新连接。
Faulted发生了不可恢复的错误。
ReachedEndOfTopic没有更多的消息了。

此示例显示如何监测消费者状态。

  1. private static async ValueTask Monitor(IConsumer consumer, CancellationToken cancellationToken)
  2. {
  3. var state = ConsumerState.Disconnected;
  4. while (!cancellationToken.IsCancellationRequested)
  5. {
  6. state = await consumer.StateChangedFrom(state, cancellationToken);
  7. var stateMessage = state switch
  8. {
  9. ConsumerState.Active => "The consumer is active",
  10. ConsumerState.Inactive => "The consumer is inactive",
  11. ConsumerState.Disconnected => "The consumer is disconnected",
  12. ConsumerState.Closed => "The consumer has closed",
  13. ConsumerState.ReachedEndOfTopic => "The consumer has reached end of topic",
  14. ConsumerState.Faulted => "The consumer has faulted",
  15. _ => $"The consumer has an unknown state '{state}'"
  16. };
  17. Console.WriteLine(stateMessage);
  18. if (consumer.IsFinalState(state))
  19. return;
  20. }
  21. }

监测 Reader 状态

下表列出了可监测的 Reader 状态。

状态说明
ClosedReader 或 Pulsar 客户端已经关闭。
Connected一切正常。
Disconnected连接已丢失,正在尝试重新连接。
Faulted发生了不可恢复的错误。
ReachedEndOfTopic没有更多的消息了。

此示例显示如何监测 Reader 状态。

  1. private static async ValueTask Monitor(IReader reader, CancellationToken cancellationToken)
  2. {
  3. var state = ReaderState.Disconnected;
  4. while (!cancellationToken.IsCancellationRequested)
  5. {
  6. state = await reader.StateChangedFrom(state, cancellationToken);
  7. var stateMessage = state switch
  8. {
  9. ReaderState.Connected => "The reader is connected",
  10. ReaderState.Disconnected => "The reader is disconnected",
  11. ReaderState.Closed => "The reader has closed",
  12. ReaderState.ReachedEndOfTopic => "The reader has reached end of topic",
  13. ReaderState.Faulted => "The reader has faulted",
  14. _ => $"The reader has an unknown state '{state}'"
  15. };
  16. Console.WriteLine(stateMessage);
  17. if (reader.IsFinalState(state))
  18. return;
  19. }
  20. }