Pulsar C# client
您可以使用 Pulsar C# 客户端 (DotPulsar) 在 C# 中创建 Pulsar 生产者和消费者。 C# 客户端中生产者、消费者和读取器的所有方法都是线程安全的。 DotPulsar 官方文档可以参考 这里。
安装
你可以通过 dotnet CLI 或者 Visual Studio 来安装 Pulsar C# 客户端。 本章节描述了如何通过 dotnet CLI 来安装 Pulsar C# 客户端类库。 关于如何通过 Visual Studio 安装 Pulsar C# 客户端,请参阅 这里。
先决条件
安装 .NET Core SDK,它提供了 dotnet 命令行工具。 从 Visual Studio 2017 开始,dotnet CLI 将自动随任何与 .NET Core 相关的工作负载一起安装。
操作步骤
按照以下步骤安装 Pulsar C# 客户端库:
创建一个新项目。
为项目创建一个文件夹。
打开终端窗口并切换到新文件夹。
使用以下命令创建项目:
dotnet new console
使用
dotnet run
命令来测试应用程序是否已经创建成功。
添加 DotPulsar Nuget 包。
使用以下命令安装
DotPulsar
包:dotnet add package DotPulsar
在命令完成后,打开
.csproj
文件来查看添加的引用:<ItemGroup>
<PackageReference Include="DotPulsar" Version="0.11.0" />
</ItemGroup>
Client
本节描述了Pulsar C# 客户端的一些配置示例。
创建客户端
本示例展示如何创建连接到本机的 Pulsar C# 客户端。
var client = PulsarClient.Builder().Build();
要使用 Builder 创建一个 Pulsar C# 客户端,你需要指定以下参数:
选项 | 说明 | 默认值 |
---|---|---|
ServiceUrl | 设置 Pulsar 集群的服务URL。 | pulsar://localhost:6650 |
RetryInterval | 设置重试操作或重新连接之前的等待时间。 | 3s |
创建生产者
本节介绍如何创建生产者。
使用 Builder 创建生产者。
var producer = client.NewProducer()
.Topic("persistent://public/default/mytopic")
.Create();
不使用 Builder 创建生产者。
var options = new ProducerOptions("persistent://public/default/mytopic");
var producer = client.CreateProducer(options);
创建消费者
本节介绍如何创消费者。
使用 Builder 创建消费者。
var consumer = client.NewConsumer()
.SubscriptionName("MySubscription")
.Topic("persistent://public/default/mytopic")
.Create();
不使用 Builder 创建消费者。
var options = new ConsumerOptions("MySubscription", "persistent://public/default/mytopic");
var consumer = client.CreateConsumer(options);
创建 Reader
本节介绍如何创建 Reader。
使用 Builder 创建 Reader。
var reader = client.NewReader()
.StartMessageId(MessageId.Earliest)
.Topic("persistent://public/default/mytopic")
.Create();
不使用 Builder 创建 Reader。
var options = new ReaderOptions(MessageId.Earliest, "persistent://public/default/mytopic");
var reader = client.CreateReader(options);
配置加密策略
Pulsar C# 客户端支持四种加密策略:
EnforceUnencrypted
:总是使用未加密的连接。EnforceEncrypted
:总是使用加密的连接。PreferUnencrypted
:如果可以,优先使用未加密的连接。PreferEncrypted
:如果可以,优先使用加密连接。
此示例显示如何设置 EnforceUnencrypted
加密策略。
var client = PulsarClient.Builder()
.ConnectionSecurity(EncryptionPolicy.EnforceEncrypted)
.Build();
配置身份验证
目前,Pulsar C# 客户端支持 TLS (Transport Layer Security) 和 JWT (JSON Web Token) 认证。
如果你使用了 基于 TLS 的身份验证,你需要准备好当时获得的证书和密钥。 从 Pulsar C# 客户端使用它们,请参考以下步骤:
创建一个未加密且无密码的 pfx 文件。
openssl pkcs12 -export -keypbe NONE -certpbe NONE -out admin.pfx -inkey admin.key.pem -in admin.cert.pem -passout pass:
使用 admin.pfx 文件创建一个 X509Certificate2 对象,并传递到 Pulsar C# 客户端。
var clientCertificate = new X509Certificate2("admin.pfx");
var client = PulsarClient.Builder()
.AuthenticateUsingClientCertificate(clientCertificate)
.Build();
生产者(Producer)
生产者是一个关联到具体主题的进程,它会源源不断地将消息发布到 Pulsar broker 上。 本节描述了生产者的一些配置示例。
发送数据
此示例显示如何发送数据。
var data = Encoding.UTF8.GetBytes("Hello World");
await producer.Send(data);
发送带有自定义元数据的消息
使用生成器发送带有自定义元数据的消息。
var data = Encoding.UTF8.GetBytes("Hello World");
var messageId = await producer.NewMessage()
.Property("SomeKey", "SomeValue")
.Send(data);
不使用生成器发送带有自定义元数据的消息。
var data = Encoding.UTF8.GetBytes("Hello World");
var metadata = new MessageMetadata();
metadata["SomeKey"] = "SomeValue";
var messageId = await producer.Send(metadata, data));
消费者(Consumer)
消费者是一个通过订阅关联到某一主题,并接收消息的程序。 本节介绍一些消费者配置示例。
接收消息
此示例显示消费者如何从主题接收消息。
await foreach (var message in consumer.Messages())
{
Console.WriteLine("Received: " + Encoding.UTF8.GetString(message.Data.ToArray()));
}
确认消息
消息可以是逐个确认,也可以累积一起确认。 关于消息确认的详情,请参阅 消息确认 章节。
逐条确认消息。
await foreach (var message in consumer.Messages())
{
Console.WriteLine("Received: " + Encoding.UTF8.GetString(message.Data.ToArray()));
}
累积确认消息。
await consumer.AcknowledgeCumulative(message);
取消订阅主题
此示例显示消费者如何取消订阅某主题。
await consumer.Unsubscribe();
Note
消费者一旦取消订阅了主题,就不能使用了。
Reader
Reader 实际上只是一个没有游标的消费者。 这意味着 Pulsar 不会跟踪消息的消费进度,也不会去做消息确认。
此示例展示 Reader 如何接收消息。
await foreach (var message in reader.Messages())
{
Console.WriteLine("Received: " + Encoding.UTF8.GetString(message.Data.ToArray()));
}
Monitoring
本节介绍如何监测生产者、消费者和 reader 的状态。
监控生产者
下表列出了可监测的生产者状态。
状态 | 说明 |
---|---|
Closed | 生产者或 Pulsar 客户端已经关闭。 |
Connected | 一切正常。 |
Disconnected | 连接已丢失,正在尝试重新连接。 |
Faulted | 发生了不可恢复的错误。 |
此示例显示如何监测生产者状态。
private static async ValueTask Monitor(IProducer producer, CancellationToken cancellationToken)
{
var state = ProducerState.Disconnected;
while (!cancellationToken.IsCancellationRequested)
{
state = await producer.StateChangedFrom(state, cancellationToken);
var stateMessage = state switch
{
ProducerState.Connected => $"The producer is connected",
ProducerState.Disconnected => $"The producer is disconnected",
ProducerState.Closed => $"The producer has closed",
ProducerState.Faulted => $"The producer has faulted",
_ => $"The producer has an unknown state '{state}'"
};
Console.WriteLine(stateMessage);
if (producer.IsFinalState(state))
return;
}
}
监测消费者状态
下表列出了可监测的消费者状态。
状态 | 说明 |
---|---|
Active | 一切正常。 |
Inactive | 一切正常。 订阅类型是 Failover ,表示你已经不是活跃的消费者。 |
Closed | 消费者或 Pulsar 客户端已经关闭。 |
Disconnected | 连接已丢失,正在尝试重新连接。 |
Faulted | 发生了不可恢复的错误。 |
ReachedEndOfTopic | 没有更多的消息了。 |
此示例显示如何监测消费者状态。
private static async ValueTask Monitor(IConsumer consumer, CancellationToken cancellationToken)
{
var state = ConsumerState.Disconnected;
while (!cancellationToken.IsCancellationRequested)
{
state = await consumer.StateChangedFrom(state, cancellationToken);
var stateMessage = state switch
{
ConsumerState.Active => "The consumer is active",
ConsumerState.Inactive => "The consumer is inactive",
ConsumerState.Disconnected => "The consumer is disconnected",
ConsumerState.Closed => "The consumer has closed",
ConsumerState.ReachedEndOfTopic => "The consumer has reached end of topic",
ConsumerState.Faulted => "The consumer has faulted",
_ => $"The consumer has an unknown state '{state}'"
};
Console.WriteLine(stateMessage);
if (consumer.IsFinalState(state))
return;
}
}
监测 Reader 状态
下表列出了可监测的 Reader 状态。
状态 | 说明 |
---|---|
Closed | Reader 或 Pulsar 客户端已经关闭。 |
Connected | 一切正常。 |
Disconnected | 连接已丢失,正在尝试重新连接。 |
Faulted | 发生了不可恢复的错误。 |
ReachedEndOfTopic | 没有更多的消息了。 |
此示例显示如何监测 Reader 状态。
private static async ValueTask Monitor(IReader reader, CancellationToken cancellationToken)
{
var state = ReaderState.Disconnected;
while (!cancellationToken.IsCancellationRequested)
{
state = await reader.StateChangedFrom(state, cancellationToken);
var stateMessage = state switch
{
ReaderState.Connected => "The reader is connected",
ReaderState.Disconnected => "The reader is disconnected",
ReaderState.Closed => "The reader has closed",
ReaderState.ReachedEndOfTopic => "The reader has reached end of topic",
ReaderState.Faulted => "The reader has faulted",
_ => $"The reader has an unknown state '{state}'"
};
Console.WriteLine(stateMessage);
if (reader.IsFinalState(state))
return;
}
}