Consume Data from HStreamDB with Subscriptions

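What Is a Subscription?

To consume data from a stream, you must first create a subscription to that stream. Once created, every subscription starts retrieving data from the beginning of the stream. A consumer, which receives and processes messages, is attached to a stream through a subscription.

A stream can have multiple subscriptions, but a given subscription belongs to exactly one stream. Likewise, a subscription corresponds to one consumer group that may contain multiple consumers, but each consumer belongs to exactly one subscription.

See this page for details on creating and managing subscriptions.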

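How to Consume Data with a Subscription

To consume data written to a stream, the HStreamDB client libraries provide an asynchronous Consumer API. A consumer sends a request to join the consumer group of the specified subscription.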

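Two HStream Record Types and Their Receivers

As introduced earlier, HStreamDB has two record types: HRecord and raw record. A consumer needs a matching receiver for the record type it consumes. If only an HRecord receiver is set and the consumer receives a raw record, it ignores that record and moves on to the next one. For this reason, we do not recommend writing both HRecords and raw records to the same stream. The implementation does not strictly forbid it, however, and you can supply both receivers to handle both record types.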
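
The receiver-matching rule can be pictured with a small stand-alone model. This is only a sketch in plain Python, not the HStreamDB client: the `dispatch` function, the `"hrecord"` and `"raw"` type tags, and the `(record_type, payload)` tuple shape are hypothetical names chosen for illustration.

```python
from typing import Callable, Iterable, Optional, Tuple

Record = Tuple[str, object]  # (record_type, payload); hypothetical shape


def dispatch(
    records: Iterable[Record],
    hrecord_receiver: Optional[Callable[[object], None]] = None,
    raw_receiver: Optional[Callable[[object], None]] = None,
) -> int:
    """Hand each record to the receiver for its type; return the number skipped."""
    receivers = {"hrecord": hrecord_receiver, "raw": raw_receiver}
    skipped = 0
    for record_type, payload in records:
        receiver = receivers.get(record_type)
        if receiver is None:
            # No receiver registered for this type: ignore it and move on.
            skipped += 1
            continue
        receiver(payload)
    return skipped


mixed_stream = [("hrecord", {"id": 1}), ("raw", b"\x01\x02"), ("hrecord", {"id": 2})]

# Only an HRecord receiver is set, so the raw record is skipped.
seen = []
skipped = dispatch(mixed_stream, hrecord_receiver=seen.append)
```

With only the HRecord receiver registered, the raw record in the middle is dropped silently, which is why mixing both types in one stream is easy to get wrong.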

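A Simple Consumer Example

The asynchronous Consumer API does not require your application to block while waiting for new records, which lets it achieve higher throughput. Your application receives records through a long-running record receiver and acks them one by one, as the following examples show.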

    // ConsumeDataSimpleExample.java
    package docs.code.examples;

    import static java.util.concurrent.TimeUnit.SECONDS;

    import io.hstream.Consumer;
    import io.hstream.HRecordReceiver;
    import io.hstream.HStreamClient;
    import java.util.concurrent.TimeoutException;

    public class ConsumeDataSimpleExample {
      public static void main(String[] args) throws Exception {
        String serviceUrl = "127.0.0.1:6570";
        if (System.getenv("serviceUrl") != null) {
          serviceUrl = System.getenv("serviceUrl");
        }

        String subscriptionId = "your_subscription_id";
        HStreamClient client = HStreamClient.builder().serviceUrl(serviceUrl).build();
        consumeDataFromSubscriptionExample(client, subscriptionId);
        client.close();
      }

      public static void consumeDataFromSubscriptionExample(
          HStreamClient client, String subscriptionId) {
        HRecordReceiver receiver =
            ((hRecord, responder) -> {
              System.out.println("Received a record :" + hRecord.getHRecord());
              responder.ack();
            });
        // Consumer is a Service(ref:
        // https://guava.dev/releases/19.0/api/docs/com/google/common/util/concurrent/Service.html)
        Consumer consumer =
            client
                .newConsumer()
                .subscription(subscriptionId)
                // optional, if it is not set, client will generate a unique id.
                .name("consumer_1")
                .hRecordReceiver(receiver)
                .build();
        // start Consumer as a background service and return
        consumer.startAsync().awaitRunning();
        try {
          // sleep 5s for consuming records
          consumer.awaitTerminated(5, SECONDS);
        } catch (TimeoutException e) {
          // stop consumer
          consumer.stopAsync().awaitTerminated();
        }
      }
    }

    // ExampleConsumer.go
    package examples

    import (
        "log"
        "time"

        "github.com/hstreamdb/hstreamdb-go/hstream"
    )

    // YourHStreamServiceUrl is expected to be defined elsewhere in this package.
    func ExampleConsumer() error {
        client, err := hstream.NewHStreamClient(YourHStreamServiceUrl)
        if err != nil {
            log.Fatalf("Creating client error: %s", err)
        }
        defer client.Close()

        subId := "SubscriptionId0"
        consumer := client.NewConsumer("consumer-1", subId)
        defer consumer.Stop()

        dataChan := consumer.StartFetch()
        timer := time.NewTimer(3 * time.Second)
        defer timer.Stop()

        for {
            select {
            case <-timer.C:
                log.Println("[consumer]: Streaming fetch stopped")
                return nil
            case recordMsg := <-dataChan:
                if recordMsg.Err != nil {
                    // Log the error carried by this message, not the outer err.
                    log.Printf("[consumer]: Streaming fetch error: %s", recordMsg.Err)
                    continue
                }

                for _, record := range recordMsg.Result {
                    log.Printf("[consumer]: Receive %s record: record id = %s, payload = %+v",
                        record.GetRecordType(), record.GetRecordId().String(), record.GetPayload())
                    record.Ack()
                }
            }
        }
    }

    # https://github.com/hstreamdb/hstreamdb-py/blob/main/examples/snippets/guides.py
    import asyncio
    import hstreamdb
    import os

    # NOTE: Replace with your own host and port
    host = os.getenv("GUIDE_HOST", "127.0.0.1")
    port = os.getenv("GUIDE_PORT", 6570)
    stream_name = "your_stream"
    subscription = "your_subscription"


    # Run: asyncio.run(main(your_async_function))
    async def main(*funcs):
        async with await hstreamdb.insecure_client(host=host, port=port) as client:
            for f in funcs:
                await f(client)


    class Processing:
        count = 0
        max_count: int

        def __init__(self, max_count):
            self.max_count = max_count

        async def __call__(self, ack_fun, stop_fun, rs_iter):
            print("max_count", self.max_count)
            rs = list(rs_iter)
            for r in rs:
                self.count += 1
                print(f"[{self.count}] Receive: {r}")
                if self.max_count > 0 and self.count >= self.max_count:
                    await stop_fun()
                    break

            await ack_fun(r.id for r in rs)


    async def subscribe_records(client):
        consumer = client.new_consumer("new_consumer", subscription, Processing(10))
        await consumer.start()

For better performance, batched ack is enabled by default, with ackBufferSize = 100 and ackAgeLimit = 100. You can change both settings when you build a consumer.

    Consumer consumer =
        client
            .newConsumer()
            .subscription("your_subscription_id")
            .name("your_consumer_name")
            .hRecordReceiver(your_receiver)
            // When ack() is called, the consumer does not send it to the servers immediately.
            // The ack request is buffered until the ack count reaches ackBufferSize,
            // the consumer is stopping, or ackAgeLimit is reached.
            .ackBufferSize(100)
            .ackAgeLimit(100)
            .build();
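
To make the three flush conditions concrete, here is a toy model of an ack buffer. It is a sketch in plain Python and not the client library's implementation: the `AckBuffer` class, its `tick` method, and the injected `clock` are hypothetical, and the age limit is simply expressed in whatever unit the clock returns.

```python
import time
from typing import Callable, List


class AckBuffer:
    """Toy model of batched acks: flush on size, on age, or on stop."""

    def __init__(
        self,
        send: Callable[[List[str]], None],
        buffer_size: int,
        age_limit: float,
        clock: Callable[[], float] = time.monotonic,
    ):
        self._send = send
        self._buffer_size = buffer_size
        self._age_limit = age_limit  # same unit as clock()
        self._clock = clock
        self._pending: List[str] = []
        self._oldest = 0.0  # time at which the oldest pending ack was buffered

    def ack(self, record_id: str) -> None:
        if not self._pending:
            self._oldest = self._clock()
        self._pending.append(record_id)
        if len(self._pending) >= self._buffer_size:
            self.flush()

    def tick(self) -> None:
        """Call periodically; flushes if the oldest pending ack is too old."""
        if self._pending and self._clock() - self._oldest >= self._age_limit:
            self.flush()

    def flush(self) -> None:
        if self._pending:
            self._send(self._pending)
            self._pending = []

    def stop(self) -> None:
        # Stopping sends whatever is still buffered.
        self.flush()


now = [0.0]  # fake clock so the example is deterministic
sent: List[List[str]] = []
buf = AckBuffer(sent.append, buffer_size=3, age_limit=100, clock=lambda: now[0])
for rid in ["r1", "r2", "r3", "r4"]:
    buf.ack(rid)  # r1..r3 are sent together once the buffer is full
now[0] = 150
buf.tick()  # r4 is sent because it has waited longer than the age limit
```

A larger buffer means fewer ack requests but a longer window in which records stay unacknowledged on the server side, so the two settings trade request volume against redelivery risk.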

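Multiple Consumers and Shared Subscriptions

As mentioned earlier, in HStream a subscription is consumed by a consumer group. The group may contain multiple consumers, and they share the progress of the subscription. To consume from a subscription faster, let a new consumer join it. The code below demonstrates how consumers join a consumer group. In practice, it is more common for consumers from different clients to consume one subscription together.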

    // ConsumeDataSharedExample.java
    package docs.code.examples;

    import static java.util.concurrent.TimeUnit.SECONDS;

    import io.hstream.Consumer;
    import io.hstream.HRecordReceiver;
    import io.hstream.HStreamClient;
    import java.util.concurrent.TimeoutException;

    public class ConsumeDataSharedExample {
      public static void main(String[] args) throws Exception {
        String serviceUrl = "127.0.0.1:6570";
        if (System.getenv("serviceUrl") != null) {
          serviceUrl = System.getenv("serviceUrl");
        }

        String subscription = "your_subscription_id";
        String consumer1 = "your_consumer1_name";
        String consumer2 = "your_consumer2_name";
        HStreamClient client = HStreamClient.builder().serviceUrl(serviceUrl).build();

        // create two consumers to consume records with several partition keys.
        Thread t1 =
            new Thread(() -> consumeDataFromSubscriptionSharedExample(client, subscription, consumer1));
        Thread t2 =
            new Thread(() -> consumeDataFromSubscriptionSharedExample(client, subscription, consumer2));
        t1.start();
        t2.start();
        t1.join();
        t2.join();
        client.close();
      }

      public static void consumeDataFromSubscriptionSharedExample(
          HStreamClient client, String subscription, String consumerName) {
        HRecordReceiver receiver =
            ((hRecord, responder) -> {
              System.out.println("Received a record :" + hRecord.getHRecord());
              responder.ack();
            });
        Consumer consumer =
            client
                .newConsumer()
                .subscription(subscription)
                .name(consumerName)
                .hRecordReceiver(receiver)
                .build();
        try {
          // sleep 5s for consuming records
          consumer.startAsync().awaitRunning();
          consumer.awaitTerminated(5, SECONDS);
        } catch (TimeoutException e) {
          // stop consumer
          consumer.stopAsync().awaitTerminated();
        }
      }
    }

    // ExampleConsumerGroup.go
    package examples

    import (
        "log"
        "sync"
        "time"

        "github.com/hstreamdb/hstreamdb-go/hstream"
    )

    // YourHStreamServiceUrl is expected to be defined elsewhere in this package.
    func ExampleConsumerGroup() error {
        client, err := hstream.NewHStreamClient(YourHStreamServiceUrl)
        if err != nil {
            log.Fatalf("Creating client error: %s", err)
        }
        defer client.Close()

        subId1 := "SubscriptionId1"

        var wg sync.WaitGroup
        wg.Add(2)

        // First consumer joining the subscription.
        go func() {
            consumer := client.NewConsumer("consumer-1", subId1)
            defer consumer.Stop()
            timer := time.NewTimer(5 * time.Second)
            defer timer.Stop()
            defer wg.Done()

            dataChan := consumer.StartFetch()
            for {
                select {
                case <-timer.C:
                    log.Println("[consumer-1]: Stream fetching stopped")
                    return
                case recordMsg := <-dataChan:
                    if recordMsg.Err != nil {
                        // Log the error carried by this message, not the outer err.
                        log.Printf("[consumer-1]: Stream fetching error: %s", recordMsg.Err)
                        continue
                    }

                    for _, record := range recordMsg.Result {
                        log.Printf("[consumer-1]: Receive %s record: record id = %s, payload = %+v",
                            record.GetRecordType(), record.GetRecordId().String(), record.GetPayload())
                        record.Ack()
                    }
                }
            }
        }()

        // Second consumer joining the same subscription.
        go func() {
            consumer := client.NewConsumer("consumer-2", subId1)
            defer consumer.Stop()
            timer := time.NewTimer(5 * time.Second)
            defer timer.Stop()
            defer wg.Done()

            dataChan := consumer.StartFetch()
            for {
                select {
                case <-timer.C:
                    log.Println("[consumer-2]: Stream fetching stopped")
                    return
                case recordMsg := <-dataChan:
                    if recordMsg.Err != nil {
                        // Log the error carried by this message, not the outer err.
                        log.Printf("[consumer-2]: Stream fetching error: %s", recordMsg.Err)
                        continue
                    }

                    for _, record := range recordMsg.Result {
                        log.Printf("[consumer-2]: Receive %s record: record id = %s, payload = %+v",
                            record.GetRecordType(), record.GetRecordId().String(), record.GetPayload())
                        record.Ack()
                    }
                }
            }
        }()

        wg.Wait()
        return nil
    }
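
The idea of consumers sharing the progress of one subscription can be pictured with a toy model. This is a sketch in plain Python, not the HStreamDB client or server: `ToyConsumerGroup` is a hypothetical class, and the round-robin hand-out is an assumption made only to keep the example deterministic. It says nothing about how HStreamDB actually assigns records to consumers.

```python
from collections import deque
from typing import Deque, Dict, List


class ToyConsumerGroup:
    """Toy model: consumers of one subscription share a single read position."""

    def __init__(self, records: List[str]):
        self._backlog: Deque[str] = deque(records)
        self.received: Dict[str, List[str]] = {}

    def join(self, consumer_name: str) -> None:
        self.received.setdefault(consumer_name, [])

    def deliver_all(self) -> None:
        """Hand out the backlog round-robin; each record goes to exactly one consumer."""
        if not self.received:
            return  # nobody has joined yet, so nothing can be delivered
        names = list(self.received)
        turn = 0
        while self._backlog:
            self.received[names[turn % len(names)]].append(self._backlog.popleft())
            turn += 1


group = ToyConsumerGroup([f"r{i}" for i in range(6)])
group.join("consumer-1")
group.join("consumer-2")
group.deliver_all()
```

Each record reaches exactly one member of the group, so adding a consumer splits the work rather than duplicating it.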

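Flow Control with maxUnackedRecords

A common situation is that consumers cannot process and acknowledge data as fast as the server sends it, or that an unexpected problem prevents a consumer from acknowledging the data it has received. This can cause the following problem:

The server has to keep resending unacknowledged messages and keep track of them, which consumes server resources and can eventually exhaust them.

To mitigate this, use the subscription's maxUnackedRecords setting to cap the number of unacknowledged records allowed while consumers are receiving messages. Once the number exceeds maxUnackedRecords, the server stops sending messages to the consumers of that subscription.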
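
The effect of this limit can be sketched with a toy model. This is plain Python and not HStreamDB code: `ToySubscription` and its methods are hypothetical, the model stops delivering as soon as the limit is reached, and it assumes delivery resumes once acks bring the count back down. The exact boundary and resume behavior of the real server are not described here.

```python
from collections import deque
from typing import Deque, List, Set


class ToySubscription:
    """Toy model: stop delivering once too many records are unacknowledged."""

    def __init__(self, records: List[str], max_unacked_records: int):
        self._backlog: Deque[str] = deque(records)
        self._max_unacked = max_unacked_records
        self._unacked: Set[str] = set()

    def poll(self) -> List[str]:
        """Deliver records until the backlog is empty or the unacked limit is hit."""
        delivered = []
        while self._backlog and len(self._unacked) < self._max_unacked:
            record_id = self._backlog.popleft()
            self._unacked.add(record_id)
            delivered.append(record_id)
        return delivered

    def ack(self, record_id: str) -> None:
        self._unacked.discard(record_id)


sub = ToySubscription([f"r{i}" for i in range(5)], max_unacked_records=2)
first = sub.poll()    # two records go out, then the limit is reached
stalled = sub.poll()  # nothing is delivered while both are unacked
sub.ack("r0")
resumed = sub.poll()  # one slot is free again, so one more record goes out
```

A slow consumer therefore throttles itself: it never holds more than the configured number of records in flight.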

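Receiving Messages in Order

Note: The ordering described below applies only to a single consumer. If a subscription has multiple consumers, order is still guaranteed within each consumer, but not across the consumer group as a whole.

A consumer receives records that share a partition key in the order in which the HStream server received them. Because HStream delivers records with at-least-once semantics, HServer may send a record more than once when it has not received the ack for it. In those cases, ordering is not guaranteed either.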
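
Because a record can arrive more than once, an application may want its processing to tolerate duplicates. The sketch below shows one possible approach in plain Python: remember the ids that were already applied and skip repeats. This is an application-side assumption, not something HStreamDB does for you, and the `(record_id, partition_key, payload)` tuple shape and the `process_once` function are hypothetical.

```python
from typing import Dict, Iterable, List, Set, Tuple

Delivery = Tuple[str, str, str]  # (record_id, partition_key, payload); hypothetical shape


def process_once(deliveries: Iterable[Delivery]) -> Dict[str, List[str]]:
    """Apply each record at most once, grouping payloads by partition key."""
    seen: Set[str] = set()
    by_key: Dict[str, List[str]] = {}
    for record_id, key, payload in deliveries:
        if record_id in seen:
            continue  # redelivered record: already applied, skip it
        seen.add(record_id)
        by_key.setdefault(key, []).append(payload)
    return by_key


# "id-2" is delivered a second time, after "id-3", because its ack was not received.
deliveries = [
    ("id-1", "k1", "a"),
    ("id-2", "k1", "b"),
    ("id-3", "k1", "c"),
    ("id-2", "k1", "b"),
    ("id-4", "k2", "x"),
]
result = process_once(deliveries)
```

Skipping the repeated id keeps the per-key sequence that was applied first, even though the raw delivery sequence for `k1` is out of order.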

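Handling Errors

When a consumer is running and a receiver fails, the default behavior is that the consumer catches the exception, logs the error, and continues with the next record instead of failing itself.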
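
That default can be pictured with a short stand-alone loop. This is a sketch in plain Python that mirrors the described behavior and is not the client library's code: `run_receiver` and `flaky_receiver` are hypothetical names.

```python
import logging
from typing import Callable, Iterable, List

logger = logging.getLogger("toy_consumer")


def run_receiver(records: Iterable[str], receiver: Callable[[str], None]) -> List[str]:
    """Call the receiver for every record; log failures and keep going."""
    failed = []
    for record in records:
        try:
            receiver(record)
        except Exception:
            # Mirror the described default: log the error, continue with the next record.
            logger.exception("receiver failed on record %r", record)
            failed.append(record)
    return failed


processed: List[str] = []


def flaky_receiver(record: str) -> None:
    if record == "bad":
        raise ValueError("cannot parse record")
    processed.append(record)


failed = run_receiver(["r1", "bad", "r2"], flaky_receiver)
```

The failing record is logged and skipped, and the record after it is still processed.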

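Other conditions, such as network problems or a deleted subscription, can cause the consumer itself to fail. Because a consumer is a service that you probably want to keep running, you can register a listener to handle a consumer failure.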

    // add Listener for handling failed consumer
    // requires java.util.concurrent.ScheduledThreadPoolExecutor
    // and com.google.common.util.concurrent.Service
    var threadPool = new ScheduledThreadPoolExecutor(1);
    consumer.addListener(
        new Service.Listener() {
          @Override
          public void failed(Service.State from, Throwable failure) {
            System.out.println("consumer failed, with error: " + failure.getMessage());
          }
        },
        threadPool);