Consume Records with Subscriptions

What is a Subscription?

To consume data from a stream, you must create a subscription to it. When initiated, every subscription retrieves data from the beginning of the stream. Consumers, which receive and process records, connect to a stream through a subscription. A stream can have multiple subscriptions, but each subscription belongs to a single stream. Similarly, each subscription corresponds to one consumer group that can contain multiple consumers, and every consumer belongs to only one subscription.
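
The relationships above can be pictured as a small data model. The following is a toy Python sketch, not part of any HStreamDB client: `Registry`, `Subscription`, and their methods are hypothetical names used only to show that a stream can have many subscriptions, each subscription owns one consumer group, and a consumer cannot join a second subscription.

```python
from dataclasses import dataclass, field
from typing import Dict, List, Set


@dataclass
class Subscription:
    sub_id: str
    stream: str  # each subscription belongs to exactly one stream
    consumers: Set[str] = field(default_factory=set)  # its single consumer group


class Registry:
    """Toy model of stream / subscription / consumer relationships (hypothetical)."""

    def __init__(self) -> None:
        self.subscriptions: Dict[str, Subscription] = {}
        self.consumer_owner: Dict[str, str] = {}  # consumer name -> subscription id

    def create_subscription(self, sub_id: str, stream: str) -> None:
        if sub_id in self.subscriptions:
            raise ValueError(f"subscription {sub_id!r} already exists")
        self.subscriptions[sub_id] = Subscription(sub_id, stream)

    def join(self, consumer: str, sub_id: str) -> None:
        # A consumer may belong to only one subscription.
        owner = self.consumer_owner.get(consumer)
        if owner is not None and owner != sub_id:
            raise ValueError(f"consumer {consumer!r} already belongs to {owner!r}")
        self.subscriptions[sub_id].consumers.add(consumer)
        self.consumer_owner[consumer] = sub_id

    def subscriptions_of(self, stream: str) -> List[str]:
        # A stream may have any number of subscriptions.
        return sorted(s.sub_id for s in self.subscriptions.values() if s.stream == stream)
```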

Please refer to this page for detailed information about creating and managing your subscriptions.

How to consume data with a subscription

To consume data appended to a stream, the HStreamDB client libraries provide an asynchronous consumer API, which sends a request to join the consumer group of the specified subscription.

Two HStream Record types and corresponding receivers

As explained earlier, HStreamDB has two types of records: HRecord and RawRecord. When creating a consumer, you must provide a receiver for each record type you want to process. If only an HRecord receiver is set and the consumer receives a raw record, the consumer ignores it and moves on to the next record. For this reason, we do not recommend writing both HRecord and RawRecord to the same stream. This is not strictly forbidden, however, and you can provide both receivers to process both types of records.
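
The skipping rule can be modelled in a few lines. This is a hypothetical Python sketch, not the client's implementation: `HRecord` and `RawRecord` here are stand-in dataclasses, and `ToyConsumer.deliver` simply routes each record to the receiver registered for its type and skips records that have no receiver.

```python
from dataclasses import dataclass
from typing import Callable, Optional, Union


@dataclass
class HRecord:
    payload: dict  # structured (JSON-like) payload


@dataclass
class RawRecord:
    payload: bytes  # opaque binary payload


Receiver = Callable[[object], None]


class ToyConsumer:
    """Routes each record to the receiver registered for its type (hypothetical model)."""

    def __init__(self, hrecord_receiver: Optional[Receiver] = None,
                 raw_receiver: Optional[Receiver] = None) -> None:
        if hrecord_receiver is None and raw_receiver is None:
            raise ValueError("at least one receiver is required")
        self.hrecord_receiver = hrecord_receiver
        self.raw_receiver = raw_receiver
        self.skipped = 0

    def deliver(self, record: Union[HRecord, RawRecord]) -> None:
        receiver = self.hrecord_receiver if isinstance(record, HRecord) else self.raw_receiver
        if receiver is None:
            # No receiver for this record type: skip it and move on to the next record.
            self.skipped += 1
            return
        receiver(record)
```

Passing both `hrecord_receiver` and `raw_receiver` corresponds to the "provide both receivers" option described above.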

Simple Consumer Example

To get higher throughput for your application, we provide asynchronous fetching that does not require your application to block for new messages. Messages can be received in your application using a long-running message receiver and acknowledged one at a time, as shown in the example below.

```java
// ConsumeDataSimpleExample.java
package docs.code.examples;

import static java.util.concurrent.TimeUnit.SECONDS;

import io.hstream.Consumer;
import io.hstream.HRecordReceiver;
import io.hstream.HStreamClient;
import java.util.concurrent.TimeoutException;

public class ConsumeDataSimpleExample {
  public static void main(String[] args) throws Exception {
    String serviceUrl = "127.0.0.1:6570";
    if (System.getenv("serviceUrl") != null) {
      serviceUrl = System.getenv("serviceUrl");
    }

    String subscriptionId = "your_subscription_id";
    HStreamClient client = HStreamClient.builder().serviceUrl(serviceUrl).build();
    consumeDataFromSubscriptionExample(client, subscriptionId);
    client.close();
  }

  public static void consumeDataFromSubscriptionExample(
      HStreamClient client, String subscriptionId) {
    HRecordReceiver receiver =
        ((hRecord, responder) -> {
          System.out.println("Received a record :" + hRecord.getHRecord());
          responder.ack();
        });
    // Consumer is a Service(ref:
    // https://guava.dev/releases/19.0/api/docs/com/google/common/util/concurrent/Service.html)
    Consumer consumer =
        client
            .newConsumer()
            .subscription(subscriptionId)
            // optional, if it is not set, client will generate a unique id.
            .name("consumer_1")
            .hRecordReceiver(receiver)
            .build();
    // start Consumer as a background service and return
    consumer.startAsync().awaitRunning();
    try {
      // sleep 5s for consuming records
      consumer.awaitTerminated(5, SECONDS);
    } catch (TimeoutException e) {
      // stop consumer
      consumer.stopAsync().awaitTerminated();
    }
  }
}
```
```go
// ExampleConsumer.go
package examples

import (
	"log"
	"time"

	"github.com/hstreamdb/hstreamdb-go/hstream"
)

func ExampleConsumer() error {
	// YourHStreamServiceUrl is assumed to be defined elsewhere in the examples package.
	client, err := hstream.NewHStreamClient(YourHStreamServiceUrl)
	if err != nil {
		log.Fatalf("Creating client error: %s", err)
	}
	defer client.Close()

	subId := "SubscriptionId0"
	consumer := client.NewConsumer("consumer-1", subId)
	defer consumer.Stop()

	dataChan := consumer.StartFetch()
	timer := time.NewTimer(3 * time.Second)
	defer timer.Stop()

	for {
		select {
		case <-timer.C:
			log.Println("[consumer]: Streaming fetch stopped")
			return nil
		case recordMsg := <-dataChan:
			if recordMsg.Err != nil {
				// Log the fetch error carried by the message, not the client-creation error.
				log.Printf("[consumer]: Streaming fetch error: %s", recordMsg.Err)
				continue
			}
			for _, record := range recordMsg.Result {
				log.Printf("[consumer]: Receive %s record: record id = %s, payload = %+v",
					record.GetRecordType(), record.GetRecordId().String(), record.GetPayload())
				record.Ack()
			}
		}
	}
}
```
```python
# https://github.com/hstreamdb/hstreamdb-py/blob/main/examples/snippets/guides.py
import asyncio
import hstreamdb
import os

# NOTE: Replace with your own host and port
host = os.getenv("GUIDE_HOST", "127.0.0.1")
port = os.getenv("GUIDE_PORT", 6570)
stream_name = "your_stream"
subscription = "your_subscription"


# Run: asyncio.run(main(your_async_function))
async def main(*funcs):
    async with await hstreamdb.insecure_client(host=host, port=port) as client:
        for f in funcs:
            await f(client)


class Processing:
    count = 0
    max_count: int

    def __init__(self, max_count):
        self.max_count = max_count

    async def __call__(self, ack_fun, stop_fun, rs_iter):
        print("max_count", self.max_count)
        rs = list(rs_iter)
        for r in rs:
            self.count += 1
            print(f"[{self.count}] Receive: {r}")
            if self.max_count > 0 and self.count >= self.max_count:
                await stop_fun()
                break

        await ack_fun(r.id for r in rs)


async def subscribe_records(client):
    consumer = client.new_consumer("new_consumer", subscription, Processing(10))
    await consumer.start()
```

For better performance, batched ack is enabled by default with ackBufferSize = 100 and ackAgeLimit = 100. You can change both settings when building a consumer:

```java
Consumer consumer =
    client
        .newConsumer()
        .subscription("your_subscription_id")
        .name("your_consumer_name")
        .hRecordReceiver(your_receiver)
        // When ack() is called, the consumer does not send it to the server immediately:
        // acks are buffered until the buffered count reaches ackBufferSize,
        // the ackAgeLimit is reached, or the consumer is stopping.
        .ackBufferSize(100)
        .ackAgeLimit(100)
        .build();
```
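
To make the batching rule concrete, here is a minimal Python sketch of an ack buffer that flushes when the buffer is full, when the oldest buffered ack exceeds an age limit, or when the consumer stops. `AckBuffer`, `tick`, and the `send` callback are hypothetical names, and the age limit is expressed in seconds only for illustration; check the client documentation for the unit that `ackAgeLimit` actually uses.

```python
import time
from typing import Callable, List, Optional


class AckBuffer:
    """Buffers acks and sends them in batches (a sketch, not the client's code)."""

    def __init__(self, send: Callable[[List[str]], None], ack_buffer_size: int = 100,
                 ack_age_limit: float = 0.1,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self.send = send
        self.ack_buffer_size = ack_buffer_size
        self.ack_age_limit = ack_age_limit  # seconds, in this sketch only
        self.clock = clock
        self.pending: List[str] = []
        self.oldest: Optional[float] = None  # when the first pending ack was added

    def ack(self, record_id: str) -> None:
        if not self.pending:
            self.oldest = self.clock()
        self.pending.append(record_id)
        if len(self.pending) >= self.ack_buffer_size:
            self.flush()  # buffer is full

    def tick(self) -> None:
        # Called periodically (e.g. by a timer) to enforce the age limit.
        if self.pending and self.clock() - self.oldest >= self.ack_age_limit:
            self.flush()

    def flush(self) -> None:
        if self.pending:
            batch, self.pending = self.pending, []
            self.oldest = None
            self.send(batch)

    def close(self) -> None:
        # Stopping the consumer flushes whatever is still buffered.
        self.flush()
```

A larger buffer means fewer ack requests but more acks left pending, which the server may treat as unacknowledged in the meantime.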

Multiple consumers and shared consumption progress

In HStream, a subscription is consumed by a consumer group. The consumers in this group share the subscription's progress. To consume data from a subscription faster, you can have a new consumer join the existing subscription. The code below demonstrates how consumers join a consumer group; in practice, consumers usually run in different clients.

```java
// ConsumeDataSharedExample.java
package docs.code.examples;

import static java.util.concurrent.TimeUnit.SECONDS;

import io.hstream.Consumer;
import io.hstream.HRecordReceiver;
import io.hstream.HStreamClient;
import java.util.concurrent.TimeoutException;

public class ConsumeDataSharedExample {
  public static void main(String[] args) throws Exception {
    String serviceUrl = "127.0.0.1:6570";
    if (System.getenv("serviceUrl") != null) {
      serviceUrl = System.getenv("serviceUrl");
    }

    String subscription = "your_subscription_id";
    String consumer1 = "your_consumer1_name";
    String consumer2 = "your_consumer2_name";
    HStreamClient client = HStreamClient.builder().serviceUrl(serviceUrl).build();

    // create two consumers to consume records with several partition keys.
    Thread t1 =
        new Thread(() -> consumeDataFromSubscriptionSharedExample(client, subscription, consumer1));
    Thread t2 =
        new Thread(() -> consumeDataFromSubscriptionSharedExample(client, subscription, consumer2));
    t1.start();
    t2.start();
    t1.join();
    t2.join();
    client.close();
  }

  public static void consumeDataFromSubscriptionSharedExample(
      HStreamClient client, String subscription, String consumerName) {
    HRecordReceiver receiver =
        ((hRecord, responder) -> {
          System.out.println("Received a record :" + hRecord.getHRecord());
          responder.ack();
        });
    Consumer consumer =
        client
            .newConsumer()
            .subscription(subscription)
            .name(consumerName)
            .hRecordReceiver(receiver)
            .build();
    try {
      // sleep 5s for consuming records
      consumer.startAsync().awaitRunning();
      consumer.awaitTerminated(5, SECONDS);
    } catch (TimeoutException e) {
      // stop consumer
      consumer.stopAsync().awaitTerminated();
    }
  }
}
```
```go
// ExampleConsumerGroup.go
package examples

import (
	"log"
	"sync"
	"time"

	"github.com/hstreamdb/hstreamdb-go/hstream"
)

func ExampleConsumerGroup() error {
	// YourHStreamServiceUrl is assumed to be defined elsewhere in the examples package.
	client, err := hstream.NewHStreamClient(YourHStreamServiceUrl)
	if err != nil {
		log.Fatalf("Creating client error: %s", err)
	}
	defer client.Close()

	subId1 := "SubscriptionId1"
	var wg sync.WaitGroup
	wg.Add(2)

	go func() {
		// Deferred calls run last-in-first-out: registering wg.Done first makes it run
		// after consumer.Stop(), so wg.Wait() returns only once the consumer has stopped.
		defer wg.Done()
		consumer := client.NewConsumer("consumer-1", subId1)
		defer consumer.Stop()
		timer := time.NewTimer(5 * time.Second)
		defer timer.Stop()

		dataChan := consumer.StartFetch()
		for {
			select {
			case <-timer.C:
				log.Println("[consumer-1]: Stream fetching stopped")
				return
			case recordMsg := <-dataChan:
				if recordMsg.Err != nil {
					log.Printf("[consumer-1]: Stream fetching error: %s", recordMsg.Err)
					continue
				}
				for _, record := range recordMsg.Result {
					log.Printf("[consumer-1]: Receive %s record: record id = %s, payload = %+v",
						record.GetRecordType(), record.GetRecordId().String(), record.GetPayload())
					record.Ack()
				}
			}
		}
	}()

	go func() {
		defer wg.Done() // registered first so it runs after consumer.Stop()
		consumer := client.NewConsumer("consumer-2", subId1)
		defer consumer.Stop()
		timer := time.NewTimer(5 * time.Second)
		defer timer.Stop()

		dataChan := consumer.StartFetch()
		for {
			select {
			case <-timer.C:
				log.Println("[consumer-2]: Stream fetching stopped")
				return
			case recordMsg := <-dataChan:
				if recordMsg.Err != nil {
					log.Printf("[consumer-2]: Stream fetching error: %s", recordMsg.Err)
					continue
				}
				for _, record := range recordMsg.Result {
					log.Printf("[consumer-2]: Receive %s record: record id = %s, payload = %+v",
						record.GetRecordType(), record.GetRecordId().String(), record.GetPayload())
					record.Ack()
				}
			}
		}
	}()

	wg.Wait()
	return nil
}
```
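
Shared progress can be illustrated with a toy model in which all consumers of a group draw records from one cursor and report acks to one place. This Python sketch is an illustration under stated assumptions, not HStream's server-side bookkeeping: the hypothetical `SharedProgress` class hands out offsets 0, 1, 2, ... and tracks a `committed` point below which every record has been acknowledged, regardless of which consumer acked it.

```python
import threading
from typing import List, Optional


class SharedProgress:
    """Toy model of one subscription's progress shared by a consumer group."""

    def __init__(self, total_records: int) -> None:
        self.total = total_records
        self.next_offset = 0   # next record to hand out to any consumer
        self.acked = set()     # offsets acknowledged by any consumer
        self.committed = 0     # every offset below this has been acked
        self.lock = threading.Lock()

    def fetch(self) -> Optional[int]:
        with self.lock:
            if self.next_offset >= self.total:
                return None
            offset = self.next_offset
            self.next_offset += 1
            return offset

    def ack(self, offset: int) -> None:
        with self.lock:
            self.acked.add(offset)
            # Advance the committed point over the contiguous run of acked offsets.
            while self.committed in self.acked:
                self.committed += 1


def run_consumer(progress: SharedProgress, handled: List[int]) -> None:
    # Each consumer pulls from the shared cursor, so no record goes to two consumers.
    while (offset := progress.fetch()) is not None:
        handled.append(offset)
        progress.ack(offset)
```

Adding a consumer in this model only adds another puller on the same cursor, which is why joining an existing subscription increases throughput without re-reading records.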

Flow Control with maxUnackedRecords

A common scenario is that your consumers cannot process and acknowledge data as fast as the server sends it, or an unexpected problem prevents a consumer client from acknowledging the data it has received. This can cause problems such as the following:

The server has to keep resending unacknowledged messages and keep track of them, which consumes server resources and can eventually exhaust them.

To mitigate this, use the subscription's maxUnackedRecords setting to limit how many unacknowledged records the consumers may hold. Once the number of unacknowledged records exceeds maxUnackedRecords, the server stops sending messages to the consumers of that subscription.
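
The effect of the limit can be sketched as a sender that refuses to send more while too many records are outstanding. `FlowControlledSender` is a hypothetical Python model, not HStream code. It stops once the unacknowledged count reaches the limit and resumes when acks free up room; whether the real server stops at or just past the limit is a detail this sketch does not attempt to reproduce.

```python
from collections import deque
from typing import Hashable, Iterable, List


class FlowControlledSender:
    """Sketch of sending governed by a maxUnackedRecords-style limit."""

    def __init__(self, records: Iterable[Hashable], max_unacked_records: int) -> None:
        self.backlog = deque(records)
        self.max_unacked = max_unacked_records
        self.unacked = set()

    def send_available(self) -> List[Hashable]:
        """Send records until the unacked count hits the limit; return what was sent."""
        sent = []
        while self.backlog and len(self.unacked) < self.max_unacked:
            record = self.backlog.popleft()
            self.unacked.add(record)
            sent.append(record)
        return sent

    def ack(self, record: Hashable) -> None:
        # Each ack frees one slot, allowing the next send_available() call to send more.
        self.unacked.discard(record)
```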

Receiving messages in order

Note: the ordering described below applies only within a single consumer. If a subscription has multiple consumers, ordering is still guaranteed within each consumer, but not across the consumer group as a whole.

Consumers receive messages with the same partition key in the order in which the HStream server received them. However, HStream delivers records with at-least-once semantics: if the server does not receive the ack for a record in the middle of a sequence, it may deliver that record more than once. In such cases, ordering cannot be guaranteed either.
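
Because delivery is at-least-once, a consumer may see the same record twice. One common application-side technique, not something the HStream client does for you, is to make processing idempotent by remembering which record ids have already been handled. The Python sketch below assumes each delivery is a hypothetical `(record_id, payload)` pair; it removes duplicates but cannot restore the original order.

```python
from typing import Callable, Hashable, Iterable, List, Tuple


def process_at_least_once(deliveries: Iterable[Tuple[Hashable, object]],
                          handle: Callable[[object], None]) -> List[Hashable]:
    """Handle each record id once, skipping redelivered duplicates.

    Returns the record ids in the order they were actually handled.
    """
    processed = set()
    order = []
    for record_id, payload in deliveries:
        if record_id in processed:
            continue  # duplicate caused by redelivery; it was handled earlier
        handle(payload)
        processed.add(record_id)
        order.append(record_id)
    return order
```

The `processed` set grows without bound and lives only in memory; a real service would need to bound it or persist it alongside its output.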

Handling errors

If the receiver throws an exception while the consumer is running, by default the consumer catches it, logs an error, and continues with the next record instead of failing.
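
The default behaviour amounts to wrapping each receiver call in an exception handler. The following Python sketch shows the idea; `run_receiver` is a hypothetical helper, not the client's code. In this sketch a failed record is only collected and never acknowledged, so under the at-least-once delivery described in the ordering section above, the server may send it again.

```python
import logging
from typing import Callable, Iterable, List

logger = logging.getLogger("toy_consumer")


def run_receiver(records: Iterable[object], receiver: Callable[[object], None]) -> List[object]:
    """Call receiver for each record, logging failures and continuing.

    Returns the records whose receiver call raised an exception.
    """
    failures = []
    for record in records:
        try:
            receiver(record)
        except Exception:
            # Mirror the described default: log the error and move on to the next record.
            logger.exception("receiver failed on record %r", record)
            failures.append(record)
    return failures
```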

Consumers can also fail for other reasons, such as network errors or a deleted subscription. If you run the consumer as a service and want it to keep running, you can register a listener that handles a failed consumer:

```java
// add Listener for handling failed consumer
var threadPool = new ScheduledThreadPoolExecutor(1);
consumer.addListener(
    new Service.Listener() {
      @Override
      public void failed(Service.State from, Throwable failure) {
        System.out.println("consumer failed, with error: " + failure.getMessage());
      }
    },
    threadPool);
```
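
If the consumer follows Guava's `Service` lifecycle, a failed instance cannot be started again, so keeping consumption alive means building a new consumer when the old one fails. The Python sketch below models that supervision loop; `supervise` and the `make_consumer` factory are hypothetical names, and the linear backoff is an illustrative choice rather than a recommendation from HStream.

```python
import time
from typing import Callable


def supervise(make_consumer: Callable[[], Callable[[], None]], max_restarts: int,
              backoff_seconds: float = 0.0,
              sleep: Callable[[float], None] = time.sleep) -> int:
    """Run a consumer and replace it with a fresh one each time it fails.

    make_consumer returns a callable that runs until it returns (clean stop)
    or raises (failure). Returns the number of restarts performed; re-raises
    the last failure once max_restarts is exhausted.
    """
    restarts = 0
    while True:
        run = make_consumer()  # a new instance each time; failed ones are not reused
        try:
            run()
            return restarts  # stopped cleanly
        except Exception as failure:
            print(f"consumer failed, with error: {failure}")
            if restarts >= max_restarts:
                raise
            restarts += 1
            sleep(backoff_seconds * restarts)  # simple linear backoff
```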