Work with reader

After you set up your clients, you can create readers and use them to read messages from topics.

Receive and read messages

A reader is just a consumer without a cursor. This means that Pulsar does not keep track of your progress and there is no need to acknowledge messages.

Here’s an example that begins reading from the earliest available message on a topic.

  • Java

    import org.apache.pulsar.client.api.Message;
    import org.apache.pulsar.client.api.MessageId;
    import org.apache.pulsar.client.api.Reader;

    // Create a reader on a topic and for a specific message (and onward)
    Reader<byte[]> reader = pulsarClient.newReader()
        .topic("reader-api-test")
        .startMessageId(MessageId.earliest)
        .create();

    while (true) {
        Message<byte[]> message = reader.readNext();
        // Process the message
    }

  • C#

    await foreach (var message in reader.Messages())
    {
        Console.WriteLine("Received: " + Encoding.UTF8.GetString(message.Data.ToArray()));
    }

Read next message

To create a reader that reads from the latest available message:

  • Java

    Reader<byte[]> reader = pulsarClient.newReader()
        .topic(topic)
        .startMessageId(MessageId.latest)
        .create();

  • Go

    import (
        "context"
        "fmt"
        "log"

        "github.com/apache/pulsar-client-go/pulsar"
    )

    func main() {
        client, err := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
        if err != nil {
            log.Fatal(err)
        }
        defer client.Close()

        reader, err := client.CreateReader(pulsar.ReaderOptions{
            Topic:          "topic-1",
            StartMessageID: pulsar.EarliestMessageID(),
        })
        if err != nil {
            log.Fatal(err)
        }
        defer reader.Close()

        for reader.HasNext() {
            msg, err := reader.Next(context.Background())
            if err != nil {
                log.Fatal(err)
            }
            fmt.Printf("Received message msgId: %#v -- content: '%s'\n",
                msg.ID(), string(msg.Payload()))
        }
    }

In the Go example above, the reader begins reading from the earliest available message (specified by pulsar.EarliestMessageID()). The reader can also begin reading from the latest message (pulsar.LatestMessageID()) or from a message ID restored from bytes with the DeserializeMessageID function, which takes a byte slice and returns a MessageID object and an error. Here’s an example:

    // lastSavedID holds the serialized message ID read from an external store
    var lastSavedID []byte
    startID, err := pulsar.DeserializeMessageID(lastSavedID)
    if err != nil {
        log.Fatal(err)
    }
    reader, err := client.CreateReader(pulsar.ReaderOptions{
        Topic:          "my-golang-topic",
        StartMessageID: startID,
    })
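Because Pulsar does not track a reader's progress, resuming after a restart means the application has to persist the last message ID itself. The following is a minimal sketch of such an external store, assuming a local file is acceptable. FilePositionStore is a hypothetical helper, not part of any Pulsar client, and it handles only raw bytes: in practice the bytes would come from MessageId.toByteArray() in Java or MessageID.Serialize() in Go.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Arrays;

// Hypothetical helper, not part of any Pulsar client: keeps the serialized
// ID of the last processed message in a local file.
final class FilePositionStore {
    private final Path file;

    FilePositionStore(Path file) {
        this.file = file;
    }

    // Write to a temporary file first, then move it into place, which
    // reduces the chance of leaving a half-written position behind.
    void save(byte[] serializedId) throws IOException {
        Path tmp = file.resolveSibling(file.getFileName() + ".tmp");
        Files.write(tmp, serializedId);
        Files.move(tmp, file, StandardCopyOption.REPLACE_EXISTING);
    }

    // Returns null when nothing has been saved yet; the caller would then
    // start from the earliest or latest message instead.
    byte[] load() throws IOException {
        return Files.exists(file) ? Files.readAllBytes(file) : null;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("reader-position");
        FilePositionStore store = new FilePositionStore(dir.resolve("last-id.bin"));
        byte[] placeholderId = {8, 1, 16, 42}; // stand-in bytes, not a real message ID
        store.save(placeholderId);
        System.out.println(Arrays.equals(placeholderId, store.load()));
    }
}
```

Saving the position only after a message has been fully processed gives at-least-once behavior on restart; saving it before processing risks skipping a message if the application crashes in between.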

Read specific messages

To create a reader that reads from some message between the earliest and the latest:

  • Java

    // readSavedMessageId() is a hypothetical helper that returns a serialized message ID
    byte[] msgIdBytes = readSavedMessageId();
    // fromByteArray throws IOException if the bytes cannot be parsed
    MessageId id = MessageId.fromByteArray(msgIdBytes);
    Reader<byte[]> reader = pulsarClient.newReader()
        .topic(topic)
        .startMessageId(id)
        .create();

  • Go

    client, err := pulsar.NewClient(pulsar.ClientOptions{
        URL: "pulsar://localhost:6650",
    })
    if err != nil {
        log.Fatal(err)
    }
    defer client.Close()

    topic := "topic-1"
    ctx := context.Background()

    // create producer
    producer, err := client.CreateProducer(pulsar.ProducerOptions{
        Topic:           topic,
        DisableBatching: true,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer producer.Close()

    // send 10 messages
    msgIDs := [10]pulsar.MessageID{}
    for i := 0; i < 10; i++ {
        msgID, err := producer.Send(ctx, &pulsar.ProducerMessage{
            Payload: []byte(fmt.Sprintf("hello-%d", i)),
        })
        if err != nil {
            log.Fatal(err)
        }
        msgIDs[i] = msgID
    }

    // create reader on 5th message (not included)
    reader, err := client.CreateReader(pulsar.ReaderOptions{
        Topic:                   topic,
        StartMessageID:          msgIDs[4],
        StartMessageIDInclusive: false,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer reader.Close()

    // receive the remaining 5 messages
    for i := 5; i < 10; i++ {
        msg, err := reader.Next(context.Background())
        if err != nil {
            log.Fatal(err)
        }
        fmt.Printf("Read %d-th msg: %s\n", i, string(msg.Payload()))
    }

    // create reader on 5th message (included)
    readerInclusive, err := client.CreateReader(pulsar.ReaderOptions{
        Topic:                   topic,
        StartMessageID:          msgIDs[4],
        StartMessageIDInclusive: true,
    })
    if err != nil {
        log.Fatal(err)
    }
    defer readerInclusive.Close()
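The difference between an inclusive and an exclusive start position is easy to get wrong by one message. The sketch below is a toy model that treats a topic as an in-memory list; it does not use the Pulsar client, and StartPositionDemo and readFrom are hypothetical names. It mirrors the Go example: ten messages, with the reader positioned on the fifth one (index 4).

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a topic as an in-memory list; it does not use the Pulsar client.
final class StartPositionDemo {
    // Returns the messages a reader would see when positioned at startIndex.
    static List<String> readFrom(List<String> topic, int startIndex, boolean inclusive) {
        int first = inclusive ? startIndex : startIndex + 1;
        return new ArrayList<>(topic.subList(first, topic.size()));
    }

    public static void main(String[] args) {
        List<String> topic = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            topic.add("hello-" + i);
        }
        System.out.println(readFrom(topic, 4, false)); // starts at hello-5
        System.out.println(readFrom(topic, 4, true));  // starts at hello-4
    }
}
```

With the exclusive start the reader sees five messages, and with the inclusive start it sees six. In the Java client, the corresponding builder option is startMessageIdInclusive().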

Configure chunking

Configuring chunking for readers is similar to that for consumers. See configure chunking for consumers for more information.

The following is an example of how to configure message chunking for a reader.

  • Java

    Reader<byte[]> reader = pulsarClient.newReader()
        .topic(topicName)
        .startMessageId(MessageId.earliest)
        .maxPendingChunkedMessage(12)
        .autoAckOldestChunkedMessageOnQueueFull(true)
        .expireTimeOfIncompleteChunkedMessage(12, TimeUnit.MILLISECONDS)
        .create();
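To give a feel for what a limit on pending chunked messages does, here is a toy reassembly buffer. It is a sketch under stated assumptions, not Pulsar's implementation: ChunkBufferDemo and its methods are hypothetical, chunks are plain strings, and the buffer simply drops the oldest incomplete message when a new one arrives and the limit is reached. Expiry of incomplete messages is not modeled.

```java
import java.util.Iterator;
import java.util.LinkedHashMap;

// Toy reassembly buffer, not Pulsar's implementation: shows how a cap on
// pending chunked messages bounds memory use.
final class ChunkBufferDemo {
    private final int maxPending;
    // Insertion-ordered, so the first key is the oldest incomplete message.
    private final LinkedHashMap<String, String[]> pending = new LinkedHashMap<>();

    ChunkBufferDemo(int maxPending) {
        if (maxPending < 1) {
            throw new IllegalArgumentException("maxPending must be at least 1");
        }
        this.maxPending = maxPending;
    }

    // Adds one chunk. Returns the full payload once every chunk of the
    // message has arrived, otherwise null.
    String add(String messageId, int index, int total, String chunk) {
        String[] parts = pending.get(messageId);
        if (parts == null) {
            if (pending.size() >= maxPending) {
                // Buffer full: drop the oldest incomplete message.
                Iterator<String> oldest = pending.keySet().iterator();
                oldest.next();
                oldest.remove();
            }
            parts = new String[total];
            pending.put(messageId, parts);
        }
        parts[index] = chunk;
        for (String part : parts) {
            if (part == null) {
                return null; // still waiting for at least one chunk
            }
        }
        pending.remove(messageId);
        return String.join("", parts);
    }

    int pendingCount() {
        return pending.size();
    }

    public static void main(String[] args) {
        ChunkBufferDemo buffer = new ChunkBufferDemo(2);
        buffer.add("a", 0, 2, "x");
        buffer.add("b", 0, 2, "y");
        buffer.add("c", 0, 2, "z"); // evicts "a", the oldest incomplete message
        System.out.println(buffer.add("b", 1, 2, "Y"));
        System.out.println(buffer.pendingCount());
    }
}
```

A small limit keeps memory use low but makes it more likely that a large message is dropped before its last chunk arrives, so the value is a trade-off between memory and the number of concurrently chunking producers.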

Intercept messages

A Pulsar reader interceptor intercepts, and can modify, messages with user-defined processing before the reader reads them. With reader interceptors, you can apply common processing to messages before they are read, such as modifying messages, adding properties, or collecting statistics, without building a separate mechanism for each.

Reader interceptor in Pulsar

The Pulsar reader interceptor is built on top of the Pulsar consumer interceptor. The plugin interface ReaderInterceptor can be treated as a subset of ConsumerInterceptor, and it has two main events.

  • beforeRead is triggered before readers read messages. You can modify messages within this event.
  • onPartitionsChange is triggered when changes on partitions have been detected.

To handle these events with custom processing, add a ReaderInterceptor when you create a Reader, as follows.

  • Java

    PulsarClient pulsarClient = PulsarClient.builder().serviceUrl("pulsar://localhost:6650").build();
    Reader<byte[]> reader = pulsarClient.newReader()
        .topic("t1")
        .autoUpdatePartitionsInterval(5, TimeUnit.SECONDS)
        .intercept(new ReaderInterceptor<byte[]>() {
            @Override
            public void close() {
            }

            @Override
            public Message<byte[]> beforeRead(Reader<byte[]> reader, Message<byte[]> message) {
                // user-defined processing logic
                return message;
            }

            @Override
            public void onPartitionsChange(String topicName, int partitions) {
                // user-defined processing logic
            }
        })
        .startMessageId(MessageId.earliest)
        .create();
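When several interceptors are registered, it helps to think of them as a chain in which each one receives the message returned by the previous one. The following toy model sketches that idea without the Pulsar client. ToyMessage, ToyInterceptor, and applyAll are hypothetical stand-ins, and the sketch assumes interceptors run in the order they are registered.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy model of an interceptor chain. ToyMessage and ToyInterceptor are
// hypothetical stand-ins, not Pulsar's Message or ReaderInterceptor.
final class InterceptorChainDemo {
    static final class ToyMessage {
        final String payload;
        final Map<String, String> properties;

        ToyMessage(String payload, Map<String, String> properties) {
            this.payload = payload;
            this.properties = properties;
        }
    }

    interface ToyInterceptor {
        ToyMessage beforeRead(ToyMessage message);
    }

    // Passes the message through every interceptor in order; each one sees
    // the result of the previous one.
    static ToyMessage applyAll(List<ToyInterceptor> chain, ToyMessage message) {
        ToyMessage current = message;
        for (ToyInterceptor interceptor : chain) {
            current = interceptor.beforeRead(current);
        }
        return current;
    }

    public static void main(String[] args) {
        int[] seen = {0};
        // Collects a statistic and leaves the message unchanged.
        ToyInterceptor counter = m -> {
            seen[0]++;
            return m;
        };
        // Returns a copy of the message with one extra property.
        ToyInterceptor tagger = m -> {
            Map<String, String> props = new HashMap<>(m.properties);
            props.put("intercepted", "true");
            return new ToyMessage(m.payload, props);
        };
        ToyMessage out = applyAll(Arrays.asList(counter, tagger),
                new ToyMessage("hello", new HashMap<>()));
        System.out.println(out.properties + " seen=" + seen[0]);
    }
}
```

Returning a modified copy, as the tagger does, keeps each interceptor free of side effects on the message it was given, which makes the chain easier to reason about.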

Sticky key range reader

With a sticky key range reader, the broker dispatches only messages whose key hash falls within the specified key hash ranges. You can specify multiple key hash ranges on a reader.

The following is an example to create a sticky key range reader.

  • Java

    pulsarClient.newReader()
        .topic(topic)
        .startMessageId(MessageId.earliest)
        .keyHashRange(Range.of(0, 10000), Range.of(20001, 30000))
        .create();

The total hash range size is 65536, so the max end of the range should be less than or equal to 65535.
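The range limits can be checked before a reader is created. The sketch below is a hypothetical helper, not part of the Pulsar client: KeyHashRanges and Slots are made-up names, both ends of a range are assumed to be inclusive, and the slot value is taken as already computed, since the broker's hash function is not reproduced here.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical helper, not part of the Pulsar client: checks key hash
// ranges against the 0..65535 slot space described above.
final class KeyHashRanges {
    static final int MAX_SLOT = 65535; // total hash range size is 65536

    static final class Slots {
        final int start;
        final int end; // assumed inclusive

        Slots(int start, int end) {
            if (start < 0 || end > MAX_SLOT || start > end) {
                throw new IllegalArgumentException("invalid range " + start + "-" + end);
            }
            this.start = start;
            this.end = end;
        }

        boolean contains(int slot) {
            return slot >= start && slot <= end;
        }
    }

    // True when the slot falls inside at least one configured range.
    static boolean accepts(List<Slots> ranges, int slot) {
        for (Slots range : ranges) {
            if (range.contains(slot)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        // Same ranges as the reader example above.
        List<Slots> ranges = Arrays.asList(new Slots(0, 10000), new Slots(20001, 30000));
        System.out.println(accepts(ranges, 5000));
        System.out.println(accepts(ranges, 15000));
        System.out.println(accepts(ranges, 25000));
    }
}
```

With the two ranges from the example, a message whose key hashes to a slot between 10001 and 20000, or above 30000, would not be dispatched to this reader.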