创建和管理 Subscription
Subscription 的属性
ackTimeoutSeconds
指定 HServer 将 records 标记为 unacked 的最大等待时间,之后该记录将被再次发送。
maxUnackedRecords
允许的未 acked record 的最大数量。超过设定的大小后,服务器将停止向相应的消费者发送 records。
创建一个 Subscription
每个 subscription 都必须指定要订阅哪个 stream,这意味着你必须确保要订阅的 stream 已经被创建。
关于订阅的名称,请参考资源命名准则。
当创建一个 subscription 时,你可以像下面这样指定上述属性:
// CreateSubscriptionExample.java
package docs.code.examples;
import io.hstream.HStreamClient;
import io.hstream.Subscription;
/** Example showing how to create a subscription on an already existing stream. */
public class CreateSubscriptionExample {

  /**
   * Builds a client for the configured service URL and creates one subscription with it.
   *
   * <p>The service URL defaults to {@code 127.0.0.1:6570} and can be overridden through the
   * {@code serviceUrl} environment variable.
   *
   * @param args unused
   * @throws Exception if creating the subscription or closing the client fails
   */
  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String serviceUrl = "127.0.0.1:6570";
    String serviceUrlFromEnv = System.getenv("serviceUrl");
    if (serviceUrlFromEnv != null) {
      serviceUrl = serviceUrlFromEnv;
    }

    String streamName = "your_h_records_stream_name";

    // try-with-resources guarantees the client is closed even when the example throws.
    try (HStreamClient client = HStreamClient.builder().serviceUrl(serviceUrl).build()) {
      createSubscriptionExample(client, streamName);
    }
  }

  /**
   * Creates a subscription named {@code your_subscription_id} on the given stream.
   *
   * @param client an open client used to issue the request
   * @param streamName name of the stream to subscribe to
   */
  public static void createSubscriptionExample(HStreamClient client, String streamName) {
    // TODO(developer): Specify the options while creating the subscription
    String subscriptionId = "your_subscription_id";
    Subscription subscription =
        Subscription.newBuilder()
            .subscription(subscriptionId)
            .stream(streamName)
            .ackTimeoutSeconds(600) // The default setting is 600 seconds
            .maxUnackedRecords(10000) // The default setting is 10000 records
            .build();
    client.createSubscription(subscription);
  }
}
// ExampleCreateSubscription.go
package examples
import (
"github.com/hstreamdb/hstreamdb-go/hstream"
"log"
)
// ExampleCreateSubscription creates two subscriptions on the same stream, each with its own
// ack timeout, unacked-record limit and starting offset.
//
// Failures are logged and returned to the caller. log.Fatalf is deliberately avoided because
// it exits the process immediately, which would skip the deferred client.Close() and make the
// error return value meaningless.
func ExampleCreateSubscription() error {
	client, err := hstream.NewHStreamClient(YourHStreamServiceUrl)
	if err != nil {
		log.Printf("Creating client error: %s", err)
		return err
	}
	defer client.Close()

	streamName := "testStream"
	subId0 := "SubscriptionId0"
	subId1 := "SubscriptionId1"

	// Create a new subscription with ack timeout = 60s, max unAcked records num set to 10000 and set
	// subscriptionOffset to Earliest
	if err = client.CreateSubscription(subId0, streamName,
		hstream.WithAckTimeout(60),
		hstream.WithMaxUnackedRecords(10000),
		hstream.WithOffset(hstream.EARLIEST)); err != nil {
		log.Printf("Creating subscription error: %s", err)
		return err
	}

	// Second subscription: ack timeout = 600s, at most 5000 unacked records, offset set to Latest
	if err = client.CreateSubscription(subId1, streamName,
		hstream.WithAckTimeout(600),
		hstream.WithMaxUnackedRecords(5000),
		hstream.WithOffset(hstream.LATEST)); err != nil {
		log.Printf("Creating subscription error: %s", err)
		return err
	}

	return nil
}
# https://github.com/hstreamdb/hstreamdb-py/blob/main/examples/snippets/guides.py
import asyncio
import hstreamdb
import os
# NOTE: Replace with your own host and port
# Connection settings; host and port can be overridden through environment variables.
host = os.getenv("GUIDE_HOST", "127.0.0.1")
# Environment values are always strings, so convert to int to keep the port the same
# type whether or not GUIDE_PORT is set (a non-numeric value raises ValueError here).
port = int(os.getenv("GUIDE_PORT", "6570"))
stream_name = "your_stream"
subscription = "your_subscription"
# Run: asyncio.run(main(your_async_function))
async def main(*funcs):
    """Open one insecure client connection and run every given coroutine function on it.

    Each element of ``funcs`` is called with the client as its only argument and
    awaited in the order given.
    """
    connection = await hstreamdb.insecure_client(host=host, port=port)
    async with connection as client:
        for func in funcs:
            await func(client)
