The Pulsar Go client

The Pulsar Go client can be used to create Pulsar producers, consumers, and readers in Go (aka Golang).

API docs available as well

For standard API docs, consult the Godoc.

Installation

Requirements

The Pulsar Go client library is based on the C++ client library. Follow the instructions for the C++ library to install the binaries through RPM, Deb, or Homebrew packages.

Installing the Go package

Compatibility Warning

The version number of the Go client must match the version number of the Pulsar C++ client library.

You can install the pulsar library locally using go get. Note that go get doesn't support fetching a specific tag - it will always pull in master's version of the Go client. You'll need a C++ client library that matches master.

    $ go get -u github.com/apache/pulsar/pulsar-client-go/pulsar

Or you can use dep for managing the dependencies.

    $ dep ensure -add github.com/apache/pulsar/pulsar-client-go/pulsar@v2.4.0

Once installed locally, you can import it into your project:

    import "github.com/apache/pulsar/pulsar-client-go/pulsar"

Connection URLs

To connect to Pulsar using client libraries, you need to specify a Pulsar protocol URL.

Pulsar protocol URLs are assigned to specific clusters, use the pulsar scheme and have a default port of 6650. Here's an example for localhost:

    pulsar://localhost:6650

A URL for a production Pulsar cluster may look something like this:

    pulsar://pulsar.us-west.example.com:6650

If you're using TLS authentication, the URL will look something like this:

    pulsar+ssl://pulsar.us-west.example.com:6651
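The URL rules above can be checked before a URL is handed to the client. The following is a minimal sketch that uses only Go's standard net/url package. The serviceURL type and the parseServiceURL helper are hypothetical names introduced for illustration and are not part of the Pulsar Go client. The fallback port of 6651 for pulsar+ssl is an assumption taken from the TLS example above.

```go
package main

import (
	"fmt"
	"net/url"
)

// serviceURL is a hypothetical type holding the parts of a Pulsar protocol URL.
type serviceURL struct {
	Host string
	Port string
	TLS  bool
}

// parseServiceURL is a hypothetical helper, not part of the Pulsar client.
// It accepts the pulsar and pulsar+ssl schemes and fills in a default port
// when the URL does not name one.
func parseServiceURL(raw string) (serviceURL, error) {
	u, err := url.Parse(raw)
	if err != nil {
		return serviceURL{}, err
	}

	var out serviceURL
	switch u.Scheme {
	case "pulsar":
		out.Port = "6650" // default port stated in the text above
	case "pulsar+ssl":
		out.TLS = true
		out.Port = "6651" // assumption: taken from the TLS example URL
	default:
		return serviceURL{}, fmt.Errorf("unsupported scheme %q", u.Scheme)
	}

	if u.Hostname() == "" {
		return serviceURL{}, fmt.Errorf("missing host in %q", raw)
	}
	out.Host = u.Hostname()

	// An explicit port in the URL overrides the default.
	if p := u.Port(); p != "" {
		out.Port = p
	}
	return out, nil
}

func main() {
	urls := []string{
		"pulsar://localhost",
		"pulsar+ssl://pulsar.us-west.example.com:6651",
		"http://localhost:8080",
	}
	for _, raw := range urls {
		parsed, err := parseServiceURL(raw)
		if err != nil {
			fmt.Println("invalid:", err)
			continue
		}
		fmt.Printf("host=%s port=%s tls=%v\n", parsed.Host, parsed.Port, parsed.TLS)
	}
}
```

A check like this catches a mistyped scheme early, before the client attempts a connection.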

Creating a client

In order to interact with Pulsar, you'll first need a Client object. You can create a client object using the NewClient function, passing in a ClientOptions object (more on configuration below). Here's an example:

    import (
        "log"
        "runtime"

        "github.com/apache/pulsar/pulsar-client-go/pulsar"
    )

    func main() {
        client, err := pulsar.NewClient(pulsar.ClientOptions{
            URL:                     "pulsar://localhost:6650",
            OperationTimeoutSeconds: 5,
            MessageListenerThreads:  runtime.NumCPU(),
        })
        if err != nil {
            log.Fatalf("Could not instantiate Pulsar client: %v", err)
        }
        // Release the client's resources when main returns
        defer client.Close()
    }

The following configurable parameters are available for Pulsar clients:

| Parameter | Description | Default |
| --- | --- | --- |
| URL | The connection URL for the Pulsar cluster. See Connection URLs above for more info. | |
| IOThreads | The number of threads to use for handling connections to Pulsar brokers | 1 |
| OperationTimeoutSeconds | The timeout for some Go client operations (creating producers, subscribing to and unsubscribing from topics). Retries will occur until this threshold is reached, at which point the operation will fail. | 30 |
| MessageListenerThreads | The number of threads used by message listeners (consumers and readers) | 1 |
| ConcurrentLookupRequests | The number of concurrent lookup requests that can be sent on each broker connection. Setting a maximum helps to keep from overloading brokers. You should set values over the default of 5000 only if the client needs to produce and/or subscribe to thousands of Pulsar topics. | 5000 |
| Logger | A custom logger implementation for the client (as a function that takes a log level, file path, line number, and message). All info, warn, and error messages will be routed to this function. | nil |
| TLSTrustCertsFilePath | The file path for the trusted TLS certificate | |
| TLSAllowInsecureConnection | Whether the client accepts untrusted TLS certificates from the broker | false |
| Authentication | Configures the authentication provider (default: no authentication). Example: Authentication: pulsar.NewAuthenticationTLS("my-cert.pem", "my-key.pem") | nil |
| StatsIntervalInSeconds | The interval (in seconds) at which client stats are published | 60 |

Producers

Pulsar producers publish messages to Pulsar topics. You can configure Go producers using a ProducerOptions object. Here's an example:

    producer, err := client.CreateProducer(pulsar.ProducerOptions{
        Topic: "my-topic",
    })
    if err != nil {
        log.Fatalf("Could not instantiate Pulsar producer: %v", err)
    }
    defer producer.Close()

    msg := pulsar.ProducerMessage{
        Payload: []byte("Hello, Pulsar"),
    }
    // Send takes a context as its first argument
    if err := producer.Send(context.Background(), msg); err != nil {
        log.Fatalf("Producer could not send message: %v", err)
    }

Blocking operation

When you create a new Pulsar producer, the operation blocks (waiting on a Go channel) until either the producer is successfully created or an error is returned.
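Blocking on a Go channel is a common way to turn a callback-based operation into a synchronous one. The sketch below shows the general pattern only. The createAsync and createBlocking functions and the result type are hypothetical stand-ins and do not reproduce the client's internals.

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

// result carries the outcome of an asynchronous creation request.
type result struct {
	name string
	err  error
}

// createAsync is a hypothetical stand-in for an asynchronous create call.
// It invokes the callback from another goroutine once the work is done.
func createAsync(topic string, callback func(name string, err error)) {
	go func() {
		time.Sleep(10 * time.Millisecond) // simulate a broker round trip
		if topic == "" {
			callback("", errors.New("topic is required"))
			return
		}
		callback("producer-for-"+topic, nil)
	}()
}

// createBlocking waits on a channel that the callback writes to, so the
// caller sees an ordinary blocking call that returns a value or an error.
func createBlocking(topic string) (string, error) {
	done := make(chan result, 1)
	createAsync(topic, func(name string, err error) {
		done <- result{name: name, err: err}
	})
	res := <-done
	return res.name, res.err
}

func main() {
	name, err := createBlocking("my-topic")
	fmt.Println(name, err)

	_, err = createBlocking("")
	fmt.Println(err)
}
```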

Producer operations

Pulsar Go producers have the following methods available:

| Method | Description | Return type |
| --- | --- | --- |
| Topic() | Fetches the producer's topic | string |
| Name() | Fetches the producer's name | string |
| Send(context.Context, ProducerMessage) | Publishes a message to the producer's topic. This call blocks until the message is successfully acknowledged by the Pulsar broker, or returns an error if the timeout set using SendTimeout in the producer's configuration is exceeded. | error |
| SendAsync(context.Context, ProducerMessage, func(ProducerMessage, error)) | Publishes a message to the producer's topic asynchronously. The third argument is a callback function that specifies what happens when the message is acknowledged or an error occurs. | |
| Close() | Closes the producer and releases all resources allocated to it. Once Close() is called, no more messages are accepted from the publisher. This method blocks until all pending publish requests have been persisted by Pulsar. If an error occurs, no pending writes are retried. | error |

Here's a more involved example usage of a producer:

    import (
        "context"
        "fmt"
        "log"

        "github.com/apache/pulsar/pulsar-client-go/pulsar"
    )

    func main() {
        // Instantiate a Pulsar client
        client, err := pulsar.NewClient(pulsar.ClientOptions{
            URL: "pulsar://localhost:6650",
        })
        if err != nil {
            log.Fatal(err)
        }
        defer client.Close()

        // Use the client to instantiate a producer
        producer, err := client.CreateProducer(pulsar.ProducerOptions{
            Topic: "my-topic",
        })
        if err != nil {
            log.Fatal(err)
        }
        // Close blocks until all pending publish requests have been persisted
        defer producer.Close()

        ctx := context.Background()

        // Send 10 messages synchronously and 10 messages asynchronously
        for i := 0; i < 10; i++ {
            // Create a message
            msg := pulsar.ProducerMessage{
                Payload: []byte(fmt.Sprintf("message-%d", i)),
            }

            // Attempt to send the message
            if err := producer.Send(ctx, msg); err != nil {
                log.Fatal(err)
            }

            // Create a different message to send asynchronously
            asyncMsg := pulsar.ProducerMessage{
                Payload: []byte(fmt.Sprintf("async-message-%d", i)),
            }

            // Attempt to send the message asynchronously and handle the response
            producer.SendAsync(ctx, asyncMsg, func(msg pulsar.ProducerMessage, err error) {
                if err != nil {
                    log.Fatal(err)
                }
                fmt.Printf("the message %s was successfully published\n", string(msg.Payload))
            })
        }
    }
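Because an asynchronous send returns before the broker has acknowledged the message, an application that needs to know when every callback has run can track them with a sync.WaitGroup. The sketch below shows that pattern with a hypothetical sendAsync stand-in that completes immediately on another goroutine. It does not call the Pulsar client, and publishAll is likewise an illustrative name.

```go
package main

import (
	"fmt"
	"sync"
)

// sendAsync is a hypothetical stand-in for an asynchronous send. It reports
// the outcome through the callback on a separate goroutine.
func sendAsync(payload string, callback func(payload string, err error)) {
	go callback(payload, nil)
}

// publishAll sends every payload asynchronously, waits for all callbacks to
// run, and returns how many sends completed without an error.
func publishAll(payloads []string) int {
	var (
		wg    sync.WaitGroup
		mu    sync.Mutex
		acked int
	)
	for _, p := range payloads {
		wg.Add(1)
		sendAsync(p, func(_ string, err error) {
			defer wg.Done()
			if err != nil {
				return
			}
			// Callbacks run concurrently, so guard the shared counter.
			mu.Lock()
			acked++
			mu.Unlock()
		})
	}
	wg.Wait()
	return acked
}

func main() {
	payloads := make([]string, 0, 10)
	for i := 0; i < 10; i++ {
		payloads = append(payloads, fmt.Sprintf("async-message-%d", i))
	}
	fmt.Println("acknowledged:", publishAll(payloads))
}
```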

Producer configuration

| Parameter | Description | Default |
| --- | --- | --- |
| Topic | The Pulsar topic to which the producer will publish messages | |
| Name | A name for the producer. If you don't explicitly assign a name, Pulsar automatically generates a globally unique name that you can access later using the Name() method. If you choose to explicitly assign a name, it must be unique across all Pulsar clusters; otherwise the creation operation returns an error. | |
| SendTimeout | When publishing a message to a topic, the producer waits for an acknowledgment from the responsible Pulsar broker. If a message is not acknowledged within the threshold set by this parameter, an error is returned. If you set SendTimeout to -1, the timeout is set to infinity (and thus removed). Removing the send timeout is recommended when using Pulsar's message de-duplication feature. | 30 seconds |
| MaxPendingMessages | The maximum size of the queue holding pending messages (i.e. messages waiting to receive an acknowledgment from the broker). By default, when the queue is full all calls to the Send and SendAsync methods fail unless BlockIfQueueFull is set to true. | |
| MaxPendingMessagesAcrossPartitions | | |
| BlockIfQueueFull | If set to true, the producer's Send and SendAsync methods block when the outgoing message queue is full rather than failing (the size of that queue is dictated by the MaxPendingMessages parameter). If set to false (the default), Send and SendAsync operations fail with a ProducerQueueIsFullError when the queue is full. | false |
| MessageRoutingMode | The message routing logic (for producers on partitioned topics). This logic is applied only when no key is set on messages. The available options are: round robin (pulsar.RoundRobinDistribution, the default), publishing all messages to a single partition (pulsar.UseSinglePartition), or a custom partitioning scheme (pulsar.CustomPartition). | pulsar.RoundRobinDistribution |
| HashingScheme | The hashing function that determines the partition on which a particular message is published (partitioned topics only). The available options are: pulsar.JavaStringHash (the equivalent of String.hashCode() in Java), pulsar.Murmur3_32Hash (applies the Murmur3 hashing function), or pulsar.BoostHash (applies the hashing function from C++'s Boost library). | pulsar.JavaStringHash |
| CompressionType | The message data compression type used by the producer. The available options are LZ4, ZLIB, ZSTD and SNAPPY. | No compression |
| MessageRouter | By default, Pulsar uses a round-robin routing scheme for partitioned topics. The MessageRouter parameter enables you to specify custom routing logic via a function that takes the Pulsar message and topic metadata as arguments and returns an integer (the partition to publish to), i.e. a function with the signature func(Message, TopicMetadata) int. | |
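To make the HashingScheme and MessageRouter rows more concrete, here is a minimal sketch of key-based routing built on a Go port of Java's String.hashCode(). The message and topicMetadata types are simplified stand-ins for the client's Message and TopicMetadata, and routeByKey is a hypothetical router. Clearing the sign bit before taking the modulus is an assumption about how a negative hash is mapped to a partition, not a statement about the client's implementation.

```go
package main

import (
	"fmt"
	"unicode/utf16"
)

// message and topicMetadata are simplified stand-ins for the client's
// Message and TopicMetadata types.
type message struct{ key string }
type topicMetadata struct{ numPartitions int }

// javaStringHash mirrors Java's String.hashCode(): h = 31*h + c over the
// string's UTF-16 code units, with 32-bit signed wraparound.
func javaStringHash(s string) int32 {
	var h int32
	for _, unit := range utf16.Encode([]rune(s)) {
		h = 31*h + int32(unit)
	}
	return h
}

// routeByKey is a hypothetical router with the same shape as
// func(Message, TopicMetadata) int. It maps a message key to a partition.
func routeByKey(msg message, meta topicMetadata) int {
	if meta.numPartitions <= 0 {
		return 0
	}
	// Assumption: clear the sign bit so the modulus is never negative.
	h := javaStringHash(msg.key) & 0x7fffffff
	return int(h) % meta.numPartitions
}

func main() {
	meta := topicMetadata{numPartitions: 4}
	for _, key := range []string{"hello", "order-42", "user-7"} {
		fmt.Printf("key=%q hash=%d partition=%d\n",
			key, javaStringHash(key), routeByKey(message{key: key}, meta))
	}
}
```

Messages that share a key always hash to the same value, so a router of this shape keeps them on one partition.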

Consumers

Pulsar consumers subscribe to one or more Pulsar topics and listen for incoming messages produced on that topic/those topics. You can configure Go consumers using a ConsumerOptions object. Here's a basic example that uses channels:

    msgChannel := make(chan pulsar.ConsumerMessage)

    consumerOpts := pulsar.ConsumerOptions{
        Topic:            "my-topic",
        SubscriptionName: "my-subscription-1",
        Type:             pulsar.Exclusive,
        MessageChannel:   msgChannel,
    }

    consumer, err := client.Subscribe(consumerOpts)
    if err != nil {
        log.Fatalf("Could not establish subscription: %v", err)
    }
    defer consumer.Close()

    // Process messages as they arrive on the channel
    for cm := range msgChannel {
        msg := cm.Message
        fmt.Printf("Message ID: %v\n", msg.ID())
        fmt.Printf("Message value: %s\n", string(msg.Payload()))
        consumer.Ack(msg)
    }

Blocking operation

When you create a new Pulsar consumer, the operation blocks (on a Go channel) until either the consumer is successfully created or an error is returned.

Consumer operations

Pulsar Go consumers have the following methods available:

| Method | Description | Return type |
| --- | --- | --- |
| Topic() | Returns the consumer's topic | string |
| Subscription() | Returns the consumer's subscription name | string |
| Unsubscribe() | Unsubscribes the consumer from the assigned topic. Returns an error if the unsubscribe operation is unsuccessful. | error |
| Receive(context.Context) | Receives a single message from the topic. This method blocks until a message is available. | (Message, error) |
| Ack(Message) | Acknowledges a message to the Pulsar broker | error |
| AckID(MessageID) | Acknowledges a message to the Pulsar broker by message ID | error |
| AckCumulative(Message) | Acknowledges all the messages in the stream, up to and including the specified message. The AckCumulative method blocks until the ack has been sent to the broker. After that, the messages are not redelivered to the consumer. Cumulative acking cannot be used with a shared subscription type. | error |
| Nack(Message) | Acknowledges the failure to process a single message | error |
| NackID(MessageID) | Acknowledges the failure to process a single message, identified by message ID | error |
| Close() | Closes the consumer, disabling its ability to receive messages from the broker | error |
| RedeliverUnackedMessages() | Redelivers all unacknowledged messages on the topic. In failover mode, this request is ignored if the consumer isn't active on the specified topic; in shared mode, redelivered messages are distributed across all consumers connected to the topic. Note: this is a non-blocking operation that doesn't return an error. | |
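The difference between acknowledging one message and acknowledging cumulatively can be modeled in a few lines. The ackTracker type below is a hypothetical in-memory model that uses plain integers in place of message IDs. It illustrates the semantics described in the table and is not how the client or broker tracks acknowledgments.

```go
package main

import (
	"fmt"
	"sort"
)

// ackTracker is a hypothetical model of the messages a consumer has received
// but not yet acknowledged. Integers stand in for message IDs.
type ackTracker struct {
	pending map[int]bool
}

// newAckTracker returns a tracker with the given IDs marked as pending.
func newAckTracker(ids ...int) *ackTracker {
	t := &ackTracker{pending: make(map[int]bool)}
	for _, id := range ids {
		t.pending[id] = true
	}
	return t
}

// ack acknowledges exactly one message.
func (t *ackTracker) ack(id int) {
	delete(t.pending, id)
}

// ackCumulative acknowledges every message up to and including id.
func (t *ackTracker) ackCumulative(id int) {
	for p := range t.pending {
		if p <= id {
			delete(t.pending, p)
		}
	}
}

// unacked returns the still-pending IDs in ascending order.
func (t *ackTracker) unacked() []int {
	ids := make([]int, 0, len(t.pending))
	for id := range t.pending {
		ids = append(ids, id)
	}
	sort.Ints(ids)
	return ids
}

func main() {
	t := newAckTracker(1, 2, 3, 4, 5, 6)
	t.ack(5)           // only message 5 is acknowledged
	t.ackCumulative(3) // messages 1, 2 and 3 are acknowledged together
	fmt.Println(t.unacked())
}
```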

Receive example

Here's an example usage of a Go consumer that uses the Receive() method to process incoming messages:

    import (
        "context"
        "log"

        "github.com/apache/pulsar/pulsar-client-go/pulsar"
    )

    func main() {
        // Instantiate a Pulsar client
        client, err := pulsar.NewClient(pulsar.ClientOptions{
            URL: "pulsar://localhost:6650",
        })
        if err != nil {
            log.Fatal(err)
        }
        defer client.Close()

        // Use the client object to instantiate a consumer
        consumer, err := client.Subscribe(pulsar.ConsumerOptions{
            Topic:            "my-golang-topic",
            SubscriptionName: "sub-1",
            SubscriptionType: pulsar.Exclusive,
        })
        if err != nil {
            log.Fatal(err)
        }
        defer consumer.Close()

        ctx := context.Background()

        // Listen indefinitely on the topic
        for {
            msg, err := consumer.Receive(ctx)
            if err != nil {
                log.Fatal(err)
            }

            // Do something with the message; processMessage is a placeholder
            // for application-specific logic defined elsewhere
            err = processMessage(msg)
            if err == nil {
                // Message processed successfully
                consumer.Ack(msg)
            } else {
                // Failed to process the message
                consumer.Nack(msg)
            }
        }
    }

Consumer configuration

| Parameter | Description | Default |
| --- | --- | --- |
| Topic | The Pulsar topic on which the consumer will establish a subscription and listen for messages | |
| SubscriptionName | The subscription name for this consumer | |
| Name | The name of the consumer | |
| AckTimeout | | 0 |
| NackRedeliveryDelay | The delay after which messages that failed to be processed are redelivered (see Consumer.Nack()) | 1 minute |
| SubscriptionType | Available options are Exclusive, Shared, and Failover | Exclusive |
| MessageChannel | The Go channel used by the consumer. Messages that arrive from the Pulsar topic(s) will be passed to this channel. | |
| ReceiverQueueSize | Sets the size of the consumer's receiver queue, i.e. the number of messages that can be accumulated by the consumer before the application calls Receive. A value higher than the default of 1000 could increase consumer throughput, though at the expense of more memory utilization. | 1000 |
| MaxTotalReceiverQueueSizeAcrossPartitions | Sets the max total receiver queue size across partitions. This setting is used to reduce the receiver queue size for individual partitions if the total exceeds this value. | 50000 |
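The interaction between ReceiverQueueSize and MaxTotalReceiverQueueSizeAcrossPartitions comes down to simple arithmetic. The sketch below assumes the cap is split evenly across partitions, so each partition gets the smaller of the two values. That even split is an assumption made for illustration and may differ from what the client does, and perPartitionQueueSize is a hypothetical helper.

```go
package main

import "fmt"

// perPartitionQueueSize is a hypothetical helper. It assumes the total cap is
// divided evenly across partitions and that each partition's queue is the
// smaller of that share and the configured receiver queue size.
func perPartitionQueueSize(receiverQueueSize, maxTotalAcrossPartitions, numPartitions int) int {
	if numPartitions <= 0 {
		return receiverQueueSize
	}
	share := maxTotalAcrossPartitions / numPartitions
	if share < receiverQueueSize {
		return share
	}
	return receiverQueueSize
}

func main() {
	// With the defaults (1000 and 50000), 10 partitions stay under the cap.
	fmt.Println(perPartitionQueueSize(1000, 50000, 10))
	// With 100 partitions, the total would exceed the cap, so each partition
	// is reduced to an even share of it.
	fmt.Println(perPartitionQueueSize(1000, 50000, 100))
}
```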

Readers

Pulsar readers process messages from Pulsar topics. Readers are different from consumers because with readers you need to explicitly specify which message in the stream you want to begin with (consumers, on the other hand, automatically begin with the most recent unacked message). You can configure Go readers using a ReaderOptions object. Here's an example:

    reader, err := client.CreateReader(pulsar.ReaderOptions{
        Topic:          "my-golang-topic",
        StartMessageID: pulsar.LatestMessage,
    })

Blocking operation

When you create a new Pulsar reader, the operation blocks (on a Go channel) until either the reader is successfully created or an error is returned.

Reader operations

Pulsar Go readers have the following methods available:

| Method | Description | Return type |
| --- | --- | --- |
| Topic() | Returns the reader's topic | string |
| Next(context.Context) | Receives the next message on the topic (analogous to the Receive method for consumers). This method blocks until a message is available. | (Message, error) |
| Close() | Closes the reader, disabling its ability to receive messages from the broker | error |

"Next" example

Here's an example usage of a Go reader that uses the Next() method to process incoming messages:

    import (
        "context"
        "log"

        "github.com/apache/pulsar/pulsar-client-go/pulsar"
    )

    func main() {
        // Instantiate a Pulsar client
        client, err := pulsar.NewClient(pulsar.ClientOptions{
            URL: "pulsar://localhost:6650",
        })
        if err != nil {
            log.Fatalf("Could not create client: %v", err)
        }
        defer client.Close()

        // Use the client to instantiate a reader
        reader, err := client.CreateReader(pulsar.ReaderOptions{
            Topic:          "my-golang-topic",
            StartMessageID: pulsar.EarliestMessage,
        })
        if err != nil {
            log.Fatalf("Could not create reader: %v", err)
        }
        defer reader.Close()

        ctx := context.Background()

        // Listen on the topic for incoming messages
        for {
            msg, err := reader.Next(ctx)
            if err != nil {
                log.Fatalf("Error reading from topic: %v", err)
            }

            // Process the message
            log.Printf("Received message: %s", string(msg.Payload()))
        }
    }

In the example above, the reader begins reading from the earliest available message (specified by pulsar.EarliestMessage). The reader can also begin reading from the latest message (pulsar.LatestMessage) or some other message ID specified by bytes using the DeserializeMessageID function, which takes a byte array and returns a MessageID object. Here's an example:

    // lastSavedID holds the serialized message ID bytes previously read
    // from an external store
    var lastSavedID []byte

    reader, err := client.CreateReader(pulsar.ReaderOptions{
        Topic:          "my-golang-topic",
        StartMessageID: pulsar.DeserializeMessageID(lastSavedID),
    })

Reader configuration

| Parameter | Description | Default |
| --- | --- | --- |
| Topic | The Pulsar topic on which the reader will establish a subscription and listen for messages | |
| Name | The name of the reader | |
| StartMessageID | The initial reader position, i.e. the message at which the reader begins processing messages. The options are pulsar.EarliestMessage (the earliest available message on the topic), pulsar.LatestMessage (the latest available message on the topic), or a MessageID object for a position that isn't earliest or latest. | |
| MessageChannel | The Go channel used by the reader. Messages that arrive from the Pulsar topic(s) will be passed to this channel. | |
| ReceiverQueueSize | Sets the size of the reader's receiver queue, i.e. the number of messages that can be accumulated by the reader before the application calls Next. A value higher than the default of 1000 could increase reader throughput, though at the expense of more memory utilization. | 1000 |
| SubscriptionRolePrefix | The subscription role prefix. | reader |

Messages

The Pulsar Go client provides a ProducerMessage type that you can use to construct messages to produce on Pulsar topics. Here's an example message:

    msg := pulsar.ProducerMessage{
        Payload: []byte("Here is some message data"),
        Key:     "message-key",
        Properties: map[string]string{
            "foo": "bar",
        },
        EventTime:           time.Now(),
        ReplicationClusters: []string{"cluster1", "cluster3"},
    }

    // Send is exported and takes a context as its first argument
    if err := producer.Send(context.Background(), msg); err != nil {
        log.Fatalf("Could not publish message due to: %v", err)
    }

The following parameters are available for ProducerMessage objects:

| Parameter | Description |
| --- | --- |
| Payload | The actual data payload of the message |
| Key | The optional key associated with the message (particularly useful for things like topic compaction) |
| Properties | A key-value map (both keys and values must be strings) for any application-specific metadata attached to the message |
| EventTime | The timestamp associated with the message |
| ReplicationClusters | The clusters to which this message will be replicated. Pulsar brokers handle message replication automatically; you should only change this setting if you want to override the broker default. |

TLS encryption and authentication

In order to use TLS encryption, you'll need to configure your client to do so:

  • Use the pulsar+ssl URL type
  • Set TLSTrustCertsFilePath to the path to the TLS certs used by your client and the Pulsar broker
  • Configure the Authentication option

Here's an example:

    opts := pulsar.ClientOptions{
        URL:                   "pulsar+ssl://my-cluster.com:6651",
        TLSTrustCertsFilePath: "/path/to/certs/my-cert.csr",
        Authentication:        pulsar.NewAuthenticationTLS("my-cert.pem", "my-key.pem"),
    }