Pulsar C# client

You can use the Pulsar C# client (DotPulsar) to create Pulsar producers, consumers, and readers in C#. For Pulsar features that C# clients support, see Pulsar Feature Matrix.

All the methods in the producer, consumer, and reader of a C# client are thread-safe. The official documentation for DotPulsar is available here.

Installation

This section describes how to install the Pulsar C# client library through the dotnet CLI.

Alternatively, you can install the Pulsar C# client library through Visual Studio. Note that starting from Visual Studio 2017, the dotnet CLI is automatically installed with any .NET Core related workloads. For more information, see Microsoft documentation.

To install the Pulsar C# client library using the dotnet CLI, follow these steps:

  1. Install the .NET Core SDK, which provides the dotnet CLI.

  2. Create a project.

    1. Create a folder for the project.

    2. Open a terminal window and switch to the new folder.

    3. Create the project using the following command.

      dotnet new console

    4. Use dotnet run to test that the app has been created properly.

  3. Add the DotPulsar NuGet package.

    1. Use the following command to install the DotPulsar package.

      dotnet add package DotPulsar

    2. After the command completes, open the .csproj file to see the added reference.

      <ItemGroup>
        <PackageReference Include="DotPulsar" Version="2.0.1" />
      </ItemGroup>

Connection URLs

To connect to Pulsar using client libraries, you need to specify a Pulsar protocol URL.

You can assign Pulsar protocol URLs to specific clusters and use the pulsar scheme. The following is an example of localhost with the default port 6650:

  pulsar://localhost:6650

If you have multiple brokers, separate the host:port pairs with commas:

  pulsar://localhost:6650,localhost:6651,localhost:6652

If you use mTLS authentication, add +ssl in the scheme:

  pulsar+ssl://pulsar.us-west.example.com:6651

Release notes

For the changelog of Pulsar C# clients, see release notes.

Client

This section describes some configuration examples for the Pulsar C# client.

Create client

This example shows how to create a Pulsar C# client connected to localhost.

  using DotPulsar;

  var client = PulsarClient.Builder().Build();

To create a Pulsar C# client by using the builder, you can specify the following options.

| Option | Description | Default |
| --- | --- | --- |
| ServiceUrl | Set the service URL for the Pulsar cluster. | pulsar://localhost:6650 |
| RetryInterval | Set the time to wait before retrying an operation or a reconnection. | 3s |
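
The options in the table can be set through the builder, as in the following minimal sketch. It assumes the DotPulsar builder methods `ServiceUrl` and `RetryInterval`; the values shown only restate the defaults from the table, so replace them with your own.

```csharp
using System;
using DotPulsar;

// Assumed values: both settings below repeat the documented defaults.
await using var client = PulsarClient.Builder()
    .ServiceUrl(new Uri("pulsar://localhost:6650")) // where the cluster is reachable
    .RetryInterval(TimeSpan.FromSeconds(3))         // wait between retries and reconnects
    .Build();
```

Declaring the client with `await using` disposes it when it goes out of scope, which also closes anything created from it.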

Create producer

This section describes how to create a producer.

  • Create a producer by using the builder.

    using DotPulsar;
    using DotPulsar.Extensions;

    var producer = client.NewProducer()
        .Topic("persistent://public/default/mytopic")
        .Create();

  • Create a producer without using the builder.

    using DotPulsar;

    var options = new ProducerOptions<byte[]>("persistent://public/default/mytopic", Schema.ByteArray);
    var producer = client.CreateProducer(options);

Create consumer

This section describes how to create a consumer.

  • Create a consumer by using the builder.

    using DotPulsar;
    using DotPulsar.Extensions;

    var consumer = client.NewConsumer()
        .SubscriptionName("MySubscription")
        .Topic("persistent://public/default/mytopic")
        .Create();

  • Create a consumer without using the builder.

    using DotPulsar;

    var options = new ConsumerOptions<byte[]>("MySubscription", "persistent://public/default/mytopic", Schema.ByteArray);
    var consumer = client.CreateConsumer(options);

Create reader

This section describes how to create a reader.

  • Create a reader by using the builder.

    using DotPulsar;
    using DotPulsar.Extensions;

    var reader = client.NewReader()
        .StartMessageId(MessageId.Earliest)
        .Topic("persistent://public/default/mytopic")
        .Create();

  • Create a reader without using the builder.

    using DotPulsar;

    var options = new ReaderOptions<byte[]>(MessageId.Earliest, "persistent://public/default/mytopic", Schema.ByteArray);
    var reader = client.CreateReader(options);

Configure encryption policies

The Pulsar C# client supports four kinds of encryption policies:

  • EnforceUnencrypted: always use unencrypted connections.
  • EnforceEncrypted: always use encrypted connections.
  • PreferUnencrypted: use unencrypted connections, if possible.
  • PreferEncrypted: use encrypted connections, if possible.

This example shows how to set the EnforceEncrypted encryption policy.

  using DotPulsar;

  var client = PulsarClient.Builder()
      .ConnectionSecurity(EncryptionPolicy.EnforceEncrypted)
      .Build();

Configure authentication

Currently, the Pulsar C# client supports TLS (Transport Layer Security) and JWT (JSON Web Token) authentication.

If you have followed Authentication using mTLS, you get a certificate and a key. To use them from the Pulsar C# client, follow these steps:

  1. Create an unencrypted and password-less pfx file.

    openssl pkcs12 -export -keypbe NONE -certpbe NONE -out admin.pfx -inkey admin.key.pem -in admin.cert.pem -passout pass:

  2. Use the admin.pfx file to create an X509Certificate2 and pass it to the Pulsar C# client.

    using System.Security.Cryptography.X509Certificates;
    using DotPulsar;

    var clientCertificate = new X509Certificate2("admin.pfx");
    var client = PulsarClient.Builder()
        .AuthenticateUsingClientCertificate(clientCertificate)
        .Build();
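
For JWT authentication, the token can be passed to the client builder. The following is a minimal sketch that assumes the DotPulsar builder method `AuthenticateUsingToken`. The environment variable name `PULSAR_JWT` is hypothetical and only illustrates keeping the token out of source code.

```csharp
using System;
using DotPulsar;
using DotPulsar.Extensions;

// Hypothetical variable name; load the token from wherever you store secrets.
var token = Environment.GetEnvironmentVariable("PULSAR_JWT")
    ?? throw new InvalidOperationException("The PULSAR_JWT environment variable is not set.");

await using var client = PulsarClient.Builder()
    .AuthenticateUsingToken(token) // sends the JWT when connecting to the broker
    .Build();
```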

Producer

A producer is a process that attaches to a topic and publishes messages to a Pulsar broker for processing. This section describes some configuration examples of the producer.

Send data

This example shows how to send data.

  var data = Encoding.UTF8.GetBytes("Hello World");
  await producer.Send(data);
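
For context, the snippet above can be placed in a complete program along these lines. This is a sketch, not a definitive implementation: it assumes a broker listening on the default `pulsar://localhost:6650` and reuses the topic name from the earlier examples.

```csharp
using System;
using System.Text;
using DotPulsar;
using DotPulsar.Extensions;

// Assumes a broker on the default service URL (pulsar://localhost:6650).
await using var client = PulsarClient.Builder().Build();

await using var producer = client.NewProducer()
    .Topic("persistent://public/default/mytopic")
    .Create();

// Encode the payload as UTF-8 bytes and publish it.
var data = Encoding.UTF8.GetBytes("Hello World");
var messageId = await producer.Send(data);

Console.WriteLine($"Sent message with id '{messageId}'");
```

`Send` returns the ID that the broker assigned to the message, which is useful for logging or for starting a reader at a known position.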

Send messages with customized metadata

  • Send messages with customized metadata by using the builder.

    var messageId = await producer.NewMessage()
        .Property("SomeKey", "SomeValue")
        .Send(data);

  • Send messages with customized metadata without using the builder.

    var data = Encoding.UTF8.GetBytes("Hello World");
    var metadata = new MessageMetadata();
    metadata["SomeKey"] = "SomeValue";
    var messageId = await producer.Send(metadata, data);

Consumer

A consumer is a process that attaches to a topic through a subscription and then receives messages. This section describes some configuration examples for the consumer.

Receive messages

This example shows how a consumer receives messages from a topic.

  await foreach (var message in consumer.Messages())
  {
      Console.WriteLine("Received: " + Encoding.UTF8.GetString(message.Data.ToArray()));
  }

Acknowledge messages

Messages can be acknowledged individually or cumulatively. For details about message acknowledgment, see acknowledgment.

  • Acknowledge messages individually.

    await consumer.Acknowledge(message);

  • Acknowledge messages cumulatively.

    await consumer.AcknowledgeCumulative(message);
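
Acknowledgment usually happens inside the receive loop, after a message has been processed. The following sketch combines the receive and individual-acknowledge snippets from this section. It assumes a broker on the default service URL and reuses the subscription and topic names from the earlier examples.

```csharp
using System;
using System.Buffers;
using System.Text;
using DotPulsar;
using DotPulsar.Extensions;

// Assumes a broker on the default service URL (pulsar://localhost:6650).
await using var client = PulsarClient.Builder().Build();

await using var consumer = client.NewConsumer()
    .SubscriptionName("MySubscription")
    .Topic("persistent://public/default/mytopic")
    .Create();

await foreach (var message in consumer.Messages())
{
    // Process the message first...
    Console.WriteLine("Received: " + Encoding.UTF8.GetString(message.Data.ToArray()));

    // ...then acknowledge it so the broker does not redeliver it.
    await consumer.Acknowledge(message);
}
```

Acknowledging only after processing means a message that fails partway through remains unacknowledged and can be delivered again.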

Unsubscribe from topics

This example shows how a consumer unsubscribes from a topic.

  await consumer.Unsubscribe();

Note

Once a consumer unsubscribes from a topic, it is disposed and can no longer be used.

Reader

A reader is just a consumer without a cursor. This means that Pulsar does not keep track of your progress and there is no need to acknowledge messages.

This example shows how a reader receives messages.

  await foreach (var message in reader.Messages())
  {
      Console.WriteLine("Received: " + Encoding.UTF8.GetString(message.Data.ToArray()));
  }

Monitoring

This section describes how to monitor the producer, consumer, and reader state.

Monitor producer state

The following table lists states available for the producer.

| State | Description |
| --- | --- |
| Closed | The producer or the Pulsar client has been disposed. |
| Connected | All is well. |
| Disconnected | The connection is lost and attempts are being made to reconnect. |
| Faulted | An unrecoverable error has occurred. |
| PartiallyConnected | Some of the sub-producers are disconnected. |

This example shows how to monitor the producer state.

  private static async ValueTask Monitor(IProducer producer, CancellationToken cancellationToken)
  {
      var state = ProducerState.Disconnected;
      while (!cancellationToken.IsCancellationRequested)
      {
          state = (await producer.StateChangedFrom(state, cancellationToken)).ProducerState;
          var stateMessage = state switch
          {
              ProducerState.Connected => "The producer is connected",
              ProducerState.Disconnected => "The producer is disconnected",
              ProducerState.Closed => "The producer has closed",
              ProducerState.Faulted => "The producer has faulted",
              ProducerState.PartiallyConnected => "The producer is partially connected.",
              _ => $"The producer has an unknown state '{state}'"
          };
          Console.WriteLine(stateMessage);
          if (producer.IsFinalState(state))
              return;
      }
  }

Monitor consumer state

The following table lists states available for the consumer.

| State | Description |
| --- | --- |
| Active | All is well. |
| Inactive | All is well. The subscription type is Failover and you are not the active consumer. |
| Closed | The consumer or the Pulsar client has been disposed. |
| Disconnected | The connection is lost and attempts are being made to reconnect. |
| Faulted | An unrecoverable error has occurred. |
| ReachedEndOfTopic | No more messages are delivered. |
| Unsubscribed | The consumer has unsubscribed. |

This example shows how to monitor the consumer state.

  private static async ValueTask Monitor(IConsumer consumer, CancellationToken cancellationToken)
  {
      var state = ConsumerState.Disconnected;
      while (!cancellationToken.IsCancellationRequested)
      {
          state = (await consumer.StateChangedFrom(state, cancellationToken)).ConsumerState;
          var stateMessage = state switch
          {
              ConsumerState.Active => "The consumer is active",
              ConsumerState.Inactive => "The consumer is inactive",
              ConsumerState.Disconnected => "The consumer is disconnected",
              ConsumerState.Closed => "The consumer has closed",
              ConsumerState.ReachedEndOfTopic => "The consumer has reached end of topic",
              ConsumerState.Faulted => "The consumer has faulted",
              ConsumerState.Unsubscribed => "The consumer is unsubscribed.",
              _ => $"The consumer has an unknown state '{state}'"
          };
          Console.WriteLine(stateMessage);
          if (consumer.IsFinalState(state))
              return;
      }
  }

Monitor reader state

The following table lists states available for the reader.

| State | Description |
| --- | --- |
| Closed | The reader or the Pulsar client has been disposed. |
| Connected | All is well. |
| Disconnected | The connection is lost and attempts are being made to reconnect. |
| Faulted | An unrecoverable error has occurred. |
| ReachedEndOfTopic | No more messages are delivered. |

This example shows how to monitor the reader state.

  private static async ValueTask Monitor(IReader reader, CancellationToken cancellationToken)
  {
      var state = ReaderState.Disconnected;
      while (!cancellationToken.IsCancellationRequested)
      {
          state = (await reader.StateChangedFrom(state, cancellationToken)).ReaderState;
          var stateMessage = state switch
          {
              ReaderState.Connected => "The reader is connected",
              ReaderState.Disconnected => "The reader is disconnected",
              ReaderState.Closed => "The reader has closed",
              ReaderState.ReachedEndOfTopic => "The reader has reached end of topic",
              ReaderState.Faulted => "The reader has faulted",
              _ => $"The reader has an unknown state '{state}'"
          };
          Console.WriteLine(stateMessage);
          if (reader.IsFinalState(state))
              return;
      }
  }