async def create_subscription(client):
    """Create the module-level ``subscription`` on ``stream_name`` using ``client``.

    The request uses an ack timeout of 600, allows 10000 unacked records and
    starts from the earliest offset.
    """
    options = {
        "ack_timeout": 600,
        "max_unacks": 10000,
        "offset": hstreamdb.SpecialOffset.EARLIEST,
    }
    await client.create_subscription(subscription, stream_name, **options)
删除一个订阅
要删除一个订阅,你需要确保没有活跃的订阅消费者,除非启用强制删除。
强制删除一个 Subscription
如果你确实想删除一个 subscription,并且有消费者正在运行,请启用强制删除。当强制删除一个 subscription 时,该订阅将处于删除中的状态,并关闭正在运行的消费者,这意味着在删除完成之前,你将无法加入或再次删除该订阅,也无法创建一个同名的 subscription。在删除完成后,你可以用同样的名字创建一个订阅,这个订阅将是一个全新的订阅。即使它们订阅的是同一个流,这个新的订阅也不会与被删除的订阅共享消费进度。
// DeleteSubscriptionExample.java
package docs.code.examples;
import io.hstream.HStreamClient;
/** Example showing how to delete a subscription, with or without forcing the deletion. */
public class DeleteSubscriptionExample {

  /**
   * Builds a client for the configured service URL and deletes one subscription with it.
   *
   * <p>The service URL defaults to {@code 127.0.0.1:6570} and can be overridden through the
   * {@code serviceUrl} environment variable.
   *
   * @param args unused
   * @throws Exception if deleting the subscription or closing the client fails
   */
  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    // String serviceUrl = "your-service-url-address";
    String serviceUrl = "127.0.0.1:6570";
    String serviceUrlFromEnv = System.getenv("serviceUrl");
    if (serviceUrlFromEnv != null) {
      serviceUrl = serviceUrlFromEnv;
    }

    String subscriptionId = "your_subscription_id";

    // try-with-resources guarantees the client is closed even when the example throws.
    try (HStreamClient client = HStreamClient.builder().serviceUrl(serviceUrl).build()) {
      deleteSubscriptionExample(client, subscriptionId);
    }
  }

  /**
   * Deletes the subscription without forcing.
   *
   * @param client an open client used to issue the request
   * @param subscriptionId id of the subscription to delete
   */
  public static void deleteSubscriptionExample(HStreamClient client, String subscriptionId) {
    client.deleteSubscription(subscriptionId);
  }

  /**
   * Deletes the subscription with the force flag set to {@code true}.
   *
   * @param client an open client used to issue the request
   * @param subscriptionId id of the subscription to delete
   */
  public static void deleteSubscriptionForceExample(HStreamClient client, String subscriptionId) {
    client.deleteSubscription(subscriptionId, true);
  }
}
// ExampleDeleteSubscription.go
package examples
import (
"github.com/hstreamdb/hstreamdb-go/hstream"
"log"
)
// ExampleDeleteSubscription deletes two subscriptions: the first with the force flag set,
// the second without it.
//
// Failures are logged and returned to the caller. log.Fatalf is deliberately avoided because
// it exits the process immediately, which would skip the deferred client.Close() and make the
// error return value meaningless.
func ExampleDeleteSubscription() error {
	client, err := hstream.NewHStreamClient(YourHStreamServiceUrl)
	if err != nil {
		log.Printf("Creating client error: %s", err)
		return err
	}
	defer client.Close()

	subId0 := "SubscriptionId0"
	subId1 := "SubscriptionId1"

	// force delete subscription
	if err = client.DeleteSubscription(subId0, true); err != nil {
		log.Printf("Force deleting subscription error: %s", err)
		return err
	}

	// delete subscription
	if err = client.DeleteSubscription(subId1, false); err != nil {
		log.Printf("Deleting subscription error: %s", err)
		return err
	}

	return nil
}
# https://github.com/hstreamdb/hstreamdb-py/blob/main/examples/snippets/guides.py
import asyncio
import hstreamdb
import os
# NOTE: Replace with your own host and port
# Connection settings; host and port can be overridden through environment variables.
host = os.getenv("GUIDE_HOST", "127.0.0.1")
# Environment values are always strings, so convert to int to keep the port the same
# type whether or not GUIDE_PORT is set (a non-numeric value raises ValueError here).
port = int(os.getenv("GUIDE_PORT", "6570"))
stream_name = "your_stream"
subscription = "your_subscription"
# Run: asyncio.run(main(your_async_function))
async def main(*funcs):
    """Open one insecure client connection and run every given coroutine function on it.

    Each element of ``funcs`` is called with the client as its only argument and
    awaited in the order given.
    """
    connection = await hstreamdb.insecure_client(host=host, port=port)
    async with connection as client:
        for func in funcs:
            await func(client)
