Use Pulsar Go client

Create a producer

You can configure Go producers using a ProducerOptions object. Here’s an example:

  1. producer, err := client.CreateProducer(pulsar.ProducerOptions{
  2. Topic: "my-topic",
  3. })
  4. if err != nil {
  5. log.Fatal(err)
  6. }
  7. _, err = producer.Send(context.Background(), &pulsar.ProducerMessage{
  8. Payload: []byte("hello"),
  9. })
  10. defer producer.Close()
  11. if err != nil {
  12. fmt.Println("Failed to publish message", err)
  13. }
  14. fmt.Println("Published message")

For all available methods of Producer interface, see here.

Monitor

Pulsar Go client registers client metrics using Prometheus. This section demonstrates how to create a simple Pulsar producer application that exposes Prometheus metrics via HTTP.

  1. Write a simple producer application.
  1. // Create a Pulsar client
  2. client, err := pulsar.NewClient(pulsar.ClientOptions{
  3. URL: "pulsar://localhost:6650",
  4. })
  5. if err != nil {
  6. log.Fatal(err)
  7. }
  8. defer client.Close()
  9. // Start a separate goroutine for Prometheus metrics
  10. // In this case, Prometheus metrics can be accessed via http://localhost:2112/metrics
  11. go func() {
  12. prometheusPort := 2112
  13. log.Printf("Starting Prometheus metrics at http://localhost:%v/metrics\n", prometheusPort)
  14. http.Handle("/metrics", promhttp.Handler())
  15. err = http.ListenAndServe(":"+strconv.Itoa(prometheusPort), nil)
  16. if err != nil {
  17. log.Fatal(err)
  18. }
  19. }()
  20. // Create a producer
  21. producer, err := client.CreateProducer(pulsar.ProducerOptions{
  22. Topic: "topic-1",
  23. })
  24. if err != nil {
  25. log.Fatal(err)
  26. }
  27. defer producer.Close()
  28. ctx := context.Background()
  29. // Write your business logic here
  30. // In this case, you build a simple Web server. You can produce messages by requesting http://localhost:8082/produce
  31. webPort := 8082
  32. http.HandleFunc("/produce", func(w http.ResponseWriter, r *http.Request) {
  33. msgId, err := producer.Send(ctx, &pulsar.ProducerMessage{
  34. Payload: []byte(fmt.Sprintf("hello world")),
  35. })
  36. if err != nil {
  37. log.Fatal(err)
  38. } else {
  39. log.Printf("Published message: %v", msgId)
  40. fmt.Fprintf(w, "Published message: %v", msgId)
  41. }
  42. })
  43. err = http.ListenAndServe(":"+strconv.Itoa(webPort), nil)
  44. if err != nil {
  45. log.Fatal(err)
  46. }
  1. To scrape metrics from applications, configure a local running Prometheus instance using a configuration file (prometheus.yml).
  1. scrape_configs:
  2. - job_name: pulsar-client-go-metrics
  3. scrape_interval: 10s
  4. static_configs:
  5. - targets:
  6. - localhost:2112

Create a consumer

Pulsar consumers subscribe to one or more Pulsar topics and listen for incoming messages produced on that topic/those topics. You can configure Go consumers using a ConsumerOptions object.

Here’s a basic example that uses channels:

  1. consumer, err := client.Subscribe(pulsar.ConsumerOptions{
  2. Topic: "topic-1",
  3. SubscriptionName: "my-sub",
  4. Type: pulsar.Shared,
  5. })
  6. if err != nil {
  7. log.Fatal(err)
  8. }
  9. defer consumer.Close()
  10. for i := 0; i < 10; i++ {
  11. // may block here
  12. msg, err := consumer.Receive(context.Background())
  13. if err != nil {
  14. log.Fatal(err)
  15. }
  16. fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
  17. msg.ID(), string(msg.Payload()))
  18. consumer.Ack(msg)
  19. }
  20. if err := consumer.Unsubscribe(); err != nil {
  21. log.Fatal(err)
  22. }

For all available methods of Consumer interface, see here.

