Write Records to Streams in HStreamDB

This document provides tutorials on how to write data to streams in HStreamDB through clients such as hstreamdb-java.

To write data into HStreamDB, we need to pack the messages into HStream Records and use a Producer that creates and sends the records to the server.

HStream Record

All data in a stream is stored in the form of HStream Records. HStreamDB supports the following two kinds of HStream Records:

  • HRecord: can be thought of as a piece of JSON data, like the document in some NoSQL databases.
  • Raw Record: arbitrary binary data.
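
For concreteness, here is a minimal Java sketch of how each kind is wrapped into a Record before being written. It uses only the Record and HRecord builders that appear in the full examples below; the class name is hypothetical.

```java
// RecordKindsExample.java -- illustrative only
package docs.code.examples;

import io.hstream.HArray;
import io.hstream.HRecord;
import io.hstream.Record;
import java.nio.charset.StandardCharsets;

public class RecordKindsExample {
  public static void main(String[] args) {
    // HRecord: structured, JSON-like data
    HRecord hRecord =
        HRecord.newBuilder()
            .put("id", 10)
            .put("isReady", true)
            .put("targets", HArray.newBuilder().add(1).add(2).add(3).build())
            .build();
    Record jsonRecord = Record.newBuilder().hRecord(hRecord).build();

    // Raw Record: plain binary data
    Record rawRecord =
        Record.newBuilder().rawRecord("payload".getBytes(StandardCharsets.UTF_8)).build();
  }
}
```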

End-to-End Compression

To reduce transfer overhead and maximize bandwidth utilization, HStreamDB supports compressing the written HStream Records. Users can set the compression algorithm when creating a BufferedProducer. The currently available algorithms are gzip and zstd. Decompression is performed automatically when clients consume data from HStreamDB.
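
For example, a minimal sketch of enabling compression on a BufferedProducer. The exact option name is an assumption based on the hstreamdb-java builder style used elsewhere in this document, so verify it against your client version's API docs.

```java
// CompressionExample.java -- illustrative only
package docs.code.examples;

import io.hstream.BufferedProducer;
import io.hstream.CompressionType;
import io.hstream.HStreamClient;

public class CompressionExample {
  public static void main(String[] args) throws Exception {
    HStreamClient client = HStreamClient.builder().serviceUrl("127.0.0.1:6570").build();
    // Assumption: the builder exposes compressionType(...) and a CompressionType
    // enum with GZIP/ZSTD; check the hstreamdb-java docs for the exact names.
    BufferedProducer producer =
        client.newBufferedProducer()
            .stream("your_stream")
            .compressionType(CompressionType.ZSTD) // or CompressionType.GZIP
            .build();
    // ... write records as in the examples below ...
    producer.close();
    client.close();
  }
}
```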

Write HStream Records

There are two ways to write records to HStreamDB. For simplicity, you can start with the Producer from client.newProducer(). This Producer provides no configuration options; it immediately sends every record it receives to the HServer in parallel, which means it cannot guarantee the order of the records. In production environments, the BufferedProducer from client.newBufferedProducer() is usually the better choice: it buffers records in order, packs them into batches, and sends each batch to the server. When a record is written to a stream, the HServer generates a corresponding record ID for it and sends it back to the client. This record ID is unique within the stream.

Using Producer

```java
// WriteDataSimpleExample.java
package docs.code.examples;

import io.hstream.*;
import io.hstream.Record;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class WriteDataSimpleExample {
  public static void main(String[] args) throws Exception {
    // TODO (developers): Replace these variables for your own use cases.
    String serviceUrl = "127.0.0.1:6570";
    if (System.getenv("serviceUrl") != null) {
      serviceUrl = System.getenv("serviceUrl");
    }
    String streamName1 = "your_h_records_stream_name";
    String streamName2 = "your_raw_records_stream_name";

    // We do not recommend writing both raw data and HRecord data into the same stream.
    HStreamClient client = HStreamClient.builder().serviceUrl(serviceUrl).build();
    writeHRecordData(client, streamName1);
    writeRawData(client, streamName2);
    client.close();
  }

  public static void writeHRecordData(HStreamClient client, String streamName) {
    // Create a basic producer for low latency scenarios.
    // For high throughput scenarios, please see the next section "Using BufferedProducer".
    Producer producer = client.newProducer().stream(streamName).build();
    HRecord hRecord =
        HRecord.newBuilder()
            // Number
            .put("id", 10)
            // Boolean
            .put("isReady", true)
            // List
            .put("targets", HArray.newBuilder().add(1).add(2).add(3).build())
            // String
            .put("name", "hRecord-example")
            .build();
    for (int i = 0; i <= 3000; i++) {
      Record record = Record.newBuilder().hRecord(hRecord).build();
      // If the data is written successfully, a server-assigned record id is returned
      CompletableFuture<String> recordId = producer.write(record);
      System.out.println("Wrote message ID: " + recordId.join());
    }
  }

  private static void writeRawData(HStreamClient client, String streamName) {
    Producer producer = client.newProducer().stream(streamName).build();
    List<String> messages = Arrays.asList("first", "second");
    for (final String message : messages) {
      Record record =
          Record.newBuilder().rawRecord(message.getBytes(StandardCharsets.UTF_8)).build();
      CompletableFuture<String> recordId = producer.write(record);
      System.out.println("Published message ID: " + recordId.join());
    }
  }
}
```
```go
// ExampleWriteProducer.go
package examples

import (
	"github.com/hstreamdb/hstreamdb-go/hstream"
	"github.com/hstreamdb/hstreamdb-go/hstream/Record"
	"log"
)

func ExampleWriteProducer() error {
	client, err := hstream.NewHStreamClient(YourHStreamServiceUrl)
	if err != nil {
		log.Fatalf("Creating client error: %s", err)
	}
	defer client.Close()

	producer, err := client.NewProducer("testStream")
	if err != nil {
		log.Fatalf("Creating producer error: %s", err)
	}
	defer producer.Stop()

	payload := []byte("Hello HStreamDB")
	rawRecord, err := Record.NewHStreamRawRecord("testStream", payload)
	if err != nil {
		log.Fatalf("Creating raw record error: %s", err)
	}
	for i := 0; i < 100; i++ {
		appendRes := producer.Append(rawRecord)
		if resp, err := appendRes.Ready(); err != nil {
			log.Printf("Append error: %s", err)
		} else {
			log.Printf("Append response: %s", resp)
		}
	}
	return nil
}
```
```python
# https://github.com/hstreamdb/hstreamdb-py/blob/main/examples/snippets/guides.py
import asyncio
import hstreamdb
import os

# NOTE: Replace with your own host and port
host = os.getenv("GUIDE_HOST", "127.0.0.1")
port = os.getenv("GUIDE_PORT", 6570)
stream_name = "your_stream"
subscription = "your_subscription"


# Run: asyncio.run(main(your_async_function))
async def main(*funcs):
    async with await hstreamdb.insecure_client(host=host, port=port) as client:
        for f in funcs:
            await f(client)


async def append_records(client):
    payloads = [b"some_raw_binary_bytes", {"msg": "hi"}]
    rs = await client.append(stream_name, payloads)
    for r in rs:
        print("Append done, ", r)
```

Using BufferedProducer

In almost all cases, we recommend using a BufferedProducer. Not only does it provide higher throughput, it also offers more flexible configuration, letting you make trade-offs between throughput and latency as needed. You can configure the following two settings of a BufferedProducer to control the flush triggers and the buffer size. With BatchSetting, you decide when a batch is sent, based on the maximum record count of a batch, the total bytes of a batch, and the maximum age of a batch. With FlowControlSetting, you can set the buffer size and policy for all buffered records. The code example below shows how to use BatchSetting to set the corresponding triggers that tell producers when to flush, and FlowControlSetting to limit the maximum number of bytes buffered in a BufferedProducer.

```java
// WriteDataBufferedExample.java
package docs.code.examples;

import io.hstream.*;
import io.hstream.Record;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class WriteDataBufferedExample {
  public static void main(String[] args) throws Exception {
    // TODO (developers): Replace these variables for your own use cases.
    String serviceUrl = "127.0.0.1:6570";
    if (System.getenv("serviceUrl") != null) {
      serviceUrl = System.getenv("serviceUrl");
    }
    String streamName = "your_h_records_stream_name";
    HStreamClient client = HStreamClient.builder().serviceUrl(serviceUrl).build();
    writeHRecordDataWithBufferedProducers(client, streamName);
    client.close();
  }

  public static void writeHRecordDataWithBufferedProducers(
      HStreamClient client, String streamName) {
    BatchSetting batchSetting =
        BatchSetting.newBuilder()
            // optional, default: 100, the max record count of a batch,
            // disable the trigger if the value <= 0.
            .recordCountLimit(100)
            // optional, default: 4096(4KB), the max bytes size of a batch,
            // disable the trigger if the value <= 0.
            .bytesLimit(4096)
            // optional, default: 100(ms), the max age of a buffering batch,
            // disable the trigger if the value <= 0.
            .ageLimit(100)
            .build();
    // FlowControlSetting controls the total records,
    // including buffered batch records and sending records
    FlowControlSetting flowControlSetting =
        FlowControlSetting.newBuilder()
            // Optional, default: 104857600(100MB), total bytes limit, including
            // buffered batch records and sending records; the value must be
            // greater than batchSetting.bytesLimit
            .bytesLimit(40960)
            .build();
    BufferedProducer producer =
        client.newBufferedProducer().stream(streamName)
            .batchSetting(batchSetting)
            .flowControlSetting(flowControlSetting)
            .build();
    List<CompletableFuture<String>> recordIds = new ArrayList<>();
    Random random = new Random();
    for (int i = 0; i < 100; i++) {
      double temp = random.nextInt(100) / 10.0 + 15;
      HRecord hRecord = HRecord.newBuilder().put("temperature", temp).build();
      Record record = Record.newBuilder().hRecord(hRecord).build();
      CompletableFuture<String> recordId = producer.write(record);
      recordIds.add(recordId);
    }
    // close the producer; it will call flush() first
    producer.close();
    System.out.println(
        "Wrote message IDs: "
            + recordIds.stream().map(CompletableFuture::join).collect(Collectors.toList()));
  }
}
```
```go
// ExampleWriteBatchProducer.go
package examples

import (
	"github.com/hstreamdb/hstreamdb-go/hstream"
	"github.com/hstreamdb/hstreamdb-go/hstream/Record"
	"log"
)

func ExampleWriteBatchProducer() error {
	client, err := hstream.NewHStreamClient(YourHStreamServiceUrl)
	if err != nil {
		log.Fatalf("Creating client error: %s", err)
	}
	defer client.Close()

	producer, err := client.NewBatchProducer("testDefaultStream", hstream.WithBatch(10, 500))
	if err != nil {
		log.Fatalf("Creating producer error: %s", err)
	}
	defer producer.Stop()

	result := make([]hstream.AppendResult, 0, 100)
	for i := 0; i < 100; i++ {
		rawRecord, _ := Record.NewHStreamHRecord("", map[string]interface{}{
			"id":      i,
			"isReady": true,
			"name":    "hRecord-example",
		})
		r := producer.Append(rawRecord)
		result = append(result, r)
	}

	for i, res := range result {
		resp, err := res.Ready()
		if err != nil {
			log.Printf("write error: %s\n", err.Error())
		}
		log.Printf("record[%d]=%s\n", i, resp.String())
	}
	return nil
}
```
```python
# https://github.com/hstreamdb/hstreamdb-py/blob/main/examples/snippets/guides.py
import asyncio
import hstreamdb
import os

# NOTE: Replace with your own host and port
host = os.getenv("GUIDE_HOST", "127.0.0.1")
port = os.getenv("GUIDE_PORT", 6570)
stream_name = "your_stream"
subscription = "your_subscription"


# Run: asyncio.run(main(your_async_function))
async def main(*funcs):
    async with await hstreamdb.insecure_client(host=host, port=port) as client:
        for f in funcs:
            await f(client)


class AppendCallback(hstreamdb.BufferedProducer.AppendCallback):
    count = 0

    def on_success(self, stream_name, payloads, stream_keyid):
        self.count += 1
        print(f"Batch {self.count}: Append success with {len(payloads)} payloads.")

    def on_fail(self, stream_name, payloads, stream_keyid, e):
        print("Append failed!")
        print(e)


async def buffered_append_records(client):
    p = client.new_producer(
        append_callback=AppendCallback(),
        size_trigger=10240,
        time_trigger=0.5,
        retry_count=2,
    )
    for i in range(50):
        await p.append(stream_name, b"some_raw_binary_bytes")
        await p.append(stream_name, {"msg": "hello"})
    await p.wait_and_close()
```

Using Partition Keys

Records with the same partition key are guaranteed to be written in order by a BufferedProducer. Another important feature of HStreamDB, partitioning, also uses these partition keys to decide which partition each record is assigned to, improving write and read performance. See Manage Shards of the Stream for a more detailed explanation.

Referring to the examples below, you can easily write records with partition keys.

```java
// WriteDataWithKeyExample.java
package docs.code.examples;

import io.hstream.*;
import io.hstream.Record;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.stream.Collectors;

public class WriteDataWithKeyExample {
  public static void main(String[] args) throws Exception {
    // TODO (developers): Replace these variables for your own use cases.
    String serviceUrl = "127.0.0.1:6570";
    if (System.getenv("serviceUrl") != null) {
      serviceUrl = System.getenv("serviceUrl");
    }
    String streamName = "your_h_records_stream_name";
    HStreamClient client = HStreamClient.builder().serviceUrl(serviceUrl).build();
    writeHRecordDataWithKey(client, streamName);
    client.close();
  }

  public static void writeHRecordDataWithKey(HStreamClient client, String streamName) {
    // For demonstration, we use the following as our partition keys for the records.
    // As the documentation mentions, if we do not give any partition key, the record
    // will get a default key and be mapped to some default shard.
    String key1 = "South";
    String key2 = "North";
    // Create a buffered producer with default BatchSetting and FlowControlSetting.
    BufferedProducer producer = client.newBufferedProducer().stream(streamName).build();
    List<CompletableFuture<String>> recordIds = new ArrayList<>();
    Random random = new Random();
    for (int i = 0; i < 100; i++) {
      double temp = random.nextInt(100) / 10.0 + 15;
      Record record;
      if ((i % 3) == 0) {
        HRecord hRecord = HRecord.newBuilder().put("temperature", temp).put("withKey", 1).build();
        record = Record.newBuilder().hRecord(hRecord).partitionKey(key1).build();
      } else {
        HRecord hRecord = HRecord.newBuilder().put("temperature", temp).put("withKey", 2).build();
        record = Record.newBuilder().hRecord(hRecord).partitionKey(key2).build();
      }
      CompletableFuture<String> recordId = producer.write(record);
      recordIds.add(recordId);
    }
    System.out.println(
        "Wrote message IDs: "
            + recordIds.stream().map(CompletableFuture::join).collect(Collectors.toList()));
    producer.close();
  }
}
```
```go
// ExampleWriteBatchProducerMultiKey.go
package examples

import (
	"fmt"
	"github.com/hstreamdb/hstreamdb-go/hstream"
	"github.com/hstreamdb/hstreamdb-go/hstream/Record"
	"github.com/hstreamdb/hstreamdb-go/hstream/compression"
	"log"
	"math/rand"
	"sync"
)

func ExampleWriteBatchProducerMultiKey() error {
	client, err := hstream.NewHStreamClient(YourHStreamServiceUrl)
	if err != nil {
		log.Fatalf("Creating client error: %s", err)
	}
	defer client.Close()

	producer, err := client.NewBatchProducer("testStream",
		// optional: set record count and max batch bytes trigger
		hstream.WithBatch(10, 500),
		// optional: set timeout trigger
		hstream.TimeOut(1000),
		// optional: set client compression
		hstream.WithCompression(compression.Zstd),
		// optional: set flow control
		hstream.WithFlowControl(81920000))
	if err != nil {
		log.Fatalf("Creating producer error: %s", err)
	}
	defer producer.Stop()

	keys := []string{"sensor1", "sensor2", "sensor3", "sensor4", "sensor5"}
	rids := sync.Map{}
	wg := sync.WaitGroup{}
	wg.Add(5)
	for _, key := range keys {
		go func(key string) {
			result := make([]hstream.AppendResult, 0, 100)
			for i := 0; i < 100; i++ {
				temp := rand.Intn(100)/10.0 + 15
				rawRecord, _ := Record.NewHStreamHRecord(key, map[string]interface{}{
					key: fmt.Sprintf("temperature=%d", temp),
				})
				r := producer.Append(rawRecord)
				result = append(result, r)
			}
			rids.Store(key, result)
			wg.Done()
		}(key)
	}
	wg.Wait()

	rids.Range(func(key, value interface{}) bool {
		k := key.(string)
		res := value.([]hstream.AppendResult)
		for i := 0; i < 100; i++ {
			resp, err := res[i].Ready()
			if err != nil {
				log.Printf("write error: %s\n", err.Error())
			}
			log.Printf("[key: %s]: record[%d]=%s\n", k, i, resp.String())
		}
		return true
	})
	return nil
}
```