订阅

在 HStreamDB 中,订阅也被看作是一种持久的资源,它使得消费者和 stream 解耦。 用户可以创建、删除和列出订阅。

概念

HStreamDB 用订阅来管理消费的进度信息 (e.g. checkpoint, offset)。 每一个消费者开始消费前,必须加入一个已经存在的订阅

前提条件

确保有一个运行中并可用的 HStreamDB

创建一个新的订阅

通过 HStreamClient.createSubscription(Subscription) 可以创建一个新的订阅:

  1. Subscription subscription =
  2. Subscription
  3. .newBuilder()
  4. .subscription("my_subscription")
  5. .stream("my_stream")
  6. .offset(new SubscriptionOffset(SubscriptionOffset.SpecialOffset.LATEST))
  7. .ackTimeoutSeconds(600)
  8. .build();
  9. client.createSubscription(subscription);

其中可以给 SubscriptionOffset 以下三种值:

  1. // consume from the start of the stream
  2. SubscriptionOffset.SpecialOffset offset = SubscriptionOffset.SpecialOffset.EARLIST;
  3. // consume from the tail of the stream
  4. SubscriptionOffset.SpecialOffset offset = SubscriptionOffset.SpecialOffset.LATEST;
  5. // consume from RecordId with specified LSN and offset
  6. RecordId rid = new RecordId(1, 2);

列出所有存在的订阅

通过 HStreamClient.listSubscriptions() 可以拿到所有存在的订阅列表:

  1. List<Subscription> subscriptions = client.listSubscriptions();
  2. for (Subscription subscription : subscriptions) {
  3. System.out.println(subscription);
  4. System.out.println(subscription.getSubscriptionId());
  5. }

删除一个订阅

通过 HStreamClient.deleteSubscription() 可以删除一个订阅:

  1. client.deleteSubscription("test_subscription");