async def delete_subscription(client):
    """Delete the module-level ``subscription`` through ``client`` with ``force=True``."""
    pending_deletion = client.delete_subscription(subscription, force=True)
    await pending_deletion
列出 HStream 中的 subscription 信息
// ListSubscriptionsExample.java
package docs.code.examples;
import io.hstream.HStreamClient;
import io.hstream.Subscription;
import java.util.List;
/** Example showing how to list the subscriptions known to the server. */
public class ListSubscriptionsExample {

  /**
   * Builds a client for the configured service URL and lists subscriptions with it.
   *
   * <p>The service URL defaults to {@code 127.0.0.1:6570} and can be overridden through the
   * {@code serviceUrl} environment variable.
   *
   * @param args unused
   * @throws Exception if listing the subscriptions or closing the client fails
   */
  public static void main(String[] args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String serviceUrl = "127.0.0.1:6570";
    String serviceUrlFromEnv = System.getenv("serviceUrl");
    if (serviceUrlFromEnv != null) {
      serviceUrl = serviceUrlFromEnv;
    }

    // try-with-resources guarantees the client is closed even when the example throws.
    try (HStreamClient client = HStreamClient.builder().serviceUrl(serviceUrl).build()) {
      listSubscriptionExample(client);
    }
  }

  /**
   * Fetches the subscriptions and prints the id of each one to standard output.
   *
   * @param client an open client used to issue the request
   */
  public static void listSubscriptionExample(HStreamClient client) {
    List<Subscription> subscriptions = client.listSubscriptions();
    for (Subscription subscription : subscriptions) {
      System.out.println(subscription.getSubscriptionId());
    }
  }
}
// ExampleListSubscriptions.go
package examples
import (
"fmt"
"github.com/hstreamdb/hstreamdb-go/hstream"
"log"
)
// ExampleListSubscriptions fetches the subscriptions from the server and prints each one
// with the %+v verb.
//
// Failures are logged and returned to the caller. log.Fatalf is deliberately avoided because
// it exits the process immediately, which would skip the deferred client.Close() and make the
// error return value meaningless.
func ExampleListSubscriptions() error {
	client, err := hstream.NewHStreamClient(YourHStreamServiceUrl)
	if err != nil {
		log.Printf("Creating client error: %s", err)
		return err
	}
	defer client.Close()

	subscriptions, err := client.ListSubscriptions()
	if err != nil {
		log.Printf("Listing subscriptions error: %s", err)
		return err
	}

	for _, sub := range subscriptions {
		fmt.Printf("%+v\n", sub)
	}

	return nil
}
# https://github.com/hstreamdb/hstreamdb-py/blob/main/examples/snippets/guides.py
import asyncio
import hstreamdb
import os
# NOTE: Replace with your own host and port
# Connection settings; host and port can be overridden through environment variables.
host = os.getenv("GUIDE_HOST", "127.0.0.1")
# Environment values are always strings, so convert to int to keep the port the same
# type whether or not GUIDE_PORT is set (a non-numeric value raises ValueError here).
port = int(os.getenv("GUIDE_PORT", "6570"))
stream_name = "your_stream"
subscription = "your_subscription"
# Run: asyncio.run(main(your_async_function))
async def main(*funcs):
    """Open one insecure client connection and run every given coroutine function on it.

    Each element of ``funcs`` is called with the client as its only argument and
    awaited in the order given.
    """
    connection = await hstreamdb.insecure_client(host=host, port=port)
    async with connection as client:
        for func in funcs:
            await func(client)
async def list_subscriptions(client):
    """Fetch the subscriptions through ``client`` and print each one."""
    for entry in await client.list_subscriptions():
        print(entry)