Create a single-topic consumer

  1. client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
  2. if err != nil {
  3. log.Fatal(err)
  4. }
  5. defer client.Close()
  6. consumer, err := client.Subscribe(pulsar.ConsumerOptions{
  7. // fill `Topic` field will create a single-topic consumer
  8. Topic: "topic-1",
  9. SubscriptionName: "my-sub",
  10. Type: pulsar.Shared,
  11. })
  12. if err != nil {
  13. log.Fatal(err)
  14. }
  15. defer consumer.Close()

Create a regex-topic consumer

  1. client, err := pulsar.NewClient(pulsar.ClientOptions{
  2. URL: "pulsar://localhost:6650",
  3. })
  4. defer client.Close()
  5. topicsPattern := "persistent://public/default/topic.*"
  6. opts := pulsar.ConsumerOptions{
  7. // fill `TopicsPattern` field will create a regex consumer
  8. TopicsPattern: topicsPattern,
  9. SubscriptionName: "regex-sub",
  10. }
  11. consumer, err := client.Subscribe(opts)
  12. if err != nil {
  13. log.Fatal(err)
  14. }
  15. defer consumer.Close()

Monitor

In this guide, This section demonstrates how to create a simple Pulsar consumer application that exposes Prometheus metrics via HTTP.

  1. Write a simple consumer application.
  1. // Create a Pulsar client
  2. client, err := pulsar.NewClient(pulsar.ClientOptions{
  3. URL: "pulsar://localhost:6650",
  4. })
  5. if err != nil {
  6. log.Fatal(err)
  7. }
  8. defer client.Close()
  9. // Start a separate goroutine for Prometheus metrics
  10. // In this case, Prometheus metrics can be accessed via http://localhost:2112/metrics
  11. go func() {
  12. prometheusPort := 2112
  13. log.Printf("Starting Prometheus metrics at http://localhost:%v/metrics\n", prometheusPort)
  14. http.Handle("/metrics", promhttp.Handler())
  15. err = http.ListenAndServe(":"+strconv.Itoa(prometheusPort), nil)
  16. if err != nil {
  17. log.Fatal(err)
  18. }
  19. }()
  20. // Create a consumer
  21. consumer, err := client.Subscribe(pulsar.ConsumerOptions{
  22. Topic: "topic-1",
  23. SubscriptionName: "sub-1",
  24. Type: pulsar.Shared,
  25. })
  26. if err != nil {
  27. log.Fatal(err)
  28. }
  29. defer consumer.Close()
  30. ctx := context.Background()
  31. // Write your business logic here
  32. // In this case, you build a simple Web server. You can consume messages by requesting http://localhost:8083/consume
  33. webPort := 8083
  34. http.HandleFunc("/consume", func(w http.ResponseWriter, r *http.Request) {
  35. msg, err := consumer.Receive(ctx)
  36. if err != nil {
  37. log.Fatal(err)
  38. } else {
  39. log.Printf("Received message msgId: %v -- content: '%s'\n", msg.ID(), string(msg.Payload()))
  40. fmt.Fprintf(w, "Received message msgId: %v -- content: '%s'\n", msg.ID(), string(msg.Payload()))
  41. consumer.Ack(msg)
  42. }
  43. })
  44. err = http.ListenAndServe(":"+strconv.Itoa(webPort), nil)
  45. if err != nil {
  46. log.Fatal(err)
  47. }
  1. To scrape metrics from applications, configure a local running Prometheus instance using a configuration file (prometheus.yml).
  1. scrape_configs:
  2. - job_name: pulsar-client-go-metrics
  3. scrape_interval: 10s
  4. static_configs:
  5. - targets:
  6. - localhost: 2112

Create a reader

Pulsar readers process messages from Pulsar topics. Readers are different from consumers because with readers you need to explicitly specify which message in the stream you want to begin with (consumers, on the other hand, automatically begin with the most recent unacked message). You can configure Go readers using a ReaderOptions object. Here’s an example:

  1. reader, err := client.CreateReader(pulsar.ReaderOptions{
  2. Topic: "topic-1",
  3. StartMessageID: pulsar.EarliestMessageID(),
  4. })
  5. if err != nil {
  6. log.Fatal(err)
  7. }
  8. defer reader.Close()

For all available methods of the Reader interface, see here.