Pulsar Go client

You can use a Pulsar Go client to create Pulsar producers, consumers, and readers in Golang. For Pulsar features that Go clients support, see Client Feature Matrix.

Installation

You can install the pulsar library by using either go get or go module.

Use go get

  1. Download the library of Go client to your local environment:

      go get -u "github.com/apache/pulsar-client-go/pulsar"

  2. Import it into your project:

      import "github.com/apache/pulsar-client-go/pulsar"

Use go module

  1. Create a directory named test_dir and change your working directory to it.

      mkdir test_dir && cd test_dir

  2. Write a sample script (such as test_example.go) in the test_dir directory and write package main at the beginning of the file. Then initialize the module, download the dependencies, and build and run the script:

      go mod init test_dir
      go mod tidy && go mod download
      go build test_example.go
      ./test_example

Connection URLs

To connect to Pulsar using client libraries, you need to specify a Pulsar protocol URL.

You can assign Pulsar protocol URLs to specific clusters and use the pulsar scheme. The following is an example of localhost with the default port 6650:

    pulsar://localhost:6650

If you have multiple brokers, separate the host:port pairs with commas:

    pulsar://localhost:6650,localhost:6651,localhost:6652

If you use mTLS authentication, add +ssl in the scheme:

    pulsar+ssl://pulsar.us-west.example.com:6651
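
The URL shapes above can be checked before they are handed to the client. The following is a minimal sketch using only the standard library; parseServiceURL is a hypothetical helper, not part of the Pulsar Go client, and it assumes that only the pulsar and pulsar+ssl schemes described in this section should be accepted.

```go
package main

import (
	"errors"
	"fmt"
	"strings"
)

// parseServiceURL is a hypothetical helper (not part of the Pulsar client).
// It splits a service URL into its scheme and its list of host:port entries.
func parseServiceURL(raw string) (string, []string, error) {
	scheme, rest, ok := strings.Cut(raw, "://")
	if !ok {
		return "", nil, errors.New("missing scheme separator")
	}
	// Assumption: only the two schemes shown in this section are valid here.
	if scheme != "pulsar" && scheme != "pulsar+ssl" {
		return "", nil, fmt.Errorf("unsupported scheme %q", scheme)
	}
	var brokers []string
	for _, b := range strings.Split(rest, ",") {
		if b = strings.TrimSpace(b); b != "" {
			brokers = append(brokers, b)
		}
	}
	if len(brokers) == 0 {
		return "", nil, errors.New("no broker addresses")
	}
	return scheme, brokers, nil
}

func main() {
	scheme, brokers, err := parseServiceURL("pulsar://localhost:6650,localhost:6651,localhost:6652")
	if err != nil {
		fmt.Println("invalid URL:", err)
		return
	}
	fmt.Println(scheme, brokers)
}
```

A check like this can catch a missing scheme or an empty broker list early, before the client attempts a connection.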

API reference

API docs are available on the Godoc page.

Release notes

For the changelog of Pulsar Go clients, see release notes.

Create a client

To interact with Pulsar, you need a Client object first. You can create a client object using the NewClient function, passing in a ClientOptions object. Here’s an example:

    import (
        "log"
        "time"

        "github.com/apache/pulsar-client-go/pulsar"
    )

    func main() {
        client, err := pulsar.NewClient(pulsar.ClientOptions{
            URL:               "pulsar://localhost:6650",
            OperationTimeout:  30 * time.Second,
            ConnectionTimeout: 30 * time.Second,
        })
        if err != nil {
            log.Fatalf("Could not instantiate Pulsar client: %v", err)
        }
        defer client.Close()
    }

If you have multiple brokers, you can create a client object as follows.

    import (
        "log"
        "time"

        "github.com/apache/pulsar-client-go/pulsar"
    )

    func main() {
        client, err := pulsar.NewClient(pulsar.ClientOptions{
            URL:               "pulsar://localhost:6650,localhost:6651,localhost:6652",
            OperationTimeout:  30 * time.Second,
            ConnectionTimeout: 30 * time.Second,
        })
        if err != nil {
            log.Fatalf("Could not instantiate Pulsar client: %v", err)
        }
        defer client.Close()
    }

All configurable parameters for ClientOptions are here.

Producers

Pulsar producers publish messages to Pulsar topics. You can configure Go producers using a ProducerOptions object. Here’s an example:

    producer, err := client.CreateProducer(pulsar.ProducerOptions{
        Topic: "my-topic",
    })
    if err != nil {
        log.Fatal(err)
    }
    // close the producer when the surrounding function returns
    defer producer.Close()

    _, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
        Payload: []byte("hello"),
    })
    if err != nil {
        fmt.Println("Failed to publish message:", err)
    } else {
        fmt.Println("Published message")
    }

Producer operations

All available methods of Producer interface are here.

Producer Example

How to use message router in producer

    client, err := pulsar.NewClient(pulsar.ClientOptions{
        URL: "pulsar://localhost:6650",
    })
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    producer, err := client.CreateProducer(pulsar.ProducerOptions{
        Topic: "my-partitioned-topic",
        MessageRouter: func(msg *pulsar.ProducerMessage, tm pulsar.TopicMetadata) int {
            fmt.Println("Topic has", tm.NumPartitions(), "partitions. Routing message ", msg, " to partition 2.")
            // always push msg to partition 2
            return 2
        },
    })
    if err != nil {
        log.Fatal(err)
    }
    defer producer.Close()

    for i := 0; i < 10; i++ {
        if msgId, err := producer.Send(context.Background(), &pulsar.ProducerMessage{
            Payload: []byte(fmt.Sprintf("message-%d", i)),
        }); err != nil {
            log.Fatal(err)
        } else {
            log.Println("Published message: ", msgId)
        }
    }

    // subscribe to a specific partition of a topic
    // for demos only; subscribing to a specific partition is not recommended
    consumer, err := client.Subscribe(pulsar.ConsumerOptions{
        // a Pulsar partition is a special topic that has the suffix '-partition-xx'
        Topic:            "my-partitioned-topic-partition-2",
        SubscriptionName: "my-sub",
        Type:             pulsar.Shared,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer consumer.Close()

    for i := 0; i < 10; i++ {
        msg, err := consumer.Receive(context.Background())
        if err != nil {
            log.Fatal(err)
        }
        fmt.Printf("Received message msgId: %#v -- content: '%s'\n", msg.ID(), string(msg.Payload()))
        consumer.Ack(msg)
    }
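
The router above always returns partition 2. A router more commonly derives the partition from the message itself, so that messages with the same key land on the same partition. The following is a minimal sketch of that idea using only the standard library; partitionForKey is a hypothetical helper, and the FNV-1a hash is an illustrative choice, not the client's built-in hashing.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionForKey is a hypothetical helper: it hashes the key with FNV-1a
// and maps the hash onto the range [0, numPartitions).
func partitionForKey(key string, numPartitions uint32) int {
	if numPartitions == 0 {
		return 0 // nothing to choose from; fall back to partition 0
	}
	h := fnv.New32a()
	h.Write([]byte(key)) // Write on a hash.Hash never returns an error
	return int(h.Sum32() % numPartitions)
}

func main() {
	// The same key is always routed to the same partition.
	for _, key := range []string{"order-1", "order-2", "order-1"} {
		fmt.Printf("%s -> partition %d\n", key, partitionForKey(key, 4))
	}
}
```

Inside a MessageRouter function, the body could then be `return partitionForKey(msg.Key, tm.NumPartitions())`, assuming NumPartitions returns a uint32 as in current versions of the client.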

How to use chunking in producer

    client, err := pulsar.NewClient(pulsar.ClientOptions{
        URL: serviceURL,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    // Message chunking is off by default.
    // When chunking is enabled, the producer splits a large message based on the max message size
    // (`maxMessageSize`) configured on the broker side (for example, 5MB).
    // The client can also set the max chunk size with the producer option `ChunkMaxMessageSize`.
    // Note: to enable chunking, you must also disable batching (`DisableBatching: true`).
    producer, err := client.CreateProducer(pulsar.ProducerOptions{
        Topic:           "my-topic",
        DisableBatching: true,
        EnableChunking:  true,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer producer.Close()
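
To get a feel for how a large message relates to the chunk size limit, the arithmetic can be sketched as a ceiling division. chunkCount below is a hypothetical helper for estimation only; the real client may also reserve room for message metadata in each chunk, so the actual number of chunks can be higher.

```go
package main

import "fmt"

// chunkCount is a hypothetical helper that estimates how many chunks a
// payload splits into for a given per-chunk size limit (ceiling division).
func chunkCount(payloadSize, maxChunkSize int) int {
	if payloadSize <= 0 || maxChunkSize <= 0 {
		return 0
	}
	return (payloadSize + maxChunkSize - 1) / maxChunkSize
}

func main() {
	const mb = 1024 * 1024
	// A 12 MB payload with a 5 MB limit needs two full chunks and one partial chunk.
	fmt.Println(chunkCount(12*mb, 5*mb)) // prints 3
}
```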

How to use schema interface in producer

    type testJSON struct {
        ID   int    `json:"id"`
        Name string `json:"name"`
    }

    var (
        exampleSchemaDef = "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\"," +
            "\"fields\":[{\"name\":\"ID\",\"type\":\"int\"},{\"name\":\"Name\",\"type\":\"string\"}]}"
    )

    client, err := pulsar.NewClient(pulsar.ClientOptions{
        URL: "pulsar://localhost:6650",
    })
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    properties := make(map[string]string)
    properties["pulsar"] = "hello"
    jsonSchemaWithProperties := pulsar.NewJSONSchema(exampleSchemaDef, properties)
    producer, err := client.CreateProducer(pulsar.ProducerOptions{
        Topic:  "jsonTopic",
        Schema: jsonSchemaWithProperties,
    })
    if err != nil {
        log.Fatal(err)
    }

    _, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
        Value: &testJSON{
            ID:   100,
            Name: "pulsar",
        },
    })
    if err != nil {
        log.Fatal(err)
    }
    producer.Close()

How to use delay relative in producer

    client, err := pulsar.NewClient(pulsar.ClientOptions{
        URL: "pulsar://localhost:6650",
    })
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    topicName := "topic-1"
    producer, err := client.CreateProducer(pulsar.ProducerOptions{
        Topic:           topicName,
        DisableBatching: true,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer producer.Close()

    consumer, err := client.Subscribe(pulsar.ConsumerOptions{
        Topic:            topicName,
        SubscriptionName: "subName",
        Type:             pulsar.Shared,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer consumer.Close()

    ID, err := producer.Send(context.Background(), &pulsar.ProducerMessage{
        Payload:      []byte("test"),
        DeliverAfter: 3 * time.Second,
    })
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(ID)

    // The message is delayed by 3 seconds, so a receive that gives up after
    // 1 second is expected to time out.
    ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
    msg, err := consumer.Receive(ctx)
    cancel()
    if err == nil {
        log.Fatalf("received message %v before its delay elapsed", msg.ID())
    }
    fmt.Println("No message yet:", err)

    // A 5-second timeout is long enough for the delayed message to arrive.
    ctx, cancel = context.WithTimeout(context.Background(), 5*time.Second)
    msg, err = consumer.Receive(ctx)
    cancel()
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(string(msg.Payload()))

How to use Prometheus metrics in producer

Pulsar Go client registers client metrics using Prometheus. This section demonstrates how to create a simple Pulsar producer application that exposes Prometheus metrics via HTTP.

  1. Write a simple producer application.

      // promhttp is imported from "github.com/prometheus/client_golang/prometheus/promhttp"

      // Create a Pulsar client
      client, err := pulsar.NewClient(pulsar.ClientOptions{
          URL: "pulsar://localhost:6650",
      })
      if err != nil {
          log.Fatal(err)
      }
      defer client.Close()

      // Start a separate goroutine for Prometheus metrics
      // In this case, Prometheus metrics can be accessed via http://localhost:2112/metrics
      go func() {
          prometheusPort := 2112
          log.Printf("Starting Prometheus metrics at http://localhost:%v/metrics\n", prometheusPort)
          http.Handle("/metrics", promhttp.Handler())
          // use a local err so the goroutine does not write to the outer variable
          if err := http.ListenAndServe(":"+strconv.Itoa(prometheusPort), nil); err != nil {
              log.Fatal(err)
          }
      }()

      // Create a producer
      producer, err := client.CreateProducer(pulsar.ProducerOptions{
          Topic: "topic-1",
      })
      if err != nil {
          log.Fatal(err)
      }
      defer producer.Close()

      ctx := context.Background()

      // Write your business logic here
      // In this case, you build a simple Web server. You can produce messages by requesting http://localhost:8082/produce
      webPort := 8082
      http.HandleFunc("/produce", func(w http.ResponseWriter, r *http.Request) {
          msgId, err := producer.Send(ctx, &pulsar.ProducerMessage{
              Payload: []byte("hello world"),
          })
          if err != nil {
              log.Fatal(err)
          } else {
              log.Printf("Published message: %v", msgId)
              fmt.Fprintf(w, "Published message: %v", msgId)
          }
      })

      err = http.ListenAndServe(":"+strconv.Itoa(webPort), nil)
      if err != nil {
          log.Fatal(err)
      }

  2. To scrape metrics from applications, configure a local running Prometheus instance using a configuration file (prometheus.yml).

      scrape_configs:
        - job_name: pulsar-client-go-metrics
          scrape_interval: 10s
          static_configs:
            - targets:
                - localhost:2112

Now you can query Pulsar client metrics on Prometheus.

Producer configuration

All available options of ProducerOptions are here.

Consumers

Pulsar consumers subscribe to one or more Pulsar topics and listen for incoming messages published to those topics. You can configure Go consumers using a ConsumerOptions object. Here’s a basic example that uses the Receive method:

    consumer, err := client.Subscribe(pulsar.ConsumerOptions{
        Topic:            "topic-1",
        SubscriptionName: "my-sub",
        Type:             pulsar.Shared,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer consumer.Close()

    for i := 0; i < 10; i++ {
        // may block here
        msg, err := consumer.Receive(context.Background())
        if err != nil {
            log.Fatal(err)
        }
        fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
            msg.ID(), string(msg.Payload()))
        consumer.Ack(msg)
    }

    if err := consumer.Unsubscribe(); err != nil {
        log.Fatal(err)
    }

Consumer operations

All available methods of Consumer interface are here.

Create single-topic consumer

    client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    consumer, err := client.Subscribe(pulsar.ConsumerOptions{
        // setting the `Topic` field creates a single-topic consumer
        Topic:            "topic-1",
        SubscriptionName: "my-sub",
        Type:             pulsar.Shared,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer consumer.Close()

Create regex-topic consumer

    client, err := pulsar.NewClient(pulsar.ClientOptions{
        URL: "pulsar://localhost:6650",
    })
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    topicsPattern := "persistent://public/default/topic.*"
    opts := pulsar.ConsumerOptions{
        // setting the `TopicsPattern` field creates a regex consumer
        TopicsPattern:    topicsPattern,
        SubscriptionName: "regex-sub",
    }
    consumer, err := client.Subscribe(opts)
    if err != nil {
        log.Fatal(err)
    }
    defer consumer.Close()

Create multi-topic consumer

    client, err := pulsar.NewClient(pulsar.ClientOptions{
        URL: "pulsar://localhost:6650",
    })
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    topics := []string{"topic-1", "topic-2"}
    consumer, err := client.Subscribe(pulsar.ConsumerOptions{
        // setting the `Topics` field creates a multi-topic consumer
        Topics:           topics,
        SubscriptionName: "multi-topic-sub",
    })
    if err != nil {
        log.Fatal(err)
    }
    defer consumer.Close()

Create consumer listener

    import (
        "fmt"
        "log"

        "github.com/apache/pulsar-client-go/pulsar"
    )

    func main() {
        client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
        if err != nil {
            log.Fatal(err)
        }
        defer client.Close()

        // the consumer delivers incoming messages to this channel
        channel := make(chan pulsar.ConsumerMessage, 100)

        options := pulsar.ConsumerOptions{
            Topic:            "topic-1",
            SubscriptionName: "my-subscription",
            Type:             pulsar.Shared,
            // setting the `MessageChannel` field creates a listener
            MessageChannel: channel,
        }

        consumer, err := client.Subscribe(options)
        if err != nil {
            log.Fatal(err)
        }
        defer consumer.Close()

        // Receive messages from the channel. The channel returns a `ConsumerMessage` struct that contains the message
        // and the consumer that received it. The consumer is not needed here because there is only one,
        // but the channel could be shared across multiple consumers as well.
        for cm := range channel {
            consumer := cm.Consumer
            msg := cm.Message
            fmt.Printf("Consumer %s received a message, msgId: %v, content: '%s'\n",
                consumer.Name(), msg.ID(), string(msg.Payload()))
            consumer.Ack(msg)
        }
    }

Receive message with timeout

    client, err := pulsar.NewClient(pulsar.ClientOptions{
        URL: "pulsar://localhost:6650",
    })
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    topic := "test-topic-with-no-messages"
    ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
    defer cancel()

    // create consumer
    consumer, err := client.Subscribe(pulsar.ConsumerOptions{
        Topic:            topic,
        SubscriptionName: "my-sub1",
        Type:             pulsar.Shared,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer consumer.Close()

    // receive message with a timeout; this topic has no messages, so Receive
    // returns an error once the 500 ms timeout expires
    msg, err := consumer.Receive(ctx)
    if err != nil {
        log.Fatal(err)
    }
    fmt.Println(msg.Payload())

Use schema in consumer

    type testJSON struct {
        ID   int    `json:"id"`
        Name string `json:"name"`
    }

    var (
        exampleSchemaDef = "{\"type\":\"record\",\"name\":\"Example\",\"namespace\":\"test\"," +
            "\"fields\":[{\"name\":\"ID\",\"type\":\"int\"},{\"name\":\"Name\",\"type\":\"string\"}]}"
    )

    client, err := pulsar.NewClient(pulsar.ClientOptions{
        URL: "pulsar://localhost:6650",
    })
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    var s testJSON

    consumerJS := pulsar.NewJSONSchema(exampleSchemaDef, nil)
    consumer, err := client.Subscribe(pulsar.ConsumerOptions{
        Topic:                       "jsonTopic",
        SubscriptionName:            "sub-1",
        Schema:                      consumerJS,
        SubscriptionInitialPosition: pulsar.SubscriptionPositionEarliest,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer consumer.Close()

    msg, err := consumer.Receive(context.Background())
    if err != nil {
        log.Fatal(err)
    }
    // decode the message payload into s using the schema
    err = msg.GetSchemaValue(&s)
    if err != nil {
        log.Fatal(err)
    }

How to use Prometheus metrics in consumer

This section demonstrates how to create a simple Pulsar consumer application that exposes Prometheus metrics via HTTP.

  1. Write a simple consumer application.

      // promhttp is imported from "github.com/prometheus/client_golang/prometheus/promhttp"

      // Create a Pulsar client
      client, err := pulsar.NewClient(pulsar.ClientOptions{
          URL: "pulsar://localhost:6650",
      })
      if err != nil {
          log.Fatal(err)
      }
      defer client.Close()

      // Start a separate goroutine for Prometheus metrics
      // In this case, Prometheus metrics can be accessed via http://localhost:2112/metrics
      go func() {
          prometheusPort := 2112
          log.Printf("Starting Prometheus metrics at http://localhost:%v/metrics\n", prometheusPort)
          http.Handle("/metrics", promhttp.Handler())
          // use a local err so the goroutine does not write to the outer variable
          if err := http.ListenAndServe(":"+strconv.Itoa(prometheusPort), nil); err != nil {
              log.Fatal(err)
          }
      }()

      // Create a consumer
      consumer, err := client.Subscribe(pulsar.ConsumerOptions{
          Topic:            "topic-1",
          SubscriptionName: "sub-1",
          Type:             pulsar.Shared,
      })
      if err != nil {
          log.Fatal(err)
      }
      defer consumer.Close()

      ctx := context.Background()

      // Write your business logic here
      // In this case, you build a simple Web server. You can consume messages by requesting http://localhost:8083/consume
      webPort := 8083
      http.HandleFunc("/consume", func(w http.ResponseWriter, r *http.Request) {
          msg, err := consumer.Receive(ctx)
          if err != nil {
              log.Fatal(err)
          } else {
              log.Printf("Received message msgId: %v -- content: '%s'\n", msg.ID(), string(msg.Payload()))
              fmt.Fprintf(w, "Received message msgId: %v -- content: '%s'\n", msg.ID(), string(msg.Payload()))
              consumer.Ack(msg)
          }
      })

      err = http.ListenAndServe(":"+strconv.Itoa(webPort), nil)
      if err != nil {
          log.Fatal(err)
      }

  2. To scrape metrics from applications, configure a local running Prometheus instance using a configuration file (prometheus.yml).

      scrape_configs:
        - job_name: pulsar-client-go-metrics
          scrape_interval: 10s
          static_configs:
            - targets:
                - localhost:2112

Now you can query Pulsar client metrics on Prometheus.

Consumer configuration

All available options of ConsumerOptions are here.

Readers

Pulsar readers process messages from Pulsar topics. Readers are different from consumers because with readers you need to explicitly specify which message in the stream you want to begin with (consumers, on the other hand, automatically begin with the most recent unacked message). You can configure Go readers using a ReaderOptions object. Here’s an example:

    reader, err := client.CreateReader(pulsar.ReaderOptions{
        Topic:          "topic-1",
        StartMessageID: pulsar.EarliestMessageID(),
    })
    if err != nil {
        log.Fatal(err)
    }
    defer reader.Close()

Reader operations

All available methods of the Reader interface are here.

Reader example

Here’s an example usage of a Go reader that uses the Next() method to process incoming messages:

    import (
        "context"
        "fmt"
        "log"

        "github.com/apache/pulsar-client-go/pulsar"
    )

    func main() {
        client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
        if err != nil {
            log.Fatal(err)
        }
        defer client.Close()

        reader, err := client.CreateReader(pulsar.ReaderOptions{
            Topic:          "topic-1",
            StartMessageID: pulsar.EarliestMessageID(),
        })
        if err != nil {
            log.Fatal(err)
        }
        defer reader.Close()

        for reader.HasNext() {
            msg, err := reader.Next(context.Background())
            if err != nil {
                log.Fatal(err)
            }
            fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
                msg.ID(), string(msg.Payload()))
        }
    }

In the example above, the reader begins reading from the earliest available message (specified by pulsar.EarliestMessageID()). The reader can also begin reading from the latest message (pulsar.LatestMessageID()) or from some other message ID restored from bytes with the DeserializeMessageID function, which takes a byte slice and returns a MessageID object and an error. Here’s an example:

    // lastSavedID holds the serialized message ID, read from an external store
    var lastSavedID []byte
    startID, err := pulsar.DeserializeMessageID(lastSavedID)
    if err != nil {
        log.Fatal(err)
    }
    reader, err := client.CreateReader(pulsar.ReaderOptions{
        Topic:          "my-golang-topic",
        StartMessageID: startID,
    })

Use reader to read specific message

    client, err := pulsar.NewClient(pulsar.ClientOptions{
        URL: "pulsar://localhost:6650",
    })
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    topic := "topic-1"
    ctx := context.Background()

    // create producer
    producer, err := client.CreateProducer(pulsar.ProducerOptions{
        Topic:           topic,
        DisableBatching: true,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer producer.Close()

    // send 10 messages
    msgIDs := [10]pulsar.MessageID{}
    for i := 0; i < 10; i++ {
        msgID, err := producer.Send(ctx, &pulsar.ProducerMessage{
            Payload: []byte(fmt.Sprintf("hello-%d", i)),
        })
        if err != nil {
            log.Fatal(err)
        }
        msgIDs[i] = msgID
    }

    // create reader on 5th message (not included)
    reader, err := client.CreateReader(pulsar.ReaderOptions{
        Topic:                   topic,
        StartMessageID:          msgIDs[4],
        StartMessageIDInclusive: false,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer reader.Close()

    // receive the remaining 5 messages
    for i := 5; i < 10; i++ {
        msg, err := reader.Next(ctx)
        if err != nil {
            log.Fatal(err)
        }
        fmt.Printf("Read %d-th msg: %s\n", i, string(msg.Payload()))
    }

    // create reader on 5th message (included)
    readerInclusive, err := client.CreateReader(pulsar.ReaderOptions{
        Topic:                   topic,
        StartMessageID:          msgIDs[4],
        StartMessageIDInclusive: true,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer readerInclusive.Close()

Reader configuration

All available options of ReaderOptions are here.

Messages

The Pulsar Go client provides a ProducerMessage struct that you can use to construct messages to publish to Pulsar topics. Here’s an example message:

    msg := pulsar.ProducerMessage{
        Payload: []byte("Here is some message data"),
        Key:     "message-key",
        Properties: map[string]string{
            "foo": "bar",
        },
        EventTime:           time.Now(),
        ReplicationClusters: []string{"cluster1", "cluster3"},
    }

    // Send takes a context and a pointer to the message
    if _, err := producer.Send(context.Background(), &msg); err != nil {
        log.Fatalf("Could not publish message due to: %v", err)
    }

All fields of the ProducerMessage struct are here.

TLS encryption and authentication

To use TLS encryption and mTLS authentication, you need to configure your client to do so:

  • Use pulsar+ssl URL type
  • Set TLSTrustCertsFilePath to the path of the TLS certs used by your client and the Pulsar broker
  • Configure Authentication option

Here’s an example:

    opts := pulsar.ClientOptions{
        URL:                   "pulsar+ssl://my-cluster.com:6651",
        TLSTrustCertsFilePath: "/path/to/certs/my-cert.csr",
        Authentication:        pulsar.NewAuthenticationTLS("my-cert.pem", "my-key.pem"),
    }

OAuth2 authentication

To use OAuth2 authentication, pass an OAuth2 authentication provider to your client, as the following example shows.

    oauth := pulsar.NewAuthenticationOAuth2(map[string]string{
        "type":       "client_credentials",
        "issuerUrl":  "https://dev-kt-aa9ne.us.auth0.com",
        "audience":   "https://dev-kt-aa9ne.us.auth0.com/api/v2/",
        "privateKey": "/path/to/privateKey",
        "clientId":   "0Xx...Yyxeny",
    })
    client, err := pulsar.NewClient(pulsar.ClientOptions{
        URL:            "pulsar://my-cluster:6650",
        Authentication: oauth,